Rustで内包表記 (List comprehension)

追伸

すごいHaskellの人が、きれいなHaskell風の内包表記のcrateを公開してくれているので、これをおすすめしたい。

comprehension - Rust

内包表記マクロ

素のRustには内包表記は用意されていないが、マクロを利用してPython風の内包表記を実現したcrateは幾つか書かれている。

cute - Rust
mapcomp - Rust

Rustには比較的強力で書きやすいハイジニックなマクロが用意されていて、構文はかなりの自由度で操作できる。ここでは、Haskell風の内包表記が欲しかったので書いてみた。

macro_rules! lc {
    [$x:expr; $a:ident <- $as:expr, $( $b:ident <- $bs:expr ),+]
        => { $as.flat_map(|$a| lc![$x; $( $b <- $bs ),+]).collect::<Vec<_>>() };
    [$x:expr; $a:ident <- $as:expr]
        => { $as.map(|$a| $x).collect::<Vec<_>>() };
    [$x:expr; $a:ident <- $as:expr, $( $b:ident <- $bs:expr ),+; $( $e:expr ),+]
        => { $as.flat_map(|$a| lc![$x; $( $b <- $bs ),+; $( $e ),+]).collect::<Vec<_>>() };
    [$x:expr; $a:ident <- $as:expr; $( $e:expr ),+]
        => { $as.flat_map(|$a| lc![$x; $( $e ),+]).collect::<Vec<_>>() };
    [$x:expr; $c:expr, $( $e:expr ),+]
        => { if $c { lc![$x; $( $e ),+] } else { vec![] } };
    [$x:expr; $c:expr] => { if $c { vec![$x] } else { vec![] } };
    [$x:expr] => { vec![$x] };
}

Schemeのsyntax-rulesを思わせる、書きやすいマクロだと思う。0回以上の繰り返しの量化子*の挙動が微妙だった (よく理解出来ていない) ので+のみを使った。

lc![x*x; x <- 1..20; x%2 == 1]
=> [1, 9, 25, 49, 81, 121, 169, 225, 289, 361]

// ラマヌジャンのタクシー数を見つける
lc![(a,b,c,d,a*a*a + b*b*b); a <- 1..20, b <- a..20, c <- 1..20, d <- c..20;
    a < c, a*a*a + b*b*b == c*c*c + d*d*d]
=> [(1, 12, 9, 10, 1729), (2, 16, 9, 15, 4104)]

Vecのiter()を使いたい

Vecのiter()をジェネレータ部分に使う時は、参照が渡るので、関数適用や比較演算などでderefやclone()が必要になることがある。そこで、最初にすべてcloned()相当の処理を行ってしまうことも考えた。効率は良くない気がする。

macro_rules! lc2 {
    [$x:expr; $a:ident <- $as:expr, $( $b:ident <- $bs:expr ),+]
        => { $as.map(|x| x.clone()).flat_map(|$a| lc2![$x; $( $b <- $bs ),+]).collect::<Vec<_>>() };
    [$x:expr; $a:ident <- $as:expr]
        => { $as.map(|x| x.clone()).map(|$a| $x).collect::<Vec<_>>() };
    [$x:expr; $a:ident <- $as:expr, $( $b:ident <- $bs:expr ),+; $( $e:expr ),+]
        => { $as.map(|x| x.clone()).flat_map(|$a| lc2![$x; $( $b <- $bs ),+; $( $e ),+]).collect::<Vec<_>>() };
    [$x:expr; $a:ident <- $as:expr; $( $e:expr ),+]
        => { $as.map(|x| x.clone()).flat_map(|$a| lc2![$x; $( $e ),+]).collect::<Vec<_>>() };
    [$x:expr; $c:expr, $( $e:expr ),+]
        => { if $c { lc2![$x; $( $e ),+] } else { vec![] } };
    [$x:expr; $c:expr] => { if $c { vec![$x] } else { vec![] } };
    [$x:expr] => { vec![$x] };
}

使ってみた例。lcではiter().cloned()するか、y.clone()または*yなどが必要な部分がある。

let v0: Vec<f64> = vec![0.1,0.2,0.3,0.5,0.8];
let v1: Vec<f64> = vec![1.41421,1.73205];
lc2![x.powf(y); x <- v0.iter(), y <- v1.iter(); x*5.0 > y]
=> [0.18219634046785238, 0.37521515374493114, 0.3010239124332578, 0.7293716698543952, 0.6794335871063408]

参考文献

マクロの書き方については、この辺りが参考になった。

Macros - The Rust Programming Language
Rustのマクロを覚える - Qiita

Haskellで名前付き引数やデフォルト引数に近いこと

名前付き引数やデフォルト引数とは

幾つかの言語では、関数の引数に名前を付けて、引数の順番ではなく名前で値を指定することができる。また、引数があたえられなかった時に使用されるデフォルトの値を設定して置くこともできたりする。

func <- function(a, b, c = 1) {
  return(a * b + c)
}

func(b = 3, a = 2)
=> 7

Haskellにはこうした機能はないが、レコード構文を使うと近しいことが実現できる。

Haskellのレコード構文

Haskellではデータ型を定義する時、以下のような記法を用いることで、フィールドに名前を付けることができる。

data MyData = MyData { first :: Int, second :: String, third :: Double }

data1 = MyData 0 "hello" 1.4
data2 = MyData { second = "hello", third = 1.4, first = 0 }

-- アクセサ関数
second data1
=> "hello"

オブジェクトの作成には、data1のような通常の記法に加えて、data2のような記法も利用できる。フィールド名を名前で指定でき、順番はバラバラでも良い。また、フィールド名と同じ名前のアクセサ関数も自動的に用意される。

data3 = data2 { first = 1, second = "world" }

first data3
=> 1

この構文を使うと、オブジェクトの一部のフィールドのみを書き換えた新しいオブジェクトを作成することもできる。

Haskellで名前付き引数とデフォルト引数

関数の引数にレコード構文で定義したデータ型を使うことで、Haskellでも名前付き引数やデフォルト引数に近いことを実現できる。

data Request = Request { repetition :: Int, message :: String }

display :: Request -> IO ()
display (Request n s) = mapM_ (print . (++ " " ++ s) . show) [1..n]

-- 名前付き引数
display Request { message = "record", repetition = 4 }

-- デフォルト引数
req0 = Request { repetition = 3, message = "hello" }
display req0 { message = "world" }

Haskellのスペースリークとプロファイリング

目次

プロファイラ

プログラムをチューニングしたい時は、どの関数のどの部分がどれだけのCPU時間やメモリを消費しているのかがわかると便利なことがある。例えば特定の関数がCPU時間の90%を使っている時に、時間をかけて他の部分をチューニングしても、プログラム全体の実行時間の削減への効果は低い。基本的にはCPUやメモリのコストの高い関数から見直していくと良い。

GHCにはプロファイラが付属しているので、CPUの消費時間やメモリの消費量を分析でき、結果をグラフ化することもできる。
7. Profiling — Glasgow Haskell Compiler 8.10.1 User's Guide

1. プロファイリングを行うためには、まず、幾つかオプションを付けて対象のプログラムをコンパイルする。

ghc -prof -fprof-auto -rtsopts -O3 -o prog prog.hs

2. 次に、RTS (ランタイムシステム) の色々なオプションを付けてプログラムを実行すると、実行時のデータを採集してまとめてくれる。

全体的なレポートを採る

./prog +RTS -p

この場合、結果はprog.profに出力される。

前回の記事で比較に使った4段4次のRunge-Kuttaのプログラムで試してみた結果 (一部) は以下のようになった。CPU時間の大半が、結果を出力する際に文字列化して整形する部分に使われてしまっているのがわかる。rk44は計算が軽いので仕方ないかもしれないが、メインの計算部分より重いのは残念な気がする。

	Sat Dec  1 10:03 2018 Time and Allocation Profiling Report  (Final)

	   rk44 +RTS -p -RTS

	total time  =        0.25 secs   (248 ticks @ 1000 us, 1 processor)
	total alloc = 459,835,272 bytes  (excludes profiling overheads)

COST CENTRE MODULE SRC                       %time %alloc

toString    Main   rk44.hs:28:1-50            72.6   83.0
writeLists  Main   rk44.hs:41:1-54             9.3    9.3
rk44.kf     Main   rk44.hs:56:9-79             5.2    4.8
rk44.ss     Main   rk44.hs:57:9-44             3.6    1.0
rk44.kf.\   Main   rk44.hs:56:58-67            2.4    0.3
lorentz     Main   rk44.hs:(174,1)-(179,49)    2.0    0.1
rk44        Main   rk44.hs:(49,1)-(58,66)      2.0    0.6


                                                                                    individual      inherited
COST CENTRE       MODULE                SRC                      no.     entries  %time %alloc   %time %alloc

MAIN              MAIN                  <built-in>               113          0    0.0    0.0   100.0  100.0
 CAF              Main                  <entire-module>          225          0    0.0    0.0    95.6   99.9
  main            Main                  rk44.hs:319:1-19         226          1    0.0    0.0    95.6   99.9
   lorentz        Main                  rk44.hs:(174,1)-(179,49) 227          1    1.6    0.1    95.6   99.9
    lorentz.file  Main                  rk44.hs:175:7-26         230          1    0.0    0.0     0.0    0.0
    lorentz.fps   Main                  rk44.hs:176:7-64         233          1    0.4    0.0    16.1    7.5
     l_dxdt       Main                  rk44.hs:170:1-35         248      39996    0.0    0.0     0.0    0.0

メモリ消費を詳しく調べる

./prog +RTS -[h,hm,hd,hy,hr,hb]

結果はprog.profやprog.hpに出力される。グラフにしたい場合は結果をpsに変換するツールもある。

hp2ps -c prog.hp

pdfにする場合はさらに変換する。

ps2pdf prog.ps prog.pdf

rk44で試してみた結果は以下のようになった。

-h コストセンタ
f:id:lqtmirage:20181201192311p:plain:w400

-hm モジュール
f:id:lqtmirage:20181201192314p:plain:w400

-hd コンストラク
f:id:lqtmirage:20181201102931p:plain:w400

-hy
f:id:lqtmirage:20181201102926p:plain:w400

-hr リテイナー
誰がオブジェクトを保持しているのかを説明してくれる。あるオブジェクトに到達できるポインタを直接持っているオブジェクト。
7. Profiling — Glasgow Haskell Compiler 8.10.1 User's Guide
f:id:lqtmirage:20181201102929p:plain:w400

-hb 履歴

  • lag : 生成後、使用待ちの状態
  • use : 最初の使用後で、最後の使用前の状態
  • drag : 最後の使用後で、最後の参照が消える前の状態
  • void : 1度も使用されなかったオブジェクト

7. Profiling — Glasgow Haskell Compiler 8.10.1 User's Guide
f:id:lqtmirage:20181201102939p:plain:w400

スペースリークなどへの対応

GHCはガベージコレクタを持っているので、メモリについて意識する必要はあまりないが、プログラムによってはサンク (遅延評価による未評価の値) が蓄積してスペースリークと呼ばれるメモリの浪費が起きることがある。メモリが破壊されることはまずないと思うが、メモリリークと同様の現象は起きてしまう。また、状況によっては (パフォーマンスを落とさずに) メモリの消費量を一定以下に抑えたいという場合もあるかもしれない。そんな時はプロファイラを使ってどこでサンクが蓄積されているのかを調べた上で、プログラムの構成を見直したり、単に、部分的にあるいは全体的に正格評価すれば良い場合もある。

末尾再帰への書き換え

関数の再帰呼び出し後に何らかの処理が残されているような再帰関数で、コンパイラの最適化が効かなかった時、サンクが蓄積されてしまうことがある。再帰後の処理結果を引数に織り込むなどして、再帰呼び出し後に処理が残らない形に書き変えると、再帰はループに置き換え可能なので、スペースリークを回避できるかもしれない。

-- 末尾再帰になっていない例 (これくらいなら最適化が効くかもしれない)
fact :: Integer -> Integer
fact n | n < 2 = 0
       | otherwise = n * fact (n - 1)

-- 末尾再帰に書き換えた例
factT :: Integer -> Integer
factT n = f n 1
  where f m t | m < 2 = t
              | otherwise = f (m - 1) (m * t)

実際のリスト生成の回避

計算の途中でリストが生成されていると考えるとわかりやすい処理があり、コード上もそのように表現されたとしても、実際にはリスト生成を必要としない場合がある。より抽象的なリスト操作関数を使ったりして気をつけると、コンパイラが最適化して不必要なリスト生成を回避してくれることがある。

-- いずれも実際にはリストは生成されないはず
sum [1..1000000]
foldl1 (*) [1..10000]

-- リスト生成させる
(product . reverse) [1..10000]

正格評価版$!とseq

関数適用演算子$には正格評価版$!があり、引数を弱頭部正規形 (WHNF) まで評価してから関数適用する。また、a `seq` bのようにseq関数を適用すると、aを評価してからbを返す。言語標準の機能。

f $! p q r
a `seq` b `seq` (a, b)

Prelude

正格フィールド

データ型を定義する時、値コンストラクタのフィールドの型名の前に!をつけると、引数が評価されてから値がつくられるようになる。こちらも言語標準の機能。

data StrictData = StrictData !Integer !String

BangPatterns

また、BangPatternsプラグマを指定すると、任意の変数束縛時に、変数名の前に!を付けることで個々の変数を評価することができるようになる。
9.1. Language options — Glasgow Haskell Compiler 8.10.1 User's Guide

StrictとStrictData

GHC8.0.2以降では、StrictDataプラグマを指定すると、値コンストラクタの引数は自動的に!が付けられた状態になる。また、Strictプラグマではこれに加えて、関数やwhere、letなどの変数束縛時にもデフォルトで評価が行われるようになる。逆に遅延評価したい場合に変数の前に~を付ける。

{-# LANGUAGE Strict #-}
{-# LANGUAGE StrictData #-}

同様の効果は-XStrictData、-XStrictフラグでも得られる。
9.1. Language options — Glasgow Haskell Compiler 8.10.1 User's Guide

Haskellで数値計算 常微分方程式の陰的解法編

以前の記事で陽的Runge-Kutta法を書いたことがあったが、今回は常微分方程式の初期値問題を陰的Runge-Kutta法で解いてみたい。

Implicit Runge-Kutta法

常微分方程式に初期値があたえられた時には、微分係数を使って端から数値的に積分していけば、(数値誤差の範囲では) 元の関数を得ることができる。この時、それまでのステップの計算値のみを使って次のステップの値を得る陽的な解法と、次のステップの値も使って次のステップの値を得る陰的な解法がある。同じ刻み幅に対しては陰的な解法の方が安定で、特に、硬い方程式に対しては陰的な解法が必要とされる場合があるが、陰的な解法ではステップごとに非線形方程式を解く必要が生じる。

陰的Runge-Kutta法の式と係数行列は以下の通り。

 t_{n+1} = t_n + h
 {\bf x}_{n+1} = {\bf x}_n + h \sum_i b_i {\bf k}_i
 {\bf k}_i = {\bf f}(t_n + c_i h, {\bf x}_n + h \sum_j a_{ij} {\bf k}_j)

 c_i  a_{ij}
 b_i

1段2次

 \frac{1}{2}  \frac{1}{2}
 1

2段4次

 \frac{3 - \sqrt{3}}{6}  \frac{1}{4}  \frac{3 - 2\sqrt{3}}{12}
 \frac{3 + \sqrt{3}}{6}  \frac{3 + 2\sqrt{3}}{12}  \frac{1}{4}
 \frac{1}{2}  \frac{1}{2}

3段6次

 \frac{5 - \sqrt{15}}{10}  \frac{5}{36}  \frac{10 - 3\sqrt{15}}{45}  \frac{25 - 6\sqrt{15}}{180}
 \frac{1}{2}  \frac{10 + 3\sqrt{15}}{72}  \frac{2}{9}  \frac{10 - 3\sqrt{15}}{72}
 \frac{5 + \sqrt{15}}{10}  \frac{25 + 6\sqrt{15}}{180}  \frac{10 + 3\sqrt{15}}{45}  \frac{5}{36}
 \frac{5}{18}  \frac{4}{9}  \frac{5}{18}

ここではhmatrixの逆行列関数 (LU分解による) を使ってNewton-Raphson法で連立非線形方程式を解くことで、陰的Runge-Kutta法で連立常微分方程式を解いてみる。ほとんどの部分で普通のリストを使っているが、数本の連立方程式で10000ステップ程度を解くなら速度的にも許容できる範囲 (3段6次でも数秒程度) に感じた。速度が必要な場合はvectorパッケージのData.Vector.UnboxedとLLVMのバックエンド (GHCで-fllvmオプション) を使ってみても良いかもしれない。

Numeric Haskell: A Vector Tutorial - HaskellWiki

import Data.List (transpose, unfoldr)
import Data.List.Split (chunksOf)
import qualified Numeric.LinearAlgebra as L

-- 引数をデータ型にまとめる
data Ces = Ces { c :: [Double], b :: [Double], as :: [[Double]], s :: Int }
type Lfs = [[Double] -> Double]
data Ps = Ps { dfs :: Lfs, h :: Double, term :: Double, lim :: Int, err :: Double }

-- 連立非線形方程式のソルバー
delta = 1.0e-9 :: Double -- difference for derivative

derivative :: ([Double] -> Double) -> [Double] -> [Double]
derivative f xs = zipWith dfdx xs [0..]
  where dfdx x i       = (f (replace xs i $ x + delta) - f xs) / delta
        replace xs i x = let (l, _:r) = splitAt i xs in l ++ (x : r)

newtonRaphson :: Int -> Double -> Lfs -> [Double] -> Maybe [Double]
newtonRaphson lim err fs xs
  | lim < 1           = Nothing
  | L.norm_2 fx < err = Just xs
  | otherwise         = newtonRaphson (lim + 1) err fs sx
  where sx = L.toList $ L.vector xs - (L.inv j L.#> fx)
        fx = L.vector $ map ($ xs) fs
        j  = L.fromLists $ map (`derivative` xs) fs

-- Implicit Runge-Kuttaのソルバーの生成器
genRungeKutta :: Ces -> Ps -> [Double] -> Maybe ([Double], [Double])
genRungeKutta (Ces cec ceb ceas s) (Ps dfs h term lim err) st@(t:xs)
  | t > term  = Nothing
  | otherwise = do ks <- newtonRaphson lim err kfs ks0
                   return (st, (t + h):xsf ks ceb)
  where ks0        = concat . transpose $ replicate s xs 
        kfs        = concat $ zipWith3 kfi cec ceas [0..]
        kfi c as i = zipWith kf dfs [0..]
          where kf f n ks = let l  = (t + h * c):xsf ks as
                                k0 = ks !! (i * length dfs + n)
                            in f l - k0
        xsf ks cs  = zipWith csx xs $ transpose $ chunksOf (length xs) ks
          where csx x rks = x + h * sum (zipWith (*) cs rks)

-- 係数行列
ce36 = Ces {
  c  = [(5.0 - sqrt 15.0)/10.0, 0.5, (5.0 + sqrt 15.0)/10.0],
  b  = [5.0/18.0, 4.0/9.0, 5.0/18.0],
  as = [[5.0/36.0, (10.0 - 3.0*sqrt 15.0)/45.0, (25.0 - 6.0*sqrt 15.0)/180.0],
        [(10.0 + 3.0*sqrt 15.0)/72.0, 2.0/9.0, (10.0 - 3.0*sqrt 15.0)/72.0],
        [(25.0 + 6.0*sqrt 15.0)/180.0, (10.0 + 3.0*sqrt 15.0)/45.0, 5.0/36.0]],
  s  = 3 }

ce24 = Ces {
  c  = [(3.0 - sqrt 3.0)/6.0, (3.0 + sqrt 3.0)/6.0],
  b  = [1.0/2.0, 1.0/2.0],
  as = [[1.0/4.0, (3.0 - 2.0*sqrt 3)/12.0 ],
        [(3.0 + 2.0*sqrt 3.0)/12.0, 1.0/4.0]],
  s  = 2 }

ce12 = Ces {
  c  = [1.0/2.0],
  b  = [1.0],
  as = [[1.0/2.0]],
  s  = 1 }

-- ソルバーの生成
rk36 = genRungeKutta ce36
rk24 = genRungeKutta ce24
rk12 = genRungeKutta ce12

1段2次、2段4次、3段6次の係数を用意した。Newton-Raphsonが収束しない可能性を考えてMaybeにしたので、Runge-Kuttaもunfoldrでリスト生成するようにした。最近のunfoldrはFusionが効率的に展開してくれるらしい。hmatrixを使う部分でVectorとリストの変換が入っていたり、陰的Runge-Kuttaの中間係数の計算部分で任意個の方程式を連立する必要があったりして、少しごちゃごちゃしてしまった気がする。termを関数にして、もっと細かい終了条件をあたえてもいいかもしれない。

performance - Efficiency of unfoldr versus zipWith - Stack Overflow

色々な方程式を解いてみよう

可視化にはScheme系のLisp方言のRacketを使ってみた。

https://racket-lang.org

-- 出力のための関数
toString :: Show a => [a] -> String
toString = (++ "\n") . concatMap ((++ " ") . show)

writeLists :: Show a => String -> [[a]] -> IO ()
writeLists file = appendFile file . concatMap toString

-- 方程式を解く例
l_dxdt [t,x,y,z] = -10.0*x + 10.0*y
l_dydt [t,x,y,z] = -x*z + 28.0*x - y
l_dzdt [t,x,y,z] = x*y - 8.0/3.0 * z

lorentz solver = do
  let file = "lorentz.txt"
      sps  = solver $ Ps [l_dxdt,l_dydt,l_dzdt] 0.01 100.0 100 1.0e-9
  writeFile file "t x y z \n"
  writeLists file $ unfoldr sps [0.0,1.0,1.0,1.0]

main :: IO ()
main = lorentz rk36

硬い方程式

方程式に急激に変化する成分と緩やかに変化する成分が混じっている時、急激な方の変化部分に合わせると刻み幅を小さくしなければならないが、緩やかな方の変化部分を全て計算するためには長い時間がかかってしまう。陽的な解法でも刻み幅を誤差に応じて変化させることで、ある程度対応することもできるが、場合によっては必要な刻み幅が浮動小数表現の精度を超えてしまうこともある。そうした硬い方程式は、陽的な解法で解けない場合でも、陰的な解法を使うと上手く積分できることもある。

微分方程式系の硬さを示す指標として、係数行列の固有値の最大値と最小値の比 (の絶対値) があげられる。

 {\bf y'} = A{\bf y}

これを元に、割と硬そうな方程式系を作って解いてみよう。

 \frac{dx}{dt} = 2x + 3y
 \frac{dy}{dt} = -14400x - 11900y

陰的Runge-Kutta (3段6次) では10.0という非常に大きな刻み幅でさえ、振動しているものの積分はでき、この場合では右端では正しい答えに収束して行っている。一般的には許容誤差に応じて刻み幅を変化させた方が良いが、ある程度小さな固定刻み幅を取れば十分な場合もある。

f:id:lqtmirage:20181127013307p:plain:w400

陽的Runge-Kutta (4段4次) では刻み幅を0.001まで小さくしても積分が発散してしまい、0.0001というとても小さな刻み幅 (固定刻みの場合) を取らなければならなかった。

f:id:lqtmirage:20181127013311p:plain:w400

こんな風に、陰的な解法では刻み幅を比較的大きく取ることができ、硬い方程式も解けることがある。ただ、ステップごとの計算量は陽的な解法の方が少ないし、陽的な解法でも刻み幅を変えればある程度対応できることもあるので、一概にどちらが優れているとは言えない。実際には解きたい方程式系の特性に応じて様々な解法を使い分けた方が良い。科学的な検証に用いる場合などは、本当に目的の方程式が解けているのか、方程式や解法の特性を踏まえて何れにせよよく考えなければならない。

アトラクタなど

この辺りは特にImplicitに解く必要はないかもしれないが、とりあえず微分方程式を解いて綺麗な絵を描いてみたい。

  • スパイラルサドル (相空間の特徴)

f:id:lqtmirage:20181127014245p:plain:w400

  • フィッツフュー南雲方程式

矩形の入力を入れてみた様子
f:id:lqtmirage:20181127091521p:plain:w400
閾値を超えた固定の入力でリミットサイクルがまわる様子
f:id:lqtmirage:20181127091517p:plain:w400
リミットサイクルに引き込まれていく様子
f:id:lqtmirage:20181130092646p:plain:w400

f:id:lqtmirage:20181127014233p:plain:w400
f:id:lqtmirage:20181127014237p:plain:w400
f:id:lqtmirage:20181127014242p:plain:w400

f:id:lqtmirage:20181127015748p:plain:w400

面白い常微分方程式については、いずれもっと中身のある解説を書いてみたい。次回は偏微分方程式を解いてみようと思う。

Vimのおすすめプラグイン紹介

プラグインマネージャ

vim-plug
https://github.com/junegunn/vim-plug

Vimではプラグインのインストールや読み込みの管理などを行うプラグインマネージャが色々と開発されている。ここでは導入と.vimrcへの記述が簡単で比較的軽いのでvim-plugをおすすめしたい。公式サイトのスクリプトを実行して.vimファイルを~/.vim/autoload/にコピーする。

curl -fLo ~/.vim/autoload/plug.vim --create-dirs \
    https://raw.githubusercontent.com/junegunn/vim-plug/master/plug.vim

プラグインのインストールは.vimrcに以下のような記述を追加して、:PlugInstallを実行する。:PlugStatusでそれぞれのプラグインの状態を確認できる。

call plug#begin('~/.vim/plugged')

Plug 'プラグインのレポジトリ名など'

call plug#end()

プラグインを探す

vim awesome
https://vimawesome.com/

vim awesomeができてプラグインの検索や導入はとても楽になった。vim awesomeではプラグインがダウンロード数でソートされて表示され、カテゴリーや名前でフィルタをかけたりもできるし、各プラグインマネージャでのインストール方法もその場で確認できる。

おすすめプラグイン

コード作成時の使用感が格段に向上する、1度は試してみて欲しいプラグインを以下に紹介したい。

vim-airline

f:id:lqtmirage:20180917012104p:plain

ステータスバーにモードやファイルタイプ、カーソル位置など、色々な情報を綺麗に表示してくれる。

https://vimawesome.com/plugin/vim-airline-superman
https://vimawesome.com/plugin/vim-airline-themes

Plug 'vim-airline/vim-airline'
Plug 'vim-airline/vim-airline-themes'

YouCompleteMe

f:id:lqtmirage:20180917012835p:plain

インクリメンタルに補完候補を表示してくれる。候補はtabや矢印で選択できる。
vim-plugでインストール後に~/.vim/plugged/youcompleteme/install.pyまたはinstall.shを実行する必要がある。

https://vimawesome.com/plugin/youcompleteme

Plug 'valloric/youcompleteme'

ALE

f:id:lqtmirage:20190526000007p:plain

非同期に文法や記法のチェックを行ってくれる。大抵の言語のメジャーな処理系やlintツールには最初から対応している。自分でlinterを選ぶこともできる。

https://vimawesome.com/plugin/ale

Plug 'w0rp/ale'

fzf / fzf.vim

f:id:lqtmirage:20180917015259p:plain

曖昧な検索ができるfzfをVimからも使えるようにする。
以下はすでにインストールされているfzfを使う場合。

https://vimawesome.com/plugin/fzf-vim

Plug '/usr/local/opt/fzf'
Plug 'junegunn/fzf.vim'

Haskellで数値計算 線形代数編

BLASLAPACK

数値計算では、連立線形方程式に帰着させられる問題はとても多い。また、固有値問題は分野を越えて応用範囲が広い。そのため、これらの問題を扱うための線形代数ライブラリは古くから活発に開発されてきた。中でも代表的なものは基礎的な行列、ベクトル演算を行うBLASと、その上で線形方程式系や最小2乗問題、固有値問題などを解くLAPACKで、幾つかの実装では最適化されており高速に動作するので、多くの言語のライブラリでも背景で利用されている。

HaskellでもhmatrixパッケージがBLAS/LAPACKバインディングを提供してくれている。BLAS/LAPACKの関数の呼び出しで処理出来る問題の場合、自分の実装でこの速度を超えるのは難しいので、出来るだけ利用して行きたい。

目次

hmatrixの使い方

インストール

Haskellを使うのが初めての場合、まずはビルドツールstackから環境を構築するのがおすすめ。stack自体のインストールはこちらを参照。
Install/upgrade - The Haskell Tool Stack

hmatrixパッケージをインストールする。色々なパッケージを使うようになっても、プロジェクトごとに依存関係をstackが解決してくれる。

$ stack update
$ stack install hmatrix

cabalを直接使う場合は以下の通り。環境によってはsudoが必要。

$ cabal update
$ cabal install hmatrix

シェルでghciを起動して、:mでモジュールを読み込める。またファイルに書いたコードは、:lでロードできる。

$ stack ghci
> :m Numeric.LinearAlgebra
> :set prompt "> " -- promptの文字列を変えている ~/.ghciに書いてもよい
> :l [file]

この記事ではファイルに書いたコードをghciから:lでロードし、> に続くコードを実行して試していくことを想定している。main関数を持つファイルはghcコンパイルして実行可能バイナリにもできる。

モジュールのimport。

import Numeric.LinearAlgebra
import Numeric.LinearAlgebra.Data

-- ghci
> :m Numeric.LinearAlgebra
> :m Numeric.LinearAlgebra.Data

lts-12.0からPreludeの(<>)という同名の関数と競合するようになってしまったので、Numeric.LinearAlgebraの方をqualified importするか、Preludeの方をhidingする必要がある。この記事では簡単のため、Preludeの方をhidingする。古いltsを使っている場合は、この行を消せば動くはず。

-- 名前空間を分ける
-- Numeric.LinearAlgebraの関数はL.付きで呼ぶ必要がある
import qualified Numeric.LinearAlgebra as L

-- Preludeの方を隠す
import Prelude hiding ((<>))

MatrixやVectorの作成と基本的な演算

主な関数を表にまとめてみた。詳細はパッケージのドキュメントを参照。
hmatrix: Numeric Linear Algebra

リストからの生成やリストへの変換

関数 説明
(><) Int -> Int -> [a] -> Matrix a 行数と列数を指定する
fromLists [ [a] ] -> Matrix a
toLists Matrix a -> [ [a] ]
vector [a] -> Vector a
fromList [a] -> Vector a
toList Vector a -> [a]
flatten Matrix a -> Vector a 行ベクトルを1列につなげる
reshape Int -> Vector a -> Matrix a 列数を指定する
toRows/Columns Matrix a -> [Vector a]
fromRows/Columns [Vector a] -> Matrix a
rand Int -> Int -> IO (Matrix Double) [0,1)一様分布擬似乱数
randn Int -> Int -> IO (Matrix Double) ガウス分布擬似乱数

基本的な2項演算

演算 左辺値 演算子 右辺値
 \bf{u} \pm \bf{v} Vector + - Vector
 \bf{u} \cdot \bf{v} Vector <.> Vector
 \bf{u} \times \bf{v} Vector cross Vector
 A \pm B Matrix + - Matrix
 A \bf{u} Matrix #> Vector
 {\bf u} A Vector <# Matrix
 A B Matrix <> Matrix
 A \otimes B Matrix kronecker Matrix

要素ごとの操作など

操作 関数 説明
要素ごとの関数適用 cmap (e -> b) -> c e -> c b
indexでのアクセス ! 行列の場合は行ベクトルに
行列のスライス ?? 行と列の位置などを指定する
m ?? (Pos (idxs [3,4]), Pos (idxs [1,0]))
3行と4行、1列と0列を取る
m ?? (Take 3, Drop 2)
3行分を取り、2列分を捨てる
最大値
最大値のindex
maxElement
maxIndex
最小値
最小値のindex
minElement
minIndex
昇順にソート
その位置の値のソート後のindex
sortVector
sortIndex
L1/L2/L \inftyノルム (Vector)
作用素ノルム (Matrix)
norm_1
norm_2
norm_Inf
 \mid\mid {\bf x} \mid\mid_p = \left( \sum_i \mid x_i \mid^p \right)^{1/p}
 \mid\mid A \mid\mid = \sup_{\mid\mid {\bf x} \mid\mid_p = 1} \mid\mid A {\bf x} \mid\mid_p
この行列で写された任意の単位ベクトルのノルムの上限
フロベニウスノルム norm_Frob  \mid\mid A \mid\mid_F = \sqrt{\sum_{i} \sum_{j} \mid a_{ij} \mid^2}
行列式 det
対角成分をVector takeDiag

特殊な行列の生成など

行列 関数 説明
 I_n ident n n次単位行列
 a_{ij} = \delta_{ij} x_i diag x 対角成分から行列をつくる
 ^t A tr 転置行列
 A^{-1} inv 逆行列
 (A + ^t A) / 2 sym 対称行列をつくる
 ^t AA mTm 転置行列との積

3次元の回転行列を使って座標を回転させてみる。

import Prelude hiding ((<>))
import Numeric.LinearAlgebra

rotateX :: Double -> Matrix Double
rotateX t = (3><3) [1.0,   0.0,    0.0,
                    0.0, cos t, -sin t,
                    0.0, sin t,  cos t]

rotateY :: Double -> Matrix Double
rotateY t = (3><3) [ cos t, 0.0, sin t,
                       0.0, 1.0,   0.0,
                    -sin t, 0.0, cos t]

rotateZ :: Double -> Matrix Double
rotateZ t = (3><3) [cos t, -sin t, 0.0,
                    sin t,  cos t, 0.0,
                      0.0,    0.0, 1.0]

> (rotateX pi <> rotateY pi <> rotateZ pi) #> vector [1.0,1.0,1.0]
[1.0000000000000004,1.0,0.9999999999999998]

連立線形方程式

連立線形方程式は行列を使って A \bf{x} = \bf{b}のように書けるので、左から A^{-1}を掛ければ解くことが出来るが、とても小さな行列以外では、クラメルの公式で逆行列を計算するのはコストが高い。そこで、 Aを扱いやすい行列の積に分解したり、要素の反復計算で漸近するなど色々な工夫が行われている。

hmatrixで用意されている主な線形ソルバは以下の通り。基本的に行列を特殊な行列の積に分解することで解いてくれる。引数は \bf{x} \bf{b}を行列化した AX = Bの形であたえられる。行列分解を1度計算しておけば、同じ Aに対する複数の \bf{x} \bf{b}の組み合わせを効率的に解くことができる。

関数 対象 説明
<\> 優決定系/劣決定系を含む 汎用線形ソルバ
式が多すぎる系や足りない系でもノルムを最小にするような解を計算してくれる
linearSolve 正則行列 正則でない場合を考慮してMaybeを返す
中身はluSolve
luSolve 正則行列  AをLU分解 (luPacked) して渡す
1000  \times 1000程度までの小さな正則行列なら基本的にこれを使えば問題ない
ldlSolve エルミート行列
(実対称行列を含む)
 AをLDL分解 (ldlPacked) して渡す
修正コレスキー分解
制約があるが速い
cholSolve 正定値のエルミート行列  Aをコレスキー分解 (chol) して渡す
制約があるが速い
triSolve 上三角行列または下三角行列 制約があるがとても速い
QR分解 (qr) した上三角行列も解ける
 QR{\bf x} = {\bf b}
 R{\bf x} = ^t Q {\bf b}
triDiagSolve 三重対角行列 制約があるがとても速い
cgSolve 正定値のエルミート行列
疎行列を想定したindex表現であるGMatrixであたえる
共役勾配法 (反復法の一種)
状況によってはとても速いが収束しないこともある
大規模な疎行列の時有利

エルミート行列は転置して複素共役を取ると自身と一致するような行列。虚部がすべて0なら実対称行列になる。正定値行列は固有値がすべて正の実数であるような行列。基本的に対象となる行列への制約が強いソルバほど速いので、解こうとしている行列が制約をみたすなら対応するソルバを使うと良い。

乱数で初期化した1000  \times 1000行列をLU分解を使って解いて時間を測ってみる。簡単のためGHCのStrict拡張で正格評価にしている。

{-# LANGUAGE Strict #-}

import Prelude hiding ((<>))
import System.CPUTime
import Numeric.LinearAlgebra

randomLU :: Int -> IO ()
randomLU n = do
  -- 乱数で初期化
  a <- rand n n
  b <- rand n 1
  -- 時間を計測
  start <- getCPUTime
  let x = luSolve (luPacked a) b
  end <- getCPUTime
  -- 誤差を計算
  print $ fromIntegral (end - start) / 1e12
  print $ norm_2 (a <> x - b) / norm_2 b

> randomLU 1000
8.3574e-2
3.1152329739293184e-13

おそらくLAPACKの関数の実行速度が出ている。LU分解ではどの言語のどのような実装でも並列化しなければこれ以上速くするのは難しい気がする。さらに速度を求める場合は、(収束性を考慮する必要が出てくるが) 共役勾配法やSOR法のような反復法を使ったほうが良いかもしれない。

参考文献

連立線形方程式の数値解法についてはこの辺りがわかりやすかった。

http://ri2t.kyushu-u.ac.jp/~watanabe/RESERCH/MANUSCRIPT/KOHO/GEPP/GEPP.pdf
http://nkl.cc.u-tokyo.ac.jp/16n/SolverDirect.pdf
http://nkl.cc.u-tokyo.ac.jp/16n/SolverIterative.pdf
Eigen: Linear algebra and decompositions
Eigen: Benchmark of dense decompositions
Eigen: Solving Sparse Linear Systems

固有値固有ベクトル

 A \bf{x} = \lambda \bf{x} ( \bf{x} \neq \bf{0}) を満たすような \lambda固有値といい、 \bf{x}固有ベクトルという。 A n \times n行列の時、複素数値も含めて、 n個の固有値と対応する n個の固有ベクトルが存在する (重解でなければ) 。これらをすべて求めるのが固有値問題

関数 対象 説明
eig 一般 固有値Vector固有ベクトルのMatrixをタプルで返す
eigenvalues 一般 固有値のみを求める場合
eigSH エルミート行列 制約があるが速い
eigenvaluesSH エルミート行列 固有値のみを求める場合
geigSH エルミート行列の一般化固有値問題  A{\bf x} = \lambda B {\bf x}
 A Bはエルミート行列、 Bは正定値

主成分分析を固有値問題として解いてみる。

import Prelude hiding ((<>))
import Numeric.LinearAlgebra
import Numeric.LinearAlgebra.Data

pca :: Matrix Double -> Vector Double
pca x = let n = fromIntegral $ cols x
            -- 各成分の平均
            avg = vector . map ((/n) . sumElements) $ toRows x
            -- 中心化
            ctr = fromColumns . map (+ (-avg)) $ toColumns x
            vcm = cmap (/n) $ ctr <> tr ctr
            -- 共分散行列の固有値と固有ベクトルを求める
            (s, w) = eigSH . trustSym $ vcm
            -- 分散が最大になる方向を選択する
        in toColumns w !! maxIndex s

randomPca :: Int -> Int -> IO ()
randomPca l m = do
  -- l次元の成分を持つ標本をm個つくる
  x <- rand l m
  let v = pca x
  -- 第1主成分ベクトル
  print v
  -- 第1主成分軸方向への射影
  print . map (<.> v) $ toColumns x

> randomPca 3 10

1000  \times 1000行列について、eig関数の処理時間も測ってみる。基本が非正格評価なので式ごとの処理時間の計測などは簡単ではなさそうなのだが、GHCのStrict拡張やseq関数などを使うと、妥当そうな結果が得られることもある。

randomEigen :: Int -> IO ()
randomEigen n = do
  a <- rand n n

  start <- getCPUTime
  let (s, w) = eig a
  end <- seq w getCPUTime

  print $ fromIntegral (end - start) / 1e12
  print $ norm_2 (complex a <> w - w <> diag s) / norm_2 s

> randomEigen 1000
2.90387
2.2744873540326555e-15

参考文献

固有値問題の数値解法についてはこの辺りがわかりやすかった。

http://nalab.mind.meiji.ac.jp/~mk/labo/text/eigenvalues.pdf
http://www.r-ccs.riken.jp/r-ccssite/wp-content/uploads/2014/03/sps14_enshu2-1.pdf
Eigen: Catalogue of dense decompositions

numpyやEigenとの比較

同様の扱いやすいライブラリであるPythonのnumpyやC++のEigenについてもサンプルコードを書いてみた。また、関数呼び出し部分のみ手元の環境での実行時間を測ってみた。100回計測して平均を取っている。

線形方程式系
1000  \times 1000
線形方程式系
200  \times 200
固有値問題
200  \times 200
説明
hmatrix 0.08489044s 0.00156016s 0.05794971s 並列化しないなら
repaより速い
numpy 0.0847463s 0.0014532s 0.0577511s 処理はできるだけ
numpyで完結したい
Eigen 0.0618023s 0.00073476s 0.0619551s 速度的にはBoost
よりEigenが良い

環境によって多少の前後はあるかもしれないが、すべて同等の速度だと見ていいと思う。numpyは後ろでLAPACKを使っていた気がするので当たり前の結果かもしれない。Eigenは選択できる模様。

Eigen: Using BLAS/LAPACK from Eigen

小さなコードでは書きやすさもあまり変わらないが、大きなアプリケーションになった時はどうだろうか。計測に使用したコードは以下の通り。

hmatrix

{-# LANGUAGE Strict #-}

import Prelude hiding ((<>))
import Numeric.LinearAlgebra
import System.CPUTime
import System.Environment (getArgs)

randomLU :: Int -> IO Double
randomLU n = do
  a <- rand n n
  b <- rand n 1

  start <- getCPUTime
  let x = luSolve (luPacked a) b
  end <- seq x getCPUTime

  let t = fromIntegral (end - start) / 1e12
  print t
  print $ norm_2 (a <> x - b) / norm_2 b
  return t

randomEigen :: Int -> IO Double
randomEigen n = do
  a <- rand n n

  start <- getCPUTime
  let (s, w) = eig a
  end <- seq w getCPUTime

  let t = fromIntegral (end - start) / 1e12
  print t
  print $ norm_2 (complex a <> w - w <> diag s) / norm_2 s
  return t

main :: IO ()
main = do
  [n,r] <- map read <$> getArgs
  t <- mapM randomLU $ replicate r n
--  t <- mapM randomEigen $ replicate r n
  print $ sum t / fromIntegral r

$ stack ghc -- -o lh -O3 la.hs
$ ./lh 1000 100

numpy

import numpy as np
import numpy.linalg as la
import sys
import time

def randomSLE(rank):
    a = np.random.rand(rank, rank)
    b = np.random.rand(rank)

    start = time.process_time()
    x = la.solve(a, b)
    end = time.process_time()

    t = end - start
    print(f'duration: {t:.7f}s')
    print(la.norm(np.dot(a, x) - b) / la.norm(b))
    return t

def randomEigen(rank):
    a = np.random.rand(rank, rank)

    start = time.process_time()
    s, w = la.eig(a)
    end = time.process_time()

    t = end - start
    print(f'duration: {t:.7f}s')

    d = np.matmul(a, w) - np.dot(w, s)
#    d = np.matmul(a, w) - np.matmul(w, np.dot(np.identity(rank), s))
    print(la.norm(d) / la.norm(s))
    return t

rank = int(sys.argv[1])
repeat = int(sys.argv[2])

t = 0
for i in range(repeat):
    t += randomSLE(rank)
#    t += randomEigen(rank)

print(f'average: {t / repeat:.7f}s')

$ python3 la.py 1000 100

Eigen

#include <chrono>
#include <iostream>
#include <Eigen/Dense>

#define NDEBUG
#define EIGEN_NO_DEBUG

using namespace Eigen;

double randomLU(const int rank) {
  MatrixXd a = MatrixXd::Random(rank, rank);
  MatrixXd b = MatrixXd::Random(rank, 1);

  const auto start = std::chrono::system_clock::now();
  MatrixXd x = a.lu().solve(b);
  const auto end = std::chrono::system_clock::now();

  const double elapsed = std::chrono::duration<double>(end - start).count();
  std::cout << "duration: " << elapsed << "s" << std::endl;

  MatrixXd d = a * x - b;
  std::cout << d.norm() / b.norm() << std::endl;

  return elapsed;
}

double randomEigen(const int rank) {
  MatrixXd a = MatrixXd::Random(rank, rank);

//  ComplexEigenSolver<MatrixXd> es;
  EigenSolver<MatrixXd> es;

  const auto start = std::chrono::system_clock::now();
  es.compute(a);
  MatrixXcd s = es.eigenvalues();
  MatrixXcd w = es.eigenvectors();
  const auto end = std::chrono::system_clock::now();

  const double elapsed = std::chrono::duration<double>(end - start).count();
  std::cout << "duration: " << elapsed << "s" << std::endl;
  MatrixXcd d = a * w - w * s.asDiagonal();
  std::cout << d.norm() / s.norm() << std::endl;

  return elapsed;
}

int main(int argc, char *argv[]) {
  const int rank = std::atoi(argv[1]);
  const int repeat = std::atoi(argv[2]);

  double t = 0.0;
  for (int i = 0; i < repeat; i++) {
    t += randomLU(rank);
//    t += randomEigen(rank);
  }
  std::cout << "average: " << t / repeat << "s" << std::endl;

  return 0;
}

$ clang++ -Ofast -march=native -std=c++14 la.cpp -o lc
$ ./lc 1000 100

Free Monadで言語内DSLをつくろう

抽象構文木の構築とその評価の分離

インタプリタコンパイラでは最初に、コードの文字列を構文解析して抽象構文木 (AST) を構築する。その表現形式はDAGやSSAなどに変化して行くことも多いが、基本的にはこれらの中間表現に対して様々な最適化がほどこされる。インタプリタの場合にはこれが評価され実行されるが、コンパイラの場合にはこれがアセンブリの命令に置き換えられ、スケジューリングされ、レジスタが割り当てられ、最終的にはバイナリが生成される。こうした構文解析構文木の構築と、中間表現の最適化、その評価の分離は、各工程をそれぞれ独立に作成し、必要に応じて取り替えられるという意味で、とても便利だ。

言語内DSLの処理系をそこまで重くするのは本末転倒感があるが、少なくとも構文木を構築しその評価と分離することは、抽象化の手段として適切な場合もあると思う。Haskellでは、言語内DSLのコードとその評価の分離には色々な方法 (型クラスによる抽象など) が考えられるが、ここではFree Monadを取り上げたい。自分で定義した、DSLを表現するデータ型をFunctorにし、Free Monadに入れれば、Monad計算で構文木を構築できる。これを評価するインタプリタは独立に複数用意できるし、評価先の型も自由に変えられる。

目次

Free Monadで言語内DSLをつくる

構文木を表現するデータ型の準備

まずは、自分のDSL構文木を表現するデータ型を定義する。Freeに包むためにはFunctorである必要があるが、この場合その定義は自明なので、簡単のためGHCのDeriveFunctor拡張を使ってderiving Functorしている。(この導出では型コンストラクタの最後の型変数に対応する値を写すようなfmapをつくってくれる。) またこのあと使う予定のモジュールもimportしておく。Freeには色々な実装や、関連するライブラリ、派生形などがあるが、ここでは最初の素朴な物を使っている。(処理効率を考えて実装されたControl.Monad.Free.Churchや、ASTのデータ型からFreeのアクションを導出してくれるControl.Monad.Free.THなどを参照されたし。)

{-# LANGUAGE DeriveFunctor #-}

import Control.Monad.Free
import Control.Monad.State.Lazy
import Control.Monad.Writer.Lazy

data MyAST r = Open r
             | Elephant String r
             | Close r
             | End
  deriving Functor

値コンストラクタはノードに対応し、型変数rは残りの木を表している。Freeに包むと、FreeとMyASTが交互に入ることになる。

構文木構築のための、FunctorからのFreeの作成

次に、MyASTの値から対応するFreeのアクションを作成する。Freeの値コンストラクタを使ってもいいが、liftFを使うと少し簡単に書ける。これらのFreeをつなげれば、Free MyAST a型の構文木が構築されていく。

{-
data Free f a = Pure a | Free (f (Free f a))
instance Functor f => Monad (Free f) where
  Free x >>= f = Free $ fmap (>>= f) x
  Pure a >>= f = f a
  return = Pure
liftF = Free . fmap return
-}

-- (対応するFreeの値) = liftF (DSLのデータ型の値)
-- (open = Free $ Open (Pure ()))
open  = liftF $ Open ()
close = liftF $ Close ()
end   = liftF End

putElephant :: String -> Free MyAST ()
putElephant name = liftF $ Elephant name ()

扉を開けて象を入れて扉を閉めてプログラムを終わらせるような命令を用意してみる。Freeは構文木の構築手段を用意しているだけなので、実際に何が行われるかはインタプリタが決める。

インタプリタの作成

最後に構文木を評価するためのインタプリタをつくる。Free MyAST a型の構文木を渡すと、最初のノードにパターンマッチして、何か処理を行い (例えばIntegerやStringなどの値に評価してもいいし、Monad計算に評価してもいい)、残りの木に対する処理を再帰的に呼び出す。葉はPureで表される。

interpretPrint :: Free MyAST a -> IO ()
interpretPrint f = case f of
  Free (Open r)       -> putStrLn "open" >> interpretPrint r
  Free (Elephant e r) -> putStrLn ("elephant: " ++ e) >> interpretPrint r
  Free (Close r)      -> putStrLn "close" >> interpretPrint r
  Free End            -> putStrLn "end"
  Pure _              -> return ()

DSLのプログラムを書く

準備ができたので、幾つかDSLのプログラムを書いて、インタプリタで評価してみよう。

program1 :: Free MyAST ()
program1 = do
  open
  putElephant "Sally"
  putElephant "Emet"
  end

program2 :: Free MyAST ()
program2 = do
  open
  putElephant "Emet"
  close
  putElephant "Sally"
  end

先ほど定義した、MyASTの値が入ったFreeをつなげることで、このプログラムを表現する構文木が作成される。

> interpretPrint program1
open
elephant: Sally
elephant: Emet
end

これをインタプリタで評価すれば結果が返る。このインタプリタでは標準出力にプログラムの構成を表示するだけ。

色々なインタプリタをつくる

操作のログを取る

構文木の構築とその評価が分離されているので、同じプログラムを評価する色々なインタプリタをつくることができる。interpretPrintはプログラムをIOにしていたが、interpretNamesは象の名前のリストに評価してくれる。

interpretNames :: Free MyAST () -> Writer [String] ()
interpretNames f = case f of
  Free (Open r)       -> interpretNames r
  Free (Elephant e r) -> tell [e] >> interpretNames r
  Free (Close r)      -> interpretNames r
  Free End            -> return ()
  Pure _              -> return ()

WriterにMonoidをtellすると、後ろでmappendしてくれる。Monoidは単位元memptyを持ち、結合法則を満たす2項演算mappendが使えることを要請する型クラスで、例えばリストのmemptyは[]でmappendは(++)。完成したWriterをexecWriterすれば、このMonoidの演算結果を返してくれる。

> execWriter $ interpretNames program2
["Emet","Sally"]

操作の解析をする

冷蔵庫の状態を表現する型を用意してStateと一緒に使うことで、冷蔵庫が適切に使われているか、最後にどんな状態になっているかを調べてくれるようなinterpretRefもつくってみよう。

data Refregirator = Opened | Closed | Broken deriving (Show, Eq)

interpretRef :: Free MyAST () -> State Refregirator ()
interpretRef f = case f of
  Free (Open r) -> do x <- get
                      case x of Broken -> return ()
                                _ -> put Opened >> interpretRef r
  Free (Elephant _ r) -> do x <- get
                            case x of Opened -> interpretRef r
                                      _ -> put Broken
  Free (Close r) -> do x <- get
                       case x of Broken -> return ()
                                 _ -> put Closed >> interpretRef r
  Free End -> return ()
  Pure _ -> return ()

閉まっているのに象を入れると壊れてしまうことにした。Stateはputやget、stateで明示的に状態を操作できるMonadで、execStateに完成したStateと初期状態をあたえれば、最後の状態を返してくれる。

> execState (interpretRef program1) Closed
Opened
> execState (interpretRef program2) Closed
Broken

DSLのコードはそのままに、IOから切り離して、インタプリタを取り替えて色々なテストや検証もできそうだ。

動的な制御と評価時の環境の利用

分岐する木の構築と評価時の動的な制御

ここまでは構文木と言いながら、リストに近いものしか扱ってこなかった。プログラムは1列に並んだ命令列のような構造を持っていた。それというのも言語内DSLではその言語の制御構文がそのまま使えるので、制御フローが静的に決定できるなら、構文木の構築時に分岐のない1つの命令列を選択できるためである。(分岐条件などの値は、インタプリタでの評価時に構文木の構築時点に送り込むこともできる。) 一方で、どうしてもインタプリタでの評価時に制御フロー自体を動的に決定したい時は、構文木を分岐する木として表現したいこともあるかもしれない。

data ArithAST a r = Unary (a -> a) r
                  | Binary (a -> a -> a) r r
                  | Branch (a -> Bool) r r r
                  | Val a
  deriving Functor

type FA a = Free (ArithAST a) ()

unary :: (a -> a) -> FA a
unary op = liftF $ Unary op ()

binary :: (a -> a -> a) -> FA a -> FA a
binary op l = Free $ Binary op l (Pure ())

branch :: (a -> Bool) -> FA a -> FA a -> FA a
branch cp t f = Free $ Branch cp t f (Pure ())

val :: a -> FA a
val x = liftF $ Val x

ここでは簡単のため、単項演算と2項演算と条件分岐を含む1つの式を表現する構文木をつくり、Branchの制御フローを変えたインタプリタを用意してみる。

type ASTF = Free (ArithAST Double) ()

interpretArith :: ASTF -> Double
interpretArith f = case f of
  Free (Unary f r)      -> f $ interpretArith r
  Free (Binary f r l)   -> f (interpretArith r) (interpretArith l)
  Free (Branch f l r c) -> if f (interpretArith c)
                             then interpretArith l else interpretArith r
  Free (Val x)          -> x
  Pure _                -> 0

interpretArithBr :: ASTF -> Double
interpretArithBr f = case f of
-- same as interpretArith ...
  Free (Branch f c l r) -> if f (interpretArith c)
                             then interpretArith l else interpretArith r
-- same as interpretArith ...

コードを書いてそれぞれのインタプリタで評価してみる。ノードに演算子があり、木が下向きに延びているので、計算の流れは下から上になる。

-- ((*3) (-2))
calc1 :: ASTF
calc1 = do
  unary (*3)
  val (-2)

-- (log (max calc1 ((**2.5) 0.5)))
calc2 :: ASTF
calc2 = do
  unary log
  binary max calc1
  unary (**2.5)
  val 0.5

-- if (>0) 1.0 then calc1 else calc2 -- interpretArith
-- if (>0) calc1 then calc2 else 1.0 -- interpretArithBr
calc3 :: ASTF
calc3 = do
  branch (>0) calc1 calc2
  val 1.0

> interpretArith calc3
0.9535708819095106
> interpretArithBr calc3
-4.75415180475803e-2

評価時の環境からの変数束縛

それは入れ子の無名関数でしかないが、do記法の変数束縛記法は直感的で便利だ。ASTを表現するデータ型 (ここではStAST) で関数のフィールド (これがfmapされるようにする) を使えば、インタプリタでの評価時の環境から構文木の (実際の) 構築時点に値を送り込み、Freeのdoの中で変数束縛できる。DSLの中で、例えば評価時にIOから取り出した値を利用したり、StateのようにStASTの文脈から取り出した値を利用したりできる。

data StAST s cs r = Read (s -> r)
                  | Const (cs -> r)
                  | Write s r
  deriving Functor

readS :: Free (StAST s cs) s
readS = liftF $ Read id
-- (readS = Free $ Read (\x -> Pure x))

constS :: Free (StAST s cs) cs
constS = liftF $ Const id

writeS :: s -> Free (StAST s cs) ()
writeS x = liftF $ Write x ()

StASTに変化する状態を表現するフィールドsと、変化しない定数を表現するフィールドcsを作り、それぞれを操作するためのFreeのアクションを用意する。

interpretStAST :: Free (StAST s cs) a -> s -> cs -> s
interpretStAST f x c = case f of
  Free (Read g)    -> interpretStAST (g x) x c
  Free (Const h)   -> interpretStAST (h c) x c
  Free (Write y r) -> interpretStAST r y c
  Pure _           -> x

execStateに相当するインタプリタを作成する。FreeのプログラムとStASTの初期状態をあたえると、最後の状態を返してくれる。プログラムを書いて評価してみよう。

sproc1 :: Free (StAST Double Double) ()
sproc1 = do
  x <- readS
  if x < 0.0
    then constS >>= writeS
    else writeS $ sqrt x
  y <- readS
  writeS (x + y)

> interpretStAST sproc1 2.0 0.0
3.414213562373095
> interpretStAST sproc1 (-2.0) 1.0
-1.0

こうした方法を使えば、StASTの状態をDSLの中から確認して制御を変えたりもできる。

まとめと参考文献

Freeを使うと、DSL構文木構築とその評価を分離できる。同じDSLのコードに対して複数のインタプリタを使い分けられるので、例えばIOから切り離してテストや検証などにも利用できる。Freeの中では評価時の状態を制御に組み込めるし、どうしても必要なら制御フロー自体を評価時に選ぶこともできる。Freeの便利さや使い方については、より進んだ内容についてもたくさんの良い記事が書かれているので、参照されたい。

free: Monads for free
Haskell for all: Why free monads matter
Haskell for all: Purify code using free monads
dlaing.org
Free and Freer Monads