EDA の基礎: ドメイン イベント、コマンド、メッセージ バス
電子商取引システムを想像してください。顧客が注文を完了すると、必ず注文が完了します。 多くのことを並行して行う必要があります。在庫を減らす必要があり、電子メール通知を送信する必要があり、 フルフィルメント チームに通知する必要があり、ロイヤルティ システムを更新する必要があります。はい 彼らはこれらのサービスを調整していますか?古典的なアプローチは、Orders サービスが呼び出すことです。 他のすべてのサービスは直接: 密結合、脆弱性、不可能性 自主的に登ること。
L'イベント駆動型アーキテクチャ (EDA) このパラダイムを覆すサービス
オーダーがイベントを公開する OrderPlaced メッセージバスとすべてのサービス上で
利害関係者はそれを独自に消費します。 Orders サービスは、誰が注文を聞いているかを知りません。
回答を待っていますが、回答が可能かどうかは関係ありません。このデカップリングと
スケーラブルで回復力のある分散システムが構築される基本原則。
何を学ぶか
- EDA のドメイン イベント、コマンド、クエリの違い
- パブリッシュ/サブスクライブ パターン: プロデューサーとコンシューマーの分離
- メッセージ バス、イベント バス、メッセージ キュー: いつ何を使用するか
- イベントスキーマ: 構造、バージョン管理、および標準の CloudEvents
- EDA と同期 REST の利点とトレードオフ
- TypeScript での単純な EDA システムの実装
- EDA をいつ使用するか、いつ使用しないかを決定する方法
EDA の 3 種類のメッセージ
イベント駆動型システム内のすべてのメッセージが同じであるわけではありません。違いを理解する イベント、コマンド、クエリの間、および正しい EDA システムを設計するための最初のステップ:
| タイプ | 説明 | 方向 | 答え | Esempio |
|---|---|---|---|---|
| ドメインイベント | ドメイン内で何かが起こった | 1 → N (ブロードキャスト) | No | OrderPlaced, PaymentReceived |
| 指示 | アクションの実行リクエスト | 1 → 1 (ポイントツーポイント) | オプション (非同期 ACK) | PlaceOrder, SendEmail |
| クエリ | 1 つのデータ要求 (非同期 EDA) | 1→1、返信あり | はい (返信キュー) | GetOrderStatus 返信キュー経由 |
ドメイン イベント: EDA の中心
Un ドメインイベント ビジネス領域ですでに起こったことを説明します。 主要なプロパティ:
- 不変: 過去を説明するものであり、出版後も変更されない
- 過去に名付けたもの:
OrderPlaced、 ないPlaceOrder - 自己完結型: 消費者に必要なすべてのデータが含まれています
- 入力済み: それぞれのタイプのイベントには定義されたパターンがあります
// TypeScript: struttura di un Domain Event
interface DomainEvent {
eventId: string; // ID unico dell'evento (UUID)
eventType: string; // nome del tipo evento
occurredAt: string; // timestamp ISO 8601 (immutabile)
aggregateId: string; // ID dell'aggregato che ha generato l'evento
aggregateType: string; // tipo dell'aggregato (es. "Order")
version: number; // versione dello schema evento (per evoluzione)
payload: unknown; // dati specifici dell'evento
metadata?: {
correlationId?: string; // ID per tracciare la catena di eventi
causationId?: string; // ID del messaggio che ha causato questo evento
userId?: string; // utente che ha innescato l'azione
};
}
// Evento concreto: OrderPlaced
interface OrderPlacedEvent extends DomainEvent {
eventType: 'OrderPlaced';
aggregateType: 'Order';
payload: {
orderId: string;
customerId: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
totalAmount: number;
currency: string;
shippingAddress: {
street: string;
city: string;
country: string;
};
};
}
// Creare un OrderPlaced event
function createOrderPlacedEvent(order: Order): OrderPlacedEvent {
return {
eventId: crypto.randomUUID(),
eventType: 'OrderPlaced',
occurredAt: new Date().toISOString(),
aggregateId: order.id,
aggregateType: 'Order',
version: 1,
payload: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount,
currency: order.currency,
shippingAddress: order.shippingAddress,
},
metadata: {
correlationId: crypto.randomUUID(),
},
};
}
パブリッシュ/サブスクライブ パターン
パターン パブリッシュ-サブスクライブ そして EDA の基礎: 出版社 (プロデューサーは) 誰がイベントを受信するのかを知らずにメッセージ バスにイベントを送信します。購読者 (消費者は)誰が発行しているのかを知らずに、特定の種類のイベントを受信するために登録します。
// Implementazione semplice di un Event Bus in memoria (per test/sviluppo)
type EventHandler<T extends DomainEvent> = (event: T) => Promise<void>;
class InMemoryEventBus {
private handlers = new Map<string, EventHandler<DomainEvent>[]>();
subscribe<T extends DomainEvent>(eventType: string, handler: EventHandler<T>): void {
const existing = this.handlers.get(eventType) ?? [];
this.handlers.set(eventType, [...existing, handler as EventHandler<DomainEvent>]);
}
async publish(event: DomainEvent): Promise<void> {
const eventHandlers = this.handlers.get(event.eventType) ?? [];
// Pubblica in parallelo a tutti i subscriber
await Promise.allSettled(
eventHandlers.map((handler) => handler(event))
);
}
async publishAll(events: DomainEvent[]): Promise<void> {
for (const event of events) {
await this.publish(event);
}
}
}
// Utilizzo:
const eventBus = new InMemoryEventBus();
// Inventory Service si registra per OrderPlaced
eventBus.subscribe<OrderPlacedEvent>('OrderPlaced', async (event) => {
console.log(`Decrementing inventory for order ${event.payload.orderId}`);
for (const item of event.payload.items) {
await inventoryService.decrement(item.productId, item.quantity);
}
});
// Email Service si registra per OrderPlaced
eventBus.subscribe<OrderPlacedEvent>('OrderPlaced', async (event) => {
await emailService.sendOrderConfirmation(
event.payload.customerId,
event.payload.orderId
);
});
// Order Service pubblica l'evento (non conosce i subscriber)
await eventBus.publish(createOrderPlacedEvent(placedOrder));
メッセージ バス、イベント バス、メッセージ キュー: 違い
これらの用語は多くの場合同じ意味で使用されますが、特定の意味があります。
- メッセージキュー: ポイントツーポイントキュー。メッセージが配信されるのは、 唯一 消費者。例: SQS スタンダードキュー
- イベントバス: ブロードキャスト先 みんな 購読者たち。各サブスクライバーはイベントのコピーを受け取ります。例: AWS EventBridge、SNS トピック
- メッセージバス: キューとトピックの両方を含む一般的な用語。実際: メッセージ ルーティングを管理するブローカー (RabbitMQ、Kafka)
// Esempio: stessa logica su AWS SQS + SNS (architettura fan-out comune)
// Pattern fan-out: SNS Topic + SQS Queue per ogni consumer
// 1. Pubblica su SNS Topic
// 2. SNS consegna a tutte le SQS Queue sottoscritte
// 3. Ogni servizio legge dalla propria SQS Queue indipendentemente
// Terraform per il fan-out pattern:
resource "aws_sns_topic" "order_events" {
name = "order-events"
}
resource "aws_sqs_queue" "inventory_queue" {
name = "inventory-order-events"
}
resource "aws_sqs_queue" "email_queue" {
name = "email-order-events"
}
resource "aws_sns_topic_subscription" "inventory" {
topic_arn = aws_sns_topic.order_events.arn
protocol = "sqs"
endpoint = aws_sqs_queue.inventory_queue.arn
}
resource "aws_sns_topic_subscription" "email" {
topic_arn = aws_sns_topic.order_events.arn
protocol = "sqs"
endpoint = aws_sqs_queue.email_queue.arn
}
CloudEvents: イベント スキーマの標準
クラウドイベント およびその構造を標準化する CNCF 仕様。 異なるシステム間のイベント。これを採用すると相互運用性が促進され、ツールが簡素化されます。 監視とデバッグ:
// CloudEvents v1.0 - struttura standard
{
"specversion": "1.0",
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "com.company.order.placed", // Reverse DNS + evento
"source": "/orders-service/v1", // URI del servizio sorgente
"subject": "order-789", // identificativo della risorsa
"time": "2026-03-20T10:30:00Z", // timestamp ISO 8601
"datacontenttype": "application/json",
"dataschema": "https://schemas.company.com/order/placed/v1.json",
"data": {
"orderId": "order-789",
"customerId": "cust-123",
"totalAmount": 150.00,
"currency": "EUR"
}
}
// TypeScript: creare un CloudEvent con la SDK ufficiale
import { CloudEvent } from "cloudevents";
const event = new CloudEvent({
specversion: "1.0",
type: "com.company.order.placed",
source: "/orders-service/v1",
subject: `order-${orderId}`,
datacontenttype: "application/json",
dataschema: "https://schemas.company.com/order/placed/v1.json",
data: {
orderId: order.id,
customerId: order.customerId,
totalAmount: order.totalAmount,
currency: order.currency,
},
});
// Valida il CloudEvent prima di pubblicarlo
if (!event.source || !event.type) {
throw new Error("CloudEvent validation failed: missing required fields");
}
イベントのバージョン管理
イベントは複数のサービスによって個別に消費されます。パターンを変更する バージョニング戦略を持たないイベントは消費者を混乱させます。主なパターン:
// Pattern 1: Versioning nel tipo evento
// Vecchi consumer continuano a ricevere v1, nuovi consumer si registrano per v2
eventBus.subscribe('OrderPlaced.v1', handleOrderPlacedV1);
eventBus.subscribe('OrderPlaced.v2', handleOrderPlacedV2);
// Pattern 2: Backward-compatible changes (aggiunta di campi opzionali)
// SAFE: aggiungere nuovi campi opzionali (consumer ignorano i campi sconosciuti)
interface OrderPlacedEventV1 {
orderId: string;
customerId: string;
totalAmount: number;
}
interface OrderPlacedEventV2 extends OrderPlacedEventV1 {
// Aggiunto in V2: opzionale, backward-compatible
estimatedDeliveryDate?: string;
loyaltyPointsEarned?: number;
}
// Pattern 3: Parallel publishing (per breaking changes)
// Pubblica sia v1 che v2 per un periodo di transizione
async function publishOrderPlaced(order: Order): Promise<void> {
const v1Event = createOrderPlacedV1(order);
const v2Event = createOrderPlacedV2(order);
await Promise.all([
eventBus.publish(v1Event), // per consumer legacy
eventBus.publish(v2Event), // per consumer aggiornati
]);
}
// NEVER: rimuovere campi, cambiare tipi, rinominare campi obbligatori
// -> breaking change: migra prima tutti i consumer poi rimuovi v1
EDA の利点とトレードオフ
EDA を使用する場合
- 必要なデカップリング: パブリッシャーを変更せずに新しいコンシューマーを追加したい場合
- 独立したスケーラビリティ: 異なる負荷を持つ異なるコンシューマーが個別にスケーリング
- 監査証跡: 不変イベントは、システム内で起こったすべての自然なログです。
- 失敗に対する回復力: コンシューマがダウンした場合、メッセージ バスは復帰するまでメッセージを保持します。
- システム間の統合: 標準イベントを介して通信する異種システム
EDA を使用してはいけない場合
- 即時の対応が必要: ユーザーが同期結果を待たなければならない場合、EDA は不必要な遅延と複雑さを追加します。
- シンプルなシステム: 機能がほとんどないモノリスでは、メッセージ ブローカーのオーバーヘッドの恩恵を受けられません。
- 単純な分散トランザクション: 複数のサービスにわたってアトミックである必要がある操作の場合、EDA には Saga パターン (高度な複雑さ) が必要です
- EDA の経験のない小規模チーム: 学習曲線は重要です。 REST から開始し、必要に応じて EDA を追加します
完全なフロー: eコマースの例
// Flusso completo EDA per un ordine e-commerce
// 1. Order Service: riceve HTTP POST /orders
// 2. Valida, persiste, pubblica evento
class OrderService {
constructor(
private readonly orderRepo: OrderRepository,
private readonly eventBus: EventBus
) {}
async placeOrder(dto: PlaceOrderDto): Promise<Order> {
// Logica business: crea l'ordine
const order = Order.create(dto);
// Persisti nel database
await this.orderRepo.save(order);
// Pubblica gli eventi generati dall'aggregato
const events = order.getUncommittedEvents();
await this.eventBus.publishAll(events);
order.clearEvents();
return order;
}
}
// 3. Inventory Service: ascolta OrderPlaced
// - Scala indipendentemente con 5 consumer paralleli
// - Se giu, i messaggi si accumulano nella queue
// 4. Email Service: ascolta OrderPlaced
// - Invia email di conferma
// - Se fallisce, il messaggio va in DLQ per retry
// 5. Loyalty Service: ascolta OrderPlaced
// - Calcola e aggiunge punti fedeltà
// - Pubblica LoyaltyPointsEarned
// 6. Analytics Service: ascolta OrderPlaced + LoyaltyPointsEarned
// - Aggiorna le metriche in tempo reale
// Il servizio Order non sa niente di tutto questo!
// Aggiungere un nuovo consumer = zero modifiche al publisher
結論と次のステップ
EDA はパラダイム シフトです。サービスが互いに「呼び出し」合うシステムから、 サービスが「イベントを介して通信する」システム。デカップリングゲイン、 スケーラビリティと回復力は本物ですが、管理という新たな課題に直面する必要があります。 のエラーが非同期になり、デバッグには相関 ID と分散トレースが必要になります。 一貫性は「結果的」でなければなりません。
このシリーズの次の記事では、EDA を実現する高度なパターンについて説明します。 運用環境での実行可能: 不変状態のイベント ソーシング、分離のための CQRS 読み取り/書き込み、分散トランザクション用の Saga、および AWS ツール (EventBridge、SQS、SNS) クラウド環境に実装します。
イベント駆動型アーキテクチャ シリーズの今後の記事
関連シリーズ
- Apache Kafka とストリーム処理 — 大容量 EDA システムのバックボーンとしての Kafka
- 大規模な Kubernetes — Kubernetes 上で EDA マイクロサービスを調整します
- 実践的なソフトウェアアーキテクチャ — EDA と REST、モノリスとマイクロサービスの場合







