Part V: 並行処理¶
本章では、関数型プログラミングにおける並行処理を学びます。asyncio を使った非同期処理、Ref(アトミック参照)による安全な共有状態管理、そして Fiber(軽量タスク)によるバックグラウンド処理を習得します。
第10章: 並行・並列処理¶
10.1 並行処理の課題¶
従来の並行処理には多くの課題があります:
- デッドロック
- 競合状態(Race Condition)
- 共有状態の管理の複雑さ
- スレッドのオーバーヘッド
関数型プログラミングでは、これらの問題を以下のアプローチで解決します:
- イミュータブルデータ: 共有状態の変更による競合を防ぐ
- Ref(アトミック参照): スレッドセーフな状態管理
- asyncio: 軽量な非同期処理
- 宣言的な並行処理: 副作用を明示的に管理
10.2 Ref - アトミックな共有状態¶
Ref は、複数の並行処理から安全にアクセスできるアトミックな参照です。Python では threading.Lock を使って実装します。
同期版 Ref¶
import threading
from typing import Generic, TypeVar, Callable
T = TypeVar("T")
U = TypeVar("U")
class Ref(Generic[T]):
"""スレッドセーフなアトミック参照。"""
def __init__(self, initial: T) -> None:
self._value = initial
self._lock = threading.Lock()
def get(self) -> T:
"""現在の値を取得する。"""
with self._lock:
return self._value
def set(self, value: T) -> None:
"""値を設定する。"""
with self._lock:
self._value = value
def update(self, f: Callable[[T], T]) -> None:
"""アトミックに値を更新する。"""
with self._lock:
self._value = f(self._value)
def modify(self, f: Callable[[T], tuple[T, U]]) -> U:
"""アトミックに値を更新し、結果を返す。"""
with self._lock:
new_value, result = f(self._value)
self._value = new_value
return result
# 使用例
ref = Ref(0)
ref.update(lambda x: x + 1)
ref.update(lambda x: x + 1)
ref.update(lambda x: x + 1)
print(ref.get()) # 3
非同期版 AsyncRef¶
asyncio と組み合わせる場合は asyncio.Lock を使用します:
import asyncio
class AsyncRef(Generic[T]):
"""非同期コンテキスト用のアトミック参照。"""
def __init__(self, initial: T) -> None:
self._value = initial
self._lock = asyncio.Lock()
async def get(self) -> T:
async with self._lock:
return self._value
async def set(self, value: T) -> None:
async with self._lock:
self._value = value
async def update(self, f: Callable[[T], T]) -> None:
async with self._lock:
self._value = f(self._value)
# 使用例
async def example():
ref = AsyncRef(0)
await ref.update(lambda x: x + 1)
return await ref.get()
asyncio.run(example()) # 1
10.3 チェックインのリアルタイム集計¶
都市へのチェックインをリアルタイムで集計し、ランキングを更新する例を見ていきます。
from dataclasses import dataclass
@dataclass(frozen=True)
class City:
"""都市を表すデータクラス。"""
name: str
@dataclass(frozen=True)
class CityStats:
"""都市のチェックイン統計。"""
city: City
check_ins: int
トップ都市の計算(純粋関数)¶
def top_cities(city_check_ins: dict[City, int], n: int = 3) -> list[CityStats]:
"""チェックイン数上位の都市を取得する。"""
stats = [CityStats(city, count) for city, count in city_check_ins.items()]
return sorted(stats, key=lambda s: s.check_ins, reverse=True)[:n]
cities = {City("Tokyo"): 100, City("Osaka"): 50, City("Kyoto"): 75}
result = top_cities(cities, 2)
# [CityStats(city=City(name='Tokyo'), check_ins=100),
# CityStats(city=City(name='Kyoto'), check_ins=75)]
チェックインの更新(純粋関数)¶
def update_check_ins(
city_check_ins: dict[City, int], city: City
) -> dict[City, int]:
"""チェックインを追加する(新しい辞書を返す)。"""
new_check_ins = dict(city_check_ins)
new_check_ins[city] = new_check_ins.get(city, 0) + 1
return new_check_ins
10.4 par_sequence - 並列実行¶
asyncio.gather を使用して複数の非同期タスクを並列実行します。
from typing import Any, Coroutine
async def par_sequence(
tasks: list[Coroutine[Any, Any, T]],
) -> list[T]:
"""複数の非同期タスクを並列実行する。"""
return list(await asyncio.gather(*tasks))
async def example():
async def task(n: int) -> int:
await asyncio.sleep(0.1) # シミュレーション
return n * 2
# 並列実行: 約0.1秒(順次実行なら約0.3秒)
result = await par_sequence([task(1), task(2), task(3)])
return result # [2, 4, 6]
par_traverse - リストへの並列適用¶
async def par_traverse(
items: list[T],
f: Callable[[T], Coroutine[Any, Any, U]],
) -> list[U]:
"""リストの各要素に非同期関数を並列適用する。"""
return await par_sequence([f(item) for item in items])
async def example():
async def fetch_data(id: int) -> str:
await asyncio.sleep(0.1)
return f"data_{id}"
# 並列にデータを取得
results = await par_traverse([1, 2, 3, 4, 5], fetch_data)
# ["data_1", "data_2", "data_3", "data_4", "data_5"]
10.5 サイコロを並行して振る¶
import random
async def cast_the_die() -> int:
"""非同期でサイコロを振る。"""
await asyncio.sleep(0) # yield control
return random.randint(1, 6)
async def cast_the_die_twice() -> int:
"""2つのサイコロを並行して振り、合計を返す。"""
results = await par_sequence([cast_the_die(), cast_the_die()])
return sum(results)
async def cast_dice_parallel(n: int) -> list[int]:
"""n個のサイコロを並行して振る。"""
return await par_sequence([cast_the_die() for _ in range(n)])
10.6 Ref と par_sequence の組み合わせ¶
複数の非同期タスクから共有状態を安全に更新します:
async def increment_three_times() -> int:
"""カウンターを3回インクリメントする。"""
counter = AsyncRef(0)
await counter.update(lambda x: x + 1)
await counter.update(lambda x: x + 1)
await counter.update(lambda x: x + 1)
return await counter.get() # 3
偶数をカウントする例¶
async def count_evens(tasks: list[Coroutine[Any, Any, int]]) -> int:
"""複数の非同期タスクを並列実行し、偶数の結果をカウントする。"""
counter = AsyncRef(0)
async def count_if_even(task: Coroutine[Any, Any, int]) -> None:
result = await task
if result % 2 == 0:
await counter.update(lambda x: x + 1)
await par_sequence([count_if_even(t) for t in tasks])
return await counter.get()
10.7 チェックイン処理の並行版¶
async def store_check_in(
stored_check_ins: AsyncRef[dict[City, int]], city: City
) -> None:
"""チェックインを保存する。"""
await stored_check_ins.update(lambda m: update_check_ins(m, city))
async def update_ranking(
stored_check_ins: AsyncRef[dict[City, int]],
stored_ranking: AsyncRef[list[CityStats]],
) -> None:
"""ランキングを更新する。"""
check_ins = await stored_check_ins.get()
await stored_ranking.set(top_cities(check_ins))
10.8 Fiber - 軽量タスク¶
Fiber は asyncio.Task のラッパーで、キャンセル可能なバックグラウンドタスクを表現します。
from dataclasses import dataclass
@dataclass
class Fiber(Generic[T]):
"""軽量タスク(asyncio.Task のラッパー)。"""
_task: asyncio.Task[T]
def cancel(self) -> None:
"""タスクをキャンセルする。"""
self._task.cancel()
async def join(self) -> T:
"""タスクの完了を待つ。"""
return await self._task
@property
def is_done(self) -> bool:
"""タスクが完了しているか。"""
return self._task.done()
def start_fiber(coro: Coroutine[Any, Any, T]) -> Fiber[T]:
"""コルーチンを Fiber として起動する。"""
task = asyncio.create_task(coro)
return Fiber(task)
async def example():
async def background_task():
for i in range(10):
await asyncio.sleep(0.1)
print(f"Working... {i}")
fiber = start_fiber(background_task())
# 他の処理を行う
await asyncio.sleep(0.35)
# バックグラウンドタスクをキャンセル
fiber.cancel()
10.9 タイムアウト付き収集¶
指定時間内にデータを収集し、タイムアウト後に結果を返す例:
async def collect_for(
duration_seconds: float, interval_seconds: float = 0.1
) -> list[int]:
"""指定時間、定期的に乱数を収集する。"""
collected: AsyncRef[list[int]] = AsyncRef([])
async def producer() -> None:
while True:
await asyncio.sleep(interval_seconds)
value = random.randint(0, 99)
await collected.update(lambda lst: lst + [value])
fiber = start_fiber(producer())
await asyncio.sleep(duration_seconds)
fiber.cancel()
try:
await fiber.join()
except asyncio.CancelledError:
pass
return await collected.get()
10.10 バックグラウンド処理¶
呼び出し元に制御を返しつつ、バックグラウンドで処理を続ける設計:
@dataclass
class ProcessingCheckIns:
"""チェックイン処理の制御ハンドル。"""
current_ranking: Callable[[], Coroutine[Any, Any, list[CityStats]]]
stop: Callable[[], None]
async def process_check_ins_background(
check_ins: list[City],
) -> ProcessingCheckIns:
"""チェックイン処理をバックグラウンドで開始する。"""
stored_check_ins: AsyncRef[dict[City, int]] = AsyncRef({})
stored_ranking: AsyncRef[list[CityStats]] = AsyncRef([])
async def check_in_processor() -> None:
for city in check_ins:
await store_check_in(stored_check_ins, city)
await asyncio.sleep(0)
async def ranking_updater() -> None:
while True:
await update_ranking(stored_check_ins, stored_ranking)
await asyncio.sleep(0.01)
check_in_fiber = start_fiber(check_in_processor())
ranking_fiber = start_fiber(ranking_updater())
def stop() -> None:
check_in_fiber.cancel()
ranking_fiber.cancel()
async def get_ranking() -> list[CityStats]:
return await stored_ranking.get()
return ProcessingCheckIns(current_ranking=get_ranking, stop=stop)
# 使用例
async def main():
cities = [City("Tokyo"), City("Osaka"), City("Tokyo"), City("Kyoto")]
processing = await process_check_ins_background(cities)
await asyncio.sleep(0.1)
ranking = await processing.current_ranking()
print(ranking) # [CityStats(city=City(name='Tokyo'), check_ins=2), ...]
processing.stop()
10.11 順次 vs 並列の比較¶
async def sequential_sleep(count: int, seconds: float) -> float:
"""逐次的にスリープする。"""
for _ in range(count):
await asyncio.sleep(seconds)
return count * seconds
async def parallel_sleep(count: int, seconds: float) -> float:
"""並列にスリープする。"""
await par_sequence([asyncio.sleep(seconds) for _ in range(count)])
return seconds # 並列なので最長のタスク時間のみ
import time
# 順次実行: 約0.3秒
start = time.time()
asyncio.run(sequential_sleep(3, 0.1))
print(f"Sequential: {time.time() - start:.2f}s") # ~0.30s
# 並列実行: 約0.1秒
start = time.time()
asyncio.run(parallel_sleep(3, 0.1))
print(f"Parallel: {time.time() - start:.2f}s") # ~0.10s
10.12 スレッドベースの並列処理¶
CPU バウンドな処理にはスレッドを使用します:
import threading
def run_in_threads(tasks: list[Callable[[], T]]) -> list[T]:
"""複数のタスクをスレッドで並列実行する。"""
results: list[T] = []
lock = threading.Lock()
def run_task(task: Callable[[], T]) -> None:
result = task()
with lock:
results.append(result)
threads = [threading.Thread(target=run_task, args=(task,)) for task in tasks]
for t in threads:
t.start()
for t in threads:
t.join()
return results
def parallel_map(items: list[T], f: Callable[[T], U]) -> list[U]:
"""リストの各要素に関数を並列適用する。"""
results: list[tuple[int, U]] = []
lock = threading.Lock()
def process(index: int, item: T) -> None:
result = f(item)
with lock:
results.append((index, result))
threads = [
threading.Thread(target=process, args=(i, item))
for i, item in enumerate(items)
]
for t in threads:
t.start()
for t in threads:
t.join()
return [r for _, r in sorted(results)]
# 使用例
result = parallel_map([1, 2, 3, 4, 5], lambda x: x * x)
# [1, 4, 9, 16, 25]
まとめ¶
Part V で学んだこと¶
| コンポーネント | 用途 |
|---|---|
Ref |
スレッドセーフな共有状態(同期版) |
AsyncRef |
非同期コンテキストでの共有状態 |
par_sequence |
非同期タスクの並列実行 |
par_traverse |
リストへの並列関数適用 |
Fiber |
キャンセル可能なバックグラウンドタスク |
start_fiber |
Fiber の起動 |
asyncio.sleep |
非同期スリープ(スレッドをブロックしない) |
Python と Scala の対応¶
| Scala | Python |
|---|---|
Ref[IO, A] |
AsyncRef[T] |
parSequence |
par_sequence / asyncio.gather |
Fiber |
Fiber (asyncio.Task のラッパー) |
fiber.start |
start_fiber() |
fiber.cancel |
fiber.cancel() |
IO.sleep |
asyncio.sleep |
foreverM |
while True: ... |
キーポイント¶
- Ref/AsyncRef: 複数の並行処理から安全にアクセスできるアトミックな参照
- par_sequence: 非同期タスクのリストを並列実行して結果を集約
- Fiber: asyncio.Task のラッパーで、キャンセル可能なバックグラウンド処理
- start_fiber: Fiber をバックグラウンドで起動し、すぐに制御を返す
- asyncio.sleep: Fiber をスリープさせ、スレッドは解放する
設計パターン¶
パターン1: 並列集約¶
async def aggregate_data(sources: list[str]) -> list[Data]:
return await par_traverse(sources, fetch_from_source)
パターン2: 共有状態の更新¶
async def process_items(items: list[Item]) -> int:
counter = AsyncRef(0)
async def process(item: Item) -> None:
if is_valid(item):
await counter.update(lambda x: x + 1)
await par_sequence([process(item) for item in items])
return await counter.get()
パターン3: バックグラウンド処理¶
async def start_background_service() -> ServiceHandle:
state = AsyncRef(initial_state())
async def worker():
while True:
await do_work(state)
await asyncio.sleep(interval)
fiber = start_fiber(worker())
return ServiceHandle(
get_state=state.get,
stop=fiber.cancel
)
演習問題¶
問題 1: Ref の基本¶
カウンターを 0 から始めて、3回インクリメントした結果を返す関数を実装してください。
async def increment_three_times() -> int:
...
# 期待される動作
asyncio.run(increment_three_times()) # 3
解答
async def increment_three_times() -> int:
counter = AsyncRef(0)
await counter.update(lambda x: x + 1)
await counter.update(lambda x: x + 1)
await counter.update(lambda x: x + 1)
return await counter.get()
問題 2: 並列実行¶
3つの非同期タスクを並列実行し、結果の合計を返す関数を実装してください。
async def sum_parallel(
task1: Coroutine[Any, Any, int],
task2: Coroutine[Any, Any, int],
task3: Coroutine[Any, Any, int],
) -> int:
...
# 期待される動作
async def pure(n):
return n
asyncio.run(sum_parallel(pure(1), pure(2), pure(3))) # 6
解答
async def sum_parallel(
task1: Coroutine[Any, Any, int],
task2: Coroutine[Any, Any, int],
task3: Coroutine[Any, Any, int],
) -> int:
results = await par_sequence([task1, task2, task3])
return sum(results)
問題 3: 並行カウント¶
複数の非同期タスクを並行実行し、偶数を返した回数をカウントする関数を実装してください。
async def count_evens(tasks: list[Coroutine[Any, Any, int]]) -> int:
...
# 使用例
async def pure(n):
return n
tasks = [pure(1), pure(2), pure(3), pure(4)]
asyncio.run(count_evens(tasks)) # 2
解答
async def count_evens(tasks: list[Coroutine[Any, Any, int]]) -> int:
counter = AsyncRef(0)
async def count_if_even(task: Coroutine[Any, Any, int]) -> None:
result = await task
if result % 2 == 0:
await counter.update(lambda x: x + 1)
await par_sequence([count_if_even(t) for t in tasks])
return await counter.get()
問題 4: タイムアウト付き実行¶
指定時間後に Fiber をキャンセルし、それまでに蓄積された結果を返す関数を実装してください。
async def collect_for(duration: float) -> list[int]:
...
# 期待される動作
# 0.5秒間、100msごとに乱数を生成してリストに追加
# 約5個の要素が返される
asyncio.run(collect_for(0.5))
解答
async def collect_for(duration: float) -> list[int]:
collected: AsyncRef[list[int]] = AsyncRef([])
async def producer() -> None:
while True:
await asyncio.sleep(0.1)
value = random.randint(0, 99)
await collected.update(lambda lst: lst + [value])
fiber = start_fiber(producer())
await asyncio.sleep(duration)
fiber.cancel()
try:
await fiber.join()
except asyncio.CancelledError:
pass
return await collected.get()
問題 5: 並行マップ更新¶
複数の更新を並行して Map に適用し、最終的な Map を返す関数を実装してください。
@dataclass(frozen=True)
class Update:
key: str
value: int
async def apply_updates(updates: list[Update]) -> dict[str, int]:
...
# 期待される動作
updates = [
Update("a", 1),
Update("b", 2),
Update("a", 3), # "a" を上書き
Update("c", 4)
]
asyncio.run(apply_updates(updates)) # {"a": 3, "b": 2, "c": 4} (順序不定)
解答
async def apply_updates(updates: list[Update]) -> dict[str, int]:
map_ref: AsyncRef[dict[str, int]] = AsyncRef({})
async def apply_update(update: Update) -> None:
await map_ref.update(
lambda m: {**m, update.key: update.value}
)
await par_sequence([apply_update(u) for u in updates])
return await map_ref.get()