Part VIII: 分散並列処理¶
概要¶
本章では、MapReduce パターンと mapConcurrently を学びます。
MapReduce パターン¶
ワードカウント実装¶
import Control.Concurrent.Async (mapConcurrently)
import Data.Char (toLower)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
-- | Map: Convert text to (word, 1) pairs
mapPhase :: String -> [(String, Int)]
mapPhase text =
[(map toLower word, 1) | word <- words text, not (null word)]
-- | Reduce: Aggregate word counts
reducePhase :: [(String, Int)] -> Map String Int
reducePhase = foldl' addPair Map.empty
where
addPair acc (word, count) = Map.insertWith (+) word count acc
-- | MapReduce: Count words in multiple texts
countWords :: [String] -> Map String Int
countWords texts =
let mapped = concatMap mapPhase texts
in reducePhase mapped
並列版¶
-- | MapReduce with parallel execution
countWordsParallel :: [String] -> IO (Map String Int)
countWordsParallel [] = return Map.empty
countWordsParallel texts = do
-- Map phase (parallel)
mapped <- mapConcurrently (return . mapPhase) texts
-- Reduce phase
return $ reducePhase (concat mapped)
使用例¶
main :: IO ()
main = do
let texts = ["hello world", "hello haskell", "world of haskell"]
result <- countWordsParallel texts
-- fromList [("hello",2),("haskell",2),("of",1),("world",2)]
print result
mapConcurrently¶
import Control.Concurrent.Async
-- 並列 map
results <- mapConcurrently processItem items
-- 並列 filter + map
results <- mapConcurrently (\x ->
if condition x
then return (Just (process x))
else return Nothing
) items
まとめ¶
本シリーズで学んだ内容:
| Part | トピック | キーポイント |
|---|---|---|
| I | 逐次処理 | Maybe、純粋関数 |
| II | スレッド | forkIO、async |
| III | マルチタスキング | STM、TVar |
| IV | 並列パターン | Fork/Join、Pipeline |
| V | 同期 | STM、デッドロックフリー |
| VI | ノンブロッキング | async |
| VII | 非同期 | race、concurrently |
| VIII | 分散処理 | MapReduce |