클라우드 네이티브 정책 관리: 보험 플랫폼을 위한 API 우선 아키텍처
정책행정시스템(PAS)은 모든 정책의 심장입니다. 보험 회사. 그리고 고객이 무엇을 구매했는지, 얼마나 오랫동안 구매했는지 정의하는 시스템입니다. 어떤 가격과 어떤 조건으로. 하지만 대부분의 보험사에서는 이탈리아와 유럽의 이 중요한 시스템은 야간 배치를 통해 1990년대 메인프레임에서 여전히 실행됩니다. 존재하지 않거나 예약된 SFTP 파일로 축소된 마지막 시간 및 API.
아키텍처로의 전환 클라우드 네이티브 API 우선 그것은 단순한 질문이 아니다 기술적: 경쟁적 필요성입니다. 새로운 InsurTech 플레이어는 몇 주 안에 제품을 출시합니다. 몇 달이 아닌 실시간으로 고객에게 서비스를 제공하고 수십 개의 외부 소스로부터 데이터를 통합합니다. 시장 2024년 53억 달러 가치로 평가된 Global InsurTech는 2024년 1,329억 달러 이상으로 성장할 것입니다. 2034년(CAGR 22%). 기존 보험사의 74%가 이미 현대화에 투자하고 있습니다. 핵심 시스템(Capgemini World InsurTech 보고서 2024).
이 기사에서는 모델링부터 클라우드 네이티브 정책 관리 시스템을 처음부터 구축합니다. 도메인부터 API 설계, 정책 수명주기 관리부터 브로커와의 통합까지, 디지털 채널 및 규제 시스템.
무엇을 배울 것인가
- 클라우드 네이티브 정책 관리를 위한 API 우선 아키텍처
- 정책의 전체 수명주기 모델링
- 견적, 바인딩, 승인, 취소, 갱신을 위한 REST API 디자인
- 보험 맥락에서의 이벤트 소싱 및 CQRS
- 브로커 포털, 전자상거래 및 중개자와 통합
- 정책 버전 관리 및 보증 관리
- 복잡한 보험 시스템의 테스트 패턴
1. 정책의 수명주기: 할당량부터 만료까지
한 줄의 코드를 작성하기 전에 정책의 전체 수명주기를 이해해야 합니다. 산업 표준 모델에는 다음과 같은 주요 상태가 있습니다.
정책 수명주기 상태
- 인용됨: 고객이 견적을 받았지만 아직 구매하지 않았습니다.
- 애플리케이션: 신청서가 인수심사 단계에 있습니다.
- 경계: 보장이 활성화되어 정식 발행을 기다리고 있습니다.
- IN_FORCE: 적극적이고 효과적인 정책
- 정지된: 보장이 일시적으로 중단됨(예: 미납)
- 갱신됨: 새로운 기간으로 정책이 갱신됩니다.
- 취소: 자연 만료 이전에 정책이 취소됨
- 만료됨 / 만료됨: 자연히 소멸되거나 소멸된 보험
각 상태 전환에는 정확한 비즈니스 규칙이 있습니다. 즉, 누가 이를 활성화할 수 있고, 어떤 유효성을 검증할 수 있습니까? 요청되는 이벤트, 생성되는 이벤트, 전송되는 알림 등이 있습니다. 코드는 반드시 이러한 규칙을 명시적이고 테스트 가능한 방식으로 표현합니다.
2. 도메인 모델: 기본 개체
좋은 정책 관리 시스템은 정확한 도메인 모델을 기반으로 합니다. 기본 엔터티는 다음과 같습니다. 유형 주석을 사용한 Python에서의 관계:
from dataclasses import dataclass, field
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
class PolicyStatus(str, Enum):
QUOTED = "QUOTED"
APPLICATION = "APPLICATION"
BOUND = "BOUND"
IN_FORCE = "IN_FORCE"
SUSPENDED = "SUSPENDED"
RENEWED = "RENEWED"
CANCELLED = "CANCELLED"
EXPIRED = "EXPIRED"
class CoverageType(str, Enum):
LIABILITY = "LIABILITY"
COMPREHENSIVE = "COMPREHENSIVE"
COLLISION = "COLLISION"
PERSONAL_INJURY = "PERSONAL_INJURY"
PROPERTY = "PROPERTY"
BUSINESS_INTERRUPTION = "BUSINESS_INTERRUPTION"
@dataclass(frozen=True)
class Money:
"""Value object immutabile per importi monetari."""
amount: Decimal
currency: str = "EUR"
def __add__(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError(f"Cannot add {self.currency} and {other.currency}")
return Money(self.amount + other.amount, self.currency)
def __mul__(self, factor: Decimal) -> "Money":
return Money(self.amount * factor, self.currency)
@dataclass(frozen=True)
class Coverage:
"""Una singola copertura assicurativa all'interno di una polizza."""
coverage_id: UUID
coverage_type: CoverageType
limit: Money # Massimale di indennizzo
deductible: Money # Franchigia
premium: Money # Premio per questa copertura
effective_date: date
expiry_date: date
is_active: bool = True
@dataclass
class PolicyHolder:
"""Il contraente/assicurato della polizza."""
party_id: UUID
first_name: str
last_name: str
fiscal_code: str # Codice fiscale IT
date_of_birth: date
email: str
phone: Optional[str]
address: dict # Indirizzo strutturato
@dataclass
class Endorsement:
"""Una modifica alla polizza originale (endorsement / appendice)."""
endorsement_id: UUID
policy_id: UUID
endorsement_type: str # ADD_COVERAGE, REMOVE_COVERAGE, CHANGE_LIMIT, etc.
effective_date: date
description: str
premium_adjustment: Money # Positivo = aumento, negativo = rimborso
previous_state: dict # Snapshot dello stato prima dell'endorsement
new_state: dict # Snapshot dello stato dopo l'endorsement
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Policy:
"""Entità radice dell'aggregato polizza."""
policy_id: UUID
policy_number: str # Es. "AUTO-IT-2024-0001234"
product_code: str # Es. "AUTO_RC", "HOME_ALL_RISK"
status: PolicyStatus
holder: PolicyHolder
coverages: list[Coverage]
endorsements: list[Endorsement]
effective_date: date
expiry_date: date
annual_premium: Money
created_at: datetime
updated_at: datetime
# Metadata di distribuzione
channel: str # "DIRECT", "BROKER", "AGGREGATOR"
agent_code: Optional[str]
broker_code: Optional[str]
def total_premium(self) -> Money:
base = sum(
(c.premium for c in self.coverages if c.is_active),
start=Money(Decimal("0"))
)
adjustments = sum(
(e.premium_adjustment for e in self.endorsements),
start=Money(Decimal("0"))
)
return base + adjustments
def is_active(self) -> bool:
return self.status == PolicyStatus.IN_FORCE
def can_file_claim(self) -> bool:
today = date.today()
return (
self.is_active()
and self.effective_date <= today <= self.expiry_date
)
3. API 설계: 정책 운영을 위한 REST API
API 우선 아키텍처는 API가 그 전에도 가장 먼저 설계한다는 의미입니다. 구현의. API 계약은 명확하고 버전이 지정되어 있어야 하며 계약과 호환되어야 합니다. 업계 기대치(ACORD 표준, OpenAPI 3.1).
다음은 정책 관리 서비스에 대한 기본 엔드포인트 API의 구조입니다.
| 방법 | 엔드포인트 | 작업 | 본문 / 응답 |
|---|---|---|---|
| 우편 | /v1/인용문 | 견적 생성 | 견적요청 -> 견적응답 |
| 우편 | /v1/quotes/{quoteId}/bind | 견적을 정책으로 변환 | BindRequest -> 정책 응답 |
| 얻다 | /v1/policies/{policyId} | 정책 복구 | -> 정책응답 |
| 우편 | /v1/policies/{policyId}/endorse | 보증 발행 | EndorsementRequest -> EndorsementResponse |
| 우편 | /v1/policies/{policyId}/취소 | 정책 취소 | 취소요청 -> 정책응답 |
| 우편 | /v1/policies/{policyId}/renew | 정책 갱신 | 갱신요청 -> 정책응답 |
| 얻다 | /v1/policies/{policyId}/documents | 정책 문서 | -> DocumentListResponse |
Pydantic 검증을 통한 FastAPI 구현:
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import Optional
from decimal import Decimal
from datetime import date
from uuid import UUID
import uuid
app = FastAPI(
title="Policy Management API",
version="1.0.0",
description="Cloud-native insurance policy management"
)
# --- Request/Response Models ---
class VehicleInfo(BaseModel):
plate: str = Field(..., pattern=r"^[A-Z]{2}[0-9]{3}[A-Z]{2}$|^[A-Z]{2}[0-9]{5}$")
make: str
model: str
year: int = Field(..., ge=1900, le=2026)
value: Decimal = Field(..., gt=0)
engine_cc: int
class QuoteRequest(BaseModel):
product_code: str = Field(..., pattern="^[A-Z_]+$")
holder_fiscal_code: str
vehicle: Optional[VehicleInfo]
effective_date: date
desired_coverages: list[str]
channel: str = Field(default="DIRECT")
@validator("effective_date")
def effective_date_not_past(cls, v: date) -> date:
if v < date.today():
raise ValueError("Effective date cannot be in the past")
return v
class CoverageQuote(BaseModel):
coverage_type: str
limit: Decimal
deductible: Decimal
annual_premium: Decimal
currency: str = "EUR"
class QuoteResponse(BaseModel):
quote_id: UUID
product_code: str
coverages: list[CoverageQuote]
total_annual_premium: Decimal
currency: str = "EUR"
valid_until: date
terms_version: str
class BindRequest(BaseModel):
payment_reference: str
holder_data: dict # Validated separately by KYC service
selected_coverages: list[str]
broker_code: Optional[str]
class PolicyResponse(BaseModel):
policy_id: UUID
policy_number: str
status: str
product_code: str
effective_date: date
expiry_date: date
annual_premium: Decimal
currency: str = "EUR"
created_at: str
# --- Service Layer ---
class PolicyService:
def __init__(self, repo, pricer, underwriter, event_bus):
self.repo = repo
self.pricer = pricer
self.underwriter = underwriter
self.event_bus = event_bus
async def create_quote(self, request: QuoteRequest) -> QuoteResponse:
# 1. Validate product exists
product = await self.repo.get_product(request.product_code)
if not product:
raise ValueError(f"Unknown product: {request.product_code}")
# 2. Score the risk
risk_score = await self.underwriter.score(request)
# 3. Calculate premium
coverages = await self.pricer.calculate(product, risk_score, request)
# 4. Persist quote with TTL
quote = await self.repo.save_quote(
product_code=request.product_code,
coverages=coverages,
valid_hours=72
)
return QuoteResponse(
quote_id=quote.id,
product_code=request.product_code,
coverages=coverages,
total_annual_premium=sum(c.annual_premium for c in coverages),
valid_until=quote.valid_until,
terms_version=product.terms_version
)
async def bind_policy(
self,
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks
) -> PolicyResponse:
# 1. Retrieve and validate quote
quote = await self.repo.get_quote(quote_id)
if not quote or quote.is_expired():
raise ValueError("Quote expired or not found")
# 2. Final underwriting check
uw_result = await self.underwriter.final_check(quote, request.holder_data)
if uw_result.is_declined:
raise ValueError(f"Policy declined: {uw_result.reason}")
# 3. Create policy
policy = await self.repo.create_policy(quote, request, uw_result)
# 4. Async operations (non-blocking)
background_tasks.add_task(self._post_bind_workflow, policy)
return PolicyResponse(
policy_id=policy.id,
policy_number=policy.number,
status=policy.status,
product_code=policy.product_code,
effective_date=policy.effective_date,
expiry_date=policy.expiry_date,
annual_premium=policy.annual_premium.amount,
currency=policy.annual_premium.currency,
created_at=policy.created_at.isoformat()
)
async def _post_bind_workflow(self, policy):
"""Operazioni asincrone post-bind."""
await self.event_bus.publish("policy.bound", {"policy_id": str(policy.id)})
# Trigger: document generation, CRM update, payment setup
# --- API Routes ---
@app.post("/v1/quotes", response_model=QuoteResponse, status_code=201)
async def create_quote(
request: QuoteRequest,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.create_quote(request)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
@app.post("/v1/quotes/{quote_id}/bind", response_model=PolicyResponse, status_code=201)
async def bind_policy(
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.bind_policy(quote_id, request, background_tasks)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
4. 보험 감사 추적을 위한 이벤트 소싱
보험 업계에는 엄격한 감사 및 추적성 요구 사항이 있습니다. 정책이 변경될 때마다 규제(IVASS, EIOPA) 및 법적 이유로 인해 불변의 방식으로 추적되어야 합니다. 이벤트 이 요구 사항에 가장 적합한 소싱 및 아키텍처 패턴입니다.
상태를 직접 업데이트하는 대신 정책에 대한 각 작업은 변경할 수 없는 이벤트를 생성합니다. 모든 과거 사건의 현재 상태 및 예측:
from dataclasses import dataclass
from datetime import datetime
from typing import Union
from uuid import UUID
# --- Policy Domain Events ---
@dataclass(frozen=True)
class PolicyQuoted:
event_type: str = "PolicyQuoted"
policy_id: UUID = None
quote_id: UUID = None
product_code: str = None
holder_id: UUID = None
annual_premium: dict = None # {amount, currency}
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyBound:
event_type: str = "PolicyBound"
policy_id: UUID = None
policy_number: str = None
effective_date: str = None
expiry_date: str = None
channel: str = None
agent_code: str = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyEndorsed:
event_type: str = "PolicyEndorsed"
policy_id: UUID = None
endorsement_id: UUID = None
endorsement_type: str = None
premium_adjustment: dict = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyCancelled:
event_type: str = "PolicyCancelled"
policy_id: UUID = None
cancellation_reason: str = None
cancellation_date: str = None
refund_amount: dict = None
requested_by: str = None # "HOLDER", "INSURER", "REGULATOR"
occurred_at: datetime = None
PolicyEvent = Union[PolicyQuoted, PolicyBound, PolicyEndorsed, PolicyCancelled]
# --- Event Store ---
class PolicyEventStore:
"""Append-only store per eventi di polizza."""
def __init__(self, db_pool):
self.db = db_pool
async def append(self, policy_id: UUID, event: PolicyEvent) -> int:
"""Appende un evento e restituisce il nuovo sequence number."""
async with self.db.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO policy_events
(policy_id, event_type, event_data, occurred_at)
VALUES ($1, $2, $3, $4)
RETURNING sequence_number
""",
str(policy_id),
event.event_type,
event.__dict__,
event.occurred_at or datetime.utcnow()
)
return row["sequence_number"]
async def get_history(self, policy_id: UUID) -> list[dict]:
"""Recupera tutti gli eventi di una polizza in ordine cronologico."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_type, event_data, occurred_at, sequence_number
FROM policy_events
WHERE policy_id = $1
ORDER BY sequence_number ASC
""",
str(policy_id)
)
return [dict(r) for r in rows]
async def rebuild_state(self, policy_id: UUID) -> dict:
"""Ricostruisce lo stato corrente della polizza dagli eventi."""
events = await self.get_history(policy_id)
state = {}
for evt in events:
state = self._apply_event(state, evt)
return state
def _apply_event(self, state: dict, event: dict) -> dict:
event_type = event["event_type"]
data = event["event_data"]
if event_type == "PolicyQuoted":
return {
**state,
"status": "QUOTED",
"policy_id": data["policy_id"],
"product_code": data["product_code"],
"annual_premium": data["annual_premium"]
}
elif event_type == "PolicyBound":
return {
**state,
"status": "IN_FORCE",
"policy_number": data["policy_number"],
"effective_date": data["effective_date"],
"expiry_date": data["expiry_date"],
"channel": data["channel"]
}
elif event_type == "PolicyEndorsed":
# Aggiorna il premio
current_premium = state.get("annual_premium", {"amount": "0"})
adjustment = data.get("premium_adjustment", {"amount": "0"})
return {
**state,
"annual_premium": {
"amount": str(
float(current_premium["amount"])
+ float(adjustment["amount"])
),
"currency": current_premium.get("currency", "EUR")
}
}
elif event_type == "PolicyCancelled":
return {
**state,
"status": "CANCELLED",
"cancellation_reason": data["cancellation_reason"],
"cancellation_date": data["cancellation_date"]
}
return state
5. 보증관리: 중기변화
승인(중간 변경)은 빈번하고 섬세한 작업입니다. 운전자, 차량 변경, 주소 변경. 모든 승인을 받을 수 있습니다. 보험료(비례)에 영향을 미치며 업데이트된 문서를 생성해야 합니다.
from decimal import Decimal
from datetime import date
class EndorsementCalculator:
"""Calcola l'aggiustamento di premio per un endorsement mid-term."""
def calculate_prorata_adjustment(
self,
current_annual_premium: Decimal,
new_annual_premium: Decimal,
endorsement_date: date,
policy_expiry: date
) -> Decimal:
"""
Calcola l'aggiustamento pro-rata del premio.
Restituisce importo positivo (addebitare) o negativo (rimborsare).
"""
remaining_days = (policy_expiry - endorsement_date).days
total_days = 365 # Semplificazione; usare calendario reale per polizze annuali
premium_difference = new_annual_premium - current_annual_premium
prorata_factor = Decimal(remaining_days) / Decimal(total_days)
return premium_difference * prorata_factor
class EndorsementService:
def __init__(self, repo, event_store, calculator, doc_service):
self.repo = repo
self.event_store = event_store
self.calculator = calculator
self.doc_service = doc_service
async def add_driver(
self,
policy_id: str,
driver_data: dict,
effective_date: date
) -> dict:
"""Aggiunge un conducente aggiuntivo alla polizza auto."""
# 1. Recupera polizza corrente
policy = await self.repo.get_policy(policy_id)
if not policy.is_active():
raise ValueError("Cannot endorse inactive policy")
# 2. Calcola nuovo premio con driver aggiuntivo
new_premium = await self._reprice_with_driver(policy, driver_data)
# 3. Calcola aggiustamento pro-rata
adjustment = self.calculator.calculate_prorata_adjustment(
current_annual_premium=policy.annual_premium.amount,
new_annual_premium=new_premium,
endorsement_date=effective_date,
policy_expiry=policy.expiry_date
)
# 4. Persisti endorsement
endorsement_id = await self.repo.create_endorsement(
policy_id=policy_id,
type="ADD_DRIVER",
effective_date=effective_date,
driver_data=driver_data,
premium_adjustment=adjustment
)
# 5. Pubblica evento
await self.event_store.append(policy_id, PolicyEndorsed(
policy_id=policy_id,
endorsement_id=endorsement_id,
endorsement_type="ADD_DRIVER",
premium_adjustment={"amount": str(adjustment), "currency": "EUR"}
))
# 6. Genera documento endorsement (async)
await self.doc_service.generate_endorsement_certificate(
policy_id, endorsement_id
)
return {
"endorsement_id": str(endorsement_id),
"premium_adjustment": float(adjustment),
"currency": "EUR",
"effective_date": effective_date.isoformat()
}
async def _reprice_with_driver(self, policy, driver_data: dict) -> Decimal:
"""Riprice la polizza includendo il nuovo conducente."""
# Logic depends on product rating algorithm
base_premium = policy.annual_premium.amount
driver_age = driver_data.get("age", 30)
# Young driver surcharge (semplificato)
if driver_age < 25:
surcharge = base_premium * Decimal("0.25")
elif driver_age < 30:
surcharge = base_premium * Decimal("0.10")
else:
surcharge = Decimal("0")
return base_premium + surcharge
6. 마이크로서비스 아키텍처: 정책 도메인 분해
엔터프라이즈 정책 관리 시스템은 일반적으로 다음과 같은 마이크로서비스로 분해됩니다. 각각은 제한된 컨텍스트와 잘 정의된 책임을 가지고 있습니다.
| 서비스 | 책임 | 데이터베이스 | 의사소통 |
|---|---|---|---|
| 견적 서비스 | 견적, 평가 | Redis(캐시) + PostgreSQL | REST + 이벤트 |
| 정책 서비스 | 수명주기 정책, 보증 | PostgreSQL(이벤트 저장소) | REST + 이벤트 |
| 결제 서비스 | 결제, 할부, 환불 | 포스트그레SQL | 이벤트 |
| 문서 서비스 | PDF 생성, 보관 | S3 / Azure Blob | 이벤트(비동기) |
| 알림 서비스 | 이메일, SMS, 푸시 | Redis(큐) | 이벤트(비동기) |
| 인수 서비스 | 위험 점수, 수락/거절 | PostgreSQL + ML 스토어 | REST(동기화) |
7. 브로커 포털 통합
보험 중개인은 전용 포털을 통해 시스템에 액세스합니다. API 통합 인증, 브로커 클라이언트 데이터에 대한 액세스, 커미션 및 집계보고. 다음은 브로커용 OAuth2 미들웨어의 예입니다.
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
import httpx
BROKER_PORTAL_SCOPES = [
"policy:read",
"policy:quote",
"policy:bind",
"policy:endorse",
"commission:read"
]
class BrokerAuth:
"""Middleware per autenticazione e autorizzazione broker."""
def __init__(self, jwks_url: str, audience: str):
self.jwks_url = jwks_url
self.audience = audience
self._jwks_cache = None
async def get_jwks(self) -> dict:
if not self._jwks_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(self.jwks_url)
self._jwks_cache = resp.json()
return self._jwks_cache
async def verify_broker_token(
self,
credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer())
) -> dict:
"""Verifica JWT token del broker e restituisce claims."""
try:
jwks = await self.get_jwks()
payload = jwt.decode(
credentials.credentials,
jwks,
algorithms=["RS256"],
audience=self.audience
)
broker_code = payload.get("broker_code")
if not broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing broker_code claim"
)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}"
)
# Dependency injection
broker_auth = BrokerAuth(
jwks_url="https://auth.insurer.com/.well-known/jwks.json",
audience="broker-portal-api"
)
@app.get("/v1/broker/{broker_code}/portfolio")
async def get_broker_portfolio(
broker_code: str,
broker_claims: dict = Depends(broker_auth.verify_broker_token),
policy_service: PolicyService = Depends(get_policy_service)
):
"""Restituisce il portfolio polizze di un broker."""
# Verifica che il broker acceda solo ai propri dati
if broker_claims["broker_code"] != broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied: broker_code mismatch"
)
policies = await policy_service.get_policies_by_broker(broker_code)
return {
"broker_code": broker_code,
"total_policies": len(policies),
"in_force_count": sum(1 for p in policies if p["status"] == "IN_FORCE"),
"total_gwp": sum(p["annual_premium"] for p in policies),
"policies": policies[:50] # Paginare in produzione
}
8. 자동 갱신: 일괄 처리 및 지원
갱신은 중요한 작업입니다. 갱신 시 고객 손실은 비용이 많이 듭니다(비용 고객 확보는 유지 대비 5~7배). 현대 시스템은 맞춤형 지원 및 동적 가격 책정을 통한 갱신 프로세스.
import asyncio
from datetime import date, timedelta
from dataclasses import dataclass
@dataclass
class RenewalCandidate:
policy_id: str
holder_email: str
expiry_date: date
current_premium: float
renewal_premium: float
days_to_expiry: int
class RenewalOrchestrator:
"""Gestisce il processo automatico di rinnovo polizze."""
RENEWAL_WINDOWS = [60, 45, 30, 15, 7] # Giorni prima della scadenza
def __init__(self, policy_repo, pricer, notification_svc, event_bus):
self.repo = policy_repo
self.pricer = pricer
self.notification = notification_svc
self.event_bus = event_bus
async def run_daily_renewal_batch(self) -> dict:
"""Job giornaliero: identifica polizze da rinnovare e avvia outreach."""
today = date.today()
stats = {"processed": 0, "quoted": 0, "errors": 0}
for days_ahead in self.RENEWAL_WINDOWS:
target_date = today + timedelta(days=days_ahead)
candidates = await self.repo.get_expiring_policies(target_date)
tasks = [
self._process_renewal_candidate(c, days_ahead)
for c in candidates
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
stats["processed"] += 1
if isinstance(result, Exception):
stats["errors"] += 1
else:
stats["quoted"] += 1
return stats
async def _process_renewal_candidate(
self,
policy_id: str,
days_to_expiry: int
) -> dict:
policy = await self.repo.get_policy(policy_id)
# Calcola nuovo premio di rinnovo
renewal_premium = await self.pricer.calculate_renewal(policy)
premium_change_pct = (
(renewal_premium - float(policy["annual_premium"])) /
float(policy["annual_premium"]) * 100
)
# Salva preventivo rinnovo
renewal_quote = await self.repo.save_renewal_quote(
policy_id=policy_id,
renewal_premium=renewal_premium,
valid_days=days_to_expiry + 30
)
# Scegli template notifica in base ai giorni rimanenti
if days_to_expiry == 60:
template = "renewal_early_notice"
elif days_to_expiry == 30:
template = "renewal_30_day"
elif days_to_expiry == 7:
template = "renewal_final_reminder"
else:
template = "renewal_standard"
# Invia notifica
await self.notification.send_renewal_outreach(
holder_email=policy["holder"]["email"],
template=template,
context={
"policy_number": policy["policy_number"],
"expiry_date": policy["expiry_date"],
"renewal_premium": renewal_premium,
"premium_change_pct": premium_change_pct,
"renewal_link": f"https://portal.insurer.com/renew/{renewal_quote['id']}"
}
)
# Pubblica evento
await self.event_bus.publish("policy.renewal_quoted", {
"policy_id": policy_id,
"renewal_quote_id": renewal_quote["id"],
"days_to_expiry": days_to_expiry
})
return renewal_quote
9. 테스트: 복잡한 보험 시스템에 대한 접근 방식
보험 시스템은 복잡한 비즈니스 로직과 규칙에 대한 의존도가 높습니다. 그들은 시간이 지남에 따라 변합니다. 테스트는 비즈니스 규칙, 통합에 대한 단위 테스트를 다루어야 합니다. 엔드 투 엔드 흐름에 대한 테스트 및 브로커에 노출된 API에 대한 계약 테스트입니다.
import pytest
from decimal import Decimal
from datetime import date, timedelta
from unittest.mock import AsyncMock, MagicMock
class TestEndorsementCalculator:
"""Unit test per il calcolo pro-rata degli endorsement."""
def setup_method(self):
self.calc = EndorsementCalculator()
def test_prorata_increase_with_full_year_remaining(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("750"),
endorsement_date=date(2025, 1, 1),
policy_expiry=date(2026, 1, 1)
)
# 150 difference * (365/365) = 150
assert abs(adjustment - Decimal("150")) < Decimal("1")
def test_prorata_decrease_at_midterm(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("500"),
endorsement_date=date(2025, 7, 1), # 6 mesi dopo inizio
policy_expiry=date(2026, 1, 1)
)
# Aggiustamento negativo (rimborso)
assert adjustment < Decimal("0")
def test_endorsement_on_last_day_is_zero(self):
today = date.today()
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("800"),
endorsement_date=today,
policy_expiry=today # Scadenza oggi
)
assert adjustment == Decimal("0")
@pytest.mark.asyncio
class TestPolicyService:
"""Integration test del service layer."""
async def test_create_quote_success(self):
# Arrange
mock_repo = AsyncMock()
mock_repo.get_product.return_value = MagicMock(terms_version="v2.1")
mock_pricer = AsyncMock()
mock_pricer.calculate.return_value = [
CoverageQuote(
coverage_type="LIABILITY",
limit=Decimal("1000000"),
deductible=Decimal("0"),
annual_premium=Decimal("400")
)
]
mock_underwriter = AsyncMock()
mock_underwriter.score.return_value = MagicMock(score=0.3)
mock_repo.save_quote.return_value = MagicMock(
id="q-123",
valid_until=date.today() + timedelta(days=72)
)
service = PolicyService(
repo=mock_repo,
pricer=mock_pricer,
underwriter=mock_underwriter,
event_bus=AsyncMock()
)
request = QuoteRequest(
product_code="AUTO_RC",
holder_fiscal_code="CLDFRC80A01H501U",
effective_date=date.today() + timedelta(days=1),
desired_coverages=["LIABILITY"]
)
# Act
result = await service.create_quote(request)
# Assert
assert result.total_annual_premium == Decimal("400")
assert result.terms_version == "v2.1"
mock_underwriter.score.assert_awaited_once()
10. 모범 사례 및 안티 패턴
안티 패턴: God 개체 정책
200개의 열이 있는 단일 "정책" 테이블을 만들지 마세요. 다음과 같은 풍부한 도메인 모델을 사용하세요. 명확한 가치 개체(돈, 보장 범위, 주소) 및 집계 경계. 열 "coverage_data JSONB"는 도메인 모델이 아닙니다.
안티 패턴: 감사 추적이 없는 변경 가능한 상태
기록을 추적하지 않고 정책을 직접 업데이트하지 마세요. 보험 부문 IVASS에 대한 전체 감사가 필요합니다. 이벤트 소싱 또는 최소한 기록 테이블을 사용하십시오.
모범 사례: 중요한 작업을 위한 멱등성
바인딩 및 삭제 작업은 멱등적이어야 합니다. 사용
idempotency_key 중요한 요청의 헤더. 결과 유지
동일한 키가 다시 전송되면 이를 반환합니다.
모범 사례: Pact를 사용한 계약 테스트
브로커 및 애그리게이터와의 통합을 위해서는 계약 테스트(Pact)를 사용하십시오. 모든 소비자 예상 계약을 정의합니다. 공급자는 이를 존중하는지 확인합니다. 이 중요한 API의 주요 변경을 방지합니다.
결론
API 우선 클라우드 기반 정책 관리 시스템에는 모델링에 대한 투자가 필요합니다. 인프라가 구축되기 전에도 도메인의 풍부한 도메인 모델(집계, 값 개체, 이벤트), 감사 추적을 위한 ACORD 호환 API 설계 및 이벤트 소싱이 핵심입니다. 빌드할 위치입니다.
보험사의 74%가 핵심 시스템을 현대화하고 있습니다: InsurTechs와의 경쟁 디지털 네이티브는 이러한 변화를 더 이상 선택 사항이 아닙니다. 이 문서에 설명된 패턴 기사 - 이벤트 소싱, 마이크로서비스, API 우선, 보증 관리 -가 기초입니다. 확장 가능하고 유지 관리 가능한 보험 플랫폼을 구축하기 위한 기술입니다.
InsurTech 시리즈의 향후 기사
- 텔레매틱스 파이프라인: 대규모 UBI 데이터 처리
- AI Underwriting: 기능 엔지니어링 및 위험 평가
- 청구 자동화: 컴퓨터 비전 및 NLP
- 사기 탐지: 그래프 분석 및 행동 신호







