ESG レポート API: CSRD ワークフローとの統合
Il 2024 年 1 月 1 日 ヨーロッパ企業にとって画期的な転換点となりました。 持続可能性報告指令 (CSRD) が発効し、ESG 報告が変革 自主的な実践から正確な技術基準に基づく法的義務まで。対象企業は、 準拠したレポートを作成できるようになりました 欧州持続可能性報告基準 (ESRS)、 XBRL 形式でタグ付けされ、外部保証の対象となる検証可能なデータを含む。
技術チームの課題は具体的です。数十の企業システムから ESG データを収集することです。 (ERP、エネルギー管理、人事、物流)、ESRS 分類に従って集計し、指標を計算します スコープ 1 ~ 3 の排出量や水の原単位など、承認プロセスを管理する マルチレベルであり、レギュレータ対応の iXBRL フォーマットで出力を生成します。これに必要なのはすべて 1つ 専用APIアーキテクチャ、共有 Excel シートではありません。
この記事では、ESRS 準拠のデータ モデルから REST API までの完全なシステムを構築します。 データの収集と検証から、統合と承認のワークフロー、生成まで XBRL 出力の。バックエンドには Python/FastAPI、構造化ストレージには PostgreSQL を使用します。 そして、SAP Sustainability と Oracle ESG を上流のデータソースとして統合する方法を分析します。
何を学ぶか
- CSRD タイムラインとアプリケーション境界: 誰がいつ何をしなければならないか
- ESRS E1 ~ E5、S1 ~ S4、G1: 標準構造と必須指標
- 二重の重要性評価: 影響 + 財務アルゴリズムの実装
- ESRS 準拠の ESG データ収集のための PostgreSQL データ モデル
- ESG データの収集、検証、集計のための REST/GraphQL API
- マルチレベルの承認と統合のためのワークフローの自動化
- Python による XBRL/iXBRL タグ付け: デジタル レポートの自動生成
- ERPの統合: SAP Sustainability Footprint Management、Oracle Fusion ESG
- 外部保証のためのデータ系統と監査証跡
- 完全なケーススタディ: 従業員 250 人の中規模製造会社
グリーン ソフトウェア エンジニアリング シリーズ
この記事は、グリーン ソフトウェア エンジニアリングに関する包括的なシリーズの 8 番目です。 コードエミッションの測定から規制上のESG報告まで:
| # | アイテム | 集中 |
|---|---|---|
| 1 | グリーン ソフトウェア エンジニアリングの原則 | GSF、SCI仕様、8つの基本原則 |
| 2 | CodeCarbon: コードエミッションの測定 | Python ライブラリ、ダッシュボード、CI/CD 統合 |
| 3 | Climatiq API: バックエンドでの GHG 計算 | REST API、スコープ 1 ~ 3、FastAPI 統合 |
| 4 | カーボンアウェア SDK | ワークロードのシフト、グリッドの強度、タイムシフト |
| 5 | スコープ 3 パイプライン | バリューチェーン排出量、サプライヤー、LCA |
| 6 | グリーン アーキテクチャ パターン | サーバーレス、イベント駆動型の持続可能なキャッシュ |
| 7 | GreenOps: 持続可能な DevOps | グリーン CI/CD、クラウドの適正化、FinOps |
| 8 | ESG レポート API: CSRD ワークフロー (この記事) | ESRS、XBRL、マテリアリティ、ERP統合 |
| 9 | AI の二酸化炭素排出量 | LLM トレーニング/推論、持続可能な ML |
| 10 | 高度なスコープモデリング | GHG プロトコルの方法論、特定の分野 |
CSRD: サステナビリティレポートの新しいパラダイム
CSRD (指令 2022/2464/EU) は非財務報告指令 (NFRD) に置き換わりました。 アプリケーションの境界を大幅に拡大し、レポートに必要な品質を高めます。 NFRD は約 11,000社 欧州諸国、CSRD には欧州諸国も関与 50,000+、初上場の中小企業および欧州子会社を含む 非EUグループ。
アプリケーションのタイムライン
| 日付 | 科目 | 第一報 |
|---|---|---|
| 2024年1月(2024年度) | すでに NFRD の対象となっている大企業 (従業員 500 人以上) | 2025年 |
| 2025年1月(2025年度) | EU の大企業 (従業員 250 人以上、または売上高 4,000 万ユーロ以上、または資産 2,000 万以上) | 2026年 |
| 2026年1月(2026年度) | 規制市場に上場している中小企業 (従業員 10 ~ 250 人) | 2027年 |
| 2028年1月(2028年度) | 非EUグループのEU子会社(EU売上高1億5,000万ユーロ超) | 2029年 |
オムニバスの簡素化 2025
2025 年 2 月、欧州委員会は簡素化を提案するオムニバス パッケージを発表しました。 CSRDにとって重要:中小企業の対象範囲の縮小、第2波企業の2年延期 3、および一部の ESRS データ ポイントの簡略化。しかし、 大波1社残留 完全に主題。この変更は、現時点ではまだ立法過程にあります。 この記事の出版。常に法律の最新状況を確認してください。
ESRS の構造
欧州持続可能性報告基準は 3 つのテーマ領域で構成されています。 加えて 2 つの標準的な横断的機能:
| 標準 | エリア | トピックス | 義務的 |
|---|---|---|---|
| ESRS1 | クロスカット | 一般的な要件と原則 | Si |
| ESRS2 | クロスカット | 一般情報(ガバナンス、戦略、マテリアリティ) | Si |
| ESRS E1 | 環境 | 気候変動(GHG、エネルギー、TCFD) | 素材なら |
| ESRS E2 | 環境 | 汚染(大気、水、土壌、物質) | 素材なら |
| ESRS E3 | 環境 | 水と海洋資源 | 素材なら |
| ESRS E4 | 環境 | 生物多様性と生態系 | 素材なら |
| ESRS E5 | 環境 | 資源利用と循環経済 | 素材なら |
| ESRS S1 | 社交 | 自社の従業員(労働条件、D&I、健康) | 素材なら |
| ESRS S2 | 社交 | バリューチェーンにおける労働者 | 素材なら |
| ESRS S3 | 社交 | 影響を受けるコミュニティ | 素材なら |
| ESRS S4 | 社交 | 消費者とエンドユーザー | 素材なら |
| ESRS G1 | ガバナンス | 企業活動(倫理、汚職防止、ロビー活動) | 素材なら |
ESRS E1 気候: 収集する必要がある指標
ESRS E1 は通常、データ収集に最もコストがかかるトピックです。開示が必要です GHG 排出量 (スコープ 1、2、3)、科学に基づいた削減目標 (SBTi)、エネルギーについて TCFDに基づく、供給源別の消費量と気候リスク/機会。見てみましょう 必須のデータポイント 最も関連性の高いもの:
GHG 排出量 - 必須指標 ESRS E1-6
| データポイント | ユニット | 説明 |
|---|---|---|
| スコープ1の総温室効果ガス排出量 | tCO2当量 | 所有または管理されている発生源からの直接排出 |
| スコープ2のGHG排出量(場所ベース) | tCO2当量 | エネルギー購入による間接排出、位置特定方法 |
| スコープ2のGHG排出量(市場ベース) | tCO2当量 | 間接排出、市場方式(認証) |
| スコープ3のGHG排出量(15カテゴリー) | tCO2当量 | バリューチェーンにおける間接排出 |
| GHG原単位(収益) | tCO2eq / 百万ユーロ | 売上高単位あたりの強度 |
| 総エネルギー消費量 | MWh | 総エネルギー消費量 |
| 再生可能エネルギーのシェア | % | 再生可能資源からのエネルギーの割合 |
| GHGの除去と貯蔵 | tCO2当量 | 植林、CCSなどによる炭素除去 |
二重の重要性評価: アルゴリズムの実装
CSRD の方法論の中心となるのは、 二重の物質性:各社 次の 2 つの観点を同時に考慮して、どの ESG トピックが重要であるかを評価する必要があります。
- 影響の重要性: 会社は重大な影響を及ぼします(プラスまたはマイナス、 現在または潜在的な)そのトピックに関する人々や環境について?
- 財務上の重要性: ESG トピックはリスクまたは機会を生み出す 会社にとって重要な金融資産(短期、中期、長期)はありますか?
トピックは、2 つの基準のうち少なくとも 1 つを満たす場合に重要です。実装方法を見てみましょう このプロセスは専用の API を使用して行われます。
# materiality_assessment.py
# Implementazione Double Materiality Assessment per CSRD ESRS 2-IRO
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid
from datetime import datetime
class MaterialityDimension(str, Enum):
IMPACT = "impact"
FINANCIAL = "financial"
class ImpactType(str, Enum):
ACTUAL_NEGATIVE = "actual_negative"
ACTUAL_POSITIVE = "actual_positive"
POTENTIAL_NEGATIVE = "potential_negative"
POTENTIAL_POSITIVE = "potential_positive"
class TimeHorizon(str, Enum):
SHORT_TERM = "short_term" # 0-1 anno
MEDIUM_TERM = "medium_term" # 1-5 anni
LONG_TERM = "long_term" # oltre 5 anni
@dataclass
class ImpactMaterialityScore:
"""
Score impact materiality per un topic ESG.
Basato su: severity x likelihood (per impatti negativi)
o scale x likelihood (per impatti positivi)
"""
topic_id: str
impact_type: ImpactType
# Score 1-5 per ogni dimensione
scale: int # Quante persone/ecosistemi impattati
scope: int # Reversibilita' dell'impatto
irremediable: int # Difficolta' di rimediare
likelihood: int # Probabilità' che accada
@property
def severity(self) -> float:
"""Severity = media pesata di scale, scope, irremediable"""
return (self.scale * 0.4 + self.scope * 0.3 + self.irremediable * 0.3)
@property
def score(self) -> float:
"""Score finale 1-25 per ordinamento"""
return self.severity * self.likelihood
@property
def is_material(self) -> bool:
"""Soglia materialita': score >= 9 (severity >= 3, likelihood >= 3)"""
return self.score >= 9.0
@dataclass
class FinancialMaterialityScore:
"""
Score financial materiality per un topic ESG.
Basato su: magnitude x likelihood, per time horizon
"""
topic_id: str
is_risk: bool # True = rischio, False = opportunità'
magnitude: int # 1-5: impatto finanziario potenziale
likelihood: int # 1-5: probabilità'
time_horizon: TimeHorizon
# Fattori qualitativi
quantifiable: bool # Possiamo quantificarlo in EUR?
estimated_impact_eur: Optional[float] = None
@property
def score(self) -> float:
"""Score base con discount per time horizon"""
base = self.magnitude * self.likelihood
# Discount: short=1.0, medium=0.8, long=0.6
discount = {
TimeHorizon.SHORT_TERM: 1.0,
TimeHorizon.MEDIUM_TERM: 0.8,
TimeHorizon.LONG_TERM: 0.6
}[self.time_horizon]
return base * discount
@property
def is_material(self) -> bool:
return self.score >= 7.5 # magnitude >= 3, likelihood >= 3 con discount short
@dataclass
class TopicMaterialityResult:
topic_id: str
topic_name: str
esrs_standard: str
impact_scores: list[ImpactMaterialityScore] = field(default_factory=list)
financial_scores: list[FinancialMaterialityScore] = field(default_factory=list)
@property
def impact_material(self) -> bool:
return any(s.is_material for s in self.impact_scores)
@property
def financial_material(self) -> bool:
return any(s.is_material for s in self.financial_scores)
@property
def is_material(self) -> bool:
"""Double materiality: materiale se almeno una dimensione e' materiale"""
return self.impact_material or self.financial_material
@property
def materiality_level(self) -> str:
if self.impact_material and self.financial_material:
return "DOUBLE_MATERIAL"
elif self.impact_material:
return "IMPACT_MATERIAL"
elif self.financial_material:
return "FINANCIAL_MATERIAL"
else:
return "NOT_MATERIAL"
def run_materiality_assessment(
company_id: str,
topics: list[dict]
) -> list[TopicMaterialityResult]:
"""
Esegue il materiality assessment completo.
Input: lista topic con scores raccolti via stakeholder engagement.
Output: ranking per materialita'.
"""
results = []
for topic_data in topics:
result = TopicMaterialityResult(
topic_id=topic_data["topic_id"],
topic_name=topic_data["topic_name"],
esrs_standard=topic_data["esrs_standard"]
)
# Aggiungi impact scores
for impact in topic_data.get("impact_scores", []):
result.impact_scores.append(ImpactMaterialityScore(**impact))
# Aggiungi financial scores
for financial in topic_data.get("financial_scores", []):
result.financial_scores.append(FinancialMaterialityScore(**financial))
results.append(result)
# Ordina per materialita' discendente
return sorted(
results,
key=lambda r: (
r.is_material,
r.impact_material and r.financial_material,
max((s.score for s in r.impact_scores), default=0) +
max((s.score for s in r.financial_scores), default=0)
),
reverse=True
)
CSRD 用の PostgreSQL データ モデル
CSRD 準拠のシステムには、ESRS の構造を反映するデータベース スキーマが必要です。 データ系統をサポートし、監査証跡の変更履歴を維持します。ここにあります コアスキーム:
-- Schema PostgreSQL per CSRD ESG Data Collection
-- Ottimizzato per ESRS data points con audit trail completo
-- Tabella aziende e perimetro di consolidamento
CREATE TABLE companies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
legal_name VARCHAR(255) NOT NULL,
lei_code VARCHAR(20), -- Legal Entity Identifier
country_code CHAR(2) NOT NULL,
nace_code VARCHAR(10), -- Classificazione attivita'
fiscal_year_end DATE,
employees_fte DECIMAL(10,2),
revenue_eur DECIMAL(20,2),
total_assets_eur DECIMAL(20,2),
is_parent BOOLEAN DEFAULT FALSE,
parent_id UUID REFERENCES companies(id),
csrd_wave INTEGER, -- 1, 2, 3, 4
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Periodi di reporting
CREATE TABLE reporting_periods (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
year INTEGER NOT NULL,
period_type VARCHAR(20) DEFAULT 'ANNUAL',
start_date DATE NOT NULL,
end_date DATE NOT NULL,
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, COLLECTING, REVIEWING, APPROVED, FILED
submitted_at TIMESTAMPTZ,
approved_by UUID,
UNIQUE (company_id, year, period_type)
);
-- Catalog ESRS data points (reference table)
CREATE TABLE esrs_data_points (
id VARCHAR(50) PRIMARY KEY,
-- es. "E1-6_GrossScope1", "S1-7_GenderPayGap"
esrs_standard VARCHAR(10) NOT NULL, -- E1, E2, S1, G1...
topic VARCHAR(100) NOT NULL,
sub_topic VARCHAR(100),
name VARCHAR(255) NOT NULL,
description TEXT,
unit_of_measure VARCHAR(50),
data_type VARCHAR(20), -- NUMERIC, BOOLEAN, TEXT, DATE, ENUM
is_mandatory BOOLEAN DEFAULT FALSE,
is_phase_in BOOLEAN DEFAULT FALSE, -- Permesso rinvio anni 1-3
ghg_protocol_scope VARCHAR(10), -- S1, S2_LB, S2_MB, S3
scope3_category INTEGER, -- 1-15 per Scope 3
tags TEXT[]
);
-- Raccolta dati ESG per periodo
CREATE TABLE esg_data_submissions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
reporting_period_id UUID NOT NULL REFERENCES reporting_periods(id),
data_point_id VARCHAR(50) NOT NULL REFERENCES esrs_data_points(id),
-- Valori (uno solo populated in base a data_type)
value_numeric DECIMAL(30, 6),
value_boolean BOOLEAN,
value_text TEXT,
value_date DATE,
-- Metadata qualità'
unit_of_measure VARCHAR(50),
estimation_method VARCHAR(100),
confidence_level VARCHAR(20), -- HIGH, MEDIUM, LOW
is_estimated BOOLEAN DEFAULT FALSE,
-- Data lineage
source_system VARCHAR(100), -- SAP, Oracle, Manual, API
source_document_ref VARCHAR(255),
collected_by UUID,
collection_timestamp TIMESTAMPTZ DEFAULT NOW(),
-- Workflow
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, SUBMITTED, REVIEWED, APPROVED, REJECTED
reviewed_by UUID,
review_timestamp TIMESTAMPTZ,
review_comment TEXT,
approved_by UUID,
approval_timestamp TIMESTAMPTZ,
-- Versioning per audit
version INTEGER DEFAULT 1,
previous_version_id UUID REFERENCES esg_data_submissions(id),
CONSTRAINT unique_submission UNIQUE (
company_id, reporting_period_id, data_point_id, version
)
);
-- Indici per performance
CREATE INDEX idx_submissions_period ON esg_data_submissions(reporting_period_id);
CREATE INDEX idx_submissions_status ON esg_data_submissions(status);
CREATE INDEX idx_submissions_datapoint ON esg_data_submissions(data_point_id);
-- Audit log immutabile
CREATE TABLE audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
action VARCHAR(20) NOT NULL, -- CREATE, UPDATE, DELETE, APPROVE
actor_id UUID NOT NULL,
actor_email VARCHAR(255),
timestamp TIMESTAMPTZ DEFAULT NOW(),
ip_address INET,
old_value JSONB,
new_value JSONB,
change_reason TEXT
);
-- View aggregata emissioni GHG per periodo
CREATE OR REPLACE VIEW v_ghg_emissions_summary AS
SELECT
c.legal_name,
rp.year,
SUM(CASE WHEN dp.id LIKE 'E1%Scope1%' THEN eds.value_numeric ELSE 0 END) AS scope1_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%LB%' THEN eds.value_numeric ELSE 0 END) AS scope2_lb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%MB%' THEN eds.value_numeric ELSE 0 END) AS scope2_mb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope3%' THEN eds.value_numeric ELSE 0 END) AS scope3_tco2eq,
SUM(CASE WHEN dp.id = 'E1-5_TotalEnergy' THEN eds.value_numeric ELSE 0 END) AS total_energy_mwh,
SUM(CASE WHEN dp.id = 'E1-5_RenewableEnergy' THEN eds.value_numeric ELSE 0 END) AS renewable_mwh
FROM esg_data_submissions eds
JOIN companies c ON c.id = eds.company_id
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE eds.status = 'APPROVED'
GROUP BY c.legal_name, rp.year;
API アーキテクチャ: ESG データの収集と検証
CSRD システムの API アーキテクチャは、次の 3 つの主要な層に分かれています。 ビジネスユニットとソースシステム、ESRSルールに従った検証と強化、e グループ統合のための集計。 FastAPI を使用してバックエンドを実装しましょう。
# main.py - FastAPI ESG Reporting API
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from uuid import UUID
from datetime import datetime, date
from decimal import Decimal
import asyncpg
import json
app = FastAPI(
title="CSRD ESG Reporting API",
description="API per raccolta e gestione dati ESG conformi ESRS",
version="1.0.0"
)
# === PYDANTIC MODELS ===
class ESGDataPointSubmission(BaseModel):
data_point_id: str = Field(..., description="ID ESRS data point (es. E1-6_GrossScope1)")
value_numeric: Optional[Decimal] = None
value_boolean: Optional[bool] = None
value_text: Optional[str] = None
value_date: Optional[date] = None
unit_of_measure: Optional[str] = None
estimation_method: Optional[str] = None
confidence_level: str = Field(default="MEDIUM", pattern="^(HIGH|MEDIUM|LOW)$")
is_estimated: bool = False
source_system: Optional[str] = None
source_document_ref: Optional[str] = None
@validator('value_numeric')
def validate_numeric(cls, v, values):
if v is not None and v < 0:
raise ValueError('Valori numerici GHG non possono essere negativi')
return v
class BulkESGSubmission(BaseModel):
reporting_period_id: UUID
submissions: List[ESGDataPointSubmission]
submission_note: Optional[str] = None
class DataPointValidationResult(BaseModel):
data_point_id: str
is_valid: bool
errors: List[str] = []
warnings: List[str] = []
class ValidationReport(BaseModel):
period_id: UUID
total_mandatory_points: int
submitted_points: int
approved_points: int
coverage_pct: float
validation_results: List[DataPointValidationResult]
overall_valid: bool
# === VALIDATION SERVICE ===
class ESRSValidationService:
"""
Validazione regole ESRS per data points.
Implementa controlli incrociati richiesti dallo standard.
"""
CONSISTENCY_RULES = [
{
"rule": "SCOPE1_PLUS_SCOPE2_CONSISTENCY",
"description": "Total GHG = Scope1 + Scope2 (market) deve essere coerente",
"severity": "ERROR"
},
{
"rule": "ENERGY_GHG_RATIO",
"description": "Intensità' GHG su energia deve essere plausibile",
"severity": "WARNING"
},
{
"rule": "RENEWABLE_WITHIN_TOTAL",
"description": "Energia rinnovabile non può' superare totale energia",
"severity": "ERROR"
},
{
"rule": "YOY_VARIANCE_THRESHOLD",
"description": "Variazione YoY > 30% richiede commento",
"severity": "WARNING"
}
]
async def validate_submission(
self,
submissions: List[ESGDataPointSubmission],
period_id: UUID,
db_pool
) -> ValidationReport:
errors_by_point = {}
warnings_by_point = {}
# 1. Validazione singoli data point
for sub in submissions:
errors = []
warnings = []
# Carica definizione data point da catalogo
dp_def = await self._get_data_point_def(sub.data_point_id, db_pool)
if not dp_def:
errors.append(f"Data point {sub.data_point_id} non trovato nel catalogo ESRS")
else:
# Verifica unita' di misura
if dp_def['unit_of_measure'] and sub.unit_of_measure:
if sub.unit_of_measure != dp_def['unit_of_measure']:
warnings.append(
f"Unit of measure {sub.unit_of_measure} diversa da "
f"attesa {dp_def['unit_of_measure']}"
)
# Verifica tipo dato
if dp_def['data_type'] == 'NUMERIC' and sub.value_numeric is None:
errors.append(f"Data point numerico richiede value_numeric")
errors_by_point[sub.data_point_id] = errors
warnings_by_point[sub.data_point_id] = warnings
# 2. Controlli incrociati
await self._run_cross_checks(submissions, errors_by_point, warnings_by_point)
# 3. Calcola copertura mandatory
mandatory_coverage = await self._calc_mandatory_coverage(period_id, submissions, db_pool)
results = [
DataPointValidationResult(
data_point_id=dp_id,
is_valid=len(errs) == 0,
errors=errs,
warnings=warnings_by_point.get(dp_id, [])
)
for dp_id, errs in errors_by_point.items()
]
overall_valid = all(r.is_valid for r in results)
return ValidationReport(
period_id=period_id,
total_mandatory_points=mandatory_coverage['total'],
submitted_points=mandatory_coverage['submitted'],
approved_points=mandatory_coverage['approved'],
coverage_pct=mandatory_coverage['coverage_pct'],
validation_results=results,
overall_valid=overall_valid
)
async def _run_cross_checks(self, submissions, errors_by_point, warnings_by_point):
"""Controlli incrociati tra data points correlati."""
values = {s.data_point_id: s.value_numeric for s in submissions if s.value_numeric}
# Energia rinnovabile <= totale energia
renewable = values.get('E1-5_RenewableEnergy')
total_energy = values.get('E1-5_TotalEnergy')
if renewable and total_energy and renewable > total_energy:
errors_by_point.setdefault('E1-5_RenewableEnergy', []).append(
"Energia rinnovabile non può' superare energia totale"
)
async def _get_data_point_def(self, dp_id: str, db_pool):
async with db_pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM esrs_data_points WHERE id = $1", dp_id
)
async def _calc_mandatory_coverage(self, period_id, submissions, db_pool):
async with db_pool.acquire() as conn:
total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
submitted = len([s for s in submissions if s.value_numeric is not None
or s.value_boolean is not None or s.value_text is not None])
return {
'total': total or 0,
'submitted': submitted,
'approved': 0,
'coverage_pct': (submitted / total * 100) if total else 0
}
# === ENDPOINTS ===
@app.post("/api/v1/periods/{period_id}/submissions/bulk")
async def bulk_submit_esg_data(
period_id: UUID,
payload: BulkESGSubmission,
background_tasks: BackgroundTasks,
db_pool = Depends(get_db_pool)
):
"""
Submission bulk di dati ESG per un periodo.
Accetta fino a 500 data points per chiamata.
"""
if len(payload.submissions) > 500:
raise HTTPException(400, "Max 500 data points per richiesta")
validator = ESRSValidationService()
validation_report = await validator.validate_submission(
payload.submissions, period_id, db_pool
)
if not validation_report.overall_valid:
raise HTTPException(422, {
"message": "Validation failed",
"validation_report": validation_report.dict()
})
# Salva in batch
async with db_pool.acquire() as conn:
async with conn.transaction():
for sub in payload.submissions:
await conn.execute("""
INSERT INTO esg_data_submissions
(company_id, reporting_period_id, data_point_id,
value_numeric, value_boolean, value_text,
unit_of_measure, estimation_method, confidence_level,
is_estimated, source_system, source_document_ref,
status)
SELECT
rp.company_id, $1, $2,
$3, $4, $5,
$6, $7, $8,
$9, $10, $11,
'SUBMITTED'
FROM reporting_periods rp WHERE rp.id = $1
ON CONFLICT (company_id, reporting_period_id, data_point_id, version)
DO UPDATE SET
value_numeric = EXCLUDED.value_numeric,
status = 'SUBMITTED',
collection_timestamp = NOW()
""",
period_id,
str(sub.data_point_id),
sub.value_numeric,
sub.value_boolean,
sub.value_text,
sub.unit_of_measure,
sub.estimation_method,
sub.confidence_level,
sub.is_estimated,
sub.source_system,
sub.source_document_ref
)
# Trigger notifica al reviewer in background
background_tasks.add_task(notify_reviewers, period_id, len(payload.submissions))
return {
"status": "submitted",
"period_id": str(period_id),
"submitted_count": len(payload.submissions),
"validation_report": validation_report.dict()
}
@app.get("/api/v1/periods/{period_id}/validation-report")
async def get_validation_report(
period_id: UUID,
db_pool = Depends(get_db_pool)
):
"""Restituisce report di completezza e validità per un periodo."""
async with db_pool.acquire() as conn:
# Aggregazione stato submissions per mandatory data points
rows = await conn.fetch("""
SELECT
dp.id as data_point_id,
dp.name,
dp.is_mandatory,
dp.unit_of_measure,
eds.status,
eds.value_numeric,
eds.confidence_level
FROM esrs_data_points dp
LEFT JOIN esg_data_submissions eds ON
eds.data_point_id = dp.id
AND eds.reporting_period_id = $1
WHERE dp.is_mandatory = TRUE
ORDER BY dp.esrs_standard, dp.id
""", period_id)
missing_mandatory = [r for r in rows if r['status'] is None]
approved = [r for r in rows if r['status'] == 'APPROVED']
return {
"period_id": str(period_id),
"total_mandatory": len(rows),
"approved": len(approved),
"missing": len(missing_mandatory),
"coverage_pct": round(len(approved) / len(rows) * 100, 1) if rows else 0,
"missing_data_points": [
{"id": r['data_point_id'], "name": r['name']}
for r in missing_mandatory
]
}
async def notify_reviewers(period_id: UUID, count: int):
"""Background task: notifica reviewer via email/webhook."""
print(f"[NOTIFY] Period {period_id}: {count} data points submitted for review")
async def get_db_pool():
"""Dependency injection per asyncpg pool."""
# In produzione: pool globale inizializzato al startup
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/esg_db",
min_size=5,
max_size=20
)
try:
yield pool
finally:
await pool.close()
ワークフローの自動化: マルチレベルの承認
エンタープライズ CSRD システムには、からの収集を管理する構造化されたワークフローが必要です。 複数のビジネスユニット、レビューと承認サイクル、最終的な統合。 ワークフロー エンジンの実装は次のとおりです。
# workflow_engine.py
# Workflow CSRD multi-livello: collect -> review -> approve -> file
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
class WorkflowState(str, Enum):
DRAFT = "DRAFT"
DATA_COLLECTION = "DATA_COLLECTION"
INTERNAL_REVIEW = "INTERNAL_REVIEW"
CFO_APPROVAL = "CFO_APPROVAL"
EXTERNAL_ASSURANCE = "EXTERNAL_ASSURANCE"
BOARD_APPROVAL = "BOARD_APPROVAL"
FILED = "FILED"
REJECTED = "REJECTED"
class WorkflowTransition:
"""Definisce una transizione di stato valida con condizioni e azioni."""
def __init__(
self,
from_state: WorkflowState,
to_state: WorkflowState,
action: str,
condition: Optional[Callable] = None,
pre_actions: list = None,
post_actions: list = None
):
self.from_state = from_state
self.to_state = to_state
self.action = action
self.condition = condition
self.pre_actions = pre_actions or []
self.post_actions = post_actions or []
# Definizione FSM per workflow CSRD
CSRD_WORKFLOW_TRANSITIONS = [
WorkflowTransition(
from_state=WorkflowState.DRAFT,
to_state=WorkflowState.DATA_COLLECTION,
action="START_COLLECTION",
post_actions=["send_collection_invites", "set_deadlines"]
),
WorkflowTransition(
from_state=WorkflowState.DATA_COLLECTION,
to_state=WorkflowState.INTERNAL_REVIEW,
action="SUBMIT_FOR_REVIEW",
condition=lambda ctx: ctx.get("mandatory_coverage_pct", 0) >= 80,
pre_actions=["run_validation", "generate_completeness_report"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.DATA_COLLECTION,
action="REQUEST_CORRECTIONS",
post_actions=["notify_data_owners", "log_review_comments"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.CFO_APPROVAL,
action="APPROVE_INTERNAL_REVIEW",
condition=lambda ctx: ctx.get("reviewer_approved", False),
pre_actions=["generate_draft_report"]
),
WorkflowTransition(
from_state=WorkflowState.CFO_APPROVAL,
to_state=WorkflowState.EXTERNAL_ASSURANCE,
action="CFO_SIGN_OFF",
post_actions=["prepare_assurance_package", "notify_auditor"]
),
WorkflowTransition(
from_state=WorkflowState.EXTERNAL_ASSURANCE,
to_state=WorkflowState.BOARD_APPROVAL,
action="ASSURANCE_COMPLETE",
condition=lambda ctx: ctx.get("assurance_opinion") in ["UNQUALIFIED", "QUALIFIED"],
post_actions=["attach_assurance_report"]
),
WorkflowTransition(
from_state=WorkflowState.BOARD_APPROVAL,
to_state=WorkflowState.FILED,
action="BOARD_APPROVE_AND_FILE",
post_actions=["generate_ixbrl", "submit_to_oam", "publish_report"]
),
]
class CSRDWorkflowEngine:
def __init__(self, db_pool):
self.db_pool = db_pool
self.transitions = {
(t.from_state, t.action): t
for t in CSRD_WORKFLOW_TRANSITIONS
}
async def execute_transition(
self,
period_id: str,
action: str,
actor_id: str,
context: dict = None
) -> dict:
"""
Esegue una transizione di workflow per un periodo di reporting.
Restituisce il nuovo stato o errore se la transizione non e' valida.
"""
context = context or {}
async with self.db_pool.acquire() as conn:
# Carica stato corrente
period = await conn.fetchrow(
"SELECT status, company_id FROM reporting_periods WHERE id = $1",
period_id
)
if not period:
raise ValueError(f"Periodo {period_id} non trovato")
current_state = WorkflowState(period['status'])
transition_key = (current_state, action)
if transition_key not in self.transitions:
raise ValueError(
f"Transizione {action} non valida da stato {current_state.value}"
)
transition = self.transitions[transition_key]
# Verifica condizione (arricchisci context con dati DB se necessario)
if transition.condition:
enriched_ctx = await self._enrich_context(period_id, context, conn)
if not transition.condition(enriched_ctx):
raise ValueError(
f"Condizione per {action} non soddisfatta: "
f"mandatory coverage = {enriched_ctx.get('mandatory_coverage_pct', 0)}%"
)
# Esegui pre-actions
for pre_action in transition.pre_actions:
await self._execute_action(pre_action, period_id, context, conn)
# Aggiorna stato
await conn.execute(
"UPDATE reporting_periods SET status = $1, updated_at = NOW() WHERE id = $2",
transition.to_state.value,
period_id
)
# Audit log
await conn.execute("""
INSERT INTO audit_log (entity_type, entity_id, action, actor_id, new_value)
VALUES ('reporting_period', $1, $2, $3, $4)
""", period_id, action, actor_id,
json.dumps({"from": current_state.value, "to": transition.to_state.value}))
# Esegui post-actions in background (notifiche, generazione documenti)
for post_action in transition.post_actions:
asyncio.create_task(
self._execute_action_background(post_action, period_id, context)
)
return {
"period_id": period_id,
"previous_state": current_state.value,
"new_state": transition.to_state.value,
"action": action,
"actor_id": actor_id,
"timestamp": datetime.now().isoformat()
}
async def _enrich_context(self, period_id: str, context: dict, conn) -> dict:
"""Arricchisce context con dati calcolati dal DB."""
mandatory_total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
approved = await conn.fetchval("""
SELECT COUNT(*) FROM esg_data_submissions
WHERE reporting_period_id = $1 AND status = 'APPROVED'
""", period_id)
enriched = dict(context)
enriched['mandatory_coverage_pct'] = (
(approved / mandatory_total * 100) if mandatory_total else 0
)
return enriched
async def _execute_action(self, action_name: str, period_id: str, context: dict, conn):
"""Esegue un'azione sincrona nel workflow."""
action_map = {
"run_validation": self._action_run_validation,
"generate_completeness_report": self._action_gen_completeness,
"generate_draft_report": self._action_gen_draft,
}
if action_name in action_map:
await action_map[action_name](period_id, context, conn)
async def _action_run_validation(self, period_id, context, conn):
print(f"[WORKFLOW] Running validation for period {period_id}")
async def _action_gen_completeness(self, period_id, context, conn):
print(f"[WORKFLOW] Generating completeness report for {period_id}")
async def _action_gen_draft(self, period_id, context, conn):
print(f"[WORKFLOW] Generating draft CSRD report for {period_id}")
async def _execute_action_background(self, action_name: str, period_id: str, context: dict):
"""Esegue azione in background (notifiche, documenti)."""
print(f"[BACKGROUND] Executing {action_name} for period {period_id}")
XBRL/iXBRL タグ付け: デジタル レポートの生成
CSRD は、年次報告書が次の形式で作成されることを要求しています。 タグ付きの XHTML iXBRL (インライン XBRL)。構造化データを読み取り可能な HTML に直接埋め込みます。 EFRAG は、各データ ポイントを iXBRL タグにマッピングする ESRS XBRL 分類法を公開しています。 自動生成を実装しましょう。
# xbrl_generator.py
# Generazione iXBRL per CSRD secondo ESRS Taxonomy
from lxml import etree
from decimal import Decimal
from typing import Optional
import uuid
# ESRS XBRL Taxonomy namespace
ESRS_NS = "https://xbrl.efrag.org/taxonomy/draft-esrs/2022-11-22"
XBRLI_NS = "http://www.xbrl.org/2003/instance"
IX_NS = "http://www.xbrl.org/2013/inlineXBRL"
LINK_NS = "http://www.xbrl.org/2003/linkbase"
class ESRSXBRLGenerator:
"""
Genera documenti iXBRL conformi alla ESRS Taxonomy.
Output: XHTML con embedded XBRL facts.
"""
def __init__(self, company_data: dict, period_data: dict):
self.company = company_data
self.period = period_data
self.context_id = f"ctx_{uuid.uuid4().hex[:8]}"
self.facts = []
def add_fact(
self,
element_name: str,
value,
unit: Optional[str] = None,
decimals: int = 0,
scale: int = 0
):
"""Aggiunge un fact XBRL al report."""
self.facts.append({
"element": element_name,
"value": value,
"unit": unit,
"decimals": decimals,
"scale": scale
})
def generate_ixbrl(self) -> str:
"""
Genera il documento iXBRL completo.
Returns: stringa XHTML con tagging iXBRL embedded.
"""
# Namespace map per il documento
nsmap = {
"ix": IX_NS,
"xbrli": XBRLI_NS,
"esrs": ESRS_NS,
None: "http://www.w3.org/1999/xhtml"
}
# Root XHTML
html = etree.Element("html", nsmap=nsmap)
head = etree.SubElement(html, "head")
body = etree.SubElement(html, "body")
# Header XBRL nel head
ix_header = etree.SubElement(head, f"{{IX_NS}}header")
hidden = etree.SubElement(ix_header, f"{{IX_NS}}hidden")
# Context: entità' e periodo
resources = etree.SubElement(ix_header, f"{{IX_NS}}resources")
xbrli_context = etree.SubElement(resources, f"{{XBRLI_NS}}context")
xbrli_context.set("id", self.context_id)
entity = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}entity")
identifier = etree.SubElement(entity, f"{{XBRLI_NS}}identifier")
identifier.set("scheme", "http://www.lei.info")
identifier.text = self.company.get("lei_code", "000000000000000000")
period = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}period")
start_date = etree.SubElement(period, f"{{XBRLI_NS}}startDate")
start_date.text = self.period.get("start_date", "2024-01-01")
end_date = etree.SubElement(period, f"{{XBRLI_NS}}endDate")
end_date.text = self.period.get("end_date", "2024-12-31")
# Unit monetaria e tCO2eq
unit_co2 = etree.SubElement(resources, f"{{XBRLI_NS}}unit")
unit_co2.set("id", "tCO2eq")
measure_co2 = etree.SubElement(unit_co2, f"{{XBRLI_NS}}measure")
measure_co2.text = "esrs:metricTon"
# Body: report HTML con facts inline
h1 = etree.SubElement(body, "h1")
h1.text = f"CSRD Sustainability Report {self.period.get('year', 2024)}"
h2 = etree.SubElement(body, "h2")
h2.text = "E1 - Climate Change: GHG Emissions"
# Tabella emissioni con tagging iXBRL inline
table = etree.SubElement(body, "table")
thead = etree.SubElement(table, "thead")
tr_head = etree.SubElement(thead, "tr")
etree.SubElement(tr_head, "th").text = "Metric"
etree.SubElement(tr_head, "th").text = "Value"
etree.SubElement(tr_head, "th").text = "Unit"
tbody = etree.SubElement(table, "tbody")
# Aggiungi facts come righe di tabella con tag iXBRL
for fact in self.facts:
tr = etree.SubElement(tbody, "tr")
etree.SubElement(tr, "td").text = fact["element"].split(":")[-1]
td_value = etree.SubElement(tr, "td")
# Tag iXBRL inline per il valore
ix_nonfraction = etree.SubElement(
td_value,
f"{{IX_NS}}nonFraction"
)
ix_nonfraction.set("name", fact["element"])
ix_nonfraction.set("contextRef", self.context_id)
ix_nonfraction.set("unitRef", fact.get("unit", "tCO2eq"))
ix_nonfraction.set("decimals", str(fact.get("decimals", 0)))
ix_nonfraction.set("scale", str(fact.get("scale", 0)))
ix_nonfraction.set("format", "ixt:num-dot-decimal")
ix_nonfraction.text = str(fact["value"])
etree.SubElement(tr, "td").text = fact.get("unit", "tCO2eq")
# Serializza come XHTML
return etree.tostring(
html,
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
method="xml"
).decode("utf-8")
def generate_csrd_report_from_db(approved_submissions: list, company: dict, period: dict) -> str:
"""
Genera report iXBRL da submissions approvate nel DB.
"""
generator = ESRSXBRLGenerator(company, period)
# Mapping data_point_id -> XBRL element name
ESRS_XBRL_MAP = {
"E1-6_GrossScope1": "esrs:GrossScope1GHGEmissions",
"E1-6_GrossScope2LB": "esrs:GrossScope2GHGEmissionsLocationBased",
"E1-6_GrossScope2MB": "esrs:GrossScope2GHGEmissionsMarketBased",
"E1-6_GrossScope3Total": "esrs:GrossScope3GHGEmissions",
"E1-5_TotalEnergy": "esrs:TotalEnergyConsumption",
"E1-5_RenewableEnergy": "esrs:EnergyConsumptionFromRenewableSources",
"E1-5_NonRenewableEnergy": "esrs:EnergyConsumptionFromNonRenewableSources",
}
for sub in approved_submissions:
element = ESRS_XBRL_MAP.get(sub['data_point_id'])
if element and sub['value_numeric'] is not None:
generator.add_fact(
element_name=element,
value=float(sub['value_numeric']),
unit=sub.get('unit_of_measure', 'tCO2eq'),
decimals=0
)
return generator.generate_ixbrl()
ERP の統合: SAP Sustainability と Oracle ESG
ほとんどの ESG データはすでに企業 ERP システムに存在しています: エネルギー消費 PM/PM モジュールでは人事データ、SuccessFactors では物流、TM モジュールではロジスティックスが含まれます。コネクタを実装します エンタープライズ市場の 2 つの主要なプラットフォーム向け:
# erp_connectors.py
# Connettori per SAP Sustainability Footprint Management (SFM)
# e Oracle Fusion ESG
import httpx
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
class ERPConnector(ABC):
"""Interfaccia base per connettori ERP."""
@abstractmethod
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
pass
class SAPSustainabilityConnector(ERPConnector):
"""
Connettore per SAP Sustainability Footprint Management (SFM).
Usa SAP OData API v4.
API base: /sap/opu/odata4/sap/api_sustainability_footprint/
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str
):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._token: Optional[str] = None
self._client = httpx.AsyncClient(timeout=30)
async def _get_token(self) -> str:
"""OAuth2 client credentials per SAP BTP."""
if self._token:
return self._token
response = await self._client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
)
response.raise_for_status()
self._token = response.json()["access_token"]
return self._token
async def _get(self, path: str, params: dict = None) -> dict:
token = await self._get_token()
response = await self._client.get(
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
"""
Estrae consumo energetico da SAP SFM per anno e entità'.
OData endpoint: /EnergyConsumption
"""
data = await self._get(
"/EnergyConsumption",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,EnergyType,Quantity,Unit,Source"
}
)
result = {"total_mwh": 0, "renewable_mwh": 0, "non_renewable_mwh": 0, "by_source": []}
for record in data.get("value", []):
qty_mwh = self._convert_to_mwh(record["Quantity"], record["Unit"])
result["total_mwh"] += qty_mwh
result["by_source"].append({
"source": record["EnergyType"],
"mwh": qty_mwh,
"is_renewable": record["EnergyType"] in [
"SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"
]
})
if record["EnergyType"] in ["SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"]:
result["renewable_mwh"] += qty_mwh
else:
result["non_renewable_mwh"] += qty_mwh
return result
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
"""Estrae emissioni GHG da SAP SFM, suddivise per scope."""
data = await self._get(
"/GHGEmission",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,GHGScope,Category,EmissionTCO2e"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_by_category": {}}
for r in data.get("value", []):
scope = r["GHGScope"]
tco2e = float(r["EmissionTCO2e"])
if scope == "1":
result["scope1"] += tco2e
elif scope == "2" and r.get("Category") == "LB":
result["scope2_lb"] += tco2e
elif scope == "2" and r.get("Category") == "MB":
result["scope2_mb"] += tco2e
elif scope == "3":
cat = r.get("Category", "UNKNOWN")
result["scope3_by_category"][cat] = (
result["scope3_by_category"].get(cat, 0) + tco2e
)
result["scope3_total"] = sum(result["scope3_by_category"].values())
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
"""Estrae metriche workforce per ESRS S1 da SAP SuccessFactors."""
# SAP SuccessFactors OData API
data = await self._get(
"/WorkforceMetrics",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Metric,Value,Gender,EmployeeType"
}
)
result = {
"headcount_total": 0,
"headcount_female": 0,
"headcount_male": 0,
"fte_total": 0,
"turnover_rate": 0,
"gender_pay_gap_pct": None
}
for r in data.get("value", []):
metric = r.get("Metric")
value = float(r.get("Value", 0))
if metric == "HEADCOUNT":
result["headcount_total"] += value
if r.get("Gender") == "F":
result["headcount_female"] += value
elif r.get("Gender") == "M":
result["headcount_male"] += value
elif metric == "FTE":
result["fte_total"] += value
elif metric == "TURNOVER_RATE":
result["turnover_rate"] = value
elif metric == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = value
return result
def _convert_to_mwh(self, quantity: float, unit: str) -> float:
conversions = {"GJ": 0.2778, "MWh": 1.0, "kWh": 0.001, "TJ": 277.78}
return quantity * conversions.get(unit, 1.0)
class OracleESGConnector(ERPConnector):
"""
Connettore per Oracle Fusion Sustainability (Oracle ESG).
Usa Oracle REST API + BICC per bulk extraction.
"""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip("/")
self.auth = (username, password)
self._client = httpx.AsyncClient(timeout=60)
async def _get(self, path: str, params: dict = None) -> dict:
response = await self._client.get(
f"{self.base_url}{path}",
auth=self.auth,
headers={"Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=ENERGY;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,metricValue,uom,dataSource,lastUpdateDate"
}
)
total_mwh = 0
renewable_mwh = 0
RENEWABLE_METRICS = {"SOLAR_ENERGY", "WIND_ENERGY", "HYDRO_ENERGY", "PPA_RENEWABLE"}
for item in data.get("items", []):
mwh = self._to_mwh(item["metricValue"], item["uom"])
total_mwh += mwh
if item["metricName"] in RENEWABLE_METRICS:
renewable_mwh += mwh
return {
"total_mwh": total_mwh,
"renewable_mwh": renewable_mwh,
"non_renewable_mwh": total_mwh - renewable_mwh
}
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=GHG_EMISSIONS;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,ghgScope,metricValue,uom"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_total": 0}
for item in data.get("items", []):
scope = item.get("ghgScope", "")
val = float(item.get("metricValue", 0))
if scope == "SCOPE_1":
result["scope1"] += val
elif scope == "SCOPE_2_LB":
result["scope2_lb"] += val
elif scope == "SCOPE_2_MB":
result["scope2_mb"] += val
elif scope.startswith("SCOPE_3"):
result["scope3_total"] += val
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=WORKFORCE;fiscalYear={year};legalEntityId={entity_id}"
}
)
result = {"headcount_total": 0, "fte_total": 0, "gender_pay_gap_pct": None}
for item in data.get("items", []):
name = item.get("metricName", "")
val = float(item.get("metricValue", 0))
if name == "TOTAL_HEADCOUNT":
result["headcount_total"] = val
elif name == "TOTAL_FTE":
result["fte_total"] = val
elif name == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = val
return result
def _to_mwh(self, value: float, uom: str) -> float:
factors = {"MWH": 1.0, "KWH": 0.001, "GJ": 0.2778, "MMBTU": 0.29307}
return value * factors.get(uom.upper(), 1.0)
class ERPDataAggregator:
"""
Aggrega dati da multipli connettori ERP e mappa su ESRS data points.
"""
def __init__(self, connectors: list[ERPConnector]):
self.connectors = connectors
async def collect_and_map_to_esrs(
self,
year: int,
entity_id: str,
period_id: str
) -> list[dict]:
"""
Raccoglie dati da tutti i connettori e produce submissions ESRS-ready.
"""
results = []
# Raccolta parallela da tutti i connettori
energy_tasks = [c.get_energy_consumption(year, entity_id) for c in self.connectors]
ghg_tasks = [c.get_ghg_emissions(year, entity_id) for c in self.connectors]
wf_tasks = [c.get_workforce_metrics(year, entity_id) for c in self.connectors]
energy_results = await asyncio.gather(*energy_tasks, return_exceptions=True)
ghg_results = await asyncio.gather(*ghg_tasks, return_exceptions=True)
wf_results = await asyncio.gather(*wf_tasks, return_exceptions=True)
# Aggregazione (media pesata o somma in base al tipo)
total_energy = sum(
r["total_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_renewable = sum(
r["renewable_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_scope1 = sum(
r.get("scope1", 0) for r in ghg_results
if isinstance(r, dict)
)
# Map su ESRS data points
if total_energy > 0:
results.append({
"data_point_id": "E1-5_TotalEnergy",
"value_numeric": total_energy,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_renewable > 0:
results.append({
"data_point_id": "E1-5_RenewableEnergy",
"value_numeric": total_renewable,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_scope1 > 0:
results.append({
"data_point_id": "E1-6_GrossScope1",
"value_numeric": total_scope1,
"unit_of_measure": "tCO2eq",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
return results
外部保証のためのデータリネージと監査証跡
CSRD が要求するのは、 限定的な保証 (そして合理的な保証という観点から) 外部監査人によるもの。これは、レポート内のすべてのデータには次の情報が含まれている必要があることを意味します。 完全なトレーサビリティ: どこから来たのか、誰が挿入したのか、誰が承認したのか、 そしてそれがどのように計算されたか。データリネージトラッカーを実装してみましょう。
# data_lineage.py
# Data lineage tracker per assurance CSRD
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
import json
@dataclass
class LineageNode:
"""Nodo nel grafo di data lineage."""
node_id: str
node_type: str # SOURCE, TRANSFORM, SUBMISSION, APPROVAL
description: str
actor: Optional[str] = None
timestamp: Optional[datetime] = None
metadata: dict = field(default_factory=dict)
hash: Optional[str] = None
def compute_hash(self) -> str:
"""Hash del contenuto per rilevare manomissioni."""
content = json.dumps({
"node_id": self.node_id,
"node_type": self.node_type,
"description": self.description,
"actor": self.actor,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"metadata": self.metadata
}, sort_keys=True)
self.hash = hashlib.sha256(content.encode()).hexdigest()
return self.hash
@dataclass
class LineageEdge:
"""Arco tra nodi nel grafo di lineage."""
from_node: str
to_node: str
relationship: str # DERIVED_FROM, APPROVED_BY, TRANSFORMED_BY
metadata: dict = field(default_factory=dict)
class DataLineageTracker:
"""
Traccia il percorso completo di ogni dato ESG dalla sorgente al report.
Struttura: DAG (Directed Acyclic Graph) di LineageNode.
"""
def __init__(self, db_pool):
self.db_pool = db_pool
async def record_extraction(
self,
data_point_id: str,
source_system: str,
source_query: str,
raw_value,
actor_id: str
) -> str:
"""Registra estrazione da sistema sorgente."""
node = LineageNode(
node_id=f"extract_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="SOURCE",
description=f"Extracted from {source_system}",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"source_system": source_system,
"source_query": source_query,
"raw_value": str(raw_value),
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
return node.node_id
async def record_transformation(
self,
from_node_id: str,
data_point_id: str,
transformation_type: str,
input_value,
output_value,
formula: Optional[str],
actor_id: str
) -> str:
"""Registra una trasformazione (unit conversion, aggregation, etc.)."""
node = LineageNode(
node_id=f"transform_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="TRANSFORM",
description=f"{transformation_type} applied",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"transformation_type": transformation_type,
"input_value": str(input_value),
"output_value": str(output_value),
"formula": formula,
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=from_node_id,
to_node=node.node_id,
relationship="DERIVED_FROM"
)
await self._save_edge(edge)
return node.node_id
async def record_approval(
self,
submission_node_id: str,
data_point_id: str,
approver_id: str,
approval_comment: Optional[str]
) -> str:
"""Registra approvazione da parte del reviewer."""
node = LineageNode(
node_id=f"approval_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="APPROVAL",
description="Data point approved for reporting",
actor=approver_id,
timestamp=datetime.now(),
metadata={
"data_point_id": data_point_id,
"approval_comment": approval_comment,
"approver_id": approver_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=submission_node_id,
to_node=node.node_id,
relationship="APPROVED_BY"
)
await self._save_edge(edge)
return node.node_id
async def get_lineage_for_data_point(
self,
data_point_id: str,
period_id: str
) -> dict:
"""
Restituisce il grafo completo di lineage per un data point.
Usato dall'auditor per tracciare l'origine dei dati.
"""
async with self.db_pool.acquire() as conn:
nodes = await conn.fetch("""
SELECT * FROM lineage_nodes
WHERE metadata->>'data_point_id' = $1
ORDER BY timestamp ASC
""", data_point_id)
edges = await conn.fetch("""
SELECT le.* FROM lineage_edges le
JOIN lineage_nodes ln ON ln.node_id = le.from_node
WHERE ln.metadata->>'data_point_id' = $1
""", data_point_id)
return {
"data_point_id": data_point_id,
"period_id": period_id,
"nodes": [dict(n) for n in nodes],
"edges": [dict(e) for e in edges],
"integrity_check": await self._verify_chain_integrity(nodes)
}
async def _verify_chain_integrity(self, nodes) -> dict:
"""Verifica che gli hash non siano stati manomessi."""
for node in nodes:
expected_hash = node.compute_hash() if hasattr(node, 'compute_hash') else None
if expected_hash and node.get('hash') != expected_hash:
return {"valid": False, "tampered_node": node['node_id']}
return {"valid": True, "nodes_verified": len(nodes)}
async def _save_node(self, node: LineageNode):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_nodes
(node_id, node_type, description, actor, timestamp, metadata, hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
node.node_id, node.node_type, node.description,
node.actor, node.timestamp, json.dumps(node.metadata), node.hash)
async def _save_edge(self, edge: LineageEdge):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_edges (from_node, to_node, relationship, metadata)
VALUES ($1, $2, $3, $4)
""",
edge.from_node, edge.to_node, edge.relationship,
json.dumps(edge.metadata))
ケーススタディ: 従業員数 250 人の製造会社
実際の実装を分析してみましょう メタルテック S.r.l.、製造会社 従業員数 250 名、2 つの生産工場 (トリノとバーリ)、CSRD ウェーブ 2 の対象 (2025年度、2026年に第一次報告)。彼らがプロジェクトをどのように構成したかは次のとおりです。
フェーズ 1: ギャップ分析と重要性の評価 (1 ~ 2 か月目)
MetalTechはプロセスを通じて製造部門の重要なテーマを特定しました 二重の重要性:
- ESRS E1 (気候): DOUBLE のマテリアリティ - 大量のエネルギー消費 (溶解炉、生産ライン)および異常事態による物理的リスク
- ESRS E2 (汚染): 衝撃物質 - PM2.5 微粒子排出量 塗料からのVOC化合物
- ESRS E5 (循環経済): 材料 - 金属廃棄物 (材料の 35%) 投入)、包装、産業廃水
- ESRS S1 (従業員): 資料 - 従業員 250 人、事故率が高い セクター、男女間の賃金格差について
- ESRS G1 (ビジネス行為): ベースライン規制要件の資料
フェーズ 2: 技術インフラストラクチャ (2 ~ 4 か月目)
MetalTech が採用した技術スタック:
| 成分 | テクノロジー | 使用法 |
|---|---|---|
| APIバックエンド | FastAPI + 非同期ページ | ESGデータの収集と検証 |
| データベース | PostgreSQL 16 + pgvector | ストレージ ESRS データ ポイント + 類似性検索 |
| ERPの統合 | SAP SFM OData API | SAP のエネルギー、排出量、労働力 |
| ワークフロー | カスタム FSM + Redis | マルチレベルの承認ワークフロー |
| XBRLの生成 | Python lxml + ESRS 分類法 | OAM/CONSOB 用の iXBRL 出力 |
| データリネージュ | PostgreSQL 上のカスタム DAG | 保証のための監査証跡 |
| 通知 | FastAPI バックグラウンドタスク + SMTP | 期限警告、レビューリクエスト |
| フロントエンド | Angular 17 + マテリアル | 収集および監視ダッシュボード |
フェーズ 3: 2025 年度のデータ収集 (4 か月目から 14 か月目)
# Esempio raccolta dati MetalTech FY2025
# Script orchestrazione completa per stabilimento Torino
import asyncio
from erp_connectors import SAPSustainabilityConnector, ERPDataAggregator
from workflow_engine import CSRDWorkflowEngine, WorkflowState
async def run_metaltech_fy2025_collection():
# Configurazione connettore SAP
sap = SAPSustainabilityConnector(
base_url="https://metaltech.hana.ondemand.com",
client_id="METALTECH_CSRD_CLIENT",
client_secret="***", # Da env vars
token_url="https://metaltech.authentication.eu12.hana.ondemand.com/oauth/token"
)
aggregator = ERPDataAggregator([sap])
# Raccolta dati stabilimento Torino (entity TO001)
print("[1/3] Raccolta dati da SAP per stabilimento Torino...")
submissions_to = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="TO001",
period_id="period_fy2025"
)
# Raccolta dati stabilimento Bari (entity BA001)
print("[2/3] Raccolta dati da SAP per stabilimento Bari...")
submissions_ba = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="BA001",
period_id="period_fy2025"
)
# Consolidamento gruppo: somma Scope 1+2, media intensità'
print("[3/3] Consolidamento dati di gruppo...")
consolidated = consolidate_group_data([submissions_to, submissions_ba])
# Output dati FY2025 MetalTech consolidati
for sub in consolidated:
print(f" {sub['data_point_id']}: {sub['value_numeric']:.2f} {sub['unit_of_measure']}")
return consolidated
def consolidate_group_data(entity_submissions: list) -> list:
"""Consolida submissions di multiple entità per il gruppo."""
by_data_point = {}
for entity_subs in entity_submissions:
for sub in entity_subs:
dp_id = sub['data_point_id']
if dp_id not in by_data_point:
by_data_point[dp_id] = {**sub, "value_numeric": 0}
# Per emissioni e energia: somma (non media)
by_data_point[dp_id]["value_numeric"] += sub['value_numeric']
by_data_point[dp_id]["source_system"] = "CONSOLIDATED_GROUP"
return list(by_data_point.values())
# Risultati tipici per azienda manifatturiera 250 dip.
METALTECH_FY2025_RESULTS = {
"E1-6_GrossScope1": {
"value": 2_847, # tCO2eq - principalmente gas naturale forno fusione
"confidence": "HIGH",
"source": "SAP SFM - Gas meter readings"
},
"E1-6_GrossScope2LB": {
"value": 1_523, # tCO2eq - elettricita' rete
"confidence": "HIGH",
"source": "SAP SFM - Electricity bills"
},
"E1-6_GrossScope2MB": {
"value": 980, # tCO2eq - market-based con PPA solare
"confidence": "HIGH",
"source": "SAP SFM + GO certificates"
},
"E1-5_TotalEnergy": {
"value": 18_450, # MWh totali
"confidence": "HIGH",
"source": "SAP SFM - Meters + bills"
},
"E1-5_RenewableEnergy": {
"value": 6_200, # MWh (33.6% via PPA + impianto solare 500kWp)
"confidence": "MEDIUM", # Stima PPA
"source": "SAP SFM + PPA contract"
},
"GHG_Intensity": {
"value": 42.3, # tCO2eq / M EUR fatturato
"confidence": "HIGH",
"source": "Calculated: (S1+S2MB) / revenue"
}
}
結果と教訓
メタルテック2025年度:CSRDプロジェクトの成果
- データ収集時間: 6 週間 (手動) から 4 日 (自動)
- 必須の ESRS 適用範囲: 必須データポイントの 94% が収集および承認されました
- データ品質: 提出物の 78% の信頼度は高い (前年は 45%)
- 監査時間: 自動データリネージュにより 60% 削減
- 外部保証コスト: -25% の場合、サンプリング作業が軽減されます
- 主な発見: スコープ3の猫。 1 (上流材料) は 40% 過小評価されました 以前の手動計算との比較
主な困難とその克服方法
避けるべきアンチパターン
- すべてを最初から構築しないでください。 市場はSaaS CSRDソリューションを提供しています (Workiva、Watershed、Persefoni) 中小企業向け。構築と購入の評価: 社内構築 これは、データの複雑さが高い場合や、独自の統合ニーズがある場合にのみ意味を持ちます。
- 重要性の評価を遅らせないでください。 それはすべての前提条件です。 それがなければ、どのようなデータを収集すればよいのかわかりません。簡単な方法でもすぐに始めましょう。
- スコープ 3 を無視しないでください。 多くの製造会社にとって、スコープ 3 は該当します。 1 (原材料)と猫。 11(製品使用)が総排出量の70%を超えます。 スコープ 3 の過少報告は、最も一般的なグリーンウォッシングです。
- XBRL を最終的な考えとして扱わないでください。 iXBRL タグ付けには次のものが必要です ESRS 分類法に精通している。少なくとも 2 ~ 3 か月の開発を計画します。
- 統合範囲を忘れないでください。 CSRD が要求するのは、 すべての子会社を含めて 50% 以上。 「簡素化」のためにエンティティを除外する それはコンプライアンスのリスクです。
高度な ESG クエリ用の GraphQL API
ESG 分析ダッシュボードとインタラクティブなレポート ツールについては、GraphQL API が提供するもの 従来の REST よりも優れた柔軟性があり、集計を使用した特定のクエリが可能 オンザフライ:
# graphql_schema.py
# Schema GraphQL per ESG Analytics API con Strawberry
import strawberry
from strawberry.types import Info
from typing import Optional, List
from datetime import date
from decimal import Decimal
@strawberry.type
class GHGEmissionsSummary:
year: int
scope1_tco2eq: Decimal
scope2_lb_tco2eq: Decimal
scope2_mb_tco2eq: Decimal
scope3_tco2eq: Decimal
total_tco2eq: Decimal
ghg_intensity: Optional[Decimal]
yoy_change_pct: Optional[Decimal]
@strawberry.type
class EnergySummary:
year: int
total_mwh: Decimal
renewable_mwh: Decimal
non_renewable_mwh: Decimal
renewable_share_pct: Decimal
@strawberry.type
class ESGReportingStatus:
period_id: str
year: int
workflow_state: str
mandatory_coverage_pct: Decimal
approved_data_points: int
pending_review: int
missing_mandatory: int
@strawberry.type
class Query:
@strawberry.field
async def ghg_emissions(
self,
info: Info,
company_id: str,
years: List[int]
) -> List[GHGEmissionsSummary]:
"""Emissioni GHG per anni richiesti con calcolo YoY automatico."""
db = info.context["db_pool"]
async with db.acquire() as conn:
rows = await conn.fetch("""
SELECT
rp.year,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope1%'
THEN eds.value_numeric ELSE 0 END), 0) as scope1,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%LB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_lb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%MB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_mb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope3%'
THEN eds.value_numeric ELSE 0 END), 0) as scope3
FROM esg_data_submissions eds
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE rp.company_id = $1
AND rp.year = ANY($2)
AND eds.status = 'APPROVED'
AND dp.esrs_standard = 'E1'
GROUP BY rp.year
ORDER BY rp.year
""", company_id, years)
results = []
prev_total = None
for row in rows:
total = row['scope1'] + row['scope2_mb'] + row['scope3']
yoy = None
if prev_total and prev_total > 0:
yoy = ((total - prev_total) / prev_total) * 100
results.append(GHGEmissionsSummary(
year=row['year'],
scope1_tco2eq=Decimal(str(row['scope1'])),
scope2_lb_tco2eq=Decimal(str(row['scope2_lb'])),
scope2_mb_tco2eq=Decimal(str(row['scope2_mb'])),
scope3_tco2eq=Decimal(str(row['scope3'])),
total_tco2eq=Decimal(str(total)),
ghg_intensity=None,
yoy_change_pct=Decimal(str(yoy)) if yoy else None
))
prev_total = total
return results
@strawberry.field
async def reporting_status(
self,
info: Info,
company_id: str,
year: int
) -> Optional[ESGReportingStatus]:
"""Stato del processo di reporting per un anno."""
db = info.context["db_pool"]
async with db.acquire() as conn:
period = await conn.fetchrow("""
SELECT rp.id, rp.year, rp.status,
COUNT(eds.id) FILTER (WHERE eds.status = 'APPROVED') as approved,
COUNT(eds.id) FILTER (WHERE eds.status IN ('SUBMITTED','REVIEWING')) as pending,
(SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE) as mandatory_total
FROM reporting_periods rp
LEFT JOIN esg_data_submissions eds ON eds.reporting_period_id = rp.id
WHERE rp.company_id = $1 AND rp.year = $2
GROUP BY rp.id, rp.year, rp.status
""", company_id, year)
if not period:
return None
mandatory_total = period['mandatory_total'] or 1
approved = period['approved'] or 0
coverage = (approved / mandatory_total) * 100
return ESGReportingStatus(
period_id=str(period['id']),
year=period['year'],
workflow_state=period['status'],
mandatory_coverage_pct=Decimal(str(round(coverage, 1))),
approved_data_points=approved,
pending_review=period['pending'] or 0,
missing_mandatory=max(0, mandatory_total - approved)
)
schema = strawberry.Schema(query=Query)
# Query di esempio per il frontend
EXAMPLE_QUERY = """
query ESGDashboard($companyId: String!, $years: [Int!]!) {
ghgEmissions(companyId: $companyId, years: $years) {
year
scope1Tco2eq
scope2MbTco2eq
scope3Tco2eq
totalTco2eq
yoyChangePct
}
reportingStatus(companyId: $companyId, year: 2025) {
workflowState
mandatoryCoveragePct
approvedDataPoints
missingMandatory
}
}
"""
AI法に関する考察とCSRDとの交差
2025 年から 2026 年にかけて新たな側面が現れるのは、CSRD と EU の AI 法との交差点です。システム AI は排出量の計算、気候予測、ESG レポートの自動化に使用されます AI 法の高リスク AI カテゴリーに該当する可能性があり、以下が必要です。
- 技術文書 排出量推定に使用される AI システム (ESRS方法論の透明性)
- 人間の監視 自動化された重要性の決定について
- 監査証跡 AI 予測の割合 (当社のデータ系統ですでにカバーされています)
- 差別の禁止 ESRS S1 メトリクスに使用される AI HR システムで
AI法×CSRDタイムライン
- 2025 年 2 月: AI法施行 - 禁止行為を禁止
- 2025 年 8 月: GPAI (汎用 AI) の義務 - AI 基盤モデルの透明性
- 2026 年 8 月: 高リスク AI の包括的な義務 - 人事および与信判断のための AI システムを含む
- 2027 年 8 月: あらゆるユースケースに完全な AI 法の適用
ESG/CSRD システムを構築するチームはすでにコンプライアンスを計画する必要があります スタックの ML コンポーネントに対する AI Act: スコープ 3 排出量推定モデル、 ESG データの異常検出、サプライヤー評価のための NLP。
結論と次のステップ
CSRD に準拠したシステムの構築は、単なるコンプライアンスの実践ではありません。 持続可能性データ アーキテクチャを作成する機会 長年にわたる価値。私たちが見たコンポーネント — ESRS データ モデル、コレクション API 検証、FSM ワークフロー、ERP 統合、XBRL ジェネレーター、データ リネージ - これらはすべてです 再利用可能でモジュール式。
重要なポイント:
- 重要性の評価から始めます。 二重の物質性がなければ何が何だか分からない 集める。これは、実際の利害関係者の関与などが必要なプロセスです。 アルゴリズム。
- ERP 収集を自動化します。 ESRS E1 および S1 データの 60 ~ 70% が存在 すでにシステムに組み込まれています。 SAP および Oracle 用の OData コネクタは投資です 数週間で回復します。
- 今すぐデータリネージに投資してください: 外部保険の費用 データのトレーサビリティに直接依存します。優れた監査証跡システム 監査コストを 20 ~ 30% 削減します。
- オムニバス パッケージを無視しないでください。 2025年に提案されている簡素化 中小企業に対する義務の一部が緩和される可能性がある。立法プロセスを監視するが監視しない 最終的な承認の前に計画を立ててください。
シリーズの次の記事では — AI の二酸化炭素排出量 — AI モデルの環境への影響をトレーニングから測定し、軽減する方法を探ります。 CodeCarbon などのツールを使用して、大規模言語モデルを運用環境で推論する ML CO2 の影響と持続可能な ML エンジニアリングの戦略。
詳細を学ぶためのリソース
- EFRAG ESRS 分類: efrag.org - ESRS XBRL 分類法
- CSRD公式テキスト: EUR-Lex - 指令 2022/2464/EU
- GHG プロトコル企業標準: ghgプロトコル.org
- SAP サステナビリティ フットプリント管理: sap.com/持続可能性
- Python XBRL ライブラリ: arelle (オープンソース)、python-xbrl
- CSRD SaaS ツール: ワーキバ、ウォーターシェッド、ペルセフォニ、スイープ、グリーンリー
グリーン ソフトウェア エンジニアリング シリーズ - ナビゲーション
前の記事: GreenOps: 持続可能な DevOps — CI/CD グリーン、 DevOps チーム向けのクラウドの適正サイジング、FinOps、持続可能性の指標。
次の記事: AI の二酸化炭素排出量 — 測定して削減する 実用的なツールとベンチマークを使用した、トレーニングと推論における AI モデルの排出 セクターの。
このシリーズは旅の一部です データ&AI事業 federicolo.dev にこのシリーズも含まれています ビジネス向け MLOps (ID 306-315) 塊をより深く掘り下げたい人向け 計算効率を重視したAIモデルの制作と、 ガバナンス。







