Part IV: Task Decomposition and Parallel Patterns
Overview
This chapter covers the Fork/Join pattern and the pipeline pattern.
Fork/Join Pattern: Vote Counter
Sequential Counting
```scala
object VoteCounter:
  /** Count votes sequentially */
  def countVotes(votes: List[String]): Map[String, Int] =
    // Group identical votes, then replace each group with its size
    votes.groupBy(identity).view.mapValues(_.size).toMap

  /** Merge two vote count results */
  def mergeResults(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    // A candidate missing from one side counts as 0 on that side
    (a.keys ++ b.keys).map { key =>
      key -> (a.getOrElse(key, 0) + b.getOrElse(key, 0))
    }.toMap
```
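For comparison, here is a single-pass sketch of the same two helpers built on foldLeft instead of groupBy. groupBy first builds a Map[String, List[String]] holding every ballot per candidate; this version only keeps running totals. The object name VoteCounterFold is hypothetical and not part of the chapter's code.

```scala
object VoteCounterFold:
  /** Count votes in a single pass, keeping only running totals */
  def countVotes(votes: List[String]): Map[String, Int] =
    votes.foldLeft(Map.empty[String, Int]) { (counts, vote) =>
      counts + (vote -> (counts.getOrElse(vote, 0) + 1))
    }

  /** Merge by folding b's counts into a */
  def mergeResults(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (candidate, n)) =>
      acc + (candidate -> (acc.getOrElse(candidate, 0) + n))
    }
```

Because merging counts is associative and the empty map is its identity, partial counts can be combined in any grouping, which is what allows independent chunks to be counted separately.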
Parallel Counting
```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import VoteCounter.{countVotes, mergeResults}

given ExecutionContext = ExecutionContext.global

/** Count votes using the fork-join pattern */
def countVotesParallel(votes: List[String]): Map[String, Int] =
  if votes.isEmpty then Map.empty
  else
    val numCores = Runtime.getRuntime.availableProcessors()
    // Ceiling division, so the input is split into at most numCores chunks
    val chunkSize = math.max(1, (votes.size + numCores - 1) / numCores)
    // Fork: count each chunk in its own Future
    val futures = votes.grouped(chunkSize).toList.map { chunk =>
      Future(countVotes(chunk))
    }
    // Join: wait once for all partial counts, then merge them
    val results = Await.result(Future.sequence(futures), Duration.Inf)
    results.reduce(mergeResults)
```
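The version above splits the input once, into roughly one chunk per core. The classic Fork/Join formulation instead splits recursively until each piece is small enough to handle directly. Below is a minimal sketch of that recursive form with Futures; the names RecursiveForkJoin, countAsync and the Threshold of 1000 votes are hypothetical choices. Subtasks are joined with a for-comprehension rather than Await, so only the top-level call blocks.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object RecursiveForkJoin:
  given ExecutionContext = ExecutionContext.global

  // Hypothetical cut-off: pieces this small are counted sequentially
  val Threshold = 1000

  private def countSequential(votes: Vector[String]): Map[String, Int] =
    votes.groupBy(identity).view.mapValues(_.size).toMap

  private def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0) + b.getOrElse(k, 0))).toMap

  /** Fork: split in half until small enough. Join: merge the two partial counts. */
  def countAsync(votes: Vector[String]): Future[Map[String, Int]] =
    if votes.size <= Threshold then Future(countSequential(votes))
    else
      val (left, right) = votes.splitAt(votes.size / 2)
      val leftF = countAsync(left) // both halves start before either is joined
      val rightF = countAsync(right)
      for (l <- leftF; r <- rightF) yield merge(l, r)

  /** Blocks only here, waiting for the whole tree of subtasks */
  def countVotes(votes: Seq[String]): Map[String, Int] =
    Await.result(countAsync(votes.toVector), Duration.Inf)
```

The threshold trades scheduling overhead against parallelism; a value tuned for one machine may not suit another.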
Pipeline Pattern
```scala
/** A named stage; input and output are untyped (Any) */
case class Stage(name: String, processor: Any => Any)

class Pipeline[T](stages: List[Stage] = List.empty):
  /** Add a stage; returns a new Pipeline and leaves this one unchanged */
  def addStage[A, B](name: String, processor: A => B): Pipeline[T] =
    // The cast is unchecked: a type mismatch between stages surfaces only at run time
    new Pipeline[T](stages :+ Stage(name, x => processor(x.asInstanceOf[A])))

  /** Process data through all stages, one item at a time */
  def process(data: List[T]): List[Any] =
    data.map { item =>
      // Feed each stage's output into the next; with no stages the item is returned unchanged
      stages.foldLeft[Any](item) { (current, stage) =>
        stage.processor(current)
      }
    }

object Pipeline:
  def apply[T](): Pipeline[T] = new Pipeline[T]()
```
Usage Example
```scala
val pipeline = Pipeline[Int]()
  .addStage("double", (x: Int) => x * 2)
  .addStage("addOne", (x: Int) => x + 1)

val results = pipeline.process(List(1, 2, 3))
// List(3, 5, 7)
```
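Pipeline stores every stage as Any => Any, so process returns List[Any], and a stage whose input type does not match the previous stage's output fails only at run time with a ClassCastException. One alternative, sketched here under hypothetical names (TypedPipeline, stageNames), tracks the input and output types as type parameters and composes the stages with andThen, so such mismatches become compile errors.

```scala
/** Hypothetical typed pipeline: In is the input type, Out is what the last stage produces */
final class TypedPipeline[In, Out] private (val stageNames: List[String], run: In => Out):
  /** Append a stage; it must accept the current output type Out */
  def addStage[Next](name: String, processor: Out => Next): TypedPipeline[In, Next] =
    new TypedPipeline[In, Next](stageNames :+ name, run.andThen(processor))

  /** Apply all stages, in order, to each item */
  def process(data: List[In]): List[Out] = data.map(run)

object TypedPipeline:
  /** An empty pipeline is the identity function */
  def apply[T](): TypedPipeline[T, T] = new TypedPipeline[T, T](Nil, (t: T) => t)
```

For example, TypedPipeline[Int]().addStage("double", (x: Int) => x * 2).addStage("label", (x: Int) => s"v$x") has type TypedPipeline[Int, String], and processing List(1, 2, 3) yields List("v2", "v4", "v6"). Appending a stage that expects a String directly after "double" would not compile.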
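Key Points
- groupBy + mapValues: aggregation in a functional style, without mutable counters
- foldLeft: applies the stages in order, passing each stage's output to the next
- Builder pattern: addStage returns a new Pipeline, which gives a fluent, chainable interface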
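Because foldLeft runs every stage on one item before the next item starts, Pipeline.process does all its work sequentially. In the concurrent form of the pipeline pattern, each stage runs on its own thread and items flow between stages through queues, so different items can occupy different stages at the same time. The sketch below assumes java.util.concurrent.LinkedBlockingQueue as the channel; the object name ConcurrentPipeline and the use of None as an end-of-input marker are illustrative choices. It has no error handling (if a stage throws, its thread dies and run waits forever) and uses unbounded queues, so there is no backpressure.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ConcurrentPipeline:
  /** Run each stage on its own thread; Some(value) carries an item, None marks end of input */
  def run(data: List[Any], stages: List[Any => Any]): List[Any] =
    // queues(i) feeds stage i; the last queue collects the final results
    val queues = Vector.fill(stages.size + 1)(new LinkedBlockingQueue[Option[Any]]())

    val workers = stages.zipWithIndex.map { case (stage, i) =>
      val task: Runnable = () => {
        var done = false
        while !done do
          queues(i).take() match
            case Some(value) => queues(i + 1).put(Some(stage(value)))
            case None =>
              queues(i + 1).put(None) // pass the end marker downstream
              done = true
      }
      val worker = new Thread(task)
      worker.start()
      worker
    }

    // Feed the first stage, then signal the end of input
    data.foreach(item => queues.head.put(Some(item)))
    queues.head.put(None)

    // Collect results until the end marker reaches the last queue
    val results = List.newBuilder[Any]
    var finished = false
    while !finished do
      queues.last.take() match
        case Some(value) => results += value
        case None        => finished = true
    workers.foreach(_.join())
    results.result()
```

Each stage has a single thread reading a FIFO queue, so results come out in input order. With the "double" and "addOne" functions from the usage example, written as Any => Any, the result for List(1, 2, 3) is List(3, 5, 7).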
Next Steps
Part V covers synchronization and mutual exclusion.