OTP 및 GenServer: 통합 내결함성을 갖춘 분산 상태 관리
GenServer는 OTP의 기본 구성 요소입니다. 서버 구현 방법 Handle_call/handle_cast를 통한 상태 저장, 불변 상태 처리, 프로세스 등록 이름과 왜 "Let it crash"는 디자인 철학이지 변명이 아닙니다.
OTP: 개방형 통신 플랫폼
OTP(Open Telecom Platform)는 라이브러리, 디자인 패턴 및 Ericsson의 Erlang 역사에서 나온 아키텍처 원칙입니다. 웹 프레임워크가 아닙니다. 구축을 위한 프레임워크입니다. 내결함성 시스템. 빌딩 블록(GenServer, Supervisor, GenStage, 레지스트리)는 동시 프로세스를 구성하는 표준화된 방법을 제공합니다. 신뢰성을 보장합니다.
이전 장에서 우리는 상태 저장 서버를 구현하는 방법을 보았습니다. 꼬리 재귀 및 수동 수신. GenServer도 같은 생각입니다 규칙, 도구 및 통합을 통해 표준 OTP 동작으로 캡슐화됩니다. 감독 트리와 함께.
무엇을 배울 것인가
- GenServer 동작: 필수 및 선택적 콜백
- handler_call: 동기 요청(클라이언트가 응답을 기다림)
- handler_cast: 비동기 요청(fire-and-forget)
- handler_info: OTP가 아닌 메시지(타이머, 모니터 다운)
- 이름으로 등록: {:local, :name} 및 {:via, module, key}
- init/1 및 종료/2: 서버 수명 주기
- "Let it crash": 오류를 처리하지 말아야 할 경우
GenServer 분석
# GenServer: struttura base
defmodule MyApp.Counter do
use GenServer
# --- Client API (chiamato da altri processi) ---
@doc "Avvia il server Counter."
def start_link(initial_value \\ 0) do
# Il nome :my_counter permette di riferirsi al server senza PID
GenServer.start_link(__MODULE__, initial_value, name: :my_counter)
end
@doc "Incrementa il contatore di n (default 1). Sincrono."
def increment(n \\ 1) do
GenServer.call(:my_counter, {:increment, n})
end
@doc "Decrementa in modo asincrono (non aspetta conferma)."
def decrement_async(n \\ 1) do
GenServer.cast(:my_counter, {:decrement, n})
end
@doc "Legge il valore corrente."
def get_value do
GenServer.call(:my_counter, :get_value)
end
@doc "Reset asincrono."
def reset do
GenServer.cast(:my_counter, :reset)
end
# --- Server Callbacks (eseguiti nel processo GenServer) ---
@impl true
def init(initial_value) do
{:ok, initial_value}
# Oppure: {:ok, state, timeout_ms} -- handle_info(:timeout, ...) dopo ms
# Oppure: {:stop, reason} -- non avviare il server
end
# handle_call: sincrono, risponde con {:reply, reply, new_state}
@impl true
def handle_call({:increment, n}, _from, state) do
new_state = state + n
{:reply, new_state, new_state}
# {:reply, risposta_al_client, nuovo_stato}
end
def handle_call(:get_value, _from, state) do
{:reply, state, state}
# Lo stato non cambia, ma rispondiamo con il valore corrente
end
# handle_cast: asincrono, NON risponde
@impl true
def handle_cast({:decrement, n}, state) do
{:noreply, state - n}
# {:noreply, nuovo_stato}
end
def handle_cast(:reset, _state) do
{:noreply, 0}
end
# handle_info: messaggi non-OTP (es. :timer.send_after, Process.send_after)
@impl true
def handle_info(:log_state, state) do
IO.puts("[Counter] Current value: #{state}")
{:noreply, state}
end
# Catch-all per messaggi non gestiti (evita crash per messaggi inaspettati)
def handle_info(msg, state) do
IO.puts("Unexpected message: #{inspect(msg)}")
{:noreply, state}
end
# terminate: chiamato prima dello stop (cleanup)
@impl true
def terminate(reason, state) do
IO.puts("Counter stopping. Reason: #{inspect(reason)}, final value: #{state}")
:ok
end
end
# Utilizzo del Counter GenServer
{:ok, _pid} = MyApp.Counter.start_link(0)
MyApp.Counter.increment() # 1
MyApp.Counter.increment(5) # 6
MyApp.Counter.decrement_async(2) # Async: ritorna :ok immediatamente
:timer.sleep(10) # Aspetta che il cast venga processato
MyApp.Counter.get_value() # 4
# Test:
iex> MyApp.Counter.get_value()
4
# Invia un messaggio direttamente al processo
send(:my_counter, :log_state)
# [Counter] Current value: 4
구조화된 상태: 단순한 값 이상
실제 애플리케이션에서 GenServer의 상태는 일반적으로 구조체입니다. 또는 여러 필드가 있는 지도. 구조체를 사용하면 코드가 더 명확해집니다. 콜백의 상태에 대한 패턴 일치를 허용합니다.
# GenServer con stato strutturato
defmodule MyApp.RateLimiter do
use GenServer
defstruct [
:window_ms,
:max_requests,
requests: %{}, # user_id => list of timestamps
]
# --- Client API ---
def start_link(opts \\ []) do
window_ms = Keyword.get(opts, :window_ms, 60_000)
max_requests = Keyword.get(opts, :max_requests, 100)
GenServer.start_link(__MODULE__, {window_ms, max_requests}, name: __MODULE__)
end
@doc "Controlla se la richiesta e' permessa. Ritorna {:ok, remaining} | {:error, :rate_limited}"
def check(user_id) do
GenServer.call(__MODULE__, {:check, user_id})
end
def reset_user(user_id) do
GenServer.cast(__MODULE__, {:reset_user, user_id})
end
# --- Server Callbacks ---
@impl true
def init({window_ms, max_requests}) do
# Avvia cleanup periodico ogni 30 secondi
:timer.send_interval(30_000, :cleanup)
state = %__MODULE__{
window_ms: window_ms,
max_requests: max_requests,
}
{:ok, state}
end
@impl true
def handle_call({:check, user_id}, _from, state) do
now = System.monotonic_time(:millisecond)
window_start = now - state.window_ms
# Recupera timestamps delle richieste precedenti, filtra scadute
user_requests =
Map.get(state.requests, user_id, [])
|> Enum.filter(&(&1 > window_start))
if length(user_requests) >= state.max_requests do
# Rate limited: non aggiorno lo stato
remaining = 0
retry_after = hd(user_requests) + state.window_ms - now
{:reply, {:error, :rate_limited, retry_after}, state}
else
# Permesso: aggiungo il timestamp corrente
new_requests = [now | user_requests]
new_state = put_in(state.requests[user_id], new_requests)
remaining = state.max_requests - length(new_requests)
{:reply, {:ok, remaining}, new_state}
end
end
@impl true
def handle_cast({:reset_user, user_id}, state) do
new_state = update_in(state.requests, &Map.delete(&1, user_id))
{:noreply, new_state}
end
@impl true
def handle_info(:cleanup, state) do
now = System.monotonic_time(:millisecond)
window_start = now - state.window_ms
# Rimuovi utenti senza richieste recenti
cleaned_requests =
state.requests
|> Enum.reject(fn {_user, timestamps} ->
Enum.all?(timestamps, &(&1 <= window_start))
end)
|> Map.new()
{:noreply, %{state | requests: cleaned_requests}}
end
end
"Let It Crash": OTP 철학
"Let it crash"는 Erlang/Elixir의 가장 오해받는 원칙입니다. "오류 무시"를 의미하는 것이 아니라 비즈니스 코드를 의미합니다. 예상치 못한 경우에 대한 방어적인 오류 처리에 부담을 주어서는 안 됩니다. 대신 프로세스를 자동으로 다시 시작하는 감독자에게 복구가 위임됩니다. 그 충돌.
# Let it crash: codice senza defensive programming eccessivo
defmodule MyApp.OrderProcessor do
use GenServer
# Approccio SBAGLIATO: defensive programming eccessivo
def process_order_bad(order) do
try do
case validate_order(order) do
{:ok, valid_order} ->
case save_to_db(valid_order) do
{:ok, saved} ->
case send_confirmation(saved) do
{:ok, _} -> {:ok, saved}
{:error, e} -> handle_email_error(e)
end
{:error, e} -> handle_db_error(e)
end
{:error, e} -> handle_validation_error(e)
end
rescue
e -> handle_unexpected_error(e)
end
end
# Approccio CORRETTO: gestisci solo gli errori attesi, crash per il resto
def process_order(order) do
with {:ok, valid_order} <- validate_order(order),
{:ok, saved} <- save_to_db(valid_order),
{:ok, _} <- send_confirmation(saved) do
{:ok, saved}
else
{:error, :invalid_data} = error ->
Logger.warning("Invalid order data: #{inspect(order)}")
error # Errore previsto: gestito
{:error, :duplicate_order} = error ->
Logger.info("Duplicate order ignored: #{order.id}")
{:ok, :duplicate} # Caso atteso: ritorna ok
end
# Per tutto il resto (bug nel codice, db down, etc.):
# Il crash propaghera' al Supervisor che riavviera' il processo
# in uno stato pulito
end
# Errori che NON gestisci:
# - Bug nel codice (FunctionClauseError, MatchError)
# - Dipendenze irraggiungibili (db crash totale)
# - Situazioni che non dovrebbero mai accadere
# Il Supervisor le gestisce riavviando il processo
end
프로세스 등록: Via 및 Registry
# Registrazione con nome atom globale (semplice ma unico per nodo)
GenServer.start_link(MyServer, args, name: :global_name)
# {:via, module, key}: Registry distribuito (piu' flessibile)
defmodule MyApp.UserSession do
use GenServer
# Avvia con via Registry per supportare piu' istanze
def start_link(user_id) do
GenServer.start_link(
__MODULE__,
%{user_id: user_id},
name: via_tuple(user_id)
)
end
# Helper per costruire il via tuple
defp via_tuple(user_id) do
{:via, Registry, {MyApp.Registry, "user_session:#{user_id}"}}
end
# Client API usa via_tuple invece del PID
def get_session(user_id) do
GenServer.call(via_tuple(user_id), :get_session)
end
def update_session(user_id, updates) do
GenServer.cast(via_tuple(user_id), {:update, updates})
end
# Server callbacks
@impl true
def init(state), do: {:ok, state}
@impl true
def handle_call(:get_session, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast({:update, updates}, state) do
{:noreply, Map.merge(state, updates)}
end
end
# Setup in application.ex
# children = [
# {Registry, keys: :unique, name: MyApp.Registry},
# ...
# ]
# Uso
{:ok, _} = MyApp.UserSession.start_link("user-1001")
MyApp.UserSession.get_session("user-1001")
# %{user_id: "user-1001"}
MyApp.UserSession.update_session("user-1001", %{last_seen: DateTime.utc_now()})
:via 및 DynamicSupervisor를 사용하는 GenServer
# Application setup per piu' GenServer dello stesso tipo
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# Registry per lookup per chiave
{Registry, keys: :unique, name: MyApp.SessionRegistry},
# DynamicSupervisor: supervisore per figli creati dinamicamente
{DynamicSupervisor, name: MyApp.SessionSupervisor, strategy: :one_for_one},
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
# Avvia sessioni on-demand
defmodule MyApp.SessionManager do
def start_session(user_id) do
spec = {MyApp.UserSession, user_id}
DynamicSupervisor.start_child(MyApp.SessionSupervisor, spec)
end
def stop_session(user_id) do
case Registry.lookup(MyApp.SessionRegistry, "user_session:#{user_id}") do
[{pid, _}] -> DynamicSupervisor.terminate_child(MyApp.SessionSupervisor, pid)
[] -> {:error, :not_found}
end
end
def active_sessions do
DynamicSupervisor.which_children(MyApp.SessionSupervisor)
|> length()
end
end
GenServer 테스트
# Test di un GenServer con ExUnit
defmodule MyApp.CounterTest do
use ExUnit.Case, async: true
setup do
# Avvia una nuova istanza per ogni test (senza nome, usa PID)
{:ok, pid} = GenServer.start_link(MyApp.Counter, 0)
{:ok, counter: pid}
end
test "starts at 0", %{counter: pid} do
assert GenServer.call(pid, :get_value) == 0
end
test "increments correctly", %{counter: pid} do
GenServer.call(pid, {:increment, 5})
GenServer.call(pid, {:increment, 3})
assert GenServer.call(pid, :get_value) == 8
end
test "resets to zero", %{counter: pid} do
GenServer.call(pid, {:increment, 10})
GenServer.cast(pid, :reset)
# Piccola attesa per il cast asincrono
:timer.sleep(10)
assert GenServer.call(pid, :get_value) == 0
end
test "handles concurrent increments safely", %{counter: pid} do
# Spawna 100 processi che incrementano contemporaneamente
tasks = for _ <- 1..100 do
Task.async(fn ->
GenServer.call(pid, {:increment, 1})
end)
end
Enum.each(tasks, &Task.await/1)
assert GenServer.call(pid, :get_value) == 100
end
end
결론
GenServer는 Elixir 프로세스 이론이 만나는 곳입니다. 연습. OTP 동작은 구조, 규칙 및 통합을 제공합니다. 감독 트리와 함께. 동기화를 위한 핸들_콜, 동기화를 위한 핸들_캐스트 Fire-and-forget, 외부 메시지용 handler_info — 이 세 가지 콜백 사용 사례의 95%를 다루고 있습니다. "충돌하자"는 게으름이 아닙니다. 오염 없이 복원력 있는 시스템을 구축할 수 있는 철학 각 엣지 케이스에 대한 방어 프로그래밍이 포함된 비즈니스 코드. 다음 문서에서는 감독자를 통해 이를 다음 단계로 끌어올립니다.
Elixir 시리즈의 향후 기사
- 제4조: 감독자 트리 — one_for_one, one_for_all 및 Rest_for_one 전략
- 제5조: Ecto — PostgreSQL용 구성 가능한 쿼리 및 스키마 매핑
- 제6조: Phoenix 프레임워크 — 라우터, 컨트롤러, 보기 및 JSON API







