Skip to content

Part IV: タスク分解と並列パターン

概要

本章では、タスクを分割して並列に処理し結果を統合する Fork/Join パターンと、処理を複数のステージに分けてデータを順に流す Pipeline パターンを学びます。


Fork/Join パターン

投票カウンター

use rayon::prelude::*;
use std::collections::HashMap;

/// Tallies how many times each candidate name occurs in `votes`.
///
/// Implemented as a Fork/Join reduction on top of rayon's parallel iterator:
/// the votes are first folded into partial tallies, and the partial tallies
/// are then merged into a single map.
///
/// Returns an empty map when `votes` is empty.
pub fn count_votes(votes: &[&str]) -> HashMap<String, usize> {
    if votes.is_empty() {
        return HashMap::new();
    }

    // Fork: fold the votes into partial tallies, each starting from an empty map.
    let partial_tallies = votes.par_iter().fold(
        HashMap::new,
        |mut tally: HashMap<String, usize>, &candidate| {
            *tally.entry(candidate.to_string()).or_default() += 1;
            tally
        },
    );

    // Join: merge the partial tallies by summing the counts per candidate.
    partial_tallies.reduce(HashMap::new, |mut merged, partial| {
        for (candidate, count) in partial {
            *merged.entry(candidate).or_default() += count;
        }
        merged
    })
}

Pipeline パターン

Pipeline 構造体

/// A sequential chain of `i32 -> i32` stages.
///
/// Stages are appended with [`Pipeline::add_stage`] and run in insertion
/// order by [`Pipeline::process`], each stage receiving the previous stage's
/// output.
pub struct Pipeline {
    stages: Vec<Box<dyn Fn(i32) -> i32 + Send>>,
}

impl Pipeline {
    /// Creates a pipeline with no stages.
    ///
    /// Processing a value through an empty pipeline returns it unchanged.
    pub fn new() -> Self {
        Self { stages: Vec::new() }
    }

    /// Appends `processor` as the last stage and returns the extended pipeline.
    ///
    /// The pipeline is taken and returned by value so calls can be chained.
    #[must_use = "add_stage consumes the pipeline and returns the extended one"]
    pub fn add_stage<F>(mut self, processor: F) -> Self
    where
        F: Fn(i32) -> i32 + Send + 'static,
    {
        self.stages.push(Box::new(processor));
        self
    }

    /// Feeds `input` through every stage in insertion order and returns the
    /// final value.
    pub fn process(&self, input: i32) -> i32 {
        self.stages.iter().fold(input, |acc, stage| stage(acc))
    }
}

impl Default for Pipeline {
    /// Equivalent to [`Pipeline::new`]: a pipeline with no stages.
    fn default() -> Self {
        Self::new()
    }
}

チャネルによる並行パイプライン

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Runs every item of `input` through `processors` as a concurrent pipeline.
///
/// One thread feeds the input and one thread is spawned per processor;
/// neighbouring threads are connected by `std::sync::mpsc` channels, so
/// different items can be in different stages at the same time. Every stage
/// handles its items one at a time in arrival order, so the returned vector
/// keeps the order of `input`.
///
/// If `processors` is empty, `input` is returned unchanged and no thread is
/// spawned.
///
/// # Panics
///
/// If a processor panics, the remaining threads wind down as their channels
/// close, all threads are joined, and the first panic (in stage order) is
/// re-raised on the calling thread with its original payload.
pub fn concurrent_pipeline<T>(
    input: Vec<T>,
    processors: Vec<Box<dyn Fn(T) -> T + Send + 'static>>,
) -> Vec<T>
where
    T: Send + 'static,
{
    if processors.is_empty() {
        return input;
    }

    let (first_tx, mut current_rx): (Sender<T>, Receiver<T>) = channel();

    // Feed the initial input into the first channel.
    let feeder = thread::spawn(move || {
        for item in input {
            // A send error means the first stage is gone (it panicked). Stop
            // quietly, like the stages do, so the stage's own panic is the
            // one that gets reported.
            if first_tx.send(item).is_err() {
                break;
            }
        }
    });

    let mut handles = Vec::with_capacity(processors.len() + 1);
    handles.push(feeder);

    // Chain the stages: each one reads from the previous channel and writes
    // to a fresh one, whose receiving end becomes the next stage's input.
    for processor in processors {
        let (tx, rx): (Sender<T>, Receiver<T>) = channel();
        let prev_rx = std::mem::replace(&mut current_rx, rx);

        handles.push(thread::spawn(move || {
            for item in prev_rx {
                // The downstream receiver disappearing means nobody wants the
                // results any more.
                if tx.send(processor(item)).is_err() {
                    break;
                }
            }
        }));
    }

    // Drain the last channel; iteration ends once the final stage drops its
    // sender.
    let results: Vec<T> = current_rx.iter().collect();

    // Join every thread before reporting a failure so none is left detached.
    let mut first_panic = None;
    for handle in handles {
        if let Err(payload) = handle.join() {
            if first_panic.is_none() {
                first_panic = Some(payload);
            }
        }
    }
    if let Some(payload) = first_panic {
        std::panic::resume_unwind(payload);
    }

    results
}

使用例

/// Demonstrates both patterns: a three-stage sequential `Pipeline` and a
/// parallel vote tally via `count_votes`.
fn main() {
    // Pipeline: stages are applied in the order they were added.
    let staged = Pipeline::new()
        .add_stage(|value| value + 1)
        .add_stage(|value| value * 2)
        .add_stage(|value| value - 3);

    // (5 + 1) * 2 - 3 = 9
    println!("Result: {}", staged.process(5));

    // Fork/Join: tally the ballots in parallel.
    let ballots = ["A", "B", "A", "A", "B", "C"];
    // Expected tally: {"A": 3, "B": 2, "C": 1}
    let _tally = count_votes(&ballots);
}

次のステップ

Part V では、同期と排他制御を学びます。