Redis のアトミック性の問題

Redis はコマンド実行用にシングルスレッドです。各コマンドが実行されます。 他のものに関してアトミックに実行されます。でも、走る必要があるときは 単一のアトミック操作 (読み取り、変更、書き込み) としてのコマンドのシーケンス。 競合するクライアント間で競合状態の問題が発生します。

Redis は 3 つのソリューションを提供します。 MULTI/EXEC (楽観的な取引 見てください)、 EVAL (Lua スクリプト、最も柔軟なソリューション) 新しいもの Redis Functions API (Redis 7+、ライブラリとしての Lua スクリプト バージョン管理付きで永続的)。

何を学ぶか

  • MULTI/EXEC: 楽観的なトランザクションと制限
  • EVAL: Redis で Lua スクリプトを直接実行する
  • EVALSHA: キャッシュされたスクリプトを実行します (ネットワーク オーバーヘッドなし)
  • Lua の KEYS と ARGV: パラメーターを渡す方法
  • パターン: コンペアアンドスワップ、アトミックレート制限、分散ロック
  • Redis 機能: Redis 7+ の永続的な Lua ライブラリ

MULTI/EXEC: 楽観的なトランザクション

MULTI/EXEC は、コマンドをアトミ​​ックに実行するキューにグループ化します。 WATCH は楽観的ロックを追加します: 監視されているキーが変更された場合 EXEC 前に別のクライアントから送信された場合、トランザクションはキャンセルされます。

# MULTI/EXEC: transazione base
MULTI
SET counter 0
INCR counter
INCR counter
EXEC
# 1) OK
# 2) (integer) 1
# 3) (integer) 2

# WATCH + MULTI/EXEC: optimistic locking
WATCH balance:user:1001
# Leggi il saldo corrente
GET balance:user:1001   # "1000"

# Se nessuno modifica balance:user:1001 prima di EXEC, la transazione esegue
MULTI
DECRBY balance:user:1001 100
INCRBY balance:user:1002 100
EXEC
# Se balance:user:1001 era stato modificato nel frattempo:
# (nil) -- transazione annullata, il client deve riprovare

# DISCARD: annulla una transazione in corso
MULTI
SET key1 value1
DISCARD   # Annulla tutto

MULTI/EXEC の制限は、コマンドはキューに入れられますが、キューには入れられないことです。 あるコマンドの結果を、同じコマンド内の次のコマンドへの入力として使用できます。 取引。これが Lua スクリプトの目的です。

EVAL: アトミック Lua スクリプト

EVAL Lua スクリプトを Redis サーバー上で直接実行します。 スクリプトはアトミックに実行されます。他のコマンドは実行できません。 それを中断します。値を読み取り、計算を実行し、結果を書き込むことができます 単一のアトミック操作で。

# EVAL sintassi: EVAL script numkeys key [key ...] arg [arg ...]
# KEYS[1], KEYS[2], ... per i nomi delle chiavi
# ARGV[1], ARGV[2], ... per gli argomenti aggiuntivi

# Esempio 1: incremento condizionale
# Incrementa solo se il valore e' minore di un massimo
EVAL "
  local current = redis.call('GET', KEYS[1])
  if current == false then
    redis.call('SET', KEYS[1], ARGV[1])
    return tonumber(ARGV[1])
  end
  local val = tonumber(current)
  local max = tonumber(ARGV[2])
  if val < max then
    return redis.call('INCR', KEYS[1])
  end
  return val
" 1 counter:requests 0 100
# Incrementa counter:requests (KEYS[1]) se < 100 (ARGV[2])
# Default iniziale = 0 (ARGV[1])

# Esempio 2: get-set-expire atomico (GETSET + EXPIRE)
EVAL "
  local old = redis.call('GET', KEYS[1])
  redis.call('SET', KEYS[1], ARGV[1])
  redis.call('EXPIRE', KEYS[1], ARGV[2])
  return old
" 1 session:token:abc123 "new_value" 3600

# Esempio 3: compare-and-swap (CAS)
EVAL "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1
  else
    return 0
  end
" 1 config:version "v1" "v2"
# Ritorna 1 se CAS riuscito, 0 se il valore era gia' cambiato
# Python: EVAL con redis-py
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Script Lua per compare-and-swap atomico
CAS_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  redis.call('SET', KEYS[1], ARGV[2])
  return 1
else
  return 0
end
"""

def compare_and_swap(key: str, expected: str, new_value: str) -> bool:
    """Aggiorna key a new_value solo se il valore corrente e' expected."""
    result = r.eval(CAS_SCRIPT, 1, key, expected, new_value)
    return bool(result)

# Uso
r.set('config:feature-flag', 'disabled')

success = compare_and_swap('config:feature-flag', 'disabled', 'enabled')
print(f"CAS succeeded: {success}")  # True

# Secondo tentativo con valore sbagliato
success2 = compare_and_swap('config:feature-flag', 'disabled', 'enabled')
print(f"CAS succeeded: {success2}")  # False (era gia' 'enabled')

EVALSHA: パフォーマンスのためのキャッシュされたスクリプト

電話をかけるたびに EVAL、スクリプトはネットワーク経由で Redis に送信されます。 頻繁に呼び出されるスクリプトの場合、 EVALSHA ネットワークトラフィックを削減します。 クライアントは、コード全体ではなく、スクリプトの SHA1 ハッシュ (40 文字) のみを送信します。 Redis は、ロードされたスクリプトのキャッシュを維持します。 SCRIPT LOAD.

# SCRIPT LOAD: carica lo script nel server, ottieni SHA1
SCRIPT LOAD "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1
  else
    return 0
  end
"
# "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d"  (SHA1 hash)

# EVALSHA: esegui con solo l'hash (molto piu' efficiente in rete)
EVALSHA "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d" 1 key "expected" "new"

# SCRIPT EXISTS: verifica se uno script e' in cache
SCRIPT EXISTS "7d5c3d9b8e5d5f9b3c3d9b8e5d5f9b3c3d9b8e5d"
# 1) (integer) 1

# SCRIPT FLUSH: rimuove tutti gli script dalla cache (utile in test)
SCRIPT FLUSH
# Python: gestione SHA con fallback automatico
import redis
import hashlib

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class LuaScript:
    """Wrapper per script Lua con caching SHA automatico."""

    def __init__(self, script: str):
        self.script = script
        self.sha = None

    def _load(self) -> str:
        """Carica lo script e memorizza lo SHA."""
        self.sha = r.script_load(self.script)
        return self.sha

    def execute(self, keys: list, args: list):
        """Esegue lo script, con fallback a EVAL se SHA non in cache."""
        if self.sha is None:
            self._load()
        try:
            return r.evalsha(self.sha, len(keys), *keys, *args)
        except redis.exceptions.NoScriptError:
            # Script non piu' in cache (SCRIPT FLUSH chiamato)
            self._load()
            return r.evalsha(self.sha, len(keys), *keys, *args)

# Script rate limiter atomico: sliding window
RATE_LIMIT_SCRIPT = LuaScript("""
local key = KEYS[1]
local window = tonumber(ARGV[1])   -- finestra in secondi
local max_requests = tonumber(ARGV[2])
local now = tonumber(ARGV[3])      -- timestamp corrente in ms

-- Rimuovi entries scadute
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

-- Conta requests nella finestra
local count = redis.call('ZCARD', key)

if count < max_requests then
  -- Aggiungi la request corrente
  redis.call('ZADD', key, now, now)
  redis.call('EXPIRE', key, window)
  return 1  -- Allowed
else
  return 0  -- Rate limited
end
""")

def check_rate_limit(user_id: str, window_sec: int = 60, max_req: int = 100) -> bool:
    """Rate limiting sliding window atomico via Lua. Ritorna True se allowed."""
    import time
    now_ms = int(time.time() * 1000)
    key = f"ratelimit:{user_id}"
    result = RATE_LIMIT_SCRIPT.execute(
        keys=[key],
        args=[str(window_sec), str(max_req), str(now_ms)],
    )
    return bool(result)

# Test
for i in range(5):
    allowed = check_rate_limit("user:1001", window_sec=60, max_req=3)
    print(f"Request {i+1}: {'ALLOWED' if allowed else 'RATE LIMITED'}")

パターン: Lua を使用した分散ロック

分散ロックは実装できる最も重要なパターンの 1 つです ルアと一緒に。 Redlock アルゴリズムは、アトミック SETNX + EXPIRE を使用してロックを取得します。 削除前にトークンをチェックする発行用の Lua スクリプト。

# Distributed Lock: SETNX + EXPIRE atomici
# SET key value NX PX milliseconds (Redis 2.6.12+)
SET lock:resource:order-1234 "worker-uuid-abc" NX PX 30000
# OK se il lock e' stato acquisito, nil altrimenti

# Script Lua per rilascio ATOMICO (verifica token prima di cancellare)
EVAL "
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
  else
    return 0
  end
" 1 lock:resource:order-1234 "worker-uuid-abc"
# Python: Distributed Lock context manager
import redis
import uuid
import time
from contextlib import contextmanager

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

RELEASE_LOCK_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('DEL', KEYS[1])
else
  return 0
end
"""

@contextmanager
def distributed_lock(resource: str, timeout_ms: int = 30000, retry_ms: int = 100):
    """
    Context manager per distributed lock.
    Usa SET NX PX per acquisizione atomica.
    Usa Lua per rilascio atomico (evita rilascio di lock altrui).
    """
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = False

    try:
        # Tentativo di acquisizione lock
        acquired = r.set(lock_key, token, nx=True, px=timeout_ms)
        if not acquired:
            raise Exception(f"Could not acquire lock on {resource}")

        yield token  # Esegui codice protetto

    finally:
        if acquired:
            # Rilascio atomico: cancella solo se il token e' ancora il nostro
            released = r.eval(RELEASE_LOCK_SCRIPT, 1, lock_key, token)
            if not released:
                # Il lock e' scaduto e qualcun altro lo ha acquisito nel frattempo
                print(f"WARNING: Lock {lock_key} expired before release")

# Uso
def process_order(order_id: str):
    with distributed_lock(f"order:{order_id}", timeout_ms=10000):
        # Questo codice e' eseguito in modo esclusivo
        print(f"Processing order {order_id}...")
        time.sleep(0.5)  # Simula lavoro
        print(f"Order {order_id} processed")

# Due worker concorrenti
import threading
t1 = threading.Thread(target=process_order, args=("ORD-123",))
t2 = threading.Thread(target=process_order, args=("ORD-123",))
t1.start()
t2.start()
t1.join()
t2.join()

Redis 機能: 永続的な Lua ライブラリ (Redis 7 以降)

Redis 7.0 では、次のような Lua ライブラリをロードするためのシステムである Functions が導入されています。 サーバー上の永続的なコード。 EVAL/EVALSHA とは異なり、関数 これらは Redis の再起動後も存続し、バージョン管理をサポートし、 関連するスクリプトを名前空間ライブラリに整理します。

# Redis Functions: caricare una libreria

# Definisci la libreria in Lua (sintassi Redis 7+)
FUNCTION LOAD "#!lua name=mylib\n
local function rate_limit(keys, args)
  local key = keys[1]
  local window = tonumber(args[1])
  local max_req = tonumber(args[2])
  local now = tonumber(args[3])

  redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
  local count = redis.call('ZCARD', key)

  if count < max_req then
    redis.call('ZADD', key, now, now)
    redis.call('EXPIRE', key, window)
    return 1
  end
  return 0
end

local function atomic_cas(keys, args)
  if redis.call('GET', keys[1]) == args[1] then
    redis.call('SET', keys[1], args[2])
    return 1
  end
  return 0
end

redis.register_function('rate_limit', rate_limit)
redis.register_function('atomic_cas', atomic_cas)
"

# Chiama la funzione con FCALL
FCALL rate_limit 1 ratelimit:user:1001 60 100 1710000000000

# Lista librerie caricate
FUNCTION LIST

# Elimina una libreria
FUNCTION DELETE mylib

# Dump e restore (per backup/migration)
FUNCTION DUMP   # Ritorna binary dump
FUNCTION RESTORE [FLUSH] [APPEND] [REPLACE] <payload>
# Python: Redis Functions con redis-py
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

LIBRARY_CODE = """#!lua name=ratelimit_lib

local function sliding_window_rl(keys, args)
  local key = keys[1]
  local window = tonumber(args[1])
  local max_req = tonumber(args[2])
  local now = tonumber(args[3])

  redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
  local count = redis.call('ZCARD', key)

  if count < max_req then
    redis.call('ZADD', key, now, tostring(now))
    redis.call('EXPIRE', key, window + 1)
    return {1, max_req - count - 1}  -- {allowed, remaining}
  end

  local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
  local retry_after = math.ceil((tonumber(oldest[2]) + window * 1000 - now) / 1000)
  return {0, 0, retry_after}  -- {blocked, remaining, retry_after_sec}
end

redis.register_function('sliding_rl', sliding_window_rl)
"""

def load_library():
    """Carica o ricarica la libreria Functions."""
    try:
        r.function_load(LIBRARY_CODE)
    except redis.exceptions.ResponseError as e:
        if 'Library already exists' in str(e):
            r.function_load(LIBRARY_CODE, replace=True)
        else:
            raise

def check_rate_limit_fn(user_id: str, window: int = 60, max_req: int = 10):
    """Usa Redis Function per rate limiting sliding window."""
    key = f"rl:user:{user_id}"
    now_ms = int(time.time() * 1000)
    result = r.fcall('sliding_rl', 1, key, str(window), str(max_req), str(now_ms))
    allowed, remaining = result[0], result[1]
    retry_after = result[2] if len(result) > 2 else None
    return {
        'allowed': bool(allowed),
        'remaining': remaining,
        'retry_after': retry_after,
    }

# Setup
load_library()

# Test
for i in range(12):
    result = check_rate_limit_fn("user:1001", window=60, max_req=10)
    status = "OK" if result['allowed'] else f"BLOCKED (retry in {result['retry_after']}s)"
    print(f"Request {i+1}: {status}, remaining: {result['remaining']}")

MULTI/EXEC vs Lua vs 関数: 比較

どのメカニズムを選択するか

  • マルチ/実行: 条件ロジックのない単純なコマンド シーケンス。同じトランザクション内の後続のコマンドへの入力として中間結果が使用されることはありません。
  • 評価: 複雑な条件付きロジック、アトミックな読み取り、変更、書き込み、ラピッド プロトタイピング。アプリケーションコード内のインラインスクリプト。
  • エヴァルシャ: EVAL と似ていますが、頻繁に呼び出されるスクリプトのネットワーク ペイロードが軽減されます。 SHA キャッシュ管理が必要です。
  • 機能 (Redis 7 以降): サーバー内のファーストクラスのスクリプト: 再起動後も存続し、バージョン管理可能で、ライブラリに編成可能。複数のアプリケーション間で安定した共有コードを実現します。

結論

Redis の Lua スクリプトは、複雑なアトミック性を実現するための決定的なツールです。 MULTI/EXEC は単純なシーケンスをカバーしますが、EVAL を使用するとパターンを実装できます。 レート制限、分散ロック、コンペアアンドスワップなどの高度な機能を備えています。 競合状態を心配する必要はありません。 Redis Functions はこれらのパターンをもたらします より高いレベルに移行し、Lua コードをバージョン管理されたインフラストラクチャとして扱います。 アプリケーションコード内のハードコードされた文字列の代わりに。

Redis シリーズの今後の記事

  • 第4条: 高度なレート制限 — Redis のトークン バケット、スライディング ウィンドウ
  • 第5条: セッション管理とキャッシュ パターン — キャッシュアサイド、ライトスルー
  • 第6条: RedisJSON と RediSearch — JSON ドキュメントと全文検索