Skip to content

Part IV: タスク分解と並列パターン

概要

本章では、Fork/Join パターンとパイプラインパターンを学びます。


Fork/Join パターン: 投票カウンター

逐次カウント

module VoteCounter =
    /// Count votes sequentially
    let countVotes (votes: string list) : Map<string, int> =
        votes
        |> List.groupBy id
        |> List.map (fun (key, group) -> (key, List.length group))
        |> Map.ofList

    /// Merge two vote count results
    let mergeResults (a: Map<string, int>) (b: Map<string, int>) : Map<string, int> =
        let allKeys = Set.union (Map.keys a |> Set.ofSeq) (Map.keys b |> Set.ofSeq)
        allKeys
        |> Set.toList
        |> List.map (fun key ->
            let countA = Map.tryFind key a |> Option.defaultValue 0
            let countB = Map.tryFind key b |> Option.defaultValue 0
            (key, countA + countB))
        |> Map.ofList

並列カウント

/// Count votes using fork-join pattern
let countVotesParallel (votes: string list) : Map<string, int> =
    if List.isEmpty votes then
        Map.empty
    else
        let numCores = Environment.ProcessorCount
        let chunkSize = max 1 (List.length votes / numCores)
        let chunks = votes |> List.chunkBySize chunkSize

        let results =
            chunks
            |> List.map (fun chunk -> async { return countVotes chunk })
            |> Async.Parallel
            |> Async.RunSynchronously
            |> Array.toList

        results |> List.reduce mergeResults

パイプラインパターン

type Stage = { Name: string; Processor: obj -> obj }
type Pipeline<'T> = { Stages: Stage list }

module Pipeline =
    let createPipeline<'T> : Pipeline<'T> =
        { Stages = [] }

    let addStage (name: string) (processor: obj -> obj) (pipeline: Pipeline<'T>) : Pipeline<'T> =
        { pipeline with Stages = pipeline.Stages @ [{ Name = name; Processor = processor }] }

    let run (pipeline: Pipeline<'T>) (data: 'T list) : obj list =
        if List.isEmpty pipeline.Stages then
            data |> List.map (fun x -> x :> obj)
        else
            data
            |> List.map (fun item ->
                pipeline.Stages
                |> List.fold (fun current stage -> stage.Processor current) (item :> obj)
            )

使用例

let pipeline =
    Pipeline.createPipeline<int>
    |> Pipeline.addStage "double" (fun x -> (x :?> int) * 2 :> obj)
    |> Pipeline.addStage "addOne" (fun x -> (x :?> int) + 1 :> obj)

let results = Pipeline.run pipeline [1; 2; 3]
// [3; 5; 7]

次のステップ

Part V では、同期と排他制御を学びます。