문제: Python은 단일 스레드입니다.

Python에는 GIL(Global Interpreter Lock)이 있습니다. 단 하나의 Python 스레드만 바이트코드를 실행합니다. 한 번에. 이것은 종종 오해됩니다. GIL은 경쟁을 방해하지 않습니다 - 방지 CPU 병렬성 스레드의. I/O 중심 워크로드의 경우 (API는 일반적으로 그렇습니다) asyncio는 효율성 측면에서 스레드보다 성능이 뛰어납니다. OS 컨텍스트 전환 오버헤드를 제거합니다.

무엇을 배울 것인가

  • 이벤트 루프: 비동기 실행을 제어하는 ​​루프
  • 코루틴: 일시 중지하고 다시 시작할 수 있는 함수
  • async/await: 코루틴을 읽을 수 있게 만드는 구문
  • 태스크 vs 미래: asyncio 프리미티브
  • I/O 바인딩과 CPU 바인딩: 비동기가 도움이 되는 경우와 그렇지 않은 경우
  • 구조화된 동시성을 위한 asyncio.gather 및 asyncio.TaskGroup
  • 실제 워크로드에서 FastAPI 동기화 및 비동기 벤치마크

이벤트 루프: asyncio의 핵심

이벤트 루프는 콜백 대기열을 관리하는 무한 루프입니다. 코루틴. 코루틴이 I/O 작업(await), 일시 중지하고 실행할 수 있는 이벤트 루프로 제어를 반환합니다. 그 동안에는 다른 코루틴을 사용하세요. I/O가 완료되면 코루틴이 재설정됩니다. 촬영을 위해 줄을 서고 있다.

# Visualizzazione concettuale dell'event loop (pseudocodice)
#
# Event Loop Iteration:
# 1. Guarda la coda delle callback pronte
# 2. Esegui la prima callback/coroutine
# 3. Se incontra un await su I/O:
#    - Registra l'operazione I/O con il sistema operativo (epoll/kqueue/IOCP)
#    - Metti la coroutine in "sospensione" (waiting)
#    - Torna al passo 1 (esegui la prossima callback disponibile)
# 4. Quando l'I/O completa (notifica OS):
#    - Rimetti la coroutine nella coda "pronta"
# 5. Ripeti

# In Python reale:
import asyncio

async def fetch_data(url: str) -> str:
    # Simulazione di una richiesta HTTP asincrona
    # await sospende questa coroutine finche la risposta non arriva
    # L'event loop nel frattempo puo eseguire altre coroutine
    await asyncio.sleep(1)  # Simula latenza di rete
    return f"Data from {url}"

async def main():
    # Esecuzione sequenziale: 3 secondi totali
    result1 = await fetch_data("https://api1.example.com")
    result2 = await fetch_data("https://api2.example.com")
    result3 = await fetch_data("https://api3.example.com")
    return [result1, result2, result3]

# asyncio.run() crea l'event loop e lo esegue
asyncio.run(main())

코루틴과 일반 함수

일반 함수(def)는 처음부터 끝까지 실행됩니다. 중단. 코루틴(async def)는 다음과 같은 함수이다. 특정 지점에서 일시 중지할 수 있습니다(await) 포기하고 이벤트 루프 제어.

import asyncio
import time

# --- FUNZIONE SINCRONA ---
def fetch_sync(url: str) -> str:
    time.sleep(1)  # BLOCCA l'intero thread per 1 secondo
    return f"Data from {url}"

def main_sync():
    start = time.time()
    results = [
        fetch_sync("https://api1.example.com"),  # aspetta 1s
        fetch_sync("https://api2.example.com"),  # aspetta 1s
        fetch_sync("https://api3.example.com"),  # aspetta 1s
    ]
    print(f"Sync: {time.time() - start:.2f}s")  # ~3.00s
    return results

# --- COROUTINE ASINCRONA ---
async def fetch_async(url: str) -> str:
    await asyncio.sleep(1)  # SOSPENDE la coroutine, NON il thread
    return f"Data from {url}"

async def main_async():
    start = time.time()
    # gather esegue le tre coroutine CONCORRENTEMENTE
    results = await asyncio.gather(
        fetch_async("https://api1.example.com"),
        fetch_async("https://api2.example.com"),
        fetch_async("https://api3.example.com"),
    )
    print(f"Async: {time.time() - start:.2f}s")  # ~1.00s
    return results

# La differenza: 3s vs 1s per lo stesso workload I/O

작업 및 asyncio.gather

asyncio.gather() 실행하는 가장 일반적인 방법입니다 코루틴을 동시에 수행합니다. 모든 코루틴이 완료되면 반환됩니다. (또는 실패할 경우 기본적으로).

import asyncio
from typing import Any

# asyncio.gather: esecuzione concorrente di piu coroutine
async def concurrent_fetches():
    # Tutte e tre iniziano quasi simultaneamente
    results = await asyncio.gather(
        fetch_async("https://api1.example.com"),
        fetch_async("https://api2.example.com"),
        fetch_async("https://api3.example.com"),
        return_exceptions=True,  # Errori restituiti come valori invece di eccezioni
    )

    for url, result in zip(["api1", "api2", "api3"], results):
        if isinstance(result, Exception):
            print(f"{url}: Error - {result}")
        else:
            print(f"{url}: {result}")

# asyncio.create_task: esecuzione in background
async def background_tasks():
    # Task 1 inizia subito
    task1 = asyncio.create_task(fetch_async("https://api1.example.com"))

    # Fai altro mentre task1 e in background
    await asyncio.sleep(0.5)  # Simula altro lavoro

    # task2 parte dopo 0.5s
    task2 = asyncio.create_task(fetch_async("https://api2.example.com"))

    # Aspetta entrambi
    result1 = await task1
    result2 = await task2
    return result1, result2

# asyncio.TaskGroup (Python 3.11+): concorrenza strutturata
async def structured_concurrency():
    results = []
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_async("https://api1.example.com"))
        task2 = tg.create_task(fetch_async("https://api2.example.com"))
        task3 = tg.create_task(fetch_async("https://api3.example.com"))
    # Qui tutti i task sono completati (o c'e stata un'eccezione)
    return [task1.result(), task2.result(), task3.result()]

FastAPI의 async def: 사용 시기

FastAPI는 둘 다 지원합니다. async def 저것 def 그들에겐 정상 경로. 선택은 함수가 수행하는 작업에 따라 달라집니다.

# FastAPI: async def vs def
from fastapi import FastAPI
import asyncio
import httpx  # Client HTTP asincrono

app = FastAPI()

# USA async def quando:
# - Fai operazioni I/O con librerie async (httpx, asyncpg, aioredis, etc.)
# - Chiami altre coroutine con await
@app.get("/async-example")
async def async_endpoint():
    # httpx.AsyncClient e la versione async di requests
    async with httpx.AsyncClient() as client:
        response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
        return response.json()

# USA def normale quando:
# - La funzione e puramente CPU (calcoli, elaborazione in memoria)
# - Usi librerie sincrone che non supportano async
# FastAPI esegue le funzioni sync in un thread pool separato
# per non bloccare l'event loop
@app.get("/sync-example")
def sync_endpoint():
    import json
    # Operazione CPU-bound: OK in def normale
    data = {"numbers": list(range(1000))}
    return json.dumps(data)

# CASO CRITICO: MAI fare I/O sincrono bloccante in async def
@app.get("/bad-example")
async def bad_endpoint():
    import requests  # SBAGLIATO: requests e sincrono
    # Questo BLOCCA l'event loop per la durata della richiesta HTTP!
    response = requests.get("https://api.example.com")  # NON FARE QUESTO
    return response.json()

# VERSIONE CORRETTA:
@app.get("/good-example")
async def good_endpoint():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
        return response.json()

비동기식 동기 라이브러리의 위험성

동기식 I/O 라이브러리(예: requests, psycopg2, 또는 기존 DB 클라이언트) 코루틴 async def 블록 전체 이벤트 루프: 전까지는 다른 요청을 처리할 수 없습니다. 작업이 완료되지 않습니다. 데이터베이스 사용을 위해 asyncpg o SQLAlchemy 2.0 async. HTTP 사용의 경우 httpx o aiohttp. Redis 사용의 경우 redis.asyncio.

I/O 바운드와 CPU 바운드: 주요 차이점

비동기는 워크로드에만 도움이 됩니다. I/O 바운드: 작업 프로그램은 외부 리소스(데이터베이스, HTTP API, 파일 시스템)를 기다립니다. 작업량별 CPU 바인딩 (머신러닝, 비디오 인코딩, 집중계산) asyncio는 도움이 되지 않습니다 — 도움이 됩니다 multiprocessing 아니면 집행자.

# Workload I/O-bound: asyncio aiuta molto
async def io_bound_handler():
    # Fa 3 chiamate API in ~1 secondo invece di ~3 secondi
    results = await asyncio.gather(
        fetch_user_from_db(user_id=1),      # ~50ms
        fetch_user_orders(user_id=1),        # ~80ms
        fetch_user_preferences(user_id=1),   # ~40ms
    )
    return results  # Pronto in ~80ms (il piu lento), non 170ms

# Workload CPU-bound: asyncio NON aiuta, usa ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import asyncio

executor = ProcessPoolExecutor(max_workers=4)

def cpu_intensive_task(data: list) -> list:
    # Sorting O(n log n), computazione pura
    return sorted(data, key=lambda x: x ** 2)

@app.post("/process")
async def process_data(data: list):
    loop = asyncio.get_event_loop()
    # run_in_executor esegue la funzione in un processo separato
    # senza bloccare l'event loop
    result = await loop.run_in_executor(
        executor,
        cpu_intensive_task,
        data,
    )
    return {"processed": result}

벤치마크: FastAPI의 동기화 및 비동기

다음은 엔드포인트 동기화와 비동기의 차이점을 보여주는 현실적인 벤치마크입니다. 100개의 동시 요청이 있는 I/O 바인딩 워크로드:

# Benchmark con httpx e asyncio (script di test)
# pip install httpx
import asyncio
import httpx
import time

async def benchmark(endpoint: str, n_requests: int = 100):
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        start = time.time()
        tasks = [client.get(endpoint) for _ in range(n_requests)]
        responses = await asyncio.gather(*tasks)
        elapsed = time.time() - start

        success = sum(1 for r in responses if r.status_code == 200)
        rps = n_requests / elapsed

        print(f"{endpoint}: {elapsed:.2f}s, {rps:.1f} req/s, {success}/{n_requests} success")

# Endpoint test nel server FastAPI
@app.get("/test/sync")
def sync_test():
    import time
    time.sleep(0.1)  # Simula 100ms DB query
    return {"data": "ok"}

@app.get("/test/async")
async def async_test():
    await asyncio.sleep(0.1)  # Simula 100ms DB query async
    return {"data": "ok"}

# Risultati tipici su un server con 4 worker Uvicorn:
# /test/sync:  10.23s, 9.8 req/s   (quasi sequenziale!)
# /test/async: 1.05s, 95.2 req/s   (quasi perfettamente concorrente)
#
# Con 100ms di latenza simulata:
# Sync:  100 richieste * 100ms = ~10s
# Async: concorrente = ~100ms + overhead

asyncio.run(benchmark("/test/sync"))
asyncio.run(benchmark("/test/async"))

결론

FastAPI에서 Python 비동기의 힘은 실제적이지만 이해가 필요합니다. 모델: 이벤트 루프는 단일 스레드이지만 I/O에 대해 비차단입니다. 코루틴은 다른 핸들러를 차단하지 않고 정지되며 경쟁은 이는 협력적입니다(OS 스레드처럼 선점적이지 않음). 다음 단계는 FastAPI가 의존하는 데이터 검증을 제공하는 Pydantic v2를 이해하세요.

FastAPI 시리즈의 향후 기사

  • 제3조: Pydantic v2 — 고급 검증, BaseModel 및 TypeAdapter
  • 제4조: FastAPI의 종속성 주입: 종속() 및 패턴 정리
  • 제5조: SQLAlchemy 2.0 및 Alembic을 사용한 비동기 데이터베이스