Rust で学ぶ関数型プログラミング Part V: 並行処理¶
はじめに¶
Part IV では async/await とストリーム処理を学びました。Part V では、関数型プログラミングにおける並行処理を扱います。
Scala では Ref(アトミック参照)と Fiber(軽量スレッド)を使いますが、Rust では Arc、Mutex/RwLock、tokio タスク、そしてチャネルを使って同様の概念を実現します。
第10章: 並行・並列処理¶
10.1 共有状態の基本 - Arc と Mutex¶
Rust の所有権システムでは、複数のスレッドからデータを共有するために Arc(Atomic Reference Counting)と Mutex を組み合わせます。
use std::sync::Arc;
use tokio::sync::Mutex;
/// アトミックカウンター
#[derive(Clone)]
pub struct AtomicCounter {
value: Arc<Mutex<i32>>,
}
impl AtomicCounter {
pub fn new(initial: i32) -> Self {
Self {
value: Arc::new(Mutex::new(initial)),
}
}
pub async fn get(&self) -> i32 {
*self.value.lock().await
}
pub async fn increment(&self) {
let mut guard = self.value.lock().await;
*guard += 1;
}
pub async fn update<F>(&self, f: F)
where
F: FnOnce(i32) -> i32,
{
let mut guard = self.value.lock().await;
*guard = f(*guard);
}
}
Scala の Ref と比較すると:
| Scala | Rust |
|---|---|
Ref.of[IO, Int](0) |
Arc::new(Mutex::new(0)) |
ref.get |
arc.lock().await |
ref.update(_ + 1) |
*guard += 1 |
10.2 RwLock - 読み書きロック¶
複数の読み取りを同時に許可しつつ、書き込みは排他的に行う場合は RwLock を使います。
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct SharedValue<T> {
value: Arc<RwLock<T>>,
}
impl<T: Clone> SharedValue<T> {
pub fn new(initial: T) -> Self {
Self {
value: Arc::new(RwLock::new(initial)),
}
}
pub async fn read(&self) -> T {
self.value.read().await.clone()
}
pub async fn write(&self, value: T) {
*self.value.write().await = value;
}
}
10.3 チェックインのリアルタイム集計¶
都市へのチェックインをリアルタイムで集計する例を実装します。
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct City {
pub name: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CityStats {
pub city: City,
pub check_ins: i32,
}
/// トップN都市を計算(純粋関数)
pub fn top_cities(city_check_ins: &HashMap<City, i32>, n: usize) -> Vec<CityStats> {
let mut stats: Vec<CityStats> = city_check_ins
.iter()
.map(|(city, &check_ins)| CityStats::new(city.clone(), check_ins))
.collect();
stats.sort_by(|a, b| b.check_ins.cmp(&a.check_ins));
stats.truncate(n);
stats
}
純粋関数として top_cities を実装することで、テストが容易になり、並行処理の複雑さから分離できます。
10.4 共有状態を使ったチェックイン処理¶
#[derive(Clone)]
pub struct CheckInStore {
check_ins: Arc<RwLock<HashMap<City, i32>>>,
}
impl CheckInStore {
pub fn new() -> Self {
Self {
check_ins: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn store_check_in(&self, city: City) {
let mut guard = self.check_ins.write().await;
*guard.entry(city).or_insert(0) += 1;
}
pub async fn get_top(&self, n: usize) -> Vec<CityStats> {
let check_ins = self.check_ins.read().await;
top_cities(&check_ins, n)
}
}
10.5 並列実行 - tokio::spawn と JoinHandle¶
複数のタスクを並列実行するには tokio::spawn を使います。
pub async fn run_parallel<T, U, F, Fut>(items: Vec<T>, f: F) -> Vec<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = U> + Send,
{
let f = Arc::new(f);
let handles: Vec<JoinHandle<U>> = items
.into_iter()
.map(|item| {
let f = Arc::clone(&f);
tokio::spawn(async move { f(item).await })
})
.collect();
let mut results = Vec::with_capacity(handles.len());
for handle in handles {
results.push(handle.await.unwrap());
}
results
}
Scala の parSequence に相当します:
| Scala | Rust |
|---|---|
List(io1, io2, io3).parSequence |
futures::future::join_all(...) |
List.fill(3)(io).parSequence |
tokio::spawn + join_all |
10.6 タスクのキャンセル¶
バックグラウンドタスクをキャンセルするには JoinHandle::abort() を使います。
pub struct BackgroundTask<T> {
handle: JoinHandle<T>,
}
impl<T> BackgroundTask<T> {
pub fn cancel(self) {
self.handle.abort();
}
pub async fn join(self) -> Result<T, tokio::task::JoinError> {
self.handle.await
}
}
/// バックグラウンドでカウントアップするタスクを開始
pub fn start_counter_task(
interval: std::time::Duration,
) -> (AtomicCounter, BackgroundTask<()>) {
let counter = AtomicCounter::new(0);
let counter_clone = counter.clone();
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
counter_clone.increment().await;
}
});
(counter, BackgroundTask { handle })
}
10.7 チェックイン処理システム¶
複数のタスクを並行実行して、チェックインの処理とランキングの更新を行います。
pub struct ProcessingCheckIns {
store: CheckInStore,
ranking: SharedValue<Vec<CityStats>>,
tasks: Vec<JoinHandle<()>>,
}
impl ProcessingCheckIns {
pub async fn current_ranking(&self) -> Vec<CityStats> {
self.ranking.read().await
}
pub fn stop(self) {
for task in self.tasks {
task.abort();
}
}
}
pub async fn process_check_ins(
cities: Vec<City>,
ranking_update_interval: std::time::Duration,
) -> ProcessingCheckIns {
let store = CheckInStore::new();
let ranking = SharedValue::new(Vec::new());
// チェックイン処理タスク
let store_clone = store.clone();
let check_in_task = tokio::spawn(async move {
for city in cities {
store_clone.store_check_in(city).await;
}
});
// ランキング更新タスク
let store_clone = store.clone();
let ranking_clone = ranking.clone();
let ranking_task = tokio::spawn(async move {
loop {
tokio::time::sleep(ranking_update_interval).await;
let top = store_clone.get_top(3).await;
ranking_clone.write(top).await;
}
});
ProcessingCheckIns {
store,
ranking,
tasks: vec![check_in_task, ranking_task],
}
}
第11章: チャネルと並行パターン¶
11.1 mpsc チャネル - 複数送信者・単一受信者¶
Go の goroutine と channel に似た概念で、並行処理間の通信を行います。
use tokio::sync::mpsc;
pub async fn basic_mpsc_example() -> Vec<i32> {
let (tx, mut rx) = mpsc::channel(10);
// 送信タスク
tokio::spawn(async move {
for i in 1..=3 {
tx.send(i).await.unwrap();
}
});
// 受信
let mut results = Vec::new();
while let Some(value) = rx.recv().await {
results.push(value);
}
results
}
11.2 oneshot チャネル - 一回限りの通信¶
結果を一度だけ返す場合に使います。
use tokio::sync::oneshot;
pub async fn compute_with_oneshot(input: i32) -> i32 {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let result = input * input;
tx.send(result).unwrap();
});
rx.await.unwrap()
}
/// 複数の計算を並列実行して結果を収集
pub async fn parallel_compute(inputs: Vec<i32>) -> Vec<i32> {
let mut receivers = Vec::with_capacity(inputs.len());
for input in inputs {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let result = input * input;
let _ = tx.send(result);
});
receivers.push(rx);
}
let mut results = Vec::with_capacity(receivers.len());
for rx in receivers {
results.push(rx.await.unwrap());
}
results
}
11.3 broadcast チャネル - 複数受信者¶
複数の受信者に同じメッセージを配信します。
use tokio::sync::broadcast;
pub async fn broadcast_example(receiver_count: usize) -> Vec<Vec<String>> {
let (tx, _) = broadcast::channel(16);
// 受信タスクを起動
let mut handles = Vec::new();
for _ in 0..receiver_count {
let mut rx = tx.subscribe();
handles.push(tokio::spawn(async move {
let mut received = Vec::new();
while let Ok(msg) = rx.recv().await {
received.push(msg);
}
received
}));
}
// メッセージを送信
tx.send("Hello".to_string()).unwrap();
tx.send("World".to_string()).unwrap();
drop(tx);
// 結果を収集
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
}
11.4 ワーカープールパターン¶
固定数のワーカーで並列処理を行うパターンです。
pub async fn worker_pool<T, U, F>(items: Vec<T>, worker_count: usize, f: F) -> Vec<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> U + Send + Sync + Clone + 'static,
{
let (work_tx, work_rx) = async_channel::bounded(items.len());
let (result_tx, mut result_rx) = mpsc::channel(items.len());
let work_rx = Arc::new(work_rx);
// ワーカーを起動
let mut handles = Vec::new();
for _ in 0..worker_count {
let work_rx = Arc::clone(&work_rx);
let result_tx = result_tx.clone();
let f = f.clone();
handles.push(tokio::spawn(async move {
while let Ok(item) = work_rx.recv().await {
let result = f(item);
let _ = result_tx.send(result).await;
}
}));
}
// 作業をキューに追加
for item in items {
work_tx.send(item).await.unwrap();
}
work_tx.close();
drop(result_tx);
// 結果を収集
let mut results = Vec::new();
while let Some(result) = result_rx.recv().await {
results.push(result);
}
results
}
11.5 パイプラインパターン¶
ストリーム処理をパイプライン形式で構築します。
pub struct Pipeline<T> {
receiver: mpsc::Receiver<T>,
}
impl<T: Send + 'static> Pipeline<T> {
pub fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = T> + Send + 'static,
I::IntoIter: Send,
{
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async move {
for item in iter {
if tx.send(item).await.is_err() { break; }
}
});
Pipeline { receiver: rx }
}
pub fn map<U, F>(self, f: F) -> Pipeline<U>
where
U: Send + 'static,
F: Fn(T) -> U + Send + 'static,
{
let (tx, rx) = mpsc::channel(100);
let mut receiver = self.receiver;
tokio::spawn(async move {
while let Some(item) = receiver.recv().await {
if tx.send(f(item)).await.is_err() { break; }
}
});
Pipeline { receiver: rx }
}
pub async fn collect(mut self) -> Vec<T> {
let mut results = Vec::new();
while let Some(item) = self.receiver.recv().await {
results.push(item);
}
results
}
}
// 使用例
let results = Pipeline::from_iter(1..=10)
.map(|x| x * 2)
.filter(|x| *x > 10)
.collect()
.await;
// [12, 14, 16, 18, 20]
11.6 アクターパターン¶
メッセージパッシングで状態を管理するアクターモデルを実装します。
pub enum CounterMessage {
Increment,
Decrement,
Get(oneshot::Sender<i32>),
Stop,
}
pub struct CounterActor {
sender: mpsc::Sender<CounterMessage>,
}
impl CounterActor {
pub fn new(initial: i32) -> Self {
let (tx, mut rx) = mpsc::channel::<CounterMessage>(32);
tokio::spawn(async move {
let mut value = initial;
while let Some(msg) = rx.recv().await {
match msg {
CounterMessage::Increment => value += 1,
CounterMessage::Decrement => value -= 1,
CounterMessage::Get(reply) => {
let _ = reply.send(value);
}
CounterMessage::Stop => break,
}
}
});
CounterActor { sender: tx }
}
pub async fn increment(&self) {
let _ = self.sender.send(CounterMessage::Increment).await;
}
pub async fn get(&self) -> i32 {
let (tx, rx) = oneshot::channel();
let _ = self.sender.send(CounterMessage::Get(tx)).await;
rx.await.unwrap_or(0)
}
}
11.7 銀行口座アクター¶
より実践的なアクターの例として、銀行口座を実装します。
pub enum BankMessage {
Deposit(i32),
Withdraw(i32, oneshot::Sender<Result<(), String>>),
GetBalance(oneshot::Sender<i32>),
Stop,
}
pub struct BankAccount {
sender: mpsc::Sender<BankMessage>,
}
impl BankAccount {
pub fn new(initial_balance: i32) -> Self {
let (tx, mut rx) = mpsc::channel::<BankMessage>(32);
tokio::spawn(async move {
let mut balance = initial_balance;
while let Some(msg) = rx.recv().await {
match msg {
BankMessage::Deposit(amount) => balance += amount,
BankMessage::Withdraw(amount, reply) => {
if balance >= amount {
balance -= amount;
let _ = reply.send(Ok(()));
} else {
let _ = reply.send(Err("Insufficient funds".to_string()));
}
}
BankMessage::GetBalance(reply) => { let _ = reply.send(balance); }
BankMessage::Stop => break,
}
}
});
BankAccount { sender: tx }
}
pub async fn withdraw(&self, amount: i32) -> Result<(), String> {
let (tx, rx) = oneshot::channel();
let _ = self.sender.send(BankMessage::Withdraw(amount, tx)).await;
rx.await.unwrap_or(Err("Actor not responding".to_string()))
}
}
11.8 イベントバス¶
Pub/Sub パターンでイベントを配信します。
#[derive(Debug, Clone)]
pub enum Event {
UserLoggedIn { user_id: String },
UserLoggedOut { user_id: String },
MessageSent { from: String, to: String, content: String },
}
pub struct EventBus {
sender: broadcast::Sender<Event>,
}
impl EventBus {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(100);
EventBus { sender }
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
pub fn publish(&self, event: Event) {
let _ = self.sender.send(event);
}
}
11.9 セマフォによる同時実行制限¶
同時実行数を制限する場合は Semaphore を使います。
use tokio::sync::Semaphore;
pub async fn rate_limited_process<T, U, F>(
items: Vec<T>,
max_concurrent: usize,
f: F,
) -> Vec<U>
where
T: Send + 'static,
U: Send + Clone + 'static,
F: Fn(T) -> U + Send + Sync + Clone + 'static,
{
let semaphore = Arc::new(Semaphore::new(max_concurrent));
let results = Arc::new(Mutex::new(Vec::with_capacity(items.len())));
let mut handles = Vec::new();
for item in items {
let semaphore = Arc::clone(&semaphore);
let results = Arc::clone(&results);
let f = f.clone();
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let result = f(item);
results.lock().await.push(result);
}));
}
for handle in handles {
let _ = handle.await;
}
// ...
}
Scala との対比¶
| 概念 | Scala (cats-effect) | Rust (tokio) |
|---|---|---|
| アトミック参照 | Ref[IO, A] |
Arc<Mutex<A>> / Arc<RwLock<A>> |
| 軽量スレッド | Fiber[IO, E, A] |
JoinHandle<A> |
| 並列実行 | parSequence |
join_all / tokio::spawn |
| 永続実行 | foreverM |
loop { ... } |
| キャンセル | fiber.cancel |
handle.abort() |
| チャネル | Queue[IO, A] |
mpsc::channel |
| 同時実行制限 | Semaphore |
tokio::sync::Semaphore |
まとめ¶
Part V では以下を学びました:
第10章: 並行・並列処理¶
- Arc と Mutex: 共有状態の安全な管理
- RwLock: 読み書きロックによる効率的な共有
- tokio::spawn: 並列タスクの起動
- JoinHandle: タスクの完了待機とキャンセル
- 純粋関数との分離: ビジネスロジックと並行処理の分離
第11章: チャネルと並行パターン¶
- mpsc: 複数送信者・単一受信者チャネル
- oneshot: 一回限りの結果返却
- broadcast: 複数受信者へのメッセージ配信
- ワーカープール: 固定数ワーカーによる並列処理
- パイプライン: ストリーム処理のパイプライン構築
- アクター: メッセージパッシングによる状態管理
- セマフォ: 同時実行数の制限
Rust の所有権システムと tokio の組み合わせは、コンパイル時にデータ競合を防ぎつつ、高性能な並行処理を実現します。
次のステップ¶
Part VI では、以下のトピックを扱う予定です:
- 実践的なアプリケーション構築
- トレイトによる抽象化
- テスト戦略
- 依存性注入
これらのパターンを学ぶことで、保守性の高い実際のアプリケーションを構築できるようになります。