イベント ソーシングと CQRS の併用: プロジェクション、スナップショット、一貫性
イベント ソーシングのみが、不変かつ監査可能な方法で書き込み側を処理します。 CQRS のみで最適化 読み書きは別々に。使用済み 一緒に、彼らは最も難しい問題の 1 つを解決します ソフトウェア アーキテクチャの説明: 完全に監査可能で、読み取り値に応じて拡張可能なシステムを構築する方法 クエリのパフォーマンスが優れています。しかし、この能力には代償が伴います。維持する予測と再生のためのスナップショットです。 効率的であり、書き込みモデルと読み取りモデル間の一貫性を管理します。
概要: イベント ソーシングと CQRS の分離
それらを組み合わせる前に、2 つのパターンを個別にまとめてみましょう。
- イベントソーシング: アグリゲートの現在の状態をデータベースに保存する代わりに、 あなたはそれを保存します 一連の出来事 誰がそれを作成したのか。状態が再構築される 再生(再生中) 最初からのイベント。利点: 完全な監査証跡、 タイムトラベル、イベントから得られるビジネスの洞察。
- CQRS: コマンド側 (書き込みモデル) とクエリ側 (読み取りモデル) は分離されています。 書き込みモデルはコマンドを受け取り、イベントを生成します。読み取りモデルは次のように最適化されています。 イベントから構築されたアプリケーション固有のクエリ。
この組み合わせは自然です。イベント ソーシングはイベントを生成し、CQRS はそれらを消費してビルドします。 最適化された読み取りモデル (ファイル 投影).
イベント ソーシングと CQRS を併用する場合
- 監査証跡が必須のシステム (フィンテック、ヘルスケア、電子商取引)
- 読み取りモデルの構造が書き込みモデルと大きく異なる場合
- 読み取りを書き込みと比較して 10 ~ 100 倍にスケールする必要がある場合
- タイムトラベルクエリを含むシステム (「3 月 15 日の状況は何ですか?」)
- 使用しないでください 単純な CRUD アプリケーションの場合: 大幅なやり過ぎ
完全なアーキテクチャ: エンドツーエンドのフロー
イベント ソーシング + CQRS システムにおけるリクエストの流れは次のとおりです。
- クライアントは 指示 (例えば。
EffettuaOrdineCommand) - Il コマンドハンドラー イベント ストアから集約をロードします (イベントのリプレイ)
- アグリゲートはコマンドを検証し、1 つ以上のコマンドを生成します。 ドメインイベント (例えば。
OrdineEffettuatoEvent) - イベントは永続化されますイベントストア (追加のみ)
- イベントが公開されているのは、 メッセージバス (Kafka、EventBridge など)
- 1 つ以上 プロジェクションハンドラー イベントを消費し、 モデルの読み取り
- 読み取りモデル (通常は読み取りに最適化されたデータベース) から読み取られるクライアント クエリ
実装: イベント ソーシングを使用した集約
// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"
import java.util.*;
import java.time.Instant;
public class OrdineAggregate {
// ID univoco dell'aggregate
private String ordineId;
// Versione corrente (numero di eventi applicati)
private long version = -1;
// Stato ricostruito dagli eventi
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Costruttore privato: si crea sempre tramite replay o factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: valida il comando e produce eventi =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Validazione business rules
if (this.stato != null) {
throw new IllegalStateException("Ordine gia esistente: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
}
// Produce l'evento (nessuna modifica di stato diretta!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Applica l'evento localmente e aggiungilo agli uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Ordine non in attesa di pagamento");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: modifica lo stato interno applicando un evento =====
// Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dispatch dinamico per tipo di evento
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
イベント ストア: イベントの永続性
L'イベントストア これは、各集計のイベントのシーケンスを保持するデータベースです。 基本的な要件は、聖典が次のとおりであることです。 追加のみ そしてサポート の オプティミスティック同時実行チェック: 集約の予期されるバージョンを指定します その間に他の誰かがイベント (競合) を書き込んだ場合、書き込みは失敗します。
// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;
public interface EventStore {
// Carica tutti gli eventi per un aggregate (per il replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persiste nuovi eventi con optimistic concurrency check
// expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL della tabella event_store
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Optimistic concurrency check
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflitto per aggregate " + aggregateId +
": atteso version " + expectedVersion + ", trovato " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (SQLException e) {
throw new EventStoreException("Errore scrittura eventi", e);
}
}
}
プロジェクション: イベントから読み取りモデルを構築する
Una 投影 ビューを構築するイベント コンシューマです (読み取りモデル) クエリ用に最適化されています。すべての投影は冪等であり、 イベント ストアの先頭からすべてのイベントを再度読み取ることで、最初から再構築できます。
予測は各イベントの後に非同期的に更新されます。これがソースです。 の 最終的な整合性。コマンドを実行したばかりのクライアントは、 読み取りモデルの結果はすぐには表示されません。
// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Gestisce OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Gestisce PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Query ottimizzate sul read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB,
// -- Indici ottimizzati per le query frequenti
// CONSTRAINT idx_cliente_id USING btree(cliente_id),
// CONSTRAINT idx_stato USING btree(stato)
// );
スナップショット: 多数のイベントを含む集計の再生の最適化
アグリゲートは時間の経過とともにイベントを蓄積するため、 フルリプレイ 遅くなります: ステータス更新が 100 件ある注文には、準備が整うまでに 100 件のクエリと 100 件の適用が必要です 新しいコマンドを管理します。の スナップショット この問題を解決します: 定期的に集約の現在の状態が保存され (スナップショット)、次回のリロード時に保存されます。 ストーリーの最初からではなく、スナップショットから始めます。
// SnapshotStore.java - Gestione degli snapshot degli aggregati
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Prova a caricare lo snapshot piu recente
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
OrdineAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// Parte dallo snapshot invece che dall'evento 0
aggregate = (OrdineAggregate) snapshot.get().getState();
fromVersion = snapshot.get().getVersion() + 1;
} else {
aggregate = new OrdineAggregate();
fromVersion = 0;
}
// 2. Carica solo gli eventi DOPO lo snapshot
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
recentEvents.forEach(aggregate::applyEvent);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serializza lo stato corrente
aggregate.getVersion()
);
}
}
}
// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
最終的な整合性の管理
イベント ソーシング + CQRS の主な課題は次のとおりです。 不一致ウィンドウ: コマンドが正常に実行された後、読み取りモデルはまだ更新されていません (投影はまだイベントを処理中です)。その後すぐに読むクライアント 変更が表示されない場合があります。
この状況を管理するには、いくつかの戦略があります。
戦略 1: バージョンチェックを使用したポーリング
// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection aggiornata
}
await sleep(100); // Aspetta 100ms e riprova
}
throw new ProjectionTimeoutError(
`Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
);
}
戦略 2: コマンド応答でバージョンを返す
// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // la versione dopo il salvataggio
.message("Ordine effettuato con successo")
.build());
}
戦略 3: 重要なデータの同期投影
// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Salva eventi nell'Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Aggiorna il read model nella STESSA transazione (sincrono)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
プロジェクションの再構築
イベント ソーシングの大きな利点の 1 つは、再構築できることです。 イベント ストアの最初からすべてのイベントを再読み込みすることで、投影を最初から実行できます。 これにより、新しい読み取りモデルを遡及的に追加したり、バグを修正したりすることができます 既存の予測では。
// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());
// 1. Svuota il read model corrente
handler.reset();
// 2. Legge tutti gli eventi in ordine cronologico
// (implementazione dipende dall'event store specifico)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processati " + count + " eventi...");
}
});
System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
}
}
ベストプラクティスとアンチパターン
アンチパターン: 過去の出来事を変える
イベント ストアのイベントは次のとおりです。 不変。書き込み後は絶対に編集しないでください。
イベント スキーマを変更する必要がある場合は、新しいバージョンのイベントを作成します
(OrdineEffettuatoV2Event)、デシリアライザーでアップキャストを処理します。
過去のイベントを変更すると、監査証跡の整合性が損なわれ、再生が中断されます。
- トランザクションごとに 1 つの集計: 同じ作業単位内の複数の集計を編集しないでください。 Saga を使用して、クロス集計操作を調整します。
- 過去の事実としての出来事: イベント名は、すでに起こったことを説明します (
OrdineEffettuatoEvent、 ないEffettuaOrdineEvent). - べき等投影: 各プロジェクション ハンドラーは、同じイベントで複数回呼び出しても安全である必要があります (少なくとも 1 回の配信)。
- コマンドなどのイベントは使用しないでください: イベントのパブリッシュでは、別のアグリゲートでコマンドを直接起動してはなりません。 Saga または Process Manager を使用します。
- イベント数に基づくスナップショットポリシー: 50 ~ 100 イベントごとにスナップショットを作成します。頻度が高すぎる = シリアル化のオーバーヘッド。レアすぎる = リプレイが遅い。
シリーズの次のステップ
- 第5条 – サーガパターン: 操作に複数の集計が含まれる場合 または複数のサービスの場合、Saga パターンは 2PC なしで分散トランザクションを処理します。 失敗した場合の補償トランザクション付き。
- 第 6 条 – AWS EventBridge: イベント ストアからイベントを公開する方法 EventBridge 上でサーバーレス方法で Lambda、SQS、その他のターゲットに到達します。
他シリーズとの連携
- Apache Kafka (シリーズ 38): Kafka は理想的なイベント ストアおよびメッセージ バスです 実稼働環境でのイベント ソーシング用。 Kafka Streams と Kafka Connect の記事は統合されています ここで説明されているパターンを直接使用します。
- PostgreSQL AI と pgvector: 投影の読み取りモデルは次のようになります。 類似性のためのベクトル列を含む特殊なインデックスを備えた PostgreSQL データベース 注文のテキスト説明のセマンティクス。







