Chapter 10: 並行処理パターン¶
Elixir における並行処理パターンを学びます。アクターモデルと関数型プログラミングを組み合わせた安全な並行処理を実装します。
概要¶
Elixir は Erlang VM 上で動作し、軽量プロセスによる並行処理をサポートします。このチャプターでは、プロセス、Agent、GenServer、Task を使った並行処理パターンを紹介します。
主なトピック¶
- プロセスとメッセージパッシング
- Agent(状態管理)
- GenServer(汎用サーバー)
- Task(非同期処理)
- 並列処理パターン
- 同期パターン
基本的なプロセス¶
defmodule BasicProcess do
def start_echo do
spawn(fn -> echo_loop() end)
end
defp echo_loop do
receive do
{:echo, from, message} ->
send(from, {:echoed, message})
echo_loop()
:stop -> :ok
end
end
end
# 使用例
pid = BasicProcess.start_echo()
send(pid, {:echo, self(), "hello"})
receive do
{:echoed, msg} -> msg # => "hello"
end
Agent(状態管理)¶
defmodule Counter do
def start(initial_value \\ 0) do
Agent.start_link(fn -> initial_value end)
end
def get(counter), do: Agent.get(counter, & &1)
def increment(counter), do: Agent.update(counter, &(&1 + 1))
def decrement(counter), do: Agent.update(counter, &(&1 - 1))
def add(counter, value), do: Agent.update(counter, &(&1 + value))
def reset(counter), do: Agent.update(counter, fn _ -> 0 end)
end
# 並行アクセスでも安全
{:ok, counter} = Counter.start(0)
tasks = for _ <- 1..100 do
Task.async(fn -> Counter.increment(counter) end)
end
Enum.each(tasks, &Task.await/1)
Counter.get(counter) # => 100
GenServer¶
defmodule BankAccount do
use GenServer
# Client API
def open(account_id, initial_balance \\ 0) do
GenServer.start_link(__MODULE__, %{id: account_id, balance: initial_balance})
end
def balance(account), do: GenServer.call(account, :balance)
def deposit(account, amount), do: GenServer.call(account, {:deposit, amount})
def withdraw(account, amount), do: GenServer.call(account, {:withdraw, amount})
# Server Callbacks
@impl true
def init(state), do: {:ok, state}
@impl true
def handle_call(:balance, _from, state) do
{:reply, state.balance, state}
end
@impl true
def handle_call({:deposit, amount}, _from, state) when amount > 0 do
new_balance = state.balance + amount
{:reply, {:ok, new_balance}, %{state | balance: new_balance}}
end
@impl true
def handle_call({:withdraw, amount}, _from, state) do
if state.balance >= amount do
new_balance = state.balance - amount
{:reply, {:ok, new_balance}, %{state | balance: new_balance}}
else
{:reply, {:error, "残高不足"}, state}
end
end
end
Task(非同期処理)¶
defmodule AsyncTasks do
def parallel_execute(functions) do
functions
|> Enum.map(&Task.async/1)
|> Enum.map(&Task.await/1)
end
def parallel_map(list, fun) do
list
|> Enum.map(fn item -> Task.async(fn -> fun.(item) end) end)
|> Enum.map(&Task.await/1)
end
def with_timeout(fun, timeout_ms) do
task = Task.async(fun)
case Task.yield(task, timeout_ms) || Task.shutdown(task) do
{:ok, result} -> {:ok, result}
nil -> {:error, :timeout}
end
end
def race(functions) do
parent = self()
pids = Enum.map(functions, fn fun ->
spawn(fn ->
result = fun.()
send(parent, {:result, self(), result})
end)
end)
receive do
{:result, _pid, result} ->
Enum.each(pids, fn pid ->
if Process.alive?(pid), do: Process.exit(pid, :kill)
end)
result
end
end
end
並列処理パターン¶
Map-Reduce¶
def map_reduce(data, mapper, reducer) do
data
|> Task.async_stream(mapper, ordered: false)
|> Enum.map(fn {:ok, result} -> result end)
|> reducer.()
end
# 使用例
data = [1, 2, 3, 4, 5]
map_reduce(data, &(&1 * 2), &Enum.sum/1) # => 30
ファンアウト・ファンイン¶
def fan_out_fan_in(data, workers) do
workers
|> Enum.map(fn worker -> Task.async(fn -> worker.(data) end) end)
|> Enum.map(&Task.await/1)
end
バッチ処理¶
def batch_process(data, batch_size, processor) do
data
|> Enum.chunk_every(batch_size)
|> Task.async_stream(processor)
|> Enum.map(fn {:ok, result} -> result end)
end
同期パターン¶
セマフォ¶
def semaphore(max_permits) do
Agent.start_link(fn -> max_permits end)
end
def acquire(sem) do
case Agent.get_and_update(sem, fn
0 -> {0, 0}
n -> {n, n - 1}
end) do
0 -> Process.sleep(10); acquire(sem)
_ -> :ok
end
end
def release(sem) do
Agent.update(sem, &(&1 + 1))
end
リソースプール¶
def resource_pool(resources) do
Agent.start_link(fn -> resources end)
end
def borrow(pool) do
Agent.get_and_update(pool, fn
[] -> {{:error, :empty}, []}
[head | tail] -> {{:ok, head}, tail}
end)
end
def return(pool, resource) do
Agent.update(pool, fn resources -> [resource | resources] end)
end
イベントバス¶
defmodule EventBus do
def start, do: Agent.start_link(fn -> %{} end)
def subscribe(bus, event_type, subscriber) do
Agent.update(bus, fn subscriptions ->
current = Map.get(subscriptions, event_type, [])
Map.put(subscriptions, event_type, [subscriber | current])
end)
end
def publish(bus, event_type, payload) do
subscribers = Agent.get(bus, fn subs -> Map.get(subs, event_type, []) end)
Enum.each(subscribers, fn sub -> send(sub, {:event, event_type, payload}) end)
:ok
end
end
まとめ¶
- プロセスで軽量な並行処理
- Agentで状態を安全に管理
- GenServerでステートフルなサーバーを実装
- Taskで非同期処理を簡潔に
- 並列パターン(Map-Reduce、ファンアウト・ファンイン)で効率的な処理
- 同期パターン(セマフォ、リソースプール)で競合を制御