コンテキストの伝播: ログ、トレース、メトリクスの相関関係
La コンテキストの伝播 そしてすべての信号を接続できる仕組み サービスの数に関係なく、リクエストの処理中に生成されるテレメトリの数 十字架。コンテキストの伝播がなければ、トレース、ログ、およびメトリクスは孤立した信号のままになります。 これにより、各リクエストの完全なストーリーを伝える一貫した物語になります。
この記事では、OpenTelemetry のコンテキスト伝播メカニズムを分析します。 トレースコンテキストからバゲッジまで、ログとトレースの相関関係からパターンまで、 サービス境界を越えて信号が失われることはありません。
この記事で学べること
- サービス間でのトレース コンテキスト伝播の仕組み
- W3C Baggage: サービス間でカスタム メタデータを運ぶ
- ログとトレースの相関: 構造化ログをトレースに接続します
- 完全なサービス間相関のパターン
- 非同期シナリオ (キュー、イベント) での伝播の管理
- 伝播の問題のトラブルシューティング
トレースコンテキストの伝播
の伝播 トレースコンテキスト とそれらを結び付ける基本的なメカニズム 同じトレース内の異なるサービスのスパン。サービスが別のサービスを呼び出すとき、 トレース コンテキスト (trace_id、span_id、trace_flags) はリクエスト ヘッダーでシリアル化されます。 そして、受信サービスによって逆シリアル化されて、子スパンが作成されます。
OpenTelemetry は i を使用します プロパゲーター シリアル化と逆シリアル化を管理するため コンテキストの。デフォルトのプロパゲータは W3C トレースコンテキスト、ただしOTelはサポートしています B3 (Zipkin)、Jaeger、カスタムフォーマットも可能です。
from opentelemetry import trace, context
from opentelemetry.propagate import inject, extract
from opentelemetry.propagators.textmap import DefaultTextMapPropagator
import requests
tracer = trace.get_tracer("order-service")
# --- LATO CLIENT: iniettare il contesto nella richiesta ---
def call_payment_service(order):
with tracer.start_as_current_span("call-payment-service") as span:
span.set_attribute("order.id", order.id)
# Creare un dizionario per gli header
headers = {}
# Iniettare il trace context negli header
inject(headers)
# Ora headers contiene:
# { "traceparent": "00-4bf92f35...-00f067aa...-01",
# "tracestate": "" }
# Inviare la richiesta con gli header di contesto
response = requests.post(
"http://payment-service/api/charge",
json={"order_id": order.id, "amount": order.total},
headers=headers
)
return response.json()
# --- LATO SERVER: estrarre il contesto dalla richiesta ---
from flask import Flask, request as flask_request
app = Flask(__name__)
@app.route("/api/charge", methods=["POST"])
def handle_charge():
# Estrarre il trace context dagli header della richiesta
ctx = extract(flask_request.headers)
# Creare uno span figlio nel contesto estratto
with tracer.start_as_current_span(
"process-charge",
context=ctx,
kind=trace.SpanKind.SERVER,
attributes={
"payment.amount": flask_request.json["amount"]
}
) as span:
result = process_payment(flask_request.json)
span.set_attribute("payment.status", result.status)
return result.to_json()
W3C Baggage: クロスサービスメタデータ
Il W3C 手荷物 カスタムのキーと値のペアを運ぶメカニズム トレース コンテキストとともに、サービス境界を越えて。の属性とは異なり、 スパン(ローカル)では、バゲージはすべての下流サービスに伝播されます。
Baggage は、スパンに属さないコンテキスト情報を運ぶのに役立ちます ただし、顧客層、A/B テストのバリアント、リクエストの優先順位、 原産地。
from opentelemetry import baggage, context
from opentelemetry.propagate import inject
# --- Impostare il baggage nel servizio di origine ---
def handle_api_request(request):
# Impostare valori di baggage
ctx = baggage.set_baggage("customer.tier", "premium")
ctx = baggage.set_baggage("ab.test.variant", "checkout-v2", context=ctx)
ctx = baggage.set_baggage("request.priority", "high", context=ctx)
# Attaccare il contesto
token = context.attach(ctx)
try:
with tracer.start_as_current_span("handle-request") as span:
# Il baggage verrà propagato automaticamente
# a tutti i servizi downstream
headers = {}
inject(headers)
# headers ora contiene anche:
# "baggage": "customer.tier=premium,ab.test.variant=checkout-v2,request.priority=high"
call_downstream_service(headers)
finally:
context.detach(token)
# --- Leggere il baggage in un servizio downstream ---
def handle_downstream_request(request):
ctx = extract(request.headers)
token = context.attach(ctx)
try:
# Leggere i valori di baggage
customer_tier = baggage.get_baggage("customer.tier")
ab_variant = baggage.get_baggage("ab.test.variant")
priority = baggage.get_baggage("request.priority")
with tracer.start_as_current_span("downstream-operation") as span:
# Usare il baggage come attributi dello span
span.set_attribute("customer.tier", customer_tier or "standard")
span.set_attribute("ab.test.variant", ab_variant or "control")
# Differenziare il comportamento in base al baggage
if priority == "high":
process_with_priority(request)
else:
process_normal(request)
finally:
context.detach(token)
手荷物: いつ使用し、いつ避けるべきか
| シナリオ | おすすめ | 理由 |
|---|---|---|
| ルーティングの顧客層 | はい、手荷物を使用します | ルーティング決定のための複数のサービスを提供します |
| A/B テストのバリエーション | はい、手荷物を使用します | 各サービスは、どのバリアントを提供するかを認識する必要があります |
| 機密データ (PII、トークン) | いいえ、荷物は絶対に入れないでください | 手荷物はHTTPヘッダーで暗号化されずに渡されます |
| ビッグデータ (ペイロード、JSON) | いいえ、避けてください | 手荷物により各リクエストのサイズが増加します |
| リクエストID / 相関ID | 代わりにtrace_idを使用してください | Trace_id はトレース コンテキストによってすでに伝播されています |
ログとトレースの相関関係
La ログとトレースの相関関係 およびその際に生成されるログを接続するパターン
processing a request to the corresponding track.注射することで得られます
trace_id e span_id 各ログ行でフィルタリングが可能
特定のトレースに関連付けられたすべてのログ。
import logging
import json
from opentelemetry import trace
class TraceContextFilter(logging.Filter):
"""Filtro che aggiunge trace_id e span_id a ogni record di log"""
def filter(self, record):
span = trace.get_current_span()
if span and span.is_recording():
ctx = span.get_span_context()
record.trace_id = format(ctx.trace_id, "032x")
record.span_id = format(ctx.span_id, "016x")
record.trace_flags = format(ctx.trace_flags, "02x")
record.service_name = "order-service"
else:
record.trace_id = "0" * 32
record.span_id = "0" * 16
record.trace_flags = "00"
record.service_name = "order-service"
return True
# Configurare il logger con il filtro
logger = logging.getLogger("order-service")
logger.addFilter(TraceContextFilter())
# Formatter JSON che include trace context
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(json.dumps({
"timestamp": "%(asctime)s",
"level": "%(levelname)s",
"message": "%(message)s",
"service": "%(service_name)s",
"trace_id": "%(trace_id)s",
"span_id": "%(span_id)s",
"trace_flags": "%(trace_flags)s",
"logger": "%(name)s"
})))
logger.addHandler(handler)
# Uso: i log avranno automaticamente trace_id e span_id
def process_order(order):
with tracer.start_as_current_span("process-order") as span:
logger.info(f"Processing order {order.id}")
# Output: {"trace_id": "4bf92f35...", "span_id": "00f067aa...",
# "message": "Processing order ORD-123", ...}
validate_order(order)
logger.info(f"Order {order.id} validated successfully")
process_payment(order)
logger.info(f"Payment processed for order {order.id}")
非同期シナリオでの伝播
非同期シナリオでのコンテキストの伝播 (メッセージ キュー、イベント、スケジュールされたタスク) 特別な注意が必要です。同期 HTTP 呼び出しとは異なり、ヘッダー コンテキストを運ぶため、非同期システムではコンテキストをシリアル化する必要があります。 メッセージ ペイロードまたはそのメタデータ。
from opentelemetry import trace, context
from opentelemetry.propagate import inject, extract
import json
tracer = trace.get_tracer("order-service")
# --- PRODUCER: iniettare contesto nel messaggio Kafka ---
def publish_order_event(order, kafka_producer):
with tracer.start_as_current_span(
"publish-order-created",
kind=trace.SpanKind.PRODUCER,
attributes={
"messaging.system": "kafka",
"messaging.destination.name": "order-events",
"messaging.operation.type": "publish",
"order.id": order.id
}
) as span:
# Iniettare il trace context negli header del messaggio
headers = {}
inject(headers)
# Convertire gli header in formato Kafka
kafka_headers = [(k, v.encode()) for k, v in headers.items()]
kafka_producer.produce(
topic="order-events",
key=order.id.encode(),
value=json.dumps(order.to_dict()).encode(),
headers=kafka_headers
)
# --- CONSUMER: estrarre contesto dal messaggio Kafka ---
def consume_order_events(kafka_consumer):
for message in kafka_consumer:
# Convertire header Kafka in dizionario
headers = {k: v.decode() for k, v in message.headers()}
# Estrarre il trace context
ctx = extract(headers)
# Creare uno span nel contesto estratto
with tracer.start_as_current_span(
"process-order-event",
context=ctx,
kind=trace.SpanKind.CONSUMER,
attributes={
"messaging.system": "kafka",
"messaging.destination.name": "order-events",
"messaging.operation.type": "process"
}
) as span:
order_data = json.loads(message.value())
span.set_attribute("order.id", order_data["id"])
process_order_event(order_data)
コンテキスト伝播のチェックリスト
- 同期HTTP: コンテキストは、HTTP ヘッダーを介した自動インストルメンテーションによって自動的に伝播されます。
- gRPC: gRPC (自動インストルメンテーション) メタデータによる自動伝播
- メッセージキュー: メッセージヘッダーにコンテキストを手動で挿入/抽出します
- スレッドプール: 新しいスレッドでコンテキストをキャプチャして再アタッチします。
attach/detach - 非同期/待機: 非同期フレームワークがコンテキストを維持していることを確認します (Python asyncio はネイティブでサポートしています)。
- スケジュールされたタスク: 新しいルート スパンまたは元のトラックへのリンクを作成します
伝播のトラブルシューティング
伝播の問題は、可観測性の導入において最も一般的なものの 1 つです。とき トラックが「壊れている」ように見えます (親のない孤立したスパン)、通常は問題があります コンテキストの伝播。最も一般的な原因と解決策を次に示します。
一般的な伝播の問題
壊れた線路: スパンは接続されているのではなく、別個のトラックとして表示されます。原因:
中間プロキシまたはロードバランサがヘッダーを削除します traceparent。解決策: 設定する
トレースヘッダーを渡すプロキシ。
スレッド内で失われたコンテキスト: サブスレッドで作成されたスパンには親がありません。
原因: コンテキストはスレッド境界を越えて伝播されません。解決策: を使用する
context.get_current() e attach() 新しいスレッドで。
プロパゲータが構成されていません: サービスはさまざまな形式を使用します (W3C と B3)。
解決策: CompositePropagator 必要なフォーマットがすべて揃っています。
手荷物は伝播されません:手荷物の価値は下流のサービスに届きません。
原因: BaggagePropagator 登録されていません。解決策:
W3CBaggagePropagator プロパゲータの構成に影響します。
結論と次のステップ
コンテキストの伝播は、可観測性システム全体をまとめる接着剤です。 これがないと、トレース、ログ、メトリクスは孤立した信号のままになります。適切に繁殖させることで、 それらは統一された物語となり、エントリの時点からあらゆるリクエストに従うことができます。 応答が返されるまで、関連するすべてのサービスを通じて。
3 つの主要なメカニズムは次のとおりです。 W3C トレースコンテキスト の普及のために トレースID、 W3C 手荷物 カスタムメタデータを運ぶため、そして ログとトレースの相関関係 ログを対応するトレースにリンクします。
次の記事では、eBPF 計測器、革新的な技術 これにより、アプリケーションコードを変更せずにカーネルレベルで可観測性を取得できます。 eBPF プログラムを使用してシステム コールをインターセプトするエージェントレス。







