보험 사기 탐지: 그래프 분석 및 행동 신호
보험사기는 체계적이고 세계적인 문제입니다. 업계 추정에 따르면, 사이에 10%와 15% 매년 지불된 청구액 중 사기 요소가 포함되어 있음 — 총 비용이 그 이상으로 추정되는 경우 연간 800억 달러 전 세계적으로. 유럽에서는 Insurance Europe이 사기로 인한 손실을 대략적으로 추산합니다. 연간 130억 유로. 이 비용은 필연적으로 다음에 영향을 미칩니다. 정직한 고객이 지불한 보험료로 사기 탐지가 단순한 문제가 아닌 기업 수익성뿐만 아니라 보험 시스템의 공정성도 중요합니다.
보험 사기는 기회주의적 사기(과장된 사기)부터 다양한 형태로 나타납니다. 실제 피해), 계획된 사기(조직적 사고, 존재하지 않는 청구), 최대 의사, 바디샵, 변호사 및 피보험자. 마지막 유형인 조직적 사기가 가장 해롭습니다. 경제적이며 전통적인 방법으로는 검출하기가 가장 어렵습니다.
최신 AI, 특히 다음 모델 그래프 분석 탐지를 위해 사기성 네트워크와 그 모델 행동 신호 분석 식별을 위해 변칙적인 패턴으로 인해 사기를 탐지하고 예방하는 기업의 능력에 혁명이 일어났습니다. 보험 사기 탐지를 위한 분석 시장은 빠른 속도로 성장하고 있습니다. CAGR 33% (2025-2032), GNN(Graph Neural Networks), 앙상블 및 스트림 모델 기반 실시간 처리.
무엇을 배울 것인가
- 보험 사기 분류 및 탐지 패턴
- 사기 점수를 위한 행동 특성 엔지니어링
- 조직화된 사기 네트워크 탐지를 위한 그래프 분석
- 보험 사기 탐지를 위한 그래프 신경망(GNN)
- 앙상블 모델: XGBoost + Random Forest + Isolation Forest
- Apache Kafka 및 Faust를 사용한 실시간 사기 채점
- SIU(Special Investigation Unit) 워크플로 및 사례 관리
보험 사기의 분류
효과적인 탐지 시스템을 설계하기 위한 사기 유형과 전제 조건을 이해합니다. 각 유형에는 서로 다른 ML 접근 방식이 필요한 고유한 패턴이 있습니다.
사기 유형 및 탐지 기술
| 사기 유형 | Esempio | 빈도 | ML 접근 방식 |
|---|---|---|---|
| 기회주의적 | 실제 사고 피해를 과장하다 | 높은 | 이상징후 탐지, AI 피해 추정 |
| 개별적으로 계획됨 | 자신의 차량 방화 | 평균 | 행동 신호, 링크 분석 |
| 편성/링 | 다수의 공범자를 대상으로 한 시뮬레이션 사고 네트워크 | 낮음(그러나 영향은 높음) | 그래프 분석, GNN, 커뮤니티 탐지 |
| 내부(내부자) | 허위 청구를 승인하는 대리인 | 매우 낮음 | 사용자 행동 분석, 네트워크 분석 |
| 합성 정체성 | 허위 데이터로 서명된 정책 | 성장 | 신원 확인 ML, 그래프 링크 |
사기 채점을 위한 행동 특성 엔지니어링
사기 탐지를 위한 가장 예측 가능한 특징은 기본적인 인구통계학적 특징(연령, 성별, 직업), 그러나 행동 신청자의: 보고 시기, 서술의 일관성, 회사와의 상호 작용 이력, 사고 시 다른 당사자와의 관계.
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, date, timedelta
@dataclass
class ClaimContext:
"""Contesto completo di un sinistro per fraud scoring."""
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
incident_date: date
report_date: date
incident_day_of_week: int
incident_hour: int
location_zip: str
reported_amount: float
third_party_ids: List[str]
repair_shop_id: Optional[str]
attorney_id: Optional[str]
medical_provider_id: Optional[str]
class FraudFeatureEngineer:
"""
Feature engineering per fraud detection assicurativo.
Costruisce feature comportamentali, temporali e relazionali
per catturare i pattern tipici della frode assicurativa.
"""
def __init__(
self,
historical_claims: pd.DataFrame,
policy_db: pd.DataFrame,
) -> None:
self.historical = historical_claims
self.policy_db = policy_db
def build_features(self, ctx: ClaimContext) -> Dict[str, float]:
features: Dict[str, float] = {}
features.update(self._temporal_features(ctx))
features.update(self._behavioral_features(ctx))
features.update(self._network_features(ctx))
features.update(self._policy_features(ctx))
features.update(self._claim_amount_features(ctx))
return features
def _temporal_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature temporali: quando e avvenuto l'incidente e quando e stato denunciato."""
report_delay_days = (ctx.report_date - ctx.incident_date).days
return {
# Pattern sospetto: denunciare tardi o molto in anticipo
"report_delay_days": float(report_delay_days),
"report_delay_over_30": float(report_delay_days > 30),
"report_delay_over_7": float(report_delay_days > 7),
"same_day_report": float(report_delay_days == 0),
# Pattern sospetto: incidenti nel weekend o di notte
"incident_weekend": float(ctx.incident_day_of_week >= 5),
"incident_night": float(ctx.incident_hour < 6 or ctx.incident_hour >= 22),
"incident_monday": float(ctx.incident_day_of_week == 0),
# Fine mese = pressione finanziaria?
"incident_end_month": float(ctx.incident_date.day >= 25),
}
def _behavioral_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature comportamentali basate sullo storico del richiedente."""
claimant_history = self.historical[
self.historical["claimant_id"] == ctx.claimant_id
]
n_prior_claims = len(claimant_history)
n_prior_fraudulent = claimant_history.get("is_fraud", pd.Series([0])).sum()
n_claims_12m = len(claimant_history[
claimant_history["incident_date"] >=
(ctx.incident_date - timedelta(days=365)).strftime("%Y-%m-%d")
]) if not claimant_history.empty else 0
return {
"prior_claims_total": float(n_prior_claims),
"prior_fraud_confirmed": float(n_prior_fraudulent),
"claims_last_12m": float(n_claims_12m),
"high_claim_frequency": float(n_claims_12m >= 2),
"repeat_claimant": float(n_prior_claims >= 3),
"fraud_history": float(n_prior_fraudulent > 0),
# Cambi polizza frequenti = comportamento anomalo?
"policy_age_days": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days
),
"new_policy_claim": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days < 90
),
}
def _network_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""
Feature di rete: connessioni sospette con altri attori.
Identifica se il richiedente e connesso a terze parti, officine,
avvocati o medici che compaiono frequentemente in sinistri fraudolenti.
"""
# Conta occorrenze delle terze parti in sinistri storici
tp_fraud_rate = self._entity_fraud_rate("third_party_id", ctx.third_party_ids)
shop_fraud_rate = self._entity_fraud_rate(
"repair_shop_id", [ctx.repair_shop_id] if ctx.repair_shop_id else []
)
attorney_fraud_rate = self._entity_fraud_rate(
"attorney_id", [ctx.attorney_id] if ctx.attorney_id else []
)
medical_fraud_rate = self._entity_fraud_rate(
"medical_provider_id",
[ctx.medical_provider_id] if ctx.medical_provider_id else []
)
return {
"n_third_parties": float(len(ctx.third_party_ids)),
"many_third_parties": float(len(ctx.third_party_ids) >= 3),
"tp_avg_fraud_rate": tp_fraud_rate,
"has_high_fraud_tp": float(tp_fraud_rate > 0.3),
"repair_shop_fraud_rate": shop_fraud_rate,
"attorney_present": float(ctx.attorney_id is not None),
"attorney_fraud_rate": attorney_fraud_rate,
"medical_provider_fraud_rate": medical_fraud_rate,
# Combinazione attorney + medical provider in incidente auto = sospetto
"attorney_and_medical": float(
ctx.attorney_id is not None and ctx.medical_provider_id is not None
),
}
def _policy_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative alla polizza."""
policy_row = self.policy_db[self.policy_db["policy_id"] == ctx.policy_id]
if policy_row.empty:
return {"policy_found": 0.0}
policy = policy_row.iloc[0]
return {
"policy_found": 1.0,
"policy_premium": float(policy.get("annual_premium", 0)),
"coverage_amount": float(policy.get("coverage_amount", 0)),
# Rapporto alto tra copertura e premio = polizza sottostimata?
"coverage_premium_ratio": float(
policy.get("coverage_amount", 0) /
max(policy.get("annual_premium", 1), 1)
),
"recent_coverage_increase": float(
policy.get("coverage_increased_90d", False)
),
}
def _claim_amount_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative all'importo dichiarato."""
# Distribuzione storica degli importi per tipo sinistro
type_amounts = self.historical[
self.historical["claim_type"] == ctx.claim_type
]["reported_amount"]
if type_amounts.empty:
return {"amount_percentile": 0.5}
percentile = float(
(type_amounts < ctx.reported_amount).mean()
)
z_score = float(
(ctx.reported_amount - type_amounts.mean()) / max(type_amounts.std(), 1)
)
return {
"reported_amount": ctx.reported_amount,
"reported_amount_log": np.log1p(ctx.reported_amount),
"amount_percentile_for_type": percentile,
"amount_z_score": z_score,
"amount_outlier": float(abs(z_score) > 2.5),
"round_amount": float(ctx.reported_amount % 1000 == 0), # importi "tondi" = sospetto
}
def _get_policy_start(self, policy_id: str) -> date:
row = self.policy_db[self.policy_db["policy_id"] == policy_id]
if row.empty:
return date.today() - timedelta(days=365)
start_str = row.iloc[0].get("start_date", "")
try:
return datetime.strptime(str(start_str), "%Y-%m-%d").date()
except (ValueError, TypeError):
return date.today() - timedelta(days=365)
def _entity_fraud_rate(
self, column: str, entity_ids: List[str]
) -> float:
"""Calcola il tasso di frode storico associato a una lista di entità."""
if not entity_ids or column not in self.historical.columns:
return 0.0
mask = self.historical[column].isin(entity_ids)
subset = self.historical[mask]
if subset.empty:
return 0.0
fraud_col = "is_fraud" if "is_fraud" in subset.columns else None
if fraud_col is None:
return 0.0
return float(subset[fraud_col].mean())
사기성 네트워크 탐지를 위한 그래프 분석
조직적인 사기(보험회사)는 기존 ML 기술에서는 보이지 않습니다. 그들은 주장을 개별적으로 평가합니다. 고립된 사고는 완벽하게 합법적인 것처럼 보일 수 있습니다. 하지만 관계 네트워크 내에서 보면 — 동일한 제3자, 동일한 수십 건의 청구에 등장한 동일한 변호사인 워크샵 — 패턴이 명확하게 나타납니다.
그래프를 사용하면 다음과 같은 관계를 모델링할 수 있습니다. 매듭 그들은 과목이다 (피보험자, 제3자, 작업장, 변호사, 의사) 아치 그들은 연결(동일한 사고, 동일한 작업장, 동일한 거리). 알고리즘은 커뮤니티 감지 의심스러운 클러스터를 자동으로 식별합니다.
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from community import best_partition # python-louvain
@dataclass
class FraudRing:
"""Un cluster sospetto identificato dal grafo."""
ring_id: str
members: List[str] # nodi del cluster
claim_ids: List[str]
total_claimed: float
avg_fraud_score: float
ring_type: str # es. "medical_mill", "staged_accident", "repair_shop_ring"
evidence_summary: str
class InsuranceFraudGraphAnalyzer:
"""
Analisi grafo per rilevazione di insurance fraud rings.
Costruisce un grafo bipartito: sinistri <-> entità (persone, officine, medici)
e applica algoritmi di community detection per trovare cluster sospetti.
"""
# Soglie per classificare un nodo come sospetto
SUSPICION_THRESHOLDS = {
"claimant": {"min_claims": 3, "fraud_rate": 0.2},
"repair_shop": {"min_claims": 5, "fraud_rate": 0.15},
"attorney": {"min_claims": 10, "fraud_rate": 0.15},
"medical_provider": {"min_claims": 8, "fraud_rate": 0.20},
}
def __init__(self, claims_df: pd.DataFrame) -> None:
self.claims_df = claims_df
self.graph = self._build_graph()
def _build_graph(self) -> nx.Graph:
"""
Costruisce un grafo di co-occorrenza tra entità nei sinistri.
Due entità sono connesse se compaiono nello stesso sinistro.
"""
G = nx.Graph()
df = self.claims_df
# Aggiungi nodi per ogni tipo di entità
entity_columns = [
("claimant_id", "claimant"),
("third_party_id", "third_party"),
("repair_shop_id", "repair_shop"),
("attorney_id", "attorney"),
("medical_provider_id", "medical_provider"),
]
for _, row in df.iterrows():
entities_in_claim: List[Tuple[str, str]] = []
for col, entity_type in entity_columns:
if pd.notna(row.get(col)):
node_id = f"{entity_type}_{row[col]}"
G.add_node(node_id, entity_type=entity_type, entity_id=str(row[col]))
entities_in_claim.append((node_id, entity_type))
# Collega tutte le entità che compaiono nello stesso sinistro
for i, (node1, type1) in enumerate(entities_in_claim):
for node2, type2 in entities_in_claim[i+1:]:
if G.has_edge(node1, node2):
G[node1][node2]["weight"] += 1
G[node1][node2]["claims"].append(str(row.get("claim_id", "")))
else:
G.add_edge(
node1, node2,
weight=1,
claims=[str(row.get("claim_id", ""))],
)
return G
def detect_fraud_rings(self, min_ring_size: int = 3) -> List[FraudRing]:
"""
Rileva i fraud ring tramite community detection (Louvain algorithm).
Filtra le community per dimensione e score di sospetto.
"""
if self.graph.number_of_nodes() < min_ring_size:
return []
# Louvain community detection
partition: Dict[str, int] = best_partition(self.graph, weight="weight")
# Raggruppa nodi per community
communities: Dict[int, List[str]] = {}
for node, community_id in partition.items():
communities.setdefault(community_id, []).append(node)
fraud_rings: List[FraudRing] = []
for community_id, members in communities.items():
if len(members) < min_ring_size:
continue
ring = self._evaluate_community(community_id, members)
if ring is not None:
fraud_rings.append(ring)
return sorted(fraud_rings, key=lambda r: r.avg_fraud_score, reverse=True)
def _evaluate_community(
self, community_id: int, members: List[str]
) -> "Optional[FraudRing]":
"""Valuta se una community e sospetta e costruisce il FraudRing."""
# Raccogli tutti i sinistri associati ai membri della community
claim_ids: Set[str] = set()
for u, v, data in self.graph.edges(members, data=True):
claim_ids.update(data.get("claims", []))
if not claim_ids:
return None
# Statistiche sui sinistri della community
community_claims = self.claims_df[
self.claims_df["claim_id"].astype(str).isin(claim_ids)
]
if community_claims.empty:
return None
total_claimed = float(community_claims["reported_amount"].sum())
fraud_col = "is_fraud" if "is_fraud" in community_claims.columns else None
fraud_rate = float(community_claims[fraud_col].mean()) if fraud_col else 0.0
# Score di sospetto basato su: densita del grafo, fraud rate storico, importi
subgraph = self.graph.subgraph(members)
density = nx.density(subgraph)
avg_weight = float(np.mean([
d["weight"] for _, _, d in subgraph.edges(data=True)
])) if subgraph.number_of_edges() > 0 else 0.0
suspicion_score = (
density * 0.4 +
fraud_rate * 0.4 +
min(avg_weight / 10, 1.0) * 0.2
)
# Filtra community non sospette
if suspicion_score < 0.3 and fraud_rate < 0.1:
return None
ring_type = self._classify_ring_type(members)
evidence = self._build_evidence_summary(members, community_claims, fraud_rate, density)
return FraudRing(
ring_id=f"ring_{community_id}",
members=members,
claim_ids=list(claim_ids),
total_claimed=round(total_claimed, 2),
avg_fraud_score=round(suspicion_score, 3),
ring_type=ring_type,
evidence_summary=evidence,
)
def _classify_ring_type(self, members: List[str]) -> str:
"""Classifica il tipo di ring in base alle entità presenti."""
types = [self.graph.nodes[m].get("entity_type", "") for m in members]
type_counts: Dict[str, int] = {}
for t in types:
type_counts[t] = type_counts.get(t, 0) + 1
if type_counts.get("medical_provider", 0) >= 2:
return "medical_mill"
if type_counts.get("repair_shop", 0) >= 2:
return "repair_shop_ring"
if type_counts.get("attorney", 0) >= 1 and type_counts.get("medical_provider", 0) >= 1:
return "organized_injury_ring"
return "staged_accident_ring"
def _build_evidence_summary(
self,
members: List[str],
claims: pd.DataFrame,
fraud_rate: float,
density: float,
) -> str:
n_claims = len(claims)
total = claims["reported_amount"].sum() if not claims.empty else 0
return (
f"Community di {len(members)} soggetti, {n_claims} sinistri collegati, "
f"EUR {total:.0f} totale rivendicato. "
f"Fraud rate storico: {fraud_rate:.0%}. "
f"Densita grafo: {density:.2f}."
)
def get_node_centrality_scores(self) -> Dict[str, float]:
"""
Calcola la centralità di ogni nodo nel grafo (betweenness centrality).
Nodi ad alta centralità sono spesso i coordinatori del ring.
"""
centrality = nx.betweenness_centrality(
self.graph, weight="weight", normalized=True
)
return {node: round(score, 6) for node, score in centrality.items()}
사기 채점을 위한 앙상블 모델
단일 알고리즘으로 모든 유형의 사기를 포착할 수는 없습니다. 생산에서 가장 강력한 접근 방식 여러 패턴을 하나의 앙상블로 결합: 테이블 형식 패턴의 경우 XGBoost, 테이블 형식 패턴의 경우 Isolation Forest 레이블이 지정되지 않은 데이터에 대한 이상 탐지 및 통합할 그래프 기능 모델 그래프에서 추출된 관계 신호.
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
class InsuranceFraudEnsemble:
"""
Ensemble di modelli per fraud detection assicurativo.
Combina:
1. XGBoost classifier (classificazione supervisionata)
2. Isolation Forest (anomaly detection non supervisionata)
3. Score da graph centrality (segnale relazionale)
Il vantaggio dell'ensemble e la robustezza: se un modello
manca un tipo di frode, gli altri possono compensare.
"""
XGB_PARAMS: Dict = {
"objective": "binary:logistic",
"eval_metric": "aucpr", # area under precision-recall: meglio di AUC per dati sbilanciati
"max_depth": 6,
"learning_rate": 0.05,
"n_estimators": 400,
"min_child_weight": 10,
"subsample": 0.8,
"colsample_bytree": 0.8,
"scale_pos_weight": 20, # compensa lo sbilanciamento: tipicamente 1 frode ogni 20 claim
"reg_alpha": 0.1,
"reg_lambda": 1.0,
"tree_method": "hist",
"early_stopping_rounds": 30,
}
# Pesi ensemble (devono sommare a 1)
ENSEMBLE_WEIGHTS: Dict[str, float] = {
"xgboost": 0.55,
"isolation_forest": 0.20,
"graph_centrality": 0.25,
}
def __init__(self) -> None:
self.xgb_model: Optional[xgb.XGBClassifier] = None
self.iso_forest: Optional[IsolationForest] = None
self.scaler = StandardScaler()
self.feature_names: List[str] = []
self._is_fitted = False
def fit(
self,
X: pd.DataFrame,
y: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
) -> "InsuranceFraudEnsemble":
"""
Addestra l'ensemble.
Args:
X, y: training set (y=1 se frode, y=0 altrimenti)
X_val, y_val: validation set per early stopping
"""
self.feature_names = X.columns.tolist()
# 1. XGBoost supervisionato
print("Training XGBoost classifier...")
self.xgb_model = xgb.XGBClassifier(**self.XGB_PARAMS)
self.xgb_model.fit(
X, y,
eval_set=[(X_val, y_val)],
verbose=50,
)
# 2. Isolation Forest (non supervisionato, addestrato solo su non-frodi)
print("Training Isolation Forest on clean claims...")
X_clean = X[y == 0]
X_scaled = self.scaler.fit_transform(X_clean)
self.iso_forest = IsolationForest(
n_estimators=200,
contamination=0.05, # stima del 5% di anomalie nel set pulito
random_state=42,
n_jobs=-1,
)
self.iso_forest.fit(X_scaled)
self._is_fitted = True
return self
def predict_fraud_score(
self,
X: pd.DataFrame,
graph_centrality_scores: Optional[Dict[str, float]] = None,
) -> np.ndarray:
"""
Calcola lo score di frode per ogni claim.
Args:
X: feature matrix
graph_centrality_scores: score di centralità dal grafo (opzionale)
Returns:
Array di score [0,1] dove 1 = massima probabilità di frode
"""
if not self._is_fitted:
raise RuntimeError("Ensemble non addestrato. Chiamare fit() prima.")
# Score XGBoost
xgb_scores = self.xgb_model.predict_proba(X)[:, 1]
# Score Isolation Forest (normalizzato a [0,1])
X_scaled = self.scaler.transform(X)
iso_raw = self.iso_forest.decision_function(X_scaled)
# score negativo = anomalia; normalizziamo invertendo e scalando
iso_scores = 1 - (iso_raw - iso_raw.min()) / (iso_raw.max() - iso_raw.min() + 1e-10)
# Graph centrality scores
if graph_centrality_scores and "graph_centrality" in X.columns:
graph_scores = X["graph_centrality"].values
else:
graph_scores = np.zeros(len(X))
# Ensemble ponderato
ensemble_score = (
self.ENSEMBLE_WEIGHTS["xgboost"] * xgb_scores +
self.ENSEMBLE_WEIGHTS["isolation_forest"] * iso_scores +
self.ENSEMBLE_WEIGHTS["graph_centrality"] * graph_scores
)
return np.clip(ensemble_score, 0, 1)
def classify_risk_tier(
self, scores: np.ndarray
) -> List[str]:
"""Classifica gli score in tier di rischio per routing."""
tiers = []
for score in scores:
if score < 0.2:
tiers.append("GREEN") # auto-approve
elif score < 0.4:
tiers.append("YELLOW") # standard review
elif score < 0.7:
tiers.append("ORANGE") # enhanced review
else:
tiers.append("RED") # SIU referral
return tiers
def get_feature_importance(self) -> pd.DataFrame:
"""Feature importance dal modello XGBoost."""
if self.xgb_model is None:
raise RuntimeError("Modello non addestrato.")
importances = self.xgb_model.feature_importances_
return pd.DataFrame({
"feature": self.feature_names,
"importance": importances,
}).sort_values("importance", ascending=False)
Kafka 및 Faust를 사용한 실시간 사기 채점
속도를 이용하는 사기 행위(예: 이미 손상된 차량에 대한 보험 가입) 또는 여러 회사에서 같은 날 여러 건의 청구가 접수된 경우 - 사기 점수는 반드시 에서 일어나다 실시간, FNOL 당시에는 야간 배치가 아닙니다. Apache Kafka 및 Faust(Python 스트림 처리)를 사용하면 실시간 파이프라인을 구축할 수 있습니다. 대기 시간은 1초 미만입니다.
import faust
from typing import Optional
import json
# Schema del messaggio FNOL in entrata
class FNOLEvent(faust.Record, serializer="json"):
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
reported_amount: float
incident_date: str
report_date: str
third_party_ids: list
repair_shop_id: Optional[str] = None
attorney_id: Optional[str] = None
class FraudScoreResult(faust.Record, serializer="json"):
claim_id: str
fraud_score: float
risk_tier: str
fraud_ring_detected: bool
ring_id: Optional[str]
routing_decision: str
processing_time_ms: float
# Configurazione app Faust
app = faust.App(
"insurance-fraud-detector",
broker="kafka://kafka-broker:9092",
value_serializer="json",
)
fnol_topic = app.topic("insurance.fnol.events", value_type=FNOLEvent)
fraud_scores_topic = app.topic("insurance.fraud.scores", value_type=FraudScoreResult)
# Lazy loading del modello ensemble (caricato una volta all'avvio)
_fraud_ensemble = None
_feature_engineer = None
_graph_analyzer = None
def get_fraud_ensemble():
"""Lazy loading del modello per evitare load al startup."""
global _fraud_ensemble
if _fraud_ensemble is None:
import mlflow
_fraud_ensemble = mlflow.sklearn.load_model(
"models:/insurance-fraud-ensemble/Production"
)
return _fraud_ensemble
@app.agent(fnol_topic)
async def process_fnol_events(events):
"""
Agent Faust che processa ogni FNOL in real-time.
Per ogni evento:
1. Estrae le feature comportamentali
2. Calcola il fraud score ensemble
3. Verifica connessioni con fraud rings noti
4. Pubblica il risultato nel topic di output
"""
import time
async for event in events:
start_time = time.monotonic()
try:
# Costruisci le feature (da cache Redis o DB)
features = await _build_features_async(event)
# Calcola fraud score
ensemble = get_fraud_ensemble()
fraud_score = float(ensemble.predict_fraud_score(features)[0])
risk_tier = ensemble.classify_risk_tier([fraud_score])[0]
# Verifica connessioni con ring noti
ring_id, ring_detected = await _check_ring_connections(event)
# Aggiusta score se ring rilevato
if ring_detected:
fraud_score = min(1.0, fraud_score * 1.4)
risk_tier = "RED"
# Determina routing
routing = _routing_decision(risk_tier, ring_detected, event)
elapsed_ms = (time.monotonic() - start_time) * 1000
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=round(fraud_score, 4),
risk_tier=risk_tier,
fraud_ring_detected=ring_detected,
ring_id=ring_id,
routing_decision=routing,
processing_time_ms=round(elapsed_ms, 2),
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
except Exception as exc:
# Non bloccare lo stream per errori singoli
print(f"[ERROR] Claim {event.claim_id}: {exc}")
# Invia con score conservativo per revisione manuale
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=0.5,
risk_tier="YELLOW",
fraud_ring_detected=False,
ring_id=None,
routing_decision="MANUAL_REVIEW_ERROR",
processing_time_ms=-1.0,
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
async def _build_features_async(event: FNOLEvent) -> "pd.DataFrame":
"""Costruisce le feature in modo asincrono da Redis/DB."""
import pandas as pd
# In produzione: lookup asincrono su Redis per dati real-time
# e su DB per storico sinistri e policy
features = {
"reported_amount_log": 0.0,
"report_delay_days": 0.0,
"prior_claims_total": 0.0,
"n_third_parties": float(len(event.third_party_ids)),
"attorney_present": float(event.attorney_id is not None),
"attorney_and_medical": 0.0,
"incident_weekend": 0.0,
"round_amount": float(event.reported_amount % 1000 == 0),
"amount_percentile_for_type": 0.5,
"graph_centrality": 0.0,
}
return pd.DataFrame([features])
async def _check_ring_connections(event: FNOLEvent):
"""Verifica se il richiedente e connesso a ring fraudolenti noti."""
# In produzione: query su graph DB (Neo4j) o Redis per ring attivi
return None, False
def _routing_decision(
risk_tier: str, ring_detected: bool, event: FNOLEvent
) -> str:
if ring_detected or risk_tier == "RED":
return "SIU_REFERRAL"
elif risk_tier == "ORANGE":
return "ENHANCED_REVIEW"
elif risk_tier == "YELLOW":
return "STANDARD_REVIEW"
return "AUTO_APPROVE"
모범 사례 및 안티패턴
보험 사기 탐지 모범 사례
- 필수 앙상블: 항상 지도 모델(XGBoost), 비지도 모델(Isolation Forest) 및 네트워크 신호(그래프 분석)를 결합합니다. 모든 유형의 사기를 포괄하는 단일 접근 방식은 없습니다.
- 청구 유형별 임계값 보정: 자동차 사기에 대한 최적의 기준은 생명 보험이나 상해 보험과 동일하지 않습니다. 사업 부문별로 조정
- 필수 피드백 루프: SIU 조사 결과는 훈련 세트에 라벨로 포함되어야 합니다. 피드백 루프가 없으면 모델은 시간이 지나도 개선되지 않습니다.
- 실시간 연결을 위한 그래프 DB: 밀리초 연결 쿼리에는 Neo4j 또는 ArangoDB를 사용합니다. 기존 그래프(NetworkX)는 수백만 개의 노드 이상으로 확장되지 않습니다.
- 모든 결정을 문서화하세요. 각 사기 신고에는 점수를 결정하는 기능이 포함된 자세한 감사 추적이 있어야 합니다. 법적 조치를 취하려면 필수입니다.
피해야 할 안티패턴
- 높은 오탐률: 2%가 넘는 오탐률은 정직한 고객의 신뢰를 약화시키고 운영 비용을 발생시킵니다. 정밀도와 재현율을 모니터링합니다.
- 알려진 사기 행위에 대해서만 훈련된 모델: 사기가 진화하고 있습니다. 과거 패턴만 아는 모델은 새로운 패턴을 감지하지 못합니다. 격리 포레스트를 사용하여 예상치 못한 이상 현상을 포착합니다.
- 대리인에 의한 차별: 우편번호, 직업, 국적 등의 변수는 차별적 대리인이 될 수 있습니다. 배포하기 전에 서로 다른 영향을 테스트하세요.
- 모든 것에 대한 야간 일괄 채점: 빠른 사기(당일, 여러 회사)에는 실시간 점수가 필요합니다. 이러한 경우에는 야간 배치가 충분하지 않습니다.
결론 및 다음 단계
현대 보험 사기 탐지에는 다단계 접근 방식이 필요합니다. 개별 사기 점수를 위한 행동, 사기 네트워크에 대한 그래프 분석 빠른 사기를 위해 조직화되고 실시간 처리됩니다. 다양한 모델의 앙상블 사기 패턴에 대한 가능한 가장 광범위한 적용 범위를 보장합니다.
성공적인 시스템의 핵심은 SIU 결과를 통한 지속적인 피드백 루프, 정직한 고객에게 불이익을 주지 않도록 위양성률을 모니터링하고, 모든 법적 조치를 지원하는 불변의 감사 추적.
시리즈의 다음 기사에서 더 자세히 설명합니다. ACORD 표준 및 API 통합 보험: 서로 다른 보험 시스템 간의 상호 운용성을 구현하는 방법 표준 ACORD XML/JSON 메시지를 사용합니다.
InsurTech 엔지니어링 시리즈
- 01 - 개발자를 위한 보험 도메인: 제품, 행위자 및 데이터 모델
- 02 - 클라우드 네이티브 정책 관리: API 우선 아키텍처
- 03 - 텔레매틱스 파이프라인: 대규모 UBI 데이터 처리
- 04 - AI Underwriting: 기능 엔지니어링 및 위험 평가
- 05 - 청구 자동화: 컴퓨터 비전 및 NLP
- 06 - 사기 탐지: 그래프 분석 및 행동 신호(본 기사)
- 07 - ACORD 표준 및 보험 API 통합
- 08 - 규정 준수 엔지니어링: Solvency II 및 IFRS 17







