이벤트 소싱: 불변의 이벤트 시퀀스로서의 상태
은행이 어떻게 작동하는지 생각해 보십시오. 귀하의 계좌 잔액은 "데이터베이스의 숫자"가 아닙니다. 각 거래마다 업데이트됩니다." 그리고 지금까지 발생한 모든 거래의 결과는 다음과 같습니다. 예금, 인출, 이체, 이자. 어제 오후 2시 잔액을 알고 싶으시다면, 해당 시점까지의 모든 트랜잭션을 재생하여 이를 재구성할 수 있습니다. 이것은 정확히는 원리는이벤트 소싱.
엔터티의 현재 상태를 데이터베이스에 저장하고 매번 덮어쓰는 대신 편집, 이벤트 소싱 그로 이어지는 일련의 사건을 저장 그 상태로. 현재 상태는 처음부터 이벤트를 "재생"하여 달성됩니다. 그 결과 완전한 감사 추적, 시간 이동 쿼리 가능성, 쓰기(이벤트 추가)와 읽기(이벤트에 대한 투영) 사이의 자연스러운 분리.
무엇을 배울 것인가
- 이벤트 소싱 아키텍처: 이벤트 저장소, 집계, 이벤트 스트림
- TypeScript에서 이벤트로 Aggregate 구현
- 이벤트 저장소: 이벤트 데이터베이스를 구성하는 방법
- 이벤트 재생: 이벤트에서 처음부터 상태를 다시 빌드합니다.
- 시간 이동 쿼리: 역사적 순간의 집계 상태
- EventStoreDB: 이벤트 소싱을 위해 설계된 데이터베이스
- 이벤트 소싱 트레이드오프: 채택 시기와 피해야 할 시기
클래식 모델과 이벤트 소싱
주문 관리 시스템에 대한 두 가지 접근 방식을 비교해 보겠습니다.
| 나는 기다린다 | CRUD 클래식 | 이벤트 소싱 |
|---|---|---|
| 무엇이 저장되나요? | 현재 상태(UPDATE 레코드) | 이벤트 순서(INSERT 전용) |
| 감사 추적 | 아니요(또는 별도의 테이블 사용) | 예, 기본적으로 완전합니다. |
| 과거 상태 | 아니요(현재 상태만 볼 수 있음) | 예, T까지의 이벤트 재생 |
| 복잡성 | 낮은 | 중간-높음 |
| 성능 읽기 | 높음(직접 쿼리) | 검사 필요(CQRS 참조) |
| 퍼포먼스 쓰기 | 높은 | 높음(추가 전용) |
| 디버깅 | 어려움(스토리가 없는 최종 상태) | 쉬움(이벤트 재생) |
기본 개념
집계: 일관성의 단위
Un 골재 이벤트 소싱의 일관성 경계. 모든 집계 변경은 이벤트를 생성하는 메서드를 통해 발생합니다. 상태는 언제나 이벤트를 순서대로 적용하여 재구성합니다.
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
이벤트 저장소: 이벤트 데이터베이스
Un 이벤트 매장 쓰기 및 쓰기에 최적화된 추가 전용 데이터베이스 일련의 사건을 읽습니다. 기본적인 구조와이벤트 스트림: 단일 Aggregate(해당 ID로 식별)에 대한 순서가 지정된 이벤트 시퀀스입니다.
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
이벤트 소싱 저장소
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
시간 여행 쿼리
이벤트 소싱의 가장 강력한 장점 중 하나는 상태를 재구성하는 능력입니다. 역사적 순간의 총계:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: 이벤트 소싱을 위한 기본 데이터베이스
이벤트스토어DB 이벤트 소싱을 위해 특별히 설계된 데이터베이스입니다. 이벤트 스트림을 기본 프리미티브로 제공하고 실시간 알림을 위한 구독을 제공합니다. 및 서버 측 프로젝션:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
스냅샷: 대규모 집계 재생 최적화
집계에 수천 개의 이벤트가 있으면 재생 속도가 느려집니다. 그만큼 스냅샷 그리고 체크포인트: 처음부터 시작하는 대신 마지막 스냅샷에서 다시 빌드합니다.
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
이벤트 소싱 절충안
이벤트 소싱이 적합한 경우
- 필수 감사 추적: 완전한 스토리가 요구되는 금융 시스템, 의료 시스템, 법률 시스템
- 복잡한 디버깅: "테이프를 되감고" 어떻게 문제가 발생한 상태인지 이해하고 싶을 때
- 임시 비즈니스 인텔리전스: “어제 오후 4시에 초안 상태의 주문이 몇 개 있었나요?”
- 이벤트 기반 통합: 이미 EDA 아키텍처가 있는 경우 Event Sourcing이 자연스럽게 통합됩니다.
이벤트 소싱이 의미가 없는 경우
- 간단한 CRUD: 감사 요구사항이 없는 마스터 레코드 관리 시스템은 ES의 이점을 누릴 수 없습니다.
- 복잡한 쿼리: ES는 쓰기를 최적화합니다. 읽기에는 프로젝션이 포함된 CQRS가 필요합니다(추가 복잡성).
- ES 경험이 없는 팀: 학습 곡선은 중요합니다. 제대로 관리되지 않은 복잡성은 이점보다 더 클 수 있습니다.
- 엄격한 구성: 이벤트 스키마가 자주 변경되면 버전 관리가 복잡해집니다.
결론 및 다음 단계
이벤트 소싱은 지속성을 상태 업데이트에서 이벤트 추가로 변환합니다. 불변. 그 결과 기본 감사 추적, 시간 이동 쿼리 및 쓰기와 읽기의 자연스러운 분리. 가격과 복잡성: 이벤트, 스키마 버전 관리 및 쿼리에 CQRS 요구 복잡한.
다음 기사인 CQRS에서는 레이어를 구축하는 방법이라는 과제를 정확하게 다루고 있습니다. 프로젝션을 통해 이벤트 저장소와 동기화되는 최적화된 읽기 모델(읽기 모델), 이벤트 스트림을 건드리지 않고도 빠른 쿼리가 가능합니다.







