Skip to content

第34章:メッセージングパターン

本章では、エンタープライズインテグレーションの基盤となるメッセージングパターンについて解説します。Gregor Hohpe と Bobby Woolf による「Enterprise Integration Patterns」で体系化されたパターンを、基幹業務システムの文脈で理解していきます。


34.1 メッセージングの基礎

メッセージングシステムの全体像

メッセージングシステムは、分散システム間の非同期通信を実現するための基盤です。送信側と受信側が直接接続する必要がなく、疎結合なシステム統合を可能にします。

uml diagram

メッセージチャネル(Message Channel)

メッセージチャネルは、送信側と受信側をつなぐ論理的なパイプです。チャネルの種類によって、メッセージの配信方法が異なります。

uml diagram

基幹業務システムにおけるチャネル設計

uml diagram

メッセージ(Message)の構造

メッセージは、ヘッダーとボディで構成されます。ヘッダーにはルーティングや処理に必要なメタデータを、ボディにはビジネスデータを含めます。

uml diagram

売上イベントメッセージの例

uml diagram

Java 実装例
// メッセージヘッダー
public record MessageHeader(
    String messageId,
    String correlationId,
    String replyTo,
    Instant timestamp,
    String messageType,
    String source,
    Map<String, Object> properties
) {
    public static MessageHeader create(String messageType, String source) {
        return new MessageHeader(
            UUID.randomUUID().toString(),
            null,
            null,
            Instant.now(),
            messageType,
            source,
            new HashMap<>()
        );
    }

    public MessageHeader withCorrelationId(String correlationId) {
        return new MessageHeader(
            messageId, correlationId, replyTo, timestamp,
            messageType, source, properties
        );
    }
}

// メッセージ本体
public record Message<T>(
    MessageHeader header,
    T payload
) {
    public static <T> Message<T> of(String messageType, String source, T payload) {
        return new Message<>(
            MessageHeader.create(messageType, source),
            payload
        );
    }
}

// 売上イベント
public record SalesCompletedEvent(
    String salesId,
    String orderNumber,
    String customerId,
    LocalDate salesDate,
    BigDecimal totalAmount,
    BigDecimal taxAmount,
    List<SalesLineEvent> lines
) {}

// 使用例
Message<SalesCompletedEvent> message = Message.of(
    "SalesCompleted",
    "sales-service",
    new SalesCompletedEvent(
        "SLS-2024-00456",
        "ORD-2024-00123",
        "CUS-001",
        LocalDate.of(2024, 1, 15),
        new BigDecimal("108000"),
        new BigDecimal("8000"),
        lines
    )
).header().withCorrelationId("ORD-2024-00123");

パイプとフィルター(Pipes and Filters)

パイプとフィルターパターンは、複雑な処理を小さな独立したステップ(フィルター)に分解し、パイプでつなげて処理する方式です。

uml diagram

売上仕訳生成パイプライン

uml diagram

Java 実装例
// フィルターインターフェース
@FunctionalInterface
public interface Filter<T> {
    T process(T input) throws FilterException;

    default Filter<T> andThen(Filter<T> next) {
        return input -> next.process(this.process(input));
    }
}

// パイプラインビルダー
public class Pipeline<T> {
    private final List<Filter<T>> filters = new ArrayList<>();

    public Pipeline<T> addFilter(Filter<T> filter) {
        filters.add(filter);
        return this;
    }

    public T execute(T input) throws FilterException {
        T result = input;
        for (Filter<T> filter : filters) {
            result = filter.process(result);
        }
        return result;
    }
}

// 売上仕訳変換コンテキスト
public class SalesJournalContext {
    private SalesCompletedEvent salesEvent;
    private JournalPattern pattern;
    private List<JournalLine> journalLines;
    private JournalEntry journalEntry;
    private List<String> errors = new ArrayList<>();

    // getter/setter
}

// バリデーションフィルター
public class ValidationFilter implements Filter<SalesJournalContext> {
    @Override
    public SalesJournalContext process(SalesJournalContext ctx) {
        SalesCompletedEvent event = ctx.getSalesEvent();

        if (event.salesId() == null || event.salesId().isBlank()) {
            ctx.getErrors().add("売上IDは必須です");
        }
        if (event.totalAmount().compareTo(BigDecimal.ZERO) <= 0) {
            ctx.getErrors().add("売上金額は正の値である必要があります");
        }

        if (!ctx.getErrors().isEmpty()) {
            throw new ValidationException(ctx.getErrors());
        }
        return ctx;
    }
}

// 仕訳パターン判定フィルター
public class PatternDeterminationFilter implements Filter<SalesJournalContext> {
    private final JournalPatternRepository patternRepository;

    @Override
    public SalesJournalContext process(SalesJournalContext ctx) {
        JournalPattern pattern = patternRepository.findByProductAndCustomerGroup(
            ctx.getSalesEvent().productGroup(),
            ctx.getSalesEvent().customerGroup()
        ).orElseThrow(() -> new PatternNotFoundException("仕訳パターンが見つかりません"));

        ctx.setPattern(pattern);
        return ctx;
    }
}

// パイプラインの構築と実行
public class SalesJournalPipeline {
    private final Pipeline<SalesJournalContext> pipeline;

    public SalesJournalPipeline(
            ValidationFilter validationFilter,
            PatternDeterminationFilter patternFilter,
            JournalLineGenerationFilter generationFilter,
            BalanceValidationFilter balanceFilter) {

        this.pipeline = new Pipeline<SalesJournalContext>()
            .addFilter(validationFilter)
            .addFilter(patternFilter)
            .addFilter(generationFilter)
            .addFilter(balanceFilter);
    }

    public JournalEntry process(SalesCompletedEvent event) {
        SalesJournalContext ctx = new SalesJournalContext();
        ctx.setSalesEvent(event);

        SalesJournalContext result = pipeline.execute(ctx);
        return result.getJournalEntry();
    }
}

34.2 メッセージルーティング

メッセージルーティングは、メッセージの内容や属性に基づいて、適切な宛先にメッセージを振り分ける機能です。

Content-Based Router(内容ベースルーター)

メッセージの内容を検査し、条件に基づいて異なるチャネルにルーティングします。

uml diagram

基幹業務システムでの適用例

uml diagram

Java 実装例
// ルーティング条件
public interface RoutingCondition<T> {
    boolean matches(T message);
    String getDestination();
}

// Content-Based Router
public class ContentBasedRouter<T> {
    private final List<RoutingCondition<T>> conditions = new ArrayList<>();
    private String defaultDestination;

    public ContentBasedRouter<T> when(Predicate<T> predicate, String destination) {
        conditions.add(new RoutingCondition<>() {
            @Override
            public boolean matches(T message) {
                return predicate.test(message);
            }
            @Override
            public String getDestination() {
                return destination;
            }
        });
        return this;
    }

    public ContentBasedRouter<T> otherwise(String destination) {
        this.defaultDestination = destination;
        return this;
    }

    public String route(T message) {
        return conditions.stream()
            .filter(c -> c.matches(message))
            .findFirst()
            .map(RoutingCondition::getDestination)
            .orElse(defaultDestination);
    }
}

// 取引イベントルーター
public class TransactionEventRouter {
    private final ContentBasedRouter<TransactionEvent> router;
    private final MessageSender messageSender;

    public TransactionEventRouter(MessageSender messageSender) {
        this.messageSender = messageSender;
        this.router = new ContentBasedRouter<TransactionEvent>()
            .when(e -> e.type() == TransactionType.ORDER, "order-queue")
            .when(e -> e.type() == TransactionType.SHIPMENT, "shipment-queue")
            .when(e -> e.type() == TransactionType.SALES, "sales-queue")
            .when(e -> e.type() == TransactionType.PURCHASE_ORDER, "po-queue")
            .when(e -> e.type() == TransactionType.RECEIPT, "receipt-queue")
            .when(e -> e.type() == TransactionType.PURCHASE, "purchase-queue")
            .otherwise("dead-letter-queue");
    }

    public void route(TransactionEvent event) {
        String destination = router.route(event);
        messageSender.send(destination, event);
    }
}

Message Filter(メッセージフィルター)

条件に合致するメッセージのみを通過させ、それ以外は破棄または別のチャネルに送ります。

uml diagram

高額取引フィルターの例

uml diagram

Splitter / Aggregator(分割 / 集約)

大きなメッセージを複数の小さなメッセージに分割し、処理後に再び集約するパターンです。

uml diagram

基幹業務システムでの適用例

uml diagram

Java 実装例
// Splitter
public class OrderLineSplitter {
    public List<Message<OrderLineEvent>> split(Message<OrderEvent> orderMessage) {
        OrderEvent order = orderMessage.payload();
        return order.lines().stream()
            .map(line -> {
                OrderLineEvent lineEvent = new OrderLineEvent(
                    order.orderId(),
                    line.lineNumber(),
                    line.productId(),
                    line.quantity(),
                    line.unitPrice()
                );
                return Message.of("OrderLine", "splitter", lineEvent)
                    .withCorrelationId(order.orderId());
            })
            .toList();
    }
}

// Aggregator
public class AllocationResultAggregator {
    private final Map<String, AggregationState> states = new ConcurrentHashMap<>();

    public record AggregationState(
        String correlationId,
        int expectedCount,
        List<AllocationResult> results,
        Instant startTime
    ) {}

    public void aggregate(Message<AllocationResult> message) {
        String correlationId = message.header().correlationId();
        AllocationResult result = message.payload();

        states.compute(correlationId, (id, state) -> {
            if (state == null) {
                // 初期状態
                state = new AggregationState(
                    id,
                    getExpectedCount(id),
                    new ArrayList<>(),
                    Instant.now()
                );
            }
            state.results().add(result);
            return state;
        });

        checkCompletion(correlationId);
    }

    private void checkCompletion(String correlationId) {
        AggregationState state = states.get(correlationId);
        if (state == null) return;

        // 完了条件:すべての結果が揃った
        if (state.results().size() >= state.expectedCount()) {
            AggregatedAllocationResult aggregated = new AggregatedAllocationResult(
                correlationId,
                state.results()
            );
            publishAggregatedResult(aggregated);
            states.remove(correlationId);
        }
        // タイムアウトチェック
        else if (Duration.between(state.startTime(), Instant.now()).toMinutes() > 5) {
            handleTimeout(state);
        }
    }
}

Resequencer(再順序付け)

順序が乱れて到着したメッセージを、正しい順序に並べ替えてから処理します。

uml diagram


34.3 メッセージ変換

異なるシステム間でデータをやり取りする際、データ形式やスキーマの変換が必要になります。

Message Translator(メッセージ変換)

あるシステムのメッセージ形式を、別のシステムが理解できる形式に変換します。

uml diagram

売上から仕訳への変換例

uml diagram

Java 実装例
// メッセージ変換インターフェース
public interface MessageTranslator<S, T> {
    T translate(S source);
}

// 売上から仕訳への変換
@Component
public class SalesJournalTranslator
        implements MessageTranslator<SalesCompletedEvent, JournalEntryCommand> {

    private final JournalPatternRepository patternRepository;
    private final AccountRepository accountRepository;

    @Override
    public JournalEntryCommand translate(SalesCompletedEvent sales) {
        // 仕訳パターン取得
        JournalPattern pattern = patternRepository.findBySalesType(
            sales.salesType()
        ).orElseThrow();

        // 借方明細(売掛金)
        List<JournalLineCommand> debitLines = List.of(
            new JournalLineCommand(
                DebitCredit.DEBIT,
                pattern.debitAccountCode(),
                sales.totalAmount(),
                "売掛金計上 " + sales.customerName()
            )
        );

        // 貸方明細(売上 + 仮受消費税)
        List<JournalLineCommand> creditLines = new ArrayList<>();
        creditLines.add(new JournalLineCommand(
            DebitCredit.CREDIT,
            pattern.creditAccountCode(),
            sales.totalAmount().subtract(sales.taxAmount()),
            "売上計上 " + sales.salesId()
        ));
        creditLines.add(new JournalLineCommand(
            DebitCredit.CREDIT,
            pattern.taxAccountCode(),
            sales.taxAmount(),
            "仮受消費税"
        ));

        // 仕訳伝票作成
        return new JournalEntryCommand(
            generateJournalId(),
            sales.salesDate(),
            JournalType.SALES,
            sales.salesId(),
            Stream.concat(debitLines.stream(), creditLines.stream()).toList()
        );
    }
}

Envelope Wrapper(エンベロープラッパー)

メッセージに追加のメタデータを付与するためのラッパーです。

uml diagram

Java 実装例
// エンベロープ
public record MessageEnvelope<T>(
    RoutingInfo routing,
    SecurityInfo security,
    TraceInfo trace,
    T payload
) {
    public record RoutingInfo(
        String source,
        String destination,
        String replyTo,
        int priority
    ) {}

    public record SecurityInfo(
        String authToken,
        String userId,
        List<String> roles
    ) {}

    public record TraceInfo(
        String traceId,
        String spanId,
        String parentSpanId,
        Instant timestamp
    ) {}

    public static <T> MessageEnvelope<T> wrap(
            T payload,
            String source,
            String destination) {
        return new MessageEnvelope<>(
            new RoutingInfo(source, destination, null, 0),
            null,
            new TraceInfo(
                UUID.randomUUID().toString(),
                UUID.randomUUID().toString(),
                null,
                Instant.now()
            ),
            payload
        );
    }
}

// Envelope Wrapper
@Component
public class EnvelopeWrapper {
    private final SecurityContext securityContext;
    private final TraceContext traceContext;

    public <T> MessageEnvelope<T> wrap(T payload, String destination) {
        return new MessageEnvelope<>(
            new RoutingInfo(
                "current-service",
                destination,
                null,
                0
            ),
            new SecurityInfo(
                securityContext.getToken(),
                securityContext.getUserId(),
                securityContext.getRoles()
            ),
            new TraceInfo(
                traceContext.getTraceId(),
                traceContext.newSpanId(),
                traceContext.getCurrentSpanId(),
                Instant.now()
            ),
            payload
        );
    }

    public <T> T unwrap(MessageEnvelope<T> envelope) {
        // セキュリティ検証
        validateSecurity(envelope.security());
        // トレース情報の伝播
        propagateTrace(envelope.trace());
        return envelope.payload();
    }
}

Content Enricher(コンテンツエンリッチャー)

メッセージに不足している情報を外部ソースから取得して追加します。

uml diagram

受注エンリッチメントの例

uml diagram

Java 実装例
// Enricherインターフェース
public interface ContentEnricher<T> {
    T enrich(T message);
}

// 受注エンリッチャーチェーン
@Component
public class OrderEnricherChain {
    private final List<ContentEnricher<OrderEvent>> enrichers;

    public OrderEnricherChain(
            CustomerEnricher customerEnricher,
            ProductEnricher productEnricher,
            PriceEnricher priceEnricher,
            InventoryEnricher inventoryEnricher) {
        this.enrichers = List.of(
            customerEnricher,
            productEnricher,
            priceEnricher,
            inventoryEnricher
        );
    }

    public OrderEvent enrich(OrderEvent order) {
        OrderEvent enriched = order;
        for (ContentEnricher<OrderEvent> enricher : enrichers) {
            enriched = enricher.enrich(enriched);
        }
        return enriched;
    }
}

// 顧客情報エンリッチャー
@Component
public class CustomerEnricher implements ContentEnricher<OrderEvent> {
    private final CustomerRepository customerRepository;

    @Override
    public OrderEvent enrich(OrderEvent order) {
        Customer customer = customerRepository.findById(order.customerId())
            .orElseThrow(() -> new CustomerNotFoundException(order.customerId()));

        return order.toBuilder()
            .customerName(customer.name())
            .customerAddress(customer.address())
            .creditLimit(customer.creditLimit())
            .build();
    }
}

// 商品情報エンリッチャー
@Component
public class ProductEnricher implements ContentEnricher<OrderEvent> {
    private final ProductRepository productRepository;

    @Override
    public OrderEvent enrich(OrderEvent order) {
        List<OrderLineEvent> enrichedLines = order.lines().stream()
            .map(line -> {
                Product product = productRepository.findById(line.productId())
                    .orElseThrow();
                return line.toBuilder()
                    .productName(product.name())
                    .productCategory(product.category())
                    .taxRate(product.taxRate())
                    .build();
            })
            .toList();

        return order.toBuilder()
            .lines(enrichedLines)
            .build();
    }
}

Canonical Data Model(標準データモデル)

複数のシステム間で共通に使用するデータモデルを定義し、変換の複雑さを軽減します。

uml diagram

基幹業務システムの標準データモデル

uml diagram

Java 実装例
// 標準取引先モデル
public record StandardPartner(
    String partnerId,
    PartnerType partnerType,
    String name,
    Address address,
    List<Contact> contacts,
    Map<String, String> attributes
) {
    public enum PartnerType {
        CUSTOMER,    // 顧客
        SUPPLIER,    // 仕入先
        SUBCONTRACTOR // 外注先
    }
}

// 標準トランザクションモデル
public record StandardTransaction(
    String transactionId,
    TransactionType transactionType,
    LocalDate transactionDate,
    String partnerId,
    Money totalAmount,
    List<TransactionLine> lines,
    Map<String, String> attributes
) {
    public enum TransactionType {
        ORDER,           // 受注
        SHIPMENT,        // 出荷
        SALES,           // 売上
        PURCHASE_ORDER,  // 発注
        RECEIPT,         // 入荷
        PURCHASE,        // 仕入
        WORK_ORDER,      // 製造指示
        COMPLETION       // 完成
    }
}

// 販売固有モデルから標準モデルへの変換
@Component
public class SalesOrderToCanonicalTranslator
        implements MessageTranslator<SalesOrder, StandardTransaction> {

    @Override
    public StandardTransaction translate(SalesOrder order) {
        return new StandardTransaction(
            "TXN-" + order.orderId(),
            TransactionType.ORDER,
            order.orderDate(),
            "PTN-" + order.customerId(),
            Money.of(order.totalAmount(), "JPY"),
            order.lines().stream()
                .map(this::translateLine)
                .toList(),
            Map.of(
                "originalOrderId", order.orderId(),
                "salesRepId", order.salesRepId()
            )
        );
    }

    private TransactionLine translateLine(SalesOrderLine line) {
        return new TransactionLine(
            line.lineNumber(),
            "PRD-" + line.productCode(),
            Quantity.of(line.quantity(), line.unit()),
            Money.of(line.unitPrice(), "JPY"),
            Money.of(line.amount(), "JPY")
        );
    }
}

// 標準モデルから会計固有モデルへの変換
@Component
public class CanonicalToJournalTranslator
        implements MessageTranslator<StandardTransaction, JournalEntry> {

    @Override
    public JournalEntry translate(StandardTransaction transaction) {
        // 取引種別に応じた仕訳パターンを適用
        JournalPattern pattern = determinePattern(transaction.transactionType());

        return new JournalEntry(
            generateJournalId(),
            transaction.transactionDate(),
            pattern.journalType(),
            createJournalLines(transaction, pattern)
        );
    }
}

34.4 まとめ

本章では、エンタープライズインテグレーションの基盤となるメッセージングパターンについて解説しました。

学んだパターン一覧

カテゴリ パターン 用途
基礎 Message Channel メッセージの伝送経路
Message データとメタデータの構造
Pipes and Filters 処理の分解と連結
ルーティング Content-Based Router 内容に基づく振り分け
Message Filter 条件による通過/破棄
Splitter/Aggregator 分割と集約
Resequencer 順序の再整列
変換 Message Translator 形式変換
Envelope Wrapper メタデータ付与
Content Enricher 情報の追加
Canonical Data Model 標準データ形式

基幹業務システムへの適用ポイント

  1. チャネル設計

  2. Point-to-Point:受注処理、仕訳生成など1対1の処理

  3. Pub/Sub:イベント通知、複数システムへの連携

  4. ルーティング設計

  5. 取引種別による振り分け

  6. 金額閾値によるフィルタリング
  7. 月次締めでの分割/集約

  8. 変換設計

  9. 売上→仕訳の自動変換

  10. マスタ情報のエンリッチメント
  11. 標準データモデルによる統合

次章の予告

第35章では、システム間連携パターンについて詳しく解説します。販売管理と財務会計、販売管理と生産管理、生産管理と財務会計の具体的な連携方法を学びます。