e-Discovery 플랫폼 아키텍처: 수집, 처리 및 AI 검토
2024년에는 플랫폼에 인공지능 도입 e-디스커버리 법적 단 1년 만에 19%에서 79%로 뛰어올랐습니다. 이 데이터는 통계적 호기심이 아닙니다. 로펌, 기업, 사법당국이 업무를 관리하는 방식의 구조적 변화 민사 및 형사 소송에서 문서 증거. 소송에 수백만 개의 이메일, 메시지가 관련되는 경우 Slack, SharePoint 문서 및 로그 파일, 수동 검토의 전통적인 모델 법률 보조원은 더 이상 경제적으로나 물류적으로 지속 가능하지 않습니다.
L'e-디스커버리 (전자 증거개시) 및 소송 당사자가 진행하는 프로세스 법적 식별, 수집, 보존, 처리, 검토 및 문서 증거 생성 전자 형식. 미국에서는 연방민사소송규칙(FRCP), 유럽에서는 규제를 받습니다. 국가 차원의 유사한 프레임워크에서. 최신 플랫폼은 페타바이트 규모의 데이터를 처리해야 합니다. 법적으로 유효한 관리 체계를 존중하고 검토할 문서의 양을 줄입니다. 매우 높은 회상률 유지: 관련 문서가 손실될 수 없습니다.
이 문서에서는 엔터프라이즈급 e-Discovery 플랫폼의 전체 아키텍처를 구축합니다. 에서 대량 섭취 이기종 문서의 분산 처리, 에서 중복 제거 al 예측 코딩 AI 모델로 에EDRM XML 형식으로 내보내기. 실제 Python 코드 예제 및 분석 포함 시장의 주요 플랫폼 중 하나입니다.
이 기사에서 배울 내용
- EDRM(Electronic Discovery Reference Model) 모델 및 해당 단계
- 대규모 수집을 위한 마이크로서비스 아키텍처: Kafka, Elasticsearch, MinIO
- 처리 파이프라인: 텍스트 추출, 메타데이터, MD5/SHA 중복 제거 및 거의 중복 제거
- TAR(기술 지원 검토) 및 CAL(지속적 활성 학습)
- scikit-learn 및 문장 변환기를 사용한 예측 코딩
- CoC 관리 및 불변의 감사 추적
- 플랫폼 비교: Relativity, DISCO, Everlaw, Logikcull
- EDRM XML 내보내기 및 케이스 관리 시스템과의 통합
LegalTech 및 AI 시리즈에서의 위치
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | 계약 분석을 위한 NLP | OCR, NER, 조항 분류 |
| 2 | 현재 위치 — e-Discovery 아키텍처 | 수집, 처리, AI 검토 |
| 3 | 규정 준수 자동화 | 규칙 엔진 및 RegTech |
| 4 | 스마트 계약 | Solidity, Vyper, 집행 가능성 |
| 5 | Generative AI를 이용한 요약 | LLM, RAG, 출력 검증 |
| 6 | 법학 검색 엔진 | 벡터 임베딩 및 의미론적 검색 |
| 7 | 디지털 서명 및 eIDAS 2.0 | PKI, 타임스탬프, 워크플로 |
| 8 | GDPR 규정 준수 시스템 | 개인정보 보호 설계, DSR, 데이터 매핑 |
| 9 | 법률 AI 부조종사 | 법적 말뭉치, 가드레일의 RAG |
| 10 | 데이터 통합 LegalTech | ECLI, API 법원 시스템, XBRL |
EDRM 모델: e-Discovery를 위한 참조 프레임워크
L'EDRM(전자 검색 참조 모델) 그리고 이를 설명하는 사실상의 표준 전자 증거개시 프로세스의 단계. 2005년에 개발되어 지속적으로 업데이트되는 모델입니다. 모든 엔터프라이즈 플랫폼이 지원해야 하는 9개의 순차적 단계를 정의합니다.
| EDRM 단계 | 설명 | 기술 구성 요소 |
|---|---|---|
| 1. 정보 거버넌스 | 보존 정책, 데이터 분류, 데이터 맵 | MDM, 정책 엔진, CMDB |
| 2. 신분증 | 잠재적인 관리인 및 관련 데이터 소스 찾기 | 크롤러, 디렉터리 검색, LDAP 쿼리 |
| 3.보존 | 법적 보존: 훼손 방지를 위해 데이터 동결 | 보류 관리, 불변 스토리지, 알림 워크플로우 |
| 4.컬렉션 | 보관 연속성을 통한 포렌식 수집 | 포렌식 수집기, 해시 검증, 보관 로그 |
| 5. 처리 | 텍스트 추출, 메타데이터, 중복 제거, NIST 필터링 | Apache Tika, Elasticsearch 수집 파이프라인 |
| 6.검토 | 관련성/권한 분류, TAR/CAL | 예측코딩, 능동학습, 리뷰 플랫폼 |
| 7. 분석 | 패턴, 타임라인, 엔터티 네트워크, 주제 모델링 | 그래프 분석, NLP, LDA/BER주제 |
| 8. 생산 | 합의된 형식으로 내보내기(TIFF, 기본, PDF) | EDRM XML 내보내기, Bates 번호 매기기, 교정 엔진 |
| 9. 프리젠테이션 | 재판 프리젠테이션, 증언, 시각적 타임라인 | 재판관, 전시관리 |
기업 e-Discovery를 위한 마이크로서비스 아키텍처
최신 e-Discovery 플랫폼은 단일체가 될 수 없습니다. 데이터 볼륨은 소수에 따라 다양합니다. 큰 원인의 경우 기가바이트에서 수백 테라바이트에 이릅니다. 아키텍처는 다음과 같아야합니다. 탄력적으로 확장 가능, 내결함성 및 불변의 감사 추적을 보장합니다. 증거에 대한 적격 요건을 충족합니다. 통합 아키텍처 패턴은 다음과 같은 요소를 결합합니다. 이벤트 스트리밍, 객체 스토리지 및 분산 검색 엔진.
# docker-compose.yml per ambiente e-Discovery locale
version: '3.9'
services:
# Message broker per ingestion asincrona
kafka:
image: confluentinc/cp-kafka:7.5.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_NUM_PARTITIONS: 12
depends_on: [zookeeper]
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# Object storage per documenti originali e derivati
minio:
image: minio/minio:RELEASE.2024-01-16T16-07-38Z
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: ediscovery
MINIO_ROOT_PASSWORD: ${MINIO_PASSWORD}
volumes:
- minio_data:/data
# Search engine per full-text e metadata search
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
environment:
- discovery.type=single-node
- xpack.security.enabled=true
- ELASTIC_PASSWORD=${ES_PASSWORD}
- ES_JAVA_OPTS=-Xms2g -Xmx2g
volumes:
- es_data:/usr/share/elasticsearch/data
# Worker per processing documenti
tika:
image: apache/tika:2.9.1-full
ports:
- "9998:9998"
# Database relazionale per metadata e catena custodia
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: ediscovery
POSTGRES_USER: ediscovery
POSTGRES_PASSWORD: ${PG_PASSWORD}
volumes:
minio_data:
es_data:
대규모 문서 수집 파이프라인
수집은 가장 중요한 단계입니다. 이기종 소스(이메일 서버, 파일)에서 문서를 수집해야 합니다. 공유, 클라우드 스토리지, SaaS 애플리케이션) 포렌식 무결성을 유지합니다. 획득한 각 문서는 다음을 충족해야 합니다. 가지고 있다 검증 가능한 암호화 해시 언제, 어떻게, 그리고 수집되었습니다.
"""
ediscovery/ingestion/collector.py
Collettore forense con chain of custody
"""
import hashlib
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import boto3
from kafka import KafkaProducer
import psycopg2
class ForensicCollector:
"""
Raccoglie documenti con hash SHA-256 e registra
la catena di custodia in PostgreSQL.
"""
def __init__(self, config: dict):
self.minio = boto3.client(
's3',
endpoint_url=config['minio_url'],
aws_access_key_id=config['minio_user'],
aws_secret_access_key=config['minio_password']
)
self.producer = KafkaProducer(
bootstrap_servers=config['kafka_brokers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.pg_conn = psycopg2.connect(config['postgres_dsn'])
self.bucket = 'ediscovery-originals'
def collect_file(
self,
file_path: Path,
matter_id: str,
custodian_id: str,
collector_id: str
) -> dict:
"""
Raccoglie un file con verifica integrita e registra custody event.
Restituisce il documento event per il topic Kafka.
"""
# 1. Calcola hash SHA-256 prima del trasferimento
sha256 = self._compute_sha256(file_path)
md5 = self._compute_md5(file_path)
file_size = file_path.stat().st_size
# 2. Genera Document ID univoco
doc_id = str(uuid.uuid4())
# 3. Upload su MinIO con metadata
s3_key = f"matters/{matter_id}/originals/{doc_id}/{file_path.name}"
self.minio.upload_file(
str(file_path),
self.bucket,
s3_key,
ExtraArgs={
'Metadata': {
'doc-id': doc_id,
'sha256': sha256,
'custodian-id': custodian_id,
'matter-id': matter_id
}
}
)
# 4. Verifica integrita post-upload
response = self.minio.head_object(Bucket=self.bucket, Key=s3_key)
uploaded_size = response['ContentLength']
if uploaded_size != file_size:
raise ValueError(
f"Integrita compromessa: atteso {file_size} bytes, "
f"caricato {uploaded_size} bytes"
)
# 5. Registra custody event in PostgreSQL
collection_timestamp = datetime.now(timezone.utc).isoformat()
custody_event = {
'event_id': str(uuid.uuid4()),
'doc_id': doc_id,
'matter_id': matter_id,
'custodian_id': custodian_id,
'collector_id': collector_id,
'event_type': 'COLLECTION',
'timestamp': collection_timestamp,
'source_path': str(file_path),
'sha256': sha256,
'md5': md5,
'file_size': file_size,
's3_key': s3_key
}
self._record_custody_event(custody_event)
# 6. Pubblica su Kafka per processing asincrono
document_event = {
'doc_id': doc_id,
'matter_id': matter_id,
'custodian_id': custodian_id,
's3_key': s3_key,
'filename': file_path.name,
'file_size': file_size,
'sha256': sha256,
'collection_timestamp': collection_timestamp,
'status': 'COLLECTED'
}
self.producer.send('ediscovery.documents.collected', document_event)
return document_event
def _compute_sha256(self, file_path: Path) -> str:
h = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(65536), b''):
h.update(chunk)
return h.hexdigest()
def _compute_md5(self, file_path: Path) -> str:
h = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(65536), b''):
h.update(chunk)
return h.hexdigest()
def _record_custody_event(self, event: dict) -> None:
with self.pg_conn.cursor() as cur:
cur.execute("""
INSERT INTO custody_events (
event_id, doc_id, matter_id, custodian_id,
collector_id, event_type, timestamp,
source_path, sha256, md5, file_size, s3_key
) VALUES (
%(event_id)s, %(doc_id)s, %(matter_id)s, %(custodian_id)s,
%(collector_id)s, %(event_type)s, %(timestamp)s,
%(source_path)s, %(sha256)s, %(md5)s, %(file_size)s, %(s3_key)s
)
""", event)
self.pg_conn.commit()
Apache Tika를 사용한 처리 및 콘텐츠 추출
파이프라인의 처리 및 기술적 핵심입니다. 각 문서는 텍스트로 변환되어야 합니다. 검색 가능, 메타데이터 추출(작성자, 날짜, 이메일 스레드, 문서 속성) 및 정규화 일반적인 계획으로. 아파치 티카 1,500개 이상의 다양한 파일 형식을 관리하며, e-Discovery에서 콘텐츠 추출을 위한 사실상의 표준이 되었습니다.
"""
ediscovery/processing/processor.py
Worker di processing documenti con Apache Tika
"""
import json
import requests
from kafka import KafkaConsumer, KafkaProducer
from elasticsearch import Elasticsearch
import boto3
class DocumentProcessor:
"""
Consumer Kafka che processa ogni documento raccolto:
estrae testo e metadata con Tika, indicizza su ES.
"""
TIKA_URL = "http://tika:9998"
def __init__(self, config: dict):
self.consumer = KafkaConsumer(
'ediscovery.documents.collected',
bootstrap_servers=config['kafka_brokers'],
group_id='document-processor',
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=config['kafka_brokers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.es = Elasticsearch(
config['elasticsearch_url'],
basic_auth=('elastic', config['es_password'])
)
self.minio = boto3.client('s3', endpoint_url=config['minio_url'])
def process_documents(self) -> None:
for message in self.consumer:
event = message.value
try:
processed = self._process_document(event)
self._index_document(processed)
self.producer.send(
'ediscovery.documents.processed',
{**event, **processed, 'status': 'PROCESSED'}
)
except Exception as exc:
self.producer.send(
'ediscovery.documents.errors',
{**event, 'error': str(exc), 'status': 'ERROR'}
)
def _process_document(self, event: dict) -> dict:
# Scarica documento da MinIO
response = self.minio.get_object(
Bucket='ediscovery-originals',
Key=event['s3_key']
)
file_content = response['Body'].read()
# Estrai testo con Tika (PUT /tika)
tika_response = requests.put(
f"{self.TIKA_URL}/tika",
data=file_content,
headers={
'Accept': 'text/plain',
'Content-Type': 'application/octet-stream'
},
timeout=120
)
extracted_text = tika_response.text
# Estrai metadata con Tika (PUT /meta)
meta_response = requests.put(
f"{self.TIKA_URL}/meta",
data=file_content,
headers={
'Accept': 'application/json',
'Content-Type': 'application/octet-stream'
},
timeout=60
)
metadata = meta_response.json()
return {
'extracted_text': extracted_text,
'text_length': len(extracted_text),
'tika_metadata': metadata,
'author': metadata.get('dc:creator', ''),
'created_date': metadata.get('dcterms:created', ''),
'modified_date': metadata.get('dcterms:modified', ''),
'content_type': metadata.get('Content-Type', ''),
'language': metadata.get('language', ''),
'page_count': metadata.get('xmpTPg:NPages', 0)
}
def _index_document(self, doc: dict) -> None:
"""Indicizza documento su Elasticsearch per ricerca full-text."""
self.es.index(
index=f"ediscovery-{doc['matter_id']}",
id=doc['doc_id'],
document={
'doc_id': doc['doc_id'],
'matter_id': doc['matter_id'],
'custodian_id': doc['custodian_id'],
'filename': doc['filename'],
'content': doc['extracted_text'],
'author': doc.get('author', ''),
'created_date': doc.get('created_date'),
'modified_date': doc.get('modified_date'),
'content_type': doc.get('content_type', ''),
'language': doc.get('language', ''),
'page_count': doc.get('page_count', 0),
'file_size': doc['file_size'],
'sha256': doc['sha256'],
'collection_timestamp': doc['collection_timestamp'],
'status': 'PROCESSED',
'review_status': 'UNREVIEWED',
'relevance_score': None,
'privilege': False,
'tags': []
}
)
중복 제거: 정확하고 거의 중복된 항목 감지
일반적인 e-Discovery 컬렉션에서는 문서의 40-70%가 중복되거나 거의 중복됩니다. 거기 중복 제거 심사 비용을 줄이는 것도 중요할 뿐만 아니라 법적 요구 사항: 동일한 이메일의 동일한 사본 수천 개를 생성하는 것은 규칙을 위반합니다. 발견하고 상대방의 비용을 증가시킵니다. 중복 제거에는 두 가지 수준이 있습니다.
e-Discovery의 중복 제거 수준
- 정확한 중복 제거(해시 기반): 동일한 SHA-256을 가진 문서는 정확히 중복됩니다. 보다 관련성이 높은 메타데이터가 포함된 "관리 사본"을 유지하고 다른 메타데이터를 중복 항목으로 연결합니다.
- 거의 중복 제거(MinHash/LSH): 내용이 유사하지만 동일하지 않은 문서 (서명이 추가된 이메일, 초안 버전) 지역에 민감한 MinHash와 같은 알고리즘 해싱은 Jaccard 유사성이 0.85보다 큰 문서를 식별합니다.
- 이메일 스레딩: 이메일을 대화(스레드)로 그룹화 모든 맥락이 포함된 최신 메시지만 표시하여 검토 양을 줄입니다.
"""
ediscovery/processing/deduplication.py
Near-deduplication con MinHash e LSH
"""
from datasketch import MinHash, MinHashLSH
import hashlib
from typing import Optional
class DeduplicationEngine:
def __init__(self, num_perm: int = 128, threshold: float = 0.85):
self.lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)
self.num_perm = num_perm
self.processed: dict[str, str] = {} # sha256 -> doc_id
def is_exact_duplicate(self, sha256: str) -> Optional[str]:
"""Restituisce doc_id del duplicato esatto o None."""
return self.processed.get(sha256)
def register_document(self, doc_id: str, sha256: str, text: str) -> None:
"""Registra un documento nel sistema di dedup."""
self.processed[sha256] = doc_id
minhash = self._compute_minhash(text)
self.lsh.insert(doc_id, minhash)
def find_near_duplicates(self, text: str, query_doc_id: str) -> list[str]:
"""
Trova near-duplicati del testo con Jaccard sim >= threshold.
Restituisce lista di doc_id simili.
"""
minhash = self._compute_minhash(text)
results = self.lsh.query(minhash)
return [r for r in results if r != query_doc_id]
def _compute_minhash(self, text: str) -> MinHash:
minhash = MinHash(num_perm=self.num_perm)
# Shingling a livello di parola (3-gram)
tokens = text.lower().split()
shingles = [
' '.join(tokens[i:i+3])
for i in range(len(tokens) - 2)
]
for shingle in shingles:
minhash.update(shingle.encode('utf8'))
return minhash
기술 지원 검토 및 예측 코딩
검토 단계는 역사적으로 비용이 가장 많이 듭니다. 선임 변호사는 모든 문서를 읽고 관련 있음, 관련 없음 또는 특권(변호사-의뢰인 특권이 적용됨)으로 분류합니다. 그만큼 기술 지원 검토(TAR) ~와 함께 CAL(지속적 활성 학습) 이 비용을 대폭 줄입니다. 모델은 리뷰어의 결정과 우선순위를 통해 학습합니다. 가능성이 가장 높은 관련 문서를 검색하여 비율이 높을 때 검토를 중지할 수 있습니다. 예상 회수율이 임계값(일반적으로 TREC Legal Track 표준에 따라 75%)을 초과합니다.
"""
ediscovery/review/predictive_coding.py
Predictive Coding con Sentence Transformers e Active Learning
"""
from sentence_transformers import SentenceTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
import numpy as np
from typing import Optional
class PredictiveCodingEngine:
"""
Implementa TAR 2.0 (Continuous Active Learning).
Il reviewer classifica documenti, il modello re-addestra
e riordina la review queue.
"""
def __init__(self, model_name: str = 'all-mpnet-base-v2'):
self.encoder = SentenceTransformer(model_name)
self.classifier = LogisticRegression(
class_weight='balanced',
max_iter=1000
)
self.scaler = StandardScaler()
self.training_texts: list[str] = []
self.training_labels: list[int] = [] # 1=rilevante, 0=non rilevante
self.is_trained = False
def add_review_decision(
self,
doc_text: str,
is_relevant: bool
) -> None:
"""Aggiunge una decisione di review al training set."""
self.training_texts.append(doc_text)
self.training_labels.append(1 if is_relevant else 0)
# Re-addestra quando ci sono abbastanza esempi (>=10 per classe)
pos_count = sum(self.training_labels)
neg_count = len(self.training_labels) - pos_count
if pos_count >= 5 and neg_count >= 5:
self._retrain()
def _retrain(self) -> None:
embeddings = self.encoder.encode(
self.training_texts,
batch_size=32,
show_progress_bar=False
)
embeddings_scaled = self.scaler.fit_transform(embeddings)
self.classifier.fit(embeddings_scaled, self.training_labels)
self.is_trained = True
def predict_relevance(
self,
texts: list[str]
) -> list[dict]:
"""
Predice la rilevanza di una lista di documenti.
Restituisce lista di {text_index, relevance_prob} ordinata per score desc.
"""
if not self.is_trained:
# Prima del training, restituisce ordine casuale
return [
{'index': i, 'relevance_prob': 0.5}
for i in range(len(texts))
]
embeddings = self.encoder.encode(
texts,
batch_size=32,
show_progress_bar=False
)
embeddings_scaled = self.scaler.transform(embeddings)
probabilities = self.classifier.predict_proba(
embeddings_scaled
)[:, 1] # probabilità classe positiva
results = [
{'index': i, 'relevance_prob': float(p)}
for i, p in enumerate(probabilities)
]
return sorted(results, key=lambda x: x['relevance_prob'], reverse=True)
def estimate_recall(
self,
reviewed_count: int,
total_count: int,
relevant_found: int
) -> float:
"""
Stima il recall atteso usando il metodo seed+sample
secondo TREC Legal Track guidelines.
Semplificazione: usa il tasso di prevalenza osservato.
"""
if reviewed_count == 0:
return 0.0
prevalence = relevant_found / reviewed_count
estimated_total_relevant = prevalence * total_count
if estimated_total_relevant == 0:
return 1.0
return min(relevant_found / estimated_total_relevant, 1.0)
e-Discovery 플랫폼 비교
전자증거개시(e-Discovery) 플랫폼 시장은 통합되어 있지만 다음과 같은 압력으로 빠르게 발전하고 있습니다. AI의. 다음은 2025~2026년에 사용할 수 있는 주요 솔루션을 비교한 것입니다.
| 플랫폼 | 강점 | 제한 사항 | 가격 모델 | AI 기능 |
|---|---|---|---|---|
| 상대성 | 기업 표준, 방대한 생태계, 성숙한 API | 설정 복잡성, 높은 비용 | SaaS + 자체 호스팅, GB당 | RelevanceAI, 개념 검색, 이상 탐지 |
| 디스크 | 클라우드 기반, 현대적인 UX, 기본적으로 통합된 AI | 상대성 이론보다 작은 생태계 | 구독 + 사용량 | DISCO AI: 예측 코딩, 자동 태깅, 문서 Q&A |
| 에버로 | 실시간 협업, 뛰어난 UX, 시험 준비 완료 | 대량 사례에 대한 기능 감소 | GB당/월 | EverAI: 예측 코딩, 요약, 증착 Q&A |
| 로직컬 | 셀프 서비스, 투명한 가격, 신속한 온보딩 | 매우 복잡한 경우에는 덜 적합함 | GB당 또는 구독당 | 자동 태그 추가, 지원 검색, 고급 중복 제거 |
| 드러내다 | 고급 AI(Brainspace), 독자적인 NLP | 가파른 학습 곡선 | 기업 라이선스 | 주제 클러스터링, 개념 검색, 이상 탐지 |
EDRM XML 내보내기 및 문서 제작
마지막 단계는 합의된 형식에 따라 상대방에게 문서를 작성하는 것입니다. EDRM XML 표준은 모든 문서와 문서를 교환하기 위한 XML 스키마를 정의합니다. 메타데이터를 사용하여 모든 플랫폼에 쉽게 업로드할 수 있습니다. 일반적으로 서류가 옵니다. 제품 베이츠 번호 매기기 (고유한 프로그레시브 넘버링) 및 편집진과 함께 권한 있는 콘텐츠에 적용됩니다.
"""
ediscovery/production/edrm_exporter.py
Generazione export EDRM XML con Bates numbering
"""
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Iterator
def generate_edrm_xml(
documents: list[dict],
matter_id: str,
bates_prefix: str = "PROD",
start_bates: int = 1
) -> str:
"""
Genera EDRM XML per un set di documenti prodotti.
Ogni documento riceve un Bates number univoco.
"""
root = ET.Element('Root')
root.set('DataInterchangeType', 'Processed')
root.set('DateCreated', datetime.now(timezone.utc).isoformat())
root.set('Encoding', 'UTF-8')
root.set('MajorVersion', '1')
root.set('MinorVersion', '2')
batch = ET.SubElement(root, 'Batch')
documents_el = ET.SubElement(batch, 'Documents')
for idx, doc in enumerate(documents):
bates_num = f"{bates_prefix}{str(start_bates + idx).zfill(7)}"
doc_el = ET.SubElement(documents_el, 'Document')
doc_el.set('DocID', doc['doc_id'])
# Tags con metadata
tags_el = ET.SubElement(doc_el, 'Tags')
def add_tag(name: str, value: str, data_type: str = 'Text') -> None:
tag = ET.SubElement(tags_el, 'Tag')
tag.set('TagName', name)
tag.set('TagValue', value)
tag.set('TagDataType', data_type)
add_tag('BatesNumber', bates_num)
add_tag('DocID', doc['doc_id'])
add_tag('MatterID', matter_id)
add_tag('Custodian', doc.get('custodian_id', ''))
add_tag('FileName', doc.get('filename', ''))
add_tag('DateCollected', doc.get('collection_timestamp', ''), 'DateTime')
add_tag('DateCreated', doc.get('created_date', ''), 'DateTime')
add_tag('Author', doc.get('author', ''))
add_tag('FileSize', str(doc.get('file_size', 0)), 'LongInteger')
add_tag('SHA256', doc.get('sha256', ''))
add_tag('ContentType', doc.get('content_type', ''))
add_tag('ReviewStatus', doc.get('review_status', ''))
add_tag('IsPrivileged', str(doc.get('privilege', False)), 'Boolean')
add_tag(
'RelevanceScore',
str(round(doc.get('relevance_score', 0), 4)),
'Decimal'
)
return ET.tostring(root, encoding='unicode', xml_declaration=True)
중요한 법적 고려 사항
- 박탈: 그 후 합리적으로 증거를 인멸하거나 변경하는 행위 예측 가능한 법적 절차는 불리한 추론을 포함한 심각한 제재로 이어질 수 있습니다. 지시(파기된 증거가 불리하다고 가정하도록 배심원에게 지시).
- 권한 검토: 변호사-의뢰인 특권 또는 업무가 적용되는 서류 제품 교리는 생산 전에 식별되고 초안이 작성되어야 합니다. 프로덕션 우발적인 특권 문서 사용은 환수 계약으로 이의를 제기할 수 있습니다(FRE 502(d)).
- 국경 간 데이터 개인정보 보호: 유럽 직원으로부터 데이터를 수집합니다. Discovery USA에서는 심층적인 GDPR 분석이 필요합니다. 미국-EU 데이터 개인정보 보호 프레임워크(2023) 이전이 단순화되었지만 실사는 여전히 필수적입니다.
- AI 방어성: 법원은 사용된 TAR 방법에 대한 투명성을 요구합니다. 선택한 프로토콜과 리콜 임계값은 문서화되고 방어 가능해야 합니다.
CoC에 대한 데이터베이스 스키마
관계형 데이터베이스와 플랫폼의 법적 백본입니다. 모든 문서에 대한 모든 작업 타임스탬프, 사용자 및 조치 이유를 추적해야 합니다. 이 감사 추적은 불변성: 기록을 삭제하거나 소급하여 수정할 수 없습니다.
-- Schema PostgreSQL per e-Discovery con audit trail immutabile
-- Matters (cause/procedimenti)
CREATE TABLE matters (
matter_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
matter_name TEXT NOT NULL,
matter_number TEXT UNIQUE NOT NULL,
client_id UUID NOT NULL,
status TEXT NOT NULL DEFAULT 'ACTIVE',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
closed_at TIMESTAMPTZ
);
-- Custodians (soggetti i cui dati vengono raccolti)
CREATE TABLE custodians (
custodian_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
matter_id UUID REFERENCES matters(matter_id),
full_name TEXT NOT NULL,
email TEXT NOT NULL,
department TEXT,
hold_applied BOOLEAN NOT NULL DEFAULT FALSE,
hold_date TIMESTAMPTZ
);
-- Documents (indice master dei documenti)
CREATE TABLE documents (
doc_id UUID PRIMARY KEY,
matter_id UUID REFERENCES matters(matter_id),
custodian_id UUID REFERENCES custodians(custodian_id),
filename TEXT NOT NULL,
file_size BIGINT NOT NULL,
sha256 CHAR(64) NOT NULL,
md5 CHAR(32) NOT NULL,
s3_key TEXT NOT NULL,
content_type TEXT,
is_duplicate_of UUID REFERENCES documents(doc_id),
is_near_dup_of UUID REFERENCES documents(doc_id),
collection_timestamp TIMESTAMPTZ NOT NULL,
status TEXT NOT NULL DEFAULT 'COLLECTED',
review_status TEXT NOT NULL DEFAULT 'UNREVIEWED',
relevance_score NUMERIC(5,4),
privilege BOOLEAN NOT NULL DEFAULT FALSE,
bates_number TEXT UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Custody events (audit trail immutabile)
-- Constraint: nessuna UPDATE/DELETE permessa
CREATE TABLE custody_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
doc_id UUID REFERENCES documents(doc_id),
matter_id UUID REFERENCES matters(matter_id),
custodian_id UUID,
collector_id UUID,
event_type TEXT NOT NULL, -- COLLECTION, PROCESSING, REVIEW, PRODUCTION, etc.
timestamp TIMESTAMPTZ NOT NULL,
source_path TEXT,
sha256 CHAR(64),
md5 CHAR(32),
file_size BIGINT,
s3_key TEXT,
user_id UUID,
notes TEXT
);
-- Impedisce UPDATE e DELETE sulla tabella eventi
CREATE RULE no_update_custody_events AS
ON UPDATE TO custody_events DO INSTEAD NOTHING;
CREATE RULE no_delete_custody_events AS
ON DELETE TO custody_events DO INSTEAD NOTHING;
-- Review decisions
CREATE TABLE review_decisions (
decision_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
doc_id UUID REFERENCES documents(doc_id),
reviewer_id UUID NOT NULL,
decision TEXT NOT NULL, -- RELEVANT, NOT_RELEVANT, PRIVILEGED, NEEDS_REDACTION
confidence TEXT, -- HIGH, MEDIUM, LOW
review_timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
notes TEXT
);
결론 및 다음 단계
엔터프라이즈급 e-Discovery 플랫폼을 구축하려면 세 가지가 균형을 이루는 아키텍처가 필요합니다. 명령은 종종 서로 긴장 관계에 있습니다. 기술적 성능 (수백만 달러를 처리 합리적인 시간 내에 서류를 처리), 법적 정확성 (관리 체인 불변, 리콜 문서화, 특권 검토 방어 가능) e 유용성 (리뷰어는 데이터 과학자가 아닌 변호사입니다.)
우리가 조사한 주요 구성 요소 — SHA-256을 사용한 포렌식 수집, 처리 Apache Tika, MinHash/LSH를 통한 중복 제거, Active를 통한 예측 코딩으로 배포 학습 — 2025~2026년 해당 분야의 최첨단 기술을 나타냅니다. AI의 채택 TAR를 실험적 기술에서 시장 표준으로 전환: 플랫폼의 79% 오늘은 이를 기본적으로 통합합니다.
시리즈의 다음 기사에서는 규정 준수 엔진 실시간으로 규제 모니터링을 자동화하는 동적 규칙 엔진을 사용합니다.
리소스 및 통찰력
- EDRM(전자 검색 참조 모델): edrm.net
- 상대성 플랫폼 문서: relativity.com/artificial-intelligence
- TREC 법적 추적 리콜 지침: Measure.it 리콜 표준
- FRE 502(d): 실수로 생성된 특권 문서에 대한 환수 계약
- datasketch - MinHash/LSH용 Python 라이브러리
- 문장 변환기: 예측 코딩을 위한 임베딩







