요약: 이벤트 소싱 및 CQRS 분리

두 패턴을 결합하기 전에 두 패턴을 개별적으로 요약해 보겠습니다.

  • 이벤트 소싱: Aggregate의 현재 상태를 데이터베이스에 저장하는 대신, 당신은 저장 일련의 사건 누가 그것을 만들었는지. 국가가 재건된다 재생(재생) 처음부터 이벤트. 장점: 완전한 감사 추적, 시간여행, 사건에서 얻은 비즈니스 인사이트.
  • CQRS: 명령 측(쓰기 모델)과 쿼리 측(읽기 모델)이 별개입니다. 쓰기 모델은 명령을 수신하고 이벤트를 생성합니다. 읽기 모델은 다음에 최적화되어 있습니다. 이벤트를 기반으로 구축된 애플리케이션별 쿼리입니다.

조합은 자연스럽습니다. 이벤트 소싱은 이벤트를 생성하고 CQRS는 이를 사용하여 빌드합니다. 최적화된 읽기 모델(le 투영).

이벤트 소싱 + CQRS를 함께 사용하는 경우

  • 필수 감사 추적 시스템(핀테크, 헬스케어, 전자상거래)
  • 읽기 모델이 쓰기 모델과 구조가 매우 다른 경우
  • 쓰기에 비해 읽기를 10~100배 확장해야 하는 경우
  • 시간 여행 쿼리가 있는 시스템("3월 15일의 상태는 어땠나요?")
  • 사용하지 마십시오 간단한 CRUD 애플리케이션의 경우: 상당한 과잉

완전한 아키텍처: 엔드투엔드 흐름

이벤트 소싱 + CQRS 시스템의 요청 흐름은 다음과 같습니다.

  1. 클라이언트는 명령 (예: EffettuaOrdineCommand)
  2. Il 명령 처리기 이벤트 저장소에서 Aggregate 로드(이벤트 재생)
  3. Aggregate는 명령의 유효성을 검사하고 하나 이상의 명령을 생성합니다. 도메인 이벤트 (예: OrdineEffettuatoEvent)
  4. 이벤트는 지속됩니다.이벤트 매장 (추가 전용)
  5. 이벤트는 다음 날짜에 게시됩니다. 메시지 버스 (카프카, 이벤트브릿지 등)
  6. 하나 이상 프로젝션 핸들러 그들은 이벤트를 소비하고 모델 읽기
  7. 읽기 모델(일반적으로 읽기 최적화된 데이터베이스)에서 읽은 클라이언트 쿼리

구현: 이벤트 소싱을 통한 집계

// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"

import java.util.*;
import java.time.Instant;

public class OrdineAggregate {

    // ID univoco dell'aggregate
    private String ordineId;
    // Versione corrente (numero di eventi applicati)
    private long version = -1;
    // Stato ricostruito dagli eventi
    private StatoOrdine stato;
    private String clienteId;
    private List<LineaOrdine> linee;
    private BigDecimal totale;

    // Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Costruttore privato: si crea sempre tramite replay o factory method
    private OrdineAggregate() {}

    // ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
    public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
        OrdineAggregate aggregate = new OrdineAggregate();
        events.forEach(aggregate::apply);
        return aggregate;
    }

    // ===== COMMAND HANDLING: valida il comando e produce eventi =====

    public void effettuaOrdine(EffettuaOrdineCommand cmd) {
        // Validazione business rules
        if (this.stato != null) {
            throw new IllegalStateException("Ordine gia esistente: " + ordineId);
        }
        if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
            throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
        }

        // Produce l'evento (nessuna modifica di stato diretta!)
        OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
            cmd.getOrdineId(),
            cmd.getClienteId(),
            cmd.getLinee(),
            Instant.now()
        );

        // Applica l'evento localmente e aggiungilo agli uncommitted
        applyAndRecord(event);
    }

    public void confermaPagamento(String metodoPagamento, String transactionId) {
        if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
            throw new IllegalStateException("Ordine non in attesa di pagamento");
        }

        PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
            ordineId, metodoPagamento, transactionId, Instant.now()
        );
        applyAndRecord(event);
    }

    // ===== APPLY: modifica lo stato interno applicando un evento =====
    // Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS

    private void apply(OrdineEffettuatoEvent event) {
        this.ordineId = event.getOrdineId();
        this.clienteId = event.getClienteId();
        this.linee = new ArrayList<>(event.getLinee());
        this.totale = event.getTotale();
        this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
        this.version++;
    }

    private void apply(PagamentoConfermatoEvent event) {
        this.stato = StatoOrdine.PAGATO;
        this.version++;
    }

    private void apply(DomainEvent event) {
        // Dispatch dinamico per tipo di evento
        if (event instanceof OrdineEffettuatoEvent e) apply(e);
        else if (event instanceof PagamentoConfermatoEvent e) apply(e);
        else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
    }

    private void applyAndRecord(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }

    public long getVersion() { return version; }
    public String getOrdineId() { return ordineId; }
}

이벤트 저장소: 이벤트의 지속성

L'이벤트 매장 각 집계에 대한 이벤트 시퀀스를 유지하는 것은 데이터베이스입니다. 근본적인 요구 사항은 성경이 추가 전용 그리고 지원 는 낙관적 동시성 검사: 집계의 예상 버전을 지정합니다. 그 동안 다른 사람이 이벤트(충돌)를 작성하면 작성이 실패합니다.

// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;

public interface EventStore {
    // Carica tutti gli eventi per un aggregate (per il replay)
    List<DomainEvent> loadEvents(String aggregateId);

    // Persiste nuovi eventi con optimistic concurrency check
    // expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
    void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}

// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {

    private final DataSource dataSource;
    private final EventSerializer serializer;

    // DDL della tabella event_store
    // CREATE TABLE event_store (
    //   id           BIGSERIAL PRIMARY KEY,
    //   aggregate_id VARCHAR(36) NOT NULL,
    //   version      BIGINT NOT NULL,
    //   event_type   VARCHAR(255) NOT NULL,
    //   event_data   JSONB NOT NULL,
    //   occurred_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    //   UNIQUE(aggregate_id, version)  -- optimistic locking
    // );

    @Override
    public List<DomainEvent> loadEvents(String aggregateId) {
        String sql = "SELECT event_type, event_data, version " +
                     "FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, aggregateId);
            ResultSet rs = ps.executeQuery();

            List<DomainEvent> events = new ArrayList<>();
            while (rs.next()) {
                events.add(serializer.deserialize(
                    rs.getString("event_type"),
                    rs.getString("event_data")
                ));
            }
            return events;

        } catch (SQLException e) {
            throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
        }
    }

    @Override
    public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
        String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
                     "VALUES (?, ?, ?, ?::jsonb)";

        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);

            // Optimistic concurrency check
            long currentVersion = getCurrentVersion(conn, aggregateId);
            if (currentVersion != expectedVersion) {
                throw new OptimisticConcurrencyException(
                    "Conflitto per aggregate " + aggregateId +
                    ": atteso version " + expectedVersion + ", trovato " + currentVersion
                );
            }

            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                long version = expectedVersion + 1;
                for (DomainEvent event : events) {
                    ps.setString(1, aggregateId);
                    ps.setLong(2, version++);
                    ps.setString(3, event.getClass().getSimpleName());
                    ps.setString(4, serializer.serialize(event));
                    ps.addBatch();
                }
                ps.executeBatch();
            }

            conn.commit();

        } catch (SQLException e) {
            throw new EventStoreException("Errore scrittura eventi", e);
        }
    }
}

프로젝션: 이벤트에서 읽기 모델 구축

에이 투사 뷰를 작성하는 이벤트 소비자입니다. (읽기 모델) 쿼리에 최적화되었습니다. 모든 투영은 멱등성을 가지며 다음과 같은 일이 가능합니다. 이벤트 저장소 시작 부분부터 모든 이벤트를 다시 읽어서 처음부터 다시 빌드할 수 있습니다.

예측은 각 이벤트 후에 비동기적으로 업데이트됩니다. 이것이 소스입니다. 의 최종 일관성. 방금 명령을 실행한 클라이언트는 읽기 모델의 결과를 즉시 볼 수는 없습니다.

// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata

@Component
public class OrdiniProjectionHandler {

    private final OrdiniReadRepository readRepo;

    // Gestisce OrdineEffettuatoEvent
    @EventHandler
    public void on(OrdineEffettuatoEvent event) {
        OrdineReadModel readModel = OrdineReadModel.builder()
            .ordineId(event.getOrdineId())
            .clienteId(event.getClienteId())
            .stato("IN_ATTESA_PAGAMENTO")
            .totale(event.getTotale())
            .linee(event.getLinee())
            .dataCreazione(event.getOccurredAt())
            .build();

        readRepo.save(readModel);
    }

    // Gestisce PagamentoConfermatoEvent
    @EventHandler
    public void on(PagamentoConfermatoEvent event) {
        readRepo.updateStato(event.getOrdineId(), "PAGATO");
        readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
    }

    // Query ottimizzate sul read model
    public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
        return readRepo.findByClienteId(clienteId);
    }

    public List<OrdineReadModel> getOrdiniInAttesa() {
        return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
    }
}

// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
//   ordine_id      VARCHAR(36) PRIMARY KEY,
//   cliente_id     VARCHAR(36) NOT NULL,
//   stato          VARCHAR(50) NOT NULL,
//   totale         DECIMAL(10,2),
//   data_creazione TIMESTAMPTZ,
//   data_pagamento TIMESTAMPTZ,
//   linee          JSONB,
//   -- Indici ottimizzati per le query frequenti
//   CONSTRAINT idx_cliente_id USING btree(cliente_id),
//   CONSTRAINT idx_stato USING btree(stato)
// );

스냅샷: 이벤트가 많은 집계 재생 최적화

Aggregate는 시간이 지남에 따라 이벤트를 누적하므로 전체 재생 느려진다: 100개의 상태 업데이트가 있는 주문에는 100개의 쿼리가 필요하고 준비되기 전에 100개의 적용이 필요합니다. 새로운 명령을 관리합니다. 그만큼 스냅샷 이 문제를 해결합니다. 주기적으로 Aggregate의 현재 상태가 저장되고(스냅샷) 다음 다시 로드 시 이야기의 시작 부분부터가 아니라 스냅샷부터 시작합니다.

// SnapshotStore.java - Gestione degli snapshot degli aggregati

public interface SnapshotStore {
    Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
    void saveSnapshot(String aggregateId, Object aggregateState, long version);
}

// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {

    private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi

    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;

    public OrdineAggregate load(String ordineId) {
        // 1. Prova a caricare lo snapshot piu recente
        Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);

        OrdineAggregate aggregate;
        long fromVersion;

        if (snapshot.isPresent()) {
            // Parte dallo snapshot invece che dall'evento 0
            aggregate = (OrdineAggregate) snapshot.get().getState();
            fromVersion = snapshot.get().getVersion() + 1;
        } else {
            aggregate = new OrdineAggregate();
            fromVersion = 0;
        }

        // 2. Carica solo gli eventi DOPO lo snapshot
        List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
        recentEvents.forEach(aggregate::applyEvent);

        return aggregate;
    }

    public void save(OrdineAggregate aggregate) {
        List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
        eventStore.appendEvents(
            aggregate.getOrdineId(),
            uncommitted,
            aggregate.getVersion() - uncommitted.size()
        );
        aggregate.clearUncommittedEvents();

        // Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
        if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
            snapshotStore.saveSnapshot(
                aggregate.getOrdineId(),
                aggregate,          // serializza lo stato corrente
                aggregate.getVersion()
            );
        }
    }
}

// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
//   aggregate_id VARCHAR(36) NOT NULL,
//   version      BIGINT NOT NULL,
//   state        JSONB NOT NULL,
//   created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//   PRIMARY KEY (aggregate_id, version)
// );

최종 일관성 관리

Event Sourcing + CQRS의 주요 과제는 다음과 같습니다. 불일치 창: 명령이 성공적으로 실행된 후에도 읽기 모델은 아직 업데이트되지 않습니다. (Projection은 아직 이벤트를 처리 중입니다.) 직후에 읽는 클라이언트 변경 사항이 표시되지 않을 수 있습니다.

이 상황을 관리하기 위한 몇 가지 전략이 있습니다.

전략 1: 버전 확인을 통한 폴링

// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"

async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
        const ordine = await readModel.getOrdine(ordineId);

        if (ordine && ordine.version >= expectedVersion) {
            return ordine;  // Projection aggiornata
        }

        await sleep(100);  // Aspetta 100ms e riprova
    }

    throw new ProjectionTimeoutError(
        `Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
    );
}

전략 2: 명령 응답에 버전 반환

// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling

@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
    OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
    aggregate.effettuaOrdine(cmd);
    ordineRepository.save(aggregate);

    return ResponseEntity.ok(CommandResponse.builder()
        .aggregateId(cmd.getOrdineId())
        .version(aggregate.getVersion())  // la versione dopo il salvataggio
        .message("Ordine effettuato con successo")
        .build());
}

전략 3: 중요 데이터에 대한 동기식 프로젝션

// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente

@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
    // 1. Salva eventi nell'Event Store
    List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
    eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);

    // 2. Aggiorna il read model nella STESSA transazione (sincrono)
    uncommitted.forEach(event -> {
        if (event instanceof OrdineEffettuatoEvent e) {
            ordiniReadRepo.save(OrdineReadModel.from(e));
        }
    });

    // 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
    eventBus.publish(uncommitted);

    aggregate.clearUncommittedEvents();
}

투영 재구축

이벤트 소싱의 가장 큰 장점 중 하나는 재구축 기능입니다. 이벤트 저장소 시작부터 모든 이벤트를 다시 읽어 처음부터 모든 투영을 수행합니다. 이를 통해 소급하여 새로운 읽기 모델을 추가하거나 버그를 수정할 수 있습니다. 기존 예측에서.

// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {

    private final EventStore eventStore;

    public void rebuild(ProjectionHandler handler, String fromAggregateType) {
        System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());

        // 1. Svuota il read model corrente
        handler.reset();

        // 2. Legge tutti gli eventi in ordine cronologico
        // (implementazione dipende dall'event store specifico)
        AtomicLong processed = new AtomicLong(0);

        eventStore.streamAllEvents(fromAggregateType, event -> {
            handler.handle(event);
            long count = processed.incrementAndGet();
            if (count % 1000 == 0) {
                System.out.println("Processati " + count + " eventi...");
            }
        });

        System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
    }
}

모범 사례 및 안티 패턴

안티 패턴: 과거 이벤트 변경

이벤트 상점의 이벤트는 다음과 같습니다. 불변. 작성한 후에는 절대 편집하지 마세요. 이벤트 스키마를 변경해야 하는 경우 새 버전의 이벤트를 생성하세요. (OrdineEffettuatoV2Event) 디시리얼라이저에서 업캐스팅을 처리합니다. 과거 이벤트를 변경하면 감사 추적의 무결성이 손상되고 재생이 중단됩니다.

  • 거래당 하나의 집계: 동일한 작업 단위에서 여러 집계를 편집하지 마십시오. Saga를 사용하여 교차 집계 작업을 조율하세요.
  • 과거 사실로서의 사건: 이벤트 이름은 이미 발생한 일을 설명합니다(OrdineEffettuatoEvent, 아니다 EffettuaOrdineEvent).
  • 멱등성 투영: 각 프로젝션 핸들러는 동일한 이벤트(최소 한 번 전달)로 여러 번 호출해도 안전해야 합니다.
  • Command와 같은 이벤트를 사용하지 마십시오.: 이벤트 게시 시 다른 집계에서 명령을 직접 실행해서는 안 됩니다. Saga 또는 프로세스 관리자를 사용하십시오.
  • 이벤트 수에 따른 스냅샷 정책: 50~100개의 이벤트마다 스냅샷을 생성합니다. 너무 자주 = 직렬화 오버헤드; 너무 드물다 = 재생 속도가 느림.

시리즈의 다음 단계

  • 기사 5 - 사가 패턴: 작업에 여러 집계가 포함되는 경우 또는 여러 서비스를 사용하는 경우 Saga Pattern은 2PC 없이 분산 트랜잭션을 처리하며, 실패 시 거래를 보상합니다.
  • 6조 – AWS EventBridge: 이벤트 스토어에서 이벤트를 게시하는 방법 EventBridge에서 서버리스 방식으로 Lambda, SQS 및 기타 대상에 도달합니다.

다른 시리즈와의 연계

  • 아파치 카프카(시리즈 38): Kafka는 이상적인 이벤트 저장소이자 메시지 버스입니다. 프로덕션의 이벤트 소싱을 위한 것입니다. Kafka Streams 및 Kafka Connect의 기사는 통합됩니다. 여기에 설명된 패턴을 직접 사용합니다.
  • PostgreSQL AI 및 pgVector: 투영의 읽기 모델은 다음과 같습니다. 유사성을 위한 벡터 열을 포함하여 특수 인덱스가 있는 PostgreSQL 데이터베이스 주문의 텍스트 설명에 대한 의미.