概要: イベント ソーシングと CQRS の分離

それらを組み合わせる前に、2 つのパターンを個別にまとめてみましょう。

  • イベントソーシング: アグリゲートの現在の状態をデータベースに保存する代わりに、 あなたはそれを保存します 一連の出来事 誰がそれを作成したのか。状態が再構築される 再生(再生中) 最初からのイベント。利点: 完全な監査証跡、 タイムトラベル、イベントから得られるビジネスの洞察。
  • CQRS: コマンド側 (書き込みモデル) とクエリ側 (読み取りモデル) は分離されています。 書き込みモデルはコマンドを受け取り、イベントを生成します。読み取りモデルは次のように最適化されています。 イベントから構築されたアプリケーション固有のクエリ。

この組み合わせは自然です。イベント ソーシングはイベントを生成し、CQRS はそれらを消費してビルドします。 最適化された読み取りモデル (ファイル 投影).

イベント ソーシングと CQRS を併用する場合

  • 監査証跡が必須のシステム (フィンテック、ヘルスケア、電子商取引)
  • 読み取りモデルの構造が書き込みモデルと大きく異なる場合
  • 読み取りを書き込みと比較して 10 ~ 100 倍にスケールする必要がある場合
  • タイムトラベルクエリを含むシステム (「3 月 15 日の状況は何ですか?」)
  • 使用しないでください 単純な CRUD アプリケーションの場合: 大幅なやり過ぎ

完全なアーキテクチャ: エンドツーエンドのフロー

イベント ソーシング + CQRS システムにおけるリクエストの流れは次のとおりです。

  1. クライアントは 指示 (例えば。 EffettuaOrdineCommand)
  2. Il コマンドハンドラー イベント ストアから集約をロードします (イベントのリプレイ)
  3. アグリゲートはコマンドを検証し、1 つ以上のコマンドを生成します。 ドメインイベント (例えば。 OrdineEffettuatoEvent)
  4. イベントは永続化されますイベントストア (追加のみ)
  5. イベントが公開されているのは、 メッセージバス (Kafka、EventBridge など)
  6. 1 つ以上 プロジェクションハンドラー イベントを消費し、 モデルの読み取り
  7. 読み取りモデル (通常は読み取りに最適化されたデータベース) から読み取られるクライアント クエリ

実装: イベント ソーシングを使用した集約

// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"

import java.util.*;
import java.time.Instant;

public class OrdineAggregate {

    // ID univoco dell'aggregate
    private String ordineId;
    // Versione corrente (numero di eventi applicati)
    private long version = -1;
    // Stato ricostruito dagli eventi
    private StatoOrdine stato;
    private String clienteId;
    private List<LineaOrdine> linee;
    private BigDecimal totale;

    // Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Costruttore privato: si crea sempre tramite replay o factory method
    private OrdineAggregate() {}

    // ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
    public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
        OrdineAggregate aggregate = new OrdineAggregate();
        events.forEach(aggregate::apply);
        return aggregate;
    }

    // ===== COMMAND HANDLING: valida il comando e produce eventi =====

    public void effettuaOrdine(EffettuaOrdineCommand cmd) {
        // Validazione business rules
        if (this.stato != null) {
            throw new IllegalStateException("Ordine gia esistente: " + ordineId);
        }
        if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
            throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
        }

        // Produce l'evento (nessuna modifica di stato diretta!)
        OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
            cmd.getOrdineId(),
            cmd.getClienteId(),
            cmd.getLinee(),
            Instant.now()
        );

        // Applica l'evento localmente e aggiungilo agli uncommitted
        applyAndRecord(event);
    }

    public void confermaPagamento(String metodoPagamento, String transactionId) {
        if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
            throw new IllegalStateException("Ordine non in attesa di pagamento");
        }

        PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
            ordineId, metodoPagamento, transactionId, Instant.now()
        );
        applyAndRecord(event);
    }

    // ===== APPLY: modifica lo stato interno applicando un evento =====
    // Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS

    private void apply(OrdineEffettuatoEvent event) {
        this.ordineId = event.getOrdineId();
        this.clienteId = event.getClienteId();
        this.linee = new ArrayList<>(event.getLinee());
        this.totale = event.getTotale();
        this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
        this.version++;
    }

    private void apply(PagamentoConfermatoEvent event) {
        this.stato = StatoOrdine.PAGATO;
        this.version++;
    }

    private void apply(DomainEvent event) {
        // Dispatch dinamico per tipo di evento
        if (event instanceof OrdineEffettuatoEvent e) apply(e);
        else if (event instanceof PagamentoConfermatoEvent e) apply(e);
        else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
    }

    private void applyAndRecord(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }

    public long getVersion() { return version; }
    public String getOrdineId() { return ordineId; }
}

イベント ストア: イベントの永続性

L'イベントストア これは、各集計のイベントのシーケンスを保持するデータベースです。 基本的な要件は、聖典が次のとおりであることです。 追加のみ そしてサポート の オプティミスティック同時実行チェック: 集約の予期されるバージョンを指定します その間に他の誰かがイベント (競合) を書き込んだ場合、書き込みは失敗します。

// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;

public interface EventStore {
    // Carica tutti gli eventi per un aggregate (per il replay)
    List<DomainEvent> loadEvents(String aggregateId);

    // Persiste nuovi eventi con optimistic concurrency check
    // expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
    void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}

// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {

    private final DataSource dataSource;
    private final EventSerializer serializer;

    // DDL della tabella event_store
    // CREATE TABLE event_store (
    //   id           BIGSERIAL PRIMARY KEY,
    //   aggregate_id VARCHAR(36) NOT NULL,
    //   version      BIGINT NOT NULL,
    //   event_type   VARCHAR(255) NOT NULL,
    //   event_data   JSONB NOT NULL,
    //   occurred_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    //   UNIQUE(aggregate_id, version)  -- optimistic locking
    // );

    @Override
    public List<DomainEvent> loadEvents(String aggregateId) {
        String sql = "SELECT event_type, event_data, version " +
                     "FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, aggregateId);
            ResultSet rs = ps.executeQuery();

            List<DomainEvent> events = new ArrayList<>();
            while (rs.next()) {
                events.add(serializer.deserialize(
                    rs.getString("event_type"),
                    rs.getString("event_data")
                ));
            }
            return events;

        } catch (SQLException e) {
            throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
        }
    }

    @Override
    public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
        String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
                     "VALUES (?, ?, ?, ?::jsonb)";

        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);

            // Optimistic concurrency check
            long currentVersion = getCurrentVersion(conn, aggregateId);
            if (currentVersion != expectedVersion) {
                throw new OptimisticConcurrencyException(
                    "Conflitto per aggregate " + aggregateId +
                    ": atteso version " + expectedVersion + ", trovato " + currentVersion
                );
            }

            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                long version = expectedVersion + 1;
                for (DomainEvent event : events) {
                    ps.setString(1, aggregateId);
                    ps.setLong(2, version++);
                    ps.setString(3, event.getClass().getSimpleName());
                    ps.setString(4, serializer.serialize(event));
                    ps.addBatch();
                }
                ps.executeBatch();
            }

            conn.commit();

        } catch (SQLException e) {
            throw new EventStoreException("Errore scrittura eventi", e);
        }
    }
}

プロジェクション: イベントから読み取りモデルを構築する

Una 投影 ビューを構築するイベント コンシューマです (読み取りモデル) クエリ用に最適化されています。すべての投影は冪等であり、 イベント ストアの先頭からすべてのイベントを再度読み取ることで、最初から再構築できます。

予測は各イベントの後に非同期的に更新されます。これがソースです。 の 最終的な整合性。コマンドを実行したばかりのクライアントは、 読み取りモデルの結果はすぐには表示されません。

// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata

@Component
public class OrdiniProjectionHandler {

    private final OrdiniReadRepository readRepo;

    // Gestisce OrdineEffettuatoEvent
    @EventHandler
    public void on(OrdineEffettuatoEvent event) {
        OrdineReadModel readModel = OrdineReadModel.builder()
            .ordineId(event.getOrdineId())
            .clienteId(event.getClienteId())
            .stato("IN_ATTESA_PAGAMENTO")
            .totale(event.getTotale())
            .linee(event.getLinee())
            .dataCreazione(event.getOccurredAt())
            .build();

        readRepo.save(readModel);
    }

    // Gestisce PagamentoConfermatoEvent
    @EventHandler
    public void on(PagamentoConfermatoEvent event) {
        readRepo.updateStato(event.getOrdineId(), "PAGATO");
        readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
    }

    // Query ottimizzate sul read model
    public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
        return readRepo.findByClienteId(clienteId);
    }

    public List<OrdineReadModel> getOrdiniInAttesa() {
        return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
    }
}

// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
//   ordine_id      VARCHAR(36) PRIMARY KEY,
//   cliente_id     VARCHAR(36) NOT NULL,
//   stato          VARCHAR(50) NOT NULL,
//   totale         DECIMAL(10,2),
//   data_creazione TIMESTAMPTZ,
//   data_pagamento TIMESTAMPTZ,
//   linee          JSONB,
//   -- Indici ottimizzati per le query frequenti
//   CONSTRAINT idx_cliente_id USING btree(cliente_id),
//   CONSTRAINT idx_stato USING btree(stato)
// );

スナップショット: 多数のイベントを含む集計の再生の最適化

アグリゲートは時間の経過とともにイベントを蓄積するため、 フルリプレイ 遅くなります: ステータス更新が 100 件ある注文には、準備が整うまでに 100 件のクエリと 100 件の適用が必要です 新しいコマンドを管理します。の スナップショット この問題を解決します: 定期的に集約の現在の状態が保存され (スナップショット)、次回のリロード時に保存されます。 ストーリーの最初からではなく、スナップショットから始めます。

// SnapshotStore.java - Gestione degli snapshot degli aggregati

public interface SnapshotStore {
    Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
    void saveSnapshot(String aggregateId, Object aggregateState, long version);
}

// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {

    private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi

    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;

    public OrdineAggregate load(String ordineId) {
        // 1. Prova a caricare lo snapshot piu recente
        Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);

        OrdineAggregate aggregate;
        long fromVersion;

        if (snapshot.isPresent()) {
            // Parte dallo snapshot invece che dall'evento 0
            aggregate = (OrdineAggregate) snapshot.get().getState();
            fromVersion = snapshot.get().getVersion() + 1;
        } else {
            aggregate = new OrdineAggregate();
            fromVersion = 0;
        }

        // 2. Carica solo gli eventi DOPO lo snapshot
        List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
        recentEvents.forEach(aggregate::applyEvent);

        return aggregate;
    }

    public void save(OrdineAggregate aggregate) {
        List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
        eventStore.appendEvents(
            aggregate.getOrdineId(),
            uncommitted,
            aggregate.getVersion() - uncommitted.size()
        );
        aggregate.clearUncommittedEvents();

        // Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
        if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
            snapshotStore.saveSnapshot(
                aggregate.getOrdineId(),
                aggregate,          // serializza lo stato corrente
                aggregate.getVersion()
            );
        }
    }
}

// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
//   aggregate_id VARCHAR(36) NOT NULL,
//   version      BIGINT NOT NULL,
//   state        JSONB NOT NULL,
//   created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//   PRIMARY KEY (aggregate_id, version)
// );

最終的な整合性の管理

イベント ソーシング + CQRS の主な課題は次のとおりです。 不一致ウィンドウ: コマンドが正常に実行された後、読み取りモデルはまだ更新されていません (投影はまだイベントを処理中です)。その後すぐに読むクライアント 変更が表示されない場合があります。

この状況を管理するには、いくつかの戦略があります。

戦略 1: バージョンチェックを使用したポーリング

// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"

async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
        const ordine = await readModel.getOrdine(ordineId);

        if (ordine && ordine.version >= expectedVersion) {
            return ordine;  // Projection aggiornata
        }

        await sleep(100);  // Aspetta 100ms e riprova
    }

    throw new ProjectionTimeoutError(
        `Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
    );
}

戦略 2: コマンド応答でバージョンを返す

// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling

@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
    OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
    aggregate.effettuaOrdine(cmd);
    ordineRepository.save(aggregate);

    return ResponseEntity.ok(CommandResponse.builder()
        .aggregateId(cmd.getOrdineId())
        .version(aggregate.getVersion())  // la versione dopo il salvataggio
        .message("Ordine effettuato con successo")
        .build());
}

戦略 3: 重要なデータの同期投影

// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente

@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
    // 1. Salva eventi nell'Event Store
    List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
    eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);

    // 2. Aggiorna il read model nella STESSA transazione (sincrono)
    uncommitted.forEach(event -> {
        if (event instanceof OrdineEffettuatoEvent e) {
            ordiniReadRepo.save(OrdineReadModel.from(e));
        }
    });

    // 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
    eventBus.publish(uncommitted);

    aggregate.clearUncommittedEvents();
}

プロジェクションの再構築

イベント ソーシングの大きな利点の 1 つは、再構築できることです。 イベント ストアの最初からすべてのイベントを再読み込みすることで、投影を最初から実行できます。 これにより、新しい読み取りモデルを遡及的に追加したり、バグを修正したりすることができます 既存の予測では。

// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {

    private final EventStore eventStore;

    public void rebuild(ProjectionHandler handler, String fromAggregateType) {
        System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());

        // 1. Svuota il read model corrente
        handler.reset();

        // 2. Legge tutti gli eventi in ordine cronologico
        // (implementazione dipende dall'event store specifico)
        AtomicLong processed = new AtomicLong(0);

        eventStore.streamAllEvents(fromAggregateType, event -> {
            handler.handle(event);
            long count = processed.incrementAndGet();
            if (count % 1000 == 0) {
                System.out.println("Processati " + count + " eventi...");
            }
        });

        System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
    }
}

ベストプラクティスとアンチパターン

アンチパターン: 過去の出来事を変える

イベント ストアのイベントは次のとおりです。 不変。書き込み後は絶対に編集しないでください。 イベント スキーマを変更する必要がある場合は、新しいバージョンのイベントを作成します (OrdineEffettuatoV2Event)、デシリアライザーでアップキャストを処理します。 過去のイベントを変更すると、監査証跡の整合性が損なわれ、再生が中断されます。

  • トランザクションごとに 1 つの集計: 同じ作業単位内の複数の集計を編集しないでください。 Saga を使用して、クロス集計操作を調整します。
  • 過去の事実としての出来事: イベント名は、すでに起こったことを説明します (OrdineEffettuatoEvent、 ない EffettuaOrdineEvent).
  • べき等投影: 各プロジェクション ハンドラーは、同じイベントで複数回呼び出しても安全である必要があります (少なくとも 1 回の配信)。
  • コマンドなどのイベントは使用しないでください: イベントのパブリッシュでは、別のアグリゲートでコマンドを直接起動してはなりません。 Saga または Process Manager を使用します。
  • イベント数に基づくスナップショットポリシー: 50 ~ 100 イベントごとにスナップショットを作成します。頻度が高すぎる = シリアル化のオーバーヘッド。レアすぎる = リプレイが遅い。

シリーズの次のステップ

  • 第5条 – サーガパターン: 操作に複数の集計が含まれる場合 または複数のサービスの場合、Saga パターンは 2PC なしで分散トランザクションを処理します。 失敗した場合の補償トランザクション付き。
  • 第 6 条 – AWS EventBridge: イベント ストアからイベントを公開する方法 EventBridge 上でサーバーレス方法で Lambda、SQS、その他のターゲットに到達します。

他シリーズとの連携

  • Apache Kafka (シリーズ 38): Kafka は理想的なイベント ストアおよびメッセージ バスです 実稼働環境でのイベント ソーシング用。 Kafka Streams と Kafka Connect の記事は統合されています ここで説明されているパターンを直接使用します。
  • PostgreSQL AI と pgvector: 投影の読み取りモデルは次のようになります。 類似性のためのベクトル列を含む特殊なインデックスを備えた PostgreSQL データベース 注文のテキスト説明のセマンティクス。