Skip to content

Part IV: タスク分解と並列パターン

概要

本章では、複雑な問題を並列化可能なサブタスクに分割する手法を学びます。Fork/Join パターンとパイプラインパターンの実装を通じて、並列処理のデザインパターンを理解します。


第7章: 並列パターン

データ並列 vs タスク並列

パターン 説明
データ並列 同じ操作を複数のデータに適用 配列の各要素を2倍
タスク並列 異なる操作を同時に実行 入力・計算・出力

Fork/Join パターン

問題を分割し、並列に処理した後、結果を統合するパターンです。

投票集計の例

#!/usr/bin/env python3

"""Fork/Join パターンによる投票集計"""

import typing as T
import random
from math import ceil
from threading import Thread


class StaffMember(Thread):
    """投票用紙を集計するスタッフ"""

    def __init__(self, votes: T.List[int]):
        super().__init__()
        self.votes = votes
        self.summary: T.Dict[int, int] = {}

    def run(self) -> None:
        for vote in self.votes:
            if vote in self.summary:
                self.summary[vote] += 1
            else:
                self.summary[vote] = 1


def count_votes(votes: T.List[int], num_workers: int = 3) -> T.Dict[int, int]:
    """投票を複数のワーカーで並列集計"""
    workers = []
    vote_count = len(votes)
    votes_per_worker = ceil(vote_count / num_workers)

    # Fork: 投票用紙をワーカーに分配
    for i in range(num_workers):
        start = i * votes_per_worker
        end = start + votes_per_worker
        pile = votes[start:end]
        worker = StaffMember(pile)
        workers.append(worker)
        worker.start()

    # Join: 全ワーカーの完了を待機
    for worker in workers:
        worker.join()

    # 結果の統合
    total: T.Dict[int, int] = {}
    for worker in workers:
        for candidate, count in worker.summary.items():
            total[candidate] = total.get(candidate, 0) + count

    return total


if __name__ == "__main__":
    # 10万票を生成
    votes = [random.randint(1, 10) for _ in range(100000)]
    result = count_votes(votes)
    print(f"投票結果: {result}")

Fork/Join の流れ

uml diagram


パイプラインパターン

処理をステージに分割し、各ステージを並列に実行するパターンです。

洗濯パイプラインの例

#!/usr/bin/env python3

"""パイプラインパターンによる洗濯処理"""

import time
from queue import Queue
from threading import Thread

Washload = str


class Washer(Thread):
    """洗濯機を表すスレッド"""

    def __init__(self, in_queue: Queue, out_queue: Queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self) -> None:
        while True:
            washload = self.in_queue.get()
            print(f"Washer: washing {washload}...")
            time.sleep(4)  # 洗濯に4秒
            self.out_queue.put(washload)
            self.in_queue.task_done()


class Dryer(Thread):
    """乾燥機を表すスレッド"""

    def __init__(self, in_queue: Queue, out_queue: Queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self) -> None:
        while True:
            washload = self.in_queue.get()
            print(f"Dryer: drying {washload}...")
            time.sleep(2)  # 乾燥に2秒
            self.out_queue.put(washload)
            self.in_queue.task_done()


class Folder(Thread):
    """たたみ処理を表すスレッド"""

    def __init__(self, in_queue: Queue):
        super().__init__()
        self.in_queue = in_queue

    def run(self) -> None:
        while True:
            washload = self.in_queue.get()
            print(f"Folder: folding {washload}...")
            time.sleep(1)  # たたみに1秒
            print(f"Folder: {washload} done!")
            self.in_queue.task_done()


class Pipeline:
    """洗濯パイプライン"""

    def run_concurrently(self) -> None:
        # キューのセットアップ
        to_be_washed: Queue = Queue()
        to_be_dried: Queue = Queue()
        to_be_folded: Queue = Queue()

        # 洗濯物を投入
        for i in range(4):
            to_be_washed.put(f'Washload #{i}')

        # パイプラインを開始
        Washer(to_be_washed, to_be_dried).start()
        Dryer(to_be_dried, to_be_folded).start()
        Folder(to_be_folded).start()

        # 完了を待機
        to_be_washed.join()
        to_be_dried.join()
        to_be_folded.join()
        print("All done!")


if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run_concurrently()

パイプラインの流れ

uml diagram

逐次処理との比較

処理方式 4つの洗濯物
逐次処理 (4+2+1) × 4 = 28秒
パイプライン 4+4+4+4 + 2 + 1 = 23秒

Queue によるスレッド間通信

queue.Queue はスレッドセーフなキューです。

メソッド 説明
put(item) アイテムを追加(ブロッキング)
get() アイテムを取得(ブロッキング)
task_done() タスク完了を通知
join() 全タスク完了を待機

次のステップ

Part V では、同期と排他制御を学びます。レースコンディション、デッドロックなどの問題と、Lock、Semaphore による解決策を理解します。


参考コード