OTP: Otevřená telekomunikační platforma

OTP (Open Telecom Platform) je sada knihoven, návrhových vzorů a architektonické principy, které pocházejí z Erlangovy historie ve společnosti Ericsson. Není to webový rámec – je to rámec pro vytváření systémy odolné proti poruchám. Jeho stavební bloky (GenServer, Supervisor, GenStage, Registr) vám poskytuje standardizovaný způsob, jak strukturovat souběžné procesy se zárukami spolehlivosti.

V předchozí kapitole jsme viděli, jak implementovat stavový server pomocí rekurze ocasu a ruční příjem. GenServer je stejný nápad zapouzdřené ve standardním chování OTP s konvencemi, nástroji a integrací se stromem dohledu.

Co se naučíte

  • Chování GenServeru: povinná a volitelná zpětná volání
  • handle_call: synchronní požadavky (klient čeká na odpověď)
  • handle_cast: asynchronní požadavky (fire-and-forget)
  • handle_info: zprávy bez OTP (časovač, výpadky monitoru)
  • Registrace s názvem: {:local, :name} a {:via, module, key}
  • init/1 a ukončit/2: životní cyklus serveru
  • „Nechte to havarovat“: kdy NEZpracovat chyby

Anatomie GenServeru

# GenServer: struttura base
defmodule MyApp.Counter do
  use GenServer

  # --- Client API (chiamato da altri processi) ---

  @doc "Avvia il server Counter."
  def start_link(initial_value \\ 0) do
    # Il nome :my_counter permette di riferirsi al server senza PID
    GenServer.start_link(__MODULE__, initial_value, name: :my_counter)
  end

  @doc "Incrementa il contatore di n (default 1). Sincrono."
  def increment(n \\ 1) do
    GenServer.call(:my_counter, {:increment, n})
  end

  @doc "Decrementa in modo asincrono (non aspetta conferma)."
  def decrement_async(n \\ 1) do
    GenServer.cast(:my_counter, {:decrement, n})
  end

  @doc "Legge il valore corrente."
  def get_value do
    GenServer.call(:my_counter, :get_value)
  end

  @doc "Reset asincrono."
  def reset do
    GenServer.cast(:my_counter, :reset)
  end

  # --- Server Callbacks (eseguiti nel processo GenServer) ---

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
    # Oppure: {:ok, state, timeout_ms}  -- handle_info(:timeout, ...) dopo ms
    # Oppure: {:stop, reason}            -- non avviare il server
  end

  # handle_call: sincrono, risponde con {:reply, reply, new_state}
  @impl true
  def handle_call({:increment, n}, _from, state) do
    new_state = state + n
    {:reply, new_state, new_state}
    # {:reply, risposta_al_client, nuovo_stato}
  end

  def handle_call(:get_value, _from, state) do
    {:reply, state, state}
    # Lo stato non cambia, ma rispondiamo con il valore corrente
  end

  # handle_cast: asincrono, NON risponde
  @impl true
  def handle_cast({:decrement, n}, state) do
    {:noreply, state - n}
    # {:noreply, nuovo_stato}
  end

  def handle_cast(:reset, _state) do
    {:noreply, 0}
  end

  # handle_info: messaggi non-OTP (es. :timer.send_after, Process.send_after)
  @impl true
  def handle_info(:log_state, state) do
    IO.puts("[Counter] Current value: #{state}")
    {:noreply, state}
  end

  # Catch-all per messaggi non gestiti (evita crash per messaggi inaspettati)
  def handle_info(msg, state) do
    IO.puts("Unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # terminate: chiamato prima dello stop (cleanup)
  @impl true
  def terminate(reason, state) do
    IO.puts("Counter stopping. Reason: #{inspect(reason)}, final value: #{state}")
    :ok
  end
end
# Utilizzo del Counter GenServer
{:ok, _pid} = MyApp.Counter.start_link(0)

MyApp.Counter.increment()        # 1
MyApp.Counter.increment(5)       # 6
MyApp.Counter.decrement_async(2) # Async: ritorna :ok immediatamente
:timer.sleep(10)                  # Aspetta che il cast venga processato
MyApp.Counter.get_value()        # 4

# Test:
iex> MyApp.Counter.get_value()
4

# Invia un messaggio direttamente al processo
send(:my_counter, :log_state)
# [Counter] Current value: 4

Strukturovaný stav: nad rámec jednoduchých hodnot

Ve skutečných aplikacích je stavem GenServeru typicky struktura nebo mapa s více poli. Použití struktury dělá kód explicitnějším a umožňuje porovnávání vzorů ve stavu ve zpětných voláních.

# GenServer con stato strutturato
defmodule MyApp.RateLimiter do
  use GenServer

  defstruct [
    :window_ms,
    :max_requests,
    requests: %{},  # user_id => list of timestamps
  ]

  # --- Client API ---

  def start_link(opts \\ []) do
    window_ms = Keyword.get(opts, :window_ms, 60_000)
    max_requests = Keyword.get(opts, :max_requests, 100)
    GenServer.start_link(__MODULE__, {window_ms, max_requests}, name: __MODULE__)
  end

  @doc "Controlla se la richiesta e' permessa. Ritorna {:ok, remaining} | {:error, :rate_limited}"
  def check(user_id) do
    GenServer.call(__MODULE__, {:check, user_id})
  end

  def reset_user(user_id) do
    GenServer.cast(__MODULE__, {:reset_user, user_id})
  end

  # --- Server Callbacks ---

  @impl true
  def init({window_ms, max_requests}) do
    # Avvia cleanup periodico ogni 30 secondi
    :timer.send_interval(30_000, :cleanup)

    state = %__MODULE__{
      window_ms: window_ms,
      max_requests: max_requests,
    }
    {:ok, state}
  end

  @impl true
  def handle_call({:check, user_id}, _from, state) do
    now = System.monotonic_time(:millisecond)
    window_start = now - state.window_ms

    # Recupera timestamps delle richieste precedenti, filtra scadute
    user_requests =
      Map.get(state.requests, user_id, [])
      |> Enum.filter(&(&1 > window_start))

    if length(user_requests) >= state.max_requests do
      # Rate limited: non aggiorno lo stato
      remaining = 0
      retry_after = hd(user_requests) + state.window_ms - now
      {:reply, {:error, :rate_limited, retry_after}, state}
    else
      # Permesso: aggiungo il timestamp corrente
      new_requests = [now | user_requests]
      new_state = put_in(state.requests[user_id], new_requests)
      remaining = state.max_requests - length(new_requests)
      {:reply, {:ok, remaining}, new_state}
    end
  end

  @impl true
  def handle_cast({:reset_user, user_id}, state) do
    new_state = update_in(state.requests, &Map.delete(&1, user_id))
    {:noreply, new_state}
  end

  @impl true
  def handle_info(:cleanup, state) do
    now = System.monotonic_time(:millisecond)
    window_start = now - state.window_ms

    # Rimuovi utenti senza richieste recenti
    cleaned_requests =
      state.requests
      |> Enum.reject(fn {_user, timestamps} ->
        Enum.all?(timestamps, &(&1 <= window_start))
      end)
      |> Map.new()

    {:noreply, %{state | requests: cleaned_requests}}
  end
end

„Let It Crash“: Filozofie OTP

„Nech to havarovat“ je nejvíce nepochopený princip Erlang/Elixir. Neznamená to „ignorovat chyby“ – znamená to obchodní kód nesmí být zatíženo defenzivním řešením chyb pro jakýkoli nepředvídaný případ. Místo toho je obnovení delegováno na supervizora, který automaticky restartuje procesy ta havárie.

# Let it crash: codice senza defensive programming eccessivo
defmodule MyApp.OrderProcessor do
  use GenServer

  # Approccio SBAGLIATO: defensive programming eccessivo
  def process_order_bad(order) do
    try do
      case validate_order(order) do
        {:ok, valid_order} ->
          case save_to_db(valid_order) do
            {:ok, saved} ->
              case send_confirmation(saved) do
                {:ok, _} -> {:ok, saved}
                {:error, e} -> handle_email_error(e)
              end
            {:error, e} -> handle_db_error(e)
          end
        {:error, e} -> handle_validation_error(e)
      end
    rescue
      e -> handle_unexpected_error(e)
    end
  end

  # Approccio CORRETTO: gestisci solo gli errori attesi, crash per il resto
  def process_order(order) do
    with {:ok, valid_order} <- validate_order(order),
         {:ok, saved} <- save_to_db(valid_order),
         {:ok, _} <- send_confirmation(saved) do
      {:ok, saved}
    else
      {:error, :invalid_data} = error ->
        Logger.warning("Invalid order data: #{inspect(order)}")
        error  # Errore previsto: gestito

      {:error, :duplicate_order} = error ->
        Logger.info("Duplicate order ignored: #{order.id}")
        {:ok, :duplicate}  # Caso atteso: ritorna ok
    end
    # Per tutto il resto (bug nel codice, db down, etc.):
    # Il crash propaghera' al Supervisor che riavviera' il processo
    # in uno stato pulito
  end

  # Errori che NON gestisci:
  # - Bug nel codice (FunctionClauseError, MatchError)
  # - Dipendenze irraggiungibili (db crash totale)
  # - Situazioni che non dovrebbero mai accadere
  # Il Supervisor le gestisce riavviando il processo
end

Registrace procesu: Přes a Registr

# Registrazione con nome atom globale (semplice ma unico per nodo)
GenServer.start_link(MyServer, args, name: :global_name)

# {:via, module, key}: Registry distribuito (piu' flessibile)
defmodule MyApp.UserSession do
  use GenServer

  # Avvia con via Registry per supportare piu' istanze
  def start_link(user_id) do
    GenServer.start_link(
      __MODULE__,
      %{user_id: user_id},
      name: via_tuple(user_id)
    )
  end

  # Helper per costruire il via tuple
  defp via_tuple(user_id) do
    {:via, Registry, {MyApp.Registry, "user_session:#{user_id}"}}
  end

  # Client API usa via_tuple invece del PID
  def get_session(user_id) do
    GenServer.call(via_tuple(user_id), :get_session)
  end

  def update_session(user_id, updates) do
    GenServer.cast(via_tuple(user_id), {:update, updates})
  end

  # Server callbacks
  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get_session, _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_cast({:update, updates}, state) do
    {:noreply, Map.merge(state, updates)}
  end
end

# Setup in application.ex
# children = [
#   {Registry, keys: :unique, name: MyApp.Registry},
#   ...
# ]

# Uso
{:ok, _} = MyApp.UserSession.start_link("user-1001")
MyApp.UserSession.get_session("user-1001")
# %{user_id: "user-1001"}

MyApp.UserSession.update_session("user-1001", %{last_seen: DateTime.utc_now()})

GenServer s :via a DynamicSupervisor

# Application setup per piu' GenServer dello stesso tipo
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Registry per lookup per chiave
      {Registry, keys: :unique, name: MyApp.SessionRegistry},

      # DynamicSupervisor: supervisore per figli creati dinamicamente
      {DynamicSupervisor, name: MyApp.SessionSupervisor, strategy: :one_for_one},
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

# Avvia sessioni on-demand
defmodule MyApp.SessionManager do
  def start_session(user_id) do
    spec = {MyApp.UserSession, user_id}
    DynamicSupervisor.start_child(MyApp.SessionSupervisor, spec)
  end

  def stop_session(user_id) do
    case Registry.lookup(MyApp.SessionRegistry, "user_session:#{user_id}") do
      [{pid, _}] -> DynamicSupervisor.terminate_child(MyApp.SessionSupervisor, pid)
      [] -> {:error, :not_found}
    end
  end

  def active_sessions do
    DynamicSupervisor.which_children(MyApp.SessionSupervisor)
    |> length()
  end
end

Testování GenServeru

# Test di un GenServer con ExUnit
defmodule MyApp.CounterTest do
  use ExUnit.Case, async: true

  setup do
    # Avvia una nuova istanza per ogni test (senza nome, usa PID)
    {:ok, pid} = GenServer.start_link(MyApp.Counter, 0)
    {:ok, counter: pid}
  end

  test "starts at 0", %{counter: pid} do
    assert GenServer.call(pid, :get_value) == 0
  end

  test "increments correctly", %{counter: pid} do
    GenServer.call(pid, {:increment, 5})
    GenServer.call(pid, {:increment, 3})
    assert GenServer.call(pid, :get_value) == 8
  end

  test "resets to zero", %{counter: pid} do
    GenServer.call(pid, {:increment, 10})
    GenServer.cast(pid, :reset)
    # Piccola attesa per il cast asincrono
    :timer.sleep(10)
    assert GenServer.call(pid, :get_value) == 0
  end

  test "handles concurrent increments safely", %{counter: pid} do
    # Spawna 100 processi che incrementano contemporaneamente
    tasks = for _ <- 1..100 do
      Task.async(fn ->
        GenServer.call(pid, {:increment, 1})
      end)
    end
    Enum.each(tasks, &Task.await/1)

    assert GenServer.call(pid, :get_value) == 100
  end
end

Závěry

GenServer je místo, kde se teorie procesů Elixir setkává praxe. Chování OTP poskytuje strukturu, konvence a integraci se stromem dohledu. handle_call pro synchronicity, handle_cast pro fire-and-forget, handle_info pro externí zprávy — tato tři zpětná volání pokrývají 95 % případů použití. „Nech to havarovat“ není lenost: je filozofie, která nám umožňuje budovat odolné systémy bez znečišťování obchodní kód s defenzivním programováním pro každý okrajový případ. Následující článek to posouvá na další úroveň se supervizory.

Připravované články ze série Elixír

  • Článek 4: Stromy supervizorů – strategie one_for_one, one_for_all a rest_for_one
  • Článek 5: Ecto — Composable Query and Schema Mapping for PostgreSQL
  • Článek 6: Phoenix Framework — Router, Controller, View a JSON API