Telematics Pipeline: Processing UBI Data at Scale
In 2025, over 21 million American drivers share telematics data with their insurer. In Europe, AXA launched an AI-driven UBI solution in January 2025 for European markets, integrating OBD-II, smartphone-based telematics, and predictive risk modeling. The UBI (Usage-Based Insurance) market is valued at $62.6 billion in 2025 and is projected to grow at a 24.8% CAGR through 2035.
Insurance telematics is not simple GPS data collection: it is a complex pipeline that transforms raw signals from OBD-II devices, smartphones, and IoT sensors into individual risk scores that determine each insured's premium. A bad driver pays more; a good driver saves money. The principle is simple; the technical implementation is anything but.
In this article we build a complete telematics pipeline: from OBD-II data ingestion to driving behavior feature engineering, from real-time risk scoring to dynamic pricing, with considerations on scalability, latency, and privacy.
What You Will Learn
- Telematics pipeline architecture for UBI (Pay-How-You-Drive)
- OBD-II protocol: parameters, sampling frequencies, edge processing
- Feature engineering for driving behavior (hard braking, cornering, speeding)
- Streaming processing with Apache Kafka and Apache Flink
- Real-time risk scoring and temporal aggregations
- Dynamic pricing based on driving score
- GDPR considerations for geolocation data
1. Types of Insurance Telematics
There are three main models for telematics data collection, with different trade-offs between accuracy, deployment cost, and customer adoption:
| Type | Device | Data Collected | Accuracy | Cost |
|---|---|---|---|---|
| OBD-II Dongle | OBD port plug-in | ECU, speed, RPM, acceleration | High | $25-60 HW |
| Smartphone App | iOS/Android app | GPS, accelerometer, gyroscope | Medium | Zero HW |
| OEM Black Box | Factory-installed device | ECU + GPS + accelerometer | Very High | $120-350 |
| Connected Car API | Native vehicle API | Full native telematics | Maximum | OEM agreement |
2. OBD-II Data Structure
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class OBDReading:
    """Single OBD-II reading from vehicle."""
    device_id: str
    policy_id: str
    timestamp: datetime
    latitude: float
    longitude: float
    speed_kmh: float          # Vehicle speed (PID 0x0D)
    rpm: int                  # Engine RPM (PID 0x0C)
    throttle_pct: float       # Throttle position % (PID 0x11)
    engine_load_pct: float    # Calculated engine load % (PID 0x04)
    coolant_temp_c: int       # Engine coolant temperature (PID 0x05)
    acceleration_ms2: Optional[float] = None
    heading_deg: Optional[float] = None
    road_type: Optional[str] = None  # "URBAN", "RURAL", "HIGHWAY"

@dataclass
class TripSummary:
    """Summary of a single driving trip."""
    trip_id: str
    policy_id: str
    device_id: str
    start_time: datetime
    end_time: datetime
    distance_km: float
    duration_minutes: float
    max_speed_kmh: float
    avg_speed_kmh: float
    hard_braking_count: int
    hard_acceleration_count: int
    hard_cornering_count: int
    speeding_pct: float
    night_driving_pct: float
    highway_pct: float
    safety_score: float        # 0-100 (100 = excellent)
    fuel_efficiency_score: float
```
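The PID references in the comments above correspond to standard SAE J1979 mode-01 parameters. A minimal decoding sketch (the `decode_pid` helper is illustrative; the byte formulas are the standard ones):

```python
def decode_pid(pid: int, data: bytes) -> float:
    """Decode a raw OBD-II mode-01 response payload (SAE J1979)."""
    a = data[0]
    b = data[1] if len(data) > 1 else 0
    if pid == 0x0C:   # Engine RPM: ((256 * A) + B) / 4
        return (256 * a + b) / 4
    if pid == 0x0D:   # Vehicle speed: A km/h
        return float(a)
    if pid == 0x04:   # Calculated engine load: A * 100 / 255 (%)
        return a * 100 / 255
    if pid == 0x05:   # Coolant temperature: A - 40 (degrees C)
        return float(a - 40)
    if pid == 0x11:   # Throttle position: A * 100 / 255 (%)
        return a * 100 / 255
    raise ValueError(f"Unsupported PID: {pid:#04x}")
```

For example, a raw RPM payload of `0x1A 0xF8` decodes to `(256*26 + 248) / 4 = 1726` RPM.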
3. Edge Processing on Device
Raw OBD-II data must be pre-processed at the edge before being sent to the cloud: this reduces the required bandwidth and detects discrete events (hard braking, speeding) in real time. The edge processor typically runs on an ARM microcontroller inside the OBD device:
```python
from collections import deque

class EdgeEventDetector:
    """
    Driving event detection algorithm.
    Runs on embedded device (ARM Cortex-M).
    Sampling rate: 10 Hz (one reading every 100ms)
    """

    HARD_BRAKING_THRESHOLD = -3.0  # m/s^2 (~0.3g)
    HARD_ACCEL_THRESHOLD = 3.0     # m/s^2 (~0.3g)

    def __init__(self, window_size: int = 5):
        self.speed_history = deque(maxlen=window_size)
        self.prev_timestamp = None

    def process_reading(self, reading: dict) -> list:
        """Process a single reading and detect events."""
        detected = []
        current_speed = reading["speed_kmh"]
        current_ts = reading["timestamp"]

        if self.speed_history and self.prev_timestamp:
            prev_speed = self.speed_history[-1]
            dt = (current_ts - self.prev_timestamp).total_seconds()
            if dt > 0:
                delta_v = (current_speed - prev_speed) / 3.6
                acceleration = delta_v / dt
                if acceleration <= self.HARD_BRAKING_THRESHOLD:
                    detected.append({
                        "type": "HARD_BRAKING",
                        "timestamp": current_ts.isoformat(),
                        "value": round(acceleration, 2),
                        "speed_at_event": current_speed
                    })
                elif acceleration >= self.HARD_ACCEL_THRESHOLD:
                    detected.append({
                        "type": "HARD_ACCELERATION",
                        "timestamp": current_ts.isoformat(),
                        "value": round(acceleration, 2),
                        "speed_at_event": current_speed
                    })

        self.speed_history.append(current_speed)
        self.prev_timestamp = current_ts
        return detected
```
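The detector above covers longitudinal events only; the `hard_cornering_count` field in `TripSummary` needs a lateral counterpart. A sketch based on heading change rate (the 25°/s threshold and 20 km/h speed floor are illustrative assumptions; production devices typically use the gyroscope instead):

```python
HARD_CORNERING_YAW_RATE = 25.0  # deg/s, illustrative threshold

def detect_hard_cornering(prev_heading_deg: float,
                          curr_heading_deg: float,
                          dt_s: float,
                          speed_kmh: float,
                          min_speed_kmh: float = 20.0) -> bool:
    """Flag a hard-cornering event from two consecutive heading samples.

    Heading wraps at 360 degrees, so take the shortest angular difference.
    Low-speed maneuvers (e.g. parking) are ignored via min_speed_kmh.
    """
    if dt_s <= 0 or speed_kmh < min_speed_kmh:
        return False
    delta = (curr_heading_deg - prev_heading_deg + 180.0) % 360.0 - 180.0
    yaw_rate = abs(delta) / dt_s
    return yaw_rate >= HARD_CORNERING_YAW_RATE
```

The modular arithmetic handles the 350° to 10° wrap-around that a naive subtraction would misread as a 340° turn.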
4. Streaming Pipeline with Kafka and Flink
The cloud pipeline receives batches from the edge and processes them as a stream: computing trip summaries, aggregating events over time windows, and updating risk scores in near real time:
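Upstream of feature extraction, trip boundaries must be inferred from the stream. A common approach is session windowing: a trip closes after an idle gap. This pure-Python sketch mirrors the logic a Flink session window applies (the 5-minute gap and the `split_into_trips` name are assumptions):

```python
from datetime import datetime, timedelta
from typing import Optional

TRIP_GAP = timedelta(minutes=5)  # idle gap that closes a trip (assumption)

def split_into_trips(readings: list) -> list:
    """Group time-ordered readings into trips, session-window style."""
    trips = []
    current = []
    last_ts: Optional[datetime] = None
    for r in sorted(readings, key=lambda r: r["timestamp"]):
        # A gap longer than TRIP_GAP closes the current trip.
        if last_ts is not None and r["timestamp"] - last_ts > TRIP_GAP:
            trips.append(current)
            current = []
        current.append(r)
        last_ts = r["timestamp"]
    if current:
        trips.append(current)
    return trips
```

In the actual pipeline Flink's built-in event-time session windows would do this with watermark handling for late-arriving edge batches; the sketch shows only the grouping logic.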
```python
def extract_features(trip: dict) -> dict:
    """
    Extract features for risk scoring from a trip summary.
    Output: feature vector for the ML model.
    """
    readings = trip["readings"]
    events = trip["events"]
    hard_braking = [e for e in events if e["type"] == "HARD_BRAKING"]
    hard_accel = [e for e in events if e["type"] == "HARD_ACCELERATION"]
    speeding = [e for e in events if e["type"] == "SPEEDING"]

    return {
        "policy_id": trip["policy_id"],
        "trip_id": trip["trip_id"],
        "trip_date": trip["start_time"][:10],
        # Behavioral features (normalized per 100km)
        "hard_braking_per_100km": (
            len(hard_braking) / max(trip["distance_km"], 0.1) * 100
        ),
        "hard_accel_per_100km": (
            len(hard_accel) / max(trip["distance_km"], 0.1) * 100
        ),
        "speeding_events_per_100km": (
            len(speeding) / max(trip["distance_km"], 0.1) * 100
        ),
        # Temporal features
        "night_driving_ratio": compute_night_ratio(readings),
        "rush_hour_ratio": compute_rush_hour_ratio(readings),
        # Kinematic features
        "avg_speed_kmh": trip.get("avg_speed_kmh", 0),
        "max_speed_kmh": trip.get("max_speed_kmh", 0),
        # Context features
        "highway_ratio": trip.get("highway_pct", 0) / 100,
        "distance_km": trip["distance_km"],
        "duration_hours": trip["duration_seconds"] / 3600
    }
```
5. Risk Scoring Model
The core of the UBI system is the model that translates behavior features into a driving score and then into a premium adjustment factor:
```python
import numpy as np
from typing import NamedTuple

class DrivingScore(NamedTuple):
    policy_id: str
    score: float           # 0-100 (100 = excellent driving)
    percentile: float      # Fleet percentile
    category: str          # "EXCELLENT", "GOOD", "FAIR", "POOR", "DANGEROUS"
    premium_factor: float  # Premium multiplier (e.g., 0.85 = -15%)

class DrivingScoreCalculator:
    WEIGHTS = {
        "hard_braking_per_100km": -0.30,
        "hard_accel_per_100km": -0.15,
        "speeding_events_per_100km": -0.25,
        "night_driving_ratio": -0.15,
        "avg_speed_kmh": -0.10,
        "highway_ratio": 0.05,
    }

    CATEGORY_THRESHOLDS = [
        (90, "EXCELLENT"),
        (75, "GOOD"),
        (55, "FAIR"),
        (35, "POOR"),
        (0, "DANGEROUS"),
    ]

    PREMIUM_FACTORS = {
        "EXCELLENT": 0.75,   # -25% discount
        "GOOD": 0.90,        # -10% discount
        "FAIR": 1.00,        # base premium
        "POOR": 1.15,        # +15% surcharge
        "DANGEROUS": 1.35,   # +35% surcharge
    }

    # Illustrative normalization bounds; production systems would derive
    # these from empirical fleet distributions.
    NORMALIZATION_BOUNDS = {
        "hard_braking_per_100km": 20.0,
        "hard_accel_per_100km": 20.0,
        "speeding_events_per_100km": 30.0,
        "night_driving_ratio": 1.0,
        "avg_speed_kmh": 130.0,
        "highway_ratio": 1.0,
    }

    def calculate(self, features_90d: dict) -> DrivingScore:
        normalized = self._normalize_features(features_90d)
        raw_score = 100.0
        for feature, weight in self.WEIGHTS.items():
            if feature in normalized:
                raw_score += weight * normalized[feature] * 10
        score = float(np.clip(raw_score, 0, 100))
        category = self._get_category(score)
        return DrivingScore(
            policy_id=features_90d["policy_id"],
            score=round(score, 1),
            percentile=self._get_percentile(score),
            category=category,
            premium_factor=self.PREMIUM_FACTORS[category]
        )

    def _normalize_features(self, features: dict) -> dict:
        """Min-max scale each feature to [0, 1] against fleet bounds."""
        return {
            name: min(max(features[name], 0.0) / bound, 1.0)
            for name, bound in self.NORMALIZATION_BOUNDS.items()
            if name in features
        }

    def _get_category(self, score: float) -> str:
        for threshold, category in self.CATEGORY_THRESHOLDS:
            if score >= threshold:
                return category
        return "DANGEROUS"

    def _get_percentile(self, score: float) -> float:
        # Placeholder: production would rank the score against the
        # fleet's historical score distribution.
        return round(score, 1)
```
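Turning the premium factor into a billed amount is then a bounded multiplication. A hedged sketch; the 0.70 floor and 1.50 cap are illustrative guardrails, not values from any specific regulation:

```python
def apply_premium_factor(base_premium: float, premium_factor: float,
                         floor: float = 0.70, cap: float = 1.50) -> float:
    """Apply the UBI factor to the base premium, bounded by floor/cap.

    The bounds keep a single bad quarter from producing an extreme
    premium swing (illustrative values, not regulatory ones).
    """
    factor = max(floor, min(premium_factor, cap))
    return round(base_premium * factor, 2)
```

With a 600 EUR base premium, an EXCELLENT driver (factor 0.75) pays 450 EUR; a DANGEROUS one (factor 1.35) pays 810 EUR.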
6. GDPR Compliance for Geolocation Data
GDPR Requirements for Insurance Telematics
- Explicit consent: The policyholder must explicitly consent to driving data collection
- Data minimization: Collect only data strictly necessary for risk scoring
- Retention: Raw GPS data deleted after processing (max 90 days); aggregates kept for policy duration
- Anonymization: After score calculation, GPS data can be aggregated and anonymized
- Right to erasure: API for data deletion upon request
- Transfers: Data processed in EU (no transfer outside EEA without adequate safeguards)
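Two of the requirements above translate directly into code: retention checks on raw GPS readings and coordinate coarsening before long-term aggregation. A sketch (coarsening to two decimals of latitude, roughly 1.1 km, is pseudonymization-grade minimization, not true anonymization of mobility traces):

```python
from datetime import datetime, timedelta

RAW_GPS_RETENTION_DAYS = 90  # per the retention policy above

def coarsen_location(lat: float, lon: float, decimals: int = 2) -> tuple:
    """Round coordinates to a coarse grid (~1.1 km at 2 decimals of latitude)."""
    return (round(lat, decimals), round(lon, decimals))

def is_expired(reading_ts: datetime, now: datetime) -> bool:
    """True if a raw GPS reading has exceeded the 90-day retention window."""
    return now - reading_ts > timedelta(days=RAW_GPS_RETENTION_DAYS)
```

In practice the purge would run as a scheduled job against the time-series store; the predicate above just makes the retention rule explicit and testable.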
7. Complete System Architecture
| Layer | Components | Technology | Latency |
|---|---|---|---|
| Edge | OBD dongle, smartphone SDK | C/ARM, iOS/Android SDK | 100ms |
| Ingestion | MQTT broker, REST API | AWS IoT Core / EMQX | <1s |
| Streaming | Event processing, trip detection | Apache Kafka + Flink | <5s |
| ML Scoring | Feature store, scoring engine | Feast + MLflow | <100ms |
| Storage | Time-series, aggregates | InfluxDB + PostgreSQL | - |
| Serving | Score API, driver dashboard | FastAPI + React Native | <200ms |
Conclusions
The UBI telematics pipeline is a distributed computing system that spans edge computing, stream processing, machine learning, and privacy engineering. The $62.6 billion market (2025) with a 24.8% CAGR reflects the industry's interest in this individualized risk approach.
The critical points are feature engineering for driving behavior (hard braking, speeding, night driving), computing a driving score normalized against the fleet, and responsibly managing geolocation data in compliance with GDPR. Driver feedback is not just a bonus: it is the most effective lever for improving behavior and reducing the fleet's claims ratio.