クラウドネイティブなポリシー管理: 保険プラットフォーム向けの API ファースト アーキテクチャ
ポリシー管理システム (PAS) は、あらゆるサービスの心臓部です。 保険会社。そして、顧客が何をどのくらいの期間購入したかを定義するシステム どのような価格で、どのような条件で。しかし、ほとんどの保険会社では、 イタリアとヨーロッパのこの重要なシステムは、今でも 1990 年代のメインフレームで稼働しており、夜間バッチが実行されます。 残り数時間の API や、存在しない API、またはスケジュールされた SFTP ファイルに縮小された API。
アーキテクチャへの移行 クラウドネイティブ API ファースト それは単なる質問ではありません 技術的: それは競争上必要なものです。新しいインシュアテック企業は数週間以内に製品を発売します 数か月ではなく、リアルタイムで顧客にサービスを提供し、数十の外部ソースからのデータを統合します。市場 2024 年に 53 億ドルと評価されるグローバル インシュアテックは、2024 年までに 1,329 億ドル以上に成長するでしょう。 2034 年 (CAGR 22%)。伝統的な保険会社の 74% がすでに保険の近代化に投資しています。 コア システム (Capgemini World InsurTech Report 2024)。
この記事では、モデリングからクラウドネイティブのポリシー管理システムをゼロから構築します。 ドメインの API 設計、ポリシーのライフサイクル管理からブローカーとの統合まで、 デジタルチャネルと規制システム。
何を学ぶか
- クラウドネイティブなポリシー管理のための API ファースト アーキテクチャ
- ポリシーの完全なライフサイクルのモデル化
- 見積、バインド、承認、キャンセル、更新のための REST API 設計
- 保険におけるイベント ソーシングと CQRS
- ブローカーポータル、電子商取引、仲介業者との統合
- ポリシーのバージョン管理と承認の管理
- 複雑な保険システムのテストパターン
1. ポリシーのライフサイクル: 割り当てから失効まで
一行のコードを記述する前に、ポリシーの完全なライフサイクルを理解する必要があります。 業界標準モデルには次の主な状態があります。
ポリシーのライフサイクル状態
- 引用: 顧客は見積もりを受け取りましたが、まだ購入していません。
- 応用: 申請は引受審査段階にあります
- バウンド: 補償は有効であり、正式な発行を待っています
- 有効: アクティブかつ完全に効果的な政策
- 一時停止中:保険の一時停止中(未払いなど)
- リニューアルしました: 新しい期間に更新されたポリシー
- キャンセル: 自然満了前に保険契約がキャンセルされました
- 期限切れ/失効:自然失効または失効した保険
各状態遷移には、誰がそれをアクティブ化できるか、どの検証が行われるかなど、正確なビジネス ルールがあります。 が要求されるか、どのようなイベントが生成されるか、どのような通知が送信されるか。コードは次のとおりです。 これらのルールを明示的かつテスト可能な方法で表現します。
2. ドメイン モデル: 基本的なエンティティ
優れたポリシー管理システムは、正確なドメイン モデルに基づいています。基本的なエンティティは次のとおりです Python での型アノテーションとの関係:
from dataclasses import dataclass, field
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
class PolicyStatus(str, Enum):
QUOTED = "QUOTED"
APPLICATION = "APPLICATION"
BOUND = "BOUND"
IN_FORCE = "IN_FORCE"
SUSPENDED = "SUSPENDED"
RENEWED = "RENEWED"
CANCELLED = "CANCELLED"
EXPIRED = "EXPIRED"
class CoverageType(str, Enum):
LIABILITY = "LIABILITY"
COMPREHENSIVE = "COMPREHENSIVE"
COLLISION = "COLLISION"
PERSONAL_INJURY = "PERSONAL_INJURY"
PROPERTY = "PROPERTY"
BUSINESS_INTERRUPTION = "BUSINESS_INTERRUPTION"
@dataclass(frozen=True)
class Money:
"""Value object immutabile per importi monetari."""
amount: Decimal
currency: str = "EUR"
def __add__(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError(f"Cannot add {self.currency} and {other.currency}")
return Money(self.amount + other.amount, self.currency)
def __mul__(self, factor: Decimal) -> "Money":
return Money(self.amount * factor, self.currency)
@dataclass(frozen=True)
class Coverage:
"""Una singola copertura assicurativa all'interno di una polizza."""
coverage_id: UUID
coverage_type: CoverageType
limit: Money # Massimale di indennizzo
deductible: Money # Franchigia
premium: Money # Premio per questa copertura
effective_date: date
expiry_date: date
is_active: bool = True
@dataclass
class PolicyHolder:
"""Il contraente/assicurato della polizza."""
party_id: UUID
first_name: str
last_name: str
fiscal_code: str # Codice fiscale IT
date_of_birth: date
email: str
phone: Optional[str]
address: dict # Indirizzo strutturato
@dataclass
class Endorsement:
"""Una modifica alla polizza originale (endorsement / appendice)."""
endorsement_id: UUID
policy_id: UUID
endorsement_type: str # ADD_COVERAGE, REMOVE_COVERAGE, CHANGE_LIMIT, etc.
effective_date: date
description: str
premium_adjustment: Money # Positivo = aumento, negativo = rimborso
previous_state: dict # Snapshot dello stato prima dell'endorsement
new_state: dict # Snapshot dello stato dopo l'endorsement
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Policy:
"""Entità radice dell'aggregato polizza."""
policy_id: UUID
policy_number: str # Es. "AUTO-IT-2024-0001234"
product_code: str # Es. "AUTO_RC", "HOME_ALL_RISK"
status: PolicyStatus
holder: PolicyHolder
coverages: list[Coverage]
endorsements: list[Endorsement]
effective_date: date
expiry_date: date
annual_premium: Money
created_at: datetime
updated_at: datetime
# Metadata di distribuzione
channel: str # "DIRECT", "BROKER", "AGGREGATOR"
agent_code: Optional[str]
broker_code: Optional[str]
def total_premium(self) -> Money:
base = sum(
(c.premium for c in self.coverages if c.is_active),
start=Money(Decimal("0"))
)
adjustments = sum(
(e.premium_adjustment for e in self.endorsements),
start=Money(Decimal("0"))
)
return base + adjustments
def is_active(self) -> bool:
return self.status == PolicyStatus.IN_FORCE
def can_file_claim(self) -> bool:
today = date.today()
return (
self.is_active()
and self.effective_date <= today <= self.expiry_date
)
3. API 設計: ポリシー操作用の REST API
API ファースト アーキテクチャとは、API が最初に設計されるものであり、その前に設計されることを意味します。 実装の。 API コントラクトは明確で、バージョン管理されており、API コントラクトと互換性がある必要があります。 業界の期待 (ACORD 標準、OpenAPI 3.1)。
以下は、ポリシー管理サービスの主要なエンドポイント API の構造です。
| 方法 | エンドポイント | 手術 | ボディ/レスポンス |
|---|---|---|---|
| 役職 | /v1/引用符 | 見積書を作成する | QuoteRequest -> QuoteResponse |
| 役職 | /v1/quotes/{quoteId}/bind | 見積もりをポリシーに変換する | BindRequest -> PolicyResponse |
| 得る | /v1/policies/{policyId} | リカバリポリシー | -> ポリシー対応 |
| 役職 | /v1/policies/{policyId}/endorse | 承認を発行する | エンドースメントリクエスト -> エンドースメントレスポンス |
| 役職 | /v1/policies/{policyId}/cancel | ポリシーをキャンセルする | CancelRequest -> PolicyResponse |
| 役職 | /v1/policies/{policyId}/renew | ポリシーを更新する | RenewalRequest -> PolicyResponse |
| 得る | /v1/policies/{policyId}/documents | 政策文書 | -> ドキュメントリスト応答 |
Pydantic 検証を使用した FastAPI 実装:
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import Optional
from decimal import Decimal
from datetime import date
from uuid import UUID
import uuid
app = FastAPI(
title="Policy Management API",
version="1.0.0",
description="Cloud-native insurance policy management"
)
# --- Request/Response Models ---
class VehicleInfo(BaseModel):
plate: str = Field(..., pattern=r"^[A-Z]{2}[0-9]{3}[A-Z]{2}$|^[A-Z]{2}[0-9]{5}$")
make: str
model: str
year: int = Field(..., ge=1900, le=2026)
value: Decimal = Field(..., gt=0)
engine_cc: int
class QuoteRequest(BaseModel):
product_code: str = Field(..., pattern="^[A-Z_]+$")
holder_fiscal_code: str
vehicle: Optional[VehicleInfo]
effective_date: date
desired_coverages: list[str]
channel: str = Field(default="DIRECT")
@validator("effective_date")
def effective_date_not_past(cls, v: date) -> date:
if v < date.today():
raise ValueError("Effective date cannot be in the past")
return v
class CoverageQuote(BaseModel):
coverage_type: str
limit: Decimal
deductible: Decimal
annual_premium: Decimal
currency: str = "EUR"
class QuoteResponse(BaseModel):
quote_id: UUID
product_code: str
coverages: list[CoverageQuote]
total_annual_premium: Decimal
currency: str = "EUR"
valid_until: date
terms_version: str
class BindRequest(BaseModel):
payment_reference: str
holder_data: dict # Validated separately by KYC service
selected_coverages: list[str]
broker_code: Optional[str]
class PolicyResponse(BaseModel):
policy_id: UUID
policy_number: str
status: str
product_code: str
effective_date: date
expiry_date: date
annual_premium: Decimal
currency: str = "EUR"
created_at: str
# --- Service Layer ---
class PolicyService:
def __init__(self, repo, pricer, underwriter, event_bus):
self.repo = repo
self.pricer = pricer
self.underwriter = underwriter
self.event_bus = event_bus
async def create_quote(self, request: QuoteRequest) -> QuoteResponse:
# 1. Validate product exists
product = await self.repo.get_product(request.product_code)
if not product:
raise ValueError(f"Unknown product: {request.product_code}")
# 2. Score the risk
risk_score = await self.underwriter.score(request)
# 3. Calculate premium
coverages = await self.pricer.calculate(product, risk_score, request)
# 4. Persist quote with TTL
quote = await self.repo.save_quote(
product_code=request.product_code,
coverages=coverages,
valid_hours=72
)
return QuoteResponse(
quote_id=quote.id,
product_code=request.product_code,
coverages=coverages,
total_annual_premium=sum(c.annual_premium for c in coverages),
valid_until=quote.valid_until,
terms_version=product.terms_version
)
async def bind_policy(
self,
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks
) -> PolicyResponse:
# 1. Retrieve and validate quote
quote = await self.repo.get_quote(quote_id)
if not quote or quote.is_expired():
raise ValueError("Quote expired or not found")
# 2. Final underwriting check
uw_result = await self.underwriter.final_check(quote, request.holder_data)
if uw_result.is_declined:
raise ValueError(f"Policy declined: {uw_result.reason}")
# 3. Create policy
policy = await self.repo.create_policy(quote, request, uw_result)
# 4. Async operations (non-blocking)
background_tasks.add_task(self._post_bind_workflow, policy)
return PolicyResponse(
policy_id=policy.id,
policy_number=policy.number,
status=policy.status,
product_code=policy.product_code,
effective_date=policy.effective_date,
expiry_date=policy.expiry_date,
annual_premium=policy.annual_premium.amount,
currency=policy.annual_premium.currency,
created_at=policy.created_at.isoformat()
)
async def _post_bind_workflow(self, policy):
"""Operazioni asincrone post-bind."""
await self.event_bus.publish("policy.bound", {"policy_id": str(policy.id)})
# Trigger: document generation, CRM update, payment setup
# --- API Routes ---
@app.post("/v1/quotes", response_model=QuoteResponse, status_code=201)
async def create_quote(
request: QuoteRequest,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.create_quote(request)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
@app.post("/v1/quotes/{quote_id}/bind", response_model=PolicyResponse, status_code=201)
async def bind_policy(
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.bind_policy(quote_id, request, background_tasks)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
4. 保険監査証跡のイベントソーシング
保険業界には厳格な監査とトレーサビリティ要件があり、保険契約のすべての変更が対象となります。 規制 (IVASS、EIOPA) および法的理由により、不変の方法で追跡する必要があります。イベント この要件に最適な調達とアーキテクチャ パターン。
状態を直接更新する代わりに、ポリシーに対する各操作によって不変イベントが生成されます。 現在の状態と過去のすべての出来事の予測:
from dataclasses import dataclass
from datetime import datetime
from typing import Union
from uuid import UUID
# --- Policy Domain Events ---
@dataclass(frozen=True)
class PolicyQuoted:
event_type: str = "PolicyQuoted"
policy_id: UUID = None
quote_id: UUID = None
product_code: str = None
holder_id: UUID = None
annual_premium: dict = None # {amount, currency}
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyBound:
event_type: str = "PolicyBound"
policy_id: UUID = None
policy_number: str = None
effective_date: str = None
expiry_date: str = None
channel: str = None
agent_code: str = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyEndorsed:
event_type: str = "PolicyEndorsed"
policy_id: UUID = None
endorsement_id: UUID = None
endorsement_type: str = None
premium_adjustment: dict = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyCancelled:
event_type: str = "PolicyCancelled"
policy_id: UUID = None
cancellation_reason: str = None
cancellation_date: str = None
refund_amount: dict = None
requested_by: str = None # "HOLDER", "INSURER", "REGULATOR"
occurred_at: datetime = None
PolicyEvent = Union[PolicyQuoted, PolicyBound, PolicyEndorsed, PolicyCancelled]
# --- Event Store ---
class PolicyEventStore:
"""Append-only store per eventi di polizza."""
def __init__(self, db_pool):
self.db = db_pool
async def append(self, policy_id: UUID, event: PolicyEvent) -> int:
"""Appende un evento e restituisce il nuovo sequence number."""
async with self.db.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO policy_events
(policy_id, event_type, event_data, occurred_at)
VALUES ($1, $2, $3, $4)
RETURNING sequence_number
""",
str(policy_id),
event.event_type,
event.__dict__,
event.occurred_at or datetime.utcnow()
)
return row["sequence_number"]
async def get_history(self, policy_id: UUID) -> list[dict]:
"""Recupera tutti gli eventi di una polizza in ordine cronologico."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_type, event_data, occurred_at, sequence_number
FROM policy_events
WHERE policy_id = $1
ORDER BY sequence_number ASC
""",
str(policy_id)
)
return [dict(r) for r in rows]
async def rebuild_state(self, policy_id: UUID) -> dict:
"""Ricostruisce lo stato corrente della polizza dagli eventi."""
events = await self.get_history(policy_id)
state = {}
for evt in events:
state = self._apply_event(state, evt)
return state
def _apply_event(self, state: dict, event: dict) -> dict:
event_type = event["event_type"]
data = event["event_data"]
if event_type == "PolicyQuoted":
return {
**state,
"status": "QUOTED",
"policy_id": data["policy_id"],
"product_code": data["product_code"],
"annual_premium": data["annual_premium"]
}
elif event_type == "PolicyBound":
return {
**state,
"status": "IN_FORCE",
"policy_number": data["policy_number"],
"effective_date": data["effective_date"],
"expiry_date": data["expiry_date"],
"channel": data["channel"]
}
elif event_type == "PolicyEndorsed":
# Aggiorna il premio
current_premium = state.get("annual_premium", {"amount": "0"})
adjustment = data.get("premium_adjustment", {"amount": "0"})
return {
**state,
"annual_premium": {
"amount": str(
float(current_premium["amount"])
+ float(adjustment["amount"])
),
"currency": current_premium.get("currency", "EUR")
}
}
elif event_type == "PolicyCancelled":
return {
**state,
"status": "CANCELLED",
"cancellation_reason": data["cancellation_reason"],
"cancellation_date": data["cancellation_date"]
}
return state
5. 承認管理: 中期的な変更
承認(中間変更)は頻繁かつデリケートな作業です。 ドライバー、車両の変更、住所の変更。すべての支持には次のような特徴があります 保険料 (比例配分) に影響を与えるため、更新されたドキュメントを生成する必要があります。
from decimal import Decimal
from datetime import date
class EndorsementCalculator:
"""Calcola l'aggiustamento di premio per un endorsement mid-term."""
def calculate_prorata_adjustment(
self,
current_annual_premium: Decimal,
new_annual_premium: Decimal,
endorsement_date: date,
policy_expiry: date
) -> Decimal:
"""
Calcola l'aggiustamento pro-rata del premio.
Restituisce importo positivo (addebitare) o negativo (rimborsare).
"""
remaining_days = (policy_expiry - endorsement_date).days
total_days = 365 # Semplificazione; usare calendario reale per polizze annuali
premium_difference = new_annual_premium - current_annual_premium
prorata_factor = Decimal(remaining_days) / Decimal(total_days)
return premium_difference * prorata_factor
class EndorsementService:
def __init__(self, repo, event_store, calculator, doc_service):
self.repo = repo
self.event_store = event_store
self.calculator = calculator
self.doc_service = doc_service
async def add_driver(
self,
policy_id: str,
driver_data: dict,
effective_date: date
) -> dict:
"""Aggiunge un conducente aggiuntivo alla polizza auto."""
# 1. Recupera polizza corrente
policy = await self.repo.get_policy(policy_id)
if not policy.is_active():
raise ValueError("Cannot endorse inactive policy")
# 2. Calcola nuovo premio con driver aggiuntivo
new_premium = await self._reprice_with_driver(policy, driver_data)
# 3. Calcola aggiustamento pro-rata
adjustment = self.calculator.calculate_prorata_adjustment(
current_annual_premium=policy.annual_premium.amount,
new_annual_premium=new_premium,
endorsement_date=effective_date,
policy_expiry=policy.expiry_date
)
# 4. Persisti endorsement
endorsement_id = await self.repo.create_endorsement(
policy_id=policy_id,
type="ADD_DRIVER",
effective_date=effective_date,
driver_data=driver_data,
premium_adjustment=adjustment
)
# 5. Pubblica evento
await self.event_store.append(policy_id, PolicyEndorsed(
policy_id=policy_id,
endorsement_id=endorsement_id,
endorsement_type="ADD_DRIVER",
premium_adjustment={"amount": str(adjustment), "currency": "EUR"}
))
# 6. Genera documento endorsement (async)
await self.doc_service.generate_endorsement_certificate(
policy_id, endorsement_id
)
return {
"endorsement_id": str(endorsement_id),
"premium_adjustment": float(adjustment),
"currency": "EUR",
"effective_date": effective_date.isoformat()
}
async def _reprice_with_driver(self, policy, driver_data: dict) -> Decimal:
"""Riprice la polizza includendo il nuovo conducente."""
# Logic depends on product rating algorithm
base_premium = policy.annual_premium.amount
driver_age = driver_data.get("age", 30)
# Young driver surcharge (semplificato)
if driver_age < 25:
surcharge = base_premium * Decimal("0.25")
elif driver_age < 30:
surcharge = base_premium * Decimal("0.10")
else:
surcharge = Decimal("0")
return base_premium + surcharge
6. マイクロサービス アーキテクチャ: ポリシー ドメインの分解
エンタープライズ ポリシー管理システムは通常、次のマイクロサービスに分解されます。 それぞれに独自の限定されたコンテキストと明確に定義された責任があります。
| サービス | 責任 | データベース | コミュニケーション |
|---|---|---|---|
| 見積サービス | 見積り、評価 | Redis (キャッシュ) + PostgreSQL | 休憩 + イベント |
| 政策サービス | ライフサイクルポリシー、承認 | PostgreSQL(イベントストア) | 休憩 + イベント |
| 課金サービス | 支払い、分割払い、返金 | PostgreSQL | イベント |
| ドキュメントサービス | PDF の生成、アーカイブ | S3 / アズールブロブ | イベント (非同期) |
| 通知サービス | 電子メール、SMS、プッシュ | Redis (キュー) | イベント (非同期) |
| 引受サービス | リスクスコアリング、承認/拒否 | PostgreSQL + ML ストア | レスト(同期) |
7. ブローカーポータルの統合
保険ブローカーは専用のポータルを介してシステムにアクセスします。 API統合 認証、ブローカークライアントデータへのアクセス、手数料、および 集計レポート。以下はブローカー用の OAuth2 ミドルウェアの例です。
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
import httpx
BROKER_PORTAL_SCOPES = [
"policy:read",
"policy:quote",
"policy:bind",
"policy:endorse",
"commission:read"
]
class BrokerAuth:
"""Middleware per autenticazione e autorizzazione broker."""
def __init__(self, jwks_url: str, audience: str):
self.jwks_url = jwks_url
self.audience = audience
self._jwks_cache = None
async def get_jwks(self) -> dict:
if not self._jwks_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(self.jwks_url)
self._jwks_cache = resp.json()
return self._jwks_cache
async def verify_broker_token(
self,
credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer())
) -> dict:
"""Verifica JWT token del broker e restituisce claims."""
try:
jwks = await self.get_jwks()
payload = jwt.decode(
credentials.credentials,
jwks,
algorithms=["RS256"],
audience=self.audience
)
broker_code = payload.get("broker_code")
if not broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing broker_code claim"
)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}"
)
# Dependency injection
broker_auth = BrokerAuth(
jwks_url="https://auth.insurer.com/.well-known/jwks.json",
audience="broker-portal-api"
)
@app.get("/v1/broker/{broker_code}/portfolio")
async def get_broker_portfolio(
broker_code: str,
broker_claims: dict = Depends(broker_auth.verify_broker_token),
policy_service: PolicyService = Depends(get_policy_service)
):
"""Restituisce il portfolio polizze di un broker."""
# Verifica che il broker acceda solo ai propri dati
if broker_claims["broker_code"] != broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied: broker_code mismatch"
)
policies = await policy_service.get_policies_by_broker(broker_code)
return {
"broker_code": broker_code,
"total_policies": len(policies),
"in_force_count": sum(1 for p in policies if p["status"] == "IN_FORCE"),
"total_gwp": sum(p["annual_premium"] for p in policies),
"policies": policies[:50] # Paginare in produzione
}
8. 自動更新: バッチ処理とアウトリーチ
更新は重要な業務です。更新時に顧客を失うと高額な費用がかかります(コスト) 顧客獲得は維持率と比較して 5 ~ 7 倍)。最新のシステムは、 パーソナライズされたアウトリーチと動的な価格設定による更新プロセス。
import asyncio
from datetime import date, timedelta
from dataclasses import dataclass
@dataclass
class RenewalCandidate:
policy_id: str
holder_email: str
expiry_date: date
current_premium: float
renewal_premium: float
days_to_expiry: int
class RenewalOrchestrator:
"""Gestisce il processo automatico di rinnovo polizze."""
RENEWAL_WINDOWS = [60, 45, 30, 15, 7] # Giorni prima della scadenza
def __init__(self, policy_repo, pricer, notification_svc, event_bus):
self.repo = policy_repo
self.pricer = pricer
self.notification = notification_svc
self.event_bus = event_bus
async def run_daily_renewal_batch(self) -> dict:
"""Job giornaliero: identifica polizze da rinnovare e avvia outreach."""
today = date.today()
stats = {"processed": 0, "quoted": 0, "errors": 0}
for days_ahead in self.RENEWAL_WINDOWS:
target_date = today + timedelta(days=days_ahead)
candidates = await self.repo.get_expiring_policies(target_date)
tasks = [
self._process_renewal_candidate(c, days_ahead)
for c in candidates
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
stats["processed"] += 1
if isinstance(result, Exception):
stats["errors"] += 1
else:
stats["quoted"] += 1
return stats
async def _process_renewal_candidate(
self,
policy_id: str,
days_to_expiry: int
) -> dict:
policy = await self.repo.get_policy(policy_id)
# Calcola nuovo premio di rinnovo
renewal_premium = await self.pricer.calculate_renewal(policy)
premium_change_pct = (
(renewal_premium - float(policy["annual_premium"])) /
float(policy["annual_premium"]) * 100
)
# Salva preventivo rinnovo
renewal_quote = await self.repo.save_renewal_quote(
policy_id=policy_id,
renewal_premium=renewal_premium,
valid_days=days_to_expiry + 30
)
# Scegli template notifica in base ai giorni rimanenti
if days_to_expiry == 60:
template = "renewal_early_notice"
elif days_to_expiry == 30:
template = "renewal_30_day"
elif days_to_expiry == 7:
template = "renewal_final_reminder"
else:
template = "renewal_standard"
# Invia notifica
await self.notification.send_renewal_outreach(
holder_email=policy["holder"]["email"],
template=template,
context={
"policy_number": policy["policy_number"],
"expiry_date": policy["expiry_date"],
"renewal_premium": renewal_premium,
"premium_change_pct": premium_change_pct,
"renewal_link": f"https://portal.insurer.com/renew/{renewal_quote['id']}"
}
)
# Pubblica evento
await self.event_bus.publish("policy.renewal_quoted", {
"policy_id": policy_id,
"renewal_quote_id": renewal_quote["id"],
"days_to_expiry": days_to_expiry
})
return renewal_quote
9. テスト: 複雑な保険システムへのアプローチ
保険システムには複雑なビジネス ロジックがあり、ルールへの依存度が高く、 それらは時間の経過とともに変化します。テストでは、ビジネス ルール、統合の単体テストをカバーする必要があります エンドツーエンドのフローのテストと、ブローカーに公開される API のコントラクト テストです。
import pytest
from decimal import Decimal
from datetime import date, timedelta
from unittest.mock import AsyncMock, MagicMock
class TestEndorsementCalculator:
"""Unit test per il calcolo pro-rata degli endorsement."""
def setup_method(self):
self.calc = EndorsementCalculator()
def test_prorata_increase_with_full_year_remaining(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("750"),
endorsement_date=date(2025, 1, 1),
policy_expiry=date(2026, 1, 1)
)
# 150 difference * (365/365) = 150
assert abs(adjustment - Decimal("150")) < Decimal("1")
def test_prorata_decrease_at_midterm(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("500"),
endorsement_date=date(2025, 7, 1), # 6 mesi dopo inizio
policy_expiry=date(2026, 1, 1)
)
# Aggiustamento negativo (rimborso)
assert adjustment < Decimal("0")
def test_endorsement_on_last_day_is_zero(self):
today = date.today()
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("800"),
endorsement_date=today,
policy_expiry=today # Scadenza oggi
)
assert adjustment == Decimal("0")
@pytest.mark.asyncio
class TestPolicyService:
"""Integration test del service layer."""
async def test_create_quote_success(self):
# Arrange
mock_repo = AsyncMock()
mock_repo.get_product.return_value = MagicMock(terms_version="v2.1")
mock_pricer = AsyncMock()
mock_pricer.calculate.return_value = [
CoverageQuote(
coverage_type="LIABILITY",
limit=Decimal("1000000"),
deductible=Decimal("0"),
annual_premium=Decimal("400")
)
]
mock_underwriter = AsyncMock()
mock_underwriter.score.return_value = MagicMock(score=0.3)
mock_repo.save_quote.return_value = MagicMock(
id="q-123",
valid_until=date.today() + timedelta(days=72)
)
service = PolicyService(
repo=mock_repo,
pricer=mock_pricer,
underwriter=mock_underwriter,
event_bus=AsyncMock()
)
request = QuoteRequest(
product_code="AUTO_RC",
holder_fiscal_code="CLDFRC80A01H501U",
effective_date=date.today() + timedelta(days=1),
desired_coverages=["LIABILITY"]
)
# Act
result = await service.create_quote(request)
# Assert
assert result.total_annual_premium == Decimal("400")
assert result.terms_version == "v2.1"
mock_underwriter.score.assert_awaited_once()
10. ベストプラクティスとアンチパターン
アンチパターン: 神オブジェクト ポリシー
200 列を持つ単一の「ポリシー」テーブルを作成しないでください。豊富なドメイン モデルを使用して、 値オブジェクト (金額、カバレッジ、住所) を明確にし、境界を集計します。コラム 「coverage_data JSONB」はドメインモデルではありません。
アンチパターン: 監査証跡のない可変状態
履歴を追跡せずにポリシーに対して直接 UPDATE を実行しないでください。保険部門 IVASS の完全な監査が必要です。イベント ソーシングまたは少なくとも履歴テーブルを使用します。
ベスト プラクティス: 重要な操作の冪等性
バインド操作と削除操作は冪等である必要があります。を使用します。
idempotency_key 重要なリクエストのヘッダー。結果を永続化する
同じキーが再度送信された場合はそれを返します。
ベスト プラクティス: Pact を使用した契約テスト
ブローカーやアグリゲーターとの統合には、契約テスト (Pact) を使用します。すべての消費者 期待される契約を定義します。プロバイダーはそれを尊重していることを確認します。これ 重要な API の重大な変更を防ぎます。
結論
API ファーストのクラウドネイティブなポリシー管理システムにはモデリングへの投資が必要です インフラストラクチャ以前にドメインの重要性を認識します。リッチ ドメイン モデル (集約、値オブジェクト、 イベント)、ACORD 互換の API 設計、監査証跡のイベント ソーシングが鍵となります その上に構築するもの。
保険会社の 74% がコア システムを最新化中: InsurTech との競争 デジタル ネイティブにより、この変革はもはやオプションではなくなりました。この記事で説明されているパターンは、 記事 - イベント ソーシング、マイクロサービス、API ファースト、エンドースメント管理 - が基礎 スケーラブルで保守可能な保険プラットフォームを構築するための技術。
InsurTech シリーズの今後の記事
- テレマティクス パイプライン: 大規模な UBI データ処理
- AI 引受業務: 特徴エンジニアリングとリスク スコアリング
- 請求の自動化: コンピューター ビジョンと NLP
- 不正行為の検出: グラフ分析と行動シグナル







