Outbox Pattern: Atomic Event Publishing with CDC
You have a service that needs to do two things atomically: save state to the database and publish an event to Kafka or SQS. The problem, known as the dual write problem, seems trivial but hides a fundamental trap: there is no direct way to run a single transaction spanning both a relational database and a message broker. If you save to the DB first and the publish to Kafka fails, the state is inconsistent. If you publish first and the DB save fails, you have published an event describing a state that does not exist.
The Outbox Pattern solves this problem elegantly: instead of writing directly to the message broker, you write the event to an outbox table inside the same database transaction that saves the state. A separate process, the relay, reads from the outbox table and publishes to the broker. Atomicity is guaranteed by the database, not by a distributed coordinator.
What You Will Learn
- The dual write problem and why it can't be solved with naive retry
- Outbox Pattern: architecture and main components
- Implementation of the outbox schema in PostgreSQL
- Polling Publisher: the simplest relay to get started with
- Debezium CDC: Change Data Capture for zero-latency relay
- Setting up Debezium with Kafka Connect on Docker
- Trade-offs and alternatives: when to use Transaction Log Tailing vs Polling
The Dual Write Problem
Consider an order service that must: 1) save the order in the DB, 2) publish an OrderPlaced event on Kafka. Any naive approach leaves a window of inconsistency:
// Approach 1: DB first, then Kafka
// Problem: if Kafka is down, the event is never published
async function placeOrderV1(dto: PlaceOrderDto): Promise<Order> {
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
// If this line fails: the order is in the DB but nobody ever hears about it
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
return order.rows[0];
}
// Approach 2: Kafka first, then DB
// Problem: if the DB fails, you publish an event for an order that does not exist
async function placeOrderV2(dto: PlaceOrderDto): Promise<Order> {
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
// If this line fails: the event is published but the order does not exist in the DB
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
return order.rows[0];
}
// There is no "right order": the problem is structural.
// The two systems (DB + broker) do not participate in the same ACID transaction.
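The list above mentions naive retry: wrapping the Kafka publish in a retry loop helps with transient broker errors, but it cannot close the gap, because the process can crash between the DB commit and the (retried) publish, and the in-memory event is gone. A minimal sketch of such a wrapper (`withRetry` is a hypothetical helper, not from this article's codebase) shows what retry does and does not cover:

```typescript
// Naive retry: survives transient broker errors, but NOT a process
// crash between the DB commit and the publish. The event lives only
// in memory, so a crash in that window loses it forever.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Exponential backoff before the next attempt
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** (attempt - 1)));
    }
  }
  throw lastError;
}
```

Even `withRetry(() => kafka.produce(...))` after the commit leaves the inconsistency window open; the fix has to be durable, which is exactly what the outbox table provides.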
Architecture of the Outbox Pattern
The pattern introduces an outbox_events table in the same database as the service. The application writes to the outbox table in the same transaction that saves the domain data. A separate relay reads from the outbox table, publishes to the broker, and then marks the events as published.
-- Outbox schema: lives in the same PostgreSQL instance as the service
CREATE TABLE outbox_events (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(100) NOT NULL, -- e.g. 'Order'
  aggregate_id VARCHAR(255) NOT NULL, -- e.g. the order ID
  event_type VARCHAR(100) NOT NULL, -- e.g. 'OrderPlaced'
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  published_at TIMESTAMPTZ, -- NULL = not yet published
  retry_count INTEGER NOT NULL DEFAULT 0,
  last_error TEXT
);
-- Partial index for the relay that reads unpublished events
CREATE INDEX idx_outbox_unpublished
ON outbox_events(created_at)
WHERE published_at IS NULL AND retry_count < 5;
-- Index for cleaning up old records
CREATE INDEX idx_outbox_cleanup
ON outbox_events(published_at)
WHERE published_at IS NOT NULL;
// TypeScript: write to the outbox in the same transaction
class OrderRepository {
constructor(private readonly db: Pool) {}
async saveWithEvents(
order: Order,
events: DomainEvent[]
): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// 1. Save the domain state
await client.query(`
INSERT INTO orders (id, customer_id, total_amount, status)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status
`, [order.id, order.customerId, order.totalAmount, order.status]);
// 2. Save the events to the outbox table
// SAME TRANSACTION: atomic with the order save
for (const event of events) {
await client.query(`
INSERT INTO outbox_events
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
`, [
event.eventId,
event.aggregateType,
event.aggregateId,
event.eventType,
JSON.stringify(event.payload),
]);
}
await client.query('COMMIT');
// At this point either both the order and the events are in the DB,
// or neither is. Zero chance of an inconsistent state.
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Domain service
class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly eventBus: EventBus
) {}
async placeOrder(dto: PlaceOrderDto): Promise<Order> {
const order = Order.create(dto); // also generates the domain events
const events = order.getUncommittedEvents();
// Single atomic operation: order + outbox events
await this.repo.saveWithEvents(order, events);
order.clearEvents();
return order;
}
}
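The repository and service above assume a small domain layer whose exact shape this article does not show. A minimal sketch of the `DomainEvent` interface and the event-collecting aggregate, with all shapes inferred from the calls above (so treat the field names as assumptions), could look like this:

```typescript
import { randomUUID } from 'crypto';

// Assumed shape: matches the fields the repository inserts into outbox_events
interface DomainEvent {
  eventId: string;
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  payload: Record<string, unknown>;
}

interface PlaceOrderDto {
  id: string;
  customerId: string;
  total: number;
}

class Order {
  private uncommittedEvents: DomainEvent[] = [];

  private constructor(
    public readonly id: string,
    public readonly customerId: string,
    public readonly totalAmount: number,
    public readonly status: string
  ) {}

  // Factory: creates the aggregate AND records the corresponding event
  static create(dto: PlaceOrderDto): Order {
    const order = new Order(dto.id, dto.customerId, dto.total, 'PLACED');
    order.uncommittedEvents.push({
      eventId: randomUUID(),
      aggregateType: 'Order',
      aggregateId: dto.id,
      eventType: 'OrderPlaced',
      payload: { orderId: dto.id, customerId: dto.customerId, total: dto.total },
    });
    return order;
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  clearEvents(): void {
    this.uncommittedEvents = [];
  }
}
```

The important property is that events are born inside the aggregate, so the repository can persist state and events in one transaction without the service layer assembling them by hand.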
Polling Publisher: The Simple Relay
The Polling Publisher is a separate process (or a cron job) that periodically queries the outbox table for unpublished events and sends them to the broker. It is simple to implement, but it introduces polling latency (typically 100ms-5s).
// Polling Publisher: relay that reads the outbox and publishes to Kafka
class OutboxPollingPublisher {
private isRunning = false;
private readonly POLL_INTERVAL_MS = 500;
private readonly BATCH_SIZE = 100;
constructor(
private readonly db: Pool,
private readonly kafka: KafkaProducer
) {}
async start(): Promise<void> {
this.isRunning = true;
console.log('OutboxPollingPublisher started');
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('OutboxPublisher error:', error);
// Keep going after an error: we don't want the publisher to stop
}
await this.sleep(this.POLL_INTERVAL_MS);
}
}
private async publishPendingEvents(): Promise<void> {
  const client = await this.db.connect();
  let lockedIds: string[] = [];
  try {
    await client.query('BEGIN');
    // SELECT ... FOR UPDATE SKIP LOCKED: avoids conflicts between multiple publishers
    const { rows } = await client.query(`
      SELECT id, aggregate_type, aggregate_id, event_type, payload
      FROM outbox_events
      WHERE published_at IS NULL
        AND retry_count < 5
      ORDER BY created_at
      LIMIT $1
      FOR UPDATE SKIP LOCKED
    `, [this.BATCH_SIZE]);
    if (rows.length === 0) {
      await client.query('ROLLBACK');
      return;
    }
    lockedIds = rows.map((r) => r.id);
    // Publish to Kafka in a single batch
    const messages = rows.map((row) => ({
      key: row.aggregate_id,
      value: JSON.stringify({
        eventId: row.id,
        aggregateType: row.aggregate_type,
        aggregateId: row.aggregate_id,
        eventType: row.event_type,
        payload: row.payload,
      }),
      headers: {
        'event-type': row.event_type,
        'aggregate-type': row.aggregate_type,
      },
    }));
    await this.kafka.sendBatch({
      topicMessages: [{
        topic: `${rows[0].aggregate_type.toLowerCase()}-events`,
        messages,
      }],
    });
    // Mark as published (in the same transaction that locked the rows)
    await client.query(`
      UPDATE outbox_events
      SET published_at = NOW()
      WHERE id = ANY($1)
    `, [lockedIds]);
    await client.query('COMMIT');
    console.log(`Published ${rows.length} events from outbox`);
  } catch (error) {
    await client.query('ROLLBACK');
    // Increment the retry count for the failed batch. This runs as a
    // separate query: the original transaction has been rolled back.
    if (lockedIds.length > 0) {
      await this.db.query(`
        UPDATE outbox_events
        SET retry_count = retry_count + 1,
            last_error = $1
        WHERE id = ANY($2)
      `, [(error as Error).message, lockedIds]);
    }
    throw error;
  } finally {
    client.release();
  }
}
stop(): void {
this.isRunning = false;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
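One consequence of this relay (and of CDC relays too) is at-least-once delivery: a crash between the Kafka send and the `published_at` update replays the batch on the next poll. Consumers therefore need to deduplicate by `eventId`. Here is a sketch; the in-memory `Set` is an assumption standing in for a real `processed_events` table:

```typescript
interface OutboxMessage {
  eventId: string;
  eventType: string;
  payload: unknown;
}

class IdempotentConsumer {
  // In production this would be a processed_events table, checked and
  // written in the same transaction as the handler's own DB writes.
  private readonly processed = new Set<string>();

  constructor(
    private readonly handler: (msg: OutboxMessage) => Promise<void>
  ) {}

  async handle(msg: OutboxMessage): Promise<void> {
    if (this.processed.has(msg.eventId)) {
      // Duplicate delivery: the relay republished after a partial failure
      return;
    }
    await this.handler(msg);
    this.processed.add(msg.eventId);
  }
}
```

Combining outbox-based publishing with idempotent consumption gives effectively-once processing without distributed transactions.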
Debezium CDC: Change Data Capture for Zero-Latency
The Polling Publisher introduces latency (how often should you poll?) and load on the database (continuous queries). Debezium, part of the Kafka Connect ecosystem, reads the database's transaction log directly (the WAL in PostgreSQL) and publishes changes to Kafka in real time, with latency typically under 100ms.
# docker-compose.yml: setup Debezium con PostgreSQL e Kafka
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: orders_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
# Enable the logical WAL required by Debezium
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
kafka-connect:
image: debezium/connect:2.6
depends_on: [kafka, postgres]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
# Register the Debezium PostgreSQL connector via the REST API
# POST http://localhost:8083/connectors
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orders_db",
"database.server.name": "orders",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}.events"
}
}
# This automatically creates the topics:
#   outbox.Order.events
#   outbox.Payment.events
#   outbox.Inventory.events
# based on the aggregate_type column of the outbox table
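The `route.topic.replacement` expression substitutes `${routedByValue}` with the value of the column named in `route.by.field` (here `aggregate_type`), while `table.field.event.key` and `table.field.event.payload` pick the Kafka key and value. This helper is not Debezium code, just a sketch of the mapping the EventRouter performs for the config above:

```typescript
interface OutboxRow {
  id: string;
  aggregate_type: string;
  aggregate_id: string;
  event_type: string;
  payload: unknown;
}

// Mirrors the routing configured above: topic from aggregate_type,
// Kafka key from aggregate_id, value from payload. Carrying id and
// event_type alongside is this sketch's choice, not Debezium's exact output.
function routeOutboxRow(row: OutboxRow) {
  return {
    topic: `outbox.${row.aggregate_type}.events`,
    key: row.aggregate_id,
    value: row.payload,
    meta: { eventId: row.id, eventType: row.event_type },
  };
}
```

Pinning this mapping down makes it obvious why consumers subscribe per aggregate type rather than to one firehose topic.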
Cleanup of the Outbox Table
The outbox table grows over time. Implement a cleanup strategy to keep its size under control without impacting live traffic.
-- Cleanup job: run hourly with pg_cron or an external cron
-- Deletes published events older than 7 days
-- Uses batched DELETEs to avoid holding locks for too long
DO $$
DECLARE
  deleted_count INTEGER;
BEGIN
  LOOP
    DELETE FROM outbox_events
    WHERE id IN (
      SELECT id FROM outbox_events
      WHERE published_at IS NOT NULL
        AND published_at < NOW() - INTERVAL '7 days'
      LIMIT 1000
    );
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    EXIT WHEN deleted_count < 1000;
    -- Pause between batches so we don't overload the DB
    PERFORM pg_sleep(0.1);
  END LOOP;
END $$;
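If the pg_cron extension is installed, a simple variant of the delete above can be scheduled directly in the database via `cron.schedule`; the job name here is arbitrary and the statement is a sketch (unbatched, so suitable only when hourly deletions stay small):

```sql
-- Run the cleanup every hour at minute 0 (requires the pg_cron extension)
SELECT cron.schedule(
  'outbox-cleanup',
  '0 * * * *',
  $$
  DELETE FROM outbox_events
  WHERE published_at IS NOT NULL
    AND published_at < NOW() - INTERVAL '7 days'
  $$
);
```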
Polling Publisher vs CDC: When to Use Which
- Polling Publisher: simple setup, no additional components, 100ms-5s latency. Ideal for getting started and for low-to-medium-volume systems.
- Debezium CDC: sub-100ms latency, zero polling overhead on DB, requires Kafka Connect and WAL configuration. Ideal for high volume production.
- Lightweight alternative: pg_notify + LISTEN for PostgreSQL-only systems that want real-time notifications without Kafka.
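The pg_notify alternative in the last bullet can be wired with a trigger that NOTIFYs on every outbox insert and a relay connection that LISTENs. The sketch below assumes the channel and trigger names shown in the comments; only the payload parsing is pure code:

```typescript
// Assumed DB-side setup for the LISTEN/NOTIFY relay:
//   CREATE FUNCTION notify_outbox() RETURNS trigger AS $$
//   BEGIN
//     PERFORM pg_notify('outbox_channel', row_to_json(NEW)::text);
//     RETURN NEW;
//   END $$ LANGUAGE plpgsql;
//   CREATE TRIGGER outbox_notify AFTER INSERT ON outbox_events
//     FOR EACH ROW EXECUTE FUNCTION notify_outbox();

interface OutboxNotification {
  id: string;
  aggregate_type: string;
  aggregate_id: string;
  event_type: string;
  payload: unknown;
}

// Parse the JSON payload that pg_notify delivers. NOTIFY payloads are
// capped at roughly 8000 bytes, so large events should notify only the
// row id and be re-read from the outbox table.
function parseOutboxNotification(raw: string): OutboxNotification {
  return JSON.parse(raw) as OutboxNotification;
}

// Wiring with the `pg` client (the listener needs a dedicated connection):
//   const client = new Client(connectionConfig);
//   await client.connect();
//   await client.query('LISTEN outbox_channel');
//   client.on('notification', (msg) => {
//     const event = parseOutboxNotification(msg.payload ?? '{}');
//     // hand off to the dispatcher...
//   });
```

NOTIFY is fire-and-forget: notifications sent while the listener is disconnected are lost, so a periodic polling pass is still needed as a safety net after reconnects.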
Conclusions and Next Steps
The Outbox Pattern solves one of the most insidious problems in event-driven architecture: atomic event publication. The key is to use the database's ACID transactions as the coordinator instead of a distributed protocol. With Debezium CDC, the relay becomes almost transparent, with minimal latency and zero impact on application queries.