Skip to content

Part VIII: 分散並列処理

概要

本章では、MapReduce パターンを使った分散並列処理を学びます。行列乗算とワードカウントの実装を通じて、大規模データの並列処理手法を理解します。


第13章: MapReduce パターン

MapReduce とは

MapReduce は、大規模データを並列処理するためのプログラミングモデルです。

処理フロー

uml diagram


行列乗算の並列化

逐次処理版

#!/usr/bin/env python3

"""行列乗算(逐次処理版)"""

from typing import List

Row = List[int]
Matrix = List[Row]


def matrix_multiply(matrix_a: Matrix, matrix_b: Matrix) -> Matrix:
    """2つの行列を乗算"""
    num_rows_a = len(matrix_a)
    num_cols_a = len(matrix_a[0])
    num_cols_b = len(matrix_b[0])

    result = [[0] * num_cols_b for _ in range(num_rows_a)]

    for i in range(num_rows_a):
        for j in range(num_cols_b):
            for k in range(num_cols_a):
                result[i][j] += matrix_a[i][k] * matrix_b[k][j]

    return result

並列処理版

#!/usr/bin/env python3

"""行列乗算(並列処理版)"""

from typing import List, Tuple
import random
from multiprocessing import Pool

Row = List[int]
Column = List[int]
Matrix = List[Row]


def process_row(args: Tuple[Matrix, Matrix, int]) -> Column:
    """1行分の計算を実行"""
    matrix_a, matrix_b, row_idx = args
    num_cols_a = len(matrix_a[0])
    num_cols_b = len(matrix_b[0])

    result_row = [0] * num_cols_b
    for j in range(num_cols_b):
        for k in range(num_cols_a):
            result_row[j] += matrix_a[row_idx][k] * matrix_b[k][j]
    return result_row


def matrix_multiply_parallel(matrix_a: Matrix, matrix_b: Matrix) -> Matrix:
    """行列を並列で乗算"""
    num_rows_a = len(matrix_a)
    num_cols_a = len(matrix_a[0])
    num_rows_b = len(matrix_b)

    if num_cols_a != num_rows_b:
        raise ArithmeticError(
            f"Invalid dimensions: {num_rows_a}x{num_cols_a} * "
            f"{num_rows_b}x{len(matrix_b[0])}")

    # 各行の計算を並列実行
    with Pool() as pool:
        args = [(matrix_a, matrix_b, i) for i in range(num_rows_a)]
        results = pool.map(process_row, args)

    return results


if __name__ == "__main__":
    rows, cols = 100, 100
    A = [[random.randint(0, 10) for _ in range(cols)]
         for _ in range(rows)]
    B = [[random.randint(0, 10) for _ in range(rows)]
         for _ in range(cols)]

    C = matrix_multiply_parallel(A, B)
    print(f"Result matrix shape: {len(C)}x{len(C[0])}")

パフォーマンス比較

import time
from multiprocessing import Pool


def time_sequential(A, B):
    start = time.perf_counter()
    matrix_multiply(A, B)
    return time.perf_counter() - start


def time_parallel(A, B):
    start = time.perf_counter()
    matrix_multiply_parallel(A, B)
    return time.perf_counter() - start


# 500x500 行列の場合
# Sequential: ~15秒
# Parallel (4 cores): ~5秒

ワードカウント

MapReduce によるワードカウント

#!/usr/bin/env python3

"""MapReduce によるワードカウント"""

from collections import defaultdict
from multiprocessing import Pool
import typing as T


def map_function(text: str) -> T.List[T.Tuple[str, int]]:
    """Map: テキストを (word, 1) のペアに変換"""
    words = text.lower().split()
    return [(word, 1) for word in words]


def reduce_function(word_counts: T.List[T.Tuple[str, int]]) -> T.Dict[str, int]:
    """Reduce: 同じ単語のカウントを集約"""
    result = defaultdict(int)
    for word, count in word_counts:
        result[word] += count
    return dict(result)


def word_count_mapreduce(texts: T.List[str]) -> T.Dict[str, int]:
    """MapReduce でワードカウント"""

    # Map フェーズ(並列)
    with Pool() as pool:
        mapped = pool.map(map_function, texts)

    # Shuffle: 全ての結果をフラット化
    shuffled = [pair for sublist in mapped for pair in sublist]

    # Reduce フェーズ
    result = reduce_function(shuffled)

    return result


if __name__ == "__main__":
    texts = [
        "Hello world hello",
        "World of programming",
        "Hello programming world",
    ]

    counts = word_count_mapreduce(texts)
    for word, count in sorted(counts.items(), key=lambda x: -x[1]):
        print(f"{word}: {count}")

実行結果

world: 3
hello: 3
programming: 2
of: 1

分散処理アーキテクチャ

マスター・ワーカー構成

uml diagram

ワーカーの実装例

#!/usr/bin/env python3

"""分散ワードカウントのワーカー"""

import socket
import pickle
from collections import defaultdict


def count_words(text: str) -> dict:
    """テキスト内の単語をカウント"""
    counts = defaultdict(int)
    for word in text.lower().split():
        counts[word] += 1
    return dict(counts)


def worker(host: str = 'localhost', port: int = 9000) -> None:
    """ワーカープロセス"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        print("Connected to master")

        while True:
            # タスクを受信
            data = s.recv(4096)
            if not data:
                break

            text = pickle.loads(data)
            print(f"Processing: {text[:50]}...")

            # 処理を実行
            result = count_words(text)

            # 結果を送信
            s.send(pickle.dumps(result))


if __name__ == "__main__":
    worker()

スケーリングの考慮事項

水平スケーリング

要素 説明
データ分割 データを均等に分配
負荷分散 ワーカー間で作業量を均等化
耐障害性 ワーカー障害時のリカバリ
通信コスト ネットワークオーバーヘッド

Amdahl の法則

並列化による高速化の限界を示す法則:

Speedup = 1 / (S + P/N)

S: 逐次部分の割合
P: 並列部分の割合 (P = 1 - S)
N: プロセッサ数

まとめ

本シリーズで学んだ内容:

Part トピック キーポイント
I 逐次処理 基本概念、パフォーマンス測定
II プロセス/スレッド GIL、スレッドプール
III マルチタスキング タイムシェアリング
IV 並列パターン Fork/Join、パイプライン
V 同期 Lock、Semaphore、デッドロック
VI ノンブロッキング I/O イベントループ、Reactor
VII 非同期 asyncio、コルーチン
VIII 分散処理 MapReduce

参考コード


参考資料