BEAMコンペティションモデル

従来の言語の同時実行性はオペレーティング システムのスレッドに依存します 記憶を共有する人たち。このモデルは必然的に競合状態を引き起こします。 デッドロックや再現が難しいバグ。言語はミューテックス、セマフォ、 ロック — 共有アクセスを調整するためのメカニズム。

BEAM は、これまでとはまったく異なるアプローチ、つまりアクター モデルを選択しました。プロセス BEAM の多くは通信する完全に孤立したエンティティです 独占的に 非同期メッセージ経由。共有メモリはありません。ロックはありません。 競合状態はプログラマの規律によってではなく、設計によって発生することはありません。

何を学ぶか

  • BEAMプロセスとOSスレッドの違い
  • spawn と spawn_link: 分離されたプロセスを作成します
  • 送受信: プロセス間で受け渡されるメッセージ
  • メールボックス: 各プロセスのメッセージキュー
  • Process.monitor によるプロセス監視
  • Self() と pid: プロセスの ID
  • タスク: 同時生成のための高レベルの抽象化

BEAM プロセスとスレッド OS

BEAM プロセスはオペレーティング システムのスレッドではありません。スケジューラーです VM に Elixir レベルが実装されています。各 Erlang/Elixir ノードは通常 1 つのスレッドを使用します CPU コアごとの OS ですが、数十万 (または数百万) のプロセスをスケジュールできます それらのスレッドにビームします。

# Dimensioni a confronto (approssimative)

# Thread OS (Linux):
# - Stack: 8MB default
# - Context switch: ~1-10 microsecond (syscall kernel)
# - Max per processo: centinaia

# Processo BEAM:
# - Stack iniziale: ~300 parole (pochi KB, cresce dinamicamente)
# - Context switch: sub-microsecondo (nello spazio utente)
# - Max per nodo: milioni (default: 262.144, configurabile)

# Verificare i limiti:
iex> :erlang.system_info(:process_limit)
# 262144 (default, configurabile con +P flag)

iex> :erlang.system_info(:process_count)
# Numero di processi attualmente in esecuzione

# BEAM usa preemptive scheduling basato su "reduction counts"
# Ogni processo ha un budget di "reduction" (operazioni base)
# Quando esaurito, lo scheduler puo' passare a un altro processo
# Questo garantisce fairness anche con processi CPU-intensive

spawn: プロセスの作成

# spawn: crea un processo che esegue una funzione anonima
pid = spawn(fn ->
  IO.puts("Sono un processo! PID: #{inspect(self())}")
  :timer.sleep(1000)
  IO.puts("Fine processo")
end)

IO.puts("Processo padre, PID figlio: #{inspect(pid)}")

# Il padre continua senza aspettare il figlio
# Output (ordine non garantito):
# Processo padre, PID figlio: #PID<0.108.0>
# Sono un processo! PID: #PID<0.108.0>
# Fine processo

# spawn con modulo e funzione
defmodule Worker do
  def run(task) do
    IO.puts("Worker #{inspect(self())} processing: #{task}")
    :timer.sleep(100)
    {:done, task}
  end
end

pid = spawn(Worker, :run, ["task-1"])

# self(): PID del processo corrente
IO.puts("Main process: #{inspect(self())}")

# Verifica se un processo e' ancora in vita
Process.alive?(pid)    # true o false

送受信: メッセージパッシング

プロセスは、メールボックスに送信されるメッセージを通じて伝達されます。メールボックスは プロセスごとに 1 つの FIFO キュー。 receive 一致する最初のメッセージを抽出します パターンの 1 つ — 一致するメッセージがない場合、プロセスは待機を一時停止します。

# Comunicazione padre-figlio tramite messaggi
defmodule PingPong do
  def start do
    parent = self()

    # Crea processo figlio passando il PID del padre
    child = spawn(fn -> child_loop(parent) end)

    # Invia messaggio al figlio
    send(child, {:ping, "hello"})

    # Ricevi risposta
    receive do
      {:pong, message} ->
        IO.puts("Parent received: #{message}")

      {:error, reason} ->
        IO.puts("Error: #{reason}")

    after 5000 ->
      IO.puts("Timeout: no response in 5 seconds")
    end
  end

  defp child_loop(parent_pid) do
    receive do
      {:ping, message} ->
        IO.puts("Child received: #{message}")
        send(parent_pid, {:pong, "world"})
        child_loop(parent_pid)  # Ricomincia il ciclo

      :stop ->
        IO.puts("Child stopping")
        # Il processo termina quando la funzione ritorna
    end
  end
end

PingPong.start()
# Child received: hello
# Parent received: world
# Pattern: processo server con stato via ricorsione
defmodule Counter do
  def start(initial_value \\ 0) do
    spawn(__MODULE__, :loop, [initial_value])
  end

  def increment(pid), do: send(pid, {:increment, 1})
  def add(pid, n), do: send(pid, {:increment, n})

  def get(pid) do
    send(pid, {:get, self()})
    receive do
      {:value, n} -> n
    after 5000 -> {:error, :timeout}
    end
  end

  def stop(pid), do: send(pid, :stop)

  # Loop interno: mantiene lo stato tramite ricorsione di coda
  def loop(count) do
    receive do
      {:increment, n} ->
        loop(count + n)     # Ricorsione: nuovo stato

      {:get, caller} ->
        send(caller, {:value, count})
        loop(count)         # Stato invariato

      :stop ->
        IO.puts("Counter stopping at #{count}")
        # Non ricorre: il processo termina
    end
  end
end

# Uso
counter = Counter.start(0)

Counter.increment(counter)
Counter.increment(counter)
Counter.add(counter, 10)

IO.puts(Counter.get(counter))   # 12

Counter.stop(counter)

spawn_link とクラッシュの伝播

spawn_link 子プロセスを作成し、 リンク 双方向: 2 つのプロセスのうちの 1 つがクラッシュすると、終了シグナルが伝播されます。 もう一方へ。これは OTP の「クラッシュさせましょう」原則の基礎です。

# spawn vs spawn_link
defmodule CrashDemo do
  def demo_spawn do
    # spawn: il crash del figlio NON impatta il padre
    child = spawn(fn ->
      :timer.sleep(100)
      raise "Crash nel figlio!"
    end)

    IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
    :timer.sleep(500)
    IO.puts("Padre ancora vivo nonostante crash figlio")
    # Process.alive?(child) == false (il figlio e' morto)
  end

  def demo_spawn_link do
    parent = self()

    # spawn_link: il crash del figlio PROPAGA al padre
    # Il padre riceve un exit signal {:EXIT, child_pid, reason}
    child = spawn_link(fn ->
      :timer.sleep(100)
      raise "Crash nel figlio con link!"
    end)

    IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
    :timer.sleep(500)
    # Se il padre non e' un Supervisor, anche lui crashera'
    # Questo e' intenzionale: il fallimento si propaga verso l'alto
    # finche' un Supervisor non lo gestisce
  end

  def demo_trap_exit do
    # trap_exit: intercetta gli exit signal invece di crashare
    Process.flag(:trap_exit, true)

    child = spawn_link(fn ->
      :timer.sleep(100)
      raise "Crash!"
    end)

    receive do
      {:EXIT, ^child, reason} ->
        IO.puts("Figlio crashato: #{inspect(reason)}")
        # Il padre puo' decidere come gestire il crash
    end
  end
end

Process.monitor: 一方向監視

# Process.monitor: ricevi notifica quando un processo muore
# (senza il link bidirezionale di spawn_link)
defmodule ProcessMonitor do
  def watch(target_pid) do
    ref = Process.monitor(target_pid)
    IO.puts("Monitoring #{inspect(target_pid)}, ref: #{inspect(ref)}")

    receive do
      {:DOWN, ^ref, :process, ^target_pid, reason} ->
        IO.puts("Process #{inspect(target_pid)} down: #{inspect(reason)}")

    after 10_000 ->
      IO.puts("Timeout: process still running after 10s")
      Process.demonitor(ref)
    end
  end
end

# Crea un processo che si ferma dopo 500ms
worker = spawn(fn ->
  :timer.sleep(500)
  IO.puts("Worker done")
end)

# Monitora in background
spawn(fn -> ProcessMonitor.watch(worker) end)

# Attendi
:timer.sleep(1000)
# Output:
# Worker done
# Process #PID<...> down: :normal

タスク: 高レベルの抽象化

フォーム Task より便利な抽象化を提供します spawn 一般的なパターンの場合: 非同期操作を実行し、結果を待ちます。

# Task.async e Task.await: parallelismo strutturato
defmodule ParallelFetcher do
  def fetch_all(urls) do
    urls
    |> Enum.map(fn url ->
      # Lancia un task per ogni URL in parallelo
      Task.async(fn -> fetch(url) end)
    end)
    |> Enum.map(fn task ->
      # Attende il risultato di ciascun task (max 5 secondi)
      Task.await(task, 5_000)
    end)
  end

  defp fetch(url) do
    # Simula una chiamata HTTP
    :timer.sleep(Enum.random(100..500))
    {:ok, "Response from #{url}"}
  end
end

# 3 richieste in parallelo: tempo totale ~max(latenze)
urls = ["https://api1.example.com", "https://api2.example.com", "https://api3.example.com"]
results = ParallelFetcher.fetch_all(urls)
# [{:ok, "Response from ..."}, ...]

# Task.async_stream: parallelo con concorrenza limitata
def process_items(items, max_concurrency \\ 10) do
  items
  |> Task.async_stream(
    fn item -> expensive_operation(item) end,
    max_concurrency: max_concurrency,
    timeout: 10_000,
    on_timeout: :kill_task,
  )
  |> Enum.map(fn
    {:ok, result} -> result
    {:exit, reason} ->
      IO.puts("Task failed: #{inspect(reason)}")
      nil
  end)
  |> Enum.reject(&is_nil/1)
end

# Task.start: fire-and-forget (non attendi il risultato)
Task.start(fn ->
  send_notification_email(user)  # Background job, non bloccante
end)

プロセス レジストリ: プロセスの名前付け

# Process Registry: registra un processo con un nome atom
defmodule NamedCounter do
  def start do
    pid = spawn(__MODULE__, :loop, [0])
    # Registra con nome globale (atom)
    Process.register(pid, :main_counter)
    pid
  end

  def increment do
    # Invia a nome simbolico invece di PID
    send(:main_counter, {:increment, 1})
  end

  def get do
    send(:main_counter, {:get, self()})
    receive do
      {:value, n} -> n
    after 1_000 -> nil
    end
  end

  def loop(n) do
    receive do
      {:increment, delta} -> loop(n + delta)
      {:get, caller} ->
        send(caller, {:value, n})
        loop(n)
    end
  end
end

NamedCounter.start()
NamedCounter.increment()
NamedCounter.increment()
IO.puts(NamedCounter.get())  # 2

# Trova il PID da un nome registrato
pid = Process.whereis(:main_counter)
IO.puts(inspect(pid))   # #PID<0.120.0>

結論

BEAM の俳優モデルが Elixir をユニークなものにしています。 各プロセスは、独自のメモリとメールボックスを備えた分離されたユニバースです。 数百万のプロセスを生成するのは低コストです。メッセージによるコミュニケーション 明示的な同期の必要がなくなります。クラッシュの伝播 via link は、OTP がフォールト トレランスを構築するメカニズムです。 次の記事では、基本的な構成要素である GenServer について説明します。 これらすべてをより高い抽象レベルに引き上げます。

Elixir シリーズの今後の記事

  • 第3条: OTP と GenServer — フォールト トレランスが組み込まれた分散状態
  • 第4条: スーパーバイザー ツリー — 決して死なないシステムの設計
  • 第5条: Ecto — PostgreSQL のコンポーズ可能なクエリとスキーマ マッピング