請求の自動化: 請求管理のためのコンピューター ビジョンと NLP
クレーム管理は伝統的に最も費用がかかり、顧客集中型のプロセスです 保険業界の。交通事故の解決には平均して8~15日かかりますが、 顧客と企業の間のタッチポイントは 4 ~ 7 つあり、書類は紙の形式で収集されます。 車両の物理的評価と、さまざまな部門間のバウンス。不満率は これは、請求段階にある顧客の数であり、会社とのやりとりの中で歴史的に最も多いものです。
人工知能はこのプロセスを根本から変革しています。業界実績 2025 年は並外れたものです: セグロス提督は 自動見積もりの 90% が完全に完了 タッチレス、評価の 98% が 15 分以内に完了しました。一部の企業 エンドツーエンドの自動化率を最大でレポートします 57%、平均時間は 標準的な自動車保険の請求では、和解時間が数週間から数時間に短縮されました。抽出の精度 ドキュメントからのデータは、 96%、人間のオペレーターの場合は 65% でした。
このガイドでは、デジタル管理から完全な請求自動化システムを構築します。 FNOL(First Notice of Loss)の、コンピュータビジョンによる損害評価、抽出まで 決済ワークフローのオーケストレーションに至るまで、NLP を使用した文書からの情報。
何を学ぶか
- エンドツーエンドの請求自動化システムのアーキテクチャ
- デジタル FNOL: 自動レポート受信とトリアージ
- 車両損傷推定のためのコンピューター ビジョン: CNN モデルとビジュアル トランスフォーマー
- 保険書類からのデータ抽出のための NLP: 医療報告書、警察報告書
- 高度な OCR によるレガシー文書のデジタル化
- 決済プロセスのワークフローオーケストレーション
- 請求自動化システムのモニタリング指標
請求自動化システムのアーキテクチャ
最新の請求自動化システムは、異なる責任を持つ異なるレイヤーで構成されています 定義されています。マイクロサービス アーキテクチャにより、コンポーネントは独立して拡張でき、 パイプライン全体に影響を与えることなく、個々のモデルを更新します。
アーキテクチャレイヤー
| レイヤー | コンポーネント | テクノロジー |
|---|---|---|
| 摂取 | FNOL取り込み、ドキュメントアップロード、APIゲートウェイ | FastAPI、S3/GCS、Kafka |
| 処理 | OCR、NLP抽出、コンピュータビジョン | Tesseract、spacy、PyTorch、ハグフェイス |
| 知能 | 被害推定、不正スコアリング、予備金計算 | YOLOv8、Detectron2、XGBoost |
| オーケストレーション | ワークフロー エンジン、SLA 管理、エスカレーション | Temporal.io、Airflow、ステートマシン |
| 出力 | 和解案、顧客とのコミュニケーション、監査ログ | Twilio、SendGrid、イベントストア |
FNOL デジタル: 知的損失に関する最初の通知
FNOL (損失の最初の通知) および顧客による請求の最初の報告。 従来、これは電話または直接会って行われていました。現代のシステムではアプリ経由で行われます モバイル、チャットボット、または Web ポータル。 AI が最初の瞬間から介入して、 クレームの種類に応じてその複雑さを評価し、正しいワークフローにルーティングします。
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import uuid
class ClaimType(str, Enum):
AUTO_COLLISION = "auto_collision"
AUTO_THEFT = "auto_theft"
AUTO_WINDSHIELD = "auto_windshield"
PROPERTY_WATER = "property_water"
PROPERTY_FIRE = "property_fire"
LIABILITY = "liability"
HEALTH = "health"
UNKNOWN = "unknown"
class ClaimComplexity(str, Enum):
SIMPLE = "simple" # liquidazione automatica
STANDARD = "standard" # processo guidato
COMPLEX = "complex" # intervento umano
LITIGIOUS = "litigious" # legal team
@dataclass
class FNOLSubmission:
"""Rappresenta una segnalazione FNOL ricevuta dal cliente."""
policy_number: str
incident_date: datetime
incident_description: str
location: str
photos: List[str] = field(default_factory=list) # URL foto caricate
documents: List[str] = field(default_factory=list)
contact_phone: str = ""
contact_email: str = ""
third_parties_involved: bool = False
injuries_reported: bool = False
police_report_available: bool = False
claim_id: str = field(default_factory=lambda: str(uuid.uuid4()))
received_at: datetime = field(default_factory=datetime.now)
@dataclass
class FNOLAssessment:
"""Risultato dell'analisi AI della FNOL."""
claim_id: str
claim_type: ClaimType
complexity: ClaimComplexity
estimated_severity: str # low/medium/high
auto_settlement_eligible: bool
required_documents: List[str]
assigned_workflow: str
fraud_risk_score: float # 0-1
priority: int # 1=urgente, 5=normale
routing_notes: str = ""
class FNOLTriageService:
"""
Servizio di triaging automatico per segnalazioni FNOL.
Combina regole di business e modelli ML per classificare
ogni sinistro e assegnarlo al workflow appropriato.
"""
# Keyword per classificazione tipo sinistro (semplificata)
CLAIM_TYPE_KEYWORDS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"incidente", "scontro", "tamponamento", "urto",
"collision", "crash", "accident"
],
ClaimType.AUTO_THEFT: [
"furto", "rubato", "scomparso", "theft", "stolen"
],
ClaimType.AUTO_WINDSHIELD: [
"parabrezza", "vetro", "windshield", "cristallo"
],
ClaimType.PROPERTY_WATER: [
"allagamento", "perdita", "infiltrazione", "acqua",
"flood", "water", "leak"
],
ClaimType.PROPERTY_FIRE: [
"incendio", "fuoco", "fiamme", "fire", "burn"
],
}
REQUIRED_DOCS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"CID/CAI o rapporto polizia",
"foto veicolo (4+ lati)",
"documento identità",
"patente di guida",
"carta di circolazione",
],
ClaimType.AUTO_THEFT: [
"denuncia polizia (entro 48h)",
"documento identità",
"carte del veicolo",
"chiavi originali",
],
ClaimType.PROPERTY_WATER: [
"foto danni",
"intervento idraulico (se disponibile)",
"preventivo riparazione",
],
}
def triage(
self, fnol: FNOLSubmission, fraud_score: float = 0.0
) -> FNOLAssessment:
"""Esegue il triaging automatico della segnalazione FNOL."""
claim_type = self._classify_type(fnol.incident_description)
complexity = self._assess_complexity(fnol, fraud_score)
severity = self._estimate_severity(fnol, claim_type)
auto_eligible = self._is_auto_settlement_eligible(fnol, complexity, fraud_score)
workflow = self._assign_workflow(claim_type, complexity)
priority = self._calculate_priority(fnol, complexity, fraud_score)
return FNOLAssessment(
claim_id=fnol.claim_id,
claim_type=claim_type,
complexity=complexity,
estimated_severity=severity,
auto_settlement_eligible=auto_eligible,
required_documents=self.REQUIRED_DOCS.get(claim_type, [
"documento identità",
"foto danni",
"preventivo riparazione",
]),
assigned_workflow=workflow,
fraud_risk_score=fraud_score,
priority=priority,
routing_notes=self._build_routing_notes(
fnol, complexity, fraud_score
),
)
def _classify_type(self, description: str) -> ClaimType:
"""Classifica il tipo di sinistro dalla descrizione testuale."""
desc_lower = description.lower()
scores: Dict[ClaimType, int] = {}
for claim_type, keywords in self.CLAIM_TYPE_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in desc_lower)
if score > 0:
scores[claim_type] = score
if not scores:
return ClaimType.UNKNOWN
return max(scores, key=lambda k: scores[k])
def _assess_complexity(
self, fnol: FNOLSubmission, fraud_score: float
) -> ClaimComplexity:
"""Determina la complessità del sinistro."""
if fnol.injuries_reported:
return ClaimComplexity.LITIGIOUS
if fraud_score > 0.7:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved and not fnol.police_report_available:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved or fraud_score > 0.4:
return ClaimComplexity.STANDARD
return ClaimComplexity.SIMPLE
def _estimate_severity(
self, fnol: FNOLSubmission, claim_type: ClaimType
) -> str:
"""Stima la severita economica (low/medium/high)."""
if fnol.injuries_reported:
return "high"
if claim_type == ClaimType.AUTO_THEFT:
return "high"
if fnol.third_parties_involved:
return "medium"
return "low"
def _is_auto_settlement_eligible(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> bool:
"""Verifica l'eleggibilita per liquidazione automatica."""
if complexity not in [ClaimComplexity.SIMPLE, ClaimComplexity.STANDARD]:
return False
if fraud_score > 0.3:
return False
if fnol.injuries_reported or fnol.third_parties_involved:
return False
if len(fnol.photos) < 2:
return False
return True
def _assign_workflow(
self, claim_type: ClaimType, complexity: ClaimComplexity
) -> str:
workflow_map = {
(ClaimType.AUTO_COLLISION, ClaimComplexity.SIMPLE): "auto_collision_fast_track",
(ClaimType.AUTO_COLLISION, ClaimComplexity.STANDARD): "auto_collision_standard",
(ClaimType.AUTO_COLLISION, ClaimComplexity.COMPLEX): "auto_collision_manual",
(ClaimType.AUTO_THEFT, ClaimComplexity.SIMPLE): "auto_theft_standard",
(ClaimType.AUTO_WINDSHIELD, ClaimComplexity.SIMPLE): "windshield_auto",
}
return workflow_map.get(
(claim_type, complexity),
f"generic_{complexity.value}_workflow"
)
def _calculate_priority(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> int:
if fnol.injuries_reported:
return 1 # massima priorità
if complexity == ClaimComplexity.LITIGIOUS:
return 1
if fraud_score > 0.7:
return 2 # priorità alta per indagine fraud
if complexity == ClaimComplexity.COMPLEX:
return 3
return 5 # normale
def _build_routing_notes(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> str:
notes = []
if fnol.injuries_reported:
notes.append("ATTENZIONE: lesioni personali dichiarate - escalation legal/medical obbligatoria")
if fraud_score > 0.5:
notes.append(f"Fraud score elevato ({fraud_score:.2f}) - revisione SIU raccomandata")
if not fnol.photos:
notes.append("Nessuna foto allegata - richiedere al cliente prima di procedere")
return "; ".join(notes) if notes else "Nessuna nota particolare"
自動車の損傷推定のためのコンピュータビジョン
自動損害推定は、保険金請求の自動化において最も影響力のある AI コンポーネントです。 顧客がスマートフォンで破損車両を撮影し、システムが画像を解析 損傷した部品を特定し、必要な介入の種類を推定します (修理か 交換)、更新された価格データベースに基づいてコストの見積もりを計算します。
業界で最も使用されているモデルを組み合わせたもの 物体検出 (特定する 破損箇所あり) 被害の程度の分類 (エンティティを分類する 軽微な損傷から全体的な損傷まで)。市場リーダーであるTractableなどのアプローチがこれを証明しています。 これらのシステムは、人間の専門鑑定士の精度と同等かそれを超えることができるということです。
import torch
import torchvision.transforms as T
from torchvision.models import resnet50, ResNet50_Weights
import numpy as np
from PIL import Image
from typing import Dict, List, Tuple
from dataclasses import dataclass
import io
@dataclass
class DamageRegion:
"""Regione di danno identificata nell'immagine."""
part_name: str # es. "bumper_front", "door_left"
damage_type: str # scratch, dent, crack, broken
severity: str # minor, moderate, severe, total_loss
confidence: float # 0-1
repair_vs_replace: str # "repair" o "replace"
estimated_cost_eur: float
@dataclass
class VehicleDamageAssessment:
"""Risultato completo dell'analisi danni veicolo."""
claim_id: str
images_analyzed: int
damage_regions: List[DamageRegion]
total_estimated_cost: float
total_loss_likelihood: float # 0-1
settlement_recommendation: str
confidence_overall: float
requires_physical_inspection: bool
assessment_notes: str
class VehicleDamageClassifier:
"""
Classificatore di danni veicolo basato su transfer learning.
Architettura: ResNet-50 fine-tuned su dataset proprietario di danni auto
Output: classificazione per parte + severita + tipo danno
In produzione: usare modelli specializzati come quelli di Tractable,
Mitchell, o modelli addestrati su dataset interni.
"""
# Parti veicolo monitorabili
VEHICLE_PARTS = [
"bumper_front", "bumper_rear",
"hood", "trunk",
"door_front_left", "door_front_right",
"door_rear_left", "door_rear_right",
"fender_front_left", "fender_front_right",
"windshield_front", "windshield_rear",
"headlight_left", "headlight_right",
"mirror_left", "mirror_right",
"roof", "pillar",
]
# Tabella costi di riferimento (EUR, valori indicativi 2025)
REPAIR_COST_TABLE: Dict[str, Dict[str, float]] = {
"bumper_front": {"minor": 200, "moderate": 600, "severe": 1200, "replace": 900},
"bumper_rear": {"minor": 200, "moderate": 550, "severe": 1100, "replace": 850},
"hood": {"minor": 300, "moderate": 800, "severe": 1500, "replace": 1200},
"door_front_left": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"door_front_right": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"windshield_front": {"minor": 0, "moderate": 350, "severe": 600, "replace": 450},
"headlight_left": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
"headlight_right": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
}
DEFAULT_COST = {"minor": 150, "moderate": 450, "severe": 900, "replace": 700}
def __init__(self, model_path: str = "damage_classifier.pt") -> None:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = self._load_model(model_path)
self.transform = T.Compose([
T.Resize((224, 224)),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def _load_model(self, model_path: str) -> torch.nn.Module:
"""Carica il modello fine-tuned da file o usa il pretrained come fallback."""
try:
model = resnet50(weights=None)
# Adatta la testa di classificazione al numero di parti*severita
num_classes = len(self.VEHICLE_PARTS) * 4 # 4 livelli severita
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
state = torch.load(model_path, map_location=self.device)
model.load_state_dict(state)
print(f"Modello caricato da {model_path}")
except FileNotFoundError:
print("Modello fine-tuned non trovato, uso pretrained ResNet50 (demo only)")
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.eval()
return model.to(self.device)
def analyze_image(self, image_bytes: bytes) -> List[Tuple[str, str, float]]:
"""
Analizza una singola immagine.
Returns: lista di (part_name, severity, confidence)
"""
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
tensor = self.transform(image).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
# Soglia: riporta solo danni con confidence > 30%
results = []
threshold = 0.30
for i, part in enumerate(self.VEHICLE_PARTS):
severities = ["minor", "moderate", "severe", "total_loss"]
for j, sev in enumerate(severities):
idx = i * 4 + j
if idx < len(probs) and probs[idx] > threshold:
results.append((part, sev, float(probs[idx])))
return sorted(results, key=lambda x: x[2], reverse=True)
def estimate_repair_cost(self, part: str, severity: str) -> Tuple[float, str]:
"""
Stima il costo di riparazione e determina repair vs replace.
Returns: (cost_eur, repair_or_replace)
"""
cost_table = self.REPAIR_COST_TABLE.get(part, self.DEFAULT_COST)
if severity in ["severe", "total_loss"]:
replace_cost = cost_table.get("replace", 700)
repair_cost = cost_table.get("severe", 900)
if replace_cost < repair_cost * 0.8:
return replace_cost, "replace"
return repair_cost, "repair"
repair_cost = cost_table.get(severity, 150)
return repair_cost, "repair"
def assess_multiple_images(
self, claim_id: str, image_bytes_list: List[bytes]
) -> VehicleDamageAssessment:
"""
Valutazione completa su più immagini del veicolo.
Aggrega i risultati di tutte le foto per una stima robusta.
"""
# Raccoglie rilevazioni da tutte le immagini
all_detections: Dict[str, List[Tuple[str, float]]] = {}
for img_bytes in image_bytes_list:
detections = self.analyze_image(img_bytes)
for part, severity, confidence in detections:
if part not in all_detections:
all_detections[part] = []
all_detections[part].append((severity, confidence))
# Consolida: per ogni parte prende la severita più alta con confidence media
damage_regions: List[DamageRegion] = []
total_cost = 0.0
severity_order = {"minor": 0, "moderate": 1, "severe": 2, "total_loss": 3}
for part, severity_confidences in all_detections.items():
# Prendi la severita più alta rilevata
max_sev = max(severity_confidences, key=lambda x: severity_order.get(x[0], 0))
severity, _ = max_sev
avg_confidence = np.mean([c for _, c in severity_confidences])
cost, repair_replace = self.estimate_repair_cost(part, severity)
total_cost += cost
damage_regions.append(DamageRegion(
part_name=part,
damage_type="structural" if severity in ["severe", "total_loss"] else "cosmetic",
severity=severity,
confidence=round(float(avg_confidence), 3),
repair_vs_replace=repair_replace,
estimated_cost_eur=cost,
))
total_loss_likelihood = self._estimate_total_loss(damage_regions, total_cost)
requires_inspection = (
total_loss_likelihood > 0.5 or
total_cost > 8000 or
any(d.severity == "severe" and "windshield" in d.part_name
for d in damage_regions)
)
return VehicleDamageAssessment(
claim_id=claim_id,
images_analyzed=len(image_bytes_list),
damage_regions=damage_regions,
total_estimated_cost=round(total_cost, 2),
total_loss_likelihood=round(total_loss_likelihood, 3),
settlement_recommendation=self._settlement_recommendation(
total_cost, total_loss_likelihood
),
confidence_overall=round(
float(np.mean([d.confidence for d in damage_regions])) if damage_regions else 0.0,
3
),
requires_physical_inspection=requires_inspection,
assessment_notes=self._build_notes(damage_regions, total_loss_likelihood),
)
def _estimate_total_loss(
self, regions: List[DamageRegion], total_cost: float
) -> float:
"""Stima la probabilità di perdita totale."""
if total_cost > 15000:
return 0.95
if total_cost > 10000:
return 0.70
severe_count = sum(1 for r in regions if r.severity in ["severe", "total_loss"])
if severe_count >= 4:
return 0.80
if severe_count >= 2:
return 0.40
return max(0.0, (total_cost - 5000) / 10000) if total_cost > 5000 else 0.05
def _settlement_recommendation(self, cost: float, total_loss_prob: float) -> str:
if total_loss_prob > 0.7:
return "TOTAL_LOSS_SETTLEMENT"
if cost > 8000:
return "HIGH_VALUE_REPAIR_AUTHORIZATION"
if cost > 3000:
return "STANDARD_REPAIR_AUTHORIZATION"
return "FAST_TRACK_SETTLEMENT"
def _build_notes(
self, regions: List[DamageRegion], total_loss_prob: float
) -> str:
notes = []
if total_loss_prob > 0.5:
notes.append("Elevata probabilità di perdita totale - verificare valore veicolo")
severe = [r.part_name for r in regions if r.severity == "severe"]
if severe:
notes.append(f"Danni severi rilevati su: {', '.join(severe)}")
return "; ".join(notes) if notes else "Assessment completato senza anomalie"
文書から情報を抽出するための NLP
それぞれの事故では、警察の報告書、医療報告書、見積書など、数十の文書が生成されます。 ワークショップ、目撃者の陳述、CID/CAI レポート。これらの文書の手動処理 遅く(1 ~ 3 日)、エラーが発生しやすくなります。最新の NLP システムと高度な OCR を組み合わせた、 構造化された情報を 90 ~ 96% の精度で自動的に抽出します。
import spacy
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, date
from enum import Enum
class DocumentType(str, Enum):
POLICE_REPORT = "police_report"
MEDICAL_REPORT = "medical_report"
REPAIR_ESTIMATE = "repair_estimate"
CID_CAI = "cid_cai" # Constatazione Amichevole di Incidente
WITNESS_STATEMENT = "witness_statement"
INVOICE = "invoice"
UNKNOWN = "unknown"
@dataclass
class ExtractedEntity:
"""Entità estratta da un documento."""
entity_type: str
value: str
confidence: float
source_text: str # testo originale da cui e stata estratta
position: Tuple[int, int] # start, end nel documento
@dataclass
class DocumentExtraction:
"""Risultato dell'estrazione NLP da un documento assicurativo."""
document_type: DocumentType
document_date: Optional[date]
entities: List[ExtractedEntity]
structured_data: Dict
extraction_confidence: float
raw_text: str
class InsuranceDocumentExtractor:
"""
Estrattore NLP per documenti assicurativi.
Combina:
- spaCy per NER (Named Entity Recognition)
- Regex per pattern specifici del dominio assicurativo
- Regole di business per estrazione strutturata
"""
# Pattern regex per entità assicurative italiane
PATTERNS: Dict[str, str] = {
"targa": r"\b[A-Z]{2}\d{3}[A-Z]{2}\b|\b[A-Z]{2}\d{5}\b",
"polizza": r"(?:polizza|n\.\s*pol\.?)\s*[:\.]?\s*([A-Z0-9/-]{6,20})",
"codice_fiscale": r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b",
"partita_iva": r"\b(?:IT)?\d{11}\b",
"euro_amount": r"(?:EUR?|€)\s*[\d.,]{1,10}|[\d.,]{1,10}\s*(?:EUR?|€)",
"data": r"\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
"phone": r"(?:\+39|0039)?\s*(?:\d{2,4}[\s\-]?){2,4}\d{4}",
"email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,6}",
}
def __init__(self, spacy_model: str = "it_core_news_lg") -> None:
try:
self.nlp = spacy.load(spacy_model)
except OSError:
print(f"Modello spaCy '{spacy_model}' non trovato - installa con:")
print(f"python -m spacy download {spacy_model}")
self.nlp = None
def classify_document(self, text: str) -> DocumentType:
"""Classifica il tipo di documento basandosi sul contenuto."""
text_lower = text.lower()
classification_rules = [
(DocumentType.POLICE_REPORT, ["verbale", "polizia stradale", "carabinieri", "codice della strada", "accertamento"]),
(DocumentType.MEDICAL_REPORT, ["diagnosi", "prognosi", "lesioni", "ospedale", "pronto soccorso", "medico"]),
(DocumentType.CID_CAI, ["constatazione amichevole", "cid", "cai", "modulo blu"]),
(DocumentType.REPAIR_ESTIMATE, ["preventivo", "officina", "carrozzeria", "ricambi", "manodopera"]),
(DocumentType.INVOICE, ["fattura", "ricevuta", "importo totale", "iva", "imponibile"]),
]
scores: Dict[DocumentType, int] = {}
for doc_type, keywords in classification_rules:
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[doc_type] = score
return max(scores, key=lambda k: scores[k]) if scores else DocumentType.UNKNOWN
def extract(self, raw_text: str) -> DocumentExtraction:
"""Estrae tutte le informazioni rilevanti dal documento."""
doc_type = self.classify_document(raw_text)
entities = self._extract_entities(raw_text)
structured = self._build_structured_data(entities, doc_type, raw_text)
confidence = self._calculate_confidence(entities, doc_type)
doc_date = self._extract_document_date(entities)
return DocumentExtraction(
document_type=doc_type,
document_date=doc_date,
entities=entities,
structured_data=structured,
extraction_confidence=confidence,
raw_text=raw_text,
)
def _extract_entities(self, text: str) -> List[ExtractedEntity]:
"""Estrae entità con regex e NER spaCy."""
entities: List[ExtractedEntity] = []
# Estrazione con regex
for entity_type, pattern in self.PATTERNS.items():
for match in re.finditer(pattern, text, re.IGNORECASE):
entities.append(ExtractedEntity(
entity_type=entity_type,
value=match.group().strip(),
confidence=0.85, # alta confidence per regex su formato noto
source_text=text[max(0, match.start()-20):match.end()+20],
position=(match.start(), match.end()),
))
# Estrazione NER con spaCy
if self.nlp:
doc = self.nlp(text[:100000]) # limit per performance
for ent in doc.ents:
if ent.label_ in ["PER", "ORG", "LOC", "DATE", "MONEY"]:
entities.append(ExtractedEntity(
entity_type=f"spacy_{ent.label_.lower()}",
value=ent.text.strip(),
confidence=0.75,
source_text=text[max(0, ent.start_char-20):ent.end_char+20],
position=(ent.start_char, ent.end_char),
))
return entities
def _build_structured_data(
self,
entities: List[ExtractedEntity],
doc_type: DocumentType,
text: str,
) -> Dict:
"""Costruisce un dizionario strutturato dal documento."""
structured: Dict = {"document_type": doc_type.value}
# Raggruppa entità per tipo
by_type: Dict[str, List[str]] = {}
for ent in entities:
by_type.setdefault(ent.entity_type, []).append(ent.value)
# Mappa entità in campi strutturati
if "targa" in by_type:
structured["vehicle_plates"] = list(set(by_type["targa"]))
if "polizza" in by_type:
structured["policy_numbers"] = list(set(by_type["polizza"]))
if "euro_amount" in by_type:
amounts = []
for amt_str in by_type["euro_amount"]:
clean = re.sub(r"[^\d,.]", "", amt_str).replace(",", ".")
try:
amounts.append(float(clean))
except ValueError:
pass
structured["amounts_eur"] = sorted(amounts)
structured["max_amount_eur"] = max(amounts) if amounts else 0
if "spacy_per" in by_type:
structured["persons_mentioned"] = list(set(by_type["spacy_per"]))
if "spacy_loc" in by_type:
structured["locations_mentioned"] = list(set(by_type["spacy_loc"]))
return structured
def _calculate_confidence(
self, entities: List[ExtractedEntity], doc_type: DocumentType
) -> float:
if not entities:
return 0.1
avg = float(sum(e.confidence for e in entities) / len(entities))
# Penalizza se il tipo e UNKNOWN
if doc_type == DocumentType.UNKNOWN:
avg *= 0.7
return round(min(avg, 1.0), 3)
def _extract_document_date(
self, entities: List[ExtractedEntity]
) -> Optional[date]:
date_entities = [e for e in entities if e.entity_type == "data"]
for de in date_entities:
for fmt in ["%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y", "%d/%m/%y"]:
try:
return datetime.strptime(de.value, fmt).date()
except ValueError:
continue
return None
決済ワークフローのオーケストレーション
オーケストレーションはシステムの頭脳であり、すべてのコンポーネント (FNOL トリアージ、損傷) を調整します。 評価、文書抽出、不正スコアリング)および請求のステータス遷移を管理します 時間の経過とともに、契約上の SLA とビジネス エスカレーション ルールを尊重します。
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
import asyncio
class ClaimStatus(str, Enum):
RECEIVED = "received"
TRIAGED = "triaged"
DOCUMENTS_PENDING = "documents_pending"
ASSESSMENT_IN_PROGRESS = "assessment_in_progress"
FRAUD_REVIEW = "fraud_review"
SETTLEMENT_OFFERED = "settlement_offered"
SETTLEMENT_ACCEPTED = "settlement_accepted"
SETTLEMENT_DISPUTED = "settlement_disputed"
MANUAL_REVIEW = "manual_review"
CLOSED = "closed"
REJECTED = "rejected"
@dataclass
class ClaimWorkflowState:
"""Stato corrente del workflow di un sinistro."""
claim_id: str
status: ClaimStatus
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None # handler umano o "auto"
sla_deadline: Optional[datetime] = None
settlement_amount: Optional[float] = None
status_history: List[Dict] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
def transition_to(self, new_status: ClaimStatus, note: str = "") -> None:
"""Transizione di stato con audit trail."""
self.status_history.append({
"from": self.status.value,
"to": new_status.value,
"timestamp": datetime.now().isoformat(),
"note": note,
})
self.status = new_status
self.updated_at = datetime.now()
if note:
self.notes.append(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] {note}")
class ClaimsWorkflowEngine:
"""
Engine di orchestrazione per il workflow sinistri.
Gestisce le transizioni di stato, gli SLA e le escalation.
In produzione: usare Temporal.io o AWS Step Functions
per la durabilita e il recovery dei workflow.
"""
SLA_HOURS: Dict[str, int] = {
"simple": 4, # 4 ore per sinistri semplici
"standard": 24, # 24 ore per sinistri standard
"complex": 72, # 72 ore per sinistri complessi
"litigious": 168, # 7 giorni per sinistri con contenziosi
}
async def process_claim(
self,
claim_state: ClaimWorkflowState,
fnol: "FNOLSubmission",
triage: "FNOLAssessment",
damage_assessment: Optional["VehicleDamageAssessment"] = None,
doc_extractions: Optional[List["DocumentExtraction"]] = None,
) -> ClaimWorkflowState:
"""Processa un sinistro attraverso il workflow completo."""
# Step 1: Triage e routing
claim_state.transition_to(
ClaimStatus.TRIAGED,
f"Tipo: {triage.claim_type.value}, Complessità: {triage.complexity.value}"
)
sla_hours = self.SLA_HOURS.get(triage.complexity.value, 72)
claim_state.sla_deadline = datetime.now() + timedelta(hours=sla_hours)
# Step 2: Check documenti
if not fnol.documents and not fnol.photos:
claim_state.transition_to(
ClaimStatus.DOCUMENTS_PENDING,
f"Documenti mancanti: {', '.join(triage.required_documents)}"
)
await self._notify_customer_documents_needed(claim_state, triage)
return claim_state
# Step 3: Fraud check
if triage.fraud_risk_score > 0.5:
claim_state.transition_to(
ClaimStatus.FRAUD_REVIEW,
f"Fraud score: {triage.fraud_risk_score:.2f} - invio a SIU"
)
await self._escalate_to_siu(claim_state, triage)
return claim_state
# Step 4: Settlement automatico se eleggibile
if triage.auto_settlement_eligible and damage_assessment:
settlement = damage_assessment.total_estimated_cost
claim_state.settlement_amount = settlement
claim_state.transition_to(
ClaimStatus.SETTLEMENT_OFFERED,
f"Offerta automatica: EUR {settlement:.2f}"
)
await self._send_settlement_offer(claim_state, settlement)
return claim_state
# Step 5: Review manuale
claim_state.transition_to(
ClaimStatus.MANUAL_REVIEW,
"Invio a handler umano per valutazione"
)
await self._assign_human_handler(claim_state, triage)
return claim_state
async def _notify_customer_documents_needed(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Notifica il cliente dei documenti mancanti."""
# In produzione: integrazione con Twilio SMS/WhatsApp o SendGrid
print(f"[NOTIFY] Sinistro {state.claim_id}: richiedere documenti al cliente")
print(f" Documenti necessari: {triage.required_documents}")
async def _escalate_to_siu(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Escalation all'unita investigativa speciale (SIU)."""
print(f"[SIU] Sinistro {state.claim_id} flaggato per fraud review")
print(f" Fraud score: {triage.fraud_risk_score:.2f}")
state.assigned_to = "siu_team"
async def _send_settlement_offer(
self, state: ClaimWorkflowState, amount: float
) -> None:
"""Invia offerta di liquidazione automatica al cliente."""
print(f"[SETTLEMENT] Sinistro {state.claim_id}: offerta EUR {amount:.2f}")
print(f" Deadline accettazione: 10 giorni")
async def _assign_human_handler(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Assegna il sinistro a un handler umano in base alla complessità."""
handler_map = {
"litigious": "legal_team",
"complex": "senior_adjuster",
"standard": "adjuster_pool",
}
state.assigned_to = handler_map.get(triage.complexity.value, "adjuster_pool")
print(f"[ASSIGN] Sinistro {state.claim_id} assegnato a: {state.assigned_to}")
請求自動化の指標のモニタリング
請求自動化システムのパフォーマンスを監視するには、次のような指標が必要です。 単純な ML メトリクスを超えています。ビジネスと運用の指標も同様に重要です システムが実際に顧客エクスペリエンスを向上させているかどうかを確認するため、 運営の収益性。
請求自動化システムのKPI
| メトリック | 意味 | ターゲット |
|---|---|---|
| 自動化率 | 人間の介入なしに完了した請求の割合 | > 50% |
| タッチレスクレーム率 | 身体検査を行わない自動車事故の割合 | > 70% (単純な車) |
| 平均決済時間 | FNOL 時間 - 自動車事故の解決 | 24 時間未満 (ファストトラック) |
| 文書抽出の精度 | % フィールドが正しく抽出されました | > 90% |
| ダメージ推定の偏差 | AI 見積もりと最終コスト間の偏差 (%) | < 15% |
| 誤検知詐欺率 | 不正行為としてフラグが立てられた正当な申し立ての割合 | < 2% |
| 顧客満足度 (NPS) | 損失後のNPS | > +30 (従来の +10 と比較) |
ベストプラクティスとアンチパターン
請求の自動化のベスト プラクティス
- フロントガラスから始めます。 完全自動化の最も単純な使用例 - 明確に目に見える損傷、標準化されたコスト、第三者の関与なし
- 人身傷害に対する人的介入の義務化: 人身傷害請求を完全に自動化しないでください。人間の担当者へのエスカレーションは必須です
- 常に少なくとも 4 枚の写真を要求してください。 前部、後部、左側、右側。さらにオドメーターとナンバープレート。写真が不十分だと精度が大幅に低下します
- 上流の不正スコアリング モデルを統合します。 不正請求の処理を避けるために、不正チェックは損害見積りの後ではなく、損害見積りの前に行う必要があります。
- 不変の監査証跡: 各システムの決定(自動または手動)は、タイムスタンプ、モデルのバージョン、および紛争の入力値とともに記録する必要があります
避けるべきアンチパターン
- セーフティネットのない自動化: 常に最小信頼しきい値を実装します。そのしきい値を下回ると、申請は自動的に人間による審査に進む必要があります
- 地理的検証を行わない推定: 修理価格は地域によって大きく異なります。全国平均料金に基づいた見積もりは、地域の現実から大きくかけ離れている可能性があります
- 写真の品質を無視する: ぼやけた写真、照明が不十分な写真、または部分的な写真は精度を大幅に低下させます。加工前に品質チェックを実施する
- 和解案が速すぎる: 顧客が損害の完全性を評価する前に和解を提案すると、その後の紛争で多額の費用がかかる可能性があります
結論と次のステップ
Computer Vision と NLP を使用した請求の自動化は、ROI を備えた AI アプリケーションの 1 つです 保険部門で最高: 営業コストを 40 ~ 60% 削減し、改善 顧客満足度を高め、標準的なケースでは決済時間を数週間から数時間に短縮します。
成功の鍵は段階的なアプローチです。最も単純なケース (フロントガラス、フロントガラス、 小さな外観上の損傷)、人間の鑑定士と比較してシステムの精度を正確に測定し、 常に自動化を維持しながら、より複雑なケースに自動化を段階的に拡張します。 堅牢なヒューマンエスカレーションメカニズム。
シリーズの次の記事では、以下について詳しく説明します。 保険詐欺の検出: グラフ分析を組み合わせて不正なネットワークと行動シグナルを特定する方法 クレーム内の異常なパターンを検出します。
インシュアテックエンジニアリングシリーズ
- 01 - 開発者向けの保険ドメイン: 製品、アクター、データ モデル
- 02 - クラウドネイティブのポリシー管理: API ファーストのアーキテクチャ
- 03 - テレマティクス パイプライン: 大規模な UBI データ処理
- 04 - AI 引受業務: 特徴エンジニアリングとリスク スコアリング
- 05 - 請求の自動化: コンピューター ビジョンと NLP (この記事)
- 06 - 不正行為の検出: グラフ分析と行動シグナル
- 07 - ACORD Standard と保険 API の統合
- 08 - コンプライアンス エンジニアリング: ソルベンシー II および IFRS 17







