保険詐欺の検出: グラフ分析と行動シグナル
保険詐欺は組織的かつ世界的な問題です。業界の推計によると、 の間 10%と15% 毎年支払われる保険金のうち、詐欺の要素が含まれているもの — 合計コストがそれ以上と見積もられる場合 年間800億ドル 世界的に。ヨーロッパでは、Insurance Europe は詐欺による被害額を約 年間130億ユーロ。このコストは必然的に次のような影響を及ぼします。 保険料は誠実な顧客によって支払われるため、不正行為の検出は単なる問題ではありません。 企業の収益性だけでなく、保険制度の公平性も重視します。
保険詐欺はさまざまな形で発生します。 実害)、計画的詐欺(組織的事故、存在しない請求)、最大 医師、ボディーショップ、 弁護士と被保険者の共犯者。この最後のタイプである組織的詐欺は最も有害です 経済的であり、従来の方法では検出するのが最も困難です。
最新の AI、特に次のモデル グラフ分析 検出用 不正なネットワークとそのモデル 行動信号解析 識別用 異常なパターンを検出し、不正行為を防止する企業の能力に革命をもたらしました。 保険詐欺検出のための分析市場は急速に成長している のCAGR 33% (2025 ~ 2032 年)、GNN (グラフ ニューラル ネットワーク)、アンサンブルおよびストリーム モデルによって駆動 リアルタイム処理。
何を学ぶか
- 保険詐欺の分類と検出パターン
- 不正スコアリングのための行動特徴エンジニアリング
- 組織的な不正ネットワークを検出するためのグラフ分析
- 保険詐欺検出のためのグラフ ニューラル ネットワーク (GNN)
- アンサンブル モデル: XGBoost + ランダム フォレスト + アイソレーション フォレスト
- Apache Kafka と Faust を使用したリアルタイム不正スコアリング
- SIU (特別捜査ユニット) のワークフローと事件管理
保険詐欺の分類
詐欺の種類と効果的な検出システムを設計するための前提条件を理解します。 各タイプには、異なる ML アプローチを必要とする独自のパターンがあります。
不正行為の種類と検出手法
| 詐欺の種類 | Esempio | 頻度 | ML アプローチ |
|---|---|---|---|
| 日和見的 | 実際の事故の損害を誇張する | 高い | 異常検知、AI被害推定 |
| 個別に計画 | 自家用車放火 | 平均 | 行動シグナル、リンク分析 |
| 編成/リング | 複数の共犯者による事故を模擬したネットワーク | 低い(しかし大きな影響) | グラフ分析、GNN、コミュニティ検出 |
| 内部(インサイダー) | 偽の申請を承認するエージェント | 非常に低い | ユーザー行動分析、ネットワーク分析 |
| 合成アイデンティティ | 虚偽のデータで署名されたポリシー | 成長する | 本人確認ML、グラフリンク |
不正スコアリングのための行動特徴エンジニアリング
不正行為検出の最も予測的な特徴は、基本的な人口統計的特徴 (年齢、 性別、職業)、しかし、 行動 申請者の: 報告のタイミング、話の一貫性、会社とのやり取りの歴史、 事故の相手との関係。
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, date, timedelta
@dataclass
class ClaimContext:
"""Contesto completo di un sinistro per fraud scoring."""
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
incident_date: date
report_date: date
incident_day_of_week: int
incident_hour: int
location_zip: str
reported_amount: float
third_party_ids: List[str]
repair_shop_id: Optional[str]
attorney_id: Optional[str]
medical_provider_id: Optional[str]
class FraudFeatureEngineer:
"""
Feature engineering per fraud detection assicurativo.
Costruisce feature comportamentali, temporali e relazionali
per catturare i pattern tipici della frode assicurativa.
"""
def __init__(
self,
historical_claims: pd.DataFrame,
policy_db: pd.DataFrame,
) -> None:
self.historical = historical_claims
self.policy_db = policy_db
def build_features(self, ctx: ClaimContext) -> Dict[str, float]:
features: Dict[str, float] = {}
features.update(self._temporal_features(ctx))
features.update(self._behavioral_features(ctx))
features.update(self._network_features(ctx))
features.update(self._policy_features(ctx))
features.update(self._claim_amount_features(ctx))
return features
def _temporal_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature temporali: quando e avvenuto l'incidente e quando e stato denunciato."""
report_delay_days = (ctx.report_date - ctx.incident_date).days
return {
# Pattern sospetto: denunciare tardi o molto in anticipo
"report_delay_days": float(report_delay_days),
"report_delay_over_30": float(report_delay_days > 30),
"report_delay_over_7": float(report_delay_days > 7),
"same_day_report": float(report_delay_days == 0),
# Pattern sospetto: incidenti nel weekend o di notte
"incident_weekend": float(ctx.incident_day_of_week >= 5),
"incident_night": float(ctx.incident_hour < 6 or ctx.incident_hour >= 22),
"incident_monday": float(ctx.incident_day_of_week == 0),
# Fine mese = pressione finanziaria?
"incident_end_month": float(ctx.incident_date.day >= 25),
}
def _behavioral_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature comportamentali basate sullo storico del richiedente."""
claimant_history = self.historical[
self.historical["claimant_id"] == ctx.claimant_id
]
n_prior_claims = len(claimant_history)
n_prior_fraudulent = claimant_history.get("is_fraud", pd.Series([0])).sum()
n_claims_12m = len(claimant_history[
claimant_history["incident_date"] >=
(ctx.incident_date - timedelta(days=365)).strftime("%Y-%m-%d")
]) if not claimant_history.empty else 0
return {
"prior_claims_total": float(n_prior_claims),
"prior_fraud_confirmed": float(n_prior_fraudulent),
"claims_last_12m": float(n_claims_12m),
"high_claim_frequency": float(n_claims_12m >= 2),
"repeat_claimant": float(n_prior_claims >= 3),
"fraud_history": float(n_prior_fraudulent > 0),
# Cambi polizza frequenti = comportamento anomalo?
"policy_age_days": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days
),
"new_policy_claim": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days < 90
),
}
def _network_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""
Feature di rete: connessioni sospette con altri attori.
Identifica se il richiedente e connesso a terze parti, officine,
avvocati o medici che compaiono frequentemente in sinistri fraudolenti.
"""
# Conta occorrenze delle terze parti in sinistri storici
tp_fraud_rate = self._entity_fraud_rate("third_party_id", ctx.third_party_ids)
shop_fraud_rate = self._entity_fraud_rate(
"repair_shop_id", [ctx.repair_shop_id] if ctx.repair_shop_id else []
)
attorney_fraud_rate = self._entity_fraud_rate(
"attorney_id", [ctx.attorney_id] if ctx.attorney_id else []
)
medical_fraud_rate = self._entity_fraud_rate(
"medical_provider_id",
[ctx.medical_provider_id] if ctx.medical_provider_id else []
)
return {
"n_third_parties": float(len(ctx.third_party_ids)),
"many_third_parties": float(len(ctx.third_party_ids) >= 3),
"tp_avg_fraud_rate": tp_fraud_rate,
"has_high_fraud_tp": float(tp_fraud_rate > 0.3),
"repair_shop_fraud_rate": shop_fraud_rate,
"attorney_present": float(ctx.attorney_id is not None),
"attorney_fraud_rate": attorney_fraud_rate,
"medical_provider_fraud_rate": medical_fraud_rate,
# Combinazione attorney + medical provider in incidente auto = sospetto
"attorney_and_medical": float(
ctx.attorney_id is not None and ctx.medical_provider_id is not None
),
}
def _policy_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative alla polizza."""
policy_row = self.policy_db[self.policy_db["policy_id"] == ctx.policy_id]
if policy_row.empty:
return {"policy_found": 0.0}
policy = policy_row.iloc[0]
return {
"policy_found": 1.0,
"policy_premium": float(policy.get("annual_premium", 0)),
"coverage_amount": float(policy.get("coverage_amount", 0)),
# Rapporto alto tra copertura e premio = polizza sottostimata?
"coverage_premium_ratio": float(
policy.get("coverage_amount", 0) /
max(policy.get("annual_premium", 1), 1)
),
"recent_coverage_increase": float(
policy.get("coverage_increased_90d", False)
),
}
def _claim_amount_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative all'importo dichiarato."""
# Distribuzione storica degli importi per tipo sinistro
type_amounts = self.historical[
self.historical["claim_type"] == ctx.claim_type
]["reported_amount"]
if type_amounts.empty:
return {"amount_percentile": 0.5}
percentile = float(
(type_amounts < ctx.reported_amount).mean()
)
z_score = float(
(ctx.reported_amount - type_amounts.mean()) / max(type_amounts.std(), 1)
)
return {
"reported_amount": ctx.reported_amount,
"reported_amount_log": np.log1p(ctx.reported_amount),
"amount_percentile_for_type": percentile,
"amount_z_score": z_score,
"amount_outlier": float(abs(z_score) > 2.5),
"round_amount": float(ctx.reported_amount % 1000 == 0), # importi "tondi" = sospetto
}
def _get_policy_start(self, policy_id: str) -> date:
row = self.policy_db[self.policy_db["policy_id"] == policy_id]
if row.empty:
return date.today() - timedelta(days=365)
start_str = row.iloc[0].get("start_date", "")
try:
return datetime.strptime(str(start_str), "%Y-%m-%d").date()
except (ValueError, TypeError):
return date.today() - timedelta(days=365)
def _entity_fraud_rate(
self, column: str, entity_ids: List[str]
) -> float:
"""Calcola il tasso di frode storico associato a una lista di entità."""
if not entity_ids or column not in self.historical.columns:
return 0.0
mask = self.historical[column].isin(entity_ids)
subset = self.historical[mask]
if subset.empty:
return 0.0
fraud_col = "is_fraud" if "is_fraud" in subset.columns else None
if fraud_col is None:
return 0.0
return float(subset[fraud_col].mean())
グラフ分析による不正ネットワーク検出
組織的詐欺 (保険リング) は従来の ML 技術では認識できません 彼らはクレームを個別に評価します。単独の事故は完全に正当なもののように見えるかもしれませんが、 しかし、それを人間関係のネットワーク内で見ると、同じ第三者、同じものです。 ワークショップでは、同じ弁護士が数十件の請求に登場しているが、そのパターンがはっきりと浮かび上がっている。
グラフを使用すると、次の関係をモデル化できます。 結び目 彼らは主題です (保険加入者、第三者、ワークショップ、弁護士、医師)、 アーチ 彼らは つながり(同じ事故、同じ作業場、同じ通り)。のアルゴリズム コミュニティ検出 疑わしいクラスターを自動的に識別します。
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from community import best_partition # python-louvain
@dataclass
class FraudRing:
"""Un cluster sospetto identificato dal grafo."""
ring_id: str
members: List[str] # nodi del cluster
claim_ids: List[str]
total_claimed: float
avg_fraud_score: float
ring_type: str # es. "medical_mill", "staged_accident", "repair_shop_ring"
evidence_summary: str
class InsuranceFraudGraphAnalyzer:
"""
Analisi grafo per rilevazione di insurance fraud rings.
Costruisce un grafo bipartito: sinistri <-> entità (persone, officine, medici)
e applica algoritmi di community detection per trovare cluster sospetti.
"""
# Soglie per classificare un nodo come sospetto
SUSPICION_THRESHOLDS = {
"claimant": {"min_claims": 3, "fraud_rate": 0.2},
"repair_shop": {"min_claims": 5, "fraud_rate": 0.15},
"attorney": {"min_claims": 10, "fraud_rate": 0.15},
"medical_provider": {"min_claims": 8, "fraud_rate": 0.20},
}
def __init__(self, claims_df: pd.DataFrame) -> None:
self.claims_df = claims_df
self.graph = self._build_graph()
def _build_graph(self) -> nx.Graph:
"""
Costruisce un grafo di co-occorrenza tra entità nei sinistri.
Due entità sono connesse se compaiono nello stesso sinistro.
"""
G = nx.Graph()
df = self.claims_df
# Aggiungi nodi per ogni tipo di entità
entity_columns = [
("claimant_id", "claimant"),
("third_party_id", "third_party"),
("repair_shop_id", "repair_shop"),
("attorney_id", "attorney"),
("medical_provider_id", "medical_provider"),
]
for _, row in df.iterrows():
entities_in_claim: List[Tuple[str, str]] = []
for col, entity_type in entity_columns:
if pd.notna(row.get(col)):
node_id = f"{entity_type}_{row[col]}"
G.add_node(node_id, entity_type=entity_type, entity_id=str(row[col]))
entities_in_claim.append((node_id, entity_type))
# Collega tutte le entità che compaiono nello stesso sinistro
for i, (node1, type1) in enumerate(entities_in_claim):
for node2, type2 in entities_in_claim[i+1:]:
if G.has_edge(node1, node2):
G[node1][node2]["weight"] += 1
G[node1][node2]["claims"].append(str(row.get("claim_id", "")))
else:
G.add_edge(
node1, node2,
weight=1,
claims=[str(row.get("claim_id", ""))],
)
return G
def detect_fraud_rings(self, min_ring_size: int = 3) -> List[FraudRing]:
"""
Rileva i fraud ring tramite community detection (Louvain algorithm).
Filtra le community per dimensione e score di sospetto.
"""
if self.graph.number_of_nodes() < min_ring_size:
return []
# Louvain community detection
partition: Dict[str, int] = best_partition(self.graph, weight="weight")
# Raggruppa nodi per community
communities: Dict[int, List[str]] = {}
for node, community_id in partition.items():
communities.setdefault(community_id, []).append(node)
fraud_rings: List[FraudRing] = []
for community_id, members in communities.items():
if len(members) < min_ring_size:
continue
ring = self._evaluate_community(community_id, members)
if ring is not None:
fraud_rings.append(ring)
return sorted(fraud_rings, key=lambda r: r.avg_fraud_score, reverse=True)
def _evaluate_community(
self, community_id: int, members: List[str]
) -> "Optional[FraudRing]":
"""Valuta se una community e sospetta e costruisce il FraudRing."""
# Raccogli tutti i sinistri associati ai membri della community
claim_ids: Set[str] = set()
for u, v, data in self.graph.edges(members, data=True):
claim_ids.update(data.get("claims", []))
if not claim_ids:
return None
# Statistiche sui sinistri della community
community_claims = self.claims_df[
self.claims_df["claim_id"].astype(str).isin(claim_ids)
]
if community_claims.empty:
return None
total_claimed = float(community_claims["reported_amount"].sum())
fraud_col = "is_fraud" if "is_fraud" in community_claims.columns else None
fraud_rate = float(community_claims[fraud_col].mean()) if fraud_col else 0.0
# Score di sospetto basato su: densita del grafo, fraud rate storico, importi
subgraph = self.graph.subgraph(members)
density = nx.density(subgraph)
avg_weight = float(np.mean([
d["weight"] for _, _, d in subgraph.edges(data=True)
])) if subgraph.number_of_edges() > 0 else 0.0
suspicion_score = (
density * 0.4 +
fraud_rate * 0.4 +
min(avg_weight / 10, 1.0) * 0.2
)
# Filtra community non sospette
if suspicion_score < 0.3 and fraud_rate < 0.1:
return None
ring_type = self._classify_ring_type(members)
evidence = self._build_evidence_summary(members, community_claims, fraud_rate, density)
return FraudRing(
ring_id=f"ring_{community_id}",
members=members,
claim_ids=list(claim_ids),
total_claimed=round(total_claimed, 2),
avg_fraud_score=round(suspicion_score, 3),
ring_type=ring_type,
evidence_summary=evidence,
)
def _classify_ring_type(self, members: List[str]) -> str:
"""Classifica il tipo di ring in base alle entità presenti."""
types = [self.graph.nodes[m].get("entity_type", "") for m in members]
type_counts: Dict[str, int] = {}
for t in types:
type_counts[t] = type_counts.get(t, 0) + 1
if type_counts.get("medical_provider", 0) >= 2:
return "medical_mill"
if type_counts.get("repair_shop", 0) >= 2:
return "repair_shop_ring"
if type_counts.get("attorney", 0) >= 1 and type_counts.get("medical_provider", 0) >= 1:
return "organized_injury_ring"
return "staged_accident_ring"
def _build_evidence_summary(
self,
members: List[str],
claims: pd.DataFrame,
fraud_rate: float,
density: float,
) -> str:
n_claims = len(claims)
total = claims["reported_amount"].sum() if not claims.empty else 0
return (
f"Community di {len(members)} soggetti, {n_claims} sinistri collegati, "
f"EUR {total:.0f} totale rivendicato. "
f"Fraud rate storico: {fraud_rate:.0%}. "
f"Densita grafo: {density:.2f}."
)
def get_node_centrality_scores(self) -> Dict[str, float]:
"""
Calcola la centralità di ogni nodo nel grafo (betweenness centrality).
Nodi ad alta centralità sono spesso i coordinatori del ring.
"""
centrality = nx.betweenness_centrality(
self.graph, weight="weight", normalized=True
)
return {node: round(score, 6) for node, score in centrality.items()}
不正スコアリングのアンサンブル モデル
すべての種類の詐欺を単一のアルゴリズムで捕捉できるわけではありません。本番環境での最も堅牢なアプローチ 複数のパターンを 1 つのアンサンブルに結合します。表形式パターンの場合は XGBoost、パターンの場合は Isolation Forest ラベルのないデータの異常検出と、組み込むグラフ機能モデル グラフから抽出された関係信号。
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
class InsuranceFraudEnsemble:
"""
Ensemble di modelli per fraud detection assicurativo.
Combina:
1. XGBoost classifier (classificazione supervisionata)
2. Isolation Forest (anomaly detection non supervisionata)
3. Score da graph centrality (segnale relazionale)
Il vantaggio dell'ensemble e la robustezza: se un modello
manca un tipo di frode, gli altri possono compensare.
"""
XGB_PARAMS: Dict = {
"objective": "binary:logistic",
"eval_metric": "aucpr", # area under precision-recall: meglio di AUC per dati sbilanciati
"max_depth": 6,
"learning_rate": 0.05,
"n_estimators": 400,
"min_child_weight": 10,
"subsample": 0.8,
"colsample_bytree": 0.8,
"scale_pos_weight": 20, # compensa lo sbilanciamento: tipicamente 1 frode ogni 20 claim
"reg_alpha": 0.1,
"reg_lambda": 1.0,
"tree_method": "hist",
"early_stopping_rounds": 30,
}
# Pesi ensemble (devono sommare a 1)
ENSEMBLE_WEIGHTS: Dict[str, float] = {
"xgboost": 0.55,
"isolation_forest": 0.20,
"graph_centrality": 0.25,
}
def __init__(self) -> None:
self.xgb_model: Optional[xgb.XGBClassifier] = None
self.iso_forest: Optional[IsolationForest] = None
self.scaler = StandardScaler()
self.feature_names: List[str] = []
self._is_fitted = False
def fit(
self,
X: pd.DataFrame,
y: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
) -> "InsuranceFraudEnsemble":
"""
Addestra l'ensemble.
Args:
X, y: training set (y=1 se frode, y=0 altrimenti)
X_val, y_val: validation set per early stopping
"""
self.feature_names = X.columns.tolist()
# 1. XGBoost supervisionato
print("Training XGBoost classifier...")
self.xgb_model = xgb.XGBClassifier(**self.XGB_PARAMS)
self.xgb_model.fit(
X, y,
eval_set=[(X_val, y_val)],
verbose=50,
)
# 2. Isolation Forest (non supervisionato, addestrato solo su non-frodi)
print("Training Isolation Forest on clean claims...")
X_clean = X[y == 0]
X_scaled = self.scaler.fit_transform(X_clean)
self.iso_forest = IsolationForest(
n_estimators=200,
contamination=0.05, # stima del 5% di anomalie nel set pulito
random_state=42,
n_jobs=-1,
)
self.iso_forest.fit(X_scaled)
self._is_fitted = True
return self
def predict_fraud_score(
self,
X: pd.DataFrame,
graph_centrality_scores: Optional[Dict[str, float]] = None,
) -> np.ndarray:
"""
Calcola lo score di frode per ogni claim.
Args:
X: feature matrix
graph_centrality_scores: score di centralità dal grafo (opzionale)
Returns:
Array di score [0,1] dove 1 = massima probabilità di frode
"""
if not self._is_fitted:
raise RuntimeError("Ensemble non addestrato. Chiamare fit() prima.")
# Score XGBoost
xgb_scores = self.xgb_model.predict_proba(X)[:, 1]
# Score Isolation Forest (normalizzato a [0,1])
X_scaled = self.scaler.transform(X)
iso_raw = self.iso_forest.decision_function(X_scaled)
# score negativo = anomalia; normalizziamo invertendo e scalando
iso_scores = 1 - (iso_raw - iso_raw.min()) / (iso_raw.max() - iso_raw.min() + 1e-10)
# Graph centrality scores
if graph_centrality_scores and "graph_centrality" in X.columns:
graph_scores = X["graph_centrality"].values
else:
graph_scores = np.zeros(len(X))
# Ensemble ponderato
ensemble_score = (
self.ENSEMBLE_WEIGHTS["xgboost"] * xgb_scores +
self.ENSEMBLE_WEIGHTS["isolation_forest"] * iso_scores +
self.ENSEMBLE_WEIGHTS["graph_centrality"] * graph_scores
)
return np.clip(ensemble_score, 0, 1)
def classify_risk_tier(
self, scores: np.ndarray
) -> List[str]:
"""Classifica gli score in tier di rischio per routing."""
tiers = []
for score in scores:
if score < 0.2:
tiers.append("GREEN") # auto-approve
elif score < 0.4:
tiers.append("YELLOW") # standard review
elif score < 0.7:
tiers.append("ORANGE") # enhanced review
else:
tiers.append("RED") # SIU referral
return tiers
def get_feature_importance(self) -> pd.DataFrame:
"""Feature importance dal modello XGBoost."""
if self.xgb_model is None:
raise RuntimeError("Modello non addestrato.")
importances = self.xgb_model.feature_importances_
return pd.DataFrame({
"feature": self.feature_names,
"importance": importances,
}).sort_values("importance", ascending=False)
Kafka と Faust によるリアルタイム不正スコアリング
速度を悪用した詐欺の場合 - すでに損傷した車両に保険をかけるなど または、異なる会社で同日に開始された複数の請求 - 不正行為のスコアリングが必要である で起こる リアルタイム、FNOLの時点では、夜間バッチではありません。 Apache Kafka と Faust (Python ストリーム処理) を使用すると、リアルタイム パイプラインを構築できます 遅延は 1 秒未満です。
import faust
from typing import Optional
import json
# Schema del messaggio FNOL in entrata
class FNOLEvent(faust.Record, serializer="json"):
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
reported_amount: float
incident_date: str
report_date: str
third_party_ids: list
repair_shop_id: Optional[str] = None
attorney_id: Optional[str] = None
class FraudScoreResult(faust.Record, serializer="json"):
claim_id: str
fraud_score: float
risk_tier: str
fraud_ring_detected: bool
ring_id: Optional[str]
routing_decision: str
processing_time_ms: float
# Configurazione app Faust
app = faust.App(
"insurance-fraud-detector",
broker="kafka://kafka-broker:9092",
value_serializer="json",
)
fnol_topic = app.topic("insurance.fnol.events", value_type=FNOLEvent)
fraud_scores_topic = app.topic("insurance.fraud.scores", value_type=FraudScoreResult)
# Lazy loading del modello ensemble (caricato una volta all'avvio)
_fraud_ensemble = None
_feature_engineer = None
_graph_analyzer = None
def get_fraud_ensemble():
"""Lazy loading del modello per evitare load al startup."""
global _fraud_ensemble
if _fraud_ensemble is None:
import mlflow
_fraud_ensemble = mlflow.sklearn.load_model(
"models:/insurance-fraud-ensemble/Production"
)
return _fraud_ensemble
@app.agent(fnol_topic)
async def process_fnol_events(events):
"""
Agent Faust che processa ogni FNOL in real-time.
Per ogni evento:
1. Estrae le feature comportamentali
2. Calcola il fraud score ensemble
3. Verifica connessioni con fraud rings noti
4. Pubblica il risultato nel topic di output
"""
import time
async for event in events:
start_time = time.monotonic()
try:
# Costruisci le feature (da cache Redis o DB)
features = await _build_features_async(event)
# Calcola fraud score
ensemble = get_fraud_ensemble()
fraud_score = float(ensemble.predict_fraud_score(features)[0])
risk_tier = ensemble.classify_risk_tier([fraud_score])[0]
# Verifica connessioni con ring noti
ring_id, ring_detected = await _check_ring_connections(event)
# Aggiusta score se ring rilevato
if ring_detected:
fraud_score = min(1.0, fraud_score * 1.4)
risk_tier = "RED"
# Determina routing
routing = _routing_decision(risk_tier, ring_detected, event)
elapsed_ms = (time.monotonic() - start_time) * 1000
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=round(fraud_score, 4),
risk_tier=risk_tier,
fraud_ring_detected=ring_detected,
ring_id=ring_id,
routing_decision=routing,
processing_time_ms=round(elapsed_ms, 2),
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
except Exception as exc:
# Non bloccare lo stream per errori singoli
print(f"[ERROR] Claim {event.claim_id}: {exc}")
# Invia con score conservativo per revisione manuale
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=0.5,
risk_tier="YELLOW",
fraud_ring_detected=False,
ring_id=None,
routing_decision="MANUAL_REVIEW_ERROR",
processing_time_ms=-1.0,
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
async def _build_features_async(event: FNOLEvent) -> "pd.DataFrame":
"""Costruisce le feature in modo asincrono da Redis/DB."""
import pandas as pd
# In produzione: lookup asincrono su Redis per dati real-time
# e su DB per storico sinistri e policy
features = {
"reported_amount_log": 0.0,
"report_delay_days": 0.0,
"prior_claims_total": 0.0,
"n_third_parties": float(len(event.third_party_ids)),
"attorney_present": float(event.attorney_id is not None),
"attorney_and_medical": 0.0,
"incident_weekend": 0.0,
"round_amount": float(event.reported_amount % 1000 == 0),
"amount_percentile_for_type": 0.5,
"graph_centrality": 0.0,
}
return pd.DataFrame([features])
async def _check_ring_connections(event: FNOLEvent):
"""Verifica se il richiedente e connesso a ring fraudolenti noti."""
# In produzione: query su graph DB (Neo4j) o Redis per ring attivi
return None, False
def _routing_decision(
risk_tier: str, ring_detected: bool, event: FNOLEvent
) -> str:
if ring_detected or risk_tier == "RED":
return "SIU_REFERRAL"
elif risk_tier == "ORANGE":
return "ENHANCED_REVIEW"
elif risk_tier == "YELLOW":
return "STANDARD_REVIEW"
return "AUTO_APPROVE"
ベストプラクティスとアンチパターン
保険詐欺検出のベストプラクティス
- 必須のアンサンブル: 常に教師ありモデル (XGBoost)、教師なしモデル (Isolation Forest)、およびネットワーク信号 (グラフ分析) を組み合わせます。すべての種類の詐欺をカバーする単一のアプローチはありません
- 請求の種類ごとの閾値の調整: 自動車詐欺の最適なしきい値は、生命保険や傷害保険の場合と同じではありません。事業分野ごとに調整する
- 必須のフィードバック ループ: SIU 調査の結果は、トレーニング セットにラベルとして含める必要があります。フィードバック ループがなければ、モデルは時間の経過とともに改善されません
- リアルタイム接続用のグラフ DB: ミリ秒の接続クエリには Neo4j または ArangoDB を使用します。従来のグラフ (NetworkX) は数百万ノードを超えるスケールには対応しません
- すべての決定を文書化します。 各不正フラグには、スコアを決定した特徴を含む詳細な監査証跡が必要です。法的措置が必要な場合には必須です。
避けるべきアンチパターン
- 誤検知が多い: 誤検知率が 2% を超えると、誠実な顧客の信頼が損なわれ、運用コストが発生します。精度と再現率を監視します
- 既知の詐欺についてのみトレーニングされたモデル: 詐欺は進化する。過去のパターンのみを知っているモデルは新しいパターンを検出しません。予期しない異常を捕捉するには分離フォレストを使用します
- 代理による差別: 郵便番号、職業、国籍などの変数は、差別の代用となる可能性があります。導入前にさまざまな影響をテストする
- すべての夜間バッチスコアリング: 素早い詐欺(同日、複数の会社)にはリアルタイムのスコアリングが必要です。このような場合には夜間バッチでは不十分です
結論と次のステップ
保険における最新の不正行為検出には、複数レベルのアプローチが必要です: 機能 個々の不正行為のスコアリング、不正なネットワークのグラフ分析 迅速な不正行為に対する組織的かつリアルタイムの処理。さまざまなモデルのアンサンブル 詐欺パターンを可能な限り広範囲にカバーします。
システムを成功させる鍵は次のとおりです。SIU の結果による継続的なフィードバック ループ、 偽陽性率を監視して誠実な顧客に不利益を与えないようにします。 あらゆる法的措置をサポートする不変の監査証跡。
シリーズの次の記事でさらに詳しく説明します ACORD標準とAPIの統合 保険: 異なる保険システム間の相互運用性を実装する方法 標準の ACORD XML/JSON メッセージを使用します。
インシュアテックエンジニアリングシリーズ
- 01 - 開発者向けの保険ドメイン: 製品、アクター、データ モデル
- 02 - クラウドネイティブのポリシー管理: API ファーストのアーキテクチャ
- 03 - テレマティクス パイプライン: 大規模な UBI データ処理
- 04 - AI 引受業務: 特徴エンジニアリングとリスク スコアリング
- 05 - 請求の自動化: コンピューター ビジョンと NLP
- 06 - 不正行為の検出: グラフ分析と行動シグナル (この記事)
- 07 - ACORD Standard と保険 API の統合
- 08 - コンプライアンス エンジニアリング: ソルベンシー II および IFRS 17







