Redis의 두 가지 메시징 패러다임

Redis는 근본적으로 다른 두 가지 메시징 메커니즘을 제공합니다. 게시/구독 실시간 방송 시스템입니다. 모든 사람에게 메시지가 전달됩니다. 연결된 가입자 그 순간 그러다가 잊어버렸어요. 가입자가 그렇지 않은 경우 연결되면 메시지가 영원히 손실됩니다.

레디스 스트림 (Redis 5.0에서 도입됨)은 추가 전용 로그입니다. Apache Kafka 스트림 개념에서 지속적으로 영감을 받았습니다. 메시지는 보존됩니다 명시적으로 삭제하지 않는 한 다시 읽을 수 있습니다. 승인을 통한 분산 처리를 위해 소비자 그룹을 지원합니다.

무엇을 배울 것인가

  • Pub/Sub: 글로빙 패턴을 사용한 SUBSCRIBE, PUBLISH, PSUBSCRIBE
  • Pub/Sub와 Streams를 사용하는 경우: 주요 차이점
  • 스트림: 추가 전용 로그용 XADD, XREAD, XRANGE
  • 소비자 그룹: XGROUP CREATE, XREADGROUP, XACK
  • 대기 메시지 최소 1회 전달 및 관리
  • 패턴: Streams를 사용한 이벤트 소싱 및 감사 로그

Pub/Sub: 실시간 방송

Pub/Sub는 일부 정보가 손실된 실시간 알림에 이상적입니다. 메시지는 허용됩니다: 실시간 가격 업데이트, 푸시 알림, 채팅방, 분산캐시 무효화.

# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"

# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3

# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:*    # Tutti gli utenti
PSUBSCRIBE events:order:*          # Tutti gli ordini

# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1  -- numero di subscriber che hanno ricevuto il messaggio

PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1

# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0  -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# --- SUBSCRIBER ---
def message_handler(message):
    if message['type'] == 'message':
        channel = message['channel']
        data = json.loads(message['data'])
        print(f"[{channel}] Received: {data}")

def start_subscriber(user_id: int):
    """Subscriber in background thread."""
    sub = r.pubsub()
    sub.subscribe(**{
        f'notifications:user:{user_id}': message_handler,
        'notifications:broadcast': message_handler,
    })
    # Pattern subscribe
    sub.psubscribe(**{'events:*': message_handler})

    # Blocca in ascolto (in un thread separato)
    thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
    return thread, sub

# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
    """Pubblica una notifica a un utente specifico."""
    payload = json.dumps({'type': notification_type, **data})
    subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
    print(f"Delivered to {subscribers_count} subscribers")
    return subscribers_count

# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)

# Pubblica messaggi
import time
time.sleep(0.1)  # Aspetta che subscriber sia pronto

publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})

# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()

Redis Streams: 영구 및 추가 전용 로그

Streams는 완전히 다른 모델을 사용합니다. 각 메시지에는 고유한 ID가 있습니다. (타임스탬프-시퀀스), 로그에 유지되며 다시 읽을 수 있습니다. 언제든지. 이벤트 소싱, 감사 로그, 작업 대기열 및 메시지 손실이 허용되지 않는 모든 시나리오.

# Redis Streams: XADD e XREAD

# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0"  <-- ID generato automaticamente

XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"

# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97

# XLEN: numero di messaggi nello stream
XLEN orders:stream    # 3

# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
#    2) 1) "user_id"
#       2) "1001"
#       3) "product_id"
#       4) "555"
#       5) "quantity"
#       6) "2"
#       7) "total"
#       8) "89.99"
# ...

# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10

# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5

# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0   # Da ID 0, leggi al massimo 10

# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi

소비자 그룹: 승인을 통한 분산 처리

소비자 그룹은 Redis Streams의 가장 강력한 기능입니다. 그들은 허용한다 여러 소비자가 동일한 스트림의 메시지를 분산 방식으로 처리합니다. 각 메시지는 그룹 내 정확히 한 명의 소비자에 의해 처리된다는 보장이 있습니다. 확인되지 않은 메시지를 자동으로 다시 전달합니다.

# Consumer Groups: setup e utilizzo

# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"

# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0

# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
#    2) 1) 1) "1710000001234-0"
#          2) 1) "user_id"
#             2) "1001"
#             ...

# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1  -- 1 messaggio acknowledged

# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count

# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi

# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0

# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000    # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000  # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000  # 30 secondi prima di ri-assegnare

class StreamConsumer:
    def __init__(self, consumer_name: str, handler: Callable):
        self.consumer_name = consumer_name
        self.handler = handler
        self._ensure_group()

    def _ensure_group(self):
        """Crea il consumer group se non esiste."""
        try:
            r.xgroup_create(STREAM_KEY, GROUP_NAME, id=', mkstream=True)
            print(f"Created consumer group: {GROUP_NAME}")
        except redis.exceptions.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise
            # Gruppo gia' esiste: ok

    def process_messages(self, batch_size: int = 10) -> int:
        """Processa un batch di messaggi. Ritorna il numero processato."""
        processed = 0

        # Prima: controlla e riassegna messaggi pending di altri consumer (crashati)
        self._reclaim_pending()

        # Leggi nuovi messaggi non assegnati
        results = r.xreadgroup(
            groupname=GROUP_NAME,
            consumername=self.consumer_name,
            streams={STREAM_KEY: '>'},
            count=batch_size,
            block=1000,  # Blocca 1 secondo se non ci sono messaggi
        )

        if not results:
            return 0

        for stream_name, messages in results:
            for message_id, fields in messages:
                try:
                    self.handler(message_id, fields)
                    # Acknowledge dopo processing riuscito
                    r.xack(STREAM_KEY, GROUP_NAME, message_id)
                    processed += 1
                except Exception as e:
                    print(f"Error processing {message_id}: {e}")
                    # Non acknowledge: il messaggio rimane pending
                    # Verra riassegnato dopo PENDING_TIMEOUT_MS

        return processed

    def _reclaim_pending(self):
        """Riassegna a se' stesso messaggi idle di altri consumer."""
        pending = r.xpending_range(
            STREAM_KEY, GROUP_NAME,
            min='-', max='+', count=10,
        )
        for p in pending:
            if p['time_since_delivered'] > PENDING_TIMEOUT_MS:
                if p['name'] != self.consumer_name:
                    # Riassegna il messaggio a questo consumer
                    r.xclaim(
                        STREAM_KEY, GROUP_NAME, self.consumer_name,
                        PENDING_TIMEOUT_MS, [p['message_id']],
                    )
                    print(f"Reclaimed {p['message_id']} from {p['name']}")

# --- Producer ---
class OrderEventProducer:
    def publish_order(self, order: dict) -> str:
        """Pubblica un evento ordine nello stream."""
        message_id = r.xadd(STREAM_KEY, order)
        return message_id

# --- Utilizzo ---
def handle_order(message_id: str, fields: dict):
    print(f"Processing order {message_id}: {fields}")
    # Logica di business (es. email conferma, aggiornamento inventario)
    time.sleep(0.01)  # Simula lavoro

producer = OrderEventProducer()
consumer = StreamConsumer('worker-1', handle_order)

# Pubblica 5 ordini
for i in range(5):
    msg_id = producer.publish_order({
        'user_id': str(1000 + i),
        'total': str(50.0 + i * 10),
        'status': 'pending',
    })
    print(f"Published: {msg_id}")

# Processa messaggi in loop
while True:
    count = consumer.process_messages(batch_size=5)
    if count == 0:
        break  # Nessun messaggio rimasto

비교: Pub/Sub와 스트림을 사용하는 경우

Pub/Sub: 시기 선택

  • 일부 부재중 메시지가 허용되는 경우 실시간 알림
  • 분산 캐시 무효화(노드가 수신하지 않으면 다음 요청이 자체적으로 무효화됨)
  • 채팅 또는 실시간 상태 확인(오프라인인 경우 무슨 일이 일어났는지 상관하지 않음)
  • 실시간 금융 가격/티커 업데이트
  • 매우 짧은 지연 시간이 필요하고 손실을 견딜 수 있는 경우

스트림: 시기 선택

  • 각 메시지는 최소 한 번 이상 처리되어야 합니다(주문, 결제).
  • 재생이 필요합니다(디버깅 또는 재구축을 위해 과거 이벤트 다시 읽기).
  • 여러 작업자를 병렬로 사용하는 분산 처리(소비자 그룹)
  • 감사 로그 및 이벤트 소싱(이벤트 기록이 유실되어서는 안 됨)
  • 오프라인으로 전환할 수 있고 손실된 메시지를 복구해야 하는 소비자
# Riepilogo comandi chiave

# PUB/SUB
SUBSCRIBE channel             # Iscriviti a un canale
PSUBSCRIBE pattern:*          # Iscriviti con pattern glob
PUBLISH channel message       # Pubblica (fire-and-forget)
UNSUBSCRIBE channel           # Disiscrivi

# STREAMS - Producer
XADD stream * field value     # Aggiunge messaggio con ID auto
XLEN stream                   # Lunghezza stream
XTRIM stream MAXLEN ~ 10000   # Tronca stream

# STREAMS - Consumer semplice
XREAD COUNT 10 STREAMS stream 0         # Leggi da inizio
XREAD BLOCK 5000 STREAMS stream $       # Blocca aspettando nuovi

# STREAMS - Consumer Groups
XGROUP CREATE stream group $ MKSTREAM    # Crea gruppo
XREADGROUP GROUP group consumer STREAMS stream >  # Leggi non assegnati
XACK stream group message-id             # Acknowledge
XPENDING stream group - + 10            # Lista pending
XCLAIM stream group consumer 30000 id   # Riassegna dopo timeout

결론

Pub/Sub와 Streams 사이의 선택은 결국 하나의 질문으로 귀결됩니다. 메시지를 놓치셨나요? 그렇다면 Pub/Sub는 지연 시간을 제공합니다. 가능한 한 낮게. 그렇지 않은 경우 Streams with Consumer Groups는 보증을 제공합니다. Redis의 운영 단순성과 함께 강력한 제공이 가능합니다. 다음 이 기사에서는 복잡한 원자 작업을 위한 Lua 스크립팅을 살펴봅니다.

Redis 시리즈의 향후 기사

  • 제3조: Lua 스크립팅 — 원자적 연산, EVAL 및 Redis 함수 API
  • 제4조: 속도 제한 — 토큰 버킷, 슬라이딩 윈도우, 고정 카운터
  • 제5조: 세션 관리 및 캐시 패턴 — 캐시 배제, 연속 기입