コンシューマにおける冪等性: 重複排除と冪性キー
SQS、Kafka、または EventBridge を使用してイベント駆動型システムをセットアップしました。メッセージ それらは流れ、消費者はイベントを処理し、開発ではすべてが完璧に機能します。 次に、本番環境に移動すると、一部の注文確認メールが 2 回送信されていることがわかります。 場合によっては、特定の支払いが 2 回請求されることもあります。問題はそうではありません コードのバグ。これは最新の分散システムの基本的な特性です。
すべての主要なメッセージ ブローカー (SQS、Kafka、RabbitMQ、EventBridge) が保証します。 配達 少なくとも1回: メッセージは少なくとも 1 回配信されます ただし、複数回配信される場合があります。これは自動再試行の場合に発生します。 コンシューマ グループのリバランス、可視性タイムアウト、実行中のコンシューマ クラッシュ 処理。解決策はブローカーにあるのではなく、消費者にあります。消費者はこうでなければなりません 冪等.
何を学ぶか
- at-least-once システムでは必然的に重複メッセージが生成されるため
- 冪等性キー: 重複排除の基本パターン
- INSERT ON CONFLICT によるデータベースレベルのべき等性
- 高パフォーマンスを実現する TTL を使用した Redis ベースの重複排除
- 受信箱パターン: 1 回限りのセマンティクスのための構造化されたソリューション
- 自然な冪等性: 自然な冪等な操作を設計する方法
- 消費者の冪等性を検証するためのテスト戦略
メッセージの重複が避けられない理由
消費者の冪等性がなぜ必要なのかを理解するには、どのような場合に消費者が冪等するのかを理解する必要があります。 メッセージが複数回配信されます。本番環境での主なケース:
ケース 1: 処理後、ACK 前にコンシューマがクラッシュする
コンシューマはメッセージを正常に処理します (DB への書き込み、外部 API の呼び出し) しかし、ACK をブローカーに送信する前にクラッシュします。ブローカーはメッセージがそうでないとみなします 配信され、可視性タイムアウト後に送り返されます。新しい消費者 (または同じ消費者) 再起動後) メッセージを受信して再処理します。
ケース 2: 処理タイムアウト
SQSには、 可視性タイムアウト (デフォルトは 30 秒)。消費者が雇用する場合 タイムアウトを延長せずにメッセージを処理するには、SQS が 30 秒以上必要とする 他の消費者に見えるメッセージ。メッセージは 2 回処理されます 2 人の異なる消費者によって並行して実行されます。
ケース 3: Kafka Consumer Group のリバランス
Kafka コンシューマ グループのリバランス中 (コンシューマの追加/削除のため、 デプロイローリング)、一部のパーティションが再割り当てされます。来た消費者が 削除された場合はまだオフセットがコミットされていないため、そのバッチ内のメッセージが送信されます パーティションに割り当てられた新しいコンシューマによって再処理されます。
// Simulazione: perche i duplicati sono inevitabili
// Questo codice mostra IL PROBLEMA, non la soluzione
async function processPayment(message: SQSMessage): Promise<void> {
const { paymentId, amount, customerId } = JSON.parse(message.Body);
// Step 1: chiama l'API di pagamento esterna
await paymentGateway.charge(customerId, amount);
// ^^^ SUCCESSO: il pagamento e stato addebitato
// -- CRASH QUI --
// Il processo muore per OOM, segfault, deploy, ecc.
// Il pagamento e gia stato addebitato MA non abbiamo
// ancora eliminato il messaggio dalla coda SQS.
// SQS considera il messaggio non processato e lo
// rimanda dopo il visibility timeout.
await sqs.deleteMessage({
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle,
});
// ^^^ Mai eseguito se crashiamo sopra
}
// Risultato: il cliente viene addebitato due volte.
// Nessun bug nel codice. E' la natura del sistema at-least-once.
パターン冪等性キー
最も一般的な解決策は、 冪等性キー: 識別子 操作ごとに一意であり、コンシューマーがすでに処理済みかどうかを検出できるようになります。 このメッセージ。コンシューマは処理前にデータベースをチェックします。 すでに存在する場合は、サイレントでスキップします。存在しない場合は、キーを処理して保存します。
// Pattern base: Idempotency Key con PostgreSQL
// Schema tabella per il tracking
CREATE TABLE processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
consumer_name VARCHAR(100) NOT NULL,
-- TTL gestito da un job di cleanup o da una policy di partizione
expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + INTERVAL '7 days'
);
CREATE INDEX idx_processed_messages_expires
ON processed_messages(expires_at);
-- Job di cleanup: esegui ogni ora
DELETE FROM processed_messages WHERE expires_at < NOW();
// TypeScript: consumer idempotente con PostgreSQL
interface ProcessedMessageRecord {
messageId: string;
processedAt: Date;
consumerName: string;
}
class IdempotentConsumer {
constructor(
private readonly db: Pool,
private readonly consumerName: string
) {}
async processMessage<T>(
messageId: string,
payload: T,
handler: (payload: T) => Promise<void>
): Promise<{ processed: boolean; skipped: boolean }> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Tenta di inserire il messageId (fail se gia esiste)
const result = await client.query<ProcessedMessageRecord>(`
INSERT INTO processed_messages (message_id, consumer_name, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '7 days')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id
`, [messageId, this.consumerName]);
if (result.rowCount === 0) {
// Gia processato: skip idempotente
await client.query('ROLLBACK');
console.log(`[${this.consumerName}] Skipping duplicate: ${messageId}`);
return { processed: false, skipped: true };
}
// Prima volta: esegui l'handler nella stessa transazione
await handler(payload);
await client.query('COMMIT');
return { processed: true, skipped: false };
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Utilizzo nel consumer SQS
const consumer = new IdempotentConsumer(db, 'payment-processor');
async function handlePaymentEvent(message: SQSMessage): Promise<void> {
const payload = JSON.parse(message.Body);
const messageId = message.MessageId; // ID univoco SQS
const { processed, skipped } = await consumer.processMessage(
messageId,
payload,
async (data) => {
await paymentGateway.charge(data.customerId, data.amount);
await db.query(
'UPDATE orders SET payment_status = $1 WHERE id = $2',
['paid', data.orderId]
);
}
);
if (skipped) {
// Log ma non errore: comportamento atteso
metrics.increment('consumer.duplicate_skipped');
}
}
Redis ベースの重複排除: 高パフォーマンス
PostgreSQL チェックでは、メッセージごとにデータベース クエリが導入されます。のために 高スループット システム (1 秒あたり数千のメッセージ) では、これは次のような問題になる可能性があります。 ボトルネック。 TTL を使用した Redis とその解決策: O(1) オペレーション、ミリ秒未満のレイテンシー、 自動有効期限のためのネイティブ TTL。
// Redis-based deduplication per alto throughput
import { Redis } from 'ioredis';
class RedisIdempotencyStore {
constructor(
private readonly redis: Redis,
private readonly ttlSeconds: number = 86400 // 24 ore default
) {}
// Ritorna true se e la PRIMA VOLTA che vediamo questa key
// Ritorna false se e un duplicato
async setIfAbsent(key: string): Promise<boolean> {
// SET key value NX EX ttl
// NX = solo se non esiste
// EX = TTL in secondi
const result = await this.redis.set(
`dedup:${key}`,
'1',
'EX',
this.ttlSeconds,
'NX'
);
return result === 'OK'; // 'OK' = primo inserimento, null = gia esisteva
}
async isProcessed(key: string): Promise<boolean> {
const exists = await this.redis.exists(`dedup:${key}`);
return exists === 1;
}
// Per operazioni atomiche: check + set in Lua script
async checkAndSet(key: string): Promise<boolean> {
const luaScript = `
local exists = redis.call('EXISTS', KEYS[1])
if exists == 0 then
redis.call('SET', KEYS[1], '1', 'EX', ARGV[1])
return 1
end
return 0
`;
const result = await this.redis.eval(
luaScript,
1,
`dedup:${key}`,
this.ttlSeconds.toString()
);
return result === 1;
}
}
// Consumer con Redis deduplication
class HighThroughputConsumer {
constructor(
private readonly dedup: RedisIdempotencyStore,
private readonly db: Pool
) {}
async handleKafkaMessage(
topic: string,
partition: number,
offset: string,
payload: OrderPayload
): Promise<void> {
// Componi una key univoca: topic + partition + offset
const messageKey = `${topic}-${partition}-${offset}`;
const isFirst = await this.dedup.setIfAbsent(messageKey);
if (!isFirst) {
// Duplicato: skip
return;
}
// Prima elaborazione: procedi
await this.processOrder(payload);
}
private async processOrder(payload: OrderPayload): Promise<void> {
await this.db.query(
'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
[payload.quantity, payload.productId]
);
}
}
// ATTENZIONE: Redis ha durabilita limitata.
// Se Redis perde dati (AOF/RDB non aggiornati), i duplicati
// potrebbero passare dopo un crash Redis.
// Per operazioni critiche (pagamenti), usa sempre PostgreSQL.
受信箱パターン: 1 回限りのセマンティクス
パターン 受信箱 そして冪等性の最も堅牢なバージョン: メッセージは最初に (DB トランザクション内で) 受信箱テーブルに書き込まれます。 それから試してみました。処理中にクラッシュが発生した場合でも、正確に 1 回を保証します。
-- Schema Inbox Pattern
CREATE TABLE inbox_messages (
id UUID PRIMARY KEY,
source VARCHAR(100) NOT NULL, -- nome del producer/queue
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ, -- NULL = non ancora processato
error TEXT, -- NULL = successo o non processato
retry_count INTEGER NOT NULL DEFAULT 0
);
-- Index per il worker che processa i messaggi pendenti
CREATE INDEX idx_inbox_pending
ON inbox_messages(received_at)
WHERE processed_at IS NULL AND retry_count < 3;
// TypeScript: Inbox Pattern completo
class InboxProcessor {
constructor(private readonly db: Pool) {}
// Fase 1: scrivi nella inbox (idempotente grazie al PK)
async receiveMessage(message: IncomingMessage): Promise<void> {
await this.db.query(`
INSERT INTO inbox_messages (id, source, event_type, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO NOTHING
`, [
message.id,
message.source,
message.eventType,
JSON.stringify(message.payload)
]);
// Se il messaggio arriva due volte, ON CONFLICT DO NOTHING
// lo scarta silenziosamente
}
// Fase 2: worker che processa i messaggi dalla inbox
async processPendingMessages(): Promise<void> {
// Prendi un messaggio con SELECT FOR UPDATE SKIP LOCKED
// Evita che piu worker prendano lo stesso messaggio
const { rows } = await this.db.query(`
SELECT id, event_type, payload
FROM inbox_messages
WHERE processed_at IS NULL
AND retry_count < 3
ORDER BY received_at
LIMIT 1
FOR UPDATE SKIP LOCKED
`);
if (rows.length === 0) return;
const message = rows[0];
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Esegui l'handler specifico per il tipo di evento
await this.dispatch(message.event_type, message.payload);
// Marca come processato nella stessa transazione
await client.query(`
UPDATE inbox_messages
SET processed_at = NOW(), error = NULL
WHERE id = $1
`, [message.id]);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count e salva l'errore
await this.db.query(`
UPDATE inbox_messages
SET retry_count = retry_count + 1,
error = $1
WHERE id = $2
`, [(error as Error).message, message.id]);
} finally {
client.release();
}
}
private async dispatch(
eventType: string,
payload: unknown
): Promise<void> {
switch (eventType) {
case 'OrderPlaced':
await this.handleOrderPlaced(payload as OrderPayload);
break;
case 'PaymentReceived':
await this.handlePaymentReceived(payload as PaymentPayload);
break;
default:
throw new Error(`Unknown event type: ${eventType}`);
}
}
private async handleOrderPlaced(payload: OrderPayload): Promise<void> {
await this.db.query(
'UPDATE inventory SET reserved = reserved + $1 WHERE product_id = $2',
[payload.quantity, payload.productId]
);
}
private async handlePaymentReceived(payload: PaymentPayload): Promise<void> {
await this.db.query(
'UPDATE orders SET status = $1 WHERE id = $2',
['confirmed', payload.orderId]
);
}
}
自然冪等性: 自然のための冪等操作の設計
冪等性に対する最も洗練された解決策は、次のように操作を設計することです。 当然冪等です。複数回実行しても同じ結果が得られます。 単一の実行の。これにより、明示的な追跡の必要がなくなります。
// Operazioni naturalmente idempotenti vs non idempotenti
// NON IDEMPOTENTE: aggiornamento relativo
// Se eseguita due volte, l'inventory diventa -2 invece di -1
async function decrementInventory(productId: string, qty: number): Promise<void> {
await db.query(
'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
[qty, productId]
);
}
// IDEMPOTENTE: aggiornamento assoluto con versioning
// Usa il numero dell'ordine come "target state"
async function setInventoryForOrder(
productId: string,
orderId: string,
newQuantity: number
): Promise<void> {
await db.query(`
INSERT INTO inventory_reservations (order_id, product_id, quantity)
VALUES ($1, $2, $3)
ON CONFLICT (order_id, product_id)
DO UPDATE SET quantity = EXCLUDED.quantity
`, [orderId, productId, newQuantity]);
}
// NON IDEMPOTENTE: INSERT senza conflict handling
async function createPaymentRecord(payment: Payment): Promise<void> {
await db.query(
'INSERT INTO payments (id, order_id, amount) VALUES ($1, $2, $3)',
[payment.id, payment.orderId, payment.amount]
);
// Fallisce con unique constraint la seconda volta
}
// IDEMPOTENTE: UPSERT con ON CONFLICT DO NOTHING
async function upsertPaymentRecord(payment: Payment): Promise<void> {
await db.query(`
INSERT INTO payments (id, order_id, amount, status)
VALUES ($1, $2, $3, 'pending')
ON CONFLICT (id) DO NOTHING
`, [payment.id, payment.orderId, payment.amount]);
}
// IDEMPOTENTE: update a stato finale (state machine idempotente)
// Transitare da 'confirmed' a 'confirmed' non cambia nulla
async function markOrderAsShipped(orderId: string): Promise<void> {
await db.query(`
UPDATE orders
SET status = 'shipped', shipped_at = COALESCE(shipped_at, NOW())
WHERE id = $1
AND status IN ('confirmed', 'processing')
`, [orderId]);
// Se lo stato e gia 'shipped', la WHERE non matcha: no-op
}
SQS レベルの重複排除
SQS FIFO キューは、ネイティブの重複排除を提供します。 メッセージ重複排除ID。 重複排除間隔内に送信された同じ重複排除 ID を持つメッセージ (5分)は1回のみ配信されます。冪等性の必要性が排除されるわけではありません 消費者側ですが、重複は大幅に減少します。
// AWS SDK v3: invio su SQS FIFO con MessageDeduplicationId
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
const sqs = new SQSClient({ region: 'eu-west-1' });
async function publishOrderEvent(
orderId: string,
eventType: string,
payload: unknown
): Promise<void> {
// MessageDeduplicationId: hash del contenuto o ID evento univoco
// Stesso ID = stesso messaggio entro 5 minuti = consegnato una sola volta
const deduplicationId = `${eventType}-${orderId}-${Date.now()}`;
await sqs.send(new SendMessageCommand({
QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456/orders.fifo',
MessageBody: JSON.stringify(payload),
MessageGroupId: orderId, // Ordine FIFO per stesso ordine
MessageDeduplicationId: deduplicationId,
MessageAttributes: {
EventType: {
DataType: 'String',
StringValue: eventType,
},
},
}));
}
// SQS Standard Queue: nessuna deduplication nativa
// Puoi usare l'Attribute MessageId come idempotency key nel consumer
async function handleSqsStandardMessage(msg: SQSMessage): Promise<void> {
// msg.MessageId e univoco per invio, ma se SQS rimanda il messaggio
// il MessageId rimane lo stesso. Usalo come idempotency key.
const idempotencyKey = msg.MessageId;
await consumer.processMessage(idempotencyKey, JSON.parse(msg.Body), handler);
}
冪等性テスト
べき等コンシューマは明示的にテストする必要があります。テストするだけでは十分ではありません。 通常の場合。同じメッセージが届いたときに何が起こるかをテストする必要がある 複数の並列コンシューマーを使用して、2 回、10 回。
// Test suite per consumer idempotente
describe('IdempotentPaymentConsumer', () => {
let consumer: IdempotentConsumer;
let db: Pool;
beforeEach(async () => {
db = await createTestDatabase();
consumer = new IdempotentConsumer(db, 'payment-test');
await db.query('DELETE FROM processed_messages');
});
it('should process message exactly once on first delivery', async () => {
const messageId = 'msg-001';
const payload = { orderId: 'ord-001', amount: 100 };
let callCount = 0;
const handler = async () => { callCount++; };
const result = await consumer.processMessage(messageId, payload, handler);
expect(result.processed).toBe(true);
expect(result.skipped).toBe(false);
expect(callCount).toBe(1);
});
it('should skip duplicate message silently', async () => {
const messageId = 'msg-001';
const payload = { orderId: 'ord-001', amount: 100 };
let callCount = 0;
const handler = async () => { callCount++; };
// Prima consegna
await consumer.processMessage(messageId, payload, handler);
// Seconda consegna (duplicato)
const result = await consumer.processMessage(messageId, payload, handler);
expect(result.processed).toBe(false);
expect(result.skipped).toBe(true);
expect(callCount).toBe(1); // Handler chiamato solo una volta
});
it('should handle concurrent duplicate messages correctly', async () => {
const messageId = 'msg-concurrent';
const payload = { orderId: 'ord-002', amount: 200 };
let callCount = 0;
const handler = async () => {
callCount++;
// Simula elaborazione lenta per forzare concorrenza
await new Promise((resolve) => setTimeout(resolve, 100));
};
// Simula 5 consumer che ricevono lo stesso messaggio in parallelo
const results = await Promise.allSettled([
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
]);
const processed = results.filter(
(r) => r.status === 'fulfilled' && r.value.processed
).length;
// Solo uno deve essere processato, gli altri skippati
expect(processed).toBe(1);
expect(callCount).toBe(1);
});
});
アンチパターン: メモリ内のみのべき等性
処理されたメッセージを追跡するために、メモリ内のセットまたはマップを使用しないでください。 プロセスが再起動されると、メモリとすべてのメッセージが失われます 以前に処理されたものは再度処理されます。冪等性ストア 永続的である必要があります (PostgreSQL、AOF を備えた Redis、DynamoDB)。
結論と次のステップ
コンシューマの冪等性はシステムにおけるオプションの最適化ではありません イベント駆動の実稼働グレード: これは基本的な要件です。次の間の選択 PostgreSQL ベースの重複排除、Redis、および受信トレイ パターンはレベルに応じて異なります 必要な耐久性とシステム スループット。重要な操作用 (支払い、取り消しできないステータス更新)、受信トレイ パターンは以下を提供します。 1 回限りのセマンティクスを最大限に保証します。







