Part VIII: 分散並列処理¶
8.1 はじめに¶
本章では、大規模データを複数のプロセッサで効率的に処理する MapReduce パターンを学びます。ワードカウントを題材に、8 つの言語がそれぞれどのような並列コレクション・並列イテレータを提供しているかを比較します。
MapReduce とは¶
MapReduce は 3 つのフェーズで構成されます:
入力データ → [Map] → (key, value) ペア → [Shuffle] → グループ化 → [Reduce] → 最終結果
- Map: 各入力要素を (key, value) ペアに変換
- Shuffle: ペアをキーごとにグループ化
- Reduce: 各グループの値を集約
8.2 共通の本質¶
ワードカウントのアルゴリズム¶
入力: ["hello world", "hello rust", "world of rust"]
Map:
"hello world" → [("hello", 1), ("world", 1)]
"hello rust" → [("hello", 1), ("rust", 1)]
"world of rust" → [("world", 1), ("of", 1), ("rust", 1)]
Shuffle:
"hello" → [1, 1]
"world" → [1, 1]
"rust" → [1, 1]
"of" → [1]
Reduce:
"hello" → 2, "world" → 2, "rust" → 2, "of" → 1
並列化のポイント¶
Map フェーズは恥ずかしいほど並列(embarrassingly parallel)です。各テキストの処理は完全に独立しており、ロックや同期なしで並列化できます。
8.3 言語別実装比較¶
Map フェーズ¶
関数型ファースト言語¶
Haskell 実装
mapPhase :: String -> [(String, Int)]
mapPhase text = [(map toLower word, 1) | word <- words text, not (null word)]
Clojure 実装
(defn map-phase [text]
(->> (str/split (str/lower-case text) #"\s+")
(filter (complement str/blank?))
(map (fn [word] [word 1]))))
マルチパラダイム言語¶
Rust 実装
pub fn map(text: &str) -> Vec<(String, usize)> {
text.to_lowercase()
.split_whitespace()
.filter(|s| !s.is_empty())
.map(|word| (word.to_string(), 1))
.collect()
}
Scala 実装
def map(text: String): List[(String, Int)] =
text.toLowerCase
.split("\\s+")
.filter(_.nonEmpty)
.map(word => (word, 1))
.toList
F# 実装
let map (text: string) : (string * int) list =
text.ToLower().Split([|' '; '\t'; '\n'|], StringSplitOptions.RemoveEmptyEntries)
|> Array.map (fun word -> (word, 1))
|> Array.toList
OOP + 並行処理ライブラリ言語¶
Java 実装
public static List<Map.Entry<String, Integer>> map(String text) {
return Arrays.stream(text.toLowerCase().split("\\s+"))
.map(word -> Map.entry(word, 1))
.toList();
}
C# 実装
public static List<KeyValuePair<string, int>> Map(string text) {
return text.ToLower()
.Split(' ', StringSplitOptions.RemoveEmptyEntries)
.Select(word => new KeyValuePair<string, int>(word, 1))
.ToList();
}
Python 実装
def map_function(text: str) -> List[Tuple[str, int]]:
words = text.lower().split()
return [(word, 1) for word in words]
並列 MapReduce(全体)¶
並列化メカニズムの比較¶
| 言語 | 並列化 API | 特徴 |
|---|---|---|
| Python | multiprocessing.Pool.map() |
プロセスベース(GIL 回避) |
| Java | parallelStream() |
ForkJoinPool 自動管理 |
| C# | AsParallel() (PLINQ) |
LINQ との統合 |
| Scala | .par コレクション |
ワンメソッドで並列化 |
| F# | Array.Parallel.collect |
Array 専用並列モジュール |
| Rust | par_iter() (Rayon) |
ワークスティーリング |
| Haskell | mapConcurrently |
非同期並列マップ |
| Clojure | pmap |
並列マップ関数 |
Rust 実装(完全並列 fold/reduce)
pub fn count_words_full_parallel(texts: &[&str]) -> HashMap<String, usize> {
texts
.par_iter()
.flat_map(|text| map(text))
.fold(
|| HashMap::new(),
|mut acc, (word, count)| {
*acc.entry(word).or_insert(0) += count;
acc
},
)
.reduce(
|| HashMap::new(),
|mut a, b| {
for (k, v) in b {
*a.entry(k).or_insert(0) += v;
}
a
},
)
}
Java 実装(parallelStream)
public static Map<String, Integer> wordCount(List<String> texts) {
List<Map.Entry<String, Integer>> mapped = texts.parallelStream()
.flatMap(text -> map(text).stream())
.toList();
return reduce(mapped);
}
public static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
return pairs.stream()
.collect(Collectors.groupingBy(
Map.Entry::getKey,
Collectors.summingInt(Map.Entry::getValue)
));
}
C# 実装(PLINQ)
public static Dictionary<string, int> CountWords(List<string> texts) {
var mapped = texts
.AsParallel()
.SelectMany(text => Map(text))
.ToList();
return Reduce(mapped);
}
Haskell 実装
countWordsParallel :: [String] -> IO (Map String Int)
countWordsParallel [] = return Map.empty
countWordsParallel texts = do
mapped <- mapConcurrently (return . mapPhase) texts
return $ reducePhase (concat mapped)
Clojure 実装(pmap)
(defn count-words-parallel [texts]
(if (empty? texts)
{}
(->> texts
(pmap map-phase)
(apply concat)
reduce-phase)))
8.4 比較分析¶
並列化の手軽さ¶
最も簡単 ┌──────────────────────────────┐
│ Scala: .par │ ← 1 メソッド追加
│ C#: .AsParallel() │
│ Clojure: map → pmap │
├──────────────────────────────┤
手軽 │ Java: stream → parallelStream │
│ Rust: iter → par_iter │
│ F#: Array.map → Parallel.map │
├──────────────────────────────┤
やや複雑 │ Haskell: mapConcurrently │ ← IO モナドが必要
│ Python: Pool.map() │ ← プロセス分離が必要
└──────────────────────────────┘
Reduce フェーズの実装¶
| 言語 | Reduce 方法 | 特徴 |
|---|---|---|
| Python | defaultdict(int) + ループ |
命令的 |
| Java | Collectors.groupingBy + summingInt |
宣言的コレクター |
| C# | GroupBy + Sum |
LINQ 統合 |
| Scala | groupBy + mapValues + sum |
関数型コレクション |
| F# | List.groupBy + List.sumBy |
パイプライン |
| Rust | HashMap.entry().or_insert() |
Entry API |
| Haskell | foldl' + Map.insertWith (+) |
純粋関数型 |
| Clojure | reduce + update + fnil |
関数合成 |
パフォーマンス特性¶
| 言語 | 並列化方式 | スレッドプール | 最適なケース |
|---|---|---|---|
| Python | プロセスプール | OS プロセス | CPU バウンド(GIL 回避) |
| Java | ForkJoinPool | ワークスティーリング | 汎用並列処理 |
| C# | ThreadPool | .NET ThreadPool | LINQ クエリ並列化 |
| Scala | Fork/Join | ワークスティーリング | コレクション操作 |
| F# | .NET ThreadPool | .NET ThreadPool | 配列操作 |
| Rust | Rayon | ワークスティーリング | ゼロコスト並列化 |
| Haskell | GHC RTS | Green Thread | I/O 並行処理 |
| Clojure | JVM ThreadPool | 固定プール | 関数適用 |
アムダールの法則¶
高速化率 = 1 / (S + P/N)
S = 逐次処理の割合
P = 並列化可能な割合 (P = 1 - S)
N = プロセッサ数
例: 逐次 10%、並列 90%、4 コア
高速化率 = 1 / (0.1 + 0.9/4) = 3.08 倍
並列化可能な部分(Map フェーズ)の割合が大きいほど、コア数に比例した高速化が期待できます。
8.5 実践的な選択指針¶
大規模データ処理に適した言語¶
最も適している:
- Rust (Rayon) —
fold+reduceで Map と Reduce を一貫して並列化。ゼロコスト抽象化 - Java (parallelStream) — エンタープライズ規模のデータ処理。Stream API の成熟度
手軽さ重視:
- Scala (.par) — ワンメソッドで並列化。ただし順序保証に注意
- C# (PLINQ) —
.AsParallel()で既存 LINQ クエリを即座に並列化 - Clojure (pmap) —
mapをpmapに置き換えるだけ
関数型アプローチ:
- Haskell — 純粋関数は安全に並列化。
mapConcurrentlyが宣言的 - F# —
Array.Parallelモジュールで配列操作を並列化
8.6 まとめ¶
言語横断的な学び¶
- MapReduce は関数型の自然な拡張 —
map+reduceを並列化するだけで大規模データ処理が可能 - 並列化の手軽さは言語の設計思想 —
.par/.AsParallel()/pmapのワンメソッド並列化 - Reduce の表現力 — 宣言的 (Java Collectors) vs 命令的 (Python ループ) vs 関数型 (Haskell foldl')
- アムダールの法則 — 逐次部分の割合が並列化の効果を決定
各言語の個別記事¶
| 言語 | 個別記事 |
|---|---|
| Python | Part VIII - 分散並列処理 |
| Java | Part VIII - 分散並列処理 |
| C# | Part VIII - 分散並列処理 |
| Scala | Part VIII - 分散並列処理 |
| F# | Part VIII - 分散並列処理 |
| Rust | Part VIII - 分散並列処理 |
| Haskell | Part VIII - 分散並列処理 |
| Clojure | Part VIII - 分散並列処理 |