
Part VIII: Distributed Parallel Processing

Overview

This chapter covers the MapReduce pattern and mapConcurrently.


MapReduce Pattern

Word Count Implementation

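import Control.Concurrent.Async (mapConcurrently)
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Data.Char (toLower)
import Data.List (foldl')
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

-- | Map: convert a text into lowercase (word, 1) pairs
-- (words never yields empty strings, so no extra filter is needed)
mapPhase :: String -> [(String, Int)]
mapPhase text =
    [(map toLower word, 1) | word <- words text]

-- | Reduce: sum the counts for each word with a strict left fold
reducePhase :: [(String, Int)] -> Map String Int
reducePhase = foldl' addPair Map.empty
  where
    addPair acc (word, count) = Map.insertWith (+) word count acc

-- | MapReduce: count words across multiple texts (sequential)
countWords :: [String] -> Map String Int
countWords texts =
    let mapped = concatMap mapPhase texts
    in reducePhase mapped

Parallel Version

-- | MapReduce with a parallel map phase.
-- evaluate . force fully evaluates each mapPhase result inside its worker
-- thread. With plain `return . mapPhase` the threads would only hand back
-- unevaluated thunks, and the real work would run later, sequentially, on
-- the main thread during the reduce. Compile with -threaded and run with
-- +RTS -N to use multiple cores.
countWordsParallel :: [String] -> IO (Map String Int)
countWordsParallel texts = do
    -- Map phase (parallel): one thread per input text
    mapped <- mapConcurrently (evaluate . force . mapPhase) texts
    -- Reduce phase (sequential)
    return $ reducePhase (concat mapped)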
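The parallel version still starts one thread per input text and reduces every (word, 1) pair on a single thread. A common refinement, shown here only as a sketch under assumptions, is to group the inputs into chunks (input splits) and let each worker also perform a local reduce, similar to a combiner in MapReduce, so that only small partial Maps are merged at the end. The names chunksOf, countSplit and countWordsChunked and the chunkSize parameter are hypothetical and exist only for this illustration (chunksOf is defined locally rather than taken from the split package); the code assumes the async, deepseq and containers packages.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Data.Char (toLower)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

-- Hypothetical helper: split a list into pieces of at most n elements
chunksOf :: Int -> [a] -> [[a]]
chunksOf n xs
    | n <= 0 = error "chunksOf: chunk size must be positive"
    | null xs = []
    | otherwise = let (h, t) = splitAt n xs in h : chunksOf n t

-- Map + local reduce (a "combiner") for one split of the input:
-- all words in the split are counted into a small partial Map
countSplit :: [String] -> Map String Int
countSplit texts =
    Map.fromListWith (+) [(map toLower w, 1) | t <- texts, w <- words t]

-- Split the texts into chunks of chunkSize, count each chunk in its own
-- thread (fully evaluated there), then merge the partial Maps by summing
countWordsChunked :: Int -> [String] -> IO (Map String Int)
countWordsChunked chunkSize texts = do
    partials <- mapConcurrently (evaluate . force . countSplit) (chunksOf chunkSize texts)
    return (Map.unionsWith (+) partials)
```

Because addition is associative and commutative, merging the partial Maps in any order gives the same totals as the single-reduce version; the chunk size only trades thread count against per-thread work.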

Usage Example

main :: IO ()
main = do
    let texts = ["hello world", "hello haskell", "world of haskell"]

    result <- countWordsParallel texts
    -- fromList [("haskell",2),("hello",2),("of",1),("world",2)]
    print result
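The implementation above folds the (word, 1) pairs straight into a Map, which merges two steps that classic MapReduce keeps apart: a shuffle that groups the intermediate pairs by key, and a reduce applied to each group. The sketch below makes the shuffle explicit with a sort and groupBy; mapWords, shuffle, reduceGroup and countWordsShuffled are illustrative names, not part of the original code.

```haskell
import Data.Char (toLower)
import Data.Function (on)
import Data.List (groupBy, sortOn)

-- Map: emit one lowercase (word, 1) pair per word (same idea as mapPhase)
mapWords :: String -> [(String, Int)]
mapWords text = [(map toLower w, 1) | w <- words text]

-- Shuffle: sort the pairs by key and group equal keys,
-- producing (word, [all counts emitted for that word])
shuffle :: [(String, Int)] -> [(String, [Int])]
shuffle pairs =
    [ (key, map snd grp)
    | grp@((key, _) : _) <- groupBy ((==) `on` fst) (sortOn fst pairs) ]

-- Reduce: combine all values collected for one key
reduceGroup :: (String, [Int]) -> (String, Int)
reduceGroup (word, counts) = (word, sum counts)

-- Map, then shuffle, then reduce each group independently
countWordsShuffled :: [String] -> [(String, Int)]
countWordsShuffled = map reduceGroup . shuffle . concatMap mapWords
```

For the three texts in the usage example, countWordsShuffled returns [("haskell",2),("hello",2),("of",1),("world",2)], the same counts as the Map-based version. Since each group is reduced independently, the reduce step could itself be distributed across workers by key.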

mapConcurrently

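import Control.Concurrent.Async (mapConcurrently)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)
import Data.Maybe (catMaybes)

-- Parallel map: run processItem on every item concurrently
parallelMap :: (a -> IO b) -> [a] -> IO [b]
parallelMap processItem items = mapConcurrently processItem items

-- Parallel filter + map: test and transform each item in its own thread,
-- then drop the Nothing results with catMaybes
parallelFilterMap :: NFData b => (a -> Bool) -> (a -> b) -> [a] -> IO [b]
parallelFilterMap condition process items =
    catMaybes <$> mapConcurrently (\x ->
        if condition x
            -- force the result here so the work runs in the worker thread
            then Just <$> evaluate (force (process x))
            else return Nothing
        ) items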
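One property of mapConcurrently worth knowing: if any of the actions throws, the remaining actions are cancelled and the exception is re-thrown in the calling thread. A minimal sketch of catching that with Control.Exception.try; checkedDouble and safeDoubleAll are hypothetical functions made up for this illustration.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Exception (IOException, throwIO, try)

-- Hypothetical worker: doubles its input but fails on negative numbers
checkedDouble :: Int -> IO Int
checkedDouble x
    | x < 0 = throwIO (userError ("negative input: " ++ show x))
    | otherwise = return (x * 2)

-- Run checkedDouble on every item concurrently. If any worker throws,
-- mapConcurrently cancels the others and re-throws the exception,
-- which try turns into a Left value.
safeDoubleAll :: [Int] -> IO (Either IOException [Int])
safeDoubleAll items = try (mapConcurrently checkedDouble items)
```

With this sketch, safeDoubleAll [1, 2, 3] yields Right [2,4,6], while safeDoubleAll [1, -2, 3] yields a Left holding the user error, so one failing item fails the whole batch rather than leaving a partial result.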

Summary

What this series covered:

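Part   Topic                  Key points
I      Sequential processing  Maybe, pure functions
II     Threads                forkIO, async
III    Multitasking           STM, TVar
IV     Parallel patterns      Fork/Join, Pipeline
V      Synchronization        STM, deadlock-free
VI     Non-blocking           async
VII    Asynchronous           race, concurrently
VIII   Distributed processing MapReduce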

References