Skip to content

Part VIII: 分散並列処理

概要

本章では、MapReduce パターンと Scala の並列コレクションを学びます。


MapReduce パターン

ワードカウント実装

import scala.collection.parallel.CollectionConverters.*

object WordCount:

  /** Map: Convert text to (word, 1) pairs */
  def map(text: String): List[(String, Int)] =
    text.toLowerCase
      .split("\\s+")
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .toList

  /** Reduce: Aggregate word counts */
  def reduce(pairs: List[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap

  /** MapReduce: Count words in multiple texts using parallel collections */
  def countWords(texts: List[String]): Map[String, Int] =
    // Map phase (parallel)
    val mapped = texts.par.flatMap(map).toList

    // Reduce phase
    reduce(mapped)

使用例

val texts = List(
  "hello world",
  "hello scala",
  "world of scala"
)

val result = WordCount.countWords(texts)
// Map(hello -> 2, world -> 2, scala -> 2, of -> 1)

並列コレクション

基本的な使い方

import scala.collection.parallel.CollectionConverters.*

val numbers = (1 to 1000000).toList

// 逐次処理
val seqSum = numbers.map(_ * 2).sum

// 並列処理
val parSum = numbers.par.map(_ * 2).sum

注意点

// 順序が保証されない
val result = (1 to 10).par.map(x => s"$x: ${Thread.currentThread.getName}")
// 順序はランダム

// 副作用のある操作は危険
var counter = 0
(1 to 1000).par.foreach(_ => counter += 1) // 競合状態
// counter < 1000 になる可能性あり

並列コレクションの選択

コレクション 並列版 適した操作
List ParVector map, filter
Array ParArray インデックスアクセス
Range ParRange 連続処理
Map ParMap キー操作

まとめ

本シリーズで学んだ内容:

Part トピック キーポイント
I 逐次処理 基本概念、ブルートフォース
II スレッド Thread、Future
III マルチタスキング ゲームループ
IV 並列パターン Fork/Join、Pipeline
V 同期 synchronized、デッドロック回避
VI ノンブロッキング NIO、Selector
VII 非同期 Future、Promise
VIII 分散処理 MapReduce、並列コレクション

参考資料