Pub/Sub vs Redis Streams: Fundamental Differences, Use Cases and Consumer Groups
Redis Pub/Sub is fire-and-forget, while Streams is a persistent log with Consumer Groups; understanding the difference shapes the architecture. Configure Consumer Groups for distributed processing and ensure that no messages are lost.
Two Messaging Paradigms in Redis
Redis offers two radically different messaging mechanisms. Pub/Sub is a real-time broadcast system: messages are delivered to every subscriber connected at that moment and then forgotten. If a subscriber is not connected, the message is lost forever.
Redis Streams (introduced in Redis 5.0) is a persistent, append-only log inspired by Apache Kafka. Messages are retained until explicitly deleted, can be read multiple times, and support Consumer Groups for distributed processing with acknowledgments.
What You Will Learn
- Pub/Sub: SUBSCRIBE, PUBLISH, PSUBSCRIBE with glob patterns
- When to use Pub/Sub vs Streams: the key differences
- Streams: XADD, XREAD, XRANGE for append-only logs
- Consumer Groups: XGROUP CREATE, XREADGROUP, XACK
- At-least-once delivery and management of pending messages
- Pattern: event sourcing and audit log with Streams
Pub/Sub: Real-Time Broadcasting
Pub/Sub is ideal for real-time notifications where losing an occasional message is acceptable: live price updates, push notifications, chat rooms, distributed cache invalidation.
# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Waits for messages on the channel...
# When a message arrives:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"
# You can subscribe to multiple channels
SUBSCRIBE channel:1 channel:2 channel:3
# PSUBSCRIBE: subscribe with glob patterns
PSUBSCRIBE notifications:user:* # All users
PSUBSCRIBE events:order:* # All orders
# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1 -- number of subscribers that received the message
PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1
# If no subscriber is connected:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0 -- message LOST, nobody will ever receive it
# Python: Pub/Sub with redis-py
import redis
import json
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# --- SUBSCRIBER ---
def message_handler(message):
    # 'message' for channel subscriptions, 'pmessage' for pattern subscriptions
    if message['type'] in ('message', 'pmessage'):
        channel = message['channel']
        data = json.loads(message['data'])
        print(f"[{channel}] Received: {data}")

def start_subscriber(user_id: int):
    """Subscriber in a background thread."""
    sub = r.pubsub()
    sub.subscribe(**{
        f'notifications:user:{user_id}': message_handler,
        'notifications:broadcast': message_handler,
    })
    # Pattern subscribe
    sub.psubscribe(**{'events:*': message_handler})
    # Listen in a separate thread
    thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
    return thread, sub

# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
    """Publish a notification to a specific user."""
    payload = json.dumps({'type': notification_type, **data})
    subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
    print(f"Delivered to {subscribers_count} subscribers")
    return subscribers_count

# Start the subscriber
thread, pubsub = start_subscriber(user_id=1001)

# Publish messages
import time
time.sleep(0.1)  # Wait for the subscriber to be ready
publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})

# IMPORTANT: Pub/Sub is not persistent.
# If the subscriber disconnects, it misses every message sent in the meantime.
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()
Redis Streams: A Persistent, Append-Only Log
Streams uses a completely different model: each entry has a unique ID (timestamp-sequence), is persisted in the log, and can be re-read at any time. It is the right choice for event sourcing, audit logs, task queues, and any scenario where message loss is unacceptable.
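Stream IDs of the form `{milliseconds}-{sequence}` order by their two numeric parts, not as plain strings. A tiny helper (illustrative only, not part of redis-py) makes the ordering explicit:

```python
def parse_stream_id(stream_id: str) -> tuple:
    """Split a '<milliseconds>-<sequence>' stream ID into numeric parts."""
    ms, seq = stream_id.split('-')
    return (int(ms), int(seq))

# Note: a plain string sort would wrongly place '...-10' before '...-9'
ids = ['1710000001234-10', '1710000001234-9', '1710000000999-0']
print(sorted(ids, key=parse_stream_id))
# ['1710000000999-0', '1710000001234-9', '1710000001234-10']
```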
# Redis Streams: XADD and XREAD
# XADD: append an entry to the stream
# Auto-generated ID: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0" <-- auto-generated ID
XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"
# Explicit ID (rarely needed)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97
# XLEN: number of entries in the stream
XLEN orders:stream # 3
# XRANGE: read entries in an ID range (- = min ID, + = max ID)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# 3) "product_id"
# 4) "555"
# 5) "quantity"
# 6) "2"
# 7) "total"
# 8) "89.99"
# ...
# XRANGE with COUNT for pagination
XRANGE orders:stream - + COUNT 10
# XREVRANGE: reverse order (newest first)
XREVRANGE orders:stream + - COUNT 5
# XREAD: read entries after the last-seen ID (optionally blocking)
XREAD COUNT 10 STREAMS orders:stream 0 # From ID 0, read at most 10
# Read only new entries ($ = only future ones)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Block for up to 5 seconds waiting for new entries
Consumer Groups: Distributed Processing with Acknowledgment
Consumer Groups are the most powerful feature of Redis Streams. They let multiple consumers process entries from the same stream in parallel: each entry is delivered to only one consumer in the group, unacknowledged entries are tracked in a pending list, and idle entries can be re-delivered to another consumer. The result is at-least-once processing rather than fire-and-forget.
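The mechanics can be modeled in a few lines of plain Python. This is a toy in-memory sketch of the group's pending entries list (PEL), not Redis itself, but it shows the same state machine: deliver to one consumer, ack to remove, claim when idle.

```python
import time

class ToyGroup:
    """In-memory model of a consumer group's pending entries list (PEL)."""

    def __init__(self, entries):
        self.entries = list(entries)   # undelivered (id, payload) pairs
        self.pending = {}              # id -> (consumer, delivery_time)

    def read(self, consumer):
        """Deliver the next entry to exactly one consumer ('>' semantics)."""
        if not self.entries:
            return None
        entry_id, payload = self.entries.pop(0)
        self.pending[entry_id] = (consumer, time.monotonic())
        return entry_id, payload

    def ack(self, entry_id):
        """XACK: processing finished, drop the entry from the PEL."""
        return self.pending.pop(entry_id, None) is not None

    def claim(self, entry_id, new_consumer, min_idle_s):
        """XCLAIM: reassign an entry idle for longer than min_idle_s."""
        consumer, delivered_at = self.pending[entry_id]
        if time.monotonic() - delivered_at < min_idle_s:
            return False
        self.pending[entry_id] = (new_consumer, time.monotonic())
        return True

group = ToyGroup([('1-0', 'order A'), ('2-0', 'order B')])
eid, _ = group.read('worker-1')   # worker-1 gets '1-0'
group.read('worker-2')            # worker-2 gets '2-0', never '1-0'
group.ack(eid)                    # '1-0' leaves the PEL
print(group.pending)              # only '2-0' remains pending
```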
# Consumer Groups: setup and usage
# 1. Create the consumer group
# $ = start from future entries, 0 = include existing entries
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"
# Create a group that starts from the beginning of the stream
XGROUP CREATE orders:stream order-processors-v2 0
# 2. A consumer reads entries via the group
# > = read the next entries not yet delivered to any consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
# 2) 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# ...
# 3. Acknowledge (confirm processing is complete)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1 -- 1 entry acknowledged
# 4. XPENDING: inspect pending entries (delivered but not yet acked)
XPENDING orders:stream order-processors - + 10
# Shows: ID, assigned consumer, time since last delivery, delivery count
# 5. XCLAIM: reassign idle entries to another consumer (after a timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Reassigns the entry only if it has been idle for more than 60 seconds
# 6. XDEL: delete specific entries
XDEL orders:stream 1710000001234-0
# 7. XTRIM: keep only the last N entries
XTRIM orders:stream MAXLEN 10000 # Keep the last 10,000 entries
XTRIM orders:stream MAXLEN ~ 10000 # Approximate trim (more efficient)
# Python: Consumer Group with at-least-once delivery
import redis
import json
import time
from typing import Callable
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000 # 30 seconds before reclaiming
class StreamConsumer:
    def __init__(self, consumer_name: str, handler: Callable):
        self.consumer_name = consumer_name
        self.handler = handler
        self._ensure_group()

    def _ensure_group(self):
        """Create the consumer group if it does not exist."""
        try:
            # id='0' also processes existing entries; use id='$' for new only
            r.xgroup_create(STREAM_KEY, GROUP_NAME, id='0', mkstream=True)
        except redis.exceptions.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise  # any error other than "group already exists"