Problema atomicității în Redis

Redis are un singur thread pentru executarea comenzii: fiecare comandă vine executate atomic în raport cu celelalte. Dar când trebuie să fugi o secvență de comenzi ca o singură operație atomică (citire-modificare-scriere), Problemele legate de starea cursei apar între clienții concurenți.

Redis oferă trei soluții: MULTI/EXEC (tranzacții optimiste cu CEAS), EVAL (Scripting Lua, cea mai flexibilă soluție) e cel nou Redis Functions API (Redis 7+, scripturi Lua ca biblioteci permanent cu versiunea).

Ce vei învăța

  • MULTI/EXEC: Tranzacții optimiste și limitări
  • EVAL: Rulați scripturile Lua direct în Redis
  • EVALSHA: rulați scripturi stocate în cache (zero suprasarcină de rețea)
  • CHEI și ARGV în Lua: cum să treci parametrii
  • Modele: comparare și schimbare, limitare a ratei atomice, blocare distribuită
  • Funcții Redis: biblioteci permanente Lua în Redis 7+

MULTI/EXEC: Tranzacții optimiste

MULTI/EXEC grupează comenzile într-o coadă care se execută atomic. WATCH adaugă blocare optimistă: dacă cheia urmărită este schimbată de la alt client înainte de EXEC, tranzacția este anulată.

# MULTI/EXEC: transazione base
MULTI
SET counter 0
INCR counter
INCR counter
EXEC
# 1) OK
# 2) (integer) 1
# 3) (integer) 2

# WATCH + MULTI/EXEC: optimistic locking
WATCH balance:user:1001
# Leggi il saldo corrente
GET balance:user:1001   # "1000"

# Se nessuno modifica balance:user:1001 prima di EXEC, la transazione esegue
MULTI
DECRBY balance:user:1001 100
INCRBY balance:user:1002 100
EXEC
# Se balance:user:1001 era stato modificato nel frattempo:
# (nil) -- transazione annullata, il client deve riprovare

# DISCARD: annulla una transazione in corso
MULTI
SET key1 value1
DISCARD   # Annulla tutto

Limitarea MULTI/EXEC este că comenzile sunt puse în coadă, dar nu puteți utiliza rezultatul unei comenzi ca intrare pentru următoarea în aceeași comandă tranzacție. Pentru asta este scriptingul Lua.

EVAL: Atomic Lua Scripting

EVAL rulează un script Lua direct pe serverul Redis. Scriptul rulează atomic: nicio altă comandă nu poate întrerupe-l. Puteți citi valori, puteți face calcule și puteți scrie rezultate într-o singură operaţie atomică.

# EVAL sintassi: EVAL script numkeys key [key ...] arg [arg ...]
# KEYS[1], KEYS[2], ... per i nomi delle chiavi
# ARGV[1], ARGV[2], ... per gli argomenti aggiuntivi

# Esempio 1: incremento condizionale
# Incrementa solo se il valore e' minore di un massimo
EVAL "
  local current = redis.call('GET', KEYS[1])
  if current == false then
    redis.call('SET', KEYS[1], ARGV[1])
    return tonumber(ARGV[1])
  end
  local val = tonumber(current)
  local max = tonumber(ARGV[2])
  if val < max then
    return redis.call('INCR', KEYS[1])
  end
  return val
" 1 counter:requests 0 100
# Incrementa counter:requests (KEYS[1]) se < 100 (ARGV[2])
# Default iniziale = 0 (ARGV[1])

# Esempio 2: get-set-expire atomico (GETSET + EXPIRE)
EVAL "
  local old = redis.call('GET', KEYS[1])
  redis.call('SET', KEYS[1], ARGV[1])
  redis.call('EXPIRE', KEYS[1], ARGV[2])
  return old
" 1 session:token:abc123 "new_value" 3600

# Esempio 3: compare-and-swap (CAS)
EVAL "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1
  else
    return 0
  end
" 1 config:version "v1" "v2"
# Ritorna 1 se CAS riuscito, 0 se il valore era gia' cambiato
# Python: EVAL con redis-py
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Script Lua per compare-and-swap atomico
CAS_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  redis.call('SET', KEYS[1], ARGV[2])
  return 1
else
  return 0
end
"""

def compare_and_swap(key: str, expected: str, new_value: str) -> bool:
    """Aggiorna key a new_value solo se il valore corrente e' expected."""
    result = r.eval(CAS_SCRIPT, 1, key, expected, new_value)
    return bool(result)

# Uso
r.set('config:feature-flag', 'disabled')

success = compare_and_swap('config:feature-flag', 'disabled', 'enabled')
print(f"CAS succeeded: {success}")  # True

# Secondo tentativo con valore sbagliato
success2 = compare_and_swap('config:feature-flag', 'disabled', 'enabled')
print(f"CAS succeeded: {success2}")  # False (era gia' 'enabled')

EVALSHA: Script cache pentru performanță

De fiecare dată când suni EVAL, scriptul este trimis prin rețea către Redis. Pentru scripturile denumite frecvent, EVALSHA reduce traficul de rețea: clientul trimite doar hash-ul SHA1 al scriptului (40 de caractere) în loc de întregul cod. Redis menține un cache de scripturi încărcate cu SCRIPT LOAD.

# SCRIPT LOAD: carica lo script nel server, ottieni SHA1
SCRIPT LOAD "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1
  else
    return 0
  end
"
# "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d"  (SHA1 hash)

# EVALSHA: esegui con solo l'hash (molto piu' efficiente in rete)
EVALSHA "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d" 1 key "expected" "new"

# SCRIPT EXISTS: verifica se uno script e' in cache
SCRIPT EXISTS "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d"
# 1) (integer) 1

# SCRIPT FLUSH: rimuove tutti gli script dalla cache (utile in test)
SCRIPT FLUSH
# Python: gestione SHA con fallback automatico
import redis
import hashlib

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class LuaScript:
    """Wrapper per script Lua con caching SHA automatico."""

    def __init__(self, script: str):
        self.script = script
        self.sha = None

    def _load(self) -> str:
        """Carica lo script e memorizza lo SHA."""
        self.sha = r.script_load(self.script)
        return self.sha

    def execute(self, keys: list, args: list):
        """Esegue lo script, con fallback a EVAL se SHA non in cache."""
        if self.sha is None:
            self._load()
        try:
            return r.evalsha(self.sha, len(keys), *keys, *args)
        except redis.exceptions.NoScriptError:
            # Script non piu' in cache (SCRIPT FLUSH chiamato)
            self._load()
            return r.evalsha(self.sha, len(keys), *keys, *args)

# Script rate limiter atomico: sliding window
RATE_LIMIT_SCRIPT = LuaScript("""
local key = KEYS[1]
local window = tonumber(ARGV[1])   -- finestra in secondi
local max_requests = tonumber(ARGV[2])
local now = tonumber(ARGV[3])      -- timestamp corrente in ms

-- Rimuovi entries scadute
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

-- Conta requests nella finestra
local count = redis.call('ZCARD', key)

if count < max_requests then
  -- Aggiungi la request corrente
  redis.call('ZADD', key, now, now)
  redis.call('EXPIRE', key, window)
  return 1  -- Allowed
else
  return 0  -- Rate limited
end
""")

def check_rate_limit(user_id: str, window_sec: int = 60, max_req: int = 100) -> bool:
    """Rate limiting sliding window atomico via Lua. Ritorna True se allowed."""
    import time
    now_ms = int(time.time() * 1000)
    key = f"ratelimit:{user_id}"
    result = RATE_LIMIT_SCRIPT.execute(
        keys=[key],
        args=[str(window_sec), str(max_req), str(now_ms)],
    )
    return bool(result)

# Test
for i in range(5):
    allowed = check_rate_limit("user:1001", window_sec=60, max_req=3)
    print(f"Request {i+1}: {'ALLOWED' if allowed else 'RATE LIMITED'}")

Model: Blocare distribuită cu Lua

Blocarea distribuită este unul dintre cele mai importante modele care pot fi implementate cu Lua. Algoritmul Redlock folosește SETNX + EXPIRE atomic pentru a obține încuietori, și un script Lua pentru emitere care verifică jetonul înainte de ștergere.

# Distributed Lock: SETNX + EXPIRE atomici
# SET key value NX PX milliseconds (Redis 2.6.12+)
SET lock:resource:order-1234 "worker-uuid-abc" NX PX 30000
# OK se il lock e' stato acquisito, nil altrimenti

# Script Lua per rilascio ATOMICO (verifica token prima di cancellare)
EVAL "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
  else
    return 0
  end
" 1 lock:resource:order-1234 "worker-uuid-abc"
# Python: Distributed Lock context manager
import redis
import uuid
import time
from contextlib import contextmanager

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

RELEASE_LOCK_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('DEL', KEYS[1])
else
  return 0
end
"""

@contextmanager
def distributed_lock(resource: str, timeout_ms: int = 30000, retry_ms: int = 100):
    """
    Context manager per distributed lock.
    Usa SET NX PX per acquisizione atomica.
    Usa Lua per rilascio atomico (evita rilascio di lock altrui).
    """
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = False

    try:
        # Tentativo di acquisizione lock
        acquired = r.set(lock_key, token, nx=True, px=timeout_ms)
        if not acquired:
            raise Exception(f"Could not acquire lock on {resource}")

        yield token  # Esegui codice protetto

    finally:
        if acquired:
            # Rilascio atomico: cancella solo se il token e' ancora il nostro
            released = r.eval(RELEASE_LOCK_SCRIPT, 1, lock_key, token)
            if not released:
                # Il lock e' scaduto e qualcun altro lo ha acquisito nel frattempo
                print(f"WARNING: Lock {lock_key} expired before release")

# Uso
def process_order(order_id: str):
    with distributed_lock(f"order:{order_id}", timeout_ms=10000):
        # Questo codice e' eseguito in modo esclusivo
        print(f"Processing order {order_id}...")
        time.sleep(0.5)  # Simula lavoro
        print(f"Order {order_id} processed")

# Due worker concorrenti
import threading
t1 = threading.Thread(target=process_order, args=("ORD-123",))
t2 = threading.Thread(target=process_order, args=("ORD-123",))
t1.start()
t2.start()
t1.join()
t2.join()

Funcții Redis: biblioteci permanente Lua (Redis 7+)

Redis 7.0 introduce Functions, un sistem de încărcare a bibliotecilor Lua precum cod permanent pe server. Spre deosebire de EVAL/EVALSHA, Funcțiile supraviețuiesc repornirilor Redis, acceptă versiunea și activează pentru a organiza scripturile conexe în biblioteci cu spații de nume.

# Redis Functions: caricare una libreria

# Definisci la libreria in Lua (sintassi Redis 7+)
FUNCTION LOAD "#!lua name=mylib\n
local function rate_limit(keys, args)
  local key = keys[1]
  local window = tonumber(args[1])
  local max_req = tonumber(args[2])
  local now = tonumber(args[3])

  redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
  local count = redis.call('ZCARD', key)

  if count < max_req then
    redis.call('ZADD', key, now, now)
    redis.call('EXPIRE', key, window)
    return 1
  end
  return 0
end

local function atomic_cas(keys, args)
  if redis.call('GET', keys[1]) == args[1] then
    redis.call('SET', keys[1], args[2])
    return 1
  end
  return 0
end

redis.register_function('rate_limit', rate_limit)
redis.register_function('atomic_cas', atomic_cas)
"

# Chiama la funzione con FCALL
FCALL rate_limit 1 ratelimit:user:1001 60 100 1710000000000

# Lista librerie caricate
FUNCTION LIST

# Elimina una libreria
FUNCTION DELETE mylib

# Dump e restore (per backup/migration)
FUNCTION DUMP   # Ritorna binary dump
FUNCTION RESTORE [FLUSH] [APPEND] [REPLACE] <payload>
# Python: Redis Functions con redis-py
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

LIBRARY_CODE = """#!lua name=ratelimit_lib

local function sliding_window_rl(keys, args)
  local key = keys[1]
  local window = tonumber(args[1])
  local max_req = tonumber(args[2])
  local now = tonumber(args[3])

  redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
  local count = redis.call('ZCARD', key)

  if count < max_req then
    redis.call('ZADD', key, now, tostring(now))
    redis.call('EXPIRE', key, window + 1)
    return {1, max_req - count - 1}  -- {allowed, remaining}
  end

  local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
  local retry_after = math.ceil((tonumber(oldest[2]) + window * 1000 - now) / 1000)
  return {0, 0, retry_after}  -- {blocked, remaining, retry_after_sec}
end

redis.register_function('sliding_rl', sliding_window_rl)
"""

def load_library():
    """Carica o ricarica la libreria Functions."""
    try:
        r.function_load(LIBRARY_CODE)
    except redis.exceptions.ResponseError as e:
        if 'Library already exists' in str(e):
            r.function_load(LIBRARY_CODE, replace=True)
        else:
            raise

def check_rate_limit_fn(user_id: str, window: int = 60, max_req: int = 10):
    """Usa Redis Function per rate limiting sliding window."""
    key = f"rl:user:{user_id}"
    now_ms = int(time.time() * 1000)
    result = r.fcall('sliding_rl', 1, key, str(window), str(max_req), str(now_ms))
    allowed, remaining = result[0], result[1]
    retry_after = result[2] if len(result) > 2 else None
    return {
        'allowed': bool(allowed),
        'remaining': remaining,
        'retry_after': retry_after,
    }

# Setup
load_library()

# Test
for i in range(12):
    result = check_rate_limit_fn("user:1001", window=60, max_req=10)
    status = "OK" if result['allowed'] else f"BLOCKED (retry in {result['retry_after']}s)"
    print(f"Request {i+1}: {status}, remaining: {result['remaining']}")

MULTI/EXEC vs Lua vs Funcții: comparație

Ce mecanism să alegeți

  • MULTI/EXEC: Secvențe de comenzi simple fără logică condiționată. Nu sunt utilizate rezultate intermediare ca intrare pentru comenzile ulterioare în aceeași tranzacție.
  • EVALUARE: Logica condiționată complexă, citire-modificare-scriere atomică, prototipare rapidă. Script inline în codul aplicației.
  • EVALSHA: La fel ca EVAL, dar reduce sarcina utilă a rețelei pentru scripturile frecvent numite. Necesită managementul cache-ului SHA.
  • Funcții (Redis 7+): Scripturi ca primă clasă pe server: supraviețuiesc la reporniri, versionabile, organizabile în biblioteci. Pentru cod stabil, partajat în mai multe aplicații.

Concluzii

Scripting-ul Lua în Redis este instrumentul definitiv pentru atomicitatea complexă. În timp ce MULTI/EXEC acoperă secvențe simple, EVAL vă permite să implementați modele caracteristici sofisticate, cum ar fi limitarea ratei, blocările distribuite și compararea și schimbarea fără nu vă faceți niciodată griji pentru condițiile de cursă. Redis Functions aduce aceste modele la un nivel superior, tratând codul Lua ca infrastructură cu versiuni în loc de șiruri de caractere codificate în codul aplicației.

Articole viitoare din seria Redis

  • Articolul 4: Limitare avansată a ratei — Token Bucket, Fereastra glisantă pe Redis
  • Articolul 5: Gestionarea sesiunii și modele de cache — Cache-Aside, Write-Through
  • Articolul 6: RedisJSON și RediSearch — documente JSON și căutare integrală