イベントソーシング: 不変の一連のイベントとしての状態
銀行の仕組みを考えてみましょう: 口座残高は「データベース内の数字」ではありません トランザクションごとに更新されます。」これまでに行われたすべてのトランザクションの結果: 入金、引き出し、送金、利息。昨日の午後2時の残高を知りたい場合は、 その時点までのすべてのトランザクションを再実行することで再構築できます。これは まさにその原理イベントソーシング.
エンティティの現在の状態をデータベースに保存し、毎回上書きするのではなく 編集、 イベントソーシング そこに至るまでの一連の出来事を保存する その状態に。イベントを最初から「再生」することで現在の状態が得られます。 その結果、完全な監査証跡、タイムトラベルクエリの可能性を備えたシステムが誕生しました。 そして、書くこと(イベントの追加)と読むこと(イベントへの投影)との間の自然な分離。
何を学ぶか
- イベント ソーシング アーキテクチャ: イベント ストア、集約、イベント ストリーム
- TypeScript でイベントを含む集計を実装する
- イベント ストア: イベント データベースを構造化する方法
- イベント リプレイ: イベントから状態を最初から再構築します。
- タイムトラベル クエリ: 過去の任意の瞬間における集計のステータス
- EventStoreDB: イベント ソーシング用に設計されたデータベース
- イベント ソーシングのトレードオフ: いつ採用し、いつ回避するか
クラシック モデルとイベント ソーシング
注文管理システムの 2 つのアプローチを比較してみましょう。
| 待ってます | CRUD クラシック | イベントソーシング |
|---|---|---|
| 保存されるもの | 現在のステータス(UPDATEレコード) | イベントのシーケンス (INSERT のみ) |
| 監査証跡 | いいえ (または別のテーブルを使用) | はい、ネイティブかつ完全です |
| 過去の状態 | いいえ (現在のステータスのみが表示されます) | はい、T までのイベントをリプレイします |
| 複雑 | 低い | 中~高 |
| パフォーマンスの読み取り | 高 (直接クエリ) | スクリーニングが必要です (CQRS を参照) |
| パフォーマンス書き込み | 高い | 高 (追加のみ) |
| デバッグ | ハード(ストーリーのないエンド状態) | 簡単(イベントリプレイ) |
基本的な概念
集計: 一貫性の単位
Un 集計 イベント ソーシングの一貫性境界。すべての 集計への変更は、イベントを生成するメソッドを通じて行われます。状態は常に イベントを順番に適用して再構築します。
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
イベント ストア: イベント データベース
Un イベントストア 書き込み用に最適化された追加専用データベースと、 一連の出来事を読む。基本的な構造と、イベントストリーム: 単一の集約 (ID によって識別される) のイベントの順序付けられたシーケンス。
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
イベントソーシングリポジトリ
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
タイムトラベルクエリ
イベント ソーシングの最も強力な利点の 1 つは、状態を再構築できることです。 過去の任意の瞬間における集計値:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: イベント ソーシング用のネイティブ データベース
イベントストアDB イベント ソーシング専用に設計されたデータベースです。 ネイティブ プリミティブとしてイベント ストリーム、リアルタイム通知のサブスクリプションを提供します。 そしてサーバー側のプロジェクション:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
スナップショット: 大規模な集約の再生の最適化
集約に数千のイベントがある場合、再生は遅くなります。の スナップショット そしてチェックポイント: 最初から開始するのではなく、最後のスナップショットから再構築します。
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
イベントソーシングのトレードオフ
イベントソーシングが合理的な場合
- 必須の監査証跡: 完全なストーリーが要求される金融システム、医療システム、法制度
- 複雑なデバッグ: 「テープを巻き戻す」ことができ、どのようにして問題のある状態に至ったのかを理解したい場合
- 時間的ビジネス インテリジェンス: 「昨日の午後 4 時の時点で、下書きステータスの注文は何件ありましたか?」
- イベント駆動型の統合: すでに EDA アーキテクチャをお持ちの場合、イベント ソーシングは自然に統合されます
イベントソーシングが意味をなさない場合
- 単純な CRUD: 監査要件のないマスターレコード管理システムは ES の恩恵を受けられません。
- 複雑なクエリ: ES は書き込みを最適化します。読み取りには投影を伴う CQRS が必要です (さらに複雑になります)
- ES 経験のないチーム: 学習曲線は重要です。管理が不十分な複雑性が利点を上回る可能性がある
- 厳格なスキーム: イベントスキーマが頻繁に変更される場合、バージョン管理が複雑になります
結論と次のステップ
イベント ソーシングは、永続性を状態の更新からイベントの追加に変換します。 不変。その結果、ネイティブ監査証跡、タイムトラベル クエリ、および 書くことと読むことの自然な分離。価格と複雑さ: の再現 イベント、スキーマ バージョンの管理、クエリに対する CQRS の要求 コンプレックス。
次の記事 — CQRS — はまさにこの課題、つまりレイヤーの構築方法に対処します。 プロジェクションを介してイベント ストアと同期する最適化された読み取りモデル (読み取りモデル)、 イベント ストリームに影響を与えることなく、高速なクエリを実行できるようになります。







