Skip to content

Part IV: IO と副作用の管理

本章では、関数型プログラミングにおける副作用の扱い方を学びます。IO モナドを使って副作用を純粋関数内で安全に記述し、ストリーム処理で無限のデータを扱う方法を習得します。


第8章: IO モナドの導入

8.1 副作用の問題

純粋関数は副作用を持ちません。しかし、実際のプログラムには副作用が必要です:

  • ファイルの読み書き
  • ネットワーク通信
  • データベースアクセス
  • 乱数生成
  • 現在時刻の取得

uml diagram

8.2 IO モナドとは

IO モナドは「副作用を持つ計算の記述」を表す型です。

  • IO は「実行すると値を返す副作用のある計算」
  • IO 値を作成しただけでは副作用は発生しない
  • run! で実際に実行される

uml diagram

8.3 Ruby での IO モナド実装

ソースファイル: app/ruby/src/ch08_io.rb

Ruby には Cats Effect のような IO ライブラリがないため、シンプルな IO クラスを実装します:

class IO
  def initialize(&computation)
    @computation = computation
  end

  # 副作用のある計算から IO を作成(遅延評価)
  def self.delay(&block)
    new(&block)
  end

  # 純粋な値から IO を作成(副作用なし)
  def self.pure(value)
    new { value }
  end

  # IO を実行して結果を返す(安全でない!)
  def run!
    @computation.call
  end
end

8.4 サイコロを振る例

不純な関数(副作用あり)

def cast_the_die_impure
  rand(1..6)
end

# 呼び出すたびに異なる値が返る
puts cast_the_die_impure  # 3
puts cast_the_die_impure  # 5
puts cast_the_die_impure  # 1

IO を使った純粋な記述

def cast_the_die
  IO.delay { rand(1..6) }
end

# IO 値を作成(この時点では実行されない)
die_cast = cast_the_die
puts die_cast  # IO(...) - まだ実行されていない

# 実際に実行
puts die_cast.run!  # 4

8.5 IO の作成方法

メソッド 用途
IO.delay { expr } 副作用のある式をラップ IO.delay { puts "hello" }
IO.pure(value) 既存の値をラップ(副作用なし) IO.pure(42)
IO.unit 何もしない IO IO.unit (= IO.pure(nil))
# IO.delay: 副作用を遅延実行
delayed_print = IO.delay { puts "Hello" }
# puts はまだ実行されていない

# IO.pure: 既存の値をラップ
pure_value = IO.pure(42)
# 副作用なし、即座に 42 が確定

8.6 IO の合成

IO 値は bindfmap で合成できます。

def cast_the_die_twice
  cast_the_die.bind do |first|
    cast_the_die.fmap { |second| first + second }
  end
end

# まだ実行されていない
program = cast_the_die_twice

# 実行
result = program.run!

uml diagram

8.7 ミーティングスケジューリングの例

より実践的な例として、ミーティングのスケジューリングを見てみましょう。

MeetingTime = Struct.new(:start_hour, :end_hour, keyword_init: true)

# 副作用のある API 呼び出しを IO でラップ
def calendar_entries(name)
  IO.delay { calendar_entries_api_call(name) }
end

def create_meeting(names, meeting)
  IO.delay { create_meeting_api_call(names, meeting) }
end

予定の取得

def scheduled_meetings(person1, person2)
  calendar_entries(person1).bind do |entries1|
    calendar_entries(person2).fmap do |entries2|
      entries1 + entries2
    end
  end
end

空き時間の計算(純粋関数)

def possible_meetings(existing_meetings, start_hour, end_hour, length_hours)
  slots = (start_hour..(end_hour - length_hours)).map do |start|
    MeetingTime.new(start_hour: start, end_hour: start + length_hours)
  end

  slots.select do |slot|
    existing_meetings.none? { |meeting| meetings_overlap?(meeting, slot) }
  end
end

8.8 or_else によるエラーハンドリング

IO の or_else メソッドで、失敗時のフォールバックを指定できます。

year = IO.pure(996)
no_year = IO.delay { raise 'no year' }

year.or_else(IO.pure(2020)).run!     # 996
no_year.or_else(IO.pure(2020)).run!  # 2020

uml diagram

リトライ戦略

# API 呼び出しを3回までリトライ
calendar_entries('Alice')
  .or_else(calendar_entries('Alice'))
  .or_else(calendar_entries('Alice'))
  .or_else(IO.pure([]))  # 全部失敗したら空リスト

汎用リトライ関数

def retry_action(action, max_retries)
  (0...max_retries).reduce(action) do |program, _|
    program.or_else(action)
  end
end

# 使用例
retry_action(calendar_entries('Alice'), 10)

8.9 sequence による IO のリスト処理

[IO[A]]IO[[A]] に変換するには sequence を使います。

def sequence(io_array)
  io_array.reduce(IO.pure([])) do |acc, io|
    acc.bind do |results|
      io.fmap { |result| results + [result] }
    end
  end
end

actions = [IO.pure(1), IO.pure(2), IO.pure(3)]
combined = sequence(actions)

combined.run!  # [1, 2, 3]

複数人の予定を取得

def scheduled_meetings_for_all(attendees)
  traverse(attendees) do |attendee|
    retry_with_default(calendar_entries(attendee), 3, [])
  end.fmap(&:flatten)
end

第9章: ストリーム処理

9.1 ストリームとは

ストリームは、要素の(潜在的に無限の)シーケンスを表します。Ruby では Enumerator::Lazy を使用します。

uml diagram

9.2 LazyStream の実装

ソースファイル: app/ruby/src/ch09_streams.rb

Ruby の Enumerator::Lazy をラップした LazyStream クラスを実装します:

class LazyStream
  def initialize(enumerator)
    @enumerator = enumerator.lazy
  end

  def self.from_array(array)
    new(array.each)
  end

  def self.repeat(*values)
    enum = Enumerator.new do |yielder|
      loop do
        values.each { |v| yielder << v }
      end
    end
    new(enum)
  end

  def take(n)
    LazyStream.new(@enumerator.take(n))
  end

  def to_array
    @enumerator.to_a
  end
end

9.3 純粋なストリーム

# 有限ストリーム
numbers = LazyStream.from_array([1, 2, 3])
numbers.to_array  # [1, 2, 3]

# フィルタリング
odd_numbers = numbers.filter(&:odd?)
odd_numbers.to_array  # [1, 3]

9.4 無限ストリーム

# repeat で無限に繰り返し
infinite_123s = LazyStream.repeat(1, 2, 3)

# take で有限個を取得
infinite_123s.take(8).to_array  # [1, 2, 3, 1, 2, 3, 1, 2]

uml diagram

9.5 便利なストリーム生成メソッド

# 整数ストリーム(無限)
LazyStream.from(1).take(5).to_array  # [1, 2, 3, 4, 5]

# iterate で漸化式
LazyStream.iterate(1) { |n| n * 2 }.take(5).to_array  # [1, 2, 4, 8, 16]

# continually でブロックを繰り返し実行
counter = 0
LazyStream.continually { counter += 1 }.take(3).to_array  # [1, 2, 3]

9.6 IO を含むストリーム

class IOStream
  def initialize(&stream_fn)
    @stream_fn = stream_fn
  end

  # IO アクションから無限ストリームを作成
  def self.repeat_eval(io_action)
    enum = Enumerator.new do |yielder|
      loop { yielder << io_action.run! }
    end
    new { enum.lazy }
  end

  # リストにコンパイル(IO を返す)
  def compile_to_list
    IO.delay { @stream_fn.call.to_a }
  end
end

# 無限にサイコロを振るストリーム
infinite_die_casts = IOStream.repeat_eval(cast_the_die)

ストリームの実行

# 最初の3回を取得
first_three = infinite_die_casts.take(3).compile_to_list
first_three.run!  # [4, 2, 6]

# 6 が出るまで振り続ける
six = infinite_die_casts.filter { |n| n == 6 }.take(1).compile_to_list
six.run!  # [6]

9.7 ストリームの主要操作

操作 説明
take(n) 最初の n 要素を取得 stream.take(3)
filter(p) 条件を満たす要素のみ stream.filter(&:even?)
fmap(f) 各要素を変換 stream.fmap { \|x\| x * 2 }
append(s) 別のストリームを結合 stream1.append(stream2)
repeat 無限に繰り返し stream.repeat
sliding(n) スライディングウィンドウ stream.sliding(3)

9.8 通貨交換レートの例

為替レートを監視して、上昇トレンドを検出する例です。

トレンド判定(純粋関数)

def trending?(rates)
  return false if rates.size <= 1

  rates.each_cons(2).all? { |prev, curr| curr > prev }
end

trending?([0.81, 0.82, 0.83])  # true (上昇トレンド)
trending?([0.81, 0.84, 0.83])  # false

レートのストリーム

def rates_stream(from_currency, to_currency)
  IOStream.repeat_eval(exchange_table(from_currency))
    .fmap { |table| extract_single_currency_rate(to_currency, table) }
    .filter { |rate| !rate.nil? }
end

9.9 スライディングウィンドウ

sliding で連続する要素をグループ化できます。

stream = LazyStream.from_array([1, 2, 3, 4, 5])

stream.sliding(3).to_array
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

uml diagram

トレンド検出

def detect_trend(values, window_size)
  LazyStream.from_array(values)
    .sliding(window_size)
    .fmap { |window| { window: window, trending: trending?(window) } }
end

detect_trend([1, 2, 3, 4, 3], 3).to_array
# [{window: [1,2,3], trending: true},
#  {window: [2,3,4], trending: true},
#  {window: [3,4,3], trending: false}]

9.10 ストリームの結合

2つのストリームを zip で結合できます。

stream1 = LazyStream.from_array([1, 2, 3])
stream2 = LazyStream.from_array(['a', 'b', 'c'])

stream1.zip(stream2).to_array
# [[1, 'a'], [2, 'b'], [3, 'c']]

# 左の値のみ保持
stream1.zip_left(stream2).to_array  # [1, 2, 3]

uml diagram

9.11 便利なストリーム関数

# フィボナッチ数列
def fibonacci
  enum = Enumerator.new do |yielder|
    a, b = 0, 1
    loop do
      yielder << a
      a, b = b, a + b
    end
  end
  LazyStream.new(enum)
end

fibonacci.take(10).to_array  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

# 素数
def primes
  LazyStream.from(2).filter { |n| prime?(n) }
end

primes.take(10).to_array  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]

# 累積和
def running_sum(stream)
  scan(stream, 0) { |acc, x| acc + x }
end

running_sum(LazyStream.from_array([1, 2, 3, 4, 5])).to_array
# [0, 1, 3, 6, 10, 15]

まとめ

Part IV で学んだこと

uml diagram

IO と LazyStream の比較

特性 IO LazyStream
要素数 1つ 0個以上(無限も可)
実行 run! to_array
用途 単一の副作用 連続した値の処理

Ruby と Scala の対応

Scala (Cats Effect / fs2) Ruby
IO[A] IO クラス
IO.delay(expr) IO.delay { expr }
IO.pure(value) IO.pure(value)
flatMap bind
map fmap
unsafeRunSync() run!
Stream[Pure, A] LazyStream
Stream.eval(io) IOStream.eval(io)
.compile.toList .compile_to_list

キーポイント

  1. IO モナド: 副作用を「記述」として扱い、実行を遅延させる
  2. IO.delay: 副作用のある式を IO にラップ
  3. or_else: 失敗時のフォールバックを指定
  4. sequence: [IO[A]]IO[[A]]
  5. LazyStream: 潜在的に無限のシーケンスを遅延評価で処理
  6. sliding: 連続する要素をグループ化してパターンを検出

次のステップ

Part V では、以下のトピックを学びます:

  • 並行・並列処理
  • 共有状態管理

演習問題

問題 1: IO の基本

以下の関数を実装してください。

def print_and_return(message)
  # メッセージをコンソールに出力し、同じメッセージを返す IO
  ???
end

# 期待される動作
# print_and_return("Hello").run! は
# "Hello" をコンソールに出力し、"Hello" を返す
解答
def print_and_return(message)
  IO.delay do
    puts message
    message
  end
end

# または
def print_and_return(message)
  IO.delay { puts message }.fmap { message }
end

問題 2: IO の合成

以下の関数を実装してください。2つの IO を順番に実行し、結果を結合します。

def combine_io(io1, io2, &fn)
  ???
end

# 期待される動作
result = combine_io(IO.pure(1), IO.pure(2)) { |a, b| a + b }
result.run!  # 3
解答
def combine_io(io1, io2, &fn)
  io1.bind do |a|
    io2.fmap { |b| fn.call(a, b) }
  end
end

問題 3: リトライ

以下の関数を実装してください。指定回数リトライし、全部失敗したらデフォルト値を返します。

def retry_with_default(action, max_retries, default)
  ???
end
解答
def retry_with_default(action, max_retries, default)
  (0...max_retries)
    .reduce(action) { |program, _| program.or_else(action) }
    .or_else(IO.pure(default))
end

問題 4: ストリーム操作

以下のストリームを作成してください。

# 1. 1から10までの偶数のストリーム
evens = ???

# 2. 無限に交互に true/false を返すストリーム
alternating = ???

# 3. 最初の5つの要素の合計を計算
sum = LazyStream.from_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).take(5).???
解答
# 1. 1から10までの偶数
evens = LazyStream.from_range(1..10).filter(&:even?)
# または
evens = LazyStream.from_array([2, 4, 6, 8, 10])

# 2. 無限に交互に true/false
alternating = LazyStream.repeat(true, false)

# 3. 最初の5つの要素の合計
sum = LazyStream.from_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .take(5)
  .to_array
  .sum  # 15

# または fold を使用
sum = LazyStream.from_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .take(5)
  .fold(0) { |acc, x| acc + x }  # 15

問題 5: トレンド検出

以下の関数を実装してください。直近3つの値が全て同じかどうかを判定します。

def stable?(values)
  ???
end

# 期待される動作
stable?([5, 5, 5])    # true
stable?([5, 5, 6])    # false
stable?([5, 6, 5])    # false
stable?([5])          # false (3つ未満は false)
解答
def stable?(values)
  values.size >= 3 && values.uniq.size == 1
end

# または
def stable?(values)
  return false if values.size < 3

  values.each_cons(2).all? { |prev, curr| prev == curr }
end

問題 6: フィボナッチ数列

iterate を使ってフィボナッチ数列のストリームを作成してください。

def fibonacci_stream
  ???
end

fibonacci_stream.take(10).to_array
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
解答
def fibonacci_stream
  enum = Enumerator.new do |yielder|
    a, b = 0, 1
    loop do
      yielder << a
      a, b = b, a + b
    end
  end
  LazyStream.new(enum)
end

# または iterate を使用(ペアを使う)
def fibonacci_stream
  LazyStream.iterate([0, 1]) { |pair| [pair[1], pair[0] + pair[1]] }
    .fmap { |pair| pair[0] }
end

問題 7: 移動平均

スライディングウィンドウを使って移動平均を計算してください。

def moving_average(values, window_size)
  ???
end

moving_average([1, 2, 3, 4, 5], 3).to_array
# [2.0, 3.0, 4.0]  # (1+2+3)/3, (2+3+4)/3, (3+4+5)/3
解答
def moving_average(values, window_size)
  LazyStream.from_array(values)
    .sliding(window_size)
    .fmap { |window| window.sum.to_f / window.size }
end