Atomický problém v Redis

Redis je pro provádění příkazů jednovláknový: každý příkaz přichází provedeny atomicky s ohledem na ostatní. Ale když potřebujete běžet sekvence příkazů jako jedna atomická operace (čtení-upravování-zápis), Problémy s rasovou kondicí vznikají mezi konkurenčními klienty.

Redis nabízí tři řešení: MULTI/EXEC (optimistické transakce s SLEDOVAT), EVAL (Lua skriptování, nejflexibilnější řešení) e ten nový Redis Functions API (Redis 7+, skripty Lua jako knihovny trvalé s verzováním).

Co se naučíte

  • MULTI/EXEC: Optimistické transakce a omezení
  • EVAL: Spouštějte skripty Lua přímo v Redis
  • EVALSHA: Spouštění skriptů uložených v mezipaměti (nulová režie sítě)
  • KEYS a ARGV v Lua: jak předávat parametry
  • Vzory: porovnání a výměna, omezení atomové rychlosti, distribuované zamykání
  • Funkce Redis: Trvalé knihovny Lua v Redis 7+

MULTI/EXEC: Optimistické transakce

MULTI/EXEC seskupuje příkazy do fronty, která se provádí atomicky. WATCH přidává optimistické zamykání: pokud se změní sledovaný klíč od jiného klienta před EXEC, transakce je zrušena.

# MULTI/EXEC: transazione base
MULTI
SET counter 0
INCR counter
INCR counter
EXEC
# 1) OK
# 2) (integer) 1
# 3) (integer) 2

# WATCH + MULTI/EXEC: optimistic locking
WATCH balance:user:1001
# Leggi il saldo corrente
GET balance:user:1001   # "1000"

# Se nessuno modifica balance:user:1001 prima di EXEC, la transazione esegue
MULTI
DECRBY balance:user:1001 100
INCRBY balance:user:1002 100
EXEC
# Se balance:user:1001 era stato modificato nel frattempo:
# (nil) -- transazione annullata, il client deve riprovare

# DISCARD: annulla una transazione in corso
MULTI
SET key1 value1
DISCARD   # Annulla tutto

Omezení MULTI/EXEC spočívá v tom, že příkazy jsou zařazeny do fronty, ale nikoli výsledek jednoho příkazu můžete použít jako vstup pro další ve stejném příkazu transakce. K tomu slouží Lua scripting.

EVAL: Atomic Lua Scripting

EVAL spustí skript Lua přímo na serveru Redis. Skript běží atomicky: žádný jiný příkaz nemůže přerušit to. Můžete číst hodnoty, provádět výpočty a zapisovat výsledky v jediné atomové operaci.

# EVAL sintassi: EVAL script numkeys key [key ...] arg [arg ...]
# KEYS[1], KEYS[2], ... per i nomi delle chiavi
# ARGV[1], ARGV[2], ... per gli argomenti aggiuntivi

# Esempio 1: incremento condizionale
# Incrementa solo se il valore e' minore di un massimo
EVAL "
  local current = redis.call('GET', KEYS[1])
  if current == false then
    redis.call('SET', KEYS[1], ARGV[1])
    return tonumber(ARGV[1])
  end
  local val = tonumber(current)
  local max = tonumber(ARGV[2])
  if val < max then
    return redis.call('INCR', KEYS[1])
  end
  return val
" 1 counter:requests 0 100
# Incrementa counter:requests (KEYS[1]) se < 100 (ARGV[2])
# Default iniziale = 0 (ARGV[1])

# Esempio 2: get-set-expire atomico (GETSET + EXPIRE)
EVAL "
  local old = redis.call('GET', KEYS[1])
  redis.call('SET', KEYS[1], ARGV[1])
  redis.call('EXPIRE', KEYS[1], ARGV[2])
  return old
" 1 session:token:abc123 "new_value" 3600

# Esempio 3: compare-and-swap (CAS)
EVAL "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1
  else
    return 0
  end
" 1 config:version "v1" "v2"
# Ritorna 1 se CAS riuscito, 0 se il valore era gia' cambiato
# Python: EVAL con redis-py
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Script Lua per compare-and-swap atomico
CAS_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  redis.call('SET', KEYS[1], ARGV[2])
  return 1
else
  return 0
end
"""

def compare_and_swap(key: str, expected: str, new_value: str) -> bool:
    """Aggiorna key a new_value solo se il valore corrente e' expected."""
    result = r.eval(CAS_SCRIPT, 1, key, expected, new_value)
    return bool(result)

# Uso
r.set('config:feature-flag', 'disabled')

success = compare_and_swap('config:feature-flag', 'disabled', 'enabled')
print(f"CAS succeeded: {success}")  # True

# Secondo tentativo con valore sbagliato
success2 = compare_and_swap('config:feature-flag', 'disabled', 'enabled')
print(f"CAS succeeded: {success2}")  # False (era gia' 'enabled')

EVALSHA: Skript pro výkon v mezipaměti

Pokaždé, když zavoláte EVAL, skript je odeslán přes síť do Redis. U často volaných skriptů EVALSHA snižuje provoz v síti: klient místo celého kódu odešle pouze hash SHA1 skriptu (40 znaků). Redis udržuje mezipaměť načtených skriptů SCRIPT LOAD.

# SCRIPT LOAD: carica lo script nel server, ottieni SHA1
SCRIPT LOAD "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1
  else
    return 0
  end
"
# "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d"  (SHA1 hash)

# EVALSHA: esegui con solo l'hash (molto piu' efficiente in rete)
EVALSHA "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d" 1 key "expected" "new"

# SCRIPT EXISTS: verifica se uno script e' in cache
SCRIPT EXISTS "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d"
# 1) (integer) 1

# SCRIPT FLUSH: rimuove tutti gli script dalla cache (utile in test)
SCRIPT FLUSH
# Python: gestione SHA con fallback automatico
import redis
import hashlib

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class LuaScript:
    """Wrapper per script Lua con caching SHA automatico."""

    def __init__(self, script: str):
        self.script = script
        self.sha = None

    def _load(self) -> str:
        """Carica lo script e memorizza lo SHA."""
        self.sha = r.script_load(self.script)
        return self.sha

    def execute(self, keys: list, args: list):
        """Esegue lo script, con fallback a EVAL se SHA non in cache."""
        if self.sha is None:
            self._load()
        try:
            return r.evalsha(self.sha, len(keys), *keys, *args)
        except redis.exceptions.NoScriptError:
            # Script non piu' in cache (SCRIPT FLUSH chiamato)
            self._load()
            return r.evalsha(self.sha, len(keys), *keys, *args)

# Script rate limiter atomico: sliding window
RATE_LIMIT_SCRIPT = LuaScript("""
local key = KEYS[1]
local window = tonumber(ARGV[1])   -- finestra in secondi
local max_requests = tonumber(ARGV[2])
local now = tonumber(ARGV[3])      -- timestamp corrente in ms

-- Rimuovi entries scadute
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

-- Conta requests nella finestra
local count = redis.call('ZCARD', key)

if count < max_requests then
  -- Aggiungi la request corrente
  redis.call('ZADD', key, now, now)
  redis.call('EXPIRE', key, window)
  return 1  -- Allowed
else
  return 0  -- Rate limited
end
""")

def check_rate_limit(user_id: str, window_sec: int = 60, max_req: int = 100) -> bool:
    """Rate limiting sliding window atomico via Lua. Ritorna True se allowed."""
    import time
    now_ms = int(time.time() * 1000)
    key = f"ratelimit:{user_id}"
    result = RATE_LIMIT_SCRIPT.execute(
        keys=[key],
        args=[str(window_sec), str(max_req), str(now_ms)],
    )
    return bool(result)

# Test
for i in range(5):
    allowed = check_rate_limit("user:1001", window_sec=60, max_req=3)
    print(f"Request {i+1}: {'ALLOWED' if allowed else 'RATE LIMITED'}")

Vzor: Distribuovaný zámek s Lua

Distribuovaný zámek je jedním z nejdůležitějších vzorů, které lze implementovat s Luou. Algoritmus Redlock používá atomové SETNX + EXPIRE k získání zámků, a skript Lua pro vydání, který zkontroluje token před odstraněním.

# Distributed Lock: SETNX + EXPIRE atomici
# SET key value NX PX milliseconds (Redis 2.6.12+)
SET lock:resource:order-1234 "worker-uuid-abc" NX PX 30000
# OK se il lock e' stato acquisito, nil altrimenti

# Script Lua per rilascio ATOMICO (verifica token prima di cancellare)
EVAL "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
  else
    return 0
  end
" 1 lock:resource:order-1234 "worker-uuid-abc"
# Python: Distributed Lock context manager
import redis
import uuid
import time
from contextlib import contextmanager

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

RELEASE_LOCK_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('DEL', KEYS[1])
else
  return 0
end
"""

@contextmanager
def distributed_lock(resource: str, timeout_ms: int = 30000, retry_ms: int = 100):
    """
    Context manager per distributed lock.
    Usa SET NX PX per acquisizione atomica.
    Usa Lua per rilascio atomico (evita rilascio di lock altrui).
    """
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = False

    try:
        # Tentativo di acquisizione lock
        acquired = r.set(lock_key, token, nx=True, px=timeout_ms)
        if not acquired:
            raise Exception(f"Could not acquire lock on {resource}")

        yield token  # Esegui codice protetto

    finally:
        if acquired:
            # Rilascio atomico: cancella solo se il token e' ancora il nostro
            released = r.eval(RELEASE_LOCK_SCRIPT, 1, lock_key, token)
            if not released:
                # Il lock e' scaduto e qualcun altro lo ha acquisito nel frattempo
                print(f"WARNING: Lock {lock_key} expired before release")

# Uso
def process_order(order_id: str):
    with distributed_lock(f"order:{order_id}", timeout_ms=10000):
        # Questo codice e' eseguito in modo esclusivo
        print(f"Processing order {order_id}...")
        time.sleep(0.5)  # Simula lavoro
        print(f"Order {order_id} processed")

# Due worker concorrenti
import threading
t1 = threading.Thread(target=process_order, args=("ORD-123",))
t2 = threading.Thread(target=process_order, args=("ORD-123",))
t1.start()
t2.start()
t1.join()
t2.join()

Funkce Redis: Trvalé knihovny Lua (Redis 7+)

Redis 7.0 zavádí Functions, systém pro načítání knihoven Lua, jako je např trvalý kód na serveru. Na rozdíl od EVAL/EVALSHA jsou funkce přežívají restarty Redis, podporují verzování a umožňují organizovat související skripty do knihoven s jmenným prostorem.

# Redis Functions: caricare una libreria

# Definisci la libreria in Lua (sintassi Redis 7+)
FUNCTION LOAD "#!lua name=mylib\n
local function rate_limit(keys, args)
  local key = keys[1]
  local window = tonumber(args[1])
  local max_req = tonumber(args[2])
  local now = tonumber(args[3])

  redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
  local count = redis.call('ZCARD', key)

  if count < max_req then
    redis.call('ZADD', key, now, now)
    redis.call('EXPIRE', key, window)
    return 1
  end
  return 0
end

local function atomic_cas(keys, args)
  if redis.call('GET', keys[1]) == args[1] then
    redis.call('SET', keys[1], args[2])
    return 1
  end
  return 0
end

redis.register_function('rate_limit', rate_limit)
redis.register_function('atomic_cas', atomic_cas)
"

# Chiama la funzione con FCALL
FCALL rate_limit 1 ratelimit:user:1001 60 100 1710000000000

# Lista librerie caricate
FUNCTION LIST

# Elimina una libreria
FUNCTION DELETE mylib

# Dump e restore (per backup/migration)
FUNCTION DUMP   # Ritorna binary dump
FUNCTION RESTORE [FLUSH] [APPEND] [REPLACE] <payload>
# Python: Redis Functions con redis-py
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

LIBRARY_CODE = """#!lua name=ratelimit_lib

local function sliding_window_rl(keys, args)
  local key = keys[1]
  local window = tonumber(args[1])
  local max_req = tonumber(args[2])
  local now = tonumber(args[3])

  redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
  local count = redis.call('ZCARD', key)

  if count < max_req then
    redis.call('ZADD', key, now, tostring(now))
    redis.call('EXPIRE', key, window + 1)
    return {1, max_req - count - 1}  -- {allowed, remaining}
  end

  local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
  local retry_after = math.ceil((tonumber(oldest[2]) + window * 1000 - now) / 1000)
  return {0, 0, retry_after}  -- {blocked, remaining, retry_after_sec}
end

redis.register_function('sliding_rl', sliding_window_rl)
"""

def load_library():
    """Carica o ricarica la libreria Functions."""
    try:
        r.function_load(LIBRARY_CODE)
    except redis.exceptions.ResponseError as e:
        if 'Library already exists' in str(e):
            r.function_load(LIBRARY_CODE, replace=True)
        else:
            raise

def check_rate_limit_fn(user_id: str, window: int = 60, max_req: int = 10):
    """Usa Redis Function per rate limiting sliding window."""
    key = f"rl:user:{user_id}"
    now_ms = int(time.time() * 1000)
    result = r.fcall('sliding_rl', 1, key, str(window), str(max_req), str(now_ms))
    allowed, remaining = result[0], result[1]
    retry_after = result[2] if len(result) > 2 else None
    return {
        'allowed': bool(allowed),
        'remaining': remaining,
        'retry_after': retry_after,
    }

# Setup
load_library()

# Test
for i in range(12):
    result = check_rate_limit_fn("user:1001", window=60, max_req=10)
    status = "OK" if result['allowed'] else f"BLOCKED (retry in {result['retry_after']}s)"
    print(f"Request {i+1}: {status}, remaining: {result['remaining']}")

MULTI/EXEC vs Lua vs Funkce: Porovnání

Jaký mechanismus zvolit

  • MULTI/EXEC: Jednoduché sekvence příkazů bez podmíněné logiky. Žádné mezivýsledky použité jako vstup pro následující příkazy ve stejné transakci.
  • EVAL: Komplexní podmíněná logika, atomické čtení-upravování-zápis, rychlé prototypování. Vložený skript v kódu aplikace.
  • EVALSHA: Jako EVAL, ale snižuje zatížení sítě pro často volané skripty. Vyžaduje správu mezipaměti SHA.
  • Funkce (Redis 7+): Skripty jako první třída na serveru: přežijí restarty, verzovatelné, organizovatelné do knihoven. Pro stabilní, sdílený kód napříč více aplikacemi.

Závěry

Skriptování Lua v Redis je definitivní nástroj pro komplexní atomičnost. Zatímco MULTI/EXEC pokrývá jednoduché sekvence, EVAL umožňuje implementovat vzory sofistikované funkce, jako je omezení rychlosti, distribuované zámky a porovnávání a výměna bez nikdy se nestarejte o závodní podmínky. Redis Functions přináší tyto vzory na vyšší úroveň, zacházet s kódem Lua jako s verzovanou infrastrukturou místo pevně zakódovaných řetězců v kódu vaší aplikace.

Připravované články ze série Redis

  • Článek 4: Pokročilé omezení rychlosti — Token Bucket, posuvné okno na Redis
  • Článek 5: Správa relací a vzory mezipaměti — Cache-Aside, Write-Through
  • Článek 6: RedisJSON a RediSearch — dokumenty JSON a fulltextové vyhledávání