テレマティクス パイプライン: 大規模な UBI データ処理
2025 年までに、2,100 万人以上のアメリカのドライバーがテレマティクス データを共有することになります。 保険。欧州では、AXA が 2025 年 1 月に市場向けに AI 主導の UBI ソリューションを発売しました OBD-II、スマートフォンベースのテレマティクス、リスク予測モデルを統合するヨーロッパ。 UBI (従量制保険) 市場は 2025 年に 626 億ドルの価値があり、 2035 年までの CAGR は 24.8%。
保険テレマティクスは、単なる GPS データの収集ではありません。それは、次のような複雑なパイプラインです。 OBD-II デバイス、スマートフォン、IoT センサーからの生の信号を 個人のリスクスコア 個々の被保険者の保険料を決定します。 悪いドライバーはより多くの賃金を支払い、良いドライバーは節約します。原理は単純です。 技術的な実装とその他すべて。
この記事では、データの取り込みから完全なテレマティクス パイプラインを構築します。 OBD-II は、リアルタイムのリスクスコアリングから運転行動の機能エンジニアリングを実現します。 スケーラビリティ、遅延、プライバシーを考慮した動的な価格設定。
何を学ぶか
- UBI (Pay-How-You-Drive) のテレマティクス パイプライン アーキテクチャ
- OBD-IIプロトコル:パラメータ、サンプリング周波数、エッジ処理
- 運転動作(急ブレーキ、コーナリング、スピード超過)の特徴エンジニアリング
- Apache Kafka および Apache Flink によるストリーミング処理
- リアルタイムのリスクスコアリングと一時的な集計
- 運転スコアに基づいた動的な価格設定
- 地理位置情報データに関する GDPR の考慮事項
1. 保険テレマティクスの種類
テレマティック データ収集には 3 つの主要なモデルがあり、それぞれに異なるトレードオフがあります。 精度、導入コスト、顧客の導入:
| タイプ | デバイス | 収集されたデータ | 正確さ | 料金 |
|---|---|---|---|---|
| OBD-IIドングル | OBDポートプラグイン | ECU、速度、回転数、加速度 | 高い | 20 ~ 50 ユーロのハードウェア |
| スマホアプリ | iOS/Androidアプリ | GPS、加速度計、ジャイロスコープ | 平均 | ゼロハードウェア |
| ブラックボックスOEM | OEM デバイスがインストールされている | ECU + GPS + 加速度センサー | 非常に高い | 100~300ユーロ |
| コネクテッドカーAPI | 車両ネイティブ API | すべてネイティブテレマティクス | 最大 | OEM契約 |
2. OBD-IIのデータ構造
OBD-II (オンボード診断 II) プロトコルは、PID を介して数十の ECU パラメータを公開します (パラメータID)。保険リスクのスコアリングに最も関連する PID は次のとおりです。
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class OBDReading:
"""Singola lettura OBD-II dal veicolo."""
device_id: str # ID univoco del dispositivo
policy_id: str # Polizza associata
timestamp: datetime # UTC timestamp
latitude: float # Gradi decimali (WGS84)
longitude: float # Gradi decimali (WGS84)
speed_kmh: float # Velocita in km/h (PID 0x0D)
rpm: int # Giri per minuto (PID 0x0C)
throttle_pct: float # Posizione acceleratore % (PID 0x11)
engine_load_pct: float # Carico motore % (PID 0x04)
coolant_temp_c: int # Temperatura liquido (PID 0x05)
# Calcolati lato edge (non nativi OBD)
acceleration_ms2: Optional[float] = None # m/s^2 (+/-)
heading_deg: Optional[float] = None # Direzione 0-360
road_type: Optional[str] = None # "URBAN", "RURAL", "HIGHWAY"
@dataclass
class TripSummary:
"""Riepilogo di un singolo viaggio."""
trip_id: str
policy_id: str
device_id: str
start_time: datetime
end_time: datetime
distance_km: float
duration_minutes: float
# Metrics comportamentali (calcolati dalla pipeline)
max_speed_kmh: float
avg_speed_kmh: float
hard_braking_count: int # Decelerazioni > 0.3g
hard_acceleration_count: int # Accelerazioni > 0.3g
hard_cornering_count: int # Laterale > 0.3g
speeding_pct: float # % tempo sopra limite
night_driving_pct: float # % km notturni (22:00-06:00)
highway_pct: float # % km autostrada
# Score aggregati
safety_score: float # 0-100 (100 = ottimo)
fuel_efficiency_score: float # 0-100
3. デバイス上のエッジ処理
生の OBD-II データは、クラウドに送信される前にエッジで前処理する必要があります。 これにより、必要な帯域幅が削減され、個別のイベント (急ブレーキ、スピード違反) がリアルタイムで計算されます。 エッジ プロセッサは通常、OBD デバイスの ARM マイクロコントローラー上で実行されます。
from collections import deque
from dataclasses import dataclass
import math
class EdgeEventDetector:
"""
Algoritmo di detection eventi di guida.
Gira sul dispositivo embedded (ARM Cortex-M).
Sampling rate: 10 Hz (una lettura ogni 100ms)
"""
HARD_BRAKING_THRESHOLD = -3.0 # m/s^2 (0.3g)
HARD_ACCEL_THRESHOLD = 3.0 # m/s^2 (0.3g)
HARD_CORNERING_THRESHOLD = 3.0 # m/s^2 laterale
def __init__(self, window_size: int = 5):
self.speed_history = deque(maxlen=window_size)
self.events: list = []
self.prev_timestamp = None
def process_reading(self, reading: dict) -> list:
"""
Processa una singola lettura e rileva eventi.
Restituisce lista di eventi rilevati (vuota se nessuno).
"""
detected = []
current_speed = reading["speed_kmh"]
current_ts = reading["timestamp"]
# Calcola accelerazione longitudinale
if self.speed_history and self.prev_timestamp:
prev_speed = self.speed_history[-1]
dt = (current_ts - self.prev_timestamp).total_seconds()
if dt > 0:
# Conversione km/h -> m/s
delta_v = (current_speed - prev_speed) / 3.6
acceleration = delta_v / dt
if acceleration <= self.HARD_BRAKING_THRESHOLD:
detected.append({
"type": "HARD_BRAKING",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
elif acceleration >= self.HARD_ACCEL_THRESHOLD:
detected.append({
"type": "HARD_ACCELERATION",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
self.speed_history.append(current_speed)
self.prev_timestamp = current_ts
return detected
def detect_speeding(
self,
speed_kmh: float,
speed_limit_kmh: float,
tolerance_pct: float = 0.10
) -> bool:
"""Rileva superamento limite con tolleranza del 10%."""
threshold = speed_limit_kmh * (1 + tolerance_pct)
return speed_kmh > threshold
class EdgeBatchManager:
"""
Gestisce il buffering e l'invio batch al cloud.
Strategia: invia ogni 30 secondi o al termine del viaggio.
"""
BATCH_INTERVAL_SECONDS = 30
MAX_BATCH_SIZE = 300 # max readings per batch
def __init__(self, uploader):
self.buffer: list = []
self.events: list = []
self.last_send = None
self.uploader = uploader
def add_reading(self, reading: dict, events: list) -> None:
self.buffer.append(reading)
self.events.extend(events)
if len(self.buffer) >= self.MAX_BATCH_SIZE:
self._flush_async()
def _flush_async(self) -> None:
if not self.buffer:
return
payload = {
"readings": list(self.buffer),
"events": list(self.events),
"compressed": True
}
# Invia compresso (gzip) via MQTT o HTTPS
self.uploader.send(payload)
self.buffer.clear()
self.events.clear()
4. Apache Kafka と Apache Flink を使用したストリーミング パイプライン
クラウド パイプラインはエッジからバッチを受信し、それらをストリーミングして計算します。 旅行の概要、一定期間にわたるイベントの集計、および近い将来のリスクスコアの更新 リアルタイム。このアーキテクチャは、メッセージ バスとしての Apache Kafka と、メッセージ バスとしての Apache Flink に基づいています。 ストリーム処理:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time, Types
import json
from datetime import datetime
def build_telematics_pipeline():
"""
Pipeline Flink per processing dati telematici.
Topic Kafka:
- telemetry.raw : letture OBD grezze
- telemetry.events : eventi discreti (hard braking, etc.)
- telemetry.trips : trip summaries
- risk.scores : driving scores aggiornati
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# 1. Source: letture OBD-II raw
kafka_source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("telemetry.raw")
.set_group_id("telematics-processor")
.set_starting_offsets(KafkaOffsetsInitializer.latest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
raw_stream = env.from_source(
kafka_source,
WatermarkStrategy.for_monotonous_timestamps(),
"OBD Raw Source"
)
# 2. Parse e validazione
parsed_stream = raw_stream.map(lambda s: json.loads(s)) \
.filter(lambda r: validate_reading(r))
# 3. Key by policy_id per partitioning
keyed_stream = parsed_stream.key_by(lambda r: r["policy_id"])
# 4. Trip detection tramite session windows
# Una sessione si chiude dopo 5 minuti di inattivita
trip_stream = keyed_stream \
.window(SessionWindows.with_gap(Time.minutes(5))) \
.apply(TripAggregator())
# 5. Feature engineering per ogni trip
features_stream = trip_stream.map(lambda t: extract_features(t))
# 6. Aggiorna driving score (aggregazione 90 giorni rolling)
score_stream = features_stream \
.key_by(lambda f: f["policy_id"]) \
.window(TumblingEventTimeWindows.of(Time.days(1))) \
.reduce(RollingScoreReducer())
# 7. Sink: aggiorna Redis e pubblica evento
score_stream.add_sink(RedisSink())
score_stream.add_sink(KafkaSink("risk.scores"))
env.execute("Telematics Processing Pipeline")
def validate_reading(reading: dict) -> bool:
"""Validazione basica di una lettura OBD."""
required_fields = ["device_id", "policy_id", "timestamp", "speed_kmh"]
if not all(f in reading for f in required_fields):
return False
if not (0 <= reading["speed_kmh"] <= 300):
return False
if "latitude" in reading and not (-90 <= reading["latitude"] <= 90):
return False
return True
def extract_features(trip: dict) -> dict:
"""
Estrae feature per risk scoring da un trip summary.
Output: feature vector per il modello ML.
"""
readings = trip["readings"]
events = trip["events"]
duration_sec = trip["duration_seconds"]
hard_braking = [e for e in events if e["type"] == "HARD_BRAKING"]
hard_accel = [e for e in events if e["type"] == "HARD_ACCELERATION"]
speeding = [e for e in events if e["type"] == "SPEEDING"]
return {
"policy_id": trip["policy_id"],
"trip_id": trip["trip_id"],
"trip_date": trip["start_time"][:10],
# Feature comportamentali (normalizzate per 100km)
"hard_braking_per_100km": len(hard_braking) / max(trip["distance_km"], 0.1) * 100,
"hard_accel_per_100km": len(hard_accel) / max(trip["distance_km"], 0.1) * 100,
"speeding_events_per_100km": len(speeding) / max(trip["distance_km"], 0.1) * 100,
# Feature temporali
"night_driving_ratio": compute_night_ratio(readings),
"rush_hour_ratio": compute_rush_hour_ratio(readings),
"weekend_ratio": compute_weekend_ratio(readings),
# Feature cinematiche
"avg_speed_kmh": trip.get("avg_speed_kmh", 0),
"max_speed_kmh": trip.get("max_speed_kmh", 0),
"avg_acceleration": compute_avg_acceleration(readings),
# Feature contesto
"highway_ratio": trip.get("highway_pct", 0) / 100,
"urban_ratio": trip.get("urban_pct", 0) / 100,
"distance_km": trip["distance_km"],
"duration_hours": duration_sec / 3600
}
def compute_night_ratio(readings: list) -> float:
"""Calcola frazione dei km percorsi di notte (22:00-06:00)."""
if not readings:
return 0.0
night_readings = [
r for r in readings
if int(r["timestamp"][11:13]) >= 22 or int(r["timestamp"][11:13]) < 6
]
return len(night_readings) / len(readings)
5. リスクスコアリングモデル: 運転行動から報酬まで
UBI システムの中心となるのは、行動の特徴を運転スコアに変換するモデルです。 そして保険料調整係数に移ります。この分野で最も使用されているモデルは、 XGBoost アンサンブル + ビジネス ルール:
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
import xgboost as xgb
from typing import NamedTuple
class DrivingScore(NamedTuple):
"""Driving score aggregato su finestra rolling 90 giorni."""
policy_id: str
score: float # 0-100 (100 = guida eccellente)
percentile: float # Percentile rispetto alla flotta
category: str # "EXCELLENT", "GOOD", "FAIR", "POOR", "DANGEROUS"
premium_factor: float # Moltiplicatore premio (es. 0.85 = -15%)
class DrivingScoreCalculator:
"""
Calcola il driving score aggregando features su finestra 90 giorni.
Architettura: features -> normalization -> weighted scoring -> premium mapping
"""
WEIGHTS = {
"hard_braking_per_100km": -0.30, # Impatto negativo: 30%
"hard_accel_per_100km": -0.15, # Impatto negativo: 15%
"speeding_events_per_100km": -0.25, # Impatto negativo: 25%
"night_driving_ratio": -0.15, # Impatto negativo: 15%
"avg_speed_kmh": -0.10, # Impatto negativo: 10%
"highway_ratio": 0.05, # Bonus autostrada: +5%
}
# Soglie per conversione score -> category
CATEGORY_THRESHOLDS = [
(90, "EXCELLENT"),
(75, "GOOD"),
(55, "FAIR"),
(35, "POOR"),
(0, "DANGEROUS"),
]
# Mappa category -> premium discount/surcharge
PREMIUM_FACTORS = {
"EXCELLENT": 0.75, # -25% sconto
"GOOD": 0.90, # -10% sconto
"FAIR": 1.00, # Premio base
"POOR": 1.15, # +15% sovrapprezzo
"DANGEROUS": 1.35, # +35% sovrapprezzo
}
def __init__(self, fleet_stats: dict):
"""
fleet_stats: statistiche aggregate della flotta per normalizzazione
Es: {"hard_braking_per_100km": {"mean": 2.5, "std": 1.2}, ...}
"""
self.fleet_stats = fleet_stats
def calculate(self, features_90d: dict) -> DrivingScore:
"""
Calcola il driving score aggregato su 90 giorni.
features_90d: dizionario con medie delle feature sul periodo
"""
# Normalizza features rispetto alla flotta
normalized = self._normalize_features(features_90d)
# Calcola score pesato (base 100)
raw_score = 100.0
for feature, weight in self.WEIGHTS.items():
if feature in normalized:
z_score = normalized[feature]
# z-score positivo su feature negative peggiora lo score
raw_score += weight * z_score * 10
# Clamp tra 0 e 100
score = float(np.clip(raw_score, 0, 100))
# Determina categoria
category = self._get_category(score)
# Calcola percentile rispetto alla flotta
percentile = self._get_percentile(score)
return DrivingScore(
policy_id=features_90d["policy_id"],
score=round(score, 1),
percentile=round(percentile, 1),
category=category,
premium_factor=self.PREMIUM_FACTORS[category]
)
def _normalize_features(self, features: dict) -> dict:
normalized = {}
for key, value in features.items():
if key in self.fleet_stats:
stats = self.fleet_stats[key]
std = stats.get("std", 1.0)
if std > 0:
normalized[key] = (value - stats["mean"]) / std
else:
normalized[key] = 0.0
return normalized
def _get_category(self, score: float) -> str:
for threshold, category in self.CATEGORY_THRESHOLDS:
if score >= threshold:
return category
return "DANGEROUS"
def _get_percentile(self, score: float) -> float:
# Approssimazione: usa distribuzione normale con media=65, std=15
from scipy.stats import norm
return float(norm.cdf(score, loc=65, scale=15) * 100)
class UBIPremiumCalculator:
"""Calcola il premio UBI finale applicando il driving score."""
def calculate_ubi_premium(
self,
base_premium: float,
driving_score: DrivingScore,
distance_km: float,
base_distance_km: float = 15000.0
) -> dict:
"""
Premio UBI = Base * FactoreGuida * FattoreDistanza
- Fattore guida: basato su driving score (0.75-1.35)
- Fattore distanza: aggiustamento per km effettivi vs. dichiarati
"""
# Fattore distanza (max ±20%)
distance_ratio = distance_km / base_distance_km
distance_factor = np.clip(distance_ratio, 0.8, 1.2)
# Premio finale
ubi_premium = base_premium * driving_score.premium_factor * distance_factor
return {
"base_premium": round(base_premium, 2),
"driving_score": driving_score.score,
"driving_factor": driving_score.premium_factor,
"distance_km": distance_km,
"distance_factor": round(distance_factor, 3),
"final_premium": round(ubi_premium, 2),
"saving_vs_base": round(base_premium - ubi_premium, 2),
"category": driving_score.category
}
6. トリップ検出: セッションウィンドウとファイントリップ信号
旅の始まりと終わりを検出することは、思っているよりも複雑です。車両は、 信号で止まったり、一時停止したり、地下車庫に入って乗り遅れたりした場合 GPS信号。トリップ検出ロジックは、構成可能なギャップ タイムアウトを持つセッション ウィンドウを使用します。
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TripDetector:
"""
Stato macchina per rilevamento trip.
Usa session window: gap > 5 minuti = fine viaggio.
"""
GAP_TIMEOUT_SECONDS: int = 300 # 5 minuti
MIN_TRIP_DISTANCE_KM: float = 0.5 # Ignora microtrip
policy_id: str
current_trip: Optional[dict] = None
last_reading_time: Optional[datetime] = None
readings_buffer: list = field(default_factory=list)
def process_reading(self, reading: dict) -> Optional[dict]:
"""
Processa una lettura e restituisce il trip completato se rilevato.
"""
current_time = datetime.fromisoformat(reading["timestamp"])
completed_trip = None
if self.current_trip is None:
# Inizio nuovo viaggio
if reading["speed_kmh"] > 2.0:
self.current_trip = {
"trip_id": f"trip-{self.policy_id}-{current_time.strftime('%Y%m%d%H%M%S')}",
"policy_id": self.policy_id,
"start_time": current_time.isoformat(),
"start_lat": reading["latitude"],
"start_lng": reading["longitude"],
"readings_count": 0,
"total_distance_km": 0.0
}
self.readings_buffer = [reading]
else:
# Viaggio in corso
if self.last_reading_time:
gap = (current_time - self.last_reading_time).total_seconds()
if gap > self.GAP_TIMEOUT_SECONDS:
# Gap troppo grande: chiudi viaggio corrente
completed_trip = self._close_trip(reading)
# Inizia nuovo viaggio se in moto
if reading["speed_kmh"] > 2.0:
self.current_trip = self._start_new_trip(reading, current_time)
self.readings_buffer = [reading]
else:
self.current_trip = None
self.readings_buffer = []
else:
self.readings_buffer.append(reading)
self._update_trip_stats(reading)
self.last_reading_time = current_time
return completed_trip
def _close_trip(self, last_reading: dict) -> Optional[dict]:
if not self.current_trip:
return None
trip = {
**self.current_trip,
"end_time": last_reading["timestamp"],
"end_lat": last_reading["latitude"],
"end_lng": last_reading["longitude"],
"readings": self.readings_buffer,
"duration_seconds": (
datetime.fromisoformat(last_reading["timestamp"]) -
datetime.fromisoformat(self.current_trip["start_time"])
).total_seconds()
}
# Scarta microtrip
if trip["total_distance_km"] < self.MIN_TRIP_DISTANCE_KM:
return None
return trip
def _update_trip_stats(self, reading: dict) -> None:
"""Aggiorna statistiche del trip corrente."""
if self.readings_buffer and self.current_trip:
prev = self.readings_buffer[-1]
dist = haversine_distance(
prev["latitude"], prev["longitude"],
reading["latitude"], reading["longitude"]
)
self.current_trip["total_distance_km"] += dist
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
"""Distanza Haversine in km tra due coordinate GPS."""
R = 6371.0
import math
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
7. 位置情報データのプライバシーと GDPR
テレマティック データには、正確な地理位置情報、時間、旅程、個人データが含まれます。 GDPR の下では機密性が高くなります。システムはプライバシー・バイ・デザインをゼロから実装する必要があります。
保険テレマティクスの GDPR 要件
- 明示的な同意: 所有者は運転データの収集に明示的に同意する必要があります
- 最小化: リスクスコアリングに厳密に必要なデータのみを収集する
- 保持: 生の GPS データは処理後に削除されます (最大 90 日間)。ポリシーの期間中に保存される集計
- 匿名化: スコア計算後、GPS データを集約し、匿名化することができます。
- 忘れられる権利: オンデマンドのデータ削除用API
- 転送: データは EU 内で処理されます (適切な保証がない限り、EEA 外に転送することはできません)
class TelematicsPrivacyService:
"""Gestisce data lifecycle nel rispetto GDPR."""
RAW_GPS_RETENTION_DAYS = 90
AGGREGATED_RETENTION_YEARS = 7 # Per audit assicurativo
async def anonymize_trip(self, trip: dict) -> dict:
"""
Anonimizza un trip rimuovendo coordinate GPS precise.
Mantiene solo dati aggregati per il risk score.
"""
return {
"trip_id": trip["trip_id"],
"policy_id": trip["policy_id"],
"date": trip["start_time"][:10],
"duration_hours": trip["duration_seconds"] / 3600,
"distance_km": trip["total_distance_km"],
# Features comportamentali aggregate (no GPS raw)
"hard_braking_count": trip.get("hard_braking_count", 0),
"speeding_pct": trip.get("speeding_pct", 0),
"night_ratio": trip.get("night_driving_ratio", 0),
# NO: coordinate GPS, itinerario, soste specifiche
}
async def delete_raw_data(self, policy_id: str) -> None:
"""Elimina tutti i dati GPS grezzi per una polizza (diritto all'oblio)."""
await self.db.execute(
"DELETE FROM raw_telemetry WHERE policy_id = $1",
policy_id
)
await self.db.execute(
"DELETE FROM gps_tracks WHERE policy_id = $1",
policy_id
)
# Mantieni solo aggregati anonimi per audit
async def purge_expired_raw_data(self) -> int:
"""Job giornaliero: elimina dati GPS grezzi scaduti."""
from datetime import date, timedelta
cutoff = date.today() - timedelta(days=self.RAW_GPS_RETENTION_DAYS)
result = await self.db.execute(
"DELETE FROM raw_telemetry WHERE created_at < $1",
cutoff
)
return result
8. ダッシュボードとドライバーのフィードバック
ドライバーの関与はUBIの成功の基礎です。受け取るドライバーは、 リアルタイムのフィードバックにより、運転行動が 15 ~ 20% 改善されます (AXA データ 2024)。 モバイル ダッシュボードには、スコア、傾向、およびパーソナライズされた推奨事項が表示される必要があります。
class DrivingFeedbackGenerator:
"""Genera feedback personalizzato per il guidatore."""
IMPROVEMENT_TIPS = {
"hard_braking": [
"Mantieni una distanza di sicurezza maggiore dal veicolo precedente",
"Anticipa le frenate osservando il traffico a distanza",
"Riduci la velocità in anticipo agli incroci"
],
"speeding": [
"Usa il cruise control in autostrada",
"Imposta un avviso velocità a 10 km/h sopra il limite",
"Considera che la multa media costa più dello sconto UBI"
],
"night_driving": [
"Pianifica viaggi lunghi durante il giorno quando possibile",
"Se guidi di notte, fai pause ogni 2 ore"
]
}
def generate_weekly_report(
self,
policy_id: str,
current_score: DrivingScore,
weekly_trips: list[dict]
) -> dict:
"""Genera report settimanale per il guidatore."""
worst_behavior = self._identify_worst_behavior(weekly_trips)
tips = [
self.IMPROVEMENT_TIPS[b][0]
for b in worst_behavior[:2]
if b in self.IMPROVEMENT_TIPS
]
return {
"policy_id": policy_id,
"week_score": current_score.score,
"score_category": current_score.category,
"premium_factor": current_score.premium_factor,
"estimated_annual_saving": self._estimate_saving(current_score),
"trips_this_week": len(weekly_trips),
"total_km_this_week": sum(t["distance_km"] for t in weekly_trips),
"improvement_tips": tips,
"percentile_rank": f"Guidi meglio del {current_score.percentile:.0f}% degli assicurati"
}
def _identify_worst_behavior(self, trips: list[dict]) -> list[str]:
behaviors = []
avg_hard_braking = sum(
t.get("hard_braking_per_100km", 0) for t in trips
) / max(len(trips), 1)
avg_speeding = sum(
t.get("speeding_events_per_100km", 0) for t in trips
) / max(len(trips), 1)
night_ratio = sum(
t.get("night_driving_ratio", 0) for t in trips
) / max(len(trips), 1)
if avg_hard_braking > 3:
behaviors.append("hard_braking")
if avg_speeding > 2:
behaviors.append("speeding")
if night_ratio > 0.3:
behaviors.append("night_driving")
return behaviors
def _estimate_saving(self, score: DrivingScore) -> float:
base_premium = 600.0 # Premio medio auto in Italia
return round(base_premium * (1 - score.premium_factor), 0)
9. 完全なシステムアーキテクチャ
UBI テレマティック システムの完全なアーキテクチャは、次の層で構成されています。
| レイヤー | コンポーネント | テクノロジー | レイテンシ |
|---|---|---|---|
| Edge | OBDドングル、スマートフォンSDK | C/ARM、iOS/Android SDK | 100ミリ秒 |
| 摂取 | MQTTブローカー、REST API | AWS IoTコア/EMQX | <1秒 |
| ストリーミング | イベント処理、トリップ検出 | Apache Kafka + フリンク | 5秒未満 |
| ML スコアリング | フィーチャ ストア、スコアリング エンジン | フィースト + MLflow | <100ms |
| ストレージ | 時系列、集計 | InfluxDB + PostgreSQL | - |
| 給仕 | スコア API、ドライバー ダッシュボード | FastAPI + リアクトネイティブ | <200ms |
結論
UBI テレマティクス パイプラインは、エッジ コンピューティングに触れる分散コンピューティング システムです。 ストリーミング処理、機械学習、プライバシー エンジニアリング。 626億市場 CAGR 24.8% のドル (2025 年) は、このカスタマイズに対する業界の関心を反映しています リスクの。
重要な点は、運転動作(急ブレーキ、急ブレーキ、 スピード違反、夜間運転)、フリートの正規化された運転スコアの計算、および GDPR に準拠した地理位置情報データの責任ある管理。フィードバック ドライバーにとって、これは単なるプラスではなく、行動を改善するための最も効果的な手段です。 車両事故を削減します。
InsurTech シリーズの今後の記事
- AI 引受業務: 特徴エンジニアリングとリスク スコアリング
- 請求の自動化: コンピューター ビジョンと NLP
- 不正行為の検出: グラフ分析と行動シグナル







