BEAM 경쟁 모델

기존 언어의 동시성은 운영 체제 스레드에 의존합니다. 추억을 공유하는 사람. 이 모델은 필연적으로 경쟁 조건을 초래합니다. 재현하기 어려운 교착 상태 및 버그. 언어는 뮤텍스, 세마포어, 잠금 — 공유 액세스를 조정하는 메커니즘입니다.

BEAM은 근본적으로 다른 접근 방식인 액터 모델을 선택했습니다. 프로세스 BEAM은 통신하는 완전히 고립된 개체입니다. 독점적으로 비동기 메시지를 통해. 공유 메모리가 없습니다. 자물쇠가 없습니다. 경쟁 조건은 프로그래머 규율이 아닌 의도적으로 가능하지 않습니다.

무엇을 배울 것인가

  • BEAM 프로세스와 OS 스레드의 차이점
  • 스폰 및 스폰_링크: 격리된 프로세스 생성
  • 보내기 및 받기: 프로세스 간 메시지 전달
  • 메일박스: 각 프로세스의 메시지 큐
  • Process.monitor를 통한 프로세스 모니터링
  • Self() 및 pid: 프로세스의 ID
  • 작업: 동시 생성을 위한 높은 수준의 추상화

BEAM 프로세스와 스레드 OS

BEAM 프로세스는 운영 체제 스레드가 아닙니다. 스케줄러입니다 VM에 Elixir 수준이 구현되었습니다. 각 Erlang/Elixir 노드는 일반적으로 하나의 스레드를 사용합니다. CPU 코어당 OS이지만 수십만(또는 수백만) 개의 프로세스를 예약할 수 있습니다. 해당 스레드의 BEAM.

# Dimensioni a confronto (approssimative)

# Thread OS (Linux):
# - Stack: 8MB default
# - Context switch: ~1-10 microsecond (syscall kernel)
# - Max per processo: centinaia

# Processo BEAM:
# - Stack iniziale: ~300 parole (pochi KB, cresce dinamicamente)
# - Context switch: sub-microsecondo (nello spazio utente)
# - Max per nodo: milioni (default: 262.144, configurabile)

# Verificare i limiti:
iex> :erlang.system_info(:process_limit)
# 262144 (default, configurabile con +P flag)

iex> :erlang.system_info(:process_count)
# Numero di processi attualmente in esecuzione

# BEAM usa preemptive scheduling basato su "reduction counts"
# Ogni processo ha un budget di "reduction" (operazioni base)
# Quando esaurito, lo scheduler puo' passare a un altro processo
# Questo garantisce fairness anche con processi CPU-intensive

생성: 프로세스 생성

# spawn: crea un processo che esegue una funzione anonima
pid = spawn(fn ->
  IO.puts("Sono un processo! PID: #{inspect(self())}")
  :timer.sleep(1000)
  IO.puts("Fine processo")
end)

IO.puts("Processo padre, PID figlio: #{inspect(pid)}")

# Il padre continua senza aspettare il figlio
# Output (ordine non garantito):
# Processo padre, PID figlio: #PID<0.108.0>
# Sono un processo! PID: #PID<0.108.0>
# Fine processo

# spawn con modulo e funzione
defmodule Worker do
  def run(task) do
    IO.puts("Worker #{inspect(self())} processing: #{task}")
    :timer.sleep(100)
    {:done, task}
  end
end

pid = spawn(Worker, :run, ["task-1"])

# self(): PID del processo corrente
IO.puts("Main process: #{inspect(self())}")

# Verifica se un processo e' ancora in vita
Process.alive?(pid)    # true o false

보내고 받기: 메시지 전달

프로세스는 사서함으로 전송된 메시지를 통해 전달됩니다. 우편함은 프로세스당 하나의 FIFO 대기열. receive 일치하는 첫 번째 메시지를 추출합니다. 패턴 중 하나 - 일치하는 메시지가 없으면 프로세스가 대기를 일시 중지합니다.

# Comunicazione padre-figlio tramite messaggi
defmodule PingPong do
  def start do
    parent = self()

    # Crea processo figlio passando il PID del padre
    child = spawn(fn -> child_loop(parent) end)

    # Invia messaggio al figlio
    send(child, {:ping, "hello"})

    # Ricevi risposta
    receive do
      {:pong, message} ->
        IO.puts("Parent received: #{message}")

      {:error, reason} ->
        IO.puts("Error: #{reason}")

    after 5000 ->
      IO.puts("Timeout: no response in 5 seconds")
    end
  end

  defp child_loop(parent_pid) do
    receive do
      {:ping, message} ->
        IO.puts("Child received: #{message}")
        send(parent_pid, {:pong, "world"})
        child_loop(parent_pid)  # Ricomincia il ciclo

      :stop ->
        IO.puts("Child stopping")
        # Il processo termina quando la funzione ritorna
    end
  end
end

PingPong.start()
# Child received: hello
# Parent received: world
# Pattern: processo server con stato via ricorsione
defmodule Counter do
  def start(initial_value \\ 0) do
    spawn(__MODULE__, :loop, [initial_value])
  end

  def increment(pid), do: send(pid, {:increment, 1})
  def add(pid, n), do: send(pid, {:increment, n})

  def get(pid) do
    send(pid, {:get, self()})
    receive do
      {:value, n} -> n
    after 5000 -> {:error, :timeout}
    end
  end

  def stop(pid), do: send(pid, :stop)

  # Loop interno: mantiene lo stato tramite ricorsione di coda
  def loop(count) do
    receive do
      {:increment, n} ->
        loop(count + n)     # Ricorsione: nuovo stato

      {:get, caller} ->
        send(caller, {:value, count})
        loop(count)         # Stato invariato

      :stop ->
        IO.puts("Counter stopping at #{count}")
        # Non ricorre: il processo termina
    end
  end
end

# Uso
counter = Counter.start(0)

Counter.increment(counter)
Counter.increment(counter)
Counter.add(counter, 10)

IO.puts(Counter.get(counter))   # 12

Counter.stop(counter)

spawn_link 및 충돌 전파

spawn_link 자식 프로세스를 생성하고 링크 양방향: 두 프로세스 중 하나가 충돌하면 종료 신호가 전파됩니다. 다른 사람에게. 이것이 OTP의 "Let it crash" 원칙의 기초입니다.

# spawn vs spawn_link
defmodule CrashDemo do
  def demo_spawn do
    # spawn: il crash del figlio NON impatta il padre
    child = spawn(fn ->
      :timer.sleep(100)
      raise "Crash nel figlio!"
    end)

    IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
    :timer.sleep(500)
    IO.puts("Padre ancora vivo nonostante crash figlio")
    # Process.alive?(child) == false (il figlio e' morto)
  end

  def demo_spawn_link do
    parent = self()

    # spawn_link: il crash del figlio PROPAGA al padre
    # Il padre riceve un exit signal {:EXIT, child_pid, reason}
    child = spawn_link(fn ->
      :timer.sleep(100)
      raise "Crash nel figlio con link!"
    end)

    IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
    :timer.sleep(500)
    # Se il padre non e' un Supervisor, anche lui crashera'
    # Questo e' intenzionale: il fallimento si propaga verso l'alto
    # finche' un Supervisor non lo gestisce
  end

  def demo_trap_exit do
    # trap_exit: intercetta gli exit signal invece di crashare
    Process.flag(:trap_exit, true)

    child = spawn_link(fn ->
      :timer.sleep(100)
      raise "Crash!"
    end)

    receive do
      {:EXIT, ^child, reason} ->
        IO.puts("Figlio crashato: #{inspect(reason)}")
        # Il padre puo' decidere come gestire il crash
    end
  end
end

Process.monitor: 단방향 모니터링

# Process.monitor: ricevi notifica quando un processo muore
# (senza il link bidirezionale di spawn_link)
defmodule ProcessMonitor do
  def watch(target_pid) do
    ref = Process.monitor(target_pid)
    IO.puts("Monitoring #{inspect(target_pid)}, ref: #{inspect(ref)}")

    receive do
      {:DOWN, ^ref, :process, ^target_pid, reason} ->
        IO.puts("Process #{inspect(target_pid)} down: #{inspect(reason)}")

    after 10_000 ->
      IO.puts("Timeout: process still running after 10s")
      Process.demonitor(ref)
    end
  end
end

# Crea un processo che si ferma dopo 500ms
worker = spawn(fn ->
  :timer.sleep(500)
  IO.puts("Worker done")
end)

# Monitora in background
spawn(fn -> ProcessMonitor.watch(worker) end)

# Attendi
:timer.sleep(1000)
# Output:
# Worker done
# Process #PID<...> down: :normal

작업: 높은 수준의 추상화

형태 Task 보다 편리한 추상화를 제공합니다. spawn 일반적인 패턴의 경우: 비동기 작업을 수행하고 결과를 기다립니다.

# Task.async e Task.await: parallelismo strutturato
defmodule ParallelFetcher do
  def fetch_all(urls) do
    urls
    |> Enum.map(fn url ->
      # Lancia un task per ogni URL in parallelo
      Task.async(fn -> fetch(url) end)
    end)
    |> Enum.map(fn task ->
      # Attende il risultato di ciascun task (max 5 secondi)
      Task.await(task, 5_000)
    end)
  end

  defp fetch(url) do
    # Simula una chiamata HTTP
    :timer.sleep(Enum.random(100..500))
    {:ok, "Response from #{url}"}
  end
end

# 3 richieste in parallelo: tempo totale ~max(latenze)
urls = ["https://api1.example.com", "https://api2.example.com", "https://api3.example.com"]
results = ParallelFetcher.fetch_all(urls)
# [{:ok, "Response from ..."}, ...]

# Task.async_stream: parallelo con concorrenza limitata
def process_items(items, max_concurrency \\ 10) do
  items
  |> Task.async_stream(
    fn item -> expensive_operation(item) end,
    max_concurrency: max_concurrency,
    timeout: 10_000,
    on_timeout: :kill_task,
  )
  |> Enum.map(fn
    {:ok, result} -> result
    {:exit, reason} ->
      IO.puts("Task failed: #{inspect(reason)}")
      nil
  end)
  |> Enum.reject(&is_nil/1)
end

# Task.start: fire-and-forget (non attendi il risultato)
Task.start(fn ->
  send_notification_email(user)  # Background job, non bloccante
end)

프로세스 레지스트리: 프로세스 이름 지정

# Process Registry: registra un processo con un nome atom
defmodule NamedCounter do
  def start do
    pid = spawn(__MODULE__, :loop, [0])
    # Registra con nome globale (atom)
    Process.register(pid, :main_counter)
    pid
  end

  def increment do
    # Invia a nome simbolico invece di PID
    send(:main_counter, {:increment, 1})
  end

  def get do
    send(:main_counter, {:get, self()})
    receive do
      {:value, n} -> n
    after 1_000 -> nil
    end
  end

  def loop(n) do
    receive do
      {:increment, delta} -> loop(n + delta)
      {:get, caller} ->
        send(caller, {:value, n})
        loop(n)
    end
  end
end

NamedCounter.start()
NamedCounter.increment()
NamedCounter.increment()
IO.puts(NamedCounter.get())  # 2

# Trova il PID da un nome registrato
pid = Process.whereis(:main_counter)
IO.puts(inspect(pid))   # #PID<0.120.0>

결론

BEAM의 액터 모델은 Elixir를 독특하게 만드는 요소입니다. 각 프로세스는 자체 메모리와 메일박스를 갖춘 격리된 우주입니다. 수백만 개의 프로세스를 생성하는 것은 저렴합니다. 메시지를 통한 커뮤니케이션 명시적인 동기화가 필요하지 않습니다. 충돌 전파 링크를 통한 것은 OTP가 내결함성을 구축하는 메커니즘입니다. 다음 기사에서는 GenServer의 기본 구성 요소인 GenServer를 살펴봅니다. 이 모든 것을 더 높은 수준의 추상화로 가져갑니다.

Elixir 시리즈의 향후 기사

  • 제3조: OTP 및 GenServer - 내결함성이 내장된 분산 상태
  • 제4조: 감독자 트리 — 절대 죽지 않는 시스템 설계
  • 제5조: Ecto — PostgreSQL용 구성 가능한 쿼리 및 스키마 매핑