BEAM および Elixir プロセス: 共有スレッドを使用しない大規模な同時実行性
BEAM は、何百万もの独立した軽量プロセスを実行できます。各プロセスには、 独自のヒープであり、メッセージ経由でのみ通信し、状態を共有しません。理由を調べてください このパターンは、設計とそれが実際の Elixir コードにどのように変換されるかによって競合状態を排除します。
BEAMコンペティションモデル
従来の言語の同時実行性はオペレーティング システムのスレッドに依存します 記憶を共有する人たち。このモデルは必然的に競合状態を引き起こします。 デッドロックや再現が難しいバグ。言語はミューテックス、セマフォ、 ロック — 共有アクセスを調整するためのメカニズム。
BEAM は、これまでとはまったく異なるアプローチ、つまりアクター モデルを選択しました。プロセス BEAM の多くは通信する完全に孤立したエンティティです 独占的に 非同期メッセージ経由。共有メモリはありません。ロックはありません。 競合状態はプログラマの規律によってではなく、設計によって発生することはありません。
何を学ぶか
- BEAMプロセスとOSスレッドの違い
- spawn と spawn_link: 分離されたプロセスを作成します
- 送受信: プロセス間で受け渡されるメッセージ
- メールボックス: 各プロセスのメッセージキュー
- Process.monitor によるプロセス監視
- Self() と pid: プロセスの ID
- タスク: 同時生成のための高レベルの抽象化
BEAM プロセスとスレッド OS
BEAM プロセスはオペレーティング システムのスレッドではありません。スケジューラーです VM に Elixir レベルが実装されています。各 Erlang/Elixir ノードは通常 1 つのスレッドを使用します CPU コアごとの OS ですが、数十万 (または数百万) のプロセスをスケジュールできます それらのスレッドにビームします。
# Dimensioni a confronto (approssimative)
# Thread OS (Linux):
# - Stack: 8MB default
# - Context switch: ~1-10 microsecond (syscall kernel)
# - Max per processo: centinaia
# Processo BEAM:
# - Stack iniziale: ~300 parole (pochi KB, cresce dinamicamente)
# - Context switch: sub-microsecondo (nello spazio utente)
# - Max per nodo: milioni (default: 262.144, configurabile)
# Verificare i limiti:
iex> :erlang.system_info(:process_limit)
# 262144 (default, configurabile con +P flag)
iex> :erlang.system_info(:process_count)
# Numero di processi attualmente in esecuzione
# BEAM usa preemptive scheduling basato su "reduction counts"
# Ogni processo ha un budget di "reduction" (operazioni base)
# Quando esaurito, lo scheduler puo' passare a un altro processo
# Questo garantisce fairness anche con processi CPU-intensive
spawn: プロセスの作成
# spawn: crea un processo che esegue una funzione anonima
pid = spawn(fn ->
IO.puts("Sono un processo! PID: #{inspect(self())}")
:timer.sleep(1000)
IO.puts("Fine processo")
end)
IO.puts("Processo padre, PID figlio: #{inspect(pid)}")
# Il padre continua senza aspettare il figlio
# Output (ordine non garantito):
# Processo padre, PID figlio: #PID<0.108.0>
# Sono un processo! PID: #PID<0.108.0>
# Fine processo
# spawn con modulo e funzione
defmodule Worker do
def run(task) do
IO.puts("Worker #{inspect(self())} processing: #{task}")
:timer.sleep(100)
{:done, task}
end
end
pid = spawn(Worker, :run, ["task-1"])
# self(): PID del processo corrente
IO.puts("Main process: #{inspect(self())}")
# Verifica se un processo e' ancora in vita
Process.alive?(pid) # true o false
送受信: メッセージパッシング
プロセスは、メールボックスに送信されるメッセージを通じて伝達されます。メールボックスは
プロセスごとに 1 つの FIFO キュー。 receive 一致する最初のメッセージを抽出します
パターンの 1 つ — 一致するメッセージがない場合、プロセスは待機を一時停止します。
# Comunicazione padre-figlio tramite messaggi
defmodule PingPong do
def start do
parent = self()
# Crea processo figlio passando il PID del padre
child = spawn(fn -> child_loop(parent) end)
# Invia messaggio al figlio
send(child, {:ping, "hello"})
# Ricevi risposta
receive do
{:pong, message} ->
IO.puts("Parent received: #{message}")
{:error, reason} ->
IO.puts("Error: #{reason}")
after 5000 ->
IO.puts("Timeout: no response in 5 seconds")
end
end
defp child_loop(parent_pid) do
receive do
{:ping, message} ->
IO.puts("Child received: #{message}")
send(parent_pid, {:pong, "world"})
child_loop(parent_pid) # Ricomincia il ciclo
:stop ->
IO.puts("Child stopping")
# Il processo termina quando la funzione ritorna
end
end
end
PingPong.start()
# Child received: hello
# Parent received: world
# Pattern: processo server con stato via ricorsione
defmodule Counter do
def start(initial_value \\ 0) do
spawn(__MODULE__, :loop, [initial_value])
end
def increment(pid), do: send(pid, {:increment, 1})
def add(pid, n), do: send(pid, {:increment, n})
def get(pid) do
send(pid, {:get, self()})
receive do
{:value, n} -> n
after 5000 -> {:error, :timeout}
end
end
def stop(pid), do: send(pid, :stop)
# Loop interno: mantiene lo stato tramite ricorsione di coda
def loop(count) do
receive do
{:increment, n} ->
loop(count + n) # Ricorsione: nuovo stato
{:get, caller} ->
send(caller, {:value, count})
loop(count) # Stato invariato
:stop ->
IO.puts("Counter stopping at #{count}")
# Non ricorre: il processo termina
end
end
end
# Uso
counter = Counter.start(0)
Counter.increment(counter)
Counter.increment(counter)
Counter.add(counter, 10)
IO.puts(Counter.get(counter)) # 12
Counter.stop(counter)
spawn_link とクラッシュの伝播
spawn_link 子プロセスを作成し、 リンク
双方向: 2 つのプロセスのうちの 1 つがクラッシュすると、終了シグナルが伝播されます。
もう一方へ。これは OTP の「クラッシュさせましょう」原則の基礎です。
# spawn vs spawn_link
defmodule CrashDemo do
def demo_spawn do
# spawn: il crash del figlio NON impatta il padre
child = spawn(fn ->
:timer.sleep(100)
raise "Crash nel figlio!"
end)
IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
:timer.sleep(500)
IO.puts("Padre ancora vivo nonostante crash figlio")
# Process.alive?(child) == false (il figlio e' morto)
end
def demo_spawn_link do
parent = self()
# spawn_link: il crash del figlio PROPAGA al padre
# Il padre riceve un exit signal {:EXIT, child_pid, reason}
child = spawn_link(fn ->
:timer.sleep(100)
raise "Crash nel figlio con link!"
end)
IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
:timer.sleep(500)
# Se il padre non e' un Supervisor, anche lui crashera'
# Questo e' intenzionale: il fallimento si propaga verso l'alto
# finche' un Supervisor non lo gestisce
end
def demo_trap_exit do
# trap_exit: intercetta gli exit signal invece di crashare
Process.flag(:trap_exit, true)
child = spawn_link(fn ->
:timer.sleep(100)
raise "Crash!"
end)
receive do
{:EXIT, ^child, reason} ->
IO.puts("Figlio crashato: #{inspect(reason)}")
# Il padre puo' decidere come gestire il crash
end
end
end
Process.monitor: 一方向監視
# Process.monitor: ricevi notifica quando un processo muore
# (senza il link bidirezionale di spawn_link)
defmodule ProcessMonitor do
def watch(target_pid) do
ref = Process.monitor(target_pid)
IO.puts("Monitoring #{inspect(target_pid)}, ref: #{inspect(ref)}")
receive do
{:DOWN, ^ref, :process, ^target_pid, reason} ->
IO.puts("Process #{inspect(target_pid)} down: #{inspect(reason)}")
after 10_000 ->
IO.puts("Timeout: process still running after 10s")
Process.demonitor(ref)
end
end
end
# Crea un processo che si ferma dopo 500ms
worker = spawn(fn ->
:timer.sleep(500)
IO.puts("Worker done")
end)
# Monitora in background
spawn(fn -> ProcessMonitor.watch(worker) end)
# Attendi
:timer.sleep(1000)
# Output:
# Worker done
# Process #PID<...> down: :normal
タスク: 高レベルの抽象化
フォーム Task より便利な抽象化を提供します spawn
一般的なパターンの場合: 非同期操作を実行し、結果を待ちます。
# Task.async e Task.await: parallelismo strutturato
defmodule ParallelFetcher do
def fetch_all(urls) do
urls
|> Enum.map(fn url ->
# Lancia un task per ogni URL in parallelo
Task.async(fn -> fetch(url) end)
end)
|> Enum.map(fn task ->
# Attende il risultato di ciascun task (max 5 secondi)
Task.await(task, 5_000)
end)
end
defp fetch(url) do
# Simula una chiamata HTTP
:timer.sleep(Enum.random(100..500))
{:ok, "Response from #{url}"}
end
end
# 3 richieste in parallelo: tempo totale ~max(latenze)
urls = ["https://api1.example.com", "https://api2.example.com", "https://api3.example.com"]
results = ParallelFetcher.fetch_all(urls)
# [{:ok, "Response from ..."}, ...]
# Task.async_stream: parallelo con concorrenza limitata
def process_items(items, max_concurrency \\ 10) do
items
|> Task.async_stream(
fn item -> expensive_operation(item) end,
max_concurrency: max_concurrency,
timeout: 10_000,
on_timeout: :kill_task,
)
|> Enum.map(fn
{:ok, result} -> result
{:exit, reason} ->
IO.puts("Task failed: #{inspect(reason)}")
nil
end)
|> Enum.reject(&is_nil/1)
end
# Task.start: fire-and-forget (non attendi il risultato)
Task.start(fn ->
send_notification_email(user) # Background job, non bloccante
end)
プロセス レジストリ: プロセスの名前付け
# Process Registry: registra un processo con un nome atom
defmodule NamedCounter do
def start do
pid = spawn(__MODULE__, :loop, [0])
# Registra con nome globale (atom)
Process.register(pid, :main_counter)
pid
end
def increment do
# Invia a nome simbolico invece di PID
send(:main_counter, {:increment, 1})
end
def get do
send(:main_counter, {:get, self()})
receive do
{:value, n} -> n
after 1_000 -> nil
end
end
def loop(n) do
receive do
{:increment, delta} -> loop(n + delta)
{:get, caller} ->
send(caller, {:value, n})
loop(n)
end
end
end
NamedCounter.start()
NamedCounter.increment()
NamedCounter.increment()
IO.puts(NamedCounter.get()) # 2
# Trova il PID da un nome registrato
pid = Process.whereis(:main_counter)
IO.puts(inspect(pid)) # #PID<0.120.0>
結論
BEAM の俳優モデルが Elixir をユニークなものにしています。 各プロセスは、独自のメモリとメールボックスを備えた分離されたユニバースです。 数百万のプロセスを生成するのは低コストです。メッセージによるコミュニケーション 明示的な同期の必要がなくなります。クラッシュの伝播 via link は、OTP がフォールト トレランスを構築するメカニズムです。 次の記事では、基本的な構成要素である GenServer について説明します。 これらすべてをより高い抽象レベルに引き上げます。
Elixir シリーズの今後の記事
- 第3条: OTP と GenServer — フォールト トレランスが組み込まれた分散状態
- 第4条: スーパーバイザー ツリー — 決して死なないシステムの設計
- 第5条: Ecto — PostgreSQL のコンポーズ可能なクエリとスキーマ マッピング







