Part VI: ノンブロッキング I/O¶
6.1 はじめに¶
Part V でロックベースの同期を学びましたが、ロックにはデッドロックやコンテキストスイッチのオーバーヘッドが伴います。本章では、スレッドをブロックせずに I/O を処理するノンブロッキング I/O と、それを効率的に管理するイベントループ・Reactor パターンを学びます。
なぜノンブロッキング I/O か¶
| モデル | 接続あたりのコスト | 10,000 接続時 | スケーラビリティ |
|---|---|---|---|
| スレッドパーコネクション | ~1MB(スタック) | ~10GB メモリ | 低い |
| ノンブロッキング I/O | ~KB(イベント登録) | ~数十 MB | 高い |
従来の「1 接続 = 1 スレッド」モデルでは、大量の同時接続を処理できません。ノンブロッキング I/O は、少数のスレッドで数千の接続を効率的に多重化します。
6.2 共通の本質¶
ブロッキング vs ノンブロッキング¶
ブロッキング I/O:
Thread → recv() → [ブロック...データ到着を待つ...] → データ処理
(スレッドは待機中も占有される)
ノンブロッキング I/O:
Thread → recv() → [データなし: すぐ返る] → 他の処理を実行
→ [データあり: すぐ返る] → データ処理
Reactor パターンの構造¶
┌─────────────────┐
Client 1 ─────────┤ ├──→ Handler A
Client 2 ─────────┤ Reactor ├──→ Handler B
Client 3 ─────────┤ (Selector) ├──→ Handler C
Client N ─────────┤ ├──→ Handler D
└─────────────────┘
↑
I/O Multiplexer
(select/poll/epoll)
3 つの構成要素:
- Reactor(ディスパッチャ) — イベントを検出し、対応するハンドラに振り分ける
- Handler(コールバック) — 特定のイベントに対する処理を実行
- Multiplexer(多重化) — OS レベルの I/O 監視(
select,poll,epoll,kqueue,IOCP)
イベントループのライフサイクル¶
while running:
1. I/O Multiplexer に登録されたソケットを監視
2. 準備完了のイベントを取得
3. 各イベントに対応するハンドラを呼び出す
4. ハンドラ完了後、1 に戻る
6.3 言語別実装比較¶
I/O 多重化のメカニズム¶
| 言語 | 低レベル API | 高レベル API | I/O モデル |
|---|---|---|---|
| Python | select / selectors |
asyncio |
Selector ベース |
| Java | Selector / Channel (NIO) |
CompletableFuture |
Channel + Buffer |
| C# | Socket.Select() |
async/await + Task |
IOCP ベース |
| Scala | Java NIO Selector |
Future |
JVM Channel |
| F# | .NET Socket |
async { } ワークフロー |
.NET 非同期 |
| Rust | mio (低レベル) |
tokio (高レベル) |
epoll/kqueue |
| Haskell | GHC I/O Manager | async ライブラリ |
Green Thread |
| Clojure | Java NIO | future / core.async |
JVM Channel |
Reactor パターンの実装¶
関数型ファースト言語¶
Haskell 実装(async ライブラリ)
import Control.Concurrent.Async
-- 非同期 I/O
handle <- async $ readFile "file.txt"
doSomethingElse
content <- wait handle
-- 複数の非同期タスクを並列実行
downloadAll :: [String] -> IO [String]
downloadAll urls = mapConcurrently fetchData urls
-- レース: 最初に完了した方を返す
result <- race fetchFromServerA fetchFromServerB
Clojure 実装(future + promise)
;; future: 自動的に非同期実行
(def result (future
(Thread/sleep 1000)
(+ 1 2)))
@result ;; => 3(ブロッキング取得)
;; タイムアウト付き
(deref result 500 :timeout)
;; promise: 明示的に値を配信
(def p (promise))
(future
(Thread/sleep 100)
(deliver p :done))
@p ;; => :done
;; 複数の非同期タスク
(defn fetch-all [urls]
(let [futures (doall (map #(future (fetch-url %)) urls))]
(map deref futures)))
マルチパラダイム言語¶
Rust 実装(tokio)
use tokio;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let result = fetch_data("url").await;
println!("{}", result);
}
async fn fetch_data(url: &str) -> String {
sleep(Duration::from_secs(1)).await;
format!("Data from {}", url)
}
// 複数タスクの並列実行
async fn parallel_fetch() -> (String, String, String) {
tokio::join!(
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3"),
)
}
Scala 実装(Java NIO ラッパー)
import java.nio.channels.{Selector, SocketChannel, SelectionKey}
val selector = Selector.open()
channel.configureBlocking(false)
channel.register(selector, SelectionKey.OP_READ)
while true do
selector.select()
val keys = selector.selectedKeys().iterator()
while keys.hasNext do
val key = keys.next()
keys.remove()
if key.isReadable then handleRead(key)
if key.isWritable then handleWrite(key)
F# 実装(Async ワークフロー)
// ノンブロッキング読み取り
let readFileAsync path : Async<string> =
async {
use reader = new StreamReader(path)
let! content = reader.ReadToEndAsync() |> Async.AwaitTask
return content
}
// 並列実行
let results =
[readFileAsync "a.txt"; readFileAsync "b.txt"; readFileAsync "c.txt"]
|> Async.Parallel
|> Async.RunSynchronously
OOP + 並行処理ライブラリ言語¶
Java 実装(NIO Selector)
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select(); // ブロック: イベントが来るまで待機
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) handleAccept(key);
if (key.isReadable()) handleRead(key);
if (key.isWritable()) handleWrite(key);
}
}
C# 実装(async/await)
// 言語レベルの非同期サポート
public async Task<string> FetchDataAsync(string url)
{
using var client = new HttpClient();
return await client.GetStringAsync(url);
}
// 複数の非同期タスクを並列実行
var tasks = urls.Select(url => FetchDataAsync(url));
var results = await Task.WhenAll(tasks);
Python 実装(Selector + Reactor)
import selectors
class PizzaReactor:
def __init__(self, port):
self.selector = selectors.DefaultSelector()
self.server_socket = socket.socket()
self.server_socket.setblocking(False)
self.server_socket.bind(('', port))
self.server_socket.listen()
self.selector.register(self.server_socket, selectors.EVENT_READ, self._on_accept)
def _on_accept(self, sock):
conn, addr = sock.accept()
conn.setblocking(False)
self.selector.register(conn, selectors.EVENT_READ, self._on_read)
def _on_read(self, conn):
data = conn.recv(1024)
if data:
self.selector.modify(conn, selectors.EVENT_WRITE,
lambda c: self._on_write(c, data.decode()))
def _on_write(self, conn, message):
conn.send(f"Thank you for {message}!\n".encode())
self.selector.modify(conn, selectors.EVENT_READ, self._on_read)
def run(self):
while True:
events = self.selector.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
6.4 比較分析¶
抽象度のスペクトラム¶
最高レベル ┌────────────────────────────────┐
│ Haskell: GHC が自動管理 │ ← ブロッキング API で OK
│ C#: async/await (言語機能) │
├────────────────────────────────┤
高レベル │ Rust: tokio async/await │
│ F#: async { } ワークフロー │
│ Clojure: future / core.async │
├────────────────────────────────┤
中レベル │ Java: NIO Selector + Channel │
│ Scala: Java NIO ラッパー │
├────────────────────────────────┤
低レベル │ Python: selectors + コールバック │ ← 手動 Reactor 実装
└────────────────────────────────┘
イベントループの利点と欠点¶
| 利点 | 欠点 |
|---|---|
| ロック不要(単一スレッド実行) | コールバック地獄のリスク |
| 軽量(接続あたりのメモリ最小) | CPU バウンド処理で全体がブロック |
| スケーラブル(C10K 問題を解決) | デバッグが困難 |
| 予測可能な実行順序 | 複雑な状態管理 |
OS レベルの I/O 多重化¶
| システムコール | OS | 特徴 |
|---|---|---|
select |
全 OS | 古典的。FD 数に上限あり(1024) |
poll |
Linux, macOS | select の改良。FD 数制限なし |
epoll |
Linux | 高性能。イベント通知方式 |
kqueue |
macOS, BSD | epoll 相当。macOS 標準 |
IOCP |
Windows | 完了通知方式。.NET の基盤 |
各言語のランタイムやライブラリが、OS に応じて最適な多重化を自動選択します。
6.5 実践的な選択指針¶
高負荷サーバーに適した言語¶
最も適している:
- Rust (tokio) — ゼロコスト async/await + epoll/kqueue 統合。メモリ効率が最高
- Java (NIO) — 成熟した NIO フレームワーク。Netty などの実績あるライブラリ
生産性が高い:
- C# — 言語レベルの async/await。ASP.NET で大規模 Web サーバーの実績
- Haskell — GHC が自動管理。開発者は非同期を意識しない
プロトタイピング:
- Python —
asyncioで素早くプロトタイプ。ただし CPU バウンドには不向き - Clojure —
core.asyncで CSP モデルの非同期設計
コールバック地獄の回避¶
| 言語 | 解決策 |
|---|---|
| Python | asyncio + async/await(Part VII で詳述) |
| Java | CompletableFuture チェーン / Virtual Thread |
| C# | async/await(言語機能) |
| Rust | async/await + tokio |
| Haskell | do 記法(モナド合成) |
| Clojure | go ブロック(CSP モデル) |
| Scala | for 内包表記(モナド合成) |
| F# | async { } 計算式 |
6.6 まとめ¶
言語横断的な学び¶
- ノンブロッキング I/O は C10K 問題の解法 — 少数スレッドで大量接続を処理
- Reactor パターンは普遍的 — 全言語で同じ設計思想だが、抽象度が異なる
- イベントループはロック不要 — 単一スレッド実行で競合状態を回避
- 抽象度と制御のトレードオフ — Haskell(全自動)vs Python(手動 Reactor)
- OS の多重化 API が基盤 — epoll/kqueue/IOCP を言語ランタイムが抽象化
各言語の個別記事¶
| 言語 | 個別記事 |
|---|---|
| Python | Part VI - ノンブロッキング I/O |
| Java | Part VI - ノンブロッキング I/O |
| C# | Part VI - ノンブロッキング I/O |
| Scala | Part VI - ノンブロッキング I/O |
| F# | Part VI - ノンブロッキング I/O |
| Rust | Part VI - ノンブロッキング I/O |
| Haskell | Part VI - ノンブロッキング I/O |
| Clojure | Part VI - ノンブロッキング I/O |