소비자의 멱등성: 중복 제거 및 멱등성 키
SQS, Kafka 또는 EventBridge를 사용하여 이벤트 기반 시스템을 설정했습니다. 메시지 흐름이 흐르고 소비자가 이벤트를 처리하며 개발 과정에서 모든 것이 완벽하게 작동합니다. 그런 다음 프로덕션으로 이동하여 일부 주문 확인 이메일이 두 번 전송되었음을 확인했습니다. 더 나쁜 경우에는 특정 결제 금액이 두 번 청구되는 경우도 있습니다. 문제는 그렇지 않다 코드의 버그: 이는 현대 분산 시스템의 기본 속성입니다.
SQS, Kafka, RabbitMQ, EventBridge 등 모든 주요 메시지 브로커는 보증을 보장합니다. 배달 적어도 한 번은: 메시지가 한 번 이상 전달됩니다. 시간은 있지만 여러 번 배달될 수 있습니다. 이는 자동 재시도에서 발생합니다. 소비자 그룹 재조정, 가시성 시간 초과, 소비자 충돌 처리. 해결책은 브로커에 있는 것이 아니라 소비자에게 있습니다. 소비자는 다음과 같아야합니다. 멱등성.
무엇을 배울 것인가
- 최소 한 번 이상 시스템에서는 필연적으로 중복 메시지가 생성되기 때문입니다.
- 멱등성 키: 중복 제거의 기본 패턴
- INSERT ON CONFLICT를 사용한 데이터베이스 수준 멱등성
- 고성능을 위한 TTL을 사용한 Redis 기반 중복 제거
- 받은 편지함 패턴: 정확히 한 번 의미 체계를 위한 구조화된 솔루션
- 자연적인 멱등성: 자연적으로 멱등성 연산을 설계하는 방법
- 소비자 멱등성을 확인하기 위한 테스트 전략
중복 메시지가 불가피한 이유
소비자 멱등성이 필요한 이유를 이해하려면 다음을 이해해야 합니다. 메시지가 두 번 이상 전달되었습니다. 생산의 주요 사례:
사례 1: 처리 후 ACK 전 소비자 충돌
소비자가 메시지를 성공적으로 처리합니다(DB에 쓰기, 외부 API 호출). 하지만 브로커에게 ACK를 보내기 전에 충돌이 발생합니다. 브로커는 메시지를 다음과 같이 간주하지 않습니다. 공개 시간이 초과된 후 다시 전송됩니다. 새로운 소비자(또는 동일한 소비자) 다시 시작한 후)는 메시지를 수신하고 다시 처리합니다.
사례 2: 처리 시간 초과
SQS는 공개 시간 초과 (기본값은 30초) 소비자가 고용하는 경우 시간 초과를 연장하지 않고 메시지를 처리하는 데 30초 이상이 소요되면 SQS는 다른 소비자에게 표시되는 메시지입니다. 메시지가 두 번 처리됩니다. 두 명의 다른 소비자가 동시에 사용합니다.
사례 3: Kafka 소비자 그룹 재조정
Kafka 소비자 그룹의 재조정 중(소비자 추가/제거를 위해, 배포 롤링), 일부 파티션이 재할당됩니다. 소비자가 찾아오면 제거됨이 아직 오프셋을 커밋하지 않은 경우 해당 배치의 메시지는 파티션에 할당된 새 소비자에 의해 다시 처리됩니다.
// Simulazione: perche i duplicati sono inevitabili
// Questo codice mostra IL PROBLEMA, non la soluzione
async function processPayment(message: SQSMessage): Promise<void> {
const { paymentId, amount, customerId } = JSON.parse(message.Body);
// Step 1: chiama l'API di pagamento esterna
await paymentGateway.charge(customerId, amount);
// ^^^ SUCCESSO: il pagamento e stato addebitato
// -- CRASH QUI --
// Il processo muore per OOM, segfault, deploy, ecc.
// Il pagamento e gia stato addebitato MA non abbiamo
// ancora eliminato il messaggio dalla coda SQS.
// SQS considera il messaggio non processato e lo
// rimanda dopo il visibility timeout.
await sqs.deleteMessage({
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle,
});
// ^^^ Mai eseguito se crashiamo sopra
}
// Risultato: il cliente viene addebitato due volte.
// Nessun bug nel codice. E' la natura del sistema at-least-once.
패턴 멱등성 키
가장 일반적인 해결책은 멱등성 키: 식별자 소비자가 이미 처리되었는지 여부를 감지할 수 있도록 하는 각 작업마다 고유합니다. 이 메시지. 소비자는 처리하기 전에 데이터베이스를 확인합니다. 이미 존재합니다. 자동으로 건너뜁니다. 존재하지 않는 경우 키를 처리하고 저장합니다.
// Pattern base: Idempotency Key con PostgreSQL
// Schema tabella per il tracking
CREATE TABLE processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
consumer_name VARCHAR(100) NOT NULL,
-- TTL gestito da un job di cleanup o da una policy di partizione
expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + INTERVAL '7 days'
);
CREATE INDEX idx_processed_messages_expires
ON processed_messages(expires_at);
-- Job di cleanup: esegui ogni ora
DELETE FROM processed_messages WHERE expires_at < NOW();
// TypeScript: consumer idempotente con PostgreSQL
interface ProcessedMessageRecord {
messageId: string;
processedAt: Date;
consumerName: string;
}
class IdempotentConsumer {
constructor(
private readonly db: Pool,
private readonly consumerName: string
) {}
async processMessage<T>(
messageId: string,
payload: T,
handler: (payload: T) => Promise<void>
): Promise<{ processed: boolean; skipped: boolean }> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Tenta di inserire il messageId (fail se gia esiste)
const result = await client.query<ProcessedMessageRecord>(`
INSERT INTO processed_messages (message_id, consumer_name, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '7 days')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id
`, [messageId, this.consumerName]);
if (result.rowCount === 0) {
// Gia processato: skip idempotente
await client.query('ROLLBACK');
console.log(`[${this.consumerName}] Skipping duplicate: ${messageId}`);
return { processed: false, skipped: true };
}
// Prima volta: esegui l'handler nella stessa transazione
await handler(payload);
await client.query('COMMIT');
return { processed: true, skipped: false };
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Utilizzo nel consumer SQS
const consumer = new IdempotentConsumer(db, 'payment-processor');
async function handlePaymentEvent(message: SQSMessage): Promise<void> {
const payload = JSON.parse(message.Body);
const messageId = message.MessageId; // ID univoco SQS
const { processed, skipped } = await consumer.processMessage(
messageId,
payload,
async (data) => {
await paymentGateway.charge(data.customerId, data.amount);
await db.query(
'UPDATE orders SET payment_status = $1 WHERE id = $2',
['paid', data.orderId]
);
}
);
if (skipped) {
// Log ma non errore: comportamento atteso
metrics.increment('consumer.duplicate_skipped');
}
}
Redis 기반 중복 제거: 고성능
PostgreSQL 검사는 각 메시지에 대한 데이터베이스 쿼리를 도입합니다. 에 대한 높은 처리량 시스템(초당 수천 개의 메시지)을 사용하면 이는 병목 현상. TTL을 사용하는 Redis 및 솔루션: O(1) 작업, 밀리초 미만의 대기 시간, 자동 만료를 위한 기본 TTL입니다.
// Redis-based deduplication per alto throughput
import { Redis } from 'ioredis';
class RedisIdempotencyStore {
constructor(
private readonly redis: Redis,
private readonly ttlSeconds: number = 86400 // 24 ore default
) {}
// Ritorna true se e la PRIMA VOLTA che vediamo questa key
// Ritorna false se e un duplicato
async setIfAbsent(key: string): Promise<boolean> {
// SET key value NX EX ttl
// NX = solo se non esiste
// EX = TTL in secondi
const result = await this.redis.set(
`dedup:${key}`,
'1',
'EX',
this.ttlSeconds,
'NX'
);
return result === 'OK'; // 'OK' = primo inserimento, null = gia esisteva
}
async isProcessed(key: string): Promise<boolean> {
const exists = await this.redis.exists(`dedup:${key}`);
return exists === 1;
}
// Per operazioni atomiche: check + set in Lua script
async checkAndSet(key: string): Promise<boolean> {
const luaScript = `
local exists = redis.call('EXISTS', KEYS[1])
if exists == 0 then
redis.call('SET', KEYS[1], '1', 'EX', ARGV[1])
return 1
end
return 0
`;
const result = await this.redis.eval(
luaScript,
1,
`dedup:${key}`,
this.ttlSeconds.toString()
);
return result === 1;
}
}
// Consumer con Redis deduplication
class HighThroughputConsumer {
constructor(
private readonly dedup: RedisIdempotencyStore,
private readonly db: Pool
) {}
async handleKafkaMessage(
topic: string,
partition: number,
offset: string,
payload: OrderPayload
): Promise<void> {
// Componi una key univoca: topic + partition + offset
const messageKey = `${topic}-${partition}-${offset}`;
const isFirst = await this.dedup.setIfAbsent(messageKey);
if (!isFirst) {
// Duplicato: skip
return;
}
// Prima elaborazione: procedi
await this.processOrder(payload);
}
private async processOrder(payload: OrderPayload): Promise<void> {
await this.db.query(
'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
[payload.quantity, payload.productId]
);
}
}
// ATTENZIONE: Redis ha durabilita limitata.
// Se Redis perde dati (AOF/RDB non aggiornati), i duplicati
// potrebbero passare dopo un crash Redis.
// Per operazioni critiche (pagamenti), usa sempre PostgreSQL.
받은 편지함 패턴: 정확히 한 번 의미 체계
패턴 받은편지함 그리고 가장 강력한 멱등성 버전: 메시지는 먼저 받은 편지함 테이블(DB 트랜잭션 내)에 기록됩니다. 그런 다음 시도했습니다. 처리 중 충돌이 발생하더라도 정확히 한 번만 보장합니다.
-- Schema Inbox Pattern
CREATE TABLE inbox_messages (
id UUID PRIMARY KEY,
source VARCHAR(100) NOT NULL, -- nome del producer/queue
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ, -- NULL = non ancora processato
error TEXT, -- NULL = successo o non processato
retry_count INTEGER NOT NULL DEFAULT 0
);
-- Index per il worker che processa i messaggi pendenti
CREATE INDEX idx_inbox_pending
ON inbox_messages(received_at)
WHERE processed_at IS NULL AND retry_count < 3;
// TypeScript: Inbox Pattern completo
class InboxProcessor {
constructor(private readonly db: Pool) {}
// Fase 1: scrivi nella inbox (idempotente grazie al PK)
async receiveMessage(message: IncomingMessage): Promise<void> {
await this.db.query(`
INSERT INTO inbox_messages (id, source, event_type, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO NOTHING
`, [
message.id,
message.source,
message.eventType,
JSON.stringify(message.payload)
]);
// Se il messaggio arriva due volte, ON CONFLICT DO NOTHING
// lo scarta silenziosamente
}
// Fase 2: worker che processa i messaggi dalla inbox
async processPendingMessages(): Promise<void> {
// Prendi un messaggio con SELECT FOR UPDATE SKIP LOCKED
// Evita che piu worker prendano lo stesso messaggio
const { rows } = await this.db.query(`
SELECT id, event_type, payload
FROM inbox_messages
WHERE processed_at IS NULL
AND retry_count < 3
ORDER BY received_at
LIMIT 1
FOR UPDATE SKIP LOCKED
`);
if (rows.length === 0) return;
const message = rows[0];
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Esegui l'handler specifico per il tipo di evento
await this.dispatch(message.event_type, message.payload);
// Marca come processato nella stessa transazione
await client.query(`
UPDATE inbox_messages
SET processed_at = NOW(), error = NULL
WHERE id = $1
`, [message.id]);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count e salva l'errore
await this.db.query(`
UPDATE inbox_messages
SET retry_count = retry_count + 1,
error = $1
WHERE id = $2
`, [(error as Error).message, message.id]);
} finally {
client.release();
}
}
private async dispatch(
eventType: string,
payload: unknown
): Promise<void> {
switch (eventType) {
case 'OrderPlaced':
await this.handleOrderPlaced(payload as OrderPayload);
break;
case 'PaymentReceived':
await this.handlePaymentReceived(payload as PaymentPayload);
break;
default:
throw new Error(`Unknown event type: ${eventType}`);
}
}
private async handleOrderPlaced(payload: OrderPayload): Promise<void> {
await this.db.query(
'UPDATE inventory SET reserved = reserved + $1 WHERE product_id = $2',
[payload.quantity, payload.productId]
);
}
private async handlePaymentReceived(payload: PaymentPayload): Promise<void> {
await this.db.query(
'UPDATE orders SET status = $1 WHERE id = $2',
['confirmed', payload.orderId]
);
}
}
자연적 멱등성: 자연에 대한 멱등성 연산 설계
멱등성에 대한 가장 우아한 해결책은 다음과 같이 작업을 설계하는 것입니다. 본질적으로 멱등성을 갖습니다. 여러 번 실행하면 동일한 결과가 생성됩니다. 단일 실행의. 이렇게 하면 명시적인 추적이 필요하지 않습니다.
// Operazioni naturalmente idempotenti vs non idempotenti
// NON IDEMPOTENTE: aggiornamento relativo
// Se eseguita due volte, l'inventory diventa -2 invece di -1
async function decrementInventory(productId: string, qty: number): Promise<void> {
await db.query(
'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
[qty, productId]
);
}
// IDEMPOTENTE: aggiornamento assoluto con versioning
// Usa il numero dell'ordine come "target state"
async function setInventoryForOrder(
productId: string,
orderId: string,
newQuantity: number
): Promise<void> {
await db.query(`
INSERT INTO inventory_reservations (order_id, product_id, quantity)
VALUES ($1, $2, $3)
ON CONFLICT (order_id, product_id)
DO UPDATE SET quantity = EXCLUDED.quantity
`, [orderId, productId, newQuantity]);
}
// NON IDEMPOTENTE: INSERT senza conflict handling
async function createPaymentRecord(payment: Payment): Promise<void> {
await db.query(
'INSERT INTO payments (id, order_id, amount) VALUES ($1, $2, $3)',
[payment.id, payment.orderId, payment.amount]
);
// Fallisce con unique constraint la seconda volta
}
// IDEMPOTENTE: UPSERT con ON CONFLICT DO NOTHING
async function upsertPaymentRecord(payment: Payment): Promise<void> {
await db.query(`
INSERT INTO payments (id, order_id, amount, status)
VALUES ($1, $2, $3, 'pending')
ON CONFLICT (id) DO NOTHING
`, [payment.id, payment.orderId, payment.amount]);
}
// IDEMPOTENTE: update a stato finale (state machine idempotente)
// Transitare da 'confirmed' a 'confirmed' non cambia nulla
async function markOrderAsShipped(orderId: string): Promise<void> {
await db.query(`
UPDATE orders
SET status = 'shipped', shipped_at = COALESCE(shipped_at, NOW())
WHERE id = $1
AND status IN ('confirmed', 'processing')
`, [orderId]);
// Se lo stato e gia 'shipped', la WHERE non matcha: no-op
}
SQS 수준 중복 제거
SQS FIFO 대기열은 다음을 통해 기본 중복 제거 기능을 제공합니다. 메시지 중복 제거 ID. 중복 제거 간격에 동일한 중복 제거 ID를 가진 메시지가 전송됩니다. (5분)은 1회만 전달됩니다. 멱등성의 필요성이 제거되지는 않습니다. 소비자 측에서는 중복을 크게 줄입니다.
// AWS SDK v3: invio su SQS FIFO con MessageDeduplicationId
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
const sqs = new SQSClient({ region: 'eu-west-1' });
async function publishOrderEvent(
orderId: string,
eventType: string,
payload: unknown
): Promise<void> {
// MessageDeduplicationId: hash del contenuto o ID evento univoco
// Stesso ID = stesso messaggio entro 5 minuti = consegnato una sola volta
const deduplicationId = `${eventType}-${orderId}-${Date.now()}`;
await sqs.send(new SendMessageCommand({
QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456/orders.fifo',
MessageBody: JSON.stringify(payload),
MessageGroupId: orderId, // Ordine FIFO per stesso ordine
MessageDeduplicationId: deduplicationId,
MessageAttributes: {
EventType: {
DataType: 'String',
StringValue: eventType,
},
},
}));
}
// SQS Standard Queue: nessuna deduplication nativa
// Puoi usare l'Attribute MessageId come idempotency key nel consumer
async function handleSqsStandardMessage(msg: SQSMessage): Promise<void> {
// msg.MessageId e univoco per invio, ma se SQS rimanda il messaggio
// il MessageId rimane lo stesso. Usalo come idempotency key.
const idempotencyKey = msg.MessageId;
await consumer.processMessage(idempotencyKey, JSON.parse(msg.Body), handler);
}
멱등성 테스트
멱등성 소비자는 명시적으로 테스트되어야 합니다. 테스트하는 것만으로는 충분하지 않습니다. 정상적인 경우. 동일한 메시지가 도착하면 어떤 일이 발생하는지 테스트해야 합니다. 여러 병렬 소비자를 사용하여 두 번, 열 번.
// Test suite per consumer idempotente
describe('IdempotentPaymentConsumer', () => {
let consumer: IdempotentConsumer;
let db: Pool;
beforeEach(async () => {
db = await createTestDatabase();
consumer = new IdempotentConsumer(db, 'payment-test');
await db.query('DELETE FROM processed_messages');
});
it('should process message exactly once on first delivery', async () => {
const messageId = 'msg-001';
const payload = { orderId: 'ord-001', amount: 100 };
let callCount = 0;
const handler = async () => { callCount++; };
const result = await consumer.processMessage(messageId, payload, handler);
expect(result.processed).toBe(true);
expect(result.skipped).toBe(false);
expect(callCount).toBe(1);
});
it('should skip duplicate message silently', async () => {
const messageId = 'msg-001';
const payload = { orderId: 'ord-001', amount: 100 };
let callCount = 0;
const handler = async () => { callCount++; };
// Prima consegna
await consumer.processMessage(messageId, payload, handler);
// Seconda consegna (duplicato)
const result = await consumer.processMessage(messageId, payload, handler);
expect(result.processed).toBe(false);
expect(result.skipped).toBe(true);
expect(callCount).toBe(1); // Handler chiamato solo una volta
});
it('should handle concurrent duplicate messages correctly', async () => {
const messageId = 'msg-concurrent';
const payload = { orderId: 'ord-002', amount: 200 };
let callCount = 0;
const handler = async () => {
callCount++;
// Simula elaborazione lenta per forzare concorrenza
await new Promise((resolve) => setTimeout(resolve, 100));
};
// Simula 5 consumer che ricevono lo stesso messaggio in parallelo
const results = await Promise.allSettled([
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
]);
const processed = results.filter(
(r) => r.status === 'fulfilled' && r.value.processed
).length;
// Solo uno deve essere processato, gli altri skippati
expect(processed).toBe(1);
expect(callCount).toBe(1);
});
});
안티 패턴: 메모리 내에서만 멱등성
처리된 메시지를 추적하기 위해 메모리 내 세트 또는 맵을 사용하지 마십시오. 프로세스가 다시 시작되면 메모리와 모든 메시지가 손실됩니다. 이전에 처리된 내용이 다시 처리됩니다. 멱등성 저장소 지속적이어야 합니다(PostgreSQL, AOF가 있는 Redis, DynamoDB).
결론 및 다음 단계
소비자 멱등성은 시스템에서 선택적 최적화가 아닙니다. 이벤트 기반 프로덕션 등급: 이는 기본적인 요구 사항입니다. 사이의 선택 PostgreSQL 기반 중복제거, Redis, Inbox 패턴은 레벨에 따라 다름 내구성과 시스템 처리량이 필요합니다. 중요한 작업용 (결제, 되돌릴 수 없는 상태 업데이트) Inbox 패턴은 다음을 제공합니다. 정확히 한 번 의미론을 최대한 보장합니다.







