Pub/Sub와 Redis Streams: 근본적인 차이점, 사용 사례 및 소비자 그룹
Redis Pub/Sub는 즉시 실행이 가능합니다. Streams는 다음과 같은 영구 로그입니다. 소비자 그룹 - 차이점을 이해하면 아키텍처가 변경됩니다. 구성 분산 처리를 위한 소비자 그룹은 메시지가 손실되지 않도록 합니다.
Redis의 두 가지 메시징 패러다임
Redis는 근본적으로 다른 두 가지 메시징 메커니즘을 제공합니다. 게시/구독 실시간 방송 시스템입니다. 모든 사람에게 메시지가 전달됩니다. 연결된 가입자 그 순간 그러다가 잊어버렸어요. 가입자가 그렇지 않은 경우 연결되면 메시지가 영원히 손실됩니다.
레디스 스트림 (Redis 5.0에서 도입됨)은 추가 전용 로그입니다. Apache Kafka 스트림 개념에서 지속적으로 영감을 받았습니다. 메시지는 보존됩니다 명시적으로 삭제하지 않는 한 다시 읽을 수 있습니다. 승인을 통한 분산 처리를 위해 소비자 그룹을 지원합니다.
무엇을 배울 것인가
- Pub/Sub: 글로빙 패턴을 사용한 SUBSCRIBE, PUBLISH, PSUBSCRIBE
- Pub/Sub와 Streams를 사용하는 경우: 주요 차이점
- 스트림: 추가 전용 로그용 XADD, XREAD, XRANGE
- 소비자 그룹: XGROUP CREATE, XREADGROUP, XACK
- 대기 메시지 최소 1회 전달 및 관리
- 패턴: Streams를 사용한 이벤트 소싱 및 감사 로그
Pub/Sub: 실시간 방송
Pub/Sub는 일부 정보가 손실된 실시간 알림에 이상적입니다. 메시지는 허용됩니다: 실시간 가격 업데이트, 푸시 알림, 채팅방, 분산캐시 무효화.
# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"
# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3
# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:* # Tutti gli utenti
PSUBSCRIBE events:order:* # Tutti gli ordini
# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1 -- numero di subscriber che hanno ricevuto il messaggio
PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1
# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0 -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# --- SUBSCRIBER ---
def message_handler(message):
if message['type'] == 'message':
channel = message['channel']
data = json.loads(message['data'])
print(f"[{channel}] Received: {data}")
def start_subscriber(user_id: int):
"""Subscriber in background thread."""
sub = r.pubsub()
sub.subscribe(**{
f'notifications:user:{user_id}': message_handler,
'notifications:broadcast': message_handler,
})
# Pattern subscribe
sub.psubscribe(**{'events:*': message_handler})
# Blocca in ascolto (in un thread separato)
thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
return thread, sub
# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
"""Pubblica una notifica a un utente specifico."""
payload = json.dumps({'type': notification_type, **data})
subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
print(f"Delivered to {subscribers_count} subscribers")
return subscribers_count
# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)
# Pubblica messaggi
import time
time.sleep(0.1) # Aspetta che subscriber sia pronto
publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})
# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()
Redis Streams: 영구 및 추가 전용 로그
Streams는 완전히 다른 모델을 사용합니다. 각 메시지에는 고유한 ID가 있습니다. (타임스탬프-시퀀스), 로그에 유지되며 다시 읽을 수 있습니다. 언제든지. 이벤트 소싱, 감사 로그, 작업 대기열 및 메시지 손실이 허용되지 않는 모든 시나리오.
# Redis Streams: XADD e XREAD
# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0" <-- ID generato automaticamente
XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"
# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97
# XLEN: numero di messaggi nello stream
XLEN orders:stream # 3
# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# 3) "product_id"
# 4) "555"
# 5) "quantity"
# 6) "2"
# 7) "total"
# 8) "89.99"
# ...
# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10
# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5
# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0 # Da ID 0, leggi al massimo 10
# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi
소비자 그룹: 승인을 통한 분산 처리
소비자 그룹은 Redis Streams의 가장 강력한 기능입니다. 그들은 허용한다 여러 소비자가 동일한 스트림의 메시지를 분산 방식으로 처리합니다. 각 메시지는 그룹 내 정확히 한 명의 소비자에 의해 처리된다는 보장이 있습니다. 확인되지 않은 메시지를 자동으로 다시 전달합니다.
# Consumer Groups: setup e utilizzo
# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"
# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0
# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
# 2) 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# ...
# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1 -- 1 messaggio acknowledged
# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count
# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi
# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0
# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000 # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000 # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000 # 30 secondi prima di ri-assegnare
class StreamConsumer:
def __init__(self, consumer_name: str, handler: Callable):
self.consumer_name = consumer_name
self.handler = handler
self._ensure_group()
def _ensure_group(self):
"""Crea il consumer group se non esiste."""
try:
r.xgroup_create(STREAM_KEY, GROUP_NAME, id='






