청구 자동화: 청구 관리를 위한 컴퓨터 비전 및 NLP
청구 관리는 전통적으로 가장 비용이 많이 들고 고객 집약적인 프로세스입니다. 보험업계의. 보통 교통사고 처리에는 8~15일이 소요되며, 고객과 회사 사이의 4~7개 접점으로, 문서는 종이 형식으로 수집되며, 차량에 대한 물리적 평가 및 여러 부서 간의 반송. 불만족 비율 클레임 단계의 고객이며 역사적으로 회사와의 상호 작용 중 가장 높은 수준입니다.
인공지능은 이 프로세스를 근본적으로 변화시키고 있습니다. 업계 실적 2025년은 특별합니다. Seguros 제독은 자동 견적의 90% 완전 터치리스, 평가의 98%가 15분 이내에 완료되었습니다. 일부 회사 엔드투엔드 자동화 속도를 최대 보고합니다. 57%, 평균 시간은 표준 자동 청구의 정산 시간이 몇 주에서 몇 시간으로 단축되었습니다. 추출의 정확성 문서의 데이터가 96%, 인간 조작자의 경우 65%와 비교됩니다.
이 가이드는 디지털 관리부터 완전한 청구 자동화 시스템을 구축합니다. FNOL(First Notice of Loss), 컴퓨터 비전을 통한 피해 평가, 추출까지 NLP가 포함된 문서의 정보부터 정산 워크플로우 조정까지.
무엇을 배울 것인가
- 엔드투엔드 청구 자동화 시스템 아키텍처
- 디지털 FNOL: 자동 보고 수신 및 분류
- 차량 손상 추정을 위한 컴퓨터 비전: CNN 모델 및 시각적 변환기
- 보험 문서에서 데이터 추출을 위한 NLP: 의료 보고서, 경찰 보고서
- 레거시 문서를 디지털화하기 위한 고급 OCR
- 정산 프로세스를 위한 워크플로 조정
- 청구 자동화 시스템에 대한 지표 모니터링
청구 자동화 시스템 아키텍처
최신 청구 자동화 시스템은 뚜렷한 책임을 지닌 개별 계층으로 구성됩니다. 정의되었습니다. 마이크로서비스 아키텍처를 사용하면 구성 요소를 독립적으로 확장하고 전체 파이프라인에 영향을 주지 않고 개별 모델을 업데이트합니다.
아키텍처 레이어
| 레이어 | 구성요소 | 기술 |
|---|---|---|
| 음식물 섭취 | FNOL 접수, 문서 업로드, API 게이트웨이 | FastAPI, S3/GCS, 카프카 |
| 처리 | OCR, NLP 추출, 컴퓨터 비전 | Tesseract, spaCy, PyTorch, 허깅 페이스 |
| 지능 | 피해 추정, 사기 점수, 적립금 계산 | YOLOv8, Detectron2, XGBoost |
| 관현악법 | 워크플로우 엔진, SLA 관리, 에스컬레이션 | Temporal.io, Airflow, 상태 머신 |
| 출력 | 정산 제안, 고객 커뮤니케이션, 감사 로그 | Twilio, SendGrid, EventStore |
FNOL Digital: 지능형 손실에 대한 첫 번째 알림
FNOL(첫 번째 손실 통지) 및 고객의 청구에 대한 최초 보고입니다. 전통적으로 이는 전화나 직접 방문을 통해 이루어졌습니다. 현대 시스템에서는 앱을 통해 발생합니다. 모바일, 챗봇 또는 웹 포털. AI가 처음부터 개입하여 분류합니다. 청구 유형을 파악하고 복잡성을 평가한 후 올바른 워크플로로 전달하세요.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import uuid
class ClaimType(str, Enum):
AUTO_COLLISION = "auto_collision"
AUTO_THEFT = "auto_theft"
AUTO_WINDSHIELD = "auto_windshield"
PROPERTY_WATER = "property_water"
PROPERTY_FIRE = "property_fire"
LIABILITY = "liability"
HEALTH = "health"
UNKNOWN = "unknown"
class ClaimComplexity(str, Enum):
SIMPLE = "simple" # liquidazione automatica
STANDARD = "standard" # processo guidato
COMPLEX = "complex" # intervento umano
LITIGIOUS = "litigious" # legal team
@dataclass
class FNOLSubmission:
"""Rappresenta una segnalazione FNOL ricevuta dal cliente."""
policy_number: str
incident_date: datetime
incident_description: str
location: str
photos: List[str] = field(default_factory=list) # URL foto caricate
documents: List[str] = field(default_factory=list)
contact_phone: str = ""
contact_email: str = ""
third_parties_involved: bool = False
injuries_reported: bool = False
police_report_available: bool = False
claim_id: str = field(default_factory=lambda: str(uuid.uuid4()))
received_at: datetime = field(default_factory=datetime.now)
@dataclass
class FNOLAssessment:
"""Risultato dell'analisi AI della FNOL."""
claim_id: str
claim_type: ClaimType
complexity: ClaimComplexity
estimated_severity: str # low/medium/high
auto_settlement_eligible: bool
required_documents: List[str]
assigned_workflow: str
fraud_risk_score: float # 0-1
priority: int # 1=urgente, 5=normale
routing_notes: str = ""
class FNOLTriageService:
"""
Servizio di triaging automatico per segnalazioni FNOL.
Combina regole di business e modelli ML per classificare
ogni sinistro e assegnarlo al workflow appropriato.
"""
# Keyword per classificazione tipo sinistro (semplificata)
CLAIM_TYPE_KEYWORDS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"incidente", "scontro", "tamponamento", "urto",
"collision", "crash", "accident"
],
ClaimType.AUTO_THEFT: [
"furto", "rubato", "scomparso", "theft", "stolen"
],
ClaimType.AUTO_WINDSHIELD: [
"parabrezza", "vetro", "windshield", "cristallo"
],
ClaimType.PROPERTY_WATER: [
"allagamento", "perdita", "infiltrazione", "acqua",
"flood", "water", "leak"
],
ClaimType.PROPERTY_FIRE: [
"incendio", "fuoco", "fiamme", "fire", "burn"
],
}
REQUIRED_DOCS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"CID/CAI o rapporto polizia",
"foto veicolo (4+ lati)",
"documento identità",
"patente di guida",
"carta di circolazione",
],
ClaimType.AUTO_THEFT: [
"denuncia polizia (entro 48h)",
"documento identità",
"carte del veicolo",
"chiavi originali",
],
ClaimType.PROPERTY_WATER: [
"foto danni",
"intervento idraulico (se disponibile)",
"preventivo riparazione",
],
}
def triage(
self, fnol: FNOLSubmission, fraud_score: float = 0.0
) -> FNOLAssessment:
"""Esegue il triaging automatico della segnalazione FNOL."""
claim_type = self._classify_type(fnol.incident_description)
complexity = self._assess_complexity(fnol, fraud_score)
severity = self._estimate_severity(fnol, claim_type)
auto_eligible = self._is_auto_settlement_eligible(fnol, complexity, fraud_score)
workflow = self._assign_workflow(claim_type, complexity)
priority = self._calculate_priority(fnol, complexity, fraud_score)
return FNOLAssessment(
claim_id=fnol.claim_id,
claim_type=claim_type,
complexity=complexity,
estimated_severity=severity,
auto_settlement_eligible=auto_eligible,
required_documents=self.REQUIRED_DOCS.get(claim_type, [
"documento identità",
"foto danni",
"preventivo riparazione",
]),
assigned_workflow=workflow,
fraud_risk_score=fraud_score,
priority=priority,
routing_notes=self._build_routing_notes(
fnol, complexity, fraud_score
),
)
def _classify_type(self, description: str) -> ClaimType:
"""Classifica il tipo di sinistro dalla descrizione testuale."""
desc_lower = description.lower()
scores: Dict[ClaimType, int] = {}
for claim_type, keywords in self.CLAIM_TYPE_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in desc_lower)
if score > 0:
scores[claim_type] = score
if not scores:
return ClaimType.UNKNOWN
return max(scores, key=lambda k: scores[k])
def _assess_complexity(
self, fnol: FNOLSubmission, fraud_score: float
) -> ClaimComplexity:
"""Determina la complessità del sinistro."""
if fnol.injuries_reported:
return ClaimComplexity.LITIGIOUS
if fraud_score > 0.7:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved and not fnol.police_report_available:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved or fraud_score > 0.4:
return ClaimComplexity.STANDARD
return ClaimComplexity.SIMPLE
def _estimate_severity(
self, fnol: FNOLSubmission, claim_type: ClaimType
) -> str:
"""Stima la severita economica (low/medium/high)."""
if fnol.injuries_reported:
return "high"
if claim_type == ClaimType.AUTO_THEFT:
return "high"
if fnol.third_parties_involved:
return "medium"
return "low"
def _is_auto_settlement_eligible(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> bool:
"""Verifica l'eleggibilita per liquidazione automatica."""
if complexity not in [ClaimComplexity.SIMPLE, ClaimComplexity.STANDARD]:
return False
if fraud_score > 0.3:
return False
if fnol.injuries_reported or fnol.third_parties_involved:
return False
if len(fnol.photos) < 2:
return False
return True
def _assign_workflow(
self, claim_type: ClaimType, complexity: ClaimComplexity
) -> str:
workflow_map = {
(ClaimType.AUTO_COLLISION, ClaimComplexity.SIMPLE): "auto_collision_fast_track",
(ClaimType.AUTO_COLLISION, ClaimComplexity.STANDARD): "auto_collision_standard",
(ClaimType.AUTO_COLLISION, ClaimComplexity.COMPLEX): "auto_collision_manual",
(ClaimType.AUTO_THEFT, ClaimComplexity.SIMPLE): "auto_theft_standard",
(ClaimType.AUTO_WINDSHIELD, ClaimComplexity.SIMPLE): "windshield_auto",
}
return workflow_map.get(
(claim_type, complexity),
f"generic_{complexity.value}_workflow"
)
def _calculate_priority(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> int:
if fnol.injuries_reported:
return 1 # massima priorità
if complexity == ClaimComplexity.LITIGIOUS:
return 1
if fraud_score > 0.7:
return 2 # priorità alta per indagine fraud
if complexity == ClaimComplexity.COMPLEX:
return 3
return 5 # normale
def _build_routing_notes(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> str:
notes = []
if fnol.injuries_reported:
notes.append("ATTENZIONE: lesioni personali dichiarate - escalation legal/medical obbligatoria")
if fraud_score > 0.5:
notes.append(f"Fraud score elevato ({fraud_score:.2f}) - revisione SIU raccomandata")
if not fnol.photos:
notes.append("Nessuna foto allegata - richiedere al cliente prima di procedere")
return "; ".join(notes) if notes else "Nessuna nota particolare"
자동차 손상 추정을 위한 컴퓨터 비전
자동 피해 추정은 청구 자동화에서 가장 영향력 있는 AI 구성 요소입니다. 고객이 스마트폰으로 손상된 차량을 촬영하면 시스템이 이미지를 분석합니다. 손상된 부품을 식별하기 위해 필요한 개입 유형을 추정합니다(수리 대 교체) 업데이트된 가격 데이터베이스를 기반으로 비용 견적을 계산합니다.
업계에서 가장 많이 사용되는 모델은 다음과 같습니다. 물체 감지 (식별하다 손상된 부분) 손상 심각도 분류 (엔티티를 분류하다 경미한 손상부터 전체 손상까지). 시장 선두주자인 Tractable과 같은 접근 방식이 이를 입증했습니다. 이러한 시스템은 전문 감정인의 정확성과 일치하거나 이를 초과할 수 있습니다.
import torch
import torchvision.transforms as T
from torchvision.models import resnet50, ResNet50_Weights
import numpy as np
from PIL import Image
from typing import Dict, List, Tuple
from dataclasses import dataclass
import io
@dataclass
class DamageRegion:
"""Regione di danno identificata nell'immagine."""
part_name: str # es. "bumper_front", "door_left"
damage_type: str # scratch, dent, crack, broken
severity: str # minor, moderate, severe, total_loss
confidence: float # 0-1
repair_vs_replace: str # "repair" o "replace"
estimated_cost_eur: float
@dataclass
class VehicleDamageAssessment:
"""Risultato completo dell'analisi danni veicolo."""
claim_id: str
images_analyzed: int
damage_regions: List[DamageRegion]
total_estimated_cost: float
total_loss_likelihood: float # 0-1
settlement_recommendation: str
confidence_overall: float
requires_physical_inspection: bool
assessment_notes: str
class VehicleDamageClassifier:
"""
Classificatore di danni veicolo basato su transfer learning.
Architettura: ResNet-50 fine-tuned su dataset proprietario di danni auto
Output: classificazione per parte + severita + tipo danno
In produzione: usare modelli specializzati come quelli di Tractable,
Mitchell, o modelli addestrati su dataset interni.
"""
# Parti veicolo monitorabili
VEHICLE_PARTS = [
"bumper_front", "bumper_rear",
"hood", "trunk",
"door_front_left", "door_front_right",
"door_rear_left", "door_rear_right",
"fender_front_left", "fender_front_right",
"windshield_front", "windshield_rear",
"headlight_left", "headlight_right",
"mirror_left", "mirror_right",
"roof", "pillar",
]
# Tabella costi di riferimento (EUR, valori indicativi 2025)
REPAIR_COST_TABLE: Dict[str, Dict[str, float]] = {
"bumper_front": {"minor": 200, "moderate": 600, "severe": 1200, "replace": 900},
"bumper_rear": {"minor": 200, "moderate": 550, "severe": 1100, "replace": 850},
"hood": {"minor": 300, "moderate": 800, "severe": 1500, "replace": 1200},
"door_front_left": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"door_front_right": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"windshield_front": {"minor": 0, "moderate": 350, "severe": 600, "replace": 450},
"headlight_left": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
"headlight_right": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
}
DEFAULT_COST = {"minor": 150, "moderate": 450, "severe": 900, "replace": 700}
def __init__(self, model_path: str = "damage_classifier.pt") -> None:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = self._load_model(model_path)
self.transform = T.Compose([
T.Resize((224, 224)),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def _load_model(self, model_path: str) -> torch.nn.Module:
"""Carica il modello fine-tuned da file o usa il pretrained come fallback."""
try:
model = resnet50(weights=None)
# Adatta la testa di classificazione al numero di parti*severita
num_classes = len(self.VEHICLE_PARTS) * 4 # 4 livelli severita
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
state = torch.load(model_path, map_location=self.device)
model.load_state_dict(state)
print(f"Modello caricato da {model_path}")
except FileNotFoundError:
print("Modello fine-tuned non trovato, uso pretrained ResNet50 (demo only)")
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.eval()
return model.to(self.device)
def analyze_image(self, image_bytes: bytes) -> List[Tuple[str, str, float]]:
"""
Analizza una singola immagine.
Returns: lista di (part_name, severity, confidence)
"""
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
tensor = self.transform(image).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
# Soglia: riporta solo danni con confidence > 30%
results = []
threshold = 0.30
for i, part in enumerate(self.VEHICLE_PARTS):
severities = ["minor", "moderate", "severe", "total_loss"]
for j, sev in enumerate(severities):
idx = i * 4 + j
if idx < len(probs) and probs[idx] > threshold:
results.append((part, sev, float(probs[idx])))
return sorted(results, key=lambda x: x[2], reverse=True)
def estimate_repair_cost(self, part: str, severity: str) -> Tuple[float, str]:
"""
Stima il costo di riparazione e determina repair vs replace.
Returns: (cost_eur, repair_or_replace)
"""
cost_table = self.REPAIR_COST_TABLE.get(part, self.DEFAULT_COST)
if severity in ["severe", "total_loss"]:
replace_cost = cost_table.get("replace", 700)
repair_cost = cost_table.get("severe", 900)
if replace_cost < repair_cost * 0.8:
return replace_cost, "replace"
return repair_cost, "repair"
repair_cost = cost_table.get(severity, 150)
return repair_cost, "repair"
def assess_multiple_images(
self, claim_id: str, image_bytes_list: List[bytes]
) -> VehicleDamageAssessment:
"""
Valutazione completa su più immagini del veicolo.
Aggrega i risultati di tutte le foto per una stima robusta.
"""
# Raccoglie rilevazioni da tutte le immagini
all_detections: Dict[str, List[Tuple[str, float]]] = {}
for img_bytes in image_bytes_list:
detections = self.analyze_image(img_bytes)
for part, severity, confidence in detections:
if part not in all_detections:
all_detections[part] = []
all_detections[part].append((severity, confidence))
# Consolida: per ogni parte prende la severita più alta con confidence media
damage_regions: List[DamageRegion] = []
total_cost = 0.0
severity_order = {"minor": 0, "moderate": 1, "severe": 2, "total_loss": 3}
for part, severity_confidences in all_detections.items():
# Prendi la severita più alta rilevata
max_sev = max(severity_confidences, key=lambda x: severity_order.get(x[0], 0))
severity, _ = max_sev
avg_confidence = np.mean([c for _, c in severity_confidences])
cost, repair_replace = self.estimate_repair_cost(part, severity)
total_cost += cost
damage_regions.append(DamageRegion(
part_name=part,
damage_type="structural" if severity in ["severe", "total_loss"] else "cosmetic",
severity=severity,
confidence=round(float(avg_confidence), 3),
repair_vs_replace=repair_replace,
estimated_cost_eur=cost,
))
total_loss_likelihood = self._estimate_total_loss(damage_regions, total_cost)
requires_inspection = (
total_loss_likelihood > 0.5 or
total_cost > 8000 or
any(d.severity == "severe" and "windshield" in d.part_name
for d in damage_regions)
)
return VehicleDamageAssessment(
claim_id=claim_id,
images_analyzed=len(image_bytes_list),
damage_regions=damage_regions,
total_estimated_cost=round(total_cost, 2),
total_loss_likelihood=round(total_loss_likelihood, 3),
settlement_recommendation=self._settlement_recommendation(
total_cost, total_loss_likelihood
),
confidence_overall=round(
float(np.mean([d.confidence for d in damage_regions])) if damage_regions else 0.0,
3
),
requires_physical_inspection=requires_inspection,
assessment_notes=self._build_notes(damage_regions, total_loss_likelihood),
)
def _estimate_total_loss(
self, regions: List[DamageRegion], total_cost: float
) -> float:
"""Stima la probabilità di perdita totale."""
if total_cost > 15000:
return 0.95
if total_cost > 10000:
return 0.70
severe_count = sum(1 for r in regions if r.severity in ["severe", "total_loss"])
if severe_count >= 4:
return 0.80
if severe_count >= 2:
return 0.40
return max(0.0, (total_cost - 5000) / 10000) if total_cost > 5000 else 0.05
def _settlement_recommendation(self, cost: float, total_loss_prob: float) -> str:
if total_loss_prob > 0.7:
return "TOTAL_LOSS_SETTLEMENT"
if cost > 8000:
return "HIGH_VALUE_REPAIR_AUTHORIZATION"
if cost > 3000:
return "STANDARD_REPAIR_AUTHORIZATION"
return "FAST_TRACK_SETTLEMENT"
def _build_notes(
self, regions: List[DamageRegion], total_loss_prob: float
) -> str:
notes = []
if total_loss_prob > 0.5:
notes.append("Elevata probabilità di perdita totale - verificare valore veicolo")
severe = [r.part_name for r in regions if r.severity == "severe"]
if severe:
notes.append(f"Danni severi rilevati su: {', '.join(severe)}")
return "; ".join(notes) if notes else "Assessment completato senza anomalie"
문서에서 정보를 추출하는 NLP
사고가 발생할 때마다 경찰 보고서, 의료 보고서, 추정치 등 수십 개의 문서가 생성됩니다. 워크숍, 증인 진술, CID/CAI 보고서. 이 문서의 수동 처리 느리고(1~3일) 오류가 발생하기 쉽습니다. 고급 OCR과 결합된 최신 NLP 시스템, 90~96%의 정확도로 구조화된 정보를 자동으로 추출합니다.
import spacy
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, date
from enum import Enum
class DocumentType(str, Enum):
POLICE_REPORT = "police_report"
MEDICAL_REPORT = "medical_report"
REPAIR_ESTIMATE = "repair_estimate"
CID_CAI = "cid_cai" # Constatazione Amichevole di Incidente
WITNESS_STATEMENT = "witness_statement"
INVOICE = "invoice"
UNKNOWN = "unknown"
@dataclass
class ExtractedEntity:
"""Entità estratta da un documento."""
entity_type: str
value: str
confidence: float
source_text: str # testo originale da cui e stata estratta
position: Tuple[int, int] # start, end nel documento
@dataclass
class DocumentExtraction:
"""Risultato dell'estrazione NLP da un documento assicurativo."""
document_type: DocumentType
document_date: Optional[date]
entities: List[ExtractedEntity]
structured_data: Dict
extraction_confidence: float
raw_text: str
class InsuranceDocumentExtractor:
"""
Estrattore NLP per documenti assicurativi.
Combina:
- spaCy per NER (Named Entity Recognition)
- Regex per pattern specifici del dominio assicurativo
- Regole di business per estrazione strutturata
"""
# Pattern regex per entità assicurative italiane
PATTERNS: Dict[str, str] = {
"targa": r"\b[A-Z]{2}\d{3}[A-Z]{2}\b|\b[A-Z]{2}\d{5}\b",
"polizza": r"(?:polizza|n\.\s*pol\.?)\s*[:\.]?\s*([A-Z0-9/-]{6,20})",
"codice_fiscale": r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b",
"partita_iva": r"\b(?:IT)?\d{11}\b",
"euro_amount": r"(?:EUR?|€)\s*[\d.,]{1,10}|[\d.,]{1,10}\s*(?:EUR?|€)",
"data": r"\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
"phone": r"(?:\+39|0039)?\s*(?:\d{2,4}[\s\-]?){2,4}\d{4}",
"email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,6}",
}
def __init__(self, spacy_model: str = "it_core_news_lg") -> None:
try:
self.nlp = spacy.load(spacy_model)
except OSError:
print(f"Modello spaCy '{spacy_model}' non trovato - installa con:")
print(f"python -m spacy download {spacy_model}")
self.nlp = None
def classify_document(self, text: str) -> DocumentType:
"""Classifica il tipo di documento basandosi sul contenuto."""
text_lower = text.lower()
classification_rules = [
(DocumentType.POLICE_REPORT, ["verbale", "polizia stradale", "carabinieri", "codice della strada", "accertamento"]),
(DocumentType.MEDICAL_REPORT, ["diagnosi", "prognosi", "lesioni", "ospedale", "pronto soccorso", "medico"]),
(DocumentType.CID_CAI, ["constatazione amichevole", "cid", "cai", "modulo blu"]),
(DocumentType.REPAIR_ESTIMATE, ["preventivo", "officina", "carrozzeria", "ricambi", "manodopera"]),
(DocumentType.INVOICE, ["fattura", "ricevuta", "importo totale", "iva", "imponibile"]),
]
scores: Dict[DocumentType, int] = {}
for doc_type, keywords in classification_rules:
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[doc_type] = score
return max(scores, key=lambda k: scores[k]) if scores else DocumentType.UNKNOWN
def extract(self, raw_text: str) -> DocumentExtraction:
"""Estrae tutte le informazioni rilevanti dal documento."""
doc_type = self.classify_document(raw_text)
entities = self._extract_entities(raw_text)
structured = self._build_structured_data(entities, doc_type, raw_text)
confidence = self._calculate_confidence(entities, doc_type)
doc_date = self._extract_document_date(entities)
return DocumentExtraction(
document_type=doc_type,
document_date=doc_date,
entities=entities,
structured_data=structured,
extraction_confidence=confidence,
raw_text=raw_text,
)
def _extract_entities(self, text: str) -> List[ExtractedEntity]:
"""Estrae entità con regex e NER spaCy."""
entities: List[ExtractedEntity] = []
# Estrazione con regex
for entity_type, pattern in self.PATTERNS.items():
for match in re.finditer(pattern, text, re.IGNORECASE):
entities.append(ExtractedEntity(
entity_type=entity_type,
value=match.group().strip(),
confidence=0.85, # alta confidence per regex su formato noto
source_text=text[max(0, match.start()-20):match.end()+20],
position=(match.start(), match.end()),
))
# Estrazione NER con spaCy
if self.nlp:
doc = self.nlp(text[:100000]) # limit per performance
for ent in doc.ents:
if ent.label_ in ["PER", "ORG", "LOC", "DATE", "MONEY"]:
entities.append(ExtractedEntity(
entity_type=f"spacy_{ent.label_.lower()}",
value=ent.text.strip(),
confidence=0.75,
source_text=text[max(0, ent.start_char-20):ent.end_char+20],
position=(ent.start_char, ent.end_char),
))
return entities
def _build_structured_data(
self,
entities: List[ExtractedEntity],
doc_type: DocumentType,
text: str,
) -> Dict:
"""Costruisce un dizionario strutturato dal documento."""
structured: Dict = {"document_type": doc_type.value}
# Raggruppa entità per tipo
by_type: Dict[str, List[str]] = {}
for ent in entities:
by_type.setdefault(ent.entity_type, []).append(ent.value)
# Mappa entità in campi strutturati
if "targa" in by_type:
structured["vehicle_plates"] = list(set(by_type["targa"]))
if "polizza" in by_type:
structured["policy_numbers"] = list(set(by_type["polizza"]))
if "euro_amount" in by_type:
amounts = []
for amt_str in by_type["euro_amount"]:
clean = re.sub(r"[^\d,.]", "", amt_str).replace(",", ".")
try:
amounts.append(float(clean))
except ValueError:
pass
structured["amounts_eur"] = sorted(amounts)
structured["max_amount_eur"] = max(amounts) if amounts else 0
if "spacy_per" in by_type:
structured["persons_mentioned"] = list(set(by_type["spacy_per"]))
if "spacy_loc" in by_type:
structured["locations_mentioned"] = list(set(by_type["spacy_loc"]))
return structured
def _calculate_confidence(
self, entities: List[ExtractedEntity], doc_type: DocumentType
) -> float:
if not entities:
return 0.1
avg = float(sum(e.confidence for e in entities) / len(entities))
# Penalizza se il tipo e UNKNOWN
if doc_type == DocumentType.UNKNOWN:
avg *= 0.7
return round(min(avg, 1.0), 3)
def _extract_document_date(
self, entities: List[ExtractedEntity]
) -> Optional[date]:
date_entities = [e for e in entities if e.entity_type == "data"]
for de in date_entities:
for fmt in ["%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y", "%d/%m/%y"]:
try:
return datetime.strptime(de.value, fmt).date()
except ValueError:
continue
return None
정산 작업흐름의 조율
오케스트레이션은 시스템의 두뇌입니다. 모든 구성 요소(FNOL 분류, 손상)를 조정합니다. 평가, 문서 추출, 사기 채점) 및 청구 상태 전환 관리 시간이 지남에 따라 계약상의 SLA 및 비즈니스 에스컬레이션 규칙을 존중합니다.
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
import asyncio
class ClaimStatus(str, Enum):
RECEIVED = "received"
TRIAGED = "triaged"
DOCUMENTS_PENDING = "documents_pending"
ASSESSMENT_IN_PROGRESS = "assessment_in_progress"
FRAUD_REVIEW = "fraud_review"
SETTLEMENT_OFFERED = "settlement_offered"
SETTLEMENT_ACCEPTED = "settlement_accepted"
SETTLEMENT_DISPUTED = "settlement_disputed"
MANUAL_REVIEW = "manual_review"
CLOSED = "closed"
REJECTED = "rejected"
@dataclass
class ClaimWorkflowState:
"""Stato corrente del workflow di un sinistro."""
claim_id: str
status: ClaimStatus
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None # handler umano o "auto"
sla_deadline: Optional[datetime] = None
settlement_amount: Optional[float] = None
status_history: List[Dict] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
def transition_to(self, new_status: ClaimStatus, note: str = "") -> None:
"""Transizione di stato con audit trail."""
self.status_history.append({
"from": self.status.value,
"to": new_status.value,
"timestamp": datetime.now().isoformat(),
"note": note,
})
self.status = new_status
self.updated_at = datetime.now()
if note:
self.notes.append(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] {note}")
class ClaimsWorkflowEngine:
"""
Engine di orchestrazione per il workflow sinistri.
Gestisce le transizioni di stato, gli SLA e le escalation.
In produzione: usare Temporal.io o AWS Step Functions
per la durabilita e il recovery dei workflow.
"""
SLA_HOURS: Dict[str, int] = {
"simple": 4, # 4 ore per sinistri semplici
"standard": 24, # 24 ore per sinistri standard
"complex": 72, # 72 ore per sinistri complessi
"litigious": 168, # 7 giorni per sinistri con contenziosi
}
async def process_claim(
self,
claim_state: ClaimWorkflowState,
fnol: "FNOLSubmission",
triage: "FNOLAssessment",
damage_assessment: Optional["VehicleDamageAssessment"] = None,
doc_extractions: Optional[List["DocumentExtraction"]] = None,
) -> ClaimWorkflowState:
"""Processa un sinistro attraverso il workflow completo."""
# Step 1: Triage e routing
claim_state.transition_to(
ClaimStatus.TRIAGED,
f"Tipo: {triage.claim_type.value}, Complessità: {triage.complexity.value}"
)
sla_hours = self.SLA_HOURS.get(triage.complexity.value, 72)
claim_state.sla_deadline = datetime.now() + timedelta(hours=sla_hours)
# Step 2: Check documenti
if not fnol.documents and not fnol.photos:
claim_state.transition_to(
ClaimStatus.DOCUMENTS_PENDING,
f"Documenti mancanti: {', '.join(triage.required_documents)}"
)
await self._notify_customer_documents_needed(claim_state, triage)
return claim_state
# Step 3: Fraud check
if triage.fraud_risk_score > 0.5:
claim_state.transition_to(
ClaimStatus.FRAUD_REVIEW,
f"Fraud score: {triage.fraud_risk_score:.2f} - invio a SIU"
)
await self._escalate_to_siu(claim_state, triage)
return claim_state
# Step 4: Settlement automatico se eleggibile
if triage.auto_settlement_eligible and damage_assessment:
settlement = damage_assessment.total_estimated_cost
claim_state.settlement_amount = settlement
claim_state.transition_to(
ClaimStatus.SETTLEMENT_OFFERED,
f"Offerta automatica: EUR {settlement:.2f}"
)
await self._send_settlement_offer(claim_state, settlement)
return claim_state
# Step 5: Review manuale
claim_state.transition_to(
ClaimStatus.MANUAL_REVIEW,
"Invio a handler umano per valutazione"
)
await self._assign_human_handler(claim_state, triage)
return claim_state
async def _notify_customer_documents_needed(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Notifica il cliente dei documenti mancanti."""
# In produzione: integrazione con Twilio SMS/WhatsApp o SendGrid
print(f"[NOTIFY] Sinistro {state.claim_id}: richiedere documenti al cliente")
print(f" Documenti necessari: {triage.required_documents}")
async def _escalate_to_siu(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Escalation all'unita investigativa speciale (SIU)."""
print(f"[SIU] Sinistro {state.claim_id} flaggato per fraud review")
print(f" Fraud score: {triage.fraud_risk_score:.2f}")
state.assigned_to = "siu_team"
async def _send_settlement_offer(
self, state: ClaimWorkflowState, amount: float
) -> None:
"""Invia offerta di liquidazione automatica al cliente."""
print(f"[SETTLEMENT] Sinistro {state.claim_id}: offerta EUR {amount:.2f}")
print(f" Deadline accettazione: 10 giorni")
async def _assign_human_handler(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Assegna il sinistro a un handler umano in base alla complessità."""
handler_map = {
"litigious": "legal_team",
"complex": "senior_adjuster",
"standard": "adjuster_pool",
}
state.assigned_to = handler_map.get(triage.complexity.value, "adjuster_pool")
print(f"[ASSIGN] Sinistro {state.claim_id} assegnato a: {state.assigned_to}")
청구 자동화를 위한 지표 모니터링
청구 자동화 시스템의 성능을 모니터링하려면 다음과 같은 지표가 필요합니다. 단순한 ML 지표를 넘어서는 것입니다. 비즈니스 및 운영 지표는 똑같이 중요합니다. 시스템이 실제로 고객 경험을 개선하고 있는지 확인하고 운영의 수익성.
청구자동화시스템 KPI
| 미터법 | 정의 | 목표 |
|---|---|---|
| 자동화율 | 사람의 개입 없이 종결된 청구 비율 | > 50% |
| 비접촉식 청구 비율 | 신체검사를 받지 않은 교통사고 비율 | > 70% (간단한 자동차) |
| 평균 정산 시간 | FNOL Time - 자동차 사고 처리 | < 24시간(빠른 경로) |
| 문서 추출 정확도 | % 필드가 올바르게 추출되었습니다. | > 90% |
| 피해 추정 편차 | AI 추정치와 최종 비용 간의 % 편차 | < 15% |
| 거짓 긍정 사기율 | 사기로 신고된 합법적인 주장 비율 | < 2% |
| 고객 만족(NPS) | 손실 후 NPS | > +30 (대 +10 레거시) |
모범 사례 및 안티패턴
청구 자동화 모범 사례
- 앞유리부터 시작하세요. 완전 자동화를 위한 가장 간단한 사용 사례 - 명확하게 눈에 보이는 손상, 표준화된 비용, 제3자가 관여하지 않음
- 개인 상해에 대한 필수 인간 개입: 개인 상해 청구를 완전히 자동화하지 마십시오. 인간 핸들러에게 에스컬레이션하는 것이 필수입니다.
- 항상 최소 4장의 사진을 요청하세요. 앞, 뒤, 왼쪽, 오른쪽; 주행 거리계와 번호판도 포함됩니다. 사진이 부족하면 정확도가 급격히 떨어집니다.
- 업스트림 사기 점수 모델 통합: 사기 확인은 사기 청구 처리를 피하기 위해 손상 추정 이후가 아닌 피해 추정 이전에 이루어져야 합니다.
- 변경할 수 없는 감사 추적: 각 시스템 결정(자동 또는 수동)은 분쟁에 대한 타임스탬프, 모델 버전 및 입력 값과 함께 기록되어야 합니다.
피해야 할 안티패턴
- 안전망이 없는 자동화: 항상 최소 신뢰 임계값을 구현합니다. 해당 기준 미만에서는 소유권 주장이 자동으로 사람의 검토를 거쳐야 합니다.
- 지리적 검증 없이 추정: 수리 가격은 지역에 따라 크게 다릅니다. 전국 평균 요율을 기준으로 한 견적은 현지 현실과 매우 다를 수 있습니다.
- 사진 품질 무시: 흐릿하고 조명이 어둡거나 부분적인 사진은 정확도를 심각하게 떨어뜨립니다. 처리하기 전에 품질 검사를 실시합니다.
- 너무 빠른 합의 제안: 고객이 손해의 완전성을 평가할 기회를 갖기 전에 합의를 제안하면 후속 분쟁으로 인해 비용이 많이 들 수 있습니다.
결론 및 다음 단계
Computer Vision 및 NLP를 사용한 청구 자동화는 ROI를 제공하는 AI 애플리케이션 중 하나입니다. 보험 부문 최고 수준: 운영 비용을 40~60% 절감하고 표준 케이스의 경우 고객 만족을 보장하고 결제 시간을 몇 주에서 몇 시간으로 단축합니다.
성공의 열쇠는 점진적인 접근 방식입니다. 가장 간단한 사례(앞유리, 작은 외관 손상), 인간 평가자와 비교하여 시스템의 정확도를 정확하게 측정합니다. 항상 유지하면서 보다 복잡한 사례로 자동화를 점진적으로 확장합니다. 강력한 인적 에스컬레이션 메커니즘.
시리즈의 다음 기사에서는 보험 사기 탐지: 그래프 분석을 결합하여 사기성 네트워크와 행동 신호를 식별하는 방법 클레임에서 비정상적인 패턴을 감지합니다.
InsurTech 엔지니어링 시리즈
- 01 - 개발자를 위한 보험 도메인: 제품, 행위자 및 데이터 모델
- 02 - 클라우드 네이티브 정책 관리: API 우선 아키텍처
- 03 - 텔레매틱스 파이프라인: 대규모 UBI 데이터 처리
- 04 - AI Underwriting: 기능 엔지니어링 및 위험 평가
- 05 - 청구 자동화: 컴퓨터 비전 및 NLP(이 기사)
- 06 - 사기 탐지: 그래프 분석 및 행동 신호
- 07 - ACORD 표준 및 보험 API 통합
- 08 - 규정 준수 엔지니어링: Solvency II 및 IFRS 17







