Part VII: 非同期プログラミング¶
7.1 はじめに¶
Part VI でノンブロッキング I/O を学びました。本章では、非同期プログラミングの中核となる Future/Promise、コルーチン、async/await の概念を 8 つの言語で比較します。
なぜ非同期か¶
スレッドベースの並行処理はメモリと生成コストが高く、数千の同時接続を扱うには非効率です。非同期プログラミングは、少数のスレッドで多数の I/O 操作を効率的に多重化します。
7.2 共通の本質¶
非同期処理の 3 つの構成要素¶
| 構成要素 | 説明 | 例 |
|---|---|---|
| Future/Promise | 将来の計算結果を表す型 | Future<T>, Task<T>, Promise |
| コルーチン | 中断・再開可能な関数 | async def, async fn, go ブロック |
| イベントループ | 完了した I/O を検出し、対応するコルーチンを再開する | asyncio, tokio, GHC RTS |
async/await のセマンティクス¶
async function fetch():
result = await networkCall() ← ここで中断、スレッドを解放
process(result) ← ネットワーク完了後に再開
await は「ここで結果を待つが、スレッドをブロックしない」という宣言です。
7.3 言語別実装比較¶
async/await のアプローチ¶
8 言語の非同期プログラミングは、大きく 4 つのモデルに分類できます:
| モデル | 言語 | 特徴 |
|---|---|---|
| 言語組み込み async/await | Python, C#, Rust | コンパイラ/インタプリタがステートマシンを生成 |
| モナド的合成 | Scala, F#, Haskell | for 内包表記、計算式、do 記法 |
| Virtual Thread | Java | JVM レベルの軽量スレッド |
| CSP (チャネル) | Clojure | go ブロック + チャネル通信 |
Future/Promise の型表現¶
関数型ファースト言語¶
Haskell 実装(async ライブラリ)
import Control.Concurrent.Async
-- 逐次実行
sequential :: IO (Int, Int)
sequential = do
a <- computeA
b <- computeB
return (a, b)
-- 並列実行
parallel :: IO (Int, Int)
parallel = concurrently computeA computeB
-- レース(最初に完了した方を返す)
raceExample :: IO (Either String Int)
raceExample = race fetchFromServerA fetchFromServerB
-- リソース安全な非同期
withAsync longRunningTask $ \handle -> do
result <- wait handle
processResult result
Clojure 実装(core.async)
(require '[clojure.core.async :as async :refer [chan go >! <! >!! <!! alt! timeout]])
;; チャネル作成と送受信
(def ch (chan 10))
(go (>! ch "hello")) ;; 非ブロッキング送信(go ブロック内)
(go (println (<! ch))) ;; 非ブロッキング受信(go ブロック内)
;; タイムアウト
(go
(alt!
ch ([v] (println "受信:" v))
(timeout 1000) (println "タイムアウト")))
;; パイプライン並列処理
(let [in (chan 10)
out (chan 10)]
(async/pipeline 4 out (map inc) in)
(async/onto-chan! in [1 2 3 4 5])
(<!! (async/into [] out)))
;; => [2 3 4 5 6]
マルチパラダイム言語¶
Rust 実装(async/await + tokio)
// async 関数定義
async fn compute() -> i32 {
42
}
// 逐次実行
async fn sequential() -> i32 {
let a = compute_a().await;
let b = compute_b().await;
a + b
}
// 並列実行
async fn parallel() -> (i32, i32, i32) {
tokio::join!(compute_a(), compute_b(), compute_c())
}
// タイムアウト
async fn with_timeout() {
match timeout(Duration::from_secs(5), long_task()).await {
Ok(value) => println!("Got: {:?}", value),
Err(_) => println!("Timed out"),
}
}
// 選択的実行
tokio::select! {
result = task_a() => println!("A: {:?}", result),
result = task_b() => println!("B: {:?}", result),
}
Scala 実装(Future + Promise)
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
// Future 合成
val f1: Future[Int] = Future(10)
val f2: Future[Int] = Future(20)
// for 内包表記(モナド的合成)
val sum: Future[Int] = for
a <- f1
b <- f2
yield a + b
// 並列実行
val results: Future[List[String]] = Future.sequence(futures)
// Promise(書き込み側)
val promise = Promise[Int]()
val future = promise.future
promise.success(42)
// エラーハンドリング
val recovered = future.recover {
case _: RuntimeException => 0
}
F# 実装(Async ワークフロー)
// Async 計算式
let computation : Async<int> =
async {
printfn "Starting..."
do! Async.Sleep(1000)
printfn "Done!"
return 42
}
// 逐次実行
let sequential =
async {
let! a = computeA()
let! b = computeB()
return a + b
}
// 並列実行
let parallel =
[computeA(); computeB(); computeC()]
|> Async.Parallel
// キャンセル
let cancellable (ct: CancellationToken) =
async {
for i in 1..100 do
ct.ThrowIfCancellationRequested()
do! Async.Sleep(100)
}
// Task との相互運用
let asyncFromTask = someTask |> Async.AwaitTask
let taskFromAsync = someAsync |> Async.StartAsTask
OOP + 並行処理ライブラリ言語¶
Java 実装(CompletableFuture + Virtual Thread)
// CompletableFuture チェーン
CompletableFuture.supplyAsync(() -> fetchData())
.thenApply(data -> process(data))
.thenAccept(result -> save(result));
// 並列実行
CompletableFuture.allOf(future1, future2).join();
// Virtual Thread(Java 21+)
Thread.startVirtualThread(() -> {
// I/O バウンドの処理
});
var executor = Executors.newVirtualThreadPerTaskExecutor();
C# 実装(async/await)
// async/await(言語機能)
async Task<string> FetchDataAsync() {
var data = await httpClient.GetStringAsync(url);
return Process(data);
}
// 並列実行
await Task.WhenAll(task1, task2, task3);
// 最初の完了
var completed = await Task.WhenAny(task1, task2);
// ValueTask(最適化版)
public ValueTask<int> GetCachedValueAsync() {
if (_cache.TryGetValue(key, out var value))
return new ValueTask<int>(value);
return new ValueTask<int>(FetchFromDatabaseAsync());
}
Python 実装(asyncio)
import asyncio
# コルーチン定義
async def fetch_data(url: str) -> str:
await asyncio.sleep(1) # ノンブロッキング遅延
return f"Data from {url}"
# 並列実行
async def main():
results = await asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3"),
)
print(results)
asyncio.run(main())
7.4 比較分析¶
非同期モデルの分類¶
| モデル | 言語 | 内部実装 | メモリ効率 |
|---|---|---|---|
| スタックレスコルーチン | Rust, Python, C# | コンパイラ生成ステートマシン | 最高 |
| スタックフルコルーチン | Java (Virtual Thread) | JVM 管理の仮想スタック | 高い |
| 計算式/モナド | Scala, F#, Haskell | 関数的合成 | 高い |
| CSP チャネル | Clojure | go ブロック + チャネル | 中程度 |
並列合成の構文¶
| 操作 | Python | Java | C# | Scala | F# | Rust | Haskell | Clojure |
|---|---|---|---|---|---|---|---|---|
| 全並列 | gather() |
allOf() |
WhenAll() |
sequence() |
Parallel |
join! |
concurrently |
alt! |
| 最初の完了 | wait(FIRST_COMPLETED) |
anyOf() |
WhenAny() |
firstCompletedOf |
— | select! |
race |
alt! |
| タイムアウト | wait_for() |
orTimeout() |
タスク+タイマー | Await.result(_, duration) |
Async.Sleep |
timeout() |
timeout |
(timeout ms) |
| エラー回復 | try/except |
exceptionally() |
try/catch |
recover |
try/with |
? 演算子 |
try |
try/catch |
コルーチンの実装方式¶
コンパイラ生成 ┌──────────────────────────────┐
ステートマシン │ Rust async fn │ ← ゼロコスト
│ C# async Task │
│ Python async def │
├──────────────────────────────┤
JVM 管理 │ Java Virtual Thread │ ← スタックフル
├──────────────────────────────┤
関数的合成 │ Scala Future + for │ ← モナド的
│ F# async { let! } │
│ Haskell IO + async │
├──────────────────────────────┤
CSP │ Clojure go ブロック │ ← チャネルベース
└──────────────────────────────┘
7.5 実践的な選択指針¶
I/O バウンド処理に適した言語¶
最も適している:
- Rust (tokio) — ゼロコスト async/await。メモリ効率が最高
- C# — 言語レベルの async/await。.NET エコシステムとの統合
- Java (Virtual Thread) — 既存のブロッキング API をそのまま軽量化
表現力が高い:
- Scala —
for内包表記で Future を自然に合成 - F# —
async { }計算式で宣言的に記述 - Haskell —
asyncライブラリで安全な並行処理
プロトタイピング:
- Python —
asyncioで素早く非同期サーバーを構築 - Clojure — CSP モデルでチャネルベースの設計
CPU バウンド vs I/O バウンドの使い分け¶
| 処理タイプ | 推奨アプローチ | 言語例 |
|---|---|---|
| I/O バウンド | async/await, コルーチン | Python asyncio, Rust tokio, C# Task |
| CPU バウンド | スレッドプール, 並列イテレータ | Rayon, parallelStream, multiprocessing |
| 混合 | async + スレッドプール | tokio::spawn_blocking, Task.Run |
7.6 まとめ¶
言語横断的な学び¶
- async/await は普遍的なパターン — 構文は異なるが、セマンティクス(中断・再開)は同一
- ゼロコスト vs 便利さのトレードオフ — Rust のゼロコスト Future vs Python の手軽さ
- モナド的合成は FP 言語の強み —
for/async { }/do記法で自然に合成 - CSP は独自のモデル — Clojure の
goブロック + チャネルは他と根本的に異なる - Virtual Thread は革命的 — Java 21 で従来のブロッキング API が軽量化
各言語の個別記事¶
| 言語 | 個別記事 |
|---|---|
| Python | Part VII - 非同期プログラミング |
| Java | Part VII - 非同期プログラミング |
| C# | Part VII - 非同期プログラミング |
| Scala | Part VII - 非同期プログラミング |
| F# | Part VII - 非同期プログラミング |
| Rust | Part VII - 非同期プログラミング |
| Haskell | Part VII - 非同期プログラミング |
| Clojure | Part VII - 非同期プログラミング |