送信ボックス パターン: CDC を使用したアトミック イベント発行
2 つのことをアトミックに行う必要があるサービスがあります: 状態をデータベースに保存する イベントを Kafka または SQS に発行します。問題 — として知られている 二重書き込み — 些細なことのように見えますが、根本的な罠が隠されています。直接的な方法はありません。 リレーショナル データベースとメッセージの両方を含むトランザクションを実行する ブローカー。 DB に保存してから Kafka へのパブリッシュが失敗した場合、状態は不整合になります。 最初に公開してから DB への保存に失敗した場合、イベントを公開していることになります。 それらは存在しない状態を記述します。
L'送信ボックスのパターン この問題をエレガントに解決します: 代わりに メッセージブローカーに直接書き込み、イベントを内部の送信ボックステーブルに書き込みます。 状態を保存するのと同じデータベース トランザクションの。別のプロセス - リレー — 送信トレイテーブルから読み取り、ブローカーにパブリッシュします。原子性とは 分散コーディネーターではなく、データベースによって保証されます。
何を学ぶか
- 二重書き込みの問題と単純な再試行では解決できない理由
- 送信ボックスのパターン: アーキテクチャと主要コンポーネント
- PostgreSQL での送信トレイ スキーマの実装
- Polling Publisher: 始めるのが最も簡単なリレー
- Debezium CDC: ゼロレイテンシーリレーのための変更データキャプチャ
- Docker 上で Kafka Connect を使用して Debezium をセットアップする
- トレードオフと代替案: トランザクション ログ テーリングとポーリングをいつ使用するか
二重書き込みの問題
1) 注文を DB に保存し、2) 公開する必要がある注文サービスを考えてみましょう。
イベント OrderPlaced カフカについて。素朴なアプローチにはチャンスがある
矛盾の原因:
// Approccio 1: DB prima, poi Kafka
// Problema: se Kafka e giu, l'evento non viene mai pubblicato
async function placeOrderV1(dto: PlaceOrderDto): Promise<Order> {
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
// Se questa linea fallisce: l'ordine e nel DB ma nessuno lo sa
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
return order.rows[0];
}
// Approccio 2: Kafka prima, poi DB
// Problema: se il DB fallisce, pubblichi un evento per un ordine che non esiste
async function placeOrderV2(dto: PlaceOrderDto): Promise<Order> {
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
// Se questa linea fallisce: l'evento e pubblicato ma l'ordine non esiste nel DB
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
return order.rows[0];
}
// Non esiste un "giusto ordine": il problema e strutturale.
// I due sistemi (DB + broker) non partecipano alla stessa transazione ACID.
送信箱パターンのアーキテクチャ
パターンはテーブルを導入します outbox_events 同じデータベース内で
サービスの。アプリケーションは同じトランザクションで送信トレイ テーブルに書き込みます
これにより、ドメイン データが保存されます。別のリレーが送信トレイテーブルから読み取り、パブリッシュします
ブローカー上でイベントを公開済みとしてマークします。
-- Schema outbox: nella stessa istanza PostgreSQL del servizio
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL, -- es. 'Order'
aggregate_id VARCHAR(255) NOT NULL, -- es. ID ordine
event_type VARCHAR(100) NOT NULL, -- es. 'OrderPlaced'
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ, -- NULL = non ancora pubblicato
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT
);
-- Index per il relay che legge gli eventi non pubblicati
CREATE INDEX idx_outbox_unpublished
ON outbox_events(created_at)
WHERE published_at IS NULL AND retry_count < 5;
-- Index per la pulizia dei vecchi record
CREATE INDEX idx_outbox_cleanup
ON outbox_events(published_at)
WHERE published_at IS NOT NULL;
// TypeScript: scrivi nella outbox nella stessa transazione
class OrderRepository {
constructor(private readonly db: Pool) {}
async saveWithEvents(
order: Order,
events: DomainEvent[]
): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// 1. Salva lo stato del dominio
await client.query(`
INSERT INTO orders (id, customer_id, total_amount, status)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status
`, [order.id, order.customerId, order.totalAmount, order.status]);
// 2. Salva gli eventi nella tabella outbox
// STESSA TRANSAZIONE: atomico con il salvataggio dell'ordine
for (const event of events) {
await client.query(`
INSERT INTO outbox_events
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
`, [
event.eventId,
event.aggregateType,
event.aggregateId,
event.eventType,
JSON.stringify(event.payload),
]);
}
await client.query('COMMIT');
// A questo punto: o sia l'ordine che gli eventi sono nel DB,
// o nessuno dei due. Zero possibilita di stato inconsistente.
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Domain service
class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly eventBus: EventBus
) {}
async placeOrder(dto: PlaceOrderDto): Promise<Order> {
const order = Order.create(dto); // genera anche gli eventi nel dominio
const events = order.getUncommittedEvents();
// Singola operazione atomica: ordine + eventi outbox
await this.repo.saveWithEvents(order, events);
order.clearEvents();
return order;
}
}
ポーリングパブリッシャー: シンプルリレー
Il 投票発行元 そして別のプロセス (または cron ジョブ) 定期的に送信トレイ テーブルにクエリを実行して、未公開のイベントを検索します。 ブローカーに送信します。実装が簡単ですが、ポーリング遅延が発生します (通常は 100 ミリ秒~5 秒)。
// Polling Publisher: relay che legge outbox e pubblica su Kafka
class OutboxPollingPublisher {
private isRunning = false;
private readonly POLL_INTERVAL_MS = 500;
private readonly BATCH_SIZE = 100;
constructor(
private readonly db: Pool,
private readonly kafka: KafkaProducer
) {}
async start(): Promise<void> {
this.isRunning = true;
console.log('OutboxPollingPublisher started');
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('OutboxPublisher error:', error);
// Continua dopo errore: non vogliamo che il publisher si fermi
}
await this.sleep(this.POLL_INTERVAL_MS);
}
}
private async publishPendingEvents(): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// SELECT FOR UPDATE SKIP LOCKED: evita conflitti con publisher multipli
const { rows } = await client.query(`
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL
AND retry_count < 5
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [this.BATCH_SIZE]);
if (rows.length === 0) {
await client.query('ROLLBACK');
return;
}
// Pubblica su Kafka in batch
const messages = rows.map((row) => ({
key: row.aggregate_id,
value: JSON.stringify({
eventId: row.id,
aggregateType: row.aggregate_type,
aggregateId: row.aggregate_id,
eventType: row.event_type,
payload: row.payload,
}),
headers: {
'event-type': row.event_type,
'aggregate-type': row.aggregate_type,
},
}));
await this.kafka.sendBatch({
topicMessages: [{
topic: `${rows[0].aggregate_type.toLowerCase()}-events`,
messages,
}],
});
// Marca come pubblicati (nella stessa transazione)
const ids = rows.map((r) => r.id);
await client.query(`
UPDATE outbox_events
SET published_at = NOW()
WHERE id = ANY($1)
`, [ids]);
await client.query('COMMIT');
console.log(`Published ${rows.length} events from outbox`);
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count per gli eventi falliti
const ids = []; // recuperali dal context
await this.db.query(`
UPDATE outbox_events
SET retry_count = retry_count + 1,
last_error = $1
WHERE id = ANY($2)
`, [(error as Error).message, ids]);
throw error;
} finally {
client.release();
}
}
stop(): void {
this.isRunning = false;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Debezium CDC: ゼロレイテンシのための変更データ キャプチャ
ポーリング パブリッシャでは、レイテンシー (ポーリングの頻度) が発生し、データベースに負荷がかかります。 (継続的なクエリ)。 デベジウム — Kafka Connect エコシステムの一部 — は次のようになります 直接的には トランザクションログ データベース (PostgreSQL の WAL) を作成して公開する Kafka 上での変更はリアルタイムで行われ、レイテンシーは通常 100 ミリ秒未満です。
# docker-compose.yml: setup Debezium con PostgreSQL e Kafka
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: orders_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
# Abilita il WAL logico necessario per Debezium
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
kafka-connect:
image: debezium/connect:2.6
depends_on: [kafka, postgres]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
# Registra il Debezium PostgreSQL Connector via REST API
# POST http://localhost:8083/connectors
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orders_db",
"database.server.name": "orders",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}.events"
}
}
# Questo crea automaticamente i topic:
# outbox.Order.events
# outbox.Payment.events
# outbox.Inventory.events
# basandosi sul campo aggregate_type nella tabella outbox
送信トレイテーブルのクリーンアップ
送信ボックス テーブルは時間の経過とともに増加します。クリーンアップ戦略を実行する 現在の操作に影響を与えることなく、サイズを制御できます。
// Job di cleanup: esegui ogni ora con pg_cron o un cron esterno
-- Cancella eventi pubblicati piu vecchi di 7 giorni
-- Usa DELETE con LIMIT per evitare lock estesi
DO $
DECLARE
deleted_count INTEGER;
BEGIN
LOOP
DELETE FROM outbox_events
WHERE id IN (
SELECT id FROM outbox_events
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days'
LIMIT 1000
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count < 1000;
-- Pausa tra batch per non sovraccaricare il DB
PERFORM pg_sleep(0.1);
END LOOP;
END $;
ポーリングパブリッシャーと CDC: いつどちらを使用するか
- 投票パブリッシャー: シンプルなセットアップ、追加コンポーネントなし、100ms ~ 5s の遅延。初心者や中量から中量のシステムに最適です。
- デベジウム CDC: 100 ミリ秒未満のレイテンシー、DB でのポーリング オーバーヘッドなし、Kafka Connect と WAL 構成が必要です。大量生産に最適です。
- 軽量の代替品: Kafka を使用せずにリアルタイム通知を必要とする PostgreSQL 専用システムの場合は、pg_notify + LISTEN。
結論と次のステップ
Outbox パターンは、イベント駆動型アーキテクチャの最も厄介な問題の 1 つを解決します。 イベントのアトミックな公開。鍵となるのは ACID トランザクションを活用することです 分散システムの代わりにデータベースをコーディネーターとして使用します。 Debezium CDC を使用すると、 リレーはほぼ透過的になり、遅延が最小限に抑えられ、アプリケーション クエリへの影響がゼロになります。







