보낼 편지함 패턴: CDC를 통한 원자 이벤트 게시
두 가지 작업을 원자적으로 수행해야 하는 서비스가 있습니다. 데이터베이스에 상태 저장 Kafka 또는 SQS에 이벤트를 게시합니다. 문제 — 알려진 이중 쓰기 — 사소해 보이지만 근본적인 함정이 숨겨져 있습니다. 직접적인 방법은 없습니다. 관계형 데이터베이스와 메시지를 모두 포함하는 트랜잭션을 실행합니다. 브로커. DB에 저장한 다음 Kafka에 게시하는 데 실패하면 상태가 일관되지 않습니다. 먼저 게시한 다음 DB에 저장하지 못한 경우 다음과 같은 이벤트를 게시하게 됩니다. 존재하지 않는 상태를 설명합니다.
L'보낼편지함 패턴 이 문제를 우아하게 해결합니다. 메시지 브로커에 직접 쓰고 내부의 보낼 편지함 테이블에 이벤트를 씁니다. 상태를 저장하는 동일한 데이터베이스 트랜잭션입니다. 별도의 프로세스 - 계전기 — 보낼 편지함 테이블에서 읽고 브로커에 게시합니다. 원자성은 분산 코디네이터가 아닌 데이터베이스에 의해 보장됩니다.
무엇을 배울 것인가
- 이중 쓰기 문제와 단순한 재시도로 해결할 수 없는 이유
- 보낼 편지함 패턴: 아키텍처 및 주요 구성 요소
- PostgreSQL에서 보낼 편지함 스키마 구현
- 여론 조사 게시자: 시작하기 가장 쉬운 릴레이
- Debezium CDC: 지연 시간 없는 릴레이를 위한 변경 데이터 캡처
- Docker에서 Kafka Connect를 사용하여 Debezium 설정
- 장단점 및 대안: 트랜잭션 로그 테일링과 폴링을 사용하는 경우
이중 쓰기 문제
1) DB에 주문을 저장하고, 2) 게시해야 하는 주문 서비스를 생각해 보겠습니다.
이벤트 OrderPlaced 카프카에 대해서 모든 순진한 접근 방식에는 창이 있습니다.
불일치:
// Approccio 1: DB prima, poi Kafka
// Problema: se Kafka e giu, l'evento non viene mai pubblicato
async function placeOrderV1(dto: PlaceOrderDto): Promise<Order> {
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
// Se questa linea fallisce: l'ordine e nel DB ma nessuno lo sa
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
return order.rows[0];
}
// Approccio 2: Kafka prima, poi DB
// Problema: se il DB fallisce, pubblichi un evento per un ordine che non esiste
async function placeOrderV2(dto: PlaceOrderDto): Promise<Order> {
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
// Se questa linea fallisce: l'evento e pubblicato ma l'ordine non esiste nel DB
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
return order.rows[0];
}
// Non esiste un "giusto ordine": il problema e strutturale.
// I due sistemi (DB + broker) non partecipano alla stessa transazione ACID.
보낼 편지함 패턴의 아키텍처
패턴은 테이블을 소개합니다 outbox_events 동일한 데이터베이스에서
서비스의. 애플리케이션은 동일한 트랜잭션의 보낼 편지함 테이블에 씁니다.
도메인 데이터를 저장합니다. 별도의 릴레이가 보낼 편지함 테이블에서 읽고 게시합니다.
브로커에서 이벤트를 게시된 것으로 표시합니다.
-- Schema outbox: nella stessa istanza PostgreSQL del servizio
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL, -- es. 'Order'
aggregate_id VARCHAR(255) NOT NULL, -- es. ID ordine
event_type VARCHAR(100) NOT NULL, -- es. 'OrderPlaced'
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ, -- NULL = non ancora pubblicato
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT
);
-- Index per il relay che legge gli eventi non pubblicati
CREATE INDEX idx_outbox_unpublished
ON outbox_events(created_at)
WHERE published_at IS NULL AND retry_count < 5;
-- Index per la pulizia dei vecchi record
CREATE INDEX idx_outbox_cleanup
ON outbox_events(published_at)
WHERE published_at IS NOT NULL;
// TypeScript: scrivi nella outbox nella stessa transazione
class OrderRepository {
constructor(private readonly db: Pool) {}
async saveWithEvents(
order: Order,
events: DomainEvent[]
): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// 1. Salva lo stato del dominio
await client.query(`
INSERT INTO orders (id, customer_id, total_amount, status)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status
`, [order.id, order.customerId, order.totalAmount, order.status]);
// 2. Salva gli eventi nella tabella outbox
// STESSA TRANSAZIONE: atomico con il salvataggio dell'ordine
for (const event of events) {
await client.query(`
INSERT INTO outbox_events
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
`, [
event.eventId,
event.aggregateType,
event.aggregateId,
event.eventType,
JSON.stringify(event.payload),
]);
}
await client.query('COMMIT');
// A questo punto: o sia l'ordine che gli eventi sono nel DB,
// o nessuno dei due. Zero possibilita di stato inconsistente.
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Domain service
class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly eventBus: EventBus
) {}
async placeOrder(dto: PlaceOrderDto): Promise<Order> {
const order = Order.create(dto); // genera anche gli eventi nel dominio
const events = order.getUncommittedEvents();
// Singola operazione atomica: ordine + eventi outbox
await this.repo.saveWithEvents(order, events);
order.clearEvents();
return order;
}
}
여론조사 게시자: The Simple Relay
Il 여론조사 게시자 별도의 프로세스(또는 크론 작업) 정기적으로 보낼 편지함 테이블을 쿼리하여 게시되지 않은 이벤트를 찾습니다. 브로커에게 보냅니다. 구현이 간단하고 폴링 대기 시간이 발생합니다. (일반적으로 100ms-5s).
// Polling Publisher: relay che legge outbox e pubblica su Kafka
class OutboxPollingPublisher {
private isRunning = false;
private readonly POLL_INTERVAL_MS = 500;
private readonly BATCH_SIZE = 100;
constructor(
private readonly db: Pool,
private readonly kafka: KafkaProducer
) {}
async start(): Promise<void> {
this.isRunning = true;
console.log('OutboxPollingPublisher started');
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('OutboxPublisher error:', error);
// Continua dopo errore: non vogliamo che il publisher si fermi
}
await this.sleep(this.POLL_INTERVAL_MS);
}
}
private async publishPendingEvents(): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// SELECT FOR UPDATE SKIP LOCKED: evita conflitti con publisher multipli
const { rows } = await client.query(`
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL
AND retry_count < 5
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [this.BATCH_SIZE]);
if (rows.length === 0) {
await client.query('ROLLBACK');
return;
}
// Pubblica su Kafka in batch
const messages = rows.map((row) => ({
key: row.aggregate_id,
value: JSON.stringify({
eventId: row.id,
aggregateType: row.aggregate_type,
aggregateId: row.aggregate_id,
eventType: row.event_type,
payload: row.payload,
}),
headers: {
'event-type': row.event_type,
'aggregate-type': row.aggregate_type,
},
}));
await this.kafka.sendBatch({
topicMessages: [{
topic: `${rows[0].aggregate_type.toLowerCase()}-events`,
messages,
}],
});
// Marca come pubblicati (nella stessa transazione)
const ids = rows.map((r) => r.id);
await client.query(`
UPDATE outbox_events
SET published_at = NOW()
WHERE id = ANY($1)
`, [ids]);
await client.query('COMMIT');
console.log(`Published ${rows.length} events from outbox`);
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count per gli eventi falliti
const ids = []; // recuperali dal context
await this.db.query(`
UPDATE outbox_events
SET retry_count = retry_count + 1,
last_error = $1
WHERE id = ANY($2)
`, [(error as Error).message, ids]);
throw error;
} finally {
client.release();
}
}
stop(): void {
this.isRunning = false;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Debezium CDC: 제로 레이턴시를 위한 변경 데이터 캡처
Polling 게시자는 대기 시간(폴링 빈도)을 도입하고 데이터베이스에 로드합니다. (지속적인 쿼리). 데베지움 — Kafka Connect 생태계의 일부 — 읽기 직접적으로 거래 로그 데이터베이스(PostgreSQL의 WAL) 및 게시 Kafka에서 실시간으로 변경되며 일반적으로 대기 시간은 100ms 미만입니다.
# docker-compose.yml: setup Debezium con PostgreSQL e Kafka
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: orders_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
# Abilita il WAL logico necessario per Debezium
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
kafka-connect:
image: debezium/connect:2.6
depends_on: [kafka, postgres]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
# Registra il Debezium PostgreSQL Connector via REST API
# POST http://localhost:8083/connectors
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orders_db",
"database.server.name": "orders",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}.events"
}
}
# Questo crea automaticamente i topic:
# outbox.Order.events
# outbox.Payment.events
# outbox.Inventory.events
# basandosi sul campo aggregate_type nella tabella outbox
보낼 편지함 테이블 정리
보낼 편지함 테이블은 시간이 지남에 따라 증가합니다. 다음에 대한 정리 전략을 구현합니다. 현재 운영에 영향을 주지 않고 크기를 통제할 수 있습니다.
// Job di cleanup: esegui ogni ora con pg_cron o un cron esterno
-- Cancella eventi pubblicati piu vecchi di 7 giorni
-- Usa DELETE con LIMIT per evitare lock estesi
DO $
DECLARE
deleted_count INTEGER;
BEGIN
LOOP
DELETE FROM outbox_events
WHERE id IN (
SELECT id FROM outbox_events
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days'
LIMIT 1000
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count < 1000;
-- Pausa tra batch per non sovraccaricare il DB
PERFORM pg_sleep(0.1);
END LOOP;
END $;
여론조사 게시자 vs CDC: 언제 어느 것을 사용해야 할까요?
- 여론조사 게시자: 간단한 설정, 추가 구성 요소 없음, 100ms-5s 대기 시간. 초보자와 중저용량 시스템에 이상적입니다.
- 데베지움 CDC: 100ms 미만의 대기 시간, DB 폴링 오버헤드가 없는 경우 Kafka Connect 및 WAL 구성이 필요합니다. 대량 생산에 이상적입니다.
- 경량 대안: Kafka 없이 실시간 알림을 원하는 PostgreSQL 전용 시스템의 경우 pg_notify + LISTEN입니다.
결론 및 다음 단계
Outbox 패턴은 이벤트 중심 아키텍처의 가장 교묘한 문제 중 하나를 해결합니다. 이벤트의 원자 출판. 핵심은 ACID 트랜잭션을 활용하는 것입니다. 분산 시스템 대신 코디네이터로 데이터베이스를 사용합니다. Debezium CDC에서는 릴레이는 대기 시간을 최소화하고 애플리케이션 쿼리에 영향을 주지 않으면서 거의 투명해집니다.







