Event Sourcing + CQRS împreună: proiecție, instantaneu și consistență
Event Sourcing singur gestionează partea de scriere într-un mod imuabil și auditabil. Numai CQRS optimizează citit și scris separat. Folosit Împreună, ei rezolvă una dintre cele mai grele probleme a arhitecturii software: cum să aveți un sistem complet auditabil, scalabil pe citiri și performant la interogări. Dar această putere are un cost: proiecții de menținut, instantanee pentru reluare eficient și gestionarea oricărei coerențe între modelele de scriere și citire.
Rezumat: Aprovizionarea evenimentelor și CQRS separate
Înainte de a le combina, să recapitulăm cele două modele izolat:
- Aprovizionare pentru evenimente: În loc să salvați starea curentă a agregatului în baza de date, salvezi succesiune de evenimente cine l-a creat. Statul este reconstruit reproducerea (reluând) evenimentele de la început. Avantaj: pistă de audit completă, călătorii în timp, perspective de afaceri derivate din evenimente.
- CQRS: Partea de comandă (model de scriere) și partea de interogare (model de citire) sunt separate. Modelul de scriere primește comanda și produce evenimente. Modelul citit este optimizat pentru interogări specifice aplicației, construite din evenimente.
Combinația este firească: Event Sourcing produce evenimente, CQRS le consumă pentru a le construi modele de citire optimizate (le proiecții).
Când să folosiți Event Sourcing + CQRS împreună
- Sisteme cu pistă de audit obligatorie (fintech, healthcare, e-commerce)
- Atunci când modelele citite au structuri foarte diferite față de modelul de scriere
- Când citirile trebuie să fie scalate de 10-100x în comparație cu scrierile
- Sisteme cu interogări de călătorie în timp ("care era starea pe 15 martie?")
- Nu utilizați pentru aplicații CRUD simple: exagerare semnificativă
Arhitectură completă: flux end-to-end
Fluxul unei cereri într-un sistem Event Sourcing + CQRS este următorul:
- Clientul trimite un Comanda (de ex.
EffettuaOrdineCommand) - Il Manager de comandă încărcați agregatul din magazinul de evenimente (reluare eveniment)
- Agregatul validează comanda și produce una sau mai multe Eveniment de domeniu (de ex.
OrdineEffettuatoEvent) - Evenimentele sunt persistente înMagazin de evenimente (doar pentru adăugare)
- Evenimentele sunt publicate pe Autobuz de mesaje (Kafka, EventBridge etc.)
- Una sau mai multe Manipulator de proiecție consumă evenimente și actualizează Citiți modelul
- Interogările clientului citite din modelul de citire (de obicei, o bază de date optimizată pentru citire)
Implementare: Agregatul cu aprovizionare cu evenimente
// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"
import java.util.*;
import java.time.Instant;
public class OrdineAggregate {
// ID univoco dell'aggregate
private String ordineId;
// Versione corrente (numero di eventi applicati)
private long version = -1;
// Stato ricostruito dagli eventi
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Costruttore privato: si crea sempre tramite replay o factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: valida il comando e produce eventi =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Validazione business rules
if (this.stato != null) {
throw new IllegalStateException("Ordine gia esistente: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
}
// Produce l'evento (nessuna modifica di stato diretta!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Applica l'evento localmente e aggiungilo agli uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Ordine non in attesa di pagamento");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: modifica lo stato interno applicando un evento =====
// Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dispatch dinamico per tipo di evento
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
Magazin de evenimente: Persistența evenimentelor
L'Magazin de evenimente este baza de date care persistă succesiunea evenimentelor pentru fiecare agregat. Cerința fundamentală este ca scripturile să fie numai anexă si sprijin cel verificare optimistă a concurenței: specificați versiunea așteptată a agregatului iar scrierea eșuează dacă între timp altcineva a scris un eveniment (conflict).
// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;
public interface EventStore {
// Carica tutti gli eventi per un aggregate (per il replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persiste nuovi eventi con optimistic concurrency check
// expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL della tabella event_store
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Optimistic concurrency check
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflitto per aggregate " + aggregateId +
": atteso version " + expectedVersion + ", trovato " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (SQLException e) {
throw new EventStoreException("Errore scrittura eventi", e);
}
}
}
Proiecție: construirea modelului citit din evenimente
Una proiecție este un consumator de evenimente care își construiește o viziune (modelul citit) optimizat pentru interogări. Fiecare proiecție este idempotentă și poate să fie reconstruit de la zero prin recitirea tuturor evenimentelor de la începutul Magazinului de evenimente.
Proiecțiile sunt actualizate asincron după fiecare eveniment: aceasta este sursa de eventuala consistenta. Clientul care tocmai a executat o comandă ar putea nu vedeți imediat rezultate pe modelul de citire.
// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Gestisce OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Gestisce PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Query ottimizzate sul read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB,
// -- Indici ottimizzati per le query frequenti
// CONSTRAINT idx_cliente_id USING btree(cliente_id),
// CONSTRAINT idx_stato USING btree(stato)
// );
Instantaneu: optimizarea redării agregatelor cu multe evenimente
Pe măsură ce un agregat acumulează evenimente în timp, reluare completă devine lent: o comandă cu 100 de actualizări de stare necesită 100 de interogări și 100 se aplică înainte de a fi gata pentru a gestiona o nouă comandă. The Instantanee rezolvă această problemă: periodic starea curentă a agregatului este salvată (instantaneu), iar la următoarea reîncărcare plecăm de la instantaneu, nu de la începutul poveștii.
// SnapshotStore.java - Gestione degli snapshot degli aggregati
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Prova a caricare lo snapshot piu recente
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
OrdineAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// Parte dallo snapshot invece che dall'evento 0
aggregate = (OrdineAggregate) snapshot.get().getState();
fromVersion = snapshot.get().getVersion() + 1;
} else {
aggregate = new OrdineAggregate();
fromVersion = 0;
}
// 2. Carica solo gli eventi DOPO lo snapshot
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
recentEvents.forEach(aggregate::applyEvent);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serializza lo stato corrente
aggregate.getVersion()
);
}
}
}
// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
Gestionarea coerenței eventuale
Principala provocare a Event Sourcing + CQRS este fereastra de inconsecvență: după ce o comandă este executată cu succes, modelul de citire nu este încă actualizat (Proiecția încă procesează evenimentul). Un client care citește imediat după aceea este posibil să nu vedeți modificările dvs.
Există mai multe strategii pentru a gestiona această situație:
Strategia 1: sondaj cu verificarea versiunii
// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection aggiornata
}
await sleep(100); // Aspetta 100ms e riprova
}
throw new ProjectionTimeoutError(
`Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
);
}
Strategia 2: returnați versiunea în răspunsul la comandă
// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // la versione dopo il salvataggio
.message("Ordine effettuato con successo")
.build());
}
Strategia 3: Proiecție sincronă pentru date critice
// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Salva eventi nell'Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Aggiorna il read model nella STESSA transazione (sincrono)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
Reconstituirea proiecției
Unul dintre marile avantaje ale Event Sourcing este capacitatea de a reconstrui orice proiecție de la zero prin recitirea tuturor evenimentelor de la începutul Magazinului de evenimente. Acest lucru vă permite să adăugați noi modele de citire retroactiv sau să remediați erori în proiecţiile existente.
// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());
// 1. Svuota il read model corrente
handler.reset();
// 2. Legge tutti gli eventi in ordine cronologico
// (implementazione dipende dall'event store specifico)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processati " + count + " eventi...");
}
});
System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
}
}
Cele mai bune practici și anti-modele
Anti-Pattern: Schimbarea evenimentelor trecute
Evenimentele din Magazinul de evenimente sunt imuabil. Nu le editați niciodată după ce ați scris.
Dacă o schemă de eveniment trebuie să se modifice, creați o nouă versiune a evenimentului
(OrdineEffettuatoV2Event) și se ocupă de upcasting în deserializator.
Schimbarea evenimentelor din trecut compromite integritatea pistei de audit și întrerupe redarea.
- Un agregat per tranzacție: Nu editați mai multe agregate în aceeași unitate de lucru. Folosiți Saga pentru a orchestra operațiuni încrucișate.
- Evenimentele ca fapt trecut: Numele evenimentelor descriu ceva ce sa întâmplat deja (
OrdineEffettuatoEvent, NuEffettuaOrdineEvent). - Proiectii idempotente: Fiecare handler de proiecție trebuie să poată fi invocat în siguranță de mai multe ori cu același eveniment (cel puțin o dată).
- Nu utilizați evenimente precum Command: Publicarea unui eveniment nu trebuie să lanseze direct o comandă pe un alt agregat. Utilizați Saga sau Process Manager.
- Politica de instantanee bazată pe numărul de evenimente: Creați instantanee la fiecare 50-100 de evenimente. Prea frecvent = suprasarcina de serializare; prea rare = reluare lentă.
Următorii pași din serie
- Articolul 5 – Saga Pattern: când o operațiune implică mai multe agregate sau servicii multiple, Saga Pattern gestionează tranzacțiile distribuite fără 2PC, cu tranzacţii compensatoare în caz de eşec.
- Articolul 6 – AWS EventBridge: cum să publicați evenimente din Magazinul dvs. de evenimente pe EventBridge pentru a ajunge la Lambda, SQS și alte ținte într-un mod fără server.
Legătură cu alte serii
- Apache Kafka (Seria 38): Kafka este Magazinul de evenimente și magistrala de mesaje ideală pentru Event Sourcing în producție. Articolele despre Kafka Streams și Kafka Connect se integrează direct cu modelele descrise aici.
- PostgreSQL AI și pgvector: modelul citit al unei proiecţii poate fi o bază de date PostgreSQL cu indecși specializați, inclusiv coloane vectoriale pentru similaritate semantica descrierilor textuale ale comenzilor.







