ESG 보고 API: CSRD 워크플로와 통합
Il 2024년 1월 1일 유럽 기업들에게 획기적인 전환점이 되었습니다. 지속가능성 보고 지침(CSRD)이 발효되어 ESG 보고가 변화되었습니다. 정확한 기술기준으로 자발적 실천부터 법적 의무까지 대상 기업은 반드시 이제 규정 준수 보고서 생성 유럽 지속가능성 보고 표준(ESRS), 검증 가능한 데이터를 사용하고 XBRL 형식으로 태그를 지정하며 외부 보증을 받습니다.
기술팀의 과제는 구체적입니다. 수십 개의 회사 시스템에서 ESG 데이터를 수집하는 것입니다. (ERP, 에너지 관리, HR, 물류), ESRS 분류법에 따라 집계, 지표 계산 Scope 1-3 배출, 물 농도 등 승인 프로세스 관리 다중 레벨이며 조정기 지원 iXBRL 형식으로 출력을 생성합니다. 이 모든 것이 필요합니다 하나 전용 API 아키텍처, 공유 Excel 시트가 아닙니다.
이 기사에서는 ESRS 호환 데이터 모델에서 REST API에 이르기까지 완전한 시스템을 구축합니다. 데이터 수집 및 검증부터 통합 및 승인 워크플로, 생성까지 XBRL 출력의 백엔드에는 Python/FastAPI를 사용하고, 구조화된 스토리지에는 PostgreSQL을 사용합니다. SAP Sustainability와 Oracle ESG를 업스트림 데이터 소스로 통합하는 방법을 분석합니다.
무엇을 배울 것인가
- CSRD 타임라인 및 애플리케이션 경계: 누가 언제 무엇을 해야 하는지
- ESRS E1-E5, S1-S4, G1: 표준 구조 및 필수 지표
- 이중 중요성 평가: 영향 + 재무 알고리즘 구현
- ESRS 호환 ESG 데이터 수집을 위한 PostgreSQL 데이터 모델
- ESG 데이터 수집, 검증 및 집계를 위한 REST/GraphQL API
- 다단계 승인 및 통합을 위한 워크플로 자동화
- Python을 사용한 XBRL/iXBRL 태깅: 디지털 보고서 자동 생성
- ERP 통합: SAP Sustainability Footprint Management, Oracle Fusion ESG
- 외부 보증을 위한 데이터 계보 및 감사 추적
- 전체 사례 연구: 직원이 250명인 중견 제조업체
그린 소프트웨어 엔지니어링 시리즈
이 기사는 그린 소프트웨어 엔지니어링에 관한 포괄적인 시리즈 중 여덟 번째입니다. 코드 방출 측정부터 규제 ESG 보고까지:
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | 그린 소프트웨어 엔지니어링 원칙 | GSF, SCI 스펙, 8가지 기본 원칙 |
| 2 | CodeCarbon: 코드 방출 측정 | Python 라이브러리, 대시보드, CI/CD 통합 |
| 3 | Climatiq API: 백엔드의 GHG 계산 | REST API, 범위 1-3, FastAPI 통합 |
| 4 | 탄소 인식 SDK | 워크로드 이동, 그리드 강도, 시간 이동 |
| 5 | 범위 3 파이프라인 | 가치 사슬 배출, 공급업체, LCA |
| 6 | 친환경 아키텍처 패턴 | 서버리스, 이벤트 중심, 지속 가능한 캐싱 |
| 7 | GreenOps: 지속 가능한 DevOps | 그린 CI/CD, 클라우드 규모 조정, FinOps |
| 8 | ESG 보고 API: CSRD 워크플로(이 문서) | ESRS, XBRL, 중요성, ERP 통합 |
| 9 | AI 탄소발자국 | LLM 교육/추론, 지속 가능한 ML |
| 10 | 고급 범위 모델링 | GHG 프로토콜 방법론, 특정 부문 |
CSRD: 지속가능성 보고의 새로운 패러다임
CSRD(지침 2022/2464/EU)는 비재무 보고 지침(NFRD)을 대체했습니다. 애플리케이션 경계를 대폭 확장하고 필요한 보고서 품질을 높입니다. NFRD는 대략 11,000개 기업 유럽 국가, CSRD가 참여합니다. 50,000+, 최초로 상장된 중소기업 및 유럽 자회사를 포함하여 비 EU 그룹.
신청 일정
| 날짜 | 과목 | 첫 번째 보고서 |
|---|---|---|
| 2024년 1월(2024년) | 이미 NFRD를 적용받고 있는 대기업(직원 500명 이상) | 2025년 |
| 2025년 1월(2025년) | EU 대규모 기업(직원 250명 초과 또는 매출액 4천만 유로 초과 또는 자산 2천만 초과) | 2026년 |
| 2026년 1월(2026년) | 규제 시장에 상장된 중소기업(직원 10~250명) | 2027년 |
| 2028년 1월(2028년) | 비EU 그룹의 EU 자회사(EU 매출액 >1억 5천만 유로) | 2029 |
옴니버스 단순화 2025
2025년 2월, 유럽연합 집행위원회는 단순화를 제안하는 옴니버스 패키지를 발표했습니다. CSRD에 중요한 의미: 중소기업 범위 축소, 웨이브 2 기업의 경우 2년 연기 3, 일부 ESRS 데이터 포인트의 단순화. 하지만, 큰 파도 1 회사는 남아 있습니다 완전 주제. 변경된 사항은 당시에도 여전히 입법 과정에 있습니다. 이 기사의 출판. 항상 법률의 업데이트 상태를 확인하세요.
ESRS의 구조
유럽 지속가능성 보고 표준은 세 가지 주제 영역으로 구성됩니다. 두 가지 표준 교차 절단:
| 기준 | 영역 | 주제 | 의무사항 |
|---|---|---|---|
| ESRS 1 | 교차 절단 | 일반 요구 사항 및 원칙 | Si |
| ESRS 2 | 교차 절단 | 일반 정보(거버넌스, 전략, 중요성) | Si |
| ESRS E1 | 환경 | 기후변화(GHG, 에너지, TCFD) | 물질적인 경우 |
| ESRS E2 | 환경 | 오염(공기, 물, 토양, 물질) | 물질적인 경우 |
| ESRS E3 | 환경 | 수자원 및 해양자원 | 물질적인 경우 |
| ESRS E4 | 환경 | 생물다양성과 생태계 | 물질적인 경우 |
| ESRS E5 | 환경 | 자원사용과 순환경제 | 물질적인 경우 |
| ESRS S1 | 사회의 | 자체 인력(근로조건, D&I, 건강) | 물질적인 경우 |
| ESRS S2 | 사회의 | 가치 사슬의 근로자 | 물질적인 경우 |
| ESRS S3 | 사회의 | 영향을 받은 커뮤니티 | 물질적인 경우 |
| ESRS S4 | 사회의 | 소비자 및 최종 사용자 | 물질적인 경우 |
| ESRS G1 | 통치 | 업무 수행(윤리, 부패 방지, 로비) | 물질적인 경우 |
ESRS E1 기후: 수집해야 할 측정항목
ESRS E1은 일반적으로 데이터 수집에 가장 비용이 많이 드는 주제입니다. 공개가 필요합니다 온실가스 배출(Scope 1, 2, 3), 과학적 연계 감축 목표(SBTi), 에너지 TCFD에 따른 공급원별 소비량 및 기후 위험/기회. 보자 필수 데이터 포인트 가장 관련성이 높은 것:
GHG 배출 - 필수 지표 ESRS E1-6
| 데이터 포인트 | 단위 | 설명 |
|---|---|---|
| 총 범위 1 GHG 배출량 | tCO2eq | 소유 또는 통제된 소스로부터의 직접 배출 |
| 총 범위 2 GHG 배출량(위치 기반) | tCO2eq | 에너지 구매로 인한 간접 배출량, 위치 파악 방법 |
| 총 범위 2 GHG 배출량(시장 기준) | tCO2eq | 간접배출, 시장방식(인증서) |
| 총 범위 3 GHG 배출량(15개 항목) | tCO2eq | 가치사슬에서의 간접배출 |
| 온실가스 집약도(수익) | tCO2eq / M EUR | 매출액 단위당 강도 |
| 총 에너지 소비량 | MWh | 총 에너지 소비량 |
| 재생에너지 점유율 | % | 재생 가능한 에너지원에서 얻은 에너지 비율 |
| 온실가스 제거 및 저장 | tCO2eq | 조림, CCS 등을 통한 탄소제거 |
이중 중요성 평가: 알고리즘 구현
CSRD의 방법론적 핵심은 이중 물질성: 회사마다 두 가지 동시 관점을 고려하여 어떤 ESG 주제가 중요한지 평가해야 합니다.
- 영향 중요성: 회사가 상당한 영향을 미칩니다(긍정적이든 부정적이든, 현재 또는 잠재적) 해당 주제에 대한 사람과 환경에 대해 어떻게 생각하시나요?
- 재정적 중요성: 해당 ESG 주제는 위험이나 기회를 생성합니다. 회사에 중요한 금융 자산(단기, 중기, 장기)이 있습니까?
두 가지 기준 중 하나 이상을 충족하는 주제는 중요합니다. 구현 방법을 살펴보겠습니다. 전용 API를 사용하여 이 프로세스를 수행합니다.
# materiality_assessment.py
# Implementazione Double Materiality Assessment per CSRD ESRS 2-IRO
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid
from datetime import datetime
class MaterialityDimension(str, Enum):
IMPACT = "impact"
FINANCIAL = "financial"
class ImpactType(str, Enum):
ACTUAL_NEGATIVE = "actual_negative"
ACTUAL_POSITIVE = "actual_positive"
POTENTIAL_NEGATIVE = "potential_negative"
POTENTIAL_POSITIVE = "potential_positive"
class TimeHorizon(str, Enum):
SHORT_TERM = "short_term" # 0-1 anno
MEDIUM_TERM = "medium_term" # 1-5 anni
LONG_TERM = "long_term" # oltre 5 anni
@dataclass
class ImpactMaterialityScore:
"""
Score impact materiality per un topic ESG.
Basato su: severity x likelihood (per impatti negativi)
o scale x likelihood (per impatti positivi)
"""
topic_id: str
impact_type: ImpactType
# Score 1-5 per ogni dimensione
scale: int # Quante persone/ecosistemi impattati
scope: int # Reversibilita' dell'impatto
irremediable: int # Difficolta' di rimediare
likelihood: int # Probabilità' che accada
@property
def severity(self) -> float:
"""Severity = media pesata di scale, scope, irremediable"""
return (self.scale * 0.4 + self.scope * 0.3 + self.irremediable * 0.3)
@property
def score(self) -> float:
"""Score finale 1-25 per ordinamento"""
return self.severity * self.likelihood
@property
def is_material(self) -> bool:
"""Soglia materialita': score >= 9 (severity >= 3, likelihood >= 3)"""
return self.score >= 9.0
@dataclass
class FinancialMaterialityScore:
"""
Score financial materiality per un topic ESG.
Basato su: magnitude x likelihood, per time horizon
"""
topic_id: str
is_risk: bool # True = rischio, False = opportunità'
magnitude: int # 1-5: impatto finanziario potenziale
likelihood: int # 1-5: probabilità'
time_horizon: TimeHorizon
# Fattori qualitativi
quantifiable: bool # Possiamo quantificarlo in EUR?
estimated_impact_eur: Optional[float] = None
@property
def score(self) -> float:
"""Score base con discount per time horizon"""
base = self.magnitude * self.likelihood
# Discount: short=1.0, medium=0.8, long=0.6
discount = {
TimeHorizon.SHORT_TERM: 1.0,
TimeHorizon.MEDIUM_TERM: 0.8,
TimeHorizon.LONG_TERM: 0.6
}[self.time_horizon]
return base * discount
@property
def is_material(self) -> bool:
return self.score >= 7.5 # magnitude >= 3, likelihood >= 3 con discount short
@dataclass
class TopicMaterialityResult:
topic_id: str
topic_name: str
esrs_standard: str
impact_scores: list[ImpactMaterialityScore] = field(default_factory=list)
financial_scores: list[FinancialMaterialityScore] = field(default_factory=list)
@property
def impact_material(self) -> bool:
return any(s.is_material for s in self.impact_scores)
@property
def financial_material(self) -> bool:
return any(s.is_material for s in self.financial_scores)
@property
def is_material(self) -> bool:
"""Double materiality: materiale se almeno una dimensione e' materiale"""
return self.impact_material or self.financial_material
@property
def materiality_level(self) -> str:
if self.impact_material and self.financial_material:
return "DOUBLE_MATERIAL"
elif self.impact_material:
return "IMPACT_MATERIAL"
elif self.financial_material:
return "FINANCIAL_MATERIAL"
else:
return "NOT_MATERIAL"
def run_materiality_assessment(
company_id: str,
topics: list[dict]
) -> list[TopicMaterialityResult]:
"""
Esegue il materiality assessment completo.
Input: lista topic con scores raccolti via stakeholder engagement.
Output: ranking per materialita'.
"""
results = []
for topic_data in topics:
result = TopicMaterialityResult(
topic_id=topic_data["topic_id"],
topic_name=topic_data["topic_name"],
esrs_standard=topic_data["esrs_standard"]
)
# Aggiungi impact scores
for impact in topic_data.get("impact_scores", []):
result.impact_scores.append(ImpactMaterialityScore(**impact))
# Aggiungi financial scores
for financial in topic_data.get("financial_scores", []):
result.financial_scores.append(FinancialMaterialityScore(**financial))
results.append(result)
# Ordina per materialita' discendente
return sorted(
results,
key=lambda r: (
r.is_material,
r.impact_material and r.financial_material,
max((s.score for s in r.impact_scores), default=0) +
max((s.score for s in r.financial_scores), default=0)
),
reverse=True
)
CSRD용 PostgreSQL 데이터 모델
CSRD 호환 시스템에는 ESRS의 구조를 반영하는 데이터베이스 스키마가 필요합니다. 데이터 계보를 지원하고 감사 추적에 대한 변경 기록을 유지합니다. 여기 있어요 핵심 계획:
-- Schema PostgreSQL per CSRD ESG Data Collection
-- Ottimizzato per ESRS data points con audit trail completo
-- Tabella aziende e perimetro di consolidamento
CREATE TABLE companies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
legal_name VARCHAR(255) NOT NULL,
lei_code VARCHAR(20), -- Legal Entity Identifier
country_code CHAR(2) NOT NULL,
nace_code VARCHAR(10), -- Classificazione attivita'
fiscal_year_end DATE,
employees_fte DECIMAL(10,2),
revenue_eur DECIMAL(20,2),
total_assets_eur DECIMAL(20,2),
is_parent BOOLEAN DEFAULT FALSE,
parent_id UUID REFERENCES companies(id),
csrd_wave INTEGER, -- 1, 2, 3, 4
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Periodi di reporting
CREATE TABLE reporting_periods (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
year INTEGER NOT NULL,
period_type VARCHAR(20) DEFAULT 'ANNUAL',
start_date DATE NOT NULL,
end_date DATE NOT NULL,
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, COLLECTING, REVIEWING, APPROVED, FILED
submitted_at TIMESTAMPTZ,
approved_by UUID,
UNIQUE (company_id, year, period_type)
);
-- Catalog ESRS data points (reference table)
CREATE TABLE esrs_data_points (
id VARCHAR(50) PRIMARY KEY,
-- es. "E1-6_GrossScope1", "S1-7_GenderPayGap"
esrs_standard VARCHAR(10) NOT NULL, -- E1, E2, S1, G1...
topic VARCHAR(100) NOT NULL,
sub_topic VARCHAR(100),
name VARCHAR(255) NOT NULL,
description TEXT,
unit_of_measure VARCHAR(50),
data_type VARCHAR(20), -- NUMERIC, BOOLEAN, TEXT, DATE, ENUM
is_mandatory BOOLEAN DEFAULT FALSE,
is_phase_in BOOLEAN DEFAULT FALSE, -- Permesso rinvio anni 1-3
ghg_protocol_scope VARCHAR(10), -- S1, S2_LB, S2_MB, S3
scope3_category INTEGER, -- 1-15 per Scope 3
tags TEXT[]
);
-- Raccolta dati ESG per periodo
CREATE TABLE esg_data_submissions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
reporting_period_id UUID NOT NULL REFERENCES reporting_periods(id),
data_point_id VARCHAR(50) NOT NULL REFERENCES esrs_data_points(id),
-- Valori (uno solo populated in base a data_type)
value_numeric DECIMAL(30, 6),
value_boolean BOOLEAN,
value_text TEXT,
value_date DATE,
-- Metadata qualità'
unit_of_measure VARCHAR(50),
estimation_method VARCHAR(100),
confidence_level VARCHAR(20), -- HIGH, MEDIUM, LOW
is_estimated BOOLEAN DEFAULT FALSE,
-- Data lineage
source_system VARCHAR(100), -- SAP, Oracle, Manual, API
source_document_ref VARCHAR(255),
collected_by UUID,
collection_timestamp TIMESTAMPTZ DEFAULT NOW(),
-- Workflow
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, SUBMITTED, REVIEWED, APPROVED, REJECTED
reviewed_by UUID,
review_timestamp TIMESTAMPTZ,
review_comment TEXT,
approved_by UUID,
approval_timestamp TIMESTAMPTZ,
-- Versioning per audit
version INTEGER DEFAULT 1,
previous_version_id UUID REFERENCES esg_data_submissions(id),
CONSTRAINT unique_submission UNIQUE (
company_id, reporting_period_id, data_point_id, version
)
);
-- Indici per performance
CREATE INDEX idx_submissions_period ON esg_data_submissions(reporting_period_id);
CREATE INDEX idx_submissions_status ON esg_data_submissions(status);
CREATE INDEX idx_submissions_datapoint ON esg_data_submissions(data_point_id);
-- Audit log immutabile
CREATE TABLE audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
action VARCHAR(20) NOT NULL, -- CREATE, UPDATE, DELETE, APPROVE
actor_id UUID NOT NULL,
actor_email VARCHAR(255),
timestamp TIMESTAMPTZ DEFAULT NOW(),
ip_address INET,
old_value JSONB,
new_value JSONB,
change_reason TEXT
);
-- View aggregata emissioni GHG per periodo
CREATE OR REPLACE VIEW v_ghg_emissions_summary AS
SELECT
c.legal_name,
rp.year,
SUM(CASE WHEN dp.id LIKE 'E1%Scope1%' THEN eds.value_numeric ELSE 0 END) AS scope1_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%LB%' THEN eds.value_numeric ELSE 0 END) AS scope2_lb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%MB%' THEN eds.value_numeric ELSE 0 END) AS scope2_mb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope3%' THEN eds.value_numeric ELSE 0 END) AS scope3_tco2eq,
SUM(CASE WHEN dp.id = 'E1-5_TotalEnergy' THEN eds.value_numeric ELSE 0 END) AS total_energy_mwh,
SUM(CASE WHEN dp.id = 'E1-5_RenewableEnergy' THEN eds.value_numeric ELSE 0 END) AS renewable_mwh
FROM esg_data_submissions eds
JOIN companies c ON c.id = eds.company_id
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE eds.status = 'APPROVED'
GROUP BY c.legal_name, rp.year;
API 아키텍처: ESG 데이터 수집 및 검증
CSRD 시스템의 API 아키텍처는 세 가지 주요 계층으로 나뉩니다. 사업부 및 소스 시스템, ESRS 규칙에 따른 검증 및 강화, e 그룹 통합을 위한 집계. FastAPI를 사용하여 백엔드를 구현해 보겠습니다.
# main.py - FastAPI ESG Reporting API
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from uuid import UUID
from datetime import datetime, date
from decimal import Decimal
import asyncpg
import json
app = FastAPI(
title="CSRD ESG Reporting API",
description="API per raccolta e gestione dati ESG conformi ESRS",
version="1.0.0"
)
# === PYDANTIC MODELS ===
class ESGDataPointSubmission(BaseModel):
data_point_id: str = Field(..., description="ID ESRS data point (es. E1-6_GrossScope1)")
value_numeric: Optional[Decimal] = None
value_boolean: Optional[bool] = None
value_text: Optional[str] = None
value_date: Optional[date] = None
unit_of_measure: Optional[str] = None
estimation_method: Optional[str] = None
confidence_level: str = Field(default="MEDIUM", pattern="^(HIGH|MEDIUM|LOW)$")
is_estimated: bool = False
source_system: Optional[str] = None
source_document_ref: Optional[str] = None
@validator('value_numeric')
def validate_numeric(cls, v, values):
if v is not None and v < 0:
raise ValueError('Valori numerici GHG non possono essere negativi')
return v
class BulkESGSubmission(BaseModel):
reporting_period_id: UUID
submissions: List[ESGDataPointSubmission]
submission_note: Optional[str] = None
class DataPointValidationResult(BaseModel):
data_point_id: str
is_valid: bool
errors: List[str] = []
warnings: List[str] = []
class ValidationReport(BaseModel):
period_id: UUID
total_mandatory_points: int
submitted_points: int
approved_points: int
coverage_pct: float
validation_results: List[DataPointValidationResult]
overall_valid: bool
# === VALIDATION SERVICE ===
class ESRSValidationService:
"""
Validazione regole ESRS per data points.
Implementa controlli incrociati richiesti dallo standard.
"""
CONSISTENCY_RULES = [
{
"rule": "SCOPE1_PLUS_SCOPE2_CONSISTENCY",
"description": "Total GHG = Scope1 + Scope2 (market) deve essere coerente",
"severity": "ERROR"
},
{
"rule": "ENERGY_GHG_RATIO",
"description": "Intensità' GHG su energia deve essere plausibile",
"severity": "WARNING"
},
{
"rule": "RENEWABLE_WITHIN_TOTAL",
"description": "Energia rinnovabile non può' superare totale energia",
"severity": "ERROR"
},
{
"rule": "YOY_VARIANCE_THRESHOLD",
"description": "Variazione YoY > 30% richiede commento",
"severity": "WARNING"
}
]
async def validate_submission(
self,
submissions: List[ESGDataPointSubmission],
period_id: UUID,
db_pool
) -> ValidationReport:
errors_by_point = {}
warnings_by_point = {}
# 1. Validazione singoli data point
for sub in submissions:
errors = []
warnings = []
# Carica definizione data point da catalogo
dp_def = await self._get_data_point_def(sub.data_point_id, db_pool)
if not dp_def:
errors.append(f"Data point {sub.data_point_id} non trovato nel catalogo ESRS")
else:
# Verifica unita' di misura
if dp_def['unit_of_measure'] and sub.unit_of_measure:
if sub.unit_of_measure != dp_def['unit_of_measure']:
warnings.append(
f"Unit of measure {sub.unit_of_measure} diversa da "
f"attesa {dp_def['unit_of_measure']}"
)
# Verifica tipo dato
if dp_def['data_type'] == 'NUMERIC' and sub.value_numeric is None:
errors.append(f"Data point numerico richiede value_numeric")
errors_by_point[sub.data_point_id] = errors
warnings_by_point[sub.data_point_id] = warnings
# 2. Controlli incrociati
await self._run_cross_checks(submissions, errors_by_point, warnings_by_point)
# 3. Calcola copertura mandatory
mandatory_coverage = await self._calc_mandatory_coverage(period_id, submissions, db_pool)
results = [
DataPointValidationResult(
data_point_id=dp_id,
is_valid=len(errs) == 0,
errors=errs,
warnings=warnings_by_point.get(dp_id, [])
)
for dp_id, errs in errors_by_point.items()
]
overall_valid = all(r.is_valid for r in results)
return ValidationReport(
period_id=period_id,
total_mandatory_points=mandatory_coverage['total'],
submitted_points=mandatory_coverage['submitted'],
approved_points=mandatory_coverage['approved'],
coverage_pct=mandatory_coverage['coverage_pct'],
validation_results=results,
overall_valid=overall_valid
)
async def _run_cross_checks(self, submissions, errors_by_point, warnings_by_point):
"""Controlli incrociati tra data points correlati."""
values = {s.data_point_id: s.value_numeric for s in submissions if s.value_numeric}
# Energia rinnovabile <= totale energia
renewable = values.get('E1-5_RenewableEnergy')
total_energy = values.get('E1-5_TotalEnergy')
if renewable and total_energy and renewable > total_energy:
errors_by_point.setdefault('E1-5_RenewableEnergy', []).append(
"Energia rinnovabile non può' superare energia totale"
)
async def _get_data_point_def(self, dp_id: str, db_pool):
async with db_pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM esrs_data_points WHERE id = $1", dp_id
)
async def _calc_mandatory_coverage(self, period_id, submissions, db_pool):
async with db_pool.acquire() as conn:
total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
submitted = len([s for s in submissions if s.value_numeric is not None
or s.value_boolean is not None or s.value_text is not None])
return {
'total': total or 0,
'submitted': submitted,
'approved': 0,
'coverage_pct': (submitted / total * 100) if total else 0
}
# === ENDPOINTS ===
@app.post("/api/v1/periods/{period_id}/submissions/bulk")
async def bulk_submit_esg_data(
period_id: UUID,
payload: BulkESGSubmission,
background_tasks: BackgroundTasks,
db_pool = Depends(get_db_pool)
):
"""
Submission bulk di dati ESG per un periodo.
Accetta fino a 500 data points per chiamata.
"""
if len(payload.submissions) > 500:
raise HTTPException(400, "Max 500 data points per richiesta")
validator = ESRSValidationService()
validation_report = await validator.validate_submission(
payload.submissions, period_id, db_pool
)
if not validation_report.overall_valid:
raise HTTPException(422, {
"message": "Validation failed",
"validation_report": validation_report.dict()
})
# Salva in batch
async with db_pool.acquire() as conn:
async with conn.transaction():
for sub in payload.submissions:
await conn.execute("""
INSERT INTO esg_data_submissions
(company_id, reporting_period_id, data_point_id,
value_numeric, value_boolean, value_text,
unit_of_measure, estimation_method, confidence_level,
is_estimated, source_system, source_document_ref,
status)
SELECT
rp.company_id, $1, $2,
$3, $4, $5,
$6, $7, $8,
$9, $10, $11,
'SUBMITTED'
FROM reporting_periods rp WHERE rp.id = $1
ON CONFLICT (company_id, reporting_period_id, data_point_id, version)
DO UPDATE SET
value_numeric = EXCLUDED.value_numeric,
status = 'SUBMITTED',
collection_timestamp = NOW()
""",
period_id,
str(sub.data_point_id),
sub.value_numeric,
sub.value_boolean,
sub.value_text,
sub.unit_of_measure,
sub.estimation_method,
sub.confidence_level,
sub.is_estimated,
sub.source_system,
sub.source_document_ref
)
# Trigger notifica al reviewer in background
background_tasks.add_task(notify_reviewers, period_id, len(payload.submissions))
return {
"status": "submitted",
"period_id": str(period_id),
"submitted_count": len(payload.submissions),
"validation_report": validation_report.dict()
}
@app.get("/api/v1/periods/{period_id}/validation-report")
async def get_validation_report(
period_id: UUID,
db_pool = Depends(get_db_pool)
):
"""Restituisce report di completezza e validità per un periodo."""
async with db_pool.acquire() as conn:
# Aggregazione stato submissions per mandatory data points
rows = await conn.fetch("""
SELECT
dp.id as data_point_id,
dp.name,
dp.is_mandatory,
dp.unit_of_measure,
eds.status,
eds.value_numeric,
eds.confidence_level
FROM esrs_data_points dp
LEFT JOIN esg_data_submissions eds ON
eds.data_point_id = dp.id
AND eds.reporting_period_id = $1
WHERE dp.is_mandatory = TRUE
ORDER BY dp.esrs_standard, dp.id
""", period_id)
missing_mandatory = [r for r in rows if r['status'] is None]
approved = [r for r in rows if r['status'] == 'APPROVED']
return {
"period_id": str(period_id),
"total_mandatory": len(rows),
"approved": len(approved),
"missing": len(missing_mandatory),
"coverage_pct": round(len(approved) / len(rows) * 100, 1) if rows else 0,
"missing_data_points": [
{"id": r['data_point_id'], "name": r['name']}
for r in missing_mandatory
]
}
async def notify_reviewers(period_id: UUID, count: int):
"""Background task: notifica reviewer via email/webhook."""
print(f"[NOTIFY] Period {period_id}: {count} data points submitted for review")
async def get_db_pool():
"""Dependency injection per asyncpg pool."""
# In produzione: pool globale inizializzato al startup
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/esg_db",
min_size=5,
max_size=20
)
try:
yield pool
finally:
await pool.close()
워크플로우 자동화: 다단계 승인
기업 CSRD 시스템에는 수집을 관리하는 구조화된 작업 흐름이 필요합니다. 여러 사업부, 검토 및 승인 주기, 최종 통합 등이 있습니다. 워크플로 엔진의 구현은 다음과 같습니다.
# workflow_engine.py
# Workflow CSRD multi-livello: collect -> review -> approve -> file
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
class WorkflowState(str, Enum):
DRAFT = "DRAFT"
DATA_COLLECTION = "DATA_COLLECTION"
INTERNAL_REVIEW = "INTERNAL_REVIEW"
CFO_APPROVAL = "CFO_APPROVAL"
EXTERNAL_ASSURANCE = "EXTERNAL_ASSURANCE"
BOARD_APPROVAL = "BOARD_APPROVAL"
FILED = "FILED"
REJECTED = "REJECTED"
class WorkflowTransition:
"""Definisce una transizione di stato valida con condizioni e azioni."""
def __init__(
self,
from_state: WorkflowState,
to_state: WorkflowState,
action: str,
condition: Optional[Callable] = None,
pre_actions: list = None,
post_actions: list = None
):
self.from_state = from_state
self.to_state = to_state
self.action = action
self.condition = condition
self.pre_actions = pre_actions or []
self.post_actions = post_actions or []
# Definizione FSM per workflow CSRD
CSRD_WORKFLOW_TRANSITIONS = [
WorkflowTransition(
from_state=WorkflowState.DRAFT,
to_state=WorkflowState.DATA_COLLECTION,
action="START_COLLECTION",
post_actions=["send_collection_invites", "set_deadlines"]
),
WorkflowTransition(
from_state=WorkflowState.DATA_COLLECTION,
to_state=WorkflowState.INTERNAL_REVIEW,
action="SUBMIT_FOR_REVIEW",
condition=lambda ctx: ctx.get("mandatory_coverage_pct", 0) >= 80,
pre_actions=["run_validation", "generate_completeness_report"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.DATA_COLLECTION,
action="REQUEST_CORRECTIONS",
post_actions=["notify_data_owners", "log_review_comments"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.CFO_APPROVAL,
action="APPROVE_INTERNAL_REVIEW",
condition=lambda ctx: ctx.get("reviewer_approved", False),
pre_actions=["generate_draft_report"]
),
WorkflowTransition(
from_state=WorkflowState.CFO_APPROVAL,
to_state=WorkflowState.EXTERNAL_ASSURANCE,
action="CFO_SIGN_OFF",
post_actions=["prepare_assurance_package", "notify_auditor"]
),
WorkflowTransition(
from_state=WorkflowState.EXTERNAL_ASSURANCE,
to_state=WorkflowState.BOARD_APPROVAL,
action="ASSURANCE_COMPLETE",
condition=lambda ctx: ctx.get("assurance_opinion") in ["UNQUALIFIED", "QUALIFIED"],
post_actions=["attach_assurance_report"]
),
WorkflowTransition(
from_state=WorkflowState.BOARD_APPROVAL,
to_state=WorkflowState.FILED,
action="BOARD_APPROVE_AND_FILE",
post_actions=["generate_ixbrl", "submit_to_oam", "publish_report"]
),
]
class CSRDWorkflowEngine:
def __init__(self, db_pool):
self.db_pool = db_pool
self.transitions = {
(t.from_state, t.action): t
for t in CSRD_WORKFLOW_TRANSITIONS
}
async def execute_transition(
self,
period_id: str,
action: str,
actor_id: str,
context: dict = None
) -> dict:
"""
Esegue una transizione di workflow per un periodo di reporting.
Restituisce il nuovo stato o errore se la transizione non e' valida.
"""
context = context or {}
async with self.db_pool.acquire() as conn:
# Carica stato corrente
period = await conn.fetchrow(
"SELECT status, company_id FROM reporting_periods WHERE id = $1",
period_id
)
if not period:
raise ValueError(f"Periodo {period_id} non trovato")
current_state = WorkflowState(period['status'])
transition_key = (current_state, action)
if transition_key not in self.transitions:
raise ValueError(
f"Transizione {action} non valida da stato {current_state.value}"
)
transition = self.transitions[transition_key]
# Verifica condizione (arricchisci context con dati DB se necessario)
if transition.condition:
enriched_ctx = await self._enrich_context(period_id, context, conn)
if not transition.condition(enriched_ctx):
raise ValueError(
f"Condizione per {action} non soddisfatta: "
f"mandatory coverage = {enriched_ctx.get('mandatory_coverage_pct', 0)}%"
)
# Esegui pre-actions
for pre_action in transition.pre_actions:
await self._execute_action(pre_action, period_id, context, conn)
# Aggiorna stato
await conn.execute(
"UPDATE reporting_periods SET status = $1, updated_at = NOW() WHERE id = $2",
transition.to_state.value,
period_id
)
# Audit log
await conn.execute("""
INSERT INTO audit_log (entity_type, entity_id, action, actor_id, new_value)
VALUES ('reporting_period', $1, $2, $3, $4)
""", period_id, action, actor_id,
json.dumps({"from": current_state.value, "to": transition.to_state.value}))
# Esegui post-actions in background (notifiche, generazione documenti)
for post_action in transition.post_actions:
asyncio.create_task(
self._execute_action_background(post_action, period_id, context)
)
return {
"period_id": period_id,
"previous_state": current_state.value,
"new_state": transition.to_state.value,
"action": action,
"actor_id": actor_id,
"timestamp": datetime.now().isoformat()
}
async def _enrich_context(self, period_id: str, context: dict, conn) -> dict:
"""Arricchisce context con dati calcolati dal DB."""
mandatory_total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
approved = await conn.fetchval("""
SELECT COUNT(*) FROM esg_data_submissions
WHERE reporting_period_id = $1 AND status = 'APPROVED'
""", period_id)
enriched = dict(context)
enriched['mandatory_coverage_pct'] = (
(approved / mandatory_total * 100) if mandatory_total else 0
)
return enriched
async def _execute_action(self, action_name: str, period_id: str, context: dict, conn):
"""Esegue un'azione sincrona nel workflow."""
action_map = {
"run_validation": self._action_run_validation,
"generate_completeness_report": self._action_gen_completeness,
"generate_draft_report": self._action_gen_draft,
}
if action_name in action_map:
await action_map[action_name](period_id, context, conn)
async def _action_run_validation(self, period_id, context, conn):
print(f"[WORKFLOW] Running validation for period {period_id}")
async def _action_gen_completeness(self, period_id, context, conn):
print(f"[WORKFLOW] Generating completeness report for {period_id}")
async def _action_gen_draft(self, period_id, context, conn):
print(f"[WORKFLOW] Generating draft CSRD report for {period_id}")
async def _execute_action_background(self, action_name: str, period_id: str, context: dict):
"""Esegue azione in background (notifiche, documenti)."""
print(f"[BACKGROUND] Executing {action_name} for period {period_id}")
XBRL/iXBRL 태깅: 디지털 보고서 생성
CSRD는 연례 보고서가 다음 형식으로 생성되도록 요구합니다. 태그가 있는 XHTML iXBRL (인라인 XBRL)은 구조화된 데이터를 읽을 수 있는 HTML에 직접 포함합니다. EFRAG는 각 데이터 포인트를 iXBRL 태그에 매핑하는 ESRS XBRL 분류법을 게시했습니다. 자동 생성을 구현해 보겠습니다.
# xbrl_generator.py
# Generazione iXBRL per CSRD secondo ESRS Taxonomy
from lxml import etree
from decimal import Decimal
from typing import Optional
import uuid
# ESRS XBRL Taxonomy namespace
ESRS_NS = "https://xbrl.efrag.org/taxonomy/draft-esrs/2022-11-22"
XBRLI_NS = "http://www.xbrl.org/2003/instance"
IX_NS = "http://www.xbrl.org/2013/inlineXBRL"
LINK_NS = "http://www.xbrl.org/2003/linkbase"
class ESRSXBRLGenerator:
"""
Genera documenti iXBRL conformi alla ESRS Taxonomy.
Output: XHTML con embedded XBRL facts.
"""
def __init__(self, company_data: dict, period_data: dict):
self.company = company_data
self.period = period_data
self.context_id = f"ctx_{uuid.uuid4().hex[:8]}"
self.facts = []
def add_fact(
self,
element_name: str,
value,
unit: Optional[str] = None,
decimals: int = 0,
scale: int = 0
):
"""Aggiunge un fact XBRL al report."""
self.facts.append({
"element": element_name,
"value": value,
"unit": unit,
"decimals": decimals,
"scale": scale
})
def generate_ixbrl(self) -> str:
"""
Genera il documento iXBRL completo.
Returns: stringa XHTML con tagging iXBRL embedded.
"""
# Namespace map per il documento
nsmap = {
"ix": IX_NS,
"xbrli": XBRLI_NS,
"esrs": ESRS_NS,
None: "http://www.w3.org/1999/xhtml"
}
# Root XHTML
html = etree.Element("html", nsmap=nsmap)
head = etree.SubElement(html, "head")
body = etree.SubElement(html, "body")
# Header XBRL nel head
ix_header = etree.SubElement(head, f"{{IX_NS}}header")
hidden = etree.SubElement(ix_header, f"{{IX_NS}}hidden")
# Context: entità' e periodo
resources = etree.SubElement(ix_header, f"{{IX_NS}}resources")
xbrli_context = etree.SubElement(resources, f"{{XBRLI_NS}}context")
xbrli_context.set("id", self.context_id)
entity = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}entity")
identifier = etree.SubElement(entity, f"{{XBRLI_NS}}identifier")
identifier.set("scheme", "http://www.lei.info")
identifier.text = self.company.get("lei_code", "000000000000000000")
period = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}period")
start_date = etree.SubElement(period, f"{{XBRLI_NS}}startDate")
start_date.text = self.period.get("start_date", "2024-01-01")
end_date = etree.SubElement(period, f"{{XBRLI_NS}}endDate")
end_date.text = self.period.get("end_date", "2024-12-31")
# Unit monetaria e tCO2eq
unit_co2 = etree.SubElement(resources, f"{{XBRLI_NS}}unit")
unit_co2.set("id", "tCO2eq")
measure_co2 = etree.SubElement(unit_co2, f"{{XBRLI_NS}}measure")
measure_co2.text = "esrs:metricTon"
# Body: report HTML con facts inline
h1 = etree.SubElement(body, "h1")
h1.text = f"CSRD Sustainability Report {self.period.get('year', 2024)}"
h2 = etree.SubElement(body, "h2")
h2.text = "E1 - Climate Change: GHG Emissions"
# Tabella emissioni con tagging iXBRL inline
table = etree.SubElement(body, "table")
thead = etree.SubElement(table, "thead")
tr_head = etree.SubElement(thead, "tr")
etree.SubElement(tr_head, "th").text = "Metric"
etree.SubElement(tr_head, "th").text = "Value"
etree.SubElement(tr_head, "th").text = "Unit"
tbody = etree.SubElement(table, "tbody")
# Aggiungi facts come righe di tabella con tag iXBRL
for fact in self.facts:
tr = etree.SubElement(tbody, "tr")
etree.SubElement(tr, "td").text = fact["element"].split(":")[-1]
td_value = etree.SubElement(tr, "td")
# Tag iXBRL inline per il valore
ix_nonfraction = etree.SubElement(
td_value,
f"{{IX_NS}}nonFraction"
)
ix_nonfraction.set("name", fact["element"])
ix_nonfraction.set("contextRef", self.context_id)
ix_nonfraction.set("unitRef", fact.get("unit", "tCO2eq"))
ix_nonfraction.set("decimals", str(fact.get("decimals", 0)))
ix_nonfraction.set("scale", str(fact.get("scale", 0)))
ix_nonfraction.set("format", "ixt:num-dot-decimal")
ix_nonfraction.text = str(fact["value"])
etree.SubElement(tr, "td").text = fact.get("unit", "tCO2eq")
# Serializza come XHTML
return etree.tostring(
html,
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
method="xml"
).decode("utf-8")
def generate_csrd_report_from_db(approved_submissions: list, company: dict, period: dict) -> str:
"""
Genera report iXBRL da submissions approvate nel DB.
"""
generator = ESRSXBRLGenerator(company, period)
# Mapping data_point_id -> XBRL element name
ESRS_XBRL_MAP = {
"E1-6_GrossScope1": "esrs:GrossScope1GHGEmissions",
"E1-6_GrossScope2LB": "esrs:GrossScope2GHGEmissionsLocationBased",
"E1-6_GrossScope2MB": "esrs:GrossScope2GHGEmissionsMarketBased",
"E1-6_GrossScope3Total": "esrs:GrossScope3GHGEmissions",
"E1-5_TotalEnergy": "esrs:TotalEnergyConsumption",
"E1-5_RenewableEnergy": "esrs:EnergyConsumptionFromRenewableSources",
"E1-5_NonRenewableEnergy": "esrs:EnergyConsumptionFromNonRenewableSources",
}
for sub in approved_submissions:
element = ESRS_XBRL_MAP.get(sub['data_point_id'])
if element and sub['value_numeric'] is not None:
generator.add_fact(
element_name=element,
value=float(sub['value_numeric']),
unit=sub.get('unit_of_measure', 'tCO2eq'),
decimals=0
)
return generator.generate_ixbrl()
ERP 통합: SAP Sustainability 및 Oracle ESG
대부분의 ESG 데이터는 이미 기업 ERP 시스템에 있습니다. 에너지 소비 PM/PM 모듈의 경우 SuccessFactors의 HR 데이터, TM 모듈의 물류입니다. 커넥터를 구현합니다 엔터프라이즈 시장의 두 가지 주요 플랫폼:
# erp_connectors.py
# Connettori per SAP Sustainability Footprint Management (SFM)
# e Oracle Fusion ESG
import httpx
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
class ERPConnector(ABC):
"""Interfaccia base per connettori ERP."""
@abstractmethod
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
pass
class SAPSustainabilityConnector(ERPConnector):
"""
Connettore per SAP Sustainability Footprint Management (SFM).
Usa SAP OData API v4.
API base: /sap/opu/odata4/sap/api_sustainability_footprint/
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str
):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._token: Optional[str] = None
self._client = httpx.AsyncClient(timeout=30)
async def _get_token(self) -> str:
"""OAuth2 client credentials per SAP BTP."""
if self._token:
return self._token
response = await self._client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
)
response.raise_for_status()
self._token = response.json()["access_token"]
return self._token
async def _get(self, path: str, params: dict = None) -> dict:
token = await self._get_token()
response = await self._client.get(
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
"""
Estrae consumo energetico da SAP SFM per anno e entità'.
OData endpoint: /EnergyConsumption
"""
data = await self._get(
"/EnergyConsumption",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,EnergyType,Quantity,Unit,Source"
}
)
result = {"total_mwh": 0, "renewable_mwh": 0, "non_renewable_mwh": 0, "by_source": []}
for record in data.get("value", []):
qty_mwh = self._convert_to_mwh(record["Quantity"], record["Unit"])
result["total_mwh"] += qty_mwh
result["by_source"].append({
"source": record["EnergyType"],
"mwh": qty_mwh,
"is_renewable": record["EnergyType"] in [
"SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"
]
})
if record["EnergyType"] in ["SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"]:
result["renewable_mwh"] += qty_mwh
else:
result["non_renewable_mwh"] += qty_mwh
return result
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
"""Estrae emissioni GHG da SAP SFM, suddivise per scope."""
data = await self._get(
"/GHGEmission",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,GHGScope,Category,EmissionTCO2e"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_by_category": {}}
for r in data.get("value", []):
scope = r["GHGScope"]
tco2e = float(r["EmissionTCO2e"])
if scope == "1":
result["scope1"] += tco2e
elif scope == "2" and r.get("Category") == "LB":
result["scope2_lb"] += tco2e
elif scope == "2" and r.get("Category") == "MB":
result["scope2_mb"] += tco2e
elif scope == "3":
cat = r.get("Category", "UNKNOWN")
result["scope3_by_category"][cat] = (
result["scope3_by_category"].get(cat, 0) + tco2e
)
result["scope3_total"] = sum(result["scope3_by_category"].values())
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
"""Estrae metriche workforce per ESRS S1 da SAP SuccessFactors."""
# SAP SuccessFactors OData API
data = await self._get(
"/WorkforceMetrics",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Metric,Value,Gender,EmployeeType"
}
)
result = {
"headcount_total": 0,
"headcount_female": 0,
"headcount_male": 0,
"fte_total": 0,
"turnover_rate": 0,
"gender_pay_gap_pct": None
}
for r in data.get("value", []):
metric = r.get("Metric")
value = float(r.get("Value", 0))
if metric == "HEADCOUNT":
result["headcount_total"] += value
if r.get("Gender") == "F":
result["headcount_female"] += value
elif r.get("Gender") == "M":
result["headcount_male"] += value
elif metric == "FTE":
result["fte_total"] += value
elif metric == "TURNOVER_RATE":
result["turnover_rate"] = value
elif metric == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = value
return result
def _convert_to_mwh(self, quantity: float, unit: str) -> float:
conversions = {"GJ": 0.2778, "MWh": 1.0, "kWh": 0.001, "TJ": 277.78}
return quantity * conversions.get(unit, 1.0)
class OracleESGConnector(ERPConnector):
"""
Connettore per Oracle Fusion Sustainability (Oracle ESG).
Usa Oracle REST API + BICC per bulk extraction.
"""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip("/")
self.auth = (username, password)
self._client = httpx.AsyncClient(timeout=60)
async def _get(self, path: str, params: dict = None) -> dict:
response = await self._client.get(
f"{self.base_url}{path}",
auth=self.auth,
headers={"Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=ENERGY;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,metricValue,uom,dataSource,lastUpdateDate"
}
)
total_mwh = 0
renewable_mwh = 0
RENEWABLE_METRICS = {"SOLAR_ENERGY", "WIND_ENERGY", "HYDRO_ENERGY", "PPA_RENEWABLE"}
for item in data.get("items", []):
mwh = self._to_mwh(item["metricValue"], item["uom"])
total_mwh += mwh
if item["metricName"] in RENEWABLE_METRICS:
renewable_mwh += mwh
return {
"total_mwh": total_mwh,
"renewable_mwh": renewable_mwh,
"non_renewable_mwh": total_mwh - renewable_mwh
}
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=GHG_EMISSIONS;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,ghgScope,metricValue,uom"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_total": 0}
for item in data.get("items", []):
scope = item.get("ghgScope", "")
val = float(item.get("metricValue", 0))
if scope == "SCOPE_1":
result["scope1"] += val
elif scope == "SCOPE_2_LB":
result["scope2_lb"] += val
elif scope == "SCOPE_2_MB":
result["scope2_mb"] += val
elif scope.startswith("SCOPE_3"):
result["scope3_total"] += val
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=WORKFORCE;fiscalYear={year};legalEntityId={entity_id}"
}
)
result = {"headcount_total": 0, "fte_total": 0, "gender_pay_gap_pct": None}
for item in data.get("items", []):
name = item.get("metricName", "")
val = float(item.get("metricValue", 0))
if name == "TOTAL_HEADCOUNT":
result["headcount_total"] = val
elif name == "TOTAL_FTE":
result["fte_total"] = val
elif name == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = val
return result
def _to_mwh(self, value: float, uom: str) -> float:
factors = {"MWH": 1.0, "KWH": 0.001, "GJ": 0.2778, "MMBTU": 0.29307}
return value * factors.get(uom.upper(), 1.0)
class ERPDataAggregator:
"""
Aggrega dati da multipli connettori ERP e mappa su ESRS data points.
"""
def __init__(self, connectors: list[ERPConnector]):
self.connectors = connectors
async def collect_and_map_to_esrs(
self,
year: int,
entity_id: str,
period_id: str
) -> list[dict]:
"""
Raccoglie dati da tutti i connettori e produce submissions ESRS-ready.
"""
results = []
# Raccolta parallela da tutti i connettori
energy_tasks = [c.get_energy_consumption(year, entity_id) for c in self.connectors]
ghg_tasks = [c.get_ghg_emissions(year, entity_id) for c in self.connectors]
wf_tasks = [c.get_workforce_metrics(year, entity_id) for c in self.connectors]
energy_results = await asyncio.gather(*energy_tasks, return_exceptions=True)
ghg_results = await asyncio.gather(*ghg_tasks, return_exceptions=True)
wf_results = await asyncio.gather(*wf_tasks, return_exceptions=True)
# Aggregazione (media pesata o somma in base al tipo)
total_energy = sum(
r["total_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_renewable = sum(
r["renewable_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_scope1 = sum(
r.get("scope1", 0) for r in ghg_results
if isinstance(r, dict)
)
# Map su ESRS data points
if total_energy > 0:
results.append({
"data_point_id": "E1-5_TotalEnergy",
"value_numeric": total_energy,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_renewable > 0:
results.append({
"data_point_id": "E1-5_RenewableEnergy",
"value_numeric": total_renewable,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_scope1 > 0:
results.append({
"data_point_id": "E1-6_GrossScope1",
"value_numeric": total_scope1,
"unit_of_measure": "tCO2eq",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
return results
외부 보증을 위한 데이터 계보 및 감사 추적
CSRD에는 다음이 필요합니다. 제한된 보증 (그리고 관점에서 합리적인 보증) 외부감사인에 의해. 즉, 보고서의 모든 데이터에는 완벽한 추적성: 출처, 삽입자, 승인자, 그리고 그것이 어떻게 계산되었는지. 데이터 계보 추적기를 구현해 보겠습니다.
# data_lineage.py
# Data lineage tracker per assurance CSRD
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
import json
@dataclass
class LineageNode:
"""Nodo nel grafo di data lineage."""
node_id: str
node_type: str # SOURCE, TRANSFORM, SUBMISSION, APPROVAL
description: str
actor: Optional[str] = None
timestamp: Optional[datetime] = None
metadata: dict = field(default_factory=dict)
hash: Optional[str] = None
def compute_hash(self) -> str:
"""Hash del contenuto per rilevare manomissioni."""
content = json.dumps({
"node_id": self.node_id,
"node_type": self.node_type,
"description": self.description,
"actor": self.actor,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"metadata": self.metadata
}, sort_keys=True)
self.hash = hashlib.sha256(content.encode()).hexdigest()
return self.hash
@dataclass
class LineageEdge:
"""Arco tra nodi nel grafo di lineage."""
from_node: str
to_node: str
relationship: str # DERIVED_FROM, APPROVED_BY, TRANSFORMED_BY
metadata: dict = field(default_factory=dict)
class DataLineageTracker:
"""
Traccia il percorso completo di ogni dato ESG dalla sorgente al report.
Struttura: DAG (Directed Acyclic Graph) di LineageNode.
"""
def __init__(self, db_pool):
self.db_pool = db_pool
async def record_extraction(
self,
data_point_id: str,
source_system: str,
source_query: str,
raw_value,
actor_id: str
) -> str:
"""Registra estrazione da sistema sorgente."""
node = LineageNode(
node_id=f"extract_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="SOURCE",
description=f"Extracted from {source_system}",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"source_system": source_system,
"source_query": source_query,
"raw_value": str(raw_value),
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
return node.node_id
async def record_transformation(
self,
from_node_id: str,
data_point_id: str,
transformation_type: str,
input_value,
output_value,
formula: Optional[str],
actor_id: str
) -> str:
"""Registra una trasformazione (unit conversion, aggregation, etc.)."""
node = LineageNode(
node_id=f"transform_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="TRANSFORM",
description=f"{transformation_type} applied",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"transformation_type": transformation_type,
"input_value": str(input_value),
"output_value": str(output_value),
"formula": formula,
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=from_node_id,
to_node=node.node_id,
relationship="DERIVED_FROM"
)
await self._save_edge(edge)
return node.node_id
async def record_approval(
self,
submission_node_id: str,
data_point_id: str,
approver_id: str,
approval_comment: Optional[str]
) -> str:
"""Registra approvazione da parte del reviewer."""
node = LineageNode(
node_id=f"approval_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="APPROVAL",
description="Data point approved for reporting",
actor=approver_id,
timestamp=datetime.now(),
metadata={
"data_point_id": data_point_id,
"approval_comment": approval_comment,
"approver_id": approver_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=submission_node_id,
to_node=node.node_id,
relationship="APPROVED_BY"
)
await self._save_edge(edge)
return node.node_id
async def get_lineage_for_data_point(
self,
data_point_id: str,
period_id: str
) -> dict:
"""
Restituisce il grafo completo di lineage per un data point.
Usato dall'auditor per tracciare l'origine dei dati.
"""
async with self.db_pool.acquire() as conn:
nodes = await conn.fetch("""
SELECT * FROM lineage_nodes
WHERE metadata->>'data_point_id' = $1
ORDER BY timestamp ASC
""", data_point_id)
edges = await conn.fetch("""
SELECT le.* FROM lineage_edges le
JOIN lineage_nodes ln ON ln.node_id = le.from_node
WHERE ln.metadata->>'data_point_id' = $1
""", data_point_id)
return {
"data_point_id": data_point_id,
"period_id": period_id,
"nodes": [dict(n) for n in nodes],
"edges": [dict(e) for e in edges],
"integrity_check": await self._verify_chain_integrity(nodes)
}
async def _verify_chain_integrity(self, nodes) -> dict:
"""Verifica che gli hash non siano stati manomessi."""
for node in nodes:
expected_hash = node.compute_hash() if hasattr(node, 'compute_hash') else None
if expected_hash and node.get('hash') != expected_hash:
return {"valid": False, "tampered_node": node['node_id']}
return {"valid": True, "nodes_verified": len(nodes)}
async def _save_node(self, node: LineageNode):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_nodes
(node_id, node_type, description, actor, timestamp, metadata, hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
node.node_id, node.node_type, node.description,
node.actor, node.timestamp, json.dumps(node.metadata), node.hash)
async def _save_edge(self, edge: LineageEdge):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_edges (from_node, to_node, relationship, metadata)
VALUES ($1, $2, $3, $4)
""",
edge.from_node, edge.to_node, edge.relationship,
json.dumps(edge.metadata))
사례 연구: 직원이 250명인 제조 회사
실제 구현을 분석해 보겠습니다. MetalTech S.r.l., 제조 회사 직원 250명, 생산 공장 2개(Turin 및 Bari), CSRD 웨이브 2 적용 (2025년, 2026년 첫 보고). 프로젝트를 구성한 방법은 다음과 같습니다.
1단계: 격차 분석 및 중요성 평가(1~2개월)
메탈텍은 프로세스를 통해 제조 부문의 중요 주제를 파악했습니다. 이중 중요성:
- ESRS E1(기후): 이중 중요성 - 상당한 에너지 소비 (용해로, 생산라인) 및 기상이변으로 인한 물리적 위험
- ESRS E2(오염): 충격 물질 - PM2.5 입자 배출 페인트의 VOC 화합물
- ESRS E5(순환 경제): 재료 - 금속 폐기물(재료의 35% 투입), 포장, 산업폐수
- ESRS S1(인력): 자재 - 직원 250명, 재해율 높음 업종별 성별 임금격차
- ESRS G1(비즈니스 행동): 기본 규제 요구 사항에 대한 자료
2단계: 기술 인프라(2~4개월)
MetalTech이 채택한 기술 스택:
| 요소 | 기술 | 용법 |
|---|---|---|
| API 백엔드 | FastAPI + asyncpg | ESG 데이터 수집 및 검증 |
| 데이터베이스 | PostgreSQL 16 + pg벡터 | 스토리지 ESRS 데이터 포인트 + 유사성 검색 |
| ERP 통합 | SAP SFM OData API | SAP의 에너지, 배출, 인력 |
| 작업 흐름 | 맞춤형 FSM + Redis | 다단계 승인 워크플로 |
| XBRL 생성 | Python lxml + ESRS 분류 | OAM/CONSOB용 iXBRL 출력 |
| 데이터 계보 | PostgreSQL의 사용자 정의 DAG | 보증을 위한 감사 추적 |
| 알림 | FastAPI BackgroundTasks + SMTP | 마감 알림, 검토 요청 |
| 프런트엔드 | 각도 17 + 재질 | 수집 및 모니터링 대시보드 |
3단계: FY2025 데이터 수집(4~14개월)
# Esempio raccolta dati MetalTech FY2025
# Script orchestrazione completa per stabilimento Torino
import asyncio
from erp_connectors import SAPSustainabilityConnector, ERPDataAggregator
from workflow_engine import CSRDWorkflowEngine, WorkflowState
async def run_metaltech_fy2025_collection():
# Configurazione connettore SAP
sap = SAPSustainabilityConnector(
base_url="https://metaltech.hana.ondemand.com",
client_id="METALTECH_CSRD_CLIENT",
client_secret="***", # Da env vars
token_url="https://metaltech.authentication.eu12.hana.ondemand.com/oauth/token"
)
aggregator = ERPDataAggregator([sap])
# Raccolta dati stabilimento Torino (entity TO001)
print("[1/3] Raccolta dati da SAP per stabilimento Torino...")
submissions_to = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="TO001",
period_id="period_fy2025"
)
# Raccolta dati stabilimento Bari (entity BA001)
print("[2/3] Raccolta dati da SAP per stabilimento Bari...")
submissions_ba = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="BA001",
period_id="period_fy2025"
)
# Consolidamento gruppo: somma Scope 1+2, media intensità'
print("[3/3] Consolidamento dati di gruppo...")
consolidated = consolidate_group_data([submissions_to, submissions_ba])
# Output dati FY2025 MetalTech consolidati
for sub in consolidated:
print(f" {sub['data_point_id']}: {sub['value_numeric']:.2f} {sub['unit_of_measure']}")
return consolidated
def consolidate_group_data(entity_submissions: list) -> list:
"""Consolida submissions di multiple entità per il gruppo."""
by_data_point = {}
for entity_subs in entity_submissions:
for sub in entity_subs:
dp_id = sub['data_point_id']
if dp_id not in by_data_point:
by_data_point[dp_id] = {**sub, "value_numeric": 0}
# Per emissioni e energia: somma (non media)
by_data_point[dp_id]["value_numeric"] += sub['value_numeric']
by_data_point[dp_id]["source_system"] = "CONSOLIDATED_GROUP"
return list(by_data_point.values())
# Risultati tipici per azienda manifatturiera 250 dip.
METALTECH_FY2025_RESULTS = {
"E1-6_GrossScope1": {
"value": 2_847, # tCO2eq - principalmente gas naturale forno fusione
"confidence": "HIGH",
"source": "SAP SFM - Gas meter readings"
},
"E1-6_GrossScope2LB": {
"value": 1_523, # tCO2eq - elettricita' rete
"confidence": "HIGH",
"source": "SAP SFM - Electricity bills"
},
"E1-6_GrossScope2MB": {
"value": 980, # tCO2eq - market-based con PPA solare
"confidence": "HIGH",
"source": "SAP SFM + GO certificates"
},
"E1-5_TotalEnergy": {
"value": 18_450, # MWh totali
"confidence": "HIGH",
"source": "SAP SFM - Meters + bills"
},
"E1-5_RenewableEnergy": {
"value": 6_200, # MWh (33.6% via PPA + impianto solare 500kWp)
"confidence": "MEDIUM", # Stima PPA
"source": "SAP SFM + PPA contract"
},
"GHG_Intensity": {
"value": 42.3, # tCO2eq / M EUR fatturato
"confidence": "HIGH",
"source": "Calculated: (S1+S2MB) / revenue"
}
}
결과와 교훈
MetalTech FY2025: CSRD 프로젝트 결과
- 데이터 수집 시간: 6주(수동) ~ 4일(자동)
- 필수 ESRS 적용 범위: 필수 데이터 포인트의 94%가 수집 및 승인되었습니다.
- 데이터 품질: 제출물 중 78%에 대한 신뢰도가 높음(전년 45% 대비)
- 감사 시간: 자동 데이터 계보로 60% 감소
- 외부보증비용: -25% 더 적은 샘플링 노력으로
- 주요 결과: 범위 3 고양이. 1(업스트림 재료)이 40% 과소평가되었습니다. 이전 수동 계산에 비해
주요 어려움과 극복 방법
피해야 할 안티패턴
- 처음부터 모든 것을 구축하지 마세요: 시장은 SaaS CSRD 솔루션을 제공합니다 (Workiva, Watershed, Persefoni) 중소기업용. 구축 vs 구매 평가: 자체 구축 이는 데이터 복잡성이 높거나 고유한 통합 요구 사항이 있는 경우에만 의미가 있습니다.
- 중요성 평가를 지연하지 마십시오. 모든 일의 전제조건입니다. 그것이 없으면 어떤 데이터를 수집해야 할지 알 수 없습니다. 간단한 방법으로도 즉시 시작하세요.
- 범위 3을 무시하지 마십시오. 많은 제조 회사의 경우 Scope 3 cat. 1 (원재료)와 고양이. 11(제품 사용)이 전체 배출량의 70%를 초과합니다. Scope 3을 과소보고하는 것은 가장 일반적인 greenwashing입니다.
- XBRL을 최종 생각으로 다루지 마십시오. iXBRL 태깅에는 다음이 필요합니다. ESRS 분류법에 익숙합니다. 최소 2~3개월의 개발 기간을 계획하세요.
- 통합 범위를 잊지 마세요. CSRD에는 다음이 필요합니다. 50%를 초과하는 모든 자회사를 포함합니다. "단순성"을 위해 엔터티 제외 이는 규정 준수 위험입니다.
고급 ESG 쿼리를 위한 GraphQL API
ESG 분석 대시보드 및 대화형 보고 도구의 경우 GraphQL API는 다음을 제공합니다. 기존 REST보다 뛰어난 유연성으로 집계를 통한 특정 쿼리 허용 즉석에서:
# graphql_schema.py
# Schema GraphQL per ESG Analytics API con Strawberry
import strawberry
from strawberry.types import Info
from typing import Optional, List
from datetime import date
from decimal import Decimal
@strawberry.type
class GHGEmissionsSummary:
year: int
scope1_tco2eq: Decimal
scope2_lb_tco2eq: Decimal
scope2_mb_tco2eq: Decimal
scope3_tco2eq: Decimal
total_tco2eq: Decimal
ghg_intensity: Optional[Decimal]
yoy_change_pct: Optional[Decimal]
@strawberry.type
class EnergySummary:
year: int
total_mwh: Decimal
renewable_mwh: Decimal
non_renewable_mwh: Decimal
renewable_share_pct: Decimal
@strawberry.type
class ESGReportingStatus:
period_id: str
year: int
workflow_state: str
mandatory_coverage_pct: Decimal
approved_data_points: int
pending_review: int
missing_mandatory: int
@strawberry.type
class Query:
@strawberry.field
async def ghg_emissions(
self,
info: Info,
company_id: str,
years: List[int]
) -> List[GHGEmissionsSummary]:
"""Emissioni GHG per anni richiesti con calcolo YoY automatico."""
db = info.context["db_pool"]
async with db.acquire() as conn:
rows = await conn.fetch("""
SELECT
rp.year,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope1%'
THEN eds.value_numeric ELSE 0 END), 0) as scope1,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%LB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_lb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%MB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_mb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope3%'
THEN eds.value_numeric ELSE 0 END), 0) as scope3
FROM esg_data_submissions eds
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE rp.company_id = $1
AND rp.year = ANY($2)
AND eds.status = 'APPROVED'
AND dp.esrs_standard = 'E1'
GROUP BY rp.year
ORDER BY rp.year
""", company_id, years)
results = []
prev_total = None
for row in rows:
total = row['scope1'] + row['scope2_mb'] + row['scope3']
yoy = None
if prev_total and prev_total > 0:
yoy = ((total - prev_total) / prev_total) * 100
results.append(GHGEmissionsSummary(
year=row['year'],
scope1_tco2eq=Decimal(str(row['scope1'])),
scope2_lb_tco2eq=Decimal(str(row['scope2_lb'])),
scope2_mb_tco2eq=Decimal(str(row['scope2_mb'])),
scope3_tco2eq=Decimal(str(row['scope3'])),
total_tco2eq=Decimal(str(total)),
ghg_intensity=None,
yoy_change_pct=Decimal(str(yoy)) if yoy else None
))
prev_total = total
return results
@strawberry.field
async def reporting_status(
self,
info: Info,
company_id: str,
year: int
) -> Optional[ESGReportingStatus]:
"""Stato del processo di reporting per un anno."""
db = info.context["db_pool"]
async with db.acquire() as conn:
period = await conn.fetchrow("""
SELECT rp.id, rp.year, rp.status,
COUNT(eds.id) FILTER (WHERE eds.status = 'APPROVED') as approved,
COUNT(eds.id) FILTER (WHERE eds.status IN ('SUBMITTED','REVIEWING')) as pending,
(SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE) as mandatory_total
FROM reporting_periods rp
LEFT JOIN esg_data_submissions eds ON eds.reporting_period_id = rp.id
WHERE rp.company_id = $1 AND rp.year = $2
GROUP BY rp.id, rp.year, rp.status
""", company_id, year)
if not period:
return None
mandatory_total = period['mandatory_total'] or 1
approved = period['approved'] or 0
coverage = (approved / mandatory_total) * 100
return ESGReportingStatus(
period_id=str(period['id']),
year=period['year'],
workflow_state=period['status'],
mandatory_coverage_pct=Decimal(str(round(coverage, 1))),
approved_data_points=approved,
pending_review=period['pending'] or 0,
missing_mandatory=max(0, mandatory_total - approved)
)
schema = strawberry.Schema(query=Query)
# Query di esempio per il frontend
EXAMPLE_QUERY = """
query ESGDashboard($companyId: String!, $years: [Int!]!) {
ghgEmissions(companyId: $companyId, years: $years) {
year
scope1Tco2eq
scope2MbTco2eq
scope3Tco2eq
totalTco2eq
yoyChangePct
}
reportingStatus(companyId: $companyId, year: 2025) {
workflowState
mandatoryCoveragePct
approvedDataPoints
missingMandatory
}
}
"""
AI법과 CSRD와의 교차점에 대한 고찰
2025~2026년에 떠오르는 측면은 CSRD와 AI Act EU 간의 교차점입니다. 시스템 배출량 계산, 기후 예측 또는 ESG 보고 자동화에 사용되는 AI AI법의 고위험 AI 범주에 속할 수 있으며 다음을 요구합니다.
- 기술 문서 배출량 추정에 사용되는 AI 시스템 (ESRS 방법론적 투명성)
- 인간의 감독 자동화된 중요성 결정
- 감사 추적 AI 예측(이미 데이터 계보에 포함됨)
- 차별금지 ESRS S1 지표에 사용되는 AI HR 시스템
AI Act x CSRD 타임라인
- 2025년 2월: AI법 시행 중 - 금지된 관행 금지
- 2025년 8월: GPAI(범용 AI) 의무 - AI 기반 모델의 투명성
- 2026년 8월: 고위험 AI 종합 의무 - HR 및 신용 결정을 위한 AI 시스템 포함
- 2027년 8월: 모든 사용 사례에 대한 전체 AI Act 적용
ESG/CSRD 시스템을 구축하는 팀은 이미 규정 준수 계획을 세워야 합니다. 스택의 ML 구성 요소에 대한 AI 법: 범위 3 배출 추정 모델, ESG 데이터에 대한 이상 탐지, 공급업체 평가를 위한 NLP.
결론 및 다음 단계
CSRD 준수 시스템을 구축하는 것은 단순한 준수 활동이 아닙니다. 지속가능성 데이터 아키텍처를 생성할 수 있는 기회 수년간의 가치. 우리가 본 구성 요소 — ESRS 데이터 모델, 컬렉션 API 검증, FSM 워크플로우, ERP 통합, XBRL 생성기, 데이터 계보 - 모두입니다. 재사용 가능하고 모듈식입니다.
핵심 포인트:
- 중요성 평가부터 시작하세요. 이중 중요성 없이는 무엇을 알 수 없습니다 수집하다. 이는 실제 이해관계자의 참여 등이 필요한 프로세스입니다. 알고리즘.
- ERP 수집 자동화: ESRS E1 및 S1 데이터의 60-70%가 존재합니다. 이미 시스템에 있습니다. SAP 및 Oracle용 OData 커넥터는 투자입니다 몇 주 안에 회복됩니다.
- 지금 데이터 계보에 투자하세요. 외부 보험 비용 데이터 추적성에 직접적으로 의존합니다. 좋은 감사 추적 시스템 감사 비용을 20~30% 절감합니다.
- 옴니버스 패키지를 무시하지 마십시오. 2025년에 제안된 단순화 중소기업의 일부 의무를 완화할 수 있습니다. 입법 과정을 모니터링하되 모니터링하지는 않음 최종 승인 전에 계획을 세우세요.
시리즈의 다음 기사에서 — AI 탄소발자국 — AI 모델이 환경에 미치는 영향을 측정하고 줄이는 방법을 살펴보겠습니다. CodeCarbon과 같은 도구를 사용하여 프로덕션에서 추론하는 대규모 언어 모델, 지속 가능한 ML 엔지니어링을 위한 ML CO2 영향 및 전략.
자세히 알아볼 수 있는 리소스
- EFRAG ESRS 분류: efrag.org-ESRS XBRL 분류
- CSRD 공식 텍스트: EUR-Lex - 지침 2022/2464/EU
- GHG 프로토콜 기업 표준: ggprotocol.org
- SAP 지속 가능성 발자국 관리: sap.com/sustainability
- Python XBRL 라이브러리: arelle(오픈 소스), python-xbrl
- CSRD SaaS 도구: Workiva, 유역, Persefoni, 스윕, Greenly
그린 소프트웨어 엔지니어링 시리즈 - 탐색
이전 기사: GreenOps: 지속 가능한 DevOps — CI/CD 녹색, DevOps 팀을 위한 클라우드 규모 조정, FinOps 및 지속 가능성 지표를 제공합니다.
다음 기사: AI 탄소발자국 — 측정 및 감소 실용적인 도구와 벤치마크를 사용하여 훈련 및 추론에서 AI 모델의 방출 부문의.
이 시리즈는 여행의 일부입니다 데이터 & AI 사업 시리즈도 포함되어 있는 federicocalo.dev에서 비즈니스용 MLOps (ID 306-315) 대중에 대해 더 깊이 탐구하고 싶은 사람들을 위한 계산 효율성에 중점을 두고 AI 모델 제작에 거버넌스.







