Scala でのデジタル署名と文書認証
2024 年、世界の電子署名市場は 50 億ドルを超え、 2030 年までは年間 30% 以上の成長が見込まれています。しかし、市場の数字を超えて、 デジタル署名の大規模な導入により、何百万もの署名をどのように管理するかという具体的な技術的課題が生じます。 複数の管轄区域で法的有効性を維持しながら、1 日あたり何件の署名が必要ですか?実装方法 ヨーロッパの eIDAS 2.0 およびその他の世界の同等の規制に準拠するスケーラブルな PKI システムですか?
この記事では、エンタープライズ グレードのデジタル署名システムを構築します。 PKI 証明書の生成と管理、署名ワークフローの実装 マルチステップ、最大 RFC 3161 準拠のタイムスタンプおよび長期アーカイブ ヨーロッパの基準に従って。コードは Python と TypeScript であり、統合例も含まれています Angular アプリケーション用。
何を学ぶか
- eIDAS 2.0 に準拠した電子署名の種類 (SES、AES、QES)
- PKI アーキテクチャ: CA、RA、X.509 証明書、CRL、OCSP
- PyHanko (Python) を使用した PDF 署名の実装
- 存在証明のための RFC 3161 準拠のタイムスタンプ
- マシン状態を使用したマルチパーティ署名ワークフロー
- DocuSign/Adobe Sign API との Angular 統合
- 長期アーカイブ (LTV - 長期検証)
eIDAS 2.0に基づく電子署名レベル
2024 年 5 月 20 日に発効した eIDAS 規制 (EU) 2024/1183 では、次のように定義されています。 異なるセキュリティ要件と法的価値を持つ 3 つのレベルの電子署名:
| タイプ | 頭字語 | 要件 | 法定価値 | 使用事例 |
|---|---|---|---|---|
| 簡易電子署名 | SES | 署名者に関連する電子データ | ベース | クリックラップ、メール承認 |
| 高度な電子署名 | AES | 署名者を一意に参照可能であり、署名者の管理下にあるデータを使用して作成されます。 | 中くらい | 商業契約、人事 |
| 適格な電子署名 | QES | 適格証明書 + QSCD (適格署名作成デバイス) | 同等の手書き署名 | 公正証書、不動産契約書 |
eIDAS 2.0 とデジタル ID ウォレット
2026 年 12 月から、EU 加盟 27 か国すべてが国民に以下のサービスを提供する必要があります。 EU デジタル ID ウォレット (EUDI ウォレット) QES署名を許可します モバイルデバイス上で。 This will fundamentally change user onboarding デジタル署名システムの場合: USB ハードウェア トークンに別れを告げ、スマートフォンにこんにちは。
デジタル署名のための PKI アーキテクチャ
公開キー基盤 (PKI) とそれが保護する暗号化インフラストラクチャ デジタル署名の信頼性と完全性。基本的なコンポーネントは次のとおりです。
- ルート CA (認証局): 階層の信頼されたアンカー。 中間証明書を発行します。セキュリティを最大限に高めるには、オフライン (エアギャップ) にする必要があります。
- 中間 CA: エンドユーザーに証明書を発行する運用 CA。
- RA (登録局): 申請者の身元を確認します 証明書の発行を承認する前に。
- OCSP レスポンダー: 証明書が有効かどうかを確認するリアルタイム サービス 取り消されました (CRL の代替)。
- TSA (タイムスタンプ局): RFC 3161 に準拠したタイムスタンプを出力します。
from cryptography import x509
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
import datetime
import uuid
class PKIManager:
"""
Gestore PKI per la creazione di certificati X.509 self-signed e firmati da CA.
Per uso in sviluppo/test. In produzione usare una CA qualificata eIDAS.
"""
def generate_key_pair(self, key_size: int = 4096):
"""Genera coppia di chiavi RSA."""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
return private_key, private_key.public_key()
def create_root_ca(self, subject_name: str, validity_years: int = 20):
"""
Crea un certificato Root CA self-signed.
Normalmente questa operazione viene eseguita offline.
"""
private_key, public_key = self.generate_key_pair()
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "IT"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, subject_name),
x509.NameAttribute(NameOID.COMMON_NAME, f"{subject_name} Root CA"),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365 * validity_years)
)
.add_extension(
x509.BasicConstraints(ca=True, path_length=1),
critical=True
)
.add_extension(
x509.KeyUsage(
digital_signature=True, key_cert_sign=True,
crl_sign=True, content_commitment=False,
key_encipherment=False, data_encipherment=False,
key_agreement=False, encipher_only=False, decipher_only=False
),
critical=True
)
.sign(private_key, hashes.SHA256(), default_backend())
)
return cert, private_key
def create_end_entity_certificate(
self,
subject_cn: str,
subject_email: str,
ca_cert,
ca_private_key,
validity_days: int = 365
):
"""
Crea un certificato end-entity firmato dalla CA.
Usato per la firma digitale dei documenti.
"""
user_private_key, user_public_key = self.generate_key_pair(key_size=2048)
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "IT"),
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
x509.NameAttribute(NameOID.EMAIL_ADDRESS, subject_email),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(ca_cert.subject)
.public_key(user_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=validity_days)
)
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True
)
.add_extension(
x509.ExtendedKeyUsage([
ExtendedKeyUsageOID.EMAIL_PROTECTION,
# OID per firma documenti: 1.2.840.113549.1.9.15 (non standard)
]),
critical=False
)
.add_extension(
x509.SubjectAlternativeName([
x509.RFC822Name(subject_email),
]),
critical=False
)
.sign(ca_private_key, hashes.SHA256(), default_backend())
)
return cert, user_private_key
PyHanko で PDF に署名
PyHanko は、PDF ドキュメントにデジタル署名するためのリファレンス Python ライブラリです PDF/A および PAdES (PDF Advanced Electronic Signature) 標準に準拠しています。サポート 目に見えない署名、見える署名、インタラクティブな署名フィールド、統合されたタイムスタンプ。
from pyhanko.sign import signers, fields
from pyhanko.sign.fields import MDPPerm
from pyhanko import stamp
from pyhanko.pdf_utils.incremental_writer import IncrementalPdfFileWriter
from pyhanko.sign.timestamps import HTTPTimeStamper
from pyhanko.sign.validation import validate_pdf_signature
from pyhanko.pdf_utils.reader import PdfFileReader
import hashlib
from io import BytesIO
class PDFSigningService:
"""
Servizio di firma PDF con supporto PAdES e timestamping RFC 3161.
"""
def __init__(
self,
cert_pem_path: str,
key_pem_path: str,
ca_chain_pem_path: str,
tsa_url: str = "http://timestamp.digicert.com"
):
# Carica certificato e chiave privata
self.signer = signers.SimpleSigner.load(
cert_file=cert_pem_path,
key_file=key_pem_path,
ca_chain_files=[ca_chain_pem_path]
)
self.timestamper = HTTPTimeStamper(tsa_url)
def sign_document(
self,
input_pdf_bytes: bytes,
reason: str = "Approvazione contratto",
location: str = "Milano, Italia",
visible: bool = True,
page: int = 0,
add_timestamp: bool = True
) -> bytes:
"""
Firma un documento PDF e opzionalmente aggiunge un timestamp qualificato.
Restituisce il PDF firmato come bytes.
"""
writer = IncrementalPdfFileWriter(BytesIO(input_pdf_bytes))
if visible:
# Crea campo firma visibile nell'angolo in basso a destra dell'ultima pagina
fields.append_signature_field(
writer,
sig_field_spec=fields.SigFieldSpec(
sig_field_name="Signature1",
on_page=page,
box=(400, 50, 560, 110) # (x1, y1, x2, y2) in punti
)
)
# Configurazione del digest e firma
meta = signers.PdfSignatureMetadata(
field_name="Signature1",
reason=reason,
location=location,
certify=True,
certify_perm=MDPPerm.NO_CHANGES # impedisce modifiche post-firma
)
sign_result = signers.sign_pdf(
writer,
signature_meta=meta,
signer=self.signer,
timestamper=self.timestamper if add_timestamp else None,
in_place=False
)
return sign_result.getvalue()
def validate_signature(self, signed_pdf_bytes: bytes) -> dict:
"""
Valida tutte le firme in un documento PDF.
Restituisce un report strutturato per ogni firma.
"""
reader = PdfFileReader(BytesIO(signed_pdf_bytes))
validation_results = []
for sig_obj in reader.embedded_signatures:
val_status = validate_pdf_signature(sig_obj)
validation_results.append({
'field_name': sig_obj.field_name,
'signer_name': str(val_status.signing_cert.subject),
'signing_time': str(val_status.signer_reported_dt),
'timestamp_valid': val_status.timestamp_validity.valid if val_status.timestamp_validity else None,
'cert_valid': val_status.signing_cert_validity.valid,
'modification_on_unchanged': val_status.modification_level.name,
'intact': val_status.bottom_line # True = firma integra e valida
})
return {
'total_signatures': len(validation_results),
'all_valid': all(r['intact'] for r in validation_results),
'signatures': validation_results,
'document_hash_sha256': hashlib.sha256(signed_pdf_bytes).hexdigest()
}
ステートマシンを使用したマルチパーティ署名ワークフロー
現実のシナリオでは、契約では複数の当事者が 1 つの注文に署名する必要があることがよくあります。 正確に言えば、最初に社内法務マネージャー、次にクライアント、そして最後に公証人です。 このワークフローを堅牢に処理するためにステート マシンを実装します。
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import List, Optional, Callable
from datetime import datetime
import uuid
class SignatureStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
REJECTED = "rejected"
EXPIRED = "expired"
@dataclass
class SignatureRequest:
"""Una richiesta di firma per un singolo firmatario."""
request_id: str
signer_email: str
signer_name: str
order: int # ordine di firma (1, 2, 3...)
status: SignatureStatus = SignatureStatus.PENDING
signed_at: Optional[datetime] = None
rejection_reason: Optional[str] = None
@dataclass
class SigningWorkflow:
"""
Workflow di firma multi-party con ordinamento sequenziale.
"""
workflow_id: str
document_id: str
document_name: str
created_at: datetime
expires_at: datetime
signers: List[SignatureRequest]
current_signer_order: int = 1
status: SignatureStatus = SignatureStatus.IN_PROGRESS
audit_log: List[dict] = field(default_factory=list)
def get_current_signer(self) -> Optional[SignatureRequest]:
"""Restituisce il firmatario corrente."""
for signer in self.signers:
if signer.order == self.current_signer_order:
return signer
return None
def record_signature(
self,
signer_email: str,
signed_pdf_bytes: bytes,
validation_report: dict
) -> bool:
"""
Registra una firma e avanza il workflow al prossimo firmatario.
Returns True se il workflow e completato.
"""
current = self.get_current_signer()
if not current or current.signer_email != signer_email:
raise ValueError(f"Non e il turno di {signer_email} di firmare")
if not validation_report.get('all_valid'):
raise ValueError("Firma non valida secondo il report di validazione")
# Aggiorna stato del firmatario
current.status = SignatureStatus.COMPLETED
current.signed_at = datetime.utcnow()
# Log immutabile dell'evento
self.audit_log.append({
'event': 'signature_recorded',
'signer': signer_email,
'order': self.current_signer_order,
'timestamp': datetime.utcnow().isoformat(),
'doc_hash': validation_report.get('document_hash_sha256')
})
# Avanza al prossimo firmatario
self.current_signer_order += 1
next_signer = self.get_current_signer()
if next_signer is None:
# Tutti hanno firmato: workflow completato
self.status = SignatureStatus.COMPLETED
return True
# Notifica il prossimo firmatario
next_signer.status = SignatureStatus.IN_PROGRESS
return False
def reject(self, signer_email: str, reason: str):
"""Il firmatario corrente rifiuta di firmare."""
current = self.get_current_signer()
if current and current.signer_email == signer_email:
current.status = SignatureStatus.REJECTED
current.rejection_reason = reason
self.status = SignatureStatus.REJECTED
self.audit_log.append({
'event': 'signature_rejected',
'signer': signer_email,
'reason': reason,
'timestamp': datetime.utcnow().isoformat()
})
# Factory per creare workflow
def create_signing_workflow(
document_id: str,
document_name: str,
signers_ordered: List[dict],
validity_days: int = 30
) -> SigningWorkflow:
"""
Crea un workflow di firma dal documento e dalla lista di firmatari.
signers_ordered: [{'email': '...', 'name': '...'}, ...] in ordine di firma
"""
requests = [
SignatureRequest(
request_id=str(uuid.uuid4()),
signer_email=s['email'],
signer_name=s['name'],
order=idx + 1
)
for idx, s in enumerate(signers_ordered)
]
requests[0].status = SignatureStatus.IN_PROGRESS # Primo firmatario attivo
return SigningWorkflow(
workflow_id=str(uuid.uuid4()),
document_id=document_id,
document_name=document_name,
created_at=datetime.utcnow(),
expires_at=datetime.utcnow().replace(
day=datetime.utcnow().day + validity_days
),
signers=requests
)
Signature API との Angular 統合
Angular フロントエンド側からは、署名サービスとの統合で次の処理を行う必要があります。 リダイレクト フロー (ユーザーは署名プラットフォームに誘導されてから戻ります)、 または iframe/SDK を介した直接埋め込み。
// signature.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
export interface SigningSession {
sessionId: string;
signingUrl: string; // URL redirect alla piattaforma di firma
expiresAt: string;
documentId: string;
}
export interface SignatureStatus {
workflowId: string;
status: 'pending' | 'in_progress' | 'completed' | 'rejected';
completedSigners: number;
totalSigners: number;
nextSigner?: string;
completedAt?: string;
}
@Injectable({ providedIn: 'root' })
export class SignatureService {
private readonly http = inject(HttpClient);
private readonly apiBase = '/api/v1/signatures';
initiateSignature(documentId: string, signerEmail: string): Observable<SigningSession> {
return this.http.post<SigningSession>(
`{this.apiBase}/sessions`,
{ documentId, signerEmail }
);
}
getWorkflowStatus(workflowId: string): Observable<SignatureStatus> {
return this.http.get<SignatureStatus>(
`{this.apiBase}/workflows/{workflowId}/status`
);
}
downloadSignedDocument(workflowId: string): Observable<Blob> {
return this.http.get(
`{this.apiBase}/workflows/{workflowId}/document`,
{ responseType: 'blob' }
);
}
}
// document-signing.component.ts
import { Component, input, inject, signal } from '@angular/core';
import { SignatureService, SignatureStatus } from './signature.service';
@Component({
selector: 'app-document-signing',
template: `
<div class="signing-container">
@if (status() === 'idle') {
<button (click)="startSigning()">Firma il Documento</button>
}
@if (status() === 'loading') {
<div class="spinner">Preparazione firma in corso...</div>
}
@if (status() === 'completed') {
<div class="success">
<p>Documento firmato con successo!</p>
<button (click)="downloadSigned()">Scarica PDF firmato</button>
</div>
}
</div>
`
})
export class DocumentSigningComponent {
documentId = input.required<string>();
workflowId = input.required<string>();
private sigService = inject(SignatureService);
status = signal<'idle' | 'loading' | 'redirect' | 'completed' | 'error'>('idle');
startSigning(): void {
this.status.set('loading');
this.sigService.initiateSignature(this.documentId(), 'user@example.com').subscribe({
next: (session) => {
// Redirect alla piattaforma di firma (DocuSign, YouSign, etc.)
window.location.href = session.signingUrl;
},
error: () => this.status.set('error')
});
}
downloadSigned(): void {
this.sigService.downloadSignedDocument(this.workflowId()).subscribe(blob => {
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'documento-firmato.pdf';
a.click();
URL.revokeObjectURL(url);
});
}
}
長期検証 (LTV) とアーカイブ
デジタル署名は現在だけでなく、10 年後、20 年後にも検証可能でなければなりません。 元の証明書の有効期限が切れたか、暗号化アルゴリズムが失効した可能性がある場合 時代遅れの。そこには 長期検証 (LTV) この問題を解決します 検証に必要なすべての情報を署名された文書に組み込む 将来: 証明書チェーン、OCSP 応答、およびタイムスタンプ。
長期アーカイブ標準
- PAdES-LTV: 証明書が埋め込まれた PDF 署名と OCSP 応答
- XAdES-A: 定期的なアーカイブ タイムスタンプを含む XML 署名
- CADES-A: アーカイブタイムスタンプを含む CMS 署名
- ASiC-E: 署名 + ドキュメント + メタデータを含む ZIP コンテナー
有効期限が 10 年を超える法的文書の場合は、タイムスタンプを再押しすることが推奨されます。 SHA-256 が使用される前に、定期的 (5 年ごと) に暗号強度を更新します。 時代遅れの。
セキュリティに関する考慮事項
重要な安全ポイント
- 秘密キーの保護: 秘密キーを生成または保存しないでください アプリケーションコード内で。 HSM (ハードウェア セキュリティ モジュール) またはクラウド KMS (AWS KMS、 Azure Key Vault、Google Cloud KMS)。
- 証明書の失効: OCSP ステープル留めを実装して回避します シグネチャごとに OCSP をリアルタイムでチェックします。これはパフォーマンスにとって重要です。
- 不変の監査ログ: 各ワークフロー イベント (署名、拒否、有効期限) 改ざんを検出するには、ハッシュ チェーンを使用して追加専用のログに書き込む必要があります。
- 署名前の文書の検証: PDF に次のものが含まれていないことを確認してください。 表示されるコンテンツを変更する可能性のある埋め込み JavaScript またはインタラクティブ フォーム。
結論
スケーラブルで法的に有効なデジタル署名システムを実装するには、多くのことが必要です PDF に単に「署名を追加する」だけではありません。ライフサイクル管理 PKI、eIDAS 2.0 準拠、マルチパーティ ワークフロー、タイムスタンプ、および長期 検証は、最初から一緒に設計する必要があるコンポーネントです。
この記事のコードは、システムを構築するための強固な基盤を提供します。 生産。重要な申請(公正証書、不動産契約書、書類など)の場合 企業)の場合は、認定 TSP(トラスト サービス プロバイダー)との統合を検討してください。 複雑な規制を管理するNamirial、InfoCert、Aruba、DocuSignなど eIDAS 2.0 をあなたに。
リーガルテックとAIシリーズ
- 契約分析のための NLP: OCR から理解まで
- 電子証拠開示プラットフォームのアーキテクチャ
- 動的ルールエンジンによるコンプライアンスの自動化
- 法的合意のためのスマートコントラクト: Solidity と Vyper
- 生成 AI による法的文書の要約
- 検索エンジンの法則: ベクトル埋め込み
- Scala でのデジタル署名と文書認証 (この記事)
- データプライバシーとGDPRコンプライアンスシステム
- 法務 AI アシスタント (法務副操縦士) の構築
- LegalTech データ統合パターン







