実践データベース設計:生産管理システム 研究 3 - gRPC サービスの実装¶
はじめに¶
本研究では、API サーバー構成(第32章)とは異なるアプローチとして、gRPC による生産管理システムを実装します。Protocol Buffers によるスキーマ駆動開発と HTTP/2 の双方向ストリーミングを活用した、高性能な API を構築します。
第32章で構築したヘキサゴナルアーキテクチャ(ドメイン層・アプリケーション層)はそのまま共有し、Input Adapter として gRPC サービス層のみを追加します。
第42章:gRPC サーバーの基礎¶
42.1 gRPC とは¶
gRPC は Google が開発した高性能な RPC(Remote Procedure Call)フレームワークです。Protocol Buffers をインターフェース定義言語(IDL)として使用し、HTTP/2 上で動作します。
REST API との比較:
| 特徴 | REST API | gRPC |
|---|---|---|
| プロトコル | HTTP/1.1 | HTTP/2 |
| データ形式 | JSON (テキスト) | Protocol Buffers (バイナリ) |
| スキーマ | OpenAPI (任意) | .proto (必須) |
| ストリーミング | WebSocket 別実装 | ネイティブサポート |
| コード生成 | 手動/ツール | 自動生成 |
| パフォーマンス | 標準 | 高速 |
42.2 4つの通信パターン¶
gRPC は 4 つの通信パターンをサポートします:
用途:
- Unary RPC: 通常の API 呼び出し(CRUD 操作)
- Server Streaming: 大量データの配信(MRP 実行結果)
- Client Streaming: バッチ登録(複数品目の一括登録)
- Bidirectional Streaming: リアルタイム通信(進捗監視)
42.3 gRPC におけるヘキサゴナルアーキテクチャ¶
gRPC を導入しても、第32章で構築したヘキサゴナルアーキテクチャ(ドメイン層・アプリケーション層)はそのまま共有し、Input Adapter として gRPC サービス層のみを追加します。
gRPC でもヘキサゴナルアーキテクチャを維持する理由:
- 再利用性: 既存の UseCase/Repository をそのまま活用
- 並行運用: REST API と gRPC API の両方を提供可能
- テスト容易性: ドメインロジックは通信プロトコルに依存しない
- 移行容易性: 段階的に REST から gRPC へ移行可能
42.4 ディレクトリ構成¶
既存の構成に infrastructure/grpc/ を追加するだけです。
src/main/java/com/example/production/
├── domain/ # ドメイン層(API版と共通)
│ ├── model/
│ │ ├── item/
│ │ ├── bom/
│ │ ├── order/
│ │ └── inventory/
│ └── exception/
│
├── application/ # アプリケーション層(API版と共通)
│ ├── port/
│ │ ├── in/ # Input Port(ユースケース)
│ │ └── out/ # Output Port(リポジトリ)
│ └── service/
│
├── infrastructure/
│ ├── persistence/ # Output Adapter(DB実装)- 既存
│ │ ├── mapper/
│ │ └── repository/
│ ├── rest/ # Input Adapter(REST実装)- 既存
│ │ ├── controller/
│ │ ├── dto/
│ │ └── exception/
│ └── grpc/ # Input Adapter(gRPC実装)- 新規追加
│ ├── service/ # gRPC Service 実装
│ ├── mapper/ # Proto ⇔ Domain 変換
│ └── interceptor/ # 共通処理(ログ、エラー等)
│
├── config/
│
└── src/main/proto/ # Protocol Buffers 定義
├── common.proto
├── item.proto
├── bom.proto
├── purchase_order.proto
└── mrp.proto
42.5 技術スタックの追加¶
既存の build.gradle.kts に gRPC 関連の依存関係を追加します。
build.gradle.kts(差分)
import com.google.protobuf.gradle.*
plugins {
// 既存のプラグイン...
id("com.google.protobuf") version "0.9.4" // 追加
}
dependencies {
// 既存の依存関係(Spring Boot, MyBatis, PostgreSQL等)はそのまま
// gRPC 関連を追加
implementation("net.devh:grpc-spring-boot-starter:3.0.0.RELEASE")
implementation("io.grpc:grpc-protobuf:1.60.0")
implementation("io.grpc:grpc-stub:1.60.0")
implementation("io.grpc:grpc-services:1.60.0") // Health check, Reflection
// Protocol Buffers
implementation("com.google.protobuf:protobuf-java:3.25.1")
implementation("com.google.protobuf:protobuf-java-util:3.25.1")
// Test
testImplementation("io.grpc:grpc-testing:1.60.0")
}
// Protocol Buffers コンパイル設定
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.1"
}
plugins {
id("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:1.60.0"
}
}
generateProtoTasks {
all().forEach { task ->
task.plugins {
id("grpc")
}
}
}
}
追加パッケージの説明:
| パッケージ | 用途 |
|---|---|
| grpc-spring-boot-starter | Spring Boot との統合 |
| grpc-protobuf | Protocol Buffers サポート |
| grpc-services | ヘルスチェック、リフレクション |
| protobuf-java-util | JSON ⇔ Proto 変換 |
38.6 Protocol Buffers 定義¶
src/main/proto/common.proto
syntax = "proto3";
package production;
option java_package = "com.example.production.grpc";
option java_multiple_files = true;
// 共通メッセージ
message Empty {}
message Timestamp {
int64 seconds = 1;
int32 nanos = 2;
}
message Date {
int32 year = 1;
int32 month = 2;
int32 day = 3;
}
message Decimal {
string value = 1; // 文字列で精度を維持
}
message PageRequest {
int32 page = 1;
int32 size = 2;
}
message PageResponse {
int32 total_elements = 1;
int32 total_pages = 2;
int32 current_page = 3;
}
// エラーレスポンス
message ErrorDetail {
string code = 1;
string message = 2;
map<string, string> metadata = 3;
}
src/main/proto/item.proto
syntax = "proto3";
package production;
import "common.proto";
option java_package = "com.example.production.grpc";
option java_multiple_files = true;
// 品目区分
enum ItemCategory {
ITEM_CATEGORY_UNSPECIFIED = 0;
PRODUCT = 1; // 製品
SEMI_PRODUCT = 2; // 半製品
PART = 3; // 部品
MATERIAL = 4; // 材料
}
// 品目メッセージ
message Item {
string item_code = 1;
Date effective_date = 2;
string item_name = 3;
ItemCategory category = 4;
string item_group_code = 5;
string unit_code = 6;
string location_code = 7;
int32 lead_time = 8;
int32 safety_stock = 9;
}
// リクエスト/レスポンス
message GetItemRequest {
string item_code = 1;
}
message GetItemsRequest {
ItemCategory category = 1; // optional filter
PageRequest page = 2;
}
message GetItemsResponse {
repeated Item items = 1;
PageResponse page = 2;
}
message CreateItemRequest {
string item_code = 1;
string item_name = 2;
ItemCategory category = 3;
string item_group_code = 4;
string unit_code = 5;
string location_code = 6;
int32 lead_time = 7;
int32 safety_stock = 8;
}
message UpdateItemRequest {
string item_code = 1;
string item_name = 2;
ItemCategory category = 3;
int32 lead_time = 4;
int32 safety_stock = 5;
}
message DeleteItemRequest {
string item_code = 1;
}
message BatchCreateResponse {
int32 success_count = 1;
int32 failed_count = 2;
repeated string failed_codes = 3;
}
// 品目サービス
service ItemService {
// Unary RPC
rpc GetItem(GetItemRequest) returns (Item);
rpc CreateItem(CreateItemRequest) returns (Item);
rpc UpdateItem(UpdateItemRequest) returns (Item);
rpc DeleteItem(DeleteItemRequest) returns (Empty);
// Server Streaming - 品目一覧のストリーム配信
rpc StreamItems(GetItemsRequest) returns (stream Item);
// Client Streaming - バッチ登録
rpc BatchCreateItems(stream CreateItemRequest) returns (BatchCreateResponse);
}
38.7 API サーバー版との Controller の違い¶
API サーバー版(REST Controller)
@RestController // JSON を返す
@RequestMapping("/api/items")
public class ItemApiController {
private final ItemUseCase itemUseCase; // 共有
@GetMapping("/{itemCode}")
public ResponseEntity<ItemResponse> getItem(@PathVariable String itemCode) {
Item item = itemUseCase.getItemByCode(itemCode);
return ResponseEntity.ok(ItemResponse.from(item));
}
}
gRPC 版(gRPC Service)
@GrpcService // gRPC サービスとして公開
public class ItemGrpcService extends ItemServiceGrpc.ItemServiceImplBase {
private final ItemUseCase itemUseCase; // 共有(同じインスタンス)
private final ProtoMapper mapper;
@Override
public void getItem(GetItemRequest request,
StreamObserver<Item> responseObserver) {
// 同じ UseCase を呼び出し
var item = itemUseCase.getItemByCode(request.getItemCode());
responseObserver.onNext(mapper.toProto(item));
responseObserver.onCompleted();
}
}
共通点と相違点:
| 観点 | REST Controller | gRPC Service |
|---|---|---|
| UseCase | 共有(同一インスタンス) | 共有(同一インスタンス) |
| アノテーション | @RestController |
@GrpcService |
| リクエスト | DTO(JSON) | Proto Message |
| レスポンス | ResponseEntity<T> |
StreamObserver<T> |
| 変換 | DTO ⇔ Domain | Proto ⇔ Domain |
| エラー処理 | 例外ハンドラ | Status + onError |
第43章:マスタ API の実装¶
43.1 Proto マッパーの実装¶
既存のドメインモデルと Protocol Buffers メッセージ間の変換を行います。
ProtoMapper.java
package com.example.production.infrastructure.grpc.mapper;
import com.example.production.domain.model.item.Item;
import com.example.production.domain.model.item.ItemCategory;
import com.example.production.grpc.*;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.time.LocalDate;
/**
* ドメインモデルと Protocol Buffers メッセージ間の変換
*/
@Component
public class ProtoMapper {
// ========== Item ==========
public com.example.production.grpc.Item toProto(Item domain) {
return com.example.production.grpc.Item.newBuilder()
.setItemCode(domain.getItemCode())
.setEffectiveDate(toProtoDate(domain.getEffectiveDate()))
.setItemName(domain.getItemName())
.setCategory(toProtoCategory(domain.getCategory()))
.setItemGroupCode(nullToEmpty(domain.getItemGroupCode()))
.setUnitCode(nullToEmpty(domain.getUnitCode()))
.setLocationCode(nullToEmpty(domain.getLocationCode()))
.setLeadTime(nullToZero(domain.getLeadTime()))
.setSafetyStock(nullToZero(domain.getSafetyStock()))
.build();
}
public Item toDomain(CreateItemRequest request) {
return Item.builder()
.itemCode(request.getItemCode())
.effectiveDate(LocalDate.now())
.itemName(request.getItemName())
.category(toDomainCategory(request.getCategory()))
.itemGroupCode(emptyToNull(request.getItemGroupCode()))
.unitCode(emptyToNull(request.getUnitCode()))
.locationCode(emptyToNull(request.getLocationCode()))
.leadTime(zeroToNull(request.getLeadTime()))
.safetyStock(zeroToNull(request.getSafetyStock()))
.build();
}
// ========== Date ==========
public Date toProtoDate(LocalDate date) {
if (date == null) {
return Date.getDefaultInstance();
}
return Date.newBuilder()
.setYear(date.getYear())
.setMonth(date.getMonthValue())
.setDay(date.getDayOfMonth())
.build();
}
public LocalDate toDomainDate(Date protoDate) {
if (protoDate.getYear() == 0) {
return null;
}
return LocalDate.of(
protoDate.getYear(),
protoDate.getMonth(),
protoDate.getDay()
);
}
// ========== Decimal ==========
public Decimal toProtoDecimal(BigDecimal value) {
if (value == null) {
return Decimal.getDefaultInstance();
}
return Decimal.newBuilder()
.setValue(value.toPlainString())
.build();
}
public BigDecimal toDomainDecimal(Decimal protoDecimal) {
if (protoDecimal.getValue().isEmpty()) {
return null;
}
return new BigDecimal(protoDecimal.getValue());
}
// ========== ItemCategory ==========
public com.example.production.grpc.ItemCategory toProtoCategory(ItemCategory domain) {
if (domain == null) {
return com.example.production.grpc.ItemCategory.ITEM_CATEGORY_UNSPECIFIED;
}
return switch (domain) {
case PRODUCT -> com.example.production.grpc.ItemCategory.PRODUCT;
case SEMI_PRODUCT -> com.example.production.grpc.ItemCategory.SEMI_PRODUCT;
case PART -> com.example.production.grpc.ItemCategory.PART;
case MATERIAL -> com.example.production.grpc.ItemCategory.MATERIAL;
};
}
public ItemCategory toDomainCategory(com.example.production.grpc.ItemCategory proto) {
return switch (proto) {
case PRODUCT -> ItemCategory.PRODUCT;
case SEMI_PRODUCT -> ItemCategory.SEMI_PRODUCT;
case PART -> ItemCategory.PART;
case MATERIAL -> ItemCategory.MATERIAL;
default -> null;
};
}
// ========== Utility ==========
private String nullToEmpty(String value) {
return value == null ? "" : value;
}
private String emptyToNull(String value) {
return value == null || value.isEmpty() ? null : value;
}
private int nullToZero(Integer value) {
return value == null ? 0 : value;
}
private Integer zeroToNull(int value) {
return value == 0 ? null : value;
}
}
43.2 品目 gRPC サービスの TDD 実装¶
Red: 失敗するテストを書く¶
ItemGrpcServiceTest.java
package com.example.production.infrastructure.grpc.service;
import com.example.production.domain.model.item.Item;
import com.example.production.domain.model.item.ItemCategory;
import com.example.production.grpc.*;
import com.example.production.infrastructure.persistence.mapper.ItemMapper;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.*;
@SpringBootTest(properties = {
"grpc.server.port=9090",
"grpc.server.in-process-name=test"
})
@Testcontainers
@DisplayName("品目 gRPC サービス")
class ItemGrpcServiceTest {
@Container
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
.withDatabaseName("production_test")
.withUsername("test")
.withPassword("test");
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", postgres::getJdbcUrl);
registry.add("spring.datasource.username", postgres::getUsername);
registry.add("spring.datasource.password", postgres::getPassword);
}
@Autowired
private ItemMapper itemMapper;
private ManagedChannel channel;
private ItemServiceGrpc.ItemServiceBlockingStub blockingStub;
private ItemServiceGrpc.ItemServiceStub asyncStub;
@BeforeEach
void setUp() {
channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
blockingStub = ItemServiceGrpc.newBlockingStub(channel);
asyncStub = ItemServiceGrpc.newStub(channel);
// テストデータをクリア
itemMapper.deleteAll();
}
@AfterEach
void tearDown() {
channel.shutdownNow();
}
@Nested
@DisplayName("GetItem (Unary)")
class GetItemTests {
@Test
@DisplayName("品目コードで品目を取得できる")
void shouldGetItemByCode() {
// Arrange
insertTestItem("PROD-001", "テスト製品", ItemCategory.PRODUCT);
// Act
var request = GetItemRequest.newBuilder()
.setItemCode("PROD-001")
.build();
var response = blockingStub.getItem(request);
// Assert
assertThat(response.getItemCode()).isEqualTo("PROD-001");
assertThat(response.getItemName()).isEqualTo("テスト製品");
assertThat(response.getCategory())
.isEqualTo(com.example.production.grpc.ItemCategory.PRODUCT);
}
@Test
@DisplayName("存在しない品目はNOT_FOUNDを返す")
void shouldThrowNotFoundForNonExistentItem() {
var request = GetItemRequest.newBuilder()
.setItemCode("NOT-EXIST")
.build();
assertThatThrownBy(() -> blockingStub.getItem(request))
.isInstanceOf(StatusRuntimeException.class)
.hasMessageContaining("NOT_FOUND");
}
}
@Nested
@DisplayName("StreamItems (Server Streaming)")
class StreamItemsTests {
@Test
@DisplayName("品目一覧をストリームで取得できる")
void shouldStreamItems() {
// Arrange
for (int i = 1; i <= 5; i++) {
insertTestItem("ITEM-" + String.format("%03d", i),
"品目" + i, ItemCategory.PRODUCT);
}
// Act
var request = GetItemsRequest.newBuilder().build();
Iterator<com.example.production.grpc.Item> iterator =
blockingStub.streamItems(request);
// Assert
List<com.example.production.grpc.Item> items = new ArrayList<>();
iterator.forEachRemaining(items::add);
assertThat(items).hasSize(5);
}
}
@Nested
@DisplayName("BatchCreateItems (Client Streaming)")
class BatchCreateItemsTests {
@Test
@DisplayName("バッチで品目を登録できる")
void shouldBatchCreateItems() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final BatchCreateResponse[] result = new BatchCreateResponse[1];
var requestObserver = asyncStub.batchCreateItems(
new io.grpc.stub.StreamObserver<BatchCreateResponse>() {
@Override
public void onNext(BatchCreateResponse response) {
result[0] = response;
}
@Override
public void onError(Throwable t) {
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
// Act - 3件の品目を送信
for (int i = 1; i <= 3; i++) {
requestObserver.onNext(CreateItemRequest.newBuilder()
.setItemCode("BATCH-" + String.format("%03d", i))
.setItemName("バッチ品目" + i)
.setCategory(com.example.production.grpc.ItemCategory.PRODUCT)
.build());
}
requestObserver.onCompleted();
// Assert
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(result[0].getSuccessCount()).isEqualTo(3);
}
}
private void insertTestItem(String code, String name, ItemCategory category) {
itemMapper.insert(Item.builder()
.itemCode(code)
.effectiveDate(LocalDate.of(2025, 1, 1))
.itemName(name)
.category(category)
.build());
}
}
Green: gRPC サービスの実装¶
ItemGrpcService.java
package com.example.production.infrastructure.grpc.service;
import com.example.production.application.port.in.*;
import com.example.production.domain.exception.DuplicateItemException;
import com.example.production.domain.exception.ItemNotFoundException;
import com.example.production.domain.model.item.Item;
import com.example.production.grpc.*;
import com.example.production.infrastructure.grpc.mapper.ProtoMapper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
/**
* 品目 gRPC サービス実装
* 既存の ItemUseCase を Input Adapter として呼び出す
*/
@GrpcService
public class ItemGrpcService extends ItemServiceGrpc.ItemServiceImplBase {
private final ItemUseCase itemUseCase; // 既存の UseCase を注入
private final ProtoMapper mapper;
public ItemGrpcService(ItemUseCase itemUseCase, ProtoMapper mapper) {
this.itemUseCase = itemUseCase;
this.mapper = mapper;
}
/**
* 品目取得(Unary RPC)
*/
@Override
public void getItem(GetItemRequest request,
StreamObserver<com.example.production.grpc.Item> responseObserver) {
try {
// 既存の UseCase を呼び出し
Item item = itemUseCase.getItemByCode(request.getItemCode());
responseObserver.onNext(mapper.toProto(item));
responseObserver.onCompleted();
} catch (ItemNotFoundException e) {
responseObserver.onError(Status.NOT_FOUND
.withDescription(e.getMessage())
.asRuntimeException());
}
}
/**
* 品目登録(Unary RPC)
*/
@Override
@Transactional
public void createItem(CreateItemRequest request,
StreamObserver<com.example.production.grpc.Item> responseObserver) {
try {
// Proto → Command 変換
CreateItemCommand command = CreateItemCommand.builder()
.itemCode(request.getItemCode())
.itemName(request.getItemName())
.category(mapper.toDomainCategory(request.getCategory()))
.itemGroupCode(emptyToNull(request.getItemGroupCode()))
.unitCode(emptyToNull(request.getUnitCode()))
.locationCode(emptyToNull(request.getLocationCode()))
.leadTime(zeroToNull(request.getLeadTime()))
.safetyStock(zeroToNull(request.getSafetyStock()))
.build();
// 既存の UseCase を呼び出し
Item item = itemUseCase.createItem(command);
responseObserver.onNext(mapper.toProto(item));
responseObserver.onCompleted();
} catch (DuplicateItemException e) {
responseObserver.onError(Status.ALREADY_EXISTS
.withDescription(e.getMessage())
.asRuntimeException());
}
}
/**
* 品目更新(Unary RPC)
*/
@Override
@Transactional
public void updateItem(UpdateItemRequest request,
StreamObserver<com.example.production.grpc.Item> responseObserver) {
try {
UpdateItemCommand command = UpdateItemCommand.builder()
.itemCode(request.getItemCode())
.itemName(emptyToNull(request.getItemName()))
.category(mapper.toDomainCategory(request.getCategory()))
.leadTime(zeroToNull(request.getLeadTime()))
.safetyStock(zeroToNull(request.getSafetyStock()))
.build();
Item item = itemUseCase.updateItem(command);
responseObserver.onNext(mapper.toProto(item));
responseObserver.onCompleted();
} catch (ItemNotFoundException e) {
responseObserver.onError(Status.NOT_FOUND
.withDescription(e.getMessage())
.asRuntimeException());
}
}
/**
* 品目削除(Unary RPC)
*/
@Override
@Transactional
public void deleteItem(DeleteItemRequest request,
StreamObserver<Empty> responseObserver) {
try {
itemUseCase.deleteItem(request.getItemCode());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (ItemNotFoundException e) {
responseObserver.onError(Status.NOT_FOUND
.withDescription(e.getMessage())
.asRuntimeException());
}
}
/**
* 品目一覧のストリーム配信(Server Streaming RPC)
*/
@Override
public void streamItems(GetItemsRequest request,
StreamObserver<com.example.production.grpc.Item> responseObserver) {
try {
List<Item> items;
if (request.getCategory() !=
com.example.production.grpc.ItemCategory.ITEM_CATEGORY_UNSPECIFIED) {
items = itemUseCase.getItemsByCategory(
mapper.toDomainCategory(request.getCategory()));
} else {
items = itemUseCase.getAllItems();
}
// ストリーミング配信
for (Item item : items) {
responseObserver.onNext(mapper.toProto(item));
}
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
/**
* バッチ登録(Client Streaming RPC)
*/
@Override
@Transactional
public StreamObserver<CreateItemRequest> batchCreateItems(
StreamObserver<BatchCreateResponse> responseObserver) {
return new StreamObserver<>() {
private final List<String> successCodes = new ArrayList<>();
private final List<String> failedCodes = new ArrayList<>();
@Override
public void onNext(CreateItemRequest request) {
try {
CreateItemCommand command = CreateItemCommand.builder()
.itemCode(request.getItemCode())
.itemName(request.getItemName())
.category(mapper.toDomainCategory(request.getCategory()))
.build();
itemUseCase.createItem(command);
successCodes.add(request.getItemCode());
} catch (Exception e) {
failedCodes.add(request.getItemCode());
}
}
@Override
public void onError(Throwable t) {
responseObserver.onError(Status.INTERNAL
.withDescription(t.getMessage())
.asRuntimeException());
}
@Override
public void onCompleted() {
responseObserver.onNext(BatchCreateResponse.newBuilder()
.setSuccessCount(successCodes.size())
.setFailedCount(failedCodes.size())
.addAllFailedCodes(failedCodes)
.build());
responseObserver.onCompleted();
}
};
}
private String emptyToNull(String value) {
return value == null || value.isEmpty() ? null : value;
}
private Integer zeroToNull(int value) {
return value == 0 ? null : value;
}
}
43.3 BOM gRPC サービスの実装¶
BomGrpcService.java
package com.example.production.infrastructure.grpc.service;
import com.example.production.application.service.BomNode;
import com.example.production.application.service.BomService;
import com.example.production.application.service.WhereUsedResult;
import com.example.production.grpc.*;
import com.example.production.infrastructure.grpc.mapper.ProtoMapper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.List;
/**
* BOM gRPC サービス実装
* 既存の BomService を Input Adapter として呼び出す
*/
@GrpcService
public class BomGrpcService extends BomServiceGrpc.BomServiceImplBase {
private final BomService bomService; // 既存のサービスを注入
private final ProtoMapper mapper;
public BomGrpcService(BomService bomService, ProtoMapper mapper) {
this.bomService = bomService;
this.mapper = mapper;
}
/**
* 部品展開(Server Streaming)
*/
@Override
public void explodeBom(ExplodeBomRequest request,
StreamObserver<com.example.production.grpc.BomNode> responseObserver) {
try {
BomNode root = bomService.explodeBom(request.getItemCode());
// 深さ優先でストリーミング配信
streamBomNode(root, responseObserver);
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
private void streamBomNode(BomNode node,
StreamObserver<com.example.production.grpc.BomNode> observer) {
observer.onNext(toProtoBomNode(node));
if (node.getChildren() != null) {
for (BomNode child : node.getChildren()) {
streamBomNode(child, observer);
}
}
}
private com.example.production.grpc.BomNode toProtoBomNode(BomNode domain) {
var builder = com.example.production.grpc.BomNode.newBuilder()
.setItemCode(domain.getItemCode())
.setItemName(domain.getItemName())
.setRequiredQuantity(mapper.toProtoDecimal(domain.getRequiredQuantity()))
.setLevel(domain.getLevel());
if (domain.getChildren() != null) {
for (BomNode child : domain.getChildren()) {
builder.addChildren(toProtoBomNode(child));
}
}
return builder.build();
}
/**
* 使用先照会
*/
@Override
public void whereUsed(WhereUsedRequest request,
StreamObserver<WhereUsedResponse> responseObserver) {
try {
List<WhereUsedResult> results = bomService.whereUsed(request.getItemCode());
var response = WhereUsedResponse.newBuilder();
for (WhereUsedResult result : results) {
response.addResults(com.example.production.grpc.WhereUsedResult.newBuilder()
.setParentItemCode(result.getParentItemCode())
.setItemName(result.getItemName())
.setRequiredQuantity(mapper.toProtoDecimal(result.getRequiredQuantity()))
.build());
}
responseObserver.onNext(response.build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
}
第44章:トランザクション API の実装¶
44.1 発注 gRPC サービス¶
src/main/proto/purchase_order.proto
syntax = "proto3";
package production;
import "common.proto";
option java_package = "com.example.production.grpc";
option java_multiple_files = true;
// 発注ステータス
enum OrderStatus {
ORDER_STATUS_UNSPECIFIED = 0;
DRAFT = 1;
CONFIRMED = 2;
CANCELLED = 3;
COMPLETED = 4;
}
// 発注ヘッダ
message PurchaseOrder {
string order_number = 1;
string supplier_code = 2;
Date order_date = 3;
OrderStatus status = 4;
repeated PurchaseOrderDetail details = 5;
Decimal total_amount = 6;
}
// 発注明細
message PurchaseOrderDetail {
int32 line_number = 1;
string item_code = 2;
Decimal order_quantity = 3;
Decimal unit_price = 4;
Date delivery_date = 5;
Decimal received_quantity = 6;
}
// リクエスト
message GetOrderRequest {
string order_number = 1;
}
message GetOrdersRequest {
OrderStatus status = 1;
PageRequest page = 2;
}
message CreateOrderRequest {
string supplier_code = 1;
repeated CreateOrderDetailRequest details = 2;
}
message CreateOrderDetailRequest {
string item_code = 1;
Decimal order_quantity = 2;
Decimal unit_price = 3;
Date delivery_date = 4;
}
message ConfirmOrderRequest {
string order_number = 1;
}
// 入荷登録(双方向ストリーミング)
message RecordReceivingRequest {
string order_number = 1;
int32 line_number = 2;
Decimal received_quantity = 3;
Date receiving_date = 4;
}
message RecordReceivingResponse {
string order_number = 1;
int32 line_number = 2;
Decimal total_received = 3;
bool is_completed = 4;
}
// 発注サービス
service PurchaseOrderService {
rpc GetOrder(GetOrderRequest) returns (PurchaseOrder);
rpc StreamOrders(GetOrdersRequest) returns (stream PurchaseOrder);
rpc CreateOrder(CreateOrderRequest) returns (PurchaseOrder);
rpc ConfirmOrder(ConfirmOrderRequest) returns (PurchaseOrder);
rpc CancelOrder(GetOrderRequest) returns (Empty);
// 入荷登録(双方向ストリーミング)
rpc RecordReceiving(stream RecordReceivingRequest) returns (stream RecordReceivingResponse);
}
PurchaseOrderGrpcService.java
package com.example.production.infrastructure.grpc.service;
import com.example.production.application.port.in.*;
import com.example.production.domain.model.purchase.PurchaseOrder;
import com.example.production.grpc.*;
import com.example.production.infrastructure.grpc.mapper.ProtoMapper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* 発注 gRPC サービス実装
* 既存の PurchaseOrderUseCase を Input Adapter として呼び出す
*/
@GrpcService
public class PurchaseOrderGrpcService
extends PurchaseOrderServiceGrpc.PurchaseOrderServiceImplBase {
private final PurchaseOrderUseCase useCase; // 既存の UseCase
private final ProtoMapper mapper;
public PurchaseOrderGrpcService(PurchaseOrderUseCase useCase, ProtoMapper mapper) {
this.useCase = useCase;
this.mapper = mapper;
}
@Override
public void getOrder(GetOrderRequest request,
StreamObserver<com.example.production.grpc.PurchaseOrder> responseObserver) {
try {
PurchaseOrder order = useCase.getOrder(request.getOrderNumber());
responseObserver.onNext(toProto(order));
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.NOT_FOUND
.withDescription(e.getMessage())
.asRuntimeException());
}
}
@Override
public void streamOrders(GetOrdersRequest request,
StreamObserver<com.example.production.grpc.PurchaseOrder> responseObserver) {
try {
List<PurchaseOrder> orders = useCase.getAllOrders();
for (PurchaseOrder order : orders) {
responseObserver.onNext(toProto(order));
}
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
@Override
@Transactional
public void createOrder(CreateOrderRequest request,
StreamObserver<com.example.production.grpc.PurchaseOrder> responseObserver) {
try {
CreatePurchaseOrderCommand command = CreatePurchaseOrderCommand.builder()
.supplierCode(request.getSupplierCode())
.details(request.getDetailsList().stream()
.map(d -> CreatePurchaseOrderCommand.PurchaseOrderDetailCommand.builder()
.itemCode(d.getItemCode())
.orderQuantity(mapper.toDomainDecimal(d.getOrderQuantity()))
.unitPrice(mapper.toDomainDecimal(d.getUnitPrice()))
.deliveryDate(mapper.toDomainDate(d.getDeliveryDate()))
.build())
.toList())
.build();
PurchaseOrder order = useCase.createOrder(command);
responseObserver.onNext(toProto(order));
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
/**
* 入荷登録(双方向ストリーミング)
*/
@Override
public StreamObserver<RecordReceivingRequest> recordReceiving(
StreamObserver<RecordReceivingResponse> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(RecordReceivingRequest request) {
try {
var result = useCase.recordReceiving(
request.getOrderNumber(),
request.getLineNumber(),
mapper.toDomainDecimal(request.getReceivedQuantity()),
mapper.toDomainDate(request.getReceivingDate())
);
// 即座にレスポンス
responseObserver.onNext(RecordReceivingResponse.newBuilder()
.setOrderNumber(request.getOrderNumber())
.setLineNumber(request.getLineNumber())
.setTotalReceived(mapper.toProtoDecimal(result.getTotalReceived()))
.setIsCompleted(result.isCompleted())
.build());
} catch (Exception e) {
responseObserver.onNext(RecordReceivingResponse.newBuilder()
.setOrderNumber(request.getOrderNumber())
.setLineNumber(request.getLineNumber())
.setIsCompleted(false)
.build());
}
}
@Override
public void onError(Throwable t) {
responseObserver.onError(Status.INTERNAL
.withDescription(t.getMessage())
.asRuntimeException());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
private com.example.production.grpc.PurchaseOrder toProto(PurchaseOrder domain) {
var builder = com.example.production.grpc.PurchaseOrder.newBuilder()
.setOrderNumber(domain.getOrderNumber())
.setSupplierCode(domain.getSupplierCode())
.setOrderDate(mapper.toProtoDate(domain.getOrderDate()))
.setStatus(toProtoStatus(domain.getStatus()))
.setTotalAmount(mapper.toProtoDecimal(domain.getTotalAmount()));
if (domain.getDetails() != null) {
for (var detail : domain.getDetails()) {
builder.addDetails(PurchaseOrderDetail.newBuilder()
.setLineNumber(detail.getLineNumber())
.setItemCode(detail.getItemCode())
.setOrderQuantity(mapper.toProtoDecimal(detail.getOrderQuantity()))
.setUnitPrice(mapper.toProtoDecimal(detail.getUnitPrice()))
.setDeliveryDate(mapper.toProtoDate(detail.getDeliveryDate()))
.setReceivedQuantity(mapper.toProtoDecimal(detail.getReceivedQuantity()))
.build());
}
}
return builder.build();
}
private OrderStatus toProtoStatus(
com.example.production.domain.model.purchase.OrderStatus domain) {
return switch (domain) {
case DRAFT -> OrderStatus.DRAFT;
case CONFIRMED -> OrderStatus.CONFIRMED;
case CANCELLED -> OrderStatus.CANCELLED;
case COMPLETED -> OrderStatus.COMPLETED;
};
}
}
44.2 MRP gRPC サービス¶
MrpGrpcService.java
package com.example.production.infrastructure.grpc.service;
import com.example.production.application.service.MrpService;
import com.example.production.grpc.*;
import com.example.production.infrastructure.grpc.mapper.ProtoMapper;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
/**
* MRP gRPC サービス実装
* 既存の MrpService を Input Adapter として呼び出す
*/
@GrpcService
public class MrpGrpcService extends MrpServiceGrpc.MrpServiceImplBase {
private final MrpService mrpService; // 既存のサービス
private final ProtoMapper mapper;
public MrpGrpcService(MrpService mrpService, ProtoMapper mapper) {
this.mrpService = mrpService;
this.mapper = mapper;
}
/**
* MRP 実行(進捗をストリーミング)
*/
@Override
public void executeMrp(ExecuteMrpRequest request,
StreamObserver<MrpProgress> responseObserver) {
var executor = Executors.newSingleThreadExecutor();
CompletableFuture.runAsync(() -> {
try {
// フェーズ1: 需要取得
sendProgress(responseObserver, "需要取得", 0, 4,
"基準生産計画から需要を取得中...");
// フェーズ2: BOM 展開
sendProgress(responseObserver, "BOM展開", 1, 4,
"部品構成表を展開中...");
// フェーズ3: 正味所要量計算
sendProgress(responseObserver, "正味所要量計算", 2, 4,
"在庫と照合して正味所要量を計算中...");
// フェーズ4: 計画オーダ生成
sendProgress(responseObserver, "計画オーダ生成", 3, 4,
"ロットサイズを考慮して計画オーダを生成中...");
// 実際の MRP 実行
var startDate = mapper.toDomainDate(request.getStartDate());
var endDate = mapper.toDomainDate(request.getEndDate());
mrpService.execute(startDate, endDate);
// 完了
sendProgress(responseObserver, "完了", 4, 4,
"MRP 実行が完了しました");
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}, executor).whenComplete((v, t) -> executor.shutdown());
}
private void sendProgress(StreamObserver<MrpProgress> observer,
String phase, int current, int total, String message) {
observer.onNext(MrpProgress.newBuilder()
.setPhase(phase)
.setCurrent(current)
.setTotal(total)
.setMessage(message)
.build());
}
/**
* MRP 実行(同期)
*/
@Override
public void executeMrpSync(ExecuteMrpRequest request,
StreamObserver<MrpResult> responseObserver) {
try {
var startDate = mapper.toDomainDate(request.getStartDate());
var endDate = mapper.toDomainDate(request.getEndDate());
long startTime = System.currentTimeMillis();
var result = mrpService.execute(startDate, endDate);
long executionTime = System.currentTimeMillis() - startTime;
var response = MrpResult.newBuilder()
.setPeriodStart(request.getStartDate())
.setPeriodEnd(request.getEndDate())
.setStatistics(MrpStatistics.newBuilder()
.setTotalItemsProcessed(result.getPlannedOrders().size())
.setShortageItemCount(result.getShortageItems().size())
.setExecutionTimeMs(executionTime)
.build());
for (var order : result.getPlannedOrders()) {
response.addPlannedOrders(toProtoPlannedOrder(order));
}
for (var item : result.getShortageItems()) {
response.addShortageItems(toProtoShortageItem(item));
}
responseObserver.onNext(response.build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
private PlannedOrder toProtoPlannedOrder(
com.example.production.application.service.PlannedOrder domain) {
return PlannedOrder.newBuilder()
.setItemCode(domain.getItemCode())
.setItemName(domain.getItemName())
.setQuantity(mapper.toProtoDecimal(domain.getQuantity()))
.setDueDate(mapper.toProtoDate(domain.getDueDate()))
.build();
}
private ShortageItem toProtoShortageItem(
com.example.production.application.service.ShortageItem domain) {
return ShortageItem.newBuilder()
.setItemCode(domain.getItemCode())
.setItemName(domain.getItemName())
.setShortageQuantity(mapper.toProtoDecimal(domain.getShortageQuantity()))
.setRecommendedOrderDate(mapper.toProtoDate(domain.getRecommendedOrderDate()))
.build();
}
}
第45章:エラーハンドリングとストリーミング¶
45.1 gRPC ステータスコード¶
gRPC は標準化されたステータスコードを使用します:
| コード | 説明 | 使用例 |
|---|---|---|
OK |
成功 | 正常完了 |
INVALID_ARGUMENT |
無効な引数 | バリデーションエラー |
NOT_FOUND |
見つからない | 品目が存在しない |
ALREADY_EXISTS |
既に存在 | 品目コード重複 |
FAILED_PRECONDITION |
前提条件エラー | 発注確定済みの取消 |
INTERNAL |
内部エラー | サーバー側エラー |
45.2 グローバルインターセプター¶
LoggingInterceptor.java
package com.example.production.infrastructure.grpc.interceptor;
import io.grpc.*;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
/**
* ロギング & エラーハンドリングインターセプター
*/
@GrpcGlobalServerInterceptor
public class LoggingInterceptor implements ServerInterceptor {
private static final Logger log = LoggerFactory.getLogger(LoggingInterceptor.class);
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
String methodName = call.getMethodDescriptor().getFullMethodName();
Instant startTime = Instant.now();
log.info("gRPC Request: {}", methodName);
ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall
.SimpleForwardingServerCall<>(call) {
@Override
public void close(Status status, Metadata trailers) {
Duration duration = Duration.between(startTime, Instant.now());
log.info("gRPC Response: {} - Status: {} - Duration: {}ms",
methodName, status.getCode(), duration.toMillis());
super.close(status, trailers);
}
};
return next.startCall(wrappedCall, headers);
}
}
45.3 ヘルスチェックの実装¶
HealthGrpcService.java
package com.example.production.infrastructure.grpc.service;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import javax.sql.DataSource;
import java.sql.Connection;
/**
* gRPC ヘルスチェックサービス
*/
@GrpcService
public class HealthGrpcService extends HealthGrpc.HealthImplBase {
private final DataSource dataSource;
public HealthGrpcService(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void check(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver) {
HealthCheckResponse.ServingStatus status = checkHealth()
? HealthCheckResponse.ServingStatus.SERVING
: HealthCheckResponse.ServingStatus.NOT_SERVING;
responseObserver.onNext(HealthCheckResponse.newBuilder()
.setStatus(status)
.build());
responseObserver.onCompleted();
}
private boolean checkHealth() {
try (Connection conn = dataSource.getConnection()) {
return conn.isValid(5);
} catch (Exception e) {
return false;
}
}
}
45.4 設定ファイル¶
application.yml(gRPC 追加設定)
grpc:
server:
port: 9090
reflection-service-enabled: true # 開発時にリフレクション有効化
# 既存の設定はそのまま
spring:
datasource:
url: jdbc:postgresql://localhost:5432/production
username: postgres
password: postgres
mybatis:
mapper-locations: classpath:mapper/*.xml
45.5 gRPC クライアントからの呼び出し¶
# サービス一覧
grpcurl -plaintext localhost:9090 list
# 品目取得
grpcurl -plaintext -d '{"item_code": "PROD-001"}' \
localhost:9090 production.ItemService/GetItem
# 品目一覧(ストリーミング)
grpcurl -plaintext \
localhost:9090 production.ItemService/StreamItems
まとめ¶
本研究では、gRPC による生産管理システム API を実装しました。
API サーバー版との比較¶
| 観点 | REST API(第32章) | gRPC(本研究) |
|---|---|---|
| プロトコル | HTTP/1.1 + JSON | HTTP/2 + Protobuf |
| スキーマ | OpenAPI(任意) | .proto(必須) |
| ストリーミング | WebSocket 別実装 | ネイティブ |
| パフォーマンス | 標準 | 高速 |
| ドメイン層 | 共有 | 共有 |
| アプリケーション層 | 共有 | 共有 |
| Input Adapter | REST Controller | gRPC Service |
実装した gRPC サービス¶
| サービス | メソッド | パターン |
|---|---|---|
| ItemService | GetItem, CreateItem, UpdateItem, DeleteItem | Unary |
| StreamItems | Server Streaming | |
| BatchCreateItems | Client Streaming | |
| BomService | ExplodeBom | Server Streaming |
| WhereUsed | Unary | |
| PurchaseOrderService | GetOrder, CreateOrder, ConfirmOrder | Unary |
| StreamOrders | Server Streaming | |
| RecordReceiving | Bidirectional | |
| MrpService | ExecuteMrp | Server Streaming |
| ExecuteMrpSync | Unary |
技術スタック¶
- grpc-spring-boot-starter: Spring Boot 統合
- Protocol Buffers 3: スキーマ定義
- io.grpc: gRPC Java 実装
- gRPC Health Check: ヘルスチェック
gRPC を選択すべき場面¶
- マイクロサービス間通信: 高パフォーマンスが必要
- リアルタイム通信: 進捗通知、チャット等
- 大量データ転送: ストリーミングで効率的に
- 多言語クライアント: Proto からコード生成
- 型安全性重視: スキーマ駆動開発