TypeScript を使用した CQRS とイベント ソーシング
CQRS (コマンドクエリの責任分担) e イベントソーシング これらは 2 つのアーキテクチャ パターンであり、組み合わせることで根本的に異なる方法を提供します。 ソフトウェアシステムを設計します。同じデータ モデルの読み取りと書き込みの代わりに、CQRS 書き込み操作 (コマンド) と読み取り操作 (クエリ) を分離します。イベントソーシング、 次に、従来の状態の永続性を順序付けられたシーケンスに置き換えます。 システム内で起こったすべてを記述する不変イベントの集まり。
この記事では、実装を使用して両方のパターンを詳しく調査します。 TypeScript の具象。注文管理システムを例として説明します。 コマンド、クエリ、イベント ストア、プロジェクション、スナップショット。
何を学ぶか
- CQRS パターン: コマンドとクエリの分離
- TypeScript のコマンド バスとクエリ バス
- イベントソーシング: イベントベースの永続性
- イベントストアの実装
- 投影と読み取りモデル
- パフォーマンスを最適化するためのスナップショット
- CQRS とイベント ソーシングを使用する場合 (および使用しない場合)
CQRS: コマンドとクエリの分離
CQRS の背後にある原理は単純です。 データの書き込みに使用されるモデル 読むために使用したものと同じである必要はありません。伝統的な建築の中で、 単一のモデル (多くの場合、単一のテーブル) が書き込み操作と書き込み操作の両方を処理します。 読んでるもの。 CQRS は 2 つの異なるモデルを導入し、それぞれが最適化されています。 自分自身の目的。
コマンド
Un 指示 システムの状態を変更する意図を表します。 これは、操作を実行するために必要なすべてのデータを含む不変オブジェクトです。 コマンドはデータを返しません。コマンドの唯一の効果は状態を変更することです。
// cqrs/command.ts
export interface Command {
readonly type: string;
}
export interface CommandHandler<T extends Command> {
execute(command: T): Promise<void>;
}
// commands/create-order.command.ts
export class CreateOrderCommand implements Command {
readonly type = 'CreateOrder';
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: {
productId: string;
productName: string;
quantity: number;
unitPrice: number;
}[]
) {}
}
// commands/handlers/create-order.handler.ts
export class CreateOrderHandler
implements CommandHandler<CreateOrderCommand> {
constructor(private readonly eventStore: EventStore) {}
async execute(command: CreateOrderCommand): Promise<void> {
// Verifica che l'ordine non esista gia
const existing = await this.eventStore.getEvents(command.orderId);
if (existing.length > 0) {
throw new Error('Ordine già esistente');
}
// Crea l'aggregate e genera gli eventi
const order = Order.create(
command.orderId,
command.customerId,
command.items
);
// Salva gli eventi generati
const events = order.pullUncommittedEvents();
await this.eventStore.appendEvents(command.orderId, events, 0);
}
}
クエリ
Una クエリ データのリクエストを表します。のステータスは変わりません システムを実行し、特定の使用例に最適化された結果を返します。モデル (読み取りモデル) は非正規化、プリコンパイル、正確に構造化可能 UI の必要に応じて。
// cqrs/query.ts
export interface Query {
readonly type: string;
}
export interface QueryHandler<TQuery extends Query, TResult> {
execute(query: TQuery): Promise<TResult>;
}
// queries/get-order-summary.query.ts
export class GetOrderSummaryQuery implements Query {
readonly type = 'GetOrderSummary';
constructor(public readonly orderId: string) {}
}
export interface OrderSummaryReadModel {
orderId: string;
customerName: string;
status: string;
itemCount: number;
totalAmount: number;
createdAt: string;
lastUpdatedAt: string;
}
// queries/handlers/get-order-summary.handler.ts
export class GetOrderSummaryHandler
implements QueryHandler<GetOrderSummaryQuery, OrderSummaryReadModel> {
constructor(private readonly readDb: ReadDatabase) {}
async execute(
query: GetOrderSummaryQuery
): Promise<OrderSummaryReadModel> {
const summary = await this.readDb.findOne<OrderSummaryReadModel>(
'order_summaries',
{ orderId: query.orderId }
);
if (!summary) {
throw new Error('Ordine non trovato');
}
return summary;
}
}
コマンドバスとクエリバス
Il バス これは、コマンド/クエリをそれぞれのハンドラーに接続するメカニズムです。 これは集中ディスパッチャとして機能し、コマンドを送信する者とそれを管理する者を切り離します。
// cqrs/command-bus.ts
export class CommandBus {
private handlers = new Map<string, CommandHandler<any>>();
register<T extends Command>(
commandType: string,
handler: CommandHandler<T>
): void {
if (this.handlers.has(commandType)) {
throw new Error(`Handler già registrato per: ${commandType}`);
}
this.handlers.set(commandType, handler);
}
async dispatch(command: Command): Promise<void> {
const handler = this.handlers.get(command.type);
if (!handler) {
throw new Error(`Nessun handler per il comando: ${command.type}`);
}
await handler.execute(command);
}
}
// cqrs/query-bus.ts
export class QueryBus {
private handlers = new Map<string, QueryHandler<any, any>>();
register<TQuery extends Query, TResult>(
queryType: string,
handler: QueryHandler<TQuery, TResult>
): void {
this.handlers.set(queryType, handler);
}
async dispatch<TResult>(query: Query): Promise<TResult> {
const handler = this.handlers.get(query.type);
if (!handler) {
throw new Error(`Nessun handler per la query: ${query.type}`);
}
return handler.execute(query);
}
}
イベント ソーシング: イベントのシーケンスとしての状態
イベント ソーシングは、永続性に関する従来の考え方を覆します。代わりに 保存してください 現在の状態 エンティティのそれぞれを保存します イベント それがその状態を引き起こしました。現在の状態はすべて再生することで取得されます。 時系列に沿った出来事。
イベントソーシングの利点
| アドバンテージ | 説明 |
|---|---|
| 完全な監査証跡 | すべての変更は記録されます。いつでも状態を再構築できる |
| 高度なデバッグ | イベントを再現することで、どのようにして現在の状態に至ったのかを正確に理解できます。 |
| 複数の投影 | 同じイベントから、さまざまなニーズに応じてさまざまな読み取りモデルを構築できます |
| 時間的進化 | 過去の出来事を再処理して、遡及的に新しい予測を作成できます |
| データ損失なし | イベントは不変であり、追加のみです。何も削除されない |
イベントソーシング用のドメインイベント
// events/base-event.ts
export interface DomainEvent {
readonly eventType: string;
readonly aggregateId: string;
readonly occurredOn: Date;
readonly version: number;
readonly payload: Record<string, unknown>;
}
// events/order-events.ts
export class OrderCreatedEvent implements DomainEvent {
readonly eventType = 'OrderCreated';
readonly occurredOn = new Date();
constructor(
public readonly aggregateId: string,
public readonly version: number,
public readonly payload: {
customerId: string;
items: {
productId: string;
productName: string;
quantity: number;
unitPrice: number;
}[];
}
) {}
}
export class OrderConfirmedEvent implements DomainEvent {
readonly eventType = 'OrderConfirmed';
readonly occurredOn = new Date();
constructor(
public readonly aggregateId: string,
public readonly version: number,
public readonly payload: {
confirmedAt: string;
totalAmount: number;
}
) {}
}
export class OrderItemAddedEvent implements DomainEvent {
readonly eventType = 'OrderItemAdded';
readonly occurredOn = new Date();
constructor(
public readonly aggregateId: string,
public readonly version: number,
public readonly payload: {
productId: string;
productName: string;
quantity: number;
unitPrice: number;
}
) {}
}
export class OrderCancelledEvent implements DomainEvent {
readonly eventType = 'OrderCancelled';
readonly occurredOn = new Date();
constructor(
public readonly aggregateId: string,
public readonly version: number,
public readonly payload: {
reason: string;
cancelledAt: string;
}
) {}
}
イベントソーシングによる集約
イベント ソース システムでは、アグリゲートは状態を直接維持するのではなく、 イベントを適用して再構築します。あらゆるビジネス運営が新たなイベントを生成します これらは蓄積され、イベント ストアに保存されます。
// aggregates/order.aggregate.ts
export class Order {
private _id: string = '';
private _customerId: string = '';
private _status: OrderStatus = OrderStatus.CREATED;
private _items: OrderItem[] = [];
private _version: number = 0;
private _uncommittedEvents: DomainEvent[] = [];
// Factory method: crea un nuovo ordine
static create(
orderId: string,
customerId: string,
items: { productId: string; productName: string;
quantity: number; unitPrice: number }[]
): Order {
const order = new Order();
order.apply(new OrderCreatedEvent(orderId, 1, {
customerId,
items,
}));
return order;
}
// Ricostruzione da eventi storici
static fromHistory(events: DomainEvent[]): Order {
const order = new Order();
for (const event of events) {
order.applyFromHistory(event);
}
return order;
}
confirm(): void {
if (this._status !== OrderStatus.CREATED) {
throw new Error('Solo ordini in stato CREATED possono essere confermati');
}
if (this._items.length === 0) {
throw new Error('Impossibile confermare un ordine vuoto');
}
const total = this._items.reduce(
(sum, item) => sum + item.quantity * item.unitPrice, 0
);
this.apply(new OrderConfirmedEvent(this._id, this._version + 1, {
confirmedAt: new Date().toISOString(),
totalAmount: total,
}));
}
cancel(reason: string): void {
if (this._status === OrderStatus.SHIPPED) {
throw new Error('Impossibile cancellare un ordine già spedito');
}
this.apply(new OrderCancelledEvent(this._id, this._version + 1, {
reason,
cancelledAt: new Date().toISOString(),
}));
}
// Applica un nuovo evento (genera uncommitted event)
private apply(event: DomainEvent): void {
this.mutate(event);
this._uncommittedEvents.push(event);
}
// Applica un evento storico (senza generare uncommitted)
private applyFromHistory(event: DomainEvent): void {
this.mutate(event);
}
// Modifica lo stato interno in base all'evento
private mutate(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated':
this._id = event.aggregateId;
this._customerId = (event.payload as any).customerId;
this._items = (event.payload as any).items.map((i: any) => ({
productId: i.productId,
productName: i.productName,
quantity: i.quantity,
unitPrice: i.unitPrice,
}));
this._status = OrderStatus.CREATED;
break;
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED;
break;
case 'OrderItemAdded':
this._items.push(event.payload as any);
break;
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED;
break;
}
this._version = event.version;
}
pullUncommittedEvents(): DomainEvent[] {
const events = [...this._uncommittedEvents];
this._uncommittedEvents = [];
return events;
}
get id(): string { return this._id; }
get version(): number { return this._version; }
get status(): OrderStatus { return this._status; }
}
イベントストア
L'イベントストア イベントデータベースです。各イベントストリームは対応します 集合体に。基本的な操作は 2 つあります。新しいイベントをハングすることと、イベントを読み取ることです。 ストリームのイベント。イベント ストアは注文を保証し、楽観的なイベントをサポートします。 バージョン管理による同時実行性。
// infrastructure/event-store.ts
export interface EventStore {
appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void>;
getEvents(streamId: string): Promise<DomainEvent[]>;
getEventsFromVersion(
streamId: string,
fromVersion: number
): Promise<DomainEvent[]>;
getAllEvents(): Promise<DomainEvent[]>;
}
export class InMemoryEventStore implements EventStore {
private streams = new Map<string, DomainEvent[]>();
private allEvents: DomainEvent[] = [];
private subscribers: ((event: DomainEvent) => void)[] = [];
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
const currentEvents = this.streams.get(streamId) || [];
const currentVersion = currentEvents.length > 0
? currentEvents[currentEvents.length - 1].version
: 0;
// Optimistic concurrency check
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Conflitto di concorrenza: versione attesa ${expectedVersion}, ` +
`versione corrente ${currentVersion}`
);
}
const updatedStream = [...currentEvents, ...events];
this.streams.set(streamId, updatedStream);
this.allEvents.push(...events);
// Notifica i subscriber (per le proiezioni)
for (const event of events) {
this.subscribers.forEach(sub => sub(event));
}
}
async getEvents(streamId: string): Promise<DomainEvent[]> {
return this.streams.get(streamId) || [];
}
async getEventsFromVersion(
streamId: string,
fromVersion: number
): Promise<DomainEvent[]> {
const events = this.streams.get(streamId) || [];
return events.filter(e => e.version > fromVersion);
}
async getAllEvents(): Promise<DomainEvent[]> {
return [...this.allEvents];
}
subscribe(handler: (event: DomainEvent) => void): void {
this.subscribers.push(handler);
}
}
投影と読み取りモデル
Le 投影 それらはイベントを次のようなものに変換するメカニズムです。 モデルの読み取り 読書用に最適化されています。各投影は詳細を聞きます イベントをタイプし、非正規化読み取りモデルを更新します。このモデルは考え抜かれています 質問にできるだけ効率的に答えるため。
// projections/order-summary.projection.ts
export interface OrderSummaryReadModel {
orderId: string;
customerId: string;
status: string;
itemCount: number;
totalAmount: number;
createdAt: string;
updatedAt: string;
}
export class OrderSummaryProjection {
private summaries = new Map<string, OrderSummaryReadModel>();
constructor(eventStore: EventStore) {
// Sottoscrizione agli eventi rilevanti
eventStore.subscribe((event) => this.handleEvent(event));
}
private handleEvent(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated':
this.onOrderCreated(event);
break;
case 'OrderConfirmed':
this.onOrderConfirmed(event);
break;
case 'OrderItemAdded':
this.onOrderItemAdded(event);
break;
case 'OrderCancelled':
this.onOrderCancelled(event);
break;
}
}
private onOrderCreated(event: DomainEvent): void {
const payload = event.payload as any;
const items = payload.items || [];
const totalAmount = items.reduce(
(sum: number, i: any) => sum + i.quantity * i.unitPrice, 0
);
this.summaries.set(event.aggregateId, {
orderId: event.aggregateId,
customerId: payload.customerId,
status: 'created',
itemCount: items.length,
totalAmount,
createdAt: event.occurredOn.toISOString(),
updatedAt: event.occurredOn.toISOString(),
});
}
private onOrderConfirmed(event: DomainEvent): void {
const summary = this.summaries.get(event.aggregateId);
if (summary) {
summary.status = 'confirmed';
summary.totalAmount = (event.payload as any).totalAmount;
summary.updatedAt = event.occurredOn.toISOString();
}
}
private onOrderItemAdded(event: DomainEvent): void {
const summary = this.summaries.get(event.aggregateId);
if (summary) {
const payload = event.payload as any;
summary.itemCount += 1;
summary.totalAmount += payload.quantity * payload.unitPrice;
summary.updatedAt = event.occurredOn.toISOString();
}
}
private onOrderCancelled(event: DomainEvent): void {
const summary = this.summaries.get(event.aggregateId);
if (summary) {
summary.status = 'cancelled';
summary.updatedAt = event.occurredOn.toISOString();
}
}
// Query methods
getById(orderId: string): OrderSummaryReadModel | undefined {
return this.summaries.get(orderId);
}
getByCustomer(customerId: string): OrderSummaryReadModel[] {
return Array.from(this.summaries.values())
.filter(s => s.customerId === customerId);
}
getByStatus(status: string): OrderSummaryReadModel[] {
return Array.from(this.summaries.values())
.filter(s => s.status === status);
}
}
スナップショット: パフォーマンスの最適化
時間が経つにつれて、集計のイベント数が大幅に増加する可能性があります。 何千ものイベントから状態を再構築するには費用がかかります。の スナップショット アグリゲートの状態のスナップショットを定期的に保存することで、この問題を解決します。 再構築は最新のスナップショットから開始され、後続のイベントのみが適用されます。
// infrastructure/snapshot-store.ts
export interface Snapshot {
aggregateId: string;
version: number;
state: Record<string, unknown>;
createdAt: Date;
}
export interface SnapshotStore {
save(snapshot: Snapshot): Promise<void>;
getLatest(aggregateId: string): Promise<Snapshot | null>;
}
export class InMemorySnapshotStore implements SnapshotStore {
private snapshots = new Map<string, Snapshot>();
async save(snapshot: Snapshot): Promise<void> {
this.snapshots.set(snapshot.aggregateId, snapshot);
}
async getLatest(aggregateId: string): Promise<Snapshot | null> {
return this.snapshots.get(aggregateId) || null;
}
}
// repository/order.repository.ts
export class EventSourcedOrderRepository {
private static readonly SNAPSHOT_INTERVAL = 50;
constructor(
private readonly eventStore: EventStore,
private readonly snapshotStore: SnapshotStore
) {}
async getById(orderId: string): Promise<Order> {
// 1. Cerca lo snapshot più recente
const snapshot = await this.snapshotStore.getLatest(orderId);
let events: DomainEvent[];
let order: Order;
if (snapshot) {
// 2a. Ricostruisci dallo snapshot + eventi successivi
order = Order.fromSnapshot(snapshot.state);
events = await this.eventStore.getEventsFromVersion(
orderId,
snapshot.version
);
} else {
// 2b. Ricostruisci da tutti gli eventi
events = await this.eventStore.getEvents(orderId);
order = Order.fromHistory(events);
return order;
}
// 3. Applica gli eventi mancanti
for (const event of events) {
order.applyFromHistory(event);
}
return order;
}
async save(order: Order): Promise<void> {
const uncommitted = order.pullUncommittedEvents();
const expectedVersion = order.version - uncommitted.length;
await this.eventStore.appendEvents(
order.id,
uncommitted,
expectedVersion
);
// Crea snapshot se necessario
if (order.version % EventSourcedOrderRepository.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
aggregateId: order.id,
version: order.version,
state: order.toSnapshot(),
createdAt: new Date(),
});
}
}
}
完全なフロー: コマンドからクエリまで
CQRS + イベント ソーシング システムの操作の完全なフローを次から見てみましょう。 更新された読み取りモデルに対するクエリが実行されるまでコマンドを受信します。
// app/order-service.ts
export class OrderService {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus
) {}
// Lato scrittura: invia un comando
async createOrder(
customerId: string,
items: { productId: string; productName: string;
quantity: number; unitPrice: number }[]
): Promise<string> {
const orderId = crypto.randomUUID();
await this.commandBus.dispatch(
new CreateOrderCommand(orderId, customerId, items)
);
return orderId;
}
async confirmOrder(orderId: string): Promise<void> {
await this.commandBus.dispatch(
new ConfirmOrderCommand(orderId)
);
}
// Lato lettura: esegui una query
async getOrderSummary(orderId: string): Promise<OrderSummaryReadModel> {
return this.queryBus.dispatch<OrderSummaryReadModel>(
new GetOrderSummaryQuery(orderId)
);
}
async getCustomerOrders(
customerId: string
): Promise<OrderSummaryReadModel[]> {
return this.queryBus.dispatch<OrderSummaryReadModel[]>(
new GetCustomerOrdersQuery(customerId)
);
}
}
// Configurazione iniziale
function bootstrap(): OrderService {
// Event Store
const eventStore = new InMemoryEventStore();
const snapshotStore = new InMemorySnapshotStore();
const orderRepo = new EventSourcedOrderRepository(eventStore, snapshotStore);
// Proiezioni
const orderSummaryProjection = new OrderSummaryProjection(eventStore);
const readDb = new InMemoryReadDatabase(orderSummaryProjection);
// Command Bus
const commandBus = new CommandBus();
commandBus.register('CreateOrder', new CreateOrderHandler(eventStore));
commandBus.register('ConfirmOrder', new ConfirmOrderHandler(orderRepo));
// Query Bus
const queryBus = new QueryBus();
queryBus.register('GetOrderSummary', new GetOrderSummaryHandler(readDb));
queryBus.register('GetCustomerOrders', new GetCustomerOrdersHandler(readDb));
return new OrderService(commandBus, queryBus);
}
イベントソーシングを使用しない CQRS
強調することが重要です CQRS とイベント ソーシングは独立したパターンです。 イベント ソーシングなしで CQRS を採用し、従来のデータベースを維持して、 同期または非同期プロジェクションを介して個別の読み取りモデルを作成および作成します。
| アプローチ | 複雑 | 利点 | 制限事項 |
|---|---|---|---|
| 単純な CQRS | 平均 | 最適化された読み取りモデル、明確な分離 | ネイティブ監査証跡なし、一貫性の可能性あり |
| CQRS + イベントソーシング | 高い | 監査証跡、状態の再構築、複数の予測 | 管理の複雑さ、学習曲線 |
| 伝統的な建築 | 低い | シンプルで使い慣れた成熟したツール | 同じモデルで読み取りと書き込みができるため、柔軟性が低下します |
CQRS とイベント ソーシングを使用する場合
理想的なシナリオ
- 監査証跡の要件: 規制対象セクター (金融、ヘルスケア、コンプライアンス)
- 非対称の読み取りと書き込み: 書き込みよりも読み取りの方がはるかに多い (またはその逆)
- 複数の投影: 非常に異なる形式で読み取る必要があるデータ
- 連携システム:複数のユーザーが同じデータを同時に編集します
- イベント駆動型アーキテクチャ: イベントを介して通信するシステムとの統合
避けるべき場合
- シンプルな CRUD アプリケーション: 追加された複雑さは利点を正当化しません
- 経験のない小規模チーム: 学習曲線は重要です
- 強力な一貫性要件: CQRS では、読み取りと書き込みの間に可能な一貫性が導入されます。
- プロトタイプと MVP: スケーラビリティよりも初期の開発スピードが重要
課題と考慮事項
| チャレンジ | 説明 | 緩和戦略 |
|---|---|---|
| 可能な一貫性 | 読み取りモデルは最後の書き込みをすぐに反映しない可能性があります | 楽観的な UI、ポーリング、リアルタイム通知 |
| イベントの進化 | 出来事の構造は時間とともに変化する | イベントのバージョニング、アップキャスト、スキーマ レジストリ |
| 投影の再構築 | 数百万のイベントの再処理には時間がかかる場合があります | スナップショット、並列処理、増分投影 |
| 運用の複雑さ | 監視および保守するコンポーネントの増加 | 可観測性、構造化ロギング、ヘルスチェック |
結論
CQRS とイベント ソーシングは、規模に関する特定の問題を解決する強力なツールです。 監査と柔軟性。 CQRS は読み取りパターンと書き込みパターンを分離し、次のことを可能にします。 それぞれを個別に最適化します。イベントソーシングは永続性を 事実の不変の記録、タイムトラベルのデバッグなどの可能性を開く 遡及的投影とネイティブ監査証跡。
他の高度なパターンと同様に、コンテキストを注意深く評価する必要があります。私はそうではありません あらゆるアーキテクチャ上の問題を適切なシナリオで解決します - 複雑なシステム、 イベント駆動型ドメイン、コンプライアンス要件 - アーキテクチャが持つ利点を提供します。 伝統的なものは太刀打ちできません。重要なのは、単純な形式の CQRS から始めることです そして、付加価値が明確で測定可能な場合にのみイベント ソーシングを導入します。







