Skip to content

Part IV: タスク分解と並列パターン

概要

本章では、Fork/Join パターンと Pipeline パターンを学びます。


Fork/Join パターン

VoteCounter 実装

(defn count-votes-sequential
  "逐次で投票をカウント"
  [votes]
  (reduce (fn [acc vote]
            (update acc vote (fnil inc 0)))
          {}
          votes))

(defn merge-counts
  "複数のカウント結果をマージ"
  [counts]
  (reduce (fn [acc m]
            (merge-with + acc m))
          {}
          counts))

並列 Fork/Join

(defn count-votes-parallel
  "並列で投票をカウント(Fork/Join パターン)"
  [votes num-workers]
  (if (empty? votes)
    {}
    (let [chunks (partition-all (max 1 (quot (count votes) num-workers)) votes)
          futures (doall (map #(future (count-votes-sequential %)) chunks))
          results (map deref futures)]
      (merge-counts results))))

Pipeline パターン

core.async を使用

(require '[clojure.core.async :as async :refer [chan go >! <! >!! <!!]])

(defn create-pipeline
  "入力を2倍にするパイプラインを作成"
  []
  (let [input (chan 10)
        output (chan 10)]
    (go
      (loop []
        (when-let [v (<! input)]
          (>! output (* v 2))
          (recur))))
    {:input input :output output}))

パイプライン操作

(defn send-to-pipeline! [pipeline value]
  (>!! (:input pipeline) value))

(defn receive-from-pipeline! [pipeline]
  (<!! (:output pipeline)))

;; 使用例
(let [p (create-pipeline)]
  (send-to-pipeline! p 5)
  (receive-from-pipeline! p))  ;; => 10

チェーンパイプライン

(defn run-chain-pipeline
  "チェーンパイプラインを実行"
  [initial-value functions]
  (if (empty? functions)
    initial-value
    (let [channels (repeatedly (inc (count functions)) #(chan 1))]
      (doseq [[in-ch out-ch f] (map vector channels (rest channels) functions)]
        (go
          (when-let [v (<! in-ch)]
            (>! out-ch (f v)))))
      (>!! (first channels) initial-value)
      (<!! (last channels)))))

;; 使用例
(run-chain-pipeline 5 [#(* % 2) #(+ % 3) #(* % 10)])
;; => 130  ; ((5 * 2) + 3) * 10

次のステップ

Part V では、STM による同期と排他制御を学びます。