Event Sourcing + CQRS Together: Projection, Snapshot and Consistency
Event Sourcing alone handles the write side in an immutable and auditable way. CQRS alone optimizes reads and writes separately. Used together, they solve one of the hardest problems in software architecture: building a system that is fully auditable, scalable on reads, and fast on queries. But this power comes at a cost: projections to maintain, snapshots to keep replay efficient, and eventual consistency between write and read models to manage.
Summary: Event Sourcing and CQRS Separate
Before combining them, let's recap the two patterns in isolation:
- Event Sourcing: Instead of saving the current state of the aggregate to the database, you save the sequence of events that created it. The state is reconstructed by replaying the events from the beginning. Advantages: complete audit trail, time travel, business insights derived from events.
- CQRS: The Command side (write model) and Query side (read model) are separate. The write model receives Commands and produces events. The read model is built from events and optimized for application-specific queries.
The combination is natural: Event Sourcing produces events, and CQRS consumes them to build optimized read models (the projections).
When to Use Event Sourcing + CQRS Together
- Systems with mandatory audit trail (fintech, healthcare, e-commerce)
- When read models have very different structures from the write model
- When reads need to scale 10-100x compared to writes
- Systems with time travel queries ("what was the status on March 15th?")
- Do not use for simple CRUD applications: significant overkill
Complete Architecture: End-to-end flow
The flow of a request in an Event Sourcing + CQRS system is as follows:
- The client sends a Command (e.g. EffettuaOrdineCommand)
- The Command Handler loads the Aggregate from the Event Store (event replay)
- The Aggregate validates the Command and produces one or more Domain Events (e.g. OrdineEffettuatoEvent)
- The events are persisted into the Event Store (append-only)
- The events are published on the Message Bus (Kafka, EventBridge, etc.)
- One or more Projection Handlers consume the events and update the Read Model
- Client queries read from the Read Model (typically a read-optimized database)
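Before diving into the full implementation, the steps above can be sketched end to end with a toy in-memory version. All names here are illustrative, everything is synchronous, and the projection is invoked directly instead of going through a message bus:

```java
import java.util.*;

// Toy end-to-end flow: Command -> Aggregate -> Event Store -> Projection.
// Everything is in-memory and synchronous; names are illustrative only.
public class FlowSketch {
    // Append-only event streams, keyed by aggregate id (step 4)
    static final Map<String, List<String>> EVENT_STORE = new HashMap<>();
    // Read model: ordine id -> current status (step 6)
    static final Map<String, String> READ_MODEL = new HashMap<>();

    static void handleEffettuaOrdine(String ordineId) {
        // 2. Replay the aggregate's history from the event store
        List<String> history = EVENT_STORE.computeIfAbsent(ordineId, k -> new ArrayList<>());
        // 3. Validate the command against the rebuilt state
        if (!history.isEmpty()) throw new IllegalStateException("Order already exists");
        // 4. Append the resulting event
        history.add("OrdineEffettuato");
        // 5-6. "Publish" and project (synchronous here for brevity)
        project(ordineId, "OrdineEffettuato");
    }

    static void project(String ordineId, String event) {
        if ("OrdineEffettuato".equals(event)) {
            READ_MODEL.put(ordineId, "IN_ATTESA_PAGAMENTO");
        }
    }

    public static void main(String[] args) {
        handleEffettuaOrdine("ord-1");
        System.out.println(READ_MODEL.get("ord-1")); // prints IN_ATTESA_PAGAMENTO
    }
}
```

The real system differs in one crucial way: step 6 happens asynchronously, which is where eventual consistency comes from, as discussed later.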
Implementation: The Aggregate with Event Sourcing
// OrdineAggregate.java - Aggregate with Event Sourcing
// Follows the "events as source of truth" pattern
import java.util.*;
import java.time.Instant;
import java.math.BigDecimal;
public class OrdineAggregate {
// Unique ID of the aggregate
private String ordineId;
// Current version (index of the last applied event; -1 before any event)
private long version = -1;
// State reconstructed from the events
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Events produced in the current session (not yet persisted)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Private constructor: instances are created via replay or factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: rebuilds the state by replaying the events =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: validates the command and produces events =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Business rule validation
if (this.stato != null) {
throw new IllegalStateException("Order already exists: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("The order must have at least one line");
}
}
// Produce the event (no direct state mutation!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Apply the event locally and record it as uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Order is not awaiting payment");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: mutates internal state by applying an event =====
// These methods must be DETERMINISTIC and SIDE-EFFECT FREE
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dynamic dispatch by event type
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Unknown event: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
Event Store: The Persistence of Events
The Event Store is the database that persists the sequence of events for each aggregate. The fundamental requirement is that writes are append-only and support an optimistic concurrency check: you specify the expected version of the aggregate, and the write fails if someone else has appended an event in the meantime (conflict).
// EventStore.java - Interface and PostgreSQL implementation
import java.util.*;
import java.sql.*;
import javax.sql.DataSource;
public interface EventStore {
// Loads all events for an aggregate (for replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persists new events with an optimistic concurrency check
// expectedVersion: the version we expected (fails if it differs)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// PostgreSQL implementation
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL for the event_store table
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Error loading events for " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
try {
// Optimistic concurrency check (the UNIQUE constraint also catches
// races between the check and the insert)
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflict on aggregate " + aggregateId +
": expected version " + expectedVersion + ", found " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (RuntimeException | SQLException e) {
conn.rollback(); // undo the partial write on any failure
throw e;
}
} catch (SQLException e) {
throw new EventStoreException("Error writing events", e);
}
}
// Last persisted version for the aggregate (-1 if no events yet)
private long getCurrentVersion(Connection conn, String aggregateId) throws SQLException {
String sql = "SELECT COALESCE(MAX(version), -1) FROM event_store " +
"WHERE aggregate_id = ?";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
try (ResultSet rs = ps.executeQuery()) {
rs.next();
return rs.getLong(1);
}
}
}
}
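Stripped of JDBC, the concurrency check reduces to a few lines. A minimal in-memory sketch of the same logic (illustrative names, not the Postgres implementation; in the real store the UNIQUE(aggregate_id, version) constraint is what catches concurrent writers):

```java
import java.util.*;

// In-memory sketch of the optimistic concurrency check behind appendEvents.
public class InMemoryEventStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    // Current version = index of the last stored event, -1 if the stream is empty
    public long currentVersion(String aggregateId) {
        return streams.getOrDefault(aggregateId, List.of()).size() - 1;
    }

    public void appendEvents(String aggregateId, List<String> events, long expectedVersion) {
        long current = currentVersion(aggregateId);
        if (current != expectedVersion) {
            // Another writer appended first: the caller must reload and retry
            throw new IllegalStateException("Conflict on " + aggregateId +
                ": expected " + expectedVersion + ", found " + current);
        }
        streams.computeIfAbsent(aggregateId, k -> new ArrayList<>()).addAll(events);
    }
}
```

On conflict the typical reaction is to reload the aggregate (replaying the newly appended events), re-validate the Command, and retry the append.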
Projection: Building the Read Model from Events
A projection is an event consumer that builds a view (the read model) optimized for queries. Every projection is idempotent and can be rebuilt from scratch by re-reading all the events from the beginning of the Event Store.
The projections are updated asynchronously after each event: this is the source of eventual consistency. A client that has just executed a Command might not immediately see the results in the Read Model.
// OrdiniProjectionHandler.java - Builds the orders read model
// Listens for events and updates the query-optimized read table
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Handles OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Handles PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Optimized queries on the read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Read model schema (table optimized for the most common queries)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB
// );
// -- Indexes for the frequent queries (in PostgreSQL an index is not a CONSTRAINT)
// CREATE INDEX idx_cliente_id ON ordini_read USING btree(cliente_id);
// CREATE INDEX idx_stato ON ordini_read USING btree(stato);
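Because the message bus delivers events at least once, handlers like the ones above must tolerate duplicates. A toy sketch of why upserting by ordine_id makes the projection naturally idempotent (illustrative names, not the article's real classes):

```java
import java.util.*;

// Idempotent projection sketch: the handler upserts by ordine id, so replaying
// the same event twice (at-least-once delivery) leaves the read model unchanged.
public class IdempotentProjection {
    public final Map<String, String> statoByOrdine = new HashMap<>();

    public void onOrdineEffettuato(String ordineId) {
        // Upsert, not insert: safe to invoke again with the same event
        statoByOrdine.put(ordineId, "IN_ATTESA_PAGAMENTO");
    }

    public void onPagamentoConfermato(String ordineId) {
        statoByOrdine.put(ordineId, "PAGATO");
    }
}
```

An INSERT that fails on duplicate keys, or a counter that is incremented per event, would break under redelivery; upserts keyed by the aggregate id do not.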
Snapshot: Optimizing the Replay of Aggregates with Many Events
As an Aggregate accumulates events over time, a full replay becomes slow: an order with 100 status updates requires loading and applying 100 events before it is ready to handle a new Command. Snapshots solve this problem: periodically the current state of the Aggregate is saved (the snapshot), and on the next load you start from the snapshot instead of from the beginning of the history.
// SnapshotStore.java - Manages aggregate snapshots
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository with snapshot support
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot every 50 events
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Try to load the most recent snapshot
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
if (snapshot.isEmpty()) {
// No snapshot yet: full replay via the factory method
return OrdineAggregate.reconstituteFrom(eventStore.loadEvents(ordineId));
}
// 2. Start from the snapshot instead of from event 0
OrdineAggregate aggregate = (OrdineAggregate) snapshot.get().getState();
long fromVersion = snapshot.get().getVersion() + 1;
// 3. Load and apply only the events AFTER the snapshot
// (loadEventsFrom and replay(...) are assumed extensions of the EventStore
// interface and the aggregate shown earlier, filtering by version >= fromVersion)
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
aggregate.replay(recentEvents);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Take a snapshot once the aggregate has accumulated enough events
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serialize the current state
aggregate.getVersion()
);
}
}
}
// Snapshot table
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
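The saving is easy to see with a toy aggregate whose state is just a running total of event amounts (illustrative, not the order aggregate above): loading applies only the events recorded after the snapshot, yet reaches the same state as a full replay.

```java
import java.util.*;

// Toy snapshot-based loading: state = running total, events = amounts,
// and a snapshot stores (total, version). Illustrative names only.
public class SnapshotSketch {
    public record Snapshot(long total, long version) {}

    // events.get(i) has version i; snapshot may be null (full replay)
    public static long load(List<Long> events, Snapshot snapshot) {
        long total = (snapshot == null) ? 0 : snapshot.total();
        long fromVersion = (snapshot == null) ? 0 : snapshot.version() + 1;
        for (long v = fromVersion; v < events.size(); v++) {
            total += events.get((int) v); // replay only the tail
        }
        return total;
    }
}
```

With a snapshot every 50 events, loading an aggregate with 10,000 events applies at most 49 events instead of 10,000.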
Managing Eventual Consistency
The main challenge of Event Sourcing + CQRS is the inconsistency window: after a Command completes successfully, the Read Model is not yet updated (the Projection is still processing the event). A client that reads immediately afterwards may not see its own changes.
There are several strategies to manage this situation:
Strategy 1: Polling with Version Check
// The client waits until the read model reaches the expected version
// Useful for sequential operations: "wait until the order is visible before proceeding"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection up to date
}
await sleep(100); // Wait 100ms and retry
}
throw new ProjectionTimeoutError(
`Timeout: read model not updated after ${maxWaitMs}ms for order ${ordineId}`
);
}
Strategy 2: Return the Version in the Command Response
// The Command Handler returns the version of the produced event
// The client uses it for version checks or for polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // the version after saving
.message("Order placed successfully")
.build());
}
Strategy 3: Synchronous Projection for Critical Data
// For some critical projections, update the read model in the same transaction
// as the event save (sacrificing some decoupling)
// Used when the client absolutely must see the data immediately
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Save the events to the Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Update the read model in the SAME transaction (synchronous)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Publish the events on the bus for other services' asynchronous projections
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
Projection rebuild
One of the great advantages of Event Sourcing is the ability to rebuild any projection from scratch by re-reading all events from the beginning of the Event Store. This allows you to add new read models retroactively or fix bugs in existing projections.
// ProjectionRebuilder.java - Rebuilds a projection from scratch
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Starting projection rebuild: " + handler.getClass().getSimpleName());
// 1. Clear the current read model
handler.reset();
// 2. Stream all events in chronological order
// (implementation depends on the specific event store)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processed " + count + " events...");
}
});
System.out.println("Rebuild complete: " + processed.get() + " events processed");
}
}
Best Practices and Anti-Patterns
Anti-Pattern: Changing Past Events
Events in the Event Store are immutable. Never edit them after writing.
If an event schema needs to change, create a new version of the event
(OrdineEffettuatoV2Event) and handle upcasting in the deserializer.
Changing past events compromises the integrity of the audit trail and breaks replay.
- One aggregate per transaction: Do not modify multiple aggregates in the same Unit of Work. Use a Saga to orchestrate cross-aggregate operations.
- Events as past facts: Event names describe something that has already happened (OrdineEffettuatoEvent, not EffettuaOrdineEvent).
- Idempotent projections: Each projection handler must be safe to invoke multiple times with the same event (at-least-once delivery).
- Don't use events as Commands: Handling an event must not directly dispatch a Command to another aggregate. Use a Saga or Process Manager.
- Snapshot policy based on the number of events: Create snapshots every 50-100 events. Too frequent = serialization overhead; too rare = slow replay.
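The upcasting mentioned in the anti-pattern above can be sketched as follows. The field choices and the EUR default are illustrative assumptions, not part of the article's model; the point is that old events are converted at deserialization time and the stored V1 payload is never edited:

```java
// Upcasting sketch: old V1 events are converted to the current V2 shape when
// read back, so past events in the store stay immutable.
public class UpcasterSketch {
    public record OrdineEffettuatoV1(String ordineId, String clienteId) {}
    // V2 adds a currency field that V1 events never recorded (assumed here)
    public record OrdineEffettuatoV2(String ordineId, String clienteId, String valuta) {}

    public static OrdineEffettuatoV2 upcast(OrdineEffettuatoV1 v1) {
        // Default the missing field; the stored V1 event is untouched
        return new OrdineEffettuatoV2(v1.ordineId(), v1.clienteId(), "EUR");
    }
}
```

The deserializer applies the upcaster whenever it encounters the old event_type, so aggregates and projections only ever see the current event shape.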
Next Steps in the Series
- Article 5 – Saga Pattern: when an operation involves multiple aggregates or multiple services, the Saga Pattern handles distributed transactions without 2PC, with compensating transactions in case of failure.
- Article 6 – AWS EventBridge: how to publish events from your Event Store on EventBridge to reach Lambda, SQS and other targets in a serverless way.
Link with Other Series
- Apache Kafka (Series 38): Kafka is the ideal Event Store and message bus for Event Sourcing in production. The articles on Kafka Streams and Kafka Connect integrate directly with the patterns described here.
- PostgreSQL AI and pgvector: the read model of a projection can be a PostgreSQL database with specialized indexes, including vector columns for semantic similarity search on textual order descriptions.