ゲーム テレメトリ パイプライン: Scala でのプレーヤー分析
最新のマルチプレイヤー ゲームでは、毎日、何十億ものイベントが生成されます。プレイヤーが移動したり、攻撃を実行したり、 アイテムを購入し、ゲームを放棄します。フォートナイトの登録プレイヤー数が 3 億 5,000 万人を超え、数十人のプレイヤーが誕生 1 日あたり数テラバイトのデータ。 League of Legends では、完了したマッチごとに 100 以上の変数を収集します。の呼びかけ Duty は、発射されたすべての弾丸、すべての死亡、遅延が臨界しきい値を超えたすべてのフレームを記録します。
この膨大なデータはバックグラウンドノイズではなく、ゲームの神経系です。そして明らかになるテレメトリー なぜプレイヤーは第 3 レベル以降に放棄するのか、どの武器がメタで強すぎるのか、どのサーバー地域なのか ピーク時のラグや、一部のユーザーが支払いを停止するまでにかかる時間に悩まされます。 堅牢なテレメトリ パイプラインがなければ、目隠しをしたまま時速 200 km で運転することになります。
この記事では、それを構築します 産業用パイプラインテレメトリゲーム エンドツーエンド: コレクションから クライアント側とサーバー側のイベント、Apache Kafka による取り込み、Apache Flink によるリアルタイム処理、 履歴分析のためのデータ ウェアハウスや LiveOps の意思決定のためのダッシュボードまで。私たちも見ます 実装方法 プレーヤーのセグメンテーション そして チャーン予測 直接 パイプライン上で。
何を学ぶか
- ゲーム イベントの分類: プレーヤー イベント、ゲームプレイ イベント、経済イベント、システム イベント
- パイプライン アーキテクチャ: クライアント SDK、イベント バス、ストリーム処理、データ ウェアハウス
- リアルタイム分析のための Kafka、Flink、ClickHouse の実装
- Avro を使用したメッセージ スキーマと検証用のスキーマ レジストリ
- リアルタイムの RFM (最新性、頻度、金額) プレーヤーのセグメンテーション
- Flink パイプラインの機能エンジニアリングによるチャーン予測
- LiveOps 意思決定のためのファネル分析と保持メトリクス
- データ品質、重複排除、遅延イベント処理
1. ゲームイベントの分類
パイプラインを構築する前に、何を収集しているのかを知る必要があります。ゲームイベントは4つに分かれており、 主要なカテゴリ。それぞれに異なる頻度、優先順位、収集方法があります。
| カテゴリ | Esempi | 頻度 | 優先度 | 保持 |
|---|---|---|---|---|
| プレイヤーイベント | ログイン、ログアウト、セッション開始、レベルアップ、実績 | 低 (1-10/h) | 高い | 永遠に |
| ゲームプレイイベント | キル、デス、マッチスタート、能力使用、ゾーンエンター | 高 (100-1000/分) | 平均 | 90日 |
| 経済イベント | 購入、アイテム付与、通貨獲得、ショップオープン | 低 (0-5/h) | 批判 | 永遠に |
| システムイベント | fps_drop、packet_loss、再接続、crash_report | 中 (10-100/分) | 高い | 30日 |
| ソーシャルイベント | friends_add、party_join、chat_sent、report_player | 低 (0-20/h) | 平均 | 180日 |
各イベントには、標準フィールドを備えた共通の構造 (エンベロープ) が必要です。 event_id,
event_type, player_id, session_id, server_timestamp,
client_timestamp、およびタイプ固有のペイロード。厳格かつ基本的なスキームは、
ダウンストリームのデータ品質。
// Schema base evento telemetria (TypeScript/protobuf-like)
interface TelemetryEvent {
// Envelope comune a tutti gli eventi
event_id: string; // UUID v4 per deduplicazione
event_type: string; // "player.level_up", "gameplay.kill", etc.
schema_version: string; // "1.2.0" per compatibilità
// Identita
player_id: string; // ID univoco giocatore
session_id: string; // ID sessione corrente
game_build: string; // "2024.12.1" versione client
// Timestamp dual (client + server)
client_ts: number; // Unix ms dal client (non fidarsi ciecamente)
server_ts: number; // Unix ms dal server (source of truth)
client_tz: string; // "Europe/Rome" per analytics geo
// Contesto
platform: "pc" | "console" | "mobile";
region: string; // "eu-west-1"
match_id?: string; // Se in una partita
// Payload specifico del tipo
payload: Record<string, unknown>;
}
// Esempio evento kill
const killEvent: TelemetryEvent = {
event_id: "a3f7e2d1-8c4b-4e9a-b1f2-3d5e7c9a1b2c",
event_type: "gameplay.kill",
schema_version: "1.2.0",
player_id: "player_12345",
session_id: "sess_abcdef",
game_build: "2024.12.1",
client_ts: 1703123456789,
server_ts: 1703123456812,
client_tz: "Europe/Rome",
platform: "pc",
region: "eu-west-1",
match_id: "match_789xyz",
payload: {
victim_id: "player_67890",
weapon: "assault_rifle",
headshot: true,
distance_meters: 47.3,
position: { x: 145.2, y: 89.1, z: 12.0 },
ttk_ms: 320
}
};
2. パイプライン アーキテクチャ: クライアントからダッシュボードまで
産業用ゲーム テレメトリ パイプラインのアーキテクチャは次のパターンに従います ラムダアーキテクチャ または、より現代的なバージョンでは、 カッパ建築 単一のストリーム処理層 リアルタイム分析と履歴分析の両方を提供します。
パイプラインコンポーネント
| レイヤー | 成分 | テクノロジー | 役割 |
|---|---|---|---|
| コレクション | クライアントSDK | TypeScript/C++/Unity | バッファリング、バッチ処理、再試行 |
| コレクション | テレメトリゲートウェイ | ゴー/特使 | 認証、レート制限、ファンアウト |
| 摂取 | イベントバス | アパッチ カフカ | 耐久性、リプレイ、順序付け |
| 処理 | ストリームエンジン | アパッチフリンク | リアルタイム分析、エンリッチメント |
| 給仕 | OLAPデータベース | クリックハウス | 数十億行にわたる高速クエリ |
| 給仕 | ホットストア | レディス | LiveOps のリアルタイム メトリクス |
| 視覚化 | ダッシュボード | グラファナ/メタベース | LiveOps ダッシュボードと分析 |
3. クライアント SDK: バッファリングとバッチ処理
各イベントをサーバーに個別に送信することは、ネットワークに大きなオーバーヘッドを引き起こす典型的な失敗です。 優れたクライアント SDK が実装するもの ローカルバッファリング e バッチの送信: イベント これらはメモリ内のバッファに蓄積され、定期的に、またはバッファが一定の値に達したときに送信されます。 サイズ。ネットワーク障害が発生した場合、SDK は ディスクバッファ イベントを見逃さないように。
// Client SDK di telemetria in TypeScript
class TelemetrySDK {
private buffer: TelemetryEvent[] = [];
private diskBuffer: DiskQueue;
private readonly BATCH_SIZE = 100;
private readonly FLUSH_INTERVAL_MS = 5000;
private readonly MAX_RETRY_ATTEMPTS = 3;
constructor(private config: SDKConfig) {
this.diskBuffer = new DiskQueue('telemetry_offline');
this.startFlushLoop();
this.startOfflineRetryLoop();
}
// Registra un evento nel buffer in-memory
track(eventType: string, payload: Record<string, unknown>): void {
const event: TelemetryEvent = {
event_id: crypto.randomUUID(),
event_type: eventType,
schema_version: "1.2.0",
player_id: this.config.playerId,
session_id: this.config.sessionId,
game_build: this.config.gameBuild,
client_ts: Date.now(),
server_ts: 0, // Valorizzato dal server
client_tz: Intl.DateTimeFormat().resolvedOptions().timeZone,
platform: this.config.platform,
region: this.config.region,
payload
};
this.buffer.push(event);
if (this.buffer.length >= this.BATCH_SIZE) {
this.flush(); // Flush immediato se buffer pieno
}
}
private async flush(): Promise<void> {
if (this.buffer.length === 0) return;
const batch = this.buffer.splice(0, this.BATCH_SIZE);
try {
await this.sendBatch(batch);
} catch (error) {
// Fallback su disk buffer per invio offline
console.warn('Telemetry send failed, persisting to disk', error);
this.diskBuffer.enqueue(batch);
}
}
private async sendBatch(events: TelemetryEvent[]): Promise<void> {
const response = await fetch(this.config.gatewayUrl + '/v1/events', {
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
'X-Game-Build': this.config.gameBuild,
'Authorization': `Bearer ${this.config.apiKey}`
},
// NDJSON: una riga per evento, più efficiente di JSON array
body: events.map(e => JSON.stringify(e)).join('\n'),
signal: AbortSignal.timeout(5000) // 5s timeout
});
if (!response.ok) {
throw new Error(`Gateway error: ${response.status}`);
}
}
private startFlushLoop(): void {
setInterval(() => this.flush(), this.FLUSH_INTERVAL_MS);
}
private startOfflineRetryLoop(): void {
setInterval(async () => {
const pendingBatch = await this.diskBuffer.dequeue(this.BATCH_SIZE);
if (pendingBatch.length > 0) {
try {
await this.sendBatch(pendingBatch);
} catch {
this.diskBuffer.requeue(pendingBatch);
}
}
}, 30_000); // Retry ogni 30s
}
}
4. Kafka トピックとスキーマ レジストリ
パイプラインの中心は Apache Kafka です。イベントはカテゴリごとに個別のトピックで公開されます。 消費者が興味のある種類のイベントのみに登録できるようにします。の スキーマレジストリ Confluent は、各メッセージが定義された Avro スキーマを遵守していることを保証し、不正な形式のメッセージをブロックします。 データ ウェアハウスが汚染される前に。
# Schema Avro per TelemetryEvent (Avro IDL)
{
"type": "record",
"name": "TelemetryEvent",
"namespace": "io.gamestudio.telemetry",
"fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_type", "type": "string" },
{ "name": "schema_version", "type": "string" },
{ "name": "player_id", "type": "string" },
{ "name": "session_id", "type": "string" },
{ "name": "game_build", "type": "string" },
{ "name": "client_ts", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "server_ts", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "platform", "type": { "type": "enum", "name": "Platform",
"symbols": ["pc", "console", "mobile"] } },
{ "name": "region", "type": "string" },
{ "name": "match_id", "type": ["null", "string"], "default": null },
{ "name": "payload", "type": { "type": "map", "values": "string" } }
]
}
# Configurazione Kafka topics con partitioning
# Topic per categoria, partitionato per player_id (ordering per player)
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic game.player.events \
--partitions 32 \
--replication-factor 3 \
--config retention.ms=2592000000 # 30 giorni
--config compression.type=lz4 # 60% riduzione size
--config segment.ms=3600000 # Roll segment ogni ora
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic game.gameplay.events \
--partitions 128 \ # Più partizioni: alta throughput
--replication-factor 3 \
--config retention.ms=7776000000 # 90 giorni
--config compression.type=snappy
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic game.economy.events \
--partitions 32 \
--replication-factor 3 \
--config retention.ms=-1 # Forever (economia critica)
--config cleanup.policy=compact # Compact per mantenerla
5. Flink ストリーム処理: リアルタイム分析
Apache Flink は、EA、Riot Games、およびその他の大手企業によって選ばれたストリーム処理エンジンです。 ゲームテレメトリ。彼の管理能力 イベント時処理 透かし入りと 遅延イベント (ネットワークから遅れて到着するイベント) とそのステータスを管理するための基本 分散型では、データを損失することなく、時間枠を超えて集計できます。
// Apache Flink job: Kill/Death ratio in real-time per match (Java/Flink API)
public class KDAStreamProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint ogni 10s per fault tolerance
env.enableCheckpointing(10_000);
env.getCheckpointConfig()
.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Source: Kafka consumer per eventi gameplay
KafkaSource<TelemetryEvent> source = KafkaSource
.<TelemetryEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("game.gameplay.events")
.setGroupId("flink-kda-processor")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new TelemetryEventDeserializer())
.build();
DataStream<TelemetryEvent> events = env
.fromSource(source, WatermarkStrategy
// Tollerata latenza massima di 30s per eventi ritardati
.<TelemetryEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((e, ts) -> e.getServerTs()),
"Kafka-Telemetry-Source"
);
// Filtra solo eventi kill e death
DataStream<TelemetryEvent> combatEvents = events
.filter(e -> e.getEventType().equals("gameplay.kill") ||
e.getEventType().equals("gameplay.death"));
// Aggrega KDA per player ogni minuto, keyed by player_id
DataStream<PlayerKDA> kdaStream = combatEvents
.keyBy(TelemetryEvent::getPlayerId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new KDAAggregateFunctionK(), new KDAWindowFunctionK())
.name("KDA-Per-Minute");
// Sink 1: Redis per LiveOps dashboard (latenza < 1s)
kdaStream.addSink(new RedisSink<>(redisConfig, new KDARedisMapper()));
// Sink 2: ClickHouse per storico (latenza accettabile 5-10s)
kdaStream.addSink(ClickHouseSink.builder()
.setTableName("game_analytics.player_kda_minutes")
.build());
env.execute("Game KDA Real-Time Processor");
}
}
// Funzione di aggregazione KDA
public class KDAAggregateFunctionK
implements AggregateFunction<TelemetryEvent, KDAAcc, KDAAcc> {
@Override
public KDAAcc createAccumulator() {
return new KDAAcc(0, 0, 0);
}
@Override
public KDAAcc add(TelemetryEvent event, KDAAcc acc) {
if (event.getEventType().equals("gameplay.kill")) {
return new KDAAcc(acc.kills + 1, acc.deaths, acc.assists);
} else {
return new KDAAcc(acc.kills, acc.deaths + 1, acc.assists);
}
}
@Override
public KDAAcc getResult(KDAAcc acc) { return acc; }
@Override
public KDAAcc merge(KDAAcc a, KDAAcc b) {
return new KDAAcc(a.kills + b.kills, a.deaths + b.deaths,
a.assists + b.assists);
}
}
6. プレーヤーのセグメンテーション: リアルタイムの RFM
モデル RFM (最新性、頻度、金額)、電子商取引から借用し、適応します プレーヤーをアクション可能なクラスターに分類するゲームに最適です。 プレミアムオファーの用意ができており、最も高額な支出を行う人は慎重に扱われるべきです。
- 最新性: 最後のセッションからの日数 (最新性が低い = 離脱リスク)
- 頻度: 過去 30 日間のセッション (高頻度 = アクティブ プレーヤー)
- 金銭的: 過去 90 日間の実質通貨での合計支出
-- ClickHouse: query RFM per segmentazione giocatori
-- Eseguita ogni ora tramite scheduled materialized view
CREATE MATERIALIZED VIEW game_analytics.player_rfm_segments
ENGINE = ReplacingMergeTree()
ORDER BY (player_id, computed_at)
AS
WITH rfm_base AS (
SELECT
player_id,
-- Recency: giorni dall'ultima sessione
dateDiff('day', max(toDate(server_ts / 1000)), today()) AS recency_days,
-- Frequency: sessioni negli ultimi 30 giorni
countDistinct(
CASE WHEN server_ts > subtractDays(now(), 30) THEN session_id END
) AS frequency_30d,
-- Monetary: spesa totale ultimi 90 giorni (economy events)
sumIf(
toFloat64OrZero(payload['amount_usd']),
event_type = 'economy.purchase' AND
server_ts > subtractDays(now(), 90)
) AS monetary_90d,
now() AS computed_at
FROM game_analytics.events_all
WHERE event_type IN ('player.session_start', 'economy.purchase')
GROUP BY player_id
),
rfm_scored AS (
SELECT
player_id,
recency_days,
frequency_30d,
monetary_90d,
computed_at,
-- Score da 1-5 per ogni dimensione (quintili)
ntile(5) OVER (ORDER BY recency_days DESC) AS r_score, -- Desc: bassa recency = buono
ntile(5) OVER (ORDER BY frequency_30d ASC) AS f_score,
ntile(5) OVER (ORDER BY monetary_90d ASC) AS m_score
FROM rfm_base
)
SELECT
player_id,
recency_days,
frequency_30d,
monetary_90d,
r_score,
f_score,
m_score,
-- Segment label
multiIf(
r_score >= 4 AND f_score >= 4 AND m_score >= 4, 'champions',
r_score >= 4 AND f_score >= 4, 'loyal_players',
r_score >= 4 AND m_score >= 4, 'potential_champions',
r_score <= 2 AND f_score >= 3, 'at_risk',
r_score <= 2 AND f_score <= 2, 'churned_risk',
m_score >= 4, 'big_spenders',
'casual'
) AS segment,
computed_at
FROM rfm_scored;
RFM セグメントと推奨される LiveOps アクション
| セグメント | % 典型的な | 推奨されるアクション | チャネル |
|---|---|---|---|
| チャンピオン | 5% | VIP待遇、ベータアクセス、限定コンテンツ | ゲーム内 + メール |
| 忠実なプレイヤー | 15% | ロイヤルティ報酬、バトルパスインセンティブ | ゲーム内 |
| 危険にさらされています | 20% | 再エンゲージメント キャンペーン、30% 割引オファー | プッシュ + メール |
| チャーンリスク | 25% | 勝ち戻し: 7 日間の無料プレミアム | 電子メール + SMS |
| 大金を使う人 | 3% | クジラ保護、パーソナライズされたオファー | 直接的なアウトリーチ |
| カジュアル | 32% | 改善されたチュートリアル、合理化されたオンボーディング | ゲーム内 |
7. Flink の特徴エンジニアリングによるチャーン予測
チャーン予測にはリアルタイムの特徴エンジニアリングが必要です: ファイルの抽出 ML モデルが次の 7 日間の放棄の確率を予測するために使用する特性。 これらの特徴はスライディング ウィンドウで計算され、特徴ストア (Redis または Feast) に書き込まれます。
// Flink: Feature engineering per churn prediction
// Calcola feature su finestra scorrevole di 7 giorni
DataStream<ChurnFeatures> churnFeatures = events
.keyBy(TelemetryEvent::getPlayerId)
.window(SlidingEventTimeWindows.of(Time.days(7), Time.hours(1)))
.process(new ChurnFeatureExtractor())
.name("Churn-Feature-Engineering");
public class ChurnFeatureExtractor
extends ProcessWindowFunction<TelemetryEvent, ChurnFeatures, String, TimeWindow> {
@Override
public void process(String playerId, Context ctx,
Iterable<TelemetryEvent> events, Collector<ChurnFeatures> out) {
List<TelemetryEvent> eventList = new ArrayList<>();
events.forEach(eventList::add);
// Feature calcolo
long sessionCount = eventList.stream()
.filter(e -> e.getEventType().equals("player.session_start"))
.count();
double avgSessionDurationMin = eventList.stream()
.filter(e -> e.getEventType().equals("player.session_end"))
.mapToDouble(e -> Double.parseDouble(e.getPayload().getOrDefault("duration_sec", "0")))
.average().orElse(0.0) / 60.0;
long matchesCompleted = eventList.stream()
.filter(e -> e.getEventType().equals("gameplay.match_end"))
.count();
long matchesAbandoned = eventList.stream()
.filter(e -> e.getEventType().equals("gameplay.match_abandon"))
.count();
double abandonRate = matchesCompleted > 0
? (double) matchesAbandoned / (matchesCompleted + matchesAbandoned)
: 0.0;
double totalSpend = eventList.stream()
.filter(e -> e.getEventType().equals("economy.purchase"))
.mapToDouble(e -> Double.parseDouble(e.getPayload().getOrDefault("amount_usd", "0")))
.sum();
long daysSinceLastSession = eventList.stream()
.filter(e -> e.getEventType().equals("player.session_start"))
.mapToLong(TelemetryEvent::getServerTs)
.max()
.map(lastTs -> (System.currentTimeMillis() - lastTs) / 86_400_000L)
.orElse(999L);
out.collect(new ChurnFeatures(
playerId,
sessionCount,
avgSessionDurationMin,
matchesCompleted,
abandonRate,
totalSpend,
daysSinceLastSession,
ctx.window().getEnd()
));
}
}
// Sink su Redis Feature Store con TTL
churnFeatures.addSink(new RedisSink<>(
redisConfig,
(ChurnFeatures f, FlinkJedisPoolConfig config) -> {
try (Jedis jedis = new Jedis(config.getHost(), config.getPort())) {
String key = "churn_features:" + f.getPlayerId();
Map<String, String> fields = new HashMap<>();
fields.put("session_count_7d", String.valueOf(f.getSessionCount()));
fields.put("avg_session_min", String.valueOf(f.getAvgSessionDurationMin()));
fields.put("abandon_rate", String.valueOf(f.getAbandonRate()));
fields.put("total_spend_usd", String.valueOf(f.getTotalSpend()));
fields.put("days_since_last", String.valueOf(f.getDaysSinceLastSession()));
jedis.hset(key, fields);
jedis.expire(key, 86400 * 7); // TTL 7 giorni
}
}
));
8. データ品質: 重複排除と遅延イベント処理
分散システムでは重複は避けられません。クライアントはバッチを受信しなかった場合に再送信できます。
ネットワーク分割により二重書き込みが発生する可能性があります。に基づく重複排除 event_id
パイプラインのできるだけ早い段階で実行する必要があります。
// Flink: Deduplicazione stateful con event_id
// Usa RocksDB state backend per gestire miliardi di ID
DataStream<TelemetryEvent> deduplicated = rawEvents
.keyBy(TelemetryEvent::getEventId)
.process(new DeduplicationFunction(Duration.ofHours(24)))
.name("Event-Deduplication");
public class DeduplicationFunction
extends KeyedProcessFunction<String, TelemetryEvent, TelemetryEvent> {
private final Duration deduplicationWindow;
private ValueState<Boolean> seenState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> descriptor =
new ValueStateDescriptor<>("event-seen", Boolean.class);
// TTL: pulisce lo stato dopo 24h per evitare memory leak
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);
seenState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(TelemetryEvent event, Context ctx,
Collector<TelemetryEvent> out) throws Exception {
if (seenState.value() == null) {
// Prima volta che vediamo questo event_id
seenState.update(true);
out.collect(event); // Passa all'avanti
}
// Altrimenti: duplicato, scartato silenziosamente
// (registra metrica per monitoring)
}
}
-- ClickHouse: tabella eventi con ReplacingMergeTree per dedup storage-level
CREATE TABLE game_analytics.events_all (
event_id String,
event_type LowCardinality(String),
player_id String,
session_id String,
server_ts DateTime64(3),
platform LowCardinality(String),
region LowCardinality(String),
match_id Nullable(String),
payload Map(String, String),
ingested_at DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(ingested_at)
PARTITION BY toYYYYMM(server_ts)
ORDER BY (player_id, event_type, server_ts)
SETTINGS index_granularity = 8192;
-- Indice bloom filter su player_id per query veloci per giocatore
ALTER TABLE game_analytics.events_all
ADD INDEX bf_player_id player_id TYPE bloom_filter(0.01) GRANULARITY 4;
警告: タイムスタンプ クライアントとタイムスタンプ サーバー
決して信用しないでください client_ts 分析の信頼できる情報源として: クライアント クロック
位相が何時間もずれていたり(特にモバイルで)、詐欺師によって操作されたり、単に
間違っています。常に使用する server_ts 分析クエリ用。の client_ts そして役に立つ
ネットワーク遅延を測定するためだけに (server_ts - client_ts)そして再構築する
単一セッション内の一連のイベント。
9. ファネル分析と保持指標
リテンション メトリクスは、ライブサービス ゲームにとって最も重要です。 1日目の保持、 7 日目と 30 日目 (D1/D7/D30) は、ゲームの健全性を評価するための業界標準の KPI です。
-- ClickHouse: calcolo retention D1/D7/D30 per coorte
-- Una "coorte" e il gruppo di giocatori con lo stesso primo giorno di gioco
SELECT
install_date,
count(DISTINCT player_id) AS cohort_size,
-- D1 Retention: tornati il giorno dopo l'installazione
countDistinctIf(player_id,
dateDiff('day', install_date, last_seen_date) >= 1
) AS retained_d1,
-- D7 Retention
countDistinctIf(player_id,
dateDiff('day', install_date, last_seen_date) >= 7
) AS retained_d7,
-- D30 Retention
countDistinctIf(player_id,
dateDiff('day', install_date, last_seen_date) >= 30
) AS retained_d30,
-- Percentuali
round(100.0 * retained_d1 / cohort_size, 2) AS d1_pct,
round(100.0 * retained_d7 / cohort_size, 2) AS d7_pct,
round(100.0 * retained_d30 / cohort_size, 2) AS d30_pct
FROM (
-- Subquery: per ogni giocatore, data installazione e ultima sessione
SELECT
player_id,
min(toDate(server_ts / 1000)) AS install_date,
max(toDate(server_ts / 1000)) AS last_seen_date
FROM game_analytics.events_all
WHERE event_type = 'player.session_start'
GROUP BY player_id
)
WHERE install_date >= today() - 90 -- Ultimi 90 giorni di coorti
GROUP BY install_date
ORDER BY install_date DESC;
-- Benchmark settore (mobile games 2024):
-- D1: 25-40% (buono), D7: 10-20%, D30: 3-8%
-- Top performers: D1 40%+, D7 20%+, D30 8%+
10. スケーリング: 実数とコストの考慮事項
リリース週末の AAA ゲームでは、1 分あたり 1,000 万から 5,000 万のピークのイベントが生成される可能性があります。その方法は次のとおりです パイプラインの規模と、推定コストはいくらですか。
100 万人の同時プレイヤーに対応したサイジング
| 成分 | サイズ設定 | AWS の月額コスト (推定) |
|---|---|---|
| カフカ (MSK) | 12 m5.4xlarge ブローカー | ~8,000ドル |
| フリンク (EMR) | 20 タスクマネージャー r5.2xlarge | ~12,000ドル |
| ClickHouse (自己ホスト型) | 6 ノード r6a.8xlarge + 50TB SSD | ~15,000ドル |
| Redis (ElastiCache) | 3 つの r7g.2xlarge クラスター ノード | ~3,000ドル |
| テレメトリゲートウェイ | ECS 自動スケーリング (10 ~ 50 タスク) | ~5,000ドル |
| 合計 | ~$43,000/月 |
月間アクティブ プレーヤーが 1,000 万人の場合、コストはユーザーあたり月額約 0.004 ドルとなり、多額の投資となります。 これは、ARPU と維持率を最適化できる能力によって正当化されます。
コストの最適化
- インテリジェントなサンプリング: 高頻度のゲームプレイ イベント (位置、アニメーション) の場合、 すべてのイベントを送信するのではなく、5 つごとに 1 つのイベントのみを送信します。品質の低下はほとんどなく、スループットは 80% 節約されます。
- 階層型ストレージ: S3 上のコールド層を備えた ClickHouse。ローカル SSD 上の最近のデータ (7 日間)、 S3 上の履歴データはアクセスが遅くなりますが、10 分の 1 のコストがかかります。
- 送信前の集約: fps や ping などの単純なメトリクスの場合、クライアント側で集計します すべてのフレームを送信するのではなく、(30 秒ごとの平均、最小、最大)。
結論
産業用ゲーム テレメトリ パイプラインは標準的なデータ エンジニアリング プロジェクトではありません。 ゲーム パターン、レイテンシの制約、LiveOps のニーズを深く理解します。 組み合わせ カフカ + フリンク + クリックハウス + Redis そして事実上の標準となった メトリクスのレイテンシーを 1 秒未満に維持しながら大量のボリュームを処理できる能力で業界でトップクラスの評価を受けています 批判。
RFM プレーヤーのセグメンテーションとリアルタイムのチャーン予測により、生データがアクションに変換されます 具体的: ターゲットを絞ったリエンゲージメント キャンペーン、パーソナライズされたオファー、LiveOps の決定 直感ではなくデータによって情報が得られます。高品質のテレメトリに投資しているゲームでは、 そうでない人に比べて、D30 保持率は平均して 15 ~ 25% 向上します。
ゲーム バックエンド シリーズの次のステップ
- 前の記事: LiveOps: イベント システムと機能フラグ
- 次の記事: クラウド ゲーム: WebRTC とエッジ ノードを使用したストリーミング
- 関連する洞察: ビジネス向け MLOps - 本番環境での AI モデル







