# Insurance Fraud Detection: Graph Analytics and Behavioral Signals
Insurance fraud is a systemic and global problem. Industry estimates suggest that between 10% and 15% of claims paid each year contain some element of fraud, at a total cost of over $80 billion per year globally. In Europe, Insurance Europe estimates fraud losses of around €13 billion annually. This cost inevitably flows through to the premiums paid by honest customers, making fraud detection a matter not just of business profitability but of fairness across the insurance system.
Insurance fraud takes many forms: from opportunistic fraud (exaggerating a real loss), to planned individual fraud (deliberate vehicle fire, fictitious claims), to systematic fraud orchestrated by criminal networks involving doctors, body shops, lawyers and complicit policyholders. This last type — organised fraud — is economically the most damaging and the hardest to detect with traditional methods.
Modern AI, particularly graph analytics models for detecting fraudulent networks and behavioral signal analysis for identifying anomalous patterns, has revolutionised insurers' ability to detect and prevent fraud. The insurance fraud detection analytics market is growing at a CAGR of 33% (2025-2032), driven by GNNs (Graph Neural Networks), ensemble models, and real-time stream processing.
## What You Will Learn
- Insurance fraud taxonomy and detection patterns
- Behavioural feature engineering for fraud scoring
- Graph analytics for detecting organised fraudulent networks
- Graph Neural Networks (GNN) for insurance fraud detection
- Ensemble models: XGBoost + Random Forest + Isolation Forest
- Real-time fraud scoring with Apache Kafka and Faust
- SIU (Special Investigation Unit) workflow and case management
## Insurance Fraud Taxonomy
Understanding fraud types is the prerequisite for designing effective detection systems. Each type has distinct patterns requiring different ML approaches.
### Fraud Types and Detection Techniques
| Fraud Type | Example | Frequency | ML Approach |
|---|---|---|---|
| Opportunistic | Exaggerating damage on a real claim | High | Anomaly detection, AI damage estimation |
| Planned (individual) | Deliberate arson of own vehicle | Medium | Behavioral signals, link analysis |
| Organised (Ring) | Staged accident networks with multiple participants | Low (high impact) | Graph analytics, GNN, community detection |
| Insider | Agent approving fictitious claims | Very low | User behaviour analytics, network analysis |
| Synthetic identity | Policy underwritten with false data | Growing | Identity verification ML, graph links |
## Behavioural Feature Engineering for Fraud Scoring
The most predictive features for fraud detection are not the basic demographic ones (age, gender, occupation) but those that capture the claimant's behaviour: notification timing, narrative consistency, history of interactions with the insurer, and relationships with other parties involved in claims.
```python
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, date, timedelta


@dataclass
class ClaimContext:
    """Full context of a claim for fraud scoring."""
    claim_id: str
    policy_id: str
    claimant_id: str
    claim_type: str
    incident_date: date
    report_date: date
    incident_day_of_week: int
    incident_hour: int
    location_zip: str
    reported_amount: float
    third_party_ids: List[str]
    repair_shop_id: Optional[str]
    attorney_id: Optional[str]
    medical_provider_id: Optional[str]


class FraudFeatureEngineer:
    """
    Feature engineering for insurance fraud detection.

    Builds behavioural, temporal, and relational features
    to capture typical insurance fraud patterns.
    """

    def __init__(self, historical_claims: pd.DataFrame, policy_db: pd.DataFrame) -> None:
        self.historical = historical_claims
        self.policy_db = policy_db

    def build_features(self, ctx: ClaimContext) -> Dict[str, float]:
        features: Dict[str, float] = {}
        features.update(self._temporal_features(ctx))
        features.update(self._behavioral_features(ctx))
        features.update(self._network_features(ctx))
        features.update(self._policy_features(ctx))
        features.update(self._claim_amount_features(ctx))
        return features

    def _temporal_features(self, ctx: ClaimContext) -> Dict[str, float]:
        report_delay = (ctx.report_date - ctx.incident_date).days
        return {
            "report_delay_days": float(report_delay),
            "report_delay_over_30": float(report_delay > 30),
            "report_delay_over_7": float(report_delay > 7),
            "same_day_report": float(report_delay == 0),
            "incident_weekend": float(ctx.incident_day_of_week >= 5),
            "incident_night": float(ctx.incident_hour < 6 or ctx.incident_hour >= 22),
            "incident_monday": float(ctx.incident_day_of_week == 0),
            "incident_end_month": float(ctx.incident_date.day >= 25),
        }

    def _behavioral_features(self, ctx: ClaimContext) -> Dict[str, float]:
        history = self.historical[self.historical["claimant_id"] == ctx.claimant_id]
        n_prior = len(history)
        n_fraud = int(history["is_fraud"].sum()) if "is_fraud" in history.columns else 0
        # incident_date is stored as "YYYY-MM-DD" strings, so lexicographic
        # comparison matches chronological order.
        cutoff = (ctx.incident_date - timedelta(days=365)).strftime("%Y-%m-%d")
        n_12m = int((history["incident_date"] >= cutoff).sum()) if not history.empty else 0
        policy_start = self._get_policy_start(ctx.policy_id)
        policy_age = (ctx.incident_date - policy_start).days
        return {
            "prior_claims_total": float(n_prior),
            "prior_fraud_confirmed": float(n_fraud),
            "claims_last_12m": float(n_12m),
            "high_claim_frequency": float(n_12m >= 2),
            "repeat_claimant": float(n_prior >= 3),
            "fraud_history": float(n_fraud > 0),
            "policy_age_days": float(policy_age),
            "new_policy_claim": float(policy_age < 90),
        }

    def _network_features(self, ctx: ClaimContext) -> Dict[str, float]:
        tp_fraud_rate = self._entity_fraud_rate("third_party_id", ctx.third_party_ids)
        shop_rate = self._entity_fraud_rate(
            "repair_shop_id", [ctx.repair_shop_id] if ctx.repair_shop_id else []
        )
        atty_rate = self._entity_fraud_rate(
            "attorney_id", [ctx.attorney_id] if ctx.attorney_id else []
        )
        med_rate = self._entity_fraud_rate(
            "medical_provider_id",
            [ctx.medical_provider_id] if ctx.medical_provider_id else []
        )
        return {
            "n_third_parties": float(len(ctx.third_party_ids)),
            "many_third_parties": float(len(ctx.third_party_ids) >= 3),
            "tp_avg_fraud_rate": tp_fraud_rate,
            "has_high_fraud_tp": float(tp_fraud_rate > 0.3),
            "repair_shop_fraud_rate": shop_rate,
            "attorney_present": float(ctx.attorney_id is not None),
            "attorney_fraud_rate": atty_rate,
            "medical_provider_fraud_rate": med_rate,
            "attorney_and_medical": float(
                ctx.attorney_id is not None and ctx.medical_provider_id is not None
            ),
        }

    def _policy_features(self, ctx: ClaimContext) -> Dict[str, float]:
        row = self.policy_db[self.policy_db["policy_id"] == ctx.policy_id]
        if row.empty:
            return {"policy_found": 0.0}
        policy = row.iloc[0]
        premium = float(policy.get("annual_premium", 0))
        coverage = float(policy.get("coverage_amount", 0))
        return {
            "policy_found": 1.0,
            "policy_premium": premium,
            "coverage_amount": coverage,
            "coverage_premium_ratio": coverage / max(premium, 1),
            "recent_coverage_increase": float(policy.get("coverage_increased_90d", False)),
        }

    def _claim_amount_features(self, ctx: ClaimContext) -> Dict[str, float]:
        type_amounts = self.historical[
            self.historical["claim_type"] == ctx.claim_type
        ]["reported_amount"]
        if type_amounts.empty:
            return {"amount_percentile_for_type": 0.5}
        pct = float((type_amounts < ctx.reported_amount).mean())
        z = float((ctx.reported_amount - type_amounts.mean()) / max(type_amounts.std(), 1))
        return {
            "reported_amount": ctx.reported_amount,
            "reported_amount_log": np.log1p(ctx.reported_amount),
            "amount_percentile_for_type": pct,
            "amount_z_score": z,
            "amount_outlier": float(abs(z) > 2.5),
            "round_amount": float(ctx.reported_amount % 1000 == 0),
        }

    def _get_policy_start(self, policy_id: str) -> date:
        row = self.policy_db[self.policy_db["policy_id"] == policy_id]
        if row.empty:
            return date.today() - timedelta(days=365)
        try:
            return datetime.strptime(str(row.iloc[0].get("start_date", "")), "%Y-%m-%d").date()
        except (ValueError, TypeError):
            return date.today() - timedelta(days=365)

    def _entity_fraud_rate(self, column: str, entity_ids: List[str]) -> float:
        if not entity_ids or column not in self.historical.columns:
            return 0.0
        subset = self.historical[self.historical[column].isin(entity_ids)]
        if subset.empty or "is_fraud" not in subset.columns:
            return 0.0
        return float(subset["is_fraud"].mean())
```
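The amount-based signals are just order statistics over prior claims of the same type. A standalone sketch of that logic (the `amount_signals` helper below is illustrative, not part of the class):

```python
from statistics import mean, stdev
from typing import Dict, List

def amount_signals(reported: float, prior_amounts: List[float]) -> Dict[str, float]:
    """Percentile, z-score, and round-number flags for a claim amount,
    mirroring the intent of _claim_amount_features."""
    mu = mean(prior_amounts)
    sigma = stdev(prior_amounts) if len(prior_amounts) > 1 else 0.0
    pct = sum(a < reported for a in prior_amounts) / len(prior_amounts)
    z = (reported - mu) / max(sigma, 1)
    return {
        "amount_percentile_for_type": pct,
        "amount_z_score": z,
        "amount_outlier": float(abs(z) > 2.5),
        "round_amount": float(reported % 1000 == 0),
    }

# A £5,000 claim against a history of ~£1,000 claims: extreme percentile,
# large z-score, and a suspiciously round figure.
signals = amount_signals(5000.0, [800.0, 1200.0, 1500.0, 900.0, 1100.0])
```

No single one of these flags is conclusive; they only become useful as inputs to the downstream scoring model.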
## Graph Analytics for Detecting Fraud Rings
Organised fraud rings are invisible to traditional ML techniques that evaluate claims individually. An isolated claim may look perfectly legitimate, but when observed within a network of relationships — the same third parties, the same body shop, the same attorney appearing in dozens of claims — the pattern emerges clearly.
Graphs allow modelling these relationships: nodes are actors (policyholders, third parties, body shops, attorneys, medical providers), edges are connections (shared claim, shared body shop, same street). Community detection algorithms automatically identify suspicious clusters.
```python
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Set, Tuple
from dataclasses import dataclass
from community import best_partition  # python-louvain


@dataclass
class FraudRing:
    """A suspicious cluster identified in the graph."""
    ring_id: str
    members: List[str]
    claim_ids: List[str]
    total_claimed: float
    avg_fraud_score: float
    ring_type: str
    evidence_summary: str


class InsuranceFraudGraphAnalyzer:
    """
    Graph analysis for insurance fraud ring detection.

    Builds an entity graph (people, shops, doctors) whose edges record
    the claims two entities appear in together, then applies community
    detection to find suspicious clusters.
    """

    def __init__(self, claims_df: pd.DataFrame) -> None:
        self.claims_df = claims_df
        self.graph = self._build_graph()

    def _build_graph(self) -> nx.Graph:
        G = nx.Graph()
        entity_columns = [
            ("claimant_id", "claimant"),
            ("third_party_id", "third_party"),
            ("repair_shop_id", "repair_shop"),
            ("attorney_id", "attorney"),
            ("medical_provider_id", "medical_provider"),
        ]
        for _, row in self.claims_df.iterrows():
            entities: List[Tuple[str, str]] = []
            for col, etype in entity_columns:
                if pd.notna(row.get(col)):
                    node_id = f"{etype}_{row[col]}"
                    G.add_node(node_id, entity_type=etype, entity_id=str(row[col]))
                    entities.append((node_id, etype))
            for i, (n1, _) in enumerate(entities):
                for n2, _ in entities[i + 1:]:
                    cid = str(row.get("claim_id", ""))
                    if G.has_edge(n1, n2):
                        G[n1][n2]["weight"] += 1
                        G[n1][n2]["claims"].append(cid)
                    else:
                        G.add_edge(n1, n2, weight=1, claims=[cid])
        return G

    def detect_fraud_rings(self, min_ring_size: int = 3) -> List[FraudRing]:
        if self.graph.number_of_nodes() < min_ring_size:
            return []
        partition: Dict[str, int] = best_partition(self.graph, weight="weight")
        communities: Dict[int, List[str]] = {}
        for node, cid in partition.items():
            communities.setdefault(cid, []).append(node)
        rings: List[FraudRing] = []
        for cid, members in communities.items():
            if len(members) < min_ring_size:
                continue
            ring = self._evaluate_community(cid, members)
            if ring is not None:
                rings.append(ring)
        return sorted(rings, key=lambda r: r.avg_fraud_score, reverse=True)

    def _evaluate_community(self, community_id: int, members: List[str]) -> Optional[FraudRing]:
        claim_ids: Set[str] = set()
        for u, v, data in self.graph.edges(members, data=True):
            claim_ids.update(data.get("claims", []))
        if not claim_ids:
            return None
        comm_claims = self.claims_df[self.claims_df["claim_id"].astype(str).isin(claim_ids)]
        if comm_claims.empty:
            return None
        total = float(comm_claims["reported_amount"].sum())
        fraud_rate = float(comm_claims["is_fraud"].mean()) if "is_fraud" in comm_claims.columns else 0.0
        subgraph = self.graph.subgraph(members)
        density = nx.density(subgraph)
        avg_weight = float(np.mean([
            d["weight"] for _, _, d in subgraph.edges(data=True)
        ])) if subgraph.number_of_edges() > 0 else 0.0
        score = density * 0.4 + fraud_rate * 0.4 + min(avg_weight / 10, 1.0) * 0.2
        if score < 0.3 and fraud_rate < 0.1:
            return None
        return FraudRing(
            ring_id=f"ring_{community_id}",
            members=members,
            claim_ids=list(claim_ids),
            total_claimed=round(total, 2),
            avg_fraud_score=round(score, 3),
            ring_type=self._classify_ring(members),
            evidence_summary=(
                f"Community of {len(members)} actors, {len(claim_ids)} linked claims, "
                f"£{total:.0f} total claimed. "
                f"Historical fraud rate: {fraud_rate:.0%}. Graph density: {density:.2f}."
            ),
        )

    def _classify_ring(self, members: List[str]) -> str:
        types = [self.graph.nodes[m].get("entity_type", "") for m in members]
        counts: Dict[str, int] = {}
        for t in types:
            counts[t] = counts.get(t, 0) + 1
        if counts.get("medical_provider", 0) >= 2:
            return "medical_mill"
        if counts.get("repair_shop", 0) >= 2:
            return "repair_shop_ring"
        if counts.get("attorney", 0) >= 1 and counts.get("medical_provider", 0) >= 1:
            return "organised_injury_ring"
        return "staged_accident_ring"

    def get_centrality_scores(self) -> Dict[str, float]:
        return {
            node: round(score, 6)
            for node, score in nx.betweenness_centrality(
                self.graph, weight="weight", normalized=True
            ).items()
        }
```
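Underneath the community detection sits a simpler fact: claims sharing any entity collapse into one connected cluster. That core intuition can be sketched with a plain union-find, no graph library required (the claim and entity IDs below are illustrative):

```python
from typing import Dict, Set

def find(parent: Dict[str, str], x: str) -> str:
    """Union-find root lookup with path compression."""
    while parent[x] != x:
        parent[x] = parent[parent[x]]
        x = parent[x]
    return x

def cluster_claims(claim_entities: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Group claims that transitively share any entity
    (repair shop, attorney, doctor, ...)."""
    parent = {c: c for c in claim_entities}
    owner: Dict[str, str] = {}  # entity id -> first claim seen with it
    for claim, entities in claim_entities.items():
        for e in entities:
            if e in owner:
                ra, rb = find(parent, claim), find(parent, owner[e])
                parent[ra] = rb  # merge the two clusters
            else:
                owner[e] = claim
    clusters: Dict[str, Set[str]] = {}
    for claim in claim_entities:
        clusters.setdefault(find(parent, claim), set()).add(claim)
    return clusters

# CL1-CL3 are chained through a shared shop and attorney; CL4 is isolated.
rings = cluster_claims({
    "CL1": {"shop_9", "attorney_2"},
    "CL2": {"shop_9"},
    "CL3": {"attorney_2", "doctor_5"},
    "CL4": {"shop_7"},
})
```

Louvain goes further than connected components: it also splits large connected blobs into densely-knit sub-communities, which is why the class above uses it rather than this naive grouping.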
## Ensemble Model for Fraud Scoring
No single algorithm captures all fraud types. The most robust production approach combines multiple models in an ensemble: XGBoost for tabular patterns, Isolation Forest for anomaly detection on unlabelled data, and a graph features model to incorporate relational signals extracted from the graph.
```python
import xgboost as xgb
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from typing import Dict, List, Optional


class InsuranceFraudEnsemble:
    """
    Ensemble model for insurance fraud detection.

    Combines:
      1. XGBoost classifier (supervised classification)
      2. Isolation Forest (unsupervised anomaly detection)
      3. Graph centrality scores (relational signal)

    Ensemble weights sum to 1.0.
    """

    XGB_PARAMS: Dict = {
        "objective": "binary:logistic",
        "eval_metric": "aucpr",
        "max_depth": 6,
        "learning_rate": 0.05,
        "n_estimators": 400,
        "min_child_weight": 10,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "scale_pos_weight": 20,  # compensates ~1:20 fraud imbalance
        "reg_alpha": 0.1,
        "reg_lambda": 1.0,
        "tree_method": "hist",
        "early_stopping_rounds": 30,
    }

    WEIGHTS: Dict[str, float] = {
        "xgboost": 0.55,
        "isolation_forest": 0.20,
        "graph_centrality": 0.25,
    }

    def __init__(self) -> None:
        self.xgb_model: Optional[xgb.XGBClassifier] = None
        self.iso_forest: Optional[IsolationForest] = None
        self.scaler = StandardScaler()
        self.feature_names: List[str] = []
        self._is_fitted = False

    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        X_val: pd.DataFrame,
        y_val: pd.Series,
    ) -> "InsuranceFraudEnsemble":
        self.feature_names = X.columns.tolist()

        # Supervised XGBoost
        self.xgb_model = xgb.XGBClassifier(**self.XGB_PARAMS)
        self.xgb_model.fit(X, y, eval_set=[(X_val, y_val)], verbose=50)

        # Unsupervised Isolation Forest (trained on clean claims only)
        X_clean = X[y == 0]
        X_scaled = self.scaler.fit_transform(X_clean)
        self.iso_forest = IsolationForest(
            n_estimators=200, contamination=0.05, random_state=42, n_jobs=-1
        )
        self.iso_forest.fit(X_scaled)

        self._is_fitted = True
        return self

    def predict_fraud_score(
        self,
        X: pd.DataFrame,
        graph_centrality: Optional[np.ndarray] = None,
    ) -> np.ndarray:
        if not self._is_fitted:
            raise RuntimeError("Ensemble not trained. Call fit() first.")
        xgb_scores = self.xgb_model.predict_proba(X)[:, 1]
        X_scaled = self.scaler.transform(X)
        iso_raw = self.iso_forest.decision_function(X_scaled)
        iso_scores = 1 - (iso_raw - iso_raw.min()) / (iso_raw.max() - iso_raw.min() + 1e-10)
        g_scores = graph_centrality if graph_centrality is not None else np.zeros(len(X))
        ensemble = (
            self.WEIGHTS["xgboost"] * xgb_scores +
            self.WEIGHTS["isolation_forest"] * iso_scores +
            self.WEIGHTS["graph_centrality"] * g_scores
        )
        return np.clip(ensemble, 0, 1)

    def classify_risk_tier(self, scores: np.ndarray) -> List[str]:
        tiers = []
        for s in scores:
            if s < 0.2:
                tiers.append("GREEN")
            elif s < 0.4:
                tiers.append("YELLOW")
            elif s < 0.7:
                tiers.append("ORANGE")
            else:
                tiers.append("RED")
        return tiers

    def feature_importance(self) -> pd.DataFrame:
        if self.xgb_model is None:
            raise RuntimeError("Model not trained.")
        return pd.DataFrame({
            "feature": self.feature_names,
            "importance": self.xgb_model.feature_importances_,
        }).sort_values("importance", ascending=False)
```
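The score combination is just a convex blend followed by tier thresholds. A minimal standalone check of that arithmetic, using the same weights and tier boundaries (helper names are illustrative):

```python
WEIGHTS = {"xgboost": 0.55, "isolation_forest": 0.20, "graph_centrality": 0.25}

def blend(xgb_score: float, iso_score: float, graph_score: float) -> float:
    """Weighted ensemble score, clipped to [0, 1]."""
    s = (WEIGHTS["xgboost"] * xgb_score
         + WEIGHTS["isolation_forest"] * iso_score
         + WEIGHTS["graph_centrality"] * graph_score)
    return min(max(s, 0.0), 1.0)

def risk_tier(score: float) -> str:
    """Same thresholds as classify_risk_tier."""
    if score < 0.2:
        return "GREEN"
    if score < 0.4:
        return "YELLOW"
    if score < 0.7:
        return "ORANGE"
    return "RED"

# 0.55*0.9 + 0.20*0.6 + 0.25*0.8 = 0.815 -> RED
score = blend(0.9, 0.6, 0.8)
```

Because the dominant weight sits on the supervised model, a strong anomaly or graph signal alone cannot push a claim into RED; it can only escalate a claim the classifier already finds suspicious, which is a deliberate guard against noisy unsupervised scores.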
## Best Practices and Anti-patterns
### Best Practices for Insurance Fraud Detection
- Ensemble is mandatory: always combine supervised (XGBoost), unsupervised (Isolation Forest), and network (graph analytics) models; no single approach covers all fraud types
- Calibrate thresholds by line of business: the optimal fraud threshold for motor is not the same as for life or accident; calibrate per product line
- SIU feedback loop is mandatory: investigation outcomes must feed back as labels into the training set; without feedback, the model does not improve over time
- Graph DB for real-time connections: use Neo4j or ArangoDB for millisecond connection queries; in-memory graphs (NetworkX) do not scale beyond millions of nodes
- Document every decision: every fraud flag must carry a detailed audit trail with the features that drove the score — mandatory for any subsequent legal action
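Per-line threshold calibration can be as simple as scanning validation scores for the lowest threshold whose precision still meets a target. A hedged sketch (the scores and labels are made up; production code should scan the full precision-recall curve, since precision is not strictly monotone in the threshold):

```python
from typing import List

def calibrate_threshold(scores: List[float], labels: List[int],
                        min_precision: float = 0.8) -> float:
    """Lowest threshold whose precision on validation data meets the
    target; falls back to 1.0 if no threshold qualifies."""
    best = 1.0
    for t in sorted(set(scores), reverse=True):
        flagged = [l for s, l in zip(scores, labels) if s >= t]
        precision = sum(flagged) / len(flagged)
        if precision >= min_precision:
            best = t  # greedy: keep lowering while precision holds
        else:
            break
    return best

# Motor scores separate fraud well, so a moderate threshold suffices;
# a noisier line of business would calibrate to a stricter one.
motor_threshold = calibrate_threshold(
    scores=[0.95, 0.9, 0.8, 0.6, 0.4, 0.2],
    labels=[1, 1, 1, 0, 0, 0],
)
```

Run once per product line on that line's own validation set, so motor, life, and accident each get a threshold matched to their score distribution.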
### Anti-patterns to Avoid
- High false positive rate: a false positive rate above 2% erodes trust with honest customers and generates operational costs; monitor precision as well as recall
- Model trained only on known fraud: fraud evolves; a model that only knows historical patterns will not detect new schemes — use Isolation Forest to capture unexpected anomalies
- Proxy discrimination: zip code, occupation, or nationality can be discriminatory proxies; always test disparate impact before deploying
- Batch scoring for all claims: fast frauds (same-day, multi-insurer) require real-time scoring; nightly batch is insufficient for these patterns
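The proxy-discrimination point can be given a first-pass check by comparing flag rates across protected groups against the common "four-fifths" rule of thumb. A minimal sketch (group labels and data are illustrative; a real fairness audit goes well beyond this single ratio):

```python
from typing import Dict, List

def flag_rates(flags: List[int], groups: List[str]) -> Dict[str, float]:
    """Fraud-flag rate per group."""
    totals: Dict[str, int] = {}
    flagged: Dict[str, int] = {}
    for f, g in zip(flags, groups):
        totals[g] = totals.get(g, 0) + 1
        flagged[g] = flagged.get(g, 0) + f
    return {g: flagged[g] / totals[g] for g in totals}

def disparate_impact_ratio(flags: List[int], groups: List[str]) -> float:
    """Min/max ratio of per-group flag rates; values below 0.8
    (four-fifths rule of thumb) warrant investigation."""
    rates = flag_rates(flags, groups)
    hi = max(rates.values())
    return min(rates.values()) / hi if hi else 1.0

# Group B is flagged at twice the rate of group A -> ratio 0.5, below 0.8.
ratio = disparate_impact_ratio(
    flags=[1, 0, 0, 0, 1, 1, 0, 0],
    groups=["A", "A", "A", "A", "B", "B", "B", "B"],
)
```

Running this on model flags grouped by zip code or occupation before deployment surfaces exactly the proxy effects the anti-pattern warns about.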
## Conclusions and Next Steps
Modern insurance fraud detection requires a multi-layered approach: behavioural features for individual fraud scoring, graph analytics for organised fraud rings, and real-time processing for fast-moving frauds. An ensemble of diverse models provides the widest coverage of fraud patterns.
The keys to success are a continuous feedback loop from SIU outcomes, monitoring of the false positive rate so that honest customers are not penalised, and an immutable audit trail to support legal proceedings.
The next article in this series explores ACORD Standards and Insurance API Integration: how to implement interoperability between heterogeneous insurance systems using ACORD XML/JSON standard messages.
## InsurTech Engineering Series
- 01 - Insurance Domain for Developers: Products, Actors and Data Model
- 02 - Cloud-Native Policy Management: API-First Architecture
- 03 - Telematics Pipeline: Processing UBI Data at Scale
- 04 - AI Underwriting: Feature Engineering and Risk Scoring
- 05 - Claims Automation: Computer Vision and NLP
- 06 - Fraud Detection: Graph Analytics and Behavioral Signals (this article)
- 07 - ACORD Standards and Insurance API Integration
- 08 - Compliance Engineering: Solvency II and IFRS 17