エッジ コンピューティングにおける状態の問題

このシリーズの以前の記事では、Cloudflare Workers がいかに優れているかを示しました ステートレス ワークロードの場合: 各リクエストは共有メモリなしで分離して処理されます。 呼び出しの間。この機能はスケーラビリティの強みです 水平ですが、アプリケーションで調整が必要になるとすぐに障害になります。

次の一般的なシナリオを考えてみましょう。 メッセージを送信する必要があるチャット ルーム 注文して参加者全員に配送します。共同ドキュメントでさらに詳しく ユーザーは同時に編集します。リクエストをカウントする必要がある API のレート リミッター 世界的に一貫した方法で。これらすべてのケースにおいて、ワーカーのステートレス モデルは それでは十分ではありません: 必要です 一貫性が高かった と容量 競合するリクエストを 1 か所で調整します。

これはまさに私が抱えている問題です 耐久性のあるオブジェクト 彼らは解決します。

何を学ぶか

  • デュラブル オブジェクトとは何ですか。ワーカー KV との違いは何ですか?
  • 強力な整合性モデル: シングルライター、グローバル調整
  • Durable Objects を使用したセッション管理で WebSocket を実装する方法
  • トランザクションストレージ: 読み取り、書き込み、およびアトミックトランザクション
  • チャット ルーム、レート制限、ドキュメント コラボレーションのパターン
  • アラーム: 耐久オブジェクト内のスケジュールされた操作
  • 制限、価格設定、DO、KV、D1 を選択するタイミング

耐久性のあるオブジェクトとは

Durable Object (DO) は、Cloudflare がインスタンス化する JavaScript/TypeScript クラスです。 特定の識別子に対して単一の地理的位置にあります。ワーカーズKVとは異なります (300 以上の PoP での一貫性の可能性)、DO には次の保証があります。

  • シングルライターの一貫性: 特定の ID に対して DO のインスタンスは世界中で 1 つだけ存在します
  • リクエストのシリアル化: DO 呼び出しは順番に実行され、同時に実行されることはありません
  • 耐久性のあるストレージ: 各インスタンスのプライベート トランザクション キー/値ストア
  • WebSocket の休止状態: WebSocket 接続はアイドル期間中もコストをかけずに存続します

DO の場所は、最初のアクティブ化時に自動的に決定されます。 そして固定されたままになります。 Cloudflareは、最初のリクエストに最も近いデータセンターを選択します。 後続のすべてのリクエストは、世界中のどこにいても、その単一のリクエストにルーティングされます。 Cloudflareエニーキャストルーティング経由のインスタンス。

原生的 一貫性 競争 読み取りレイテンシ 使用事例
労働者KV 最終的な時間 (分) マルチライター ~1ms (キャッシュ) 構成、アセット、読み取り負荷の高いセッション
耐久性のあるオブジェクト 強い(線形化可能) シングルライター ~50 ~ 150ms (リモート エッジから) チャット、レートリミッター、ゲームステート、CRDT
D1 SQLite 強い(プライマリー) マルチリーダー、シングルライター ~5 ~ 20ms (近くの PoP から) リレーショナルクエリ、レポート、OLTP
R2オブジェクトストレージ 強い(オブジェクト別) マルチライター(競合検出) ~50~200ミリ秒 ファイル、画像、バックアップ

耐久性のあるオブジェクトの構造

Durable Objectは、Cloudflareランタイムが実行する特定のメソッドを持つクラスです。 認識します。最も重要なことは fetch()、すべてに呼び出されます HTTP リクエストが DO に転送されました。基本的な構造を見てみましょう。

// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza

export class CounterDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
    // state.storage e il key-value store privato di questa istanza
    // Persiste attraverso i riavvii del DO
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case '/increment': {
        // Legge il valore corrente (undefined se non esiste)
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = current + 1;
        // Scrittura atomica: garantita durabile prima del return
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/decrement': {
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = Math.max(0, current - 1);
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/value': {
        const count = (await this.state.storage.get<number>('count')) ?? 0;
        return Response.json({ count });
      }

      case '/reset': {
        await this.state.storage.put('count', 0);
        return Response.json({ count: 0, reset: true });
      }

      default:
        return new Response('Not Found', { status: 404 });
    }
  }
}

interface Env {
  COUNTER: DurableObjectNamespace;
}

Worker エントリ ポイントから DO を使用するには、名前空間バインディングを介して DO をインスタンス化する必要があります。 そしてID:

// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Estrae l'ID dalla query string: /counter?id=room-1
    const counterId = url.searchParams.get('id') ?? 'global';

    // idFromName() crea un ID deterministico da una stringa
    // Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
    const id = env.COUNTER.idFromName(counterId);

    // Ottiene lo stub per comunicare con l'istanza
    const stub = env.COUNTER.get(id);

    // Invia la richiesta al Durable Object
    // La richiesta viene instradata al datacenter corretto automaticamente
    return stub.fetch(request);
  },
};

interface Env {
  COUNTER: DurableObjectNamespace;
}

構成 wrangler.toml バインディングを宣言する必要があります。

# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]

[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"

[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]

永続オブジェクトを使用した WebSocket: チャット ルーム

Durable Objects の最も強力な使用例はセッション管理です 共有ステートフル WebSocket。各チャット ルームは DO の個別のインスタンスです。 アクティブな接続とメッセージ履歴のリストを維持します。

Cloudflare は、 WebSocket ハイバネーション API: DOが来る 処理するメッセージがない場合は「休止状態」になり、コストが大幅に削減されます (開いた接続に対してではなく、処理時間に対してのみ料金が発生します)。

// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation

export class ChatRoomDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/ws') {
      // Verifica che sia una richiesta di upgrade WebSocket
      const upgradeHeader = request.headers.get('Upgrade');
      if (!upgradeHeader || upgradeHeader !== 'websocket') {
        return new Response('Expected WebSocket upgrade', { status: 426 });
      }

      // Crea la coppia WebSocket server/client
      const { 0: clientWs, 1: serverWs } = new WebSocketPair();

      // Accetta la connessione tramite la Hibernation API
      // Il DO sara ibernato tra i messaggi (no costo di CPU idle)
      this.state.acceptWebSocket(serverWs);

      // Opzionale: associa metadata alla connessione
      // Utile per identificare l'utente nei messaggi successivi
      const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
      serverWs.serializeAttachment({ userId });

      // Invia la storia recente al nuovo utente
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      if (history.length > 0) {
        serverWs.send(JSON.stringify({ type: 'history', messages: history }));
      }

      return new Response(null, {
        status: 101,
        webSocket: clientWs,
      });
    }

    if (url.pathname === '/messages' && request.method === 'GET') {
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      return Response.json({ messages: history });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };

    let parsed: ClientMessage;
    try {
      parsed = JSON.parse(message as string);
    } catch {
      ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
      return;
    }

    if (parsed.type === 'chat') {
      const msg: Message = {
        id: crypto.randomUUID(),
        userId,
        text: parsed.text,
        timestamp: Date.now(),
      };

      // Salva nella history (mantieni solo gli ultimi 100 messaggi)
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      const newHistory = [...history, msg].slice(-100);
      await this.state.storage.put('history', newHistory);

      // Broadcast a tutte le connessioni WebSocket attive nel DO
      const allWebSockets = this.state.getWebSockets();
      const payload = JSON.stringify({ type: 'message', message: msg });
      for (const socket of allWebSockets) {
        try {
          socket.send(payload);
        } catch {
          // Connessione chiusa, ignorala
        }
      }
    }
  }

  // Chiamato quando una connessione WebSocket viene chiusa
  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };
    ws.close(code, reason);

    // Notifica gli altri utenti dell'uscita
    const notification = JSON.stringify({
      type: 'user_left',
      userId,
      timestamp: Date.now(),
    });
    for (const socket of this.state.getWebSockets()) {
      try {
        socket.send(notification);
      } catch { /* ignore */ }
    }
  }

  // Chiamato in caso di errore sulla connessione WebSocket
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
    ws.close(1011, 'Internal error');
  }
}

interface Message {
  id: string;
  userId: string;
  text: string;
  timestamp: number;
}

interface ClientMessage {
  type: 'chat' | 'ping';
  text?: string;
}

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Worker エントリ ポイントは、パスに基づいてリクエストを正しいルームにルーティングします。

// src/worker.ts
export { ChatRoomDO } from './chat-room-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // /room/:roomId/ws -> WebSocket per la stanza
    // /room/:roomId/messages -> GET cronologia
    const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
    if (!match) {
      return new Response('Not Found', { status: 404 });
    }

    const roomId = match[1];
    const subpath = match[2] ?? '/ws';

    // Ogni stanza e una istanza distinta del DO
    const id = env.CHAT_ROOM.idFromName(roomId);
    const stub = env.CHAT_ROOM.get(id);

    // Rewrite del path per il DO
    const doUrl = new URL(request.url);
    doUrl.pathname = subpath;

    return stub.fetch(new Request(doUrl.toString(), request));
  },
};

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

トランザクションストレージ: アトミックオペレーション

デュラブル オブジェクト ストレージは、アトミック トランザクションをサポートします。 state.storage.transaction()。これは次の場合に重要です 操作では複数のキーを一貫して読み書きする必要があります。

// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
  state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    if (request.method !== 'POST') {
      return new Response('Method Not Allowed', { status: 405 });
    }

    const { from, to, amount } = await request.json<TransferRequest>();

    try {
      // La transazione e atomica: o tutto va a buon fine, o niente
      await this.state.storage.transaction(async (txn) => {
        const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
        const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;

        if (fromBalance < amount) {
          // Il throw annulla la transazione
          throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
        }

        await txn.put(`balance:${from}`, fromBalance - amount);
        await txn.put(`balance:${to}`, toBalance + amount);

        // Log dell'operazione
        const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
        txLog.push({ from, to, amount, timestamp: Date.now() });
        await txn.put('tx_log', txLog.slice(-1000));
      });

      return Response.json({ success: true, from, to, amount });
    } catch (err) {
      return Response.json(
        { success: false, error: (err as Error).message },
        { status: 400 }
      );
    }
  }
}

interface TransferRequest {
  from: string;
  to: string;
  amount: number;
}

interface TxRecord {
  from: string;
  to: string;
  amount: number;
  timestamp: number;
}

interface Env {
  ACCOUNT: DurableObjectNamespace;
}

アラーム: 耐久オブジェクトでのスケジュールされた操作

永続オブジェクトのサポート アラーム: コールバック alarm() 私がそこにいない場合でも、スケジュールされた時間に呼び出されます DO へのアクティブなリクエスト。これは、TTL、期限、定期的なジョブに役立ちます DO のステータスにリンクされています:

// src/session-do.ts
// DO con alarm per scadenza automatica della sessione

export class SessionDO implements DurableObject {
  state: DurableObjectState;
  static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/create' && request.method === 'POST') {
      const data = await request.json<SessionData>();

      // Salva i dati della sessione
      await this.state.storage.put('session', {
        ...data,
        createdAt: Date.now(),
      });

      // Schedula l'alarm per la scadenza della sessione
      // Se l'alarm e gia schedulato, viene sostituito
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
    }

    if (url.pathname === '/get') {
      const session = await this.state.storage.get<SessionData>('session');
      if (!session) {
        return Response.json({ error: 'Session not found' }, { status: 404 });
      }

      // Refresh del TTL ad ogni accesso (sliding expiry)
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ session });
    }

    if (url.pathname === '/invalidate' && request.method === 'DELETE') {
      await this.state.storage.deleteAll();
      await this.state.storage.deleteAlarm();
      return Response.json({ ok: true });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato automaticamente quando scatta l'alarm
  async alarm(): Promise<void> {
    // Pulisce i dati della sessione scaduta
    const session = await this.state.storage.get<SessionData>('session');
    if (session) {
      console.log(`Session expired for user: ${session.userId}`);
      await this.state.storage.deleteAll();
    }
  }
}

interface SessionData {
  userId: string;
  role: string;
  metadata?: Record<string, unknown>;
}

interface Env {
  SESSION: DurableObjectNamespace;
}

耐久性のあるオブジェクトを使用したグローバル レート リミッター

分散レート リミッターは、パブリック API で最もリクエストの多いパターンの 1 つです。 Workers KV の場合、実装は競合状態の影響を受ける可能性があります。 C の場合、すべての レート制限「バケット」は、強い一貫性を持つインスタンスです。

// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects

export class RateLimiterDO implements DurableObject {
  state: DurableObjectState;

  // Configurazione: 100 req/minuto per IP
  static MAX_TOKENS = 100;
  static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const now = Date.now();

    // Legge lo stato attuale del bucket
    const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
      tokens: RateLimiterDO.MAX_TOKENS,
      lastRefill: now,
    };

    // Calcola quanti token sono stati aggiunti dall'ultimo accesso
    const elapsed = now - bucket.lastRefill;
    const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
    const currentTokens = Math.min(
      RateLimiterDO.MAX_TOKENS,
      bucket.tokens + tokensToAdd
    );

    if (currentTokens < 1) {
      // Rate limit superato
      const retryAfterMs = Math.ceil(
        (1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
      );

      await this.state.storage.put('bucket', {
        tokens: currentTokens,
        lastRefill: now,
      });

      return Response.json(
        {
          allowed: false,
          retryAfter: Math.ceil(retryAfterMs / 1000),
          remaining: 0,
        },
        {
          status: 429,
          headers: {
            'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
            'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
            'X-RateLimit-Remaining': '0',
          },
        }
      );
    }

    // Consuma un token e aggiorna il bucket
    await this.state.storage.put('bucket', {
      tokens: currentTokens - 1,
      lastRefill: now,
    });

    return Response.json({
      allowed: true,
      remaining: Math.floor(currentTokens - 1),
    });
  }
}

interface TokenBucket {
  tokens: number;
  lastRefill: number;
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

ワーカーは、各リクエストのフローにレート リミッターを統合します。

// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Identifica il client (IP o API key)
    const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
    const apiKey = request.headers.get('X-API-Key');
    const bucketId = apiKey ?? `ip:${clientIp}`;

    // Controlla il rate limit per questo client
    const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
    const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
    const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));

    if (limitCheck.status === 429) {
      return limitCheck; // Propaga la risposta 429 con gli headers
    }

    // Prosegue con la logica dell'API
    return handleApiRequest(request, env);
  },
};

async function handleApiRequest(request: Request, env: Env): Promise<Response> {
  return Response.json({ data: 'your api response here' });
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

パフォーマンスとコストの考慮事項

Durable Object はコストとパフォーマンスのプロファイルが大きく異なります 単純な労働者に。留意すべきいくつかの重要なポイント:

レイテンシー: 必ずしもローカルではない

各 DO インスタンスは単一のデータセンターに存在します。ヨーロッパのユーザーの場合 北米でインスタンス化された DO にアクセスし、各操作のレイテンシー 大西洋往復 (約 100 ~ 150 ミリ秒) が含まれます。設計DO ID 地理的な共有を制限するには: 可能な限り地域ベースの ID を使用します。 (idFromName(`${region}:${resourceId}`))、または遅延を受け入れる リージョン間の強力な一貫性を必要とする操作の場合にのみ高くなります。

Voce 無料枠 有給(有給労働者)
DOへのリクエスト 1M/月を含む 上記より 100 万ドルあたり 0.15 ドル
DO ライフタイム (CPU) 400,000 GB-秒/月 100 万 GB 秒あたり 12.50 ドル
ストレージ 1GB付属 それ以降は $0.20/GB-月
WebSocket ハイバネーション 利用可能 利用可能(アイドルコストなし)
アラーム 利用可能 利用可能

ベストプラクティス

  • IDの粒度: 特定の ID を使用します (例: chat:room-42) 個々のインスタンスのホットスポットを回避するために、グローバル ID ではなく。
  • WebSocket の休止状態は常に次のとおりです。 アメリカ合衆国 state.acceptWebSocket() 手動イベント リスナーの代わりに、アイドル接続コストを削減します。
  • バッチストレージ: アメリカ合衆国 storage.put(map) 書く 複数のキーではなく 1 回の操作で複数のキー put() シングル。
  • ストレージ サイズの制限: 各インスタンスには 128MB の制限があります ストレージの。大きなデータの場合は R2 を使用し、参照のみを DO に保存します。
  • 連載予算: リクエストはキューに入れられて処理されます 順次。遅い操作は後続の操作をブロックします。ハンドラーを保持する 高速 (理想的には 1 秒未満)。

結論と次のステップ

永続オブジェクトは、ワーカーのステートレス モデルとワーカーのステートレス モデルの間のギャップを橋渡しします。 調整と状態の共有を必要とする最新のアプリケーション。彼らのもの WebSocket Hibernation API およびアラームと組み合わせると、プリミティブになります。 チャット、ゲーム、ドキュメントコラボレーション、グローバル調整システムを完備しています。

トレードオフは遅延です。DO は物理的に 1 つのデータセンターにのみ存在します。 したがって、クロスリージョン書き込み操作により待ち時間が増加します。読書用 スケーラブルな場合は、DO (書き込み用) と KV (キャッシュされた読み取り用) を組み合わせることを検討してください。

シリーズの次の記事

  • 第5条: Workers AI — LLM とビジョン モデルの推論 on the Edge: Llama、Whisper、ビジョン モデルをワーカーで直接実行する方法 専用 GPU を使用せず、Workers AI は前年比 4000% 成長しました。
  • 第6条: Vercel Edge ランタイム — アドバンスト ミドルウェア、 地理位置情報と A/B テスト: Next.js を使用した Vercel のエッジへのアプローチ。
  • 第7条: エッジでの地理的ルーティング — パーソナライゼーション GDPR のコンテンツとコンプライアンス: ジオフェンシング、ローカライズされた価格設定、および GDPR。