排出パイプライン スコープ 3 バリュー チェーン: 生データから監査証跡まで
Il 総排出量の 70 ~ 90% テクノロジー企業やソフトウェア企業が自社の外に隠れている 境界: サービスとして購入したクラウド サーバー、従業員のラップトップ、ビジネス フライト、コード内 顧客が自分のデバイス上で実行できるもの。これらは排出量です スコープ3、そしてほとんどの場合 デジタル組織の一部は、世界に存在する最も複雑な測定問題を代表しています。 ESG領域。
スコープ 1 (燃料を燃やす) やスコープ 2 (電気を買う) とは異なり、 スコープ 3 では、数百のサプライヤーからデータを収集し、異種の排出係数を適用する必要があります。 非常に高度な不確実性を管理し、外部監査人が検証できる監査証跡を作成します。と CSRD/ESRS E1 これにより、以下の大企業に対してスコープ 3 報告が義務付けられます。 2025 年から 2026 年、そして 2028 年までに中小企業が焦点となると、この問題はもはや学術的なものではなくなります。 エンジニアリング。
この記事では、それを構築します 完全なパイプライン スコープ3排出量の計算用 バリュー チェーン: サプライヤー データ収集のための ETL アーキテクチャから、CDP などのプラットフォームとの統合まで EcoVadis は、アクティビティ ベースと支出ベースの計算から、自動化および自動化のための Airflow DAG に至るまで、 検証者のための不変の監査証跡に。各セクションには、実際に動作する Python コードとベスト プラクティスが含まれています。 実際の状況で動作がテストされています。
何を学ぶか
- 15 の GHG プロトコル スコープ 3 カテゴリとソフトウェア/SaaS 企業に関連するカテゴリ
- サプライヤーデータ収集のための ETL/ELT アーキテクチャ: アンケート、CDP、EcoVadis、および直接 API
- アクティビティベースと支出ベース: 計算式、精度、いつどちらのアプローチを使用するか
- Apache Airflow を使用した Python パイプライン: 自動化されたスケーラブルなスコープ 3 計算のための DAG
- データ品質スコアリングと推定値における統計的不確実性の伝播
- 外部検証者のトレーサビリティのための SHA-256 ハッシュ チェーンによる不変の監査証跡
- バリュー チェーンとヒートマップの優先順位カテゴリのサンキー ダイアグラムの視覚化
- CSRD/ESRS E1 要件: 何をどのような粒度で開示する必要があるか
- 完全なケーススタディ: 50 社のサプライヤーを持つ SaaS 企業、エンドツーエンドのスコープ 3 の計算
- EcoVadis カーボン データ ネットワークおよび排出係数用の Climatiq API との統合
グリーン ソフトウェア シリーズ — 10 件の記事
| # | アイテム | 主題 |
|---|---|---|
| 1 | グリーン ソフトウェア財団の原則 | 炭素効率、GSF、SCI |
| 2 | CodeCarbon: コードの測定 | 測定、ダッシュボード、最適化 |
| 3 | Climatiq API: 炭素計算 | REST API、GHG プロトコル、スコープ 1 ~ 3 |
| 4 | カーボンアウェア SDK | タイムシフト、ロケーションシフト |
| 5 | スコープ 1-2-3: ESG データモデリング | データ構造、計算、集計 |
| 6 | GreenOps: カーボンを意識した Kubernetes | スケジューリング、スケーリング、モニタリング |
| 7 | 排出パイプライン スコープ 3 バリューチェーン | この記事 |
| 8 | ESGレポートAPI: CSRD | API、ワークフロー、コンプライアンス |
| 9 | 持続可能な建築パターン | ストレージ、キャッシュ、バッチ |
| 10 | AI とカーボン: ML トレーニング | ML トレーニング、最適化、グリーン AI |
15 の GHG プロトコル スコープ 3 カテゴリ
Il GHGプロトコル企業バリューチェーン(スコープ3)基準 それは国際的な枠組みです このリファレンスは 2011 年に発行され、現在改訂中であり、2026 年に更新される予定です。 バリューチェーンの間接排出 15 の異なるカテゴリ、2つにまとめられています マクログループ: 上流 (制作・サービス提供前の活動) e 下流 (顧客への販売後の活動)。
15 スコープ 3 カテゴリ: 上流と下流
| 猫。 | 名前 | 流れ | SaaS/技術の関連性 |
|---|---|---|---|
| 1 | 購入した商品とサービス | 上流 | 高: サーバー ハードウェア、ソフトウェア ライセンス、コンサルティング サービス |
| 2 | 資本財 | 上流 | メディア: データセンター機器、ラップトップ、会社の電話 |
| 3 | 燃料・エネルギー関連活動 | 上流 | 平均: 購入したエネルギー生産からの排出量 (上流のスコープ 2) |
| 4 | 上流の輸送と流通 | 上流 | 低い: オフィスおよびデータセンターへのハードウェアの出荷 |
| 5 | 事業活動で発生する廃棄物 | 上流 | 低: WEEE、紙、オフィス廃棄物 |
| 6 | 出張 | 上流 | 高: 分散チームの航空券、ホテル、電車 |
| 7 | 従業員の通勤 | 上流 | 高: ホームオフィスへの出張、特にハイブリッドチーム向け |
| 8 | 上流のリース資産 | 上流 | メディア: レンタルオフィス (スコープ 1/2 に含まれていない場合) |
| 9 | 下流の輸送と流通 | 下流 | 低: 物理メディアでのソフトウェア配布 (まれ) |
| 10 | 販売した商品の加工 | 下流 | N/A: 純粋なソフトウェアには適用されません |
| 11 | 販売した商品の使用 | 下流 | 非常に高い: SaaS を使用する顧客が消費するエネルギー |
| 12 | 販売した製品の廃棄処理 | 下流 | 低: ユーザーのデバイスが寿命を迎えている |
| 13 | 下流のリース資産 | 下流 | メディア: 顧客にリースされたハードウェア |
| 14 | フランチャイズ | 下流 | N/A: 該当なし |
| 15 | 投資 | 下流 | 高: 企業ポートフォリオ、スタートアップへの株式投資 |
SaaS またはソフトウェア開発会社の場合、通常、最も関連性の高いカテゴリは次のとおりです。 猫。 1 (購入した商品とサービス、多くの場合最大の項目)、 猫。 6 (出張)、 猫。 7 (従業員の通勤) 猫。 11 (販売済み製品の使用)。そこには 二重の物質性 リクエスト CSRD からの影響の観点から、どのカテゴリーが重要であるかを特定する必要がある 企業にとっての環境リスクと財務リスク。
よくある間違い: Cat を省略します。 SaaS の場合は 11
多くのソフトウェア会社は、カテゴリー 11 (「販売された製品の使用」) が適用されないことを前提として除外しています。 実際には、すべての API 呼び出し、すべてのクエリ、顧客が実行するために消費するすべてのワット ソフトウェアはScope 3 Cat. 11 の排出はあなたの責任です。数百万の SaaS の場合 ユーザーの中で、これが支配的なカテゴリーである可能性があります。計算方法としては、 炭素強度 (SCI) ソフトウェア 供給された機能単位を乗算します。
データ収集パイプラインのアーキテクチャ
バリューチェーン全体から信頼できるデータを収集することは、あらゆるビジネスにとって最大のボトルネックです スコープ3プロジェクト。パイプラインは、手動のアンケート、サードパーティの ESG プラットフォームなど、異種ソースを管理する必要があります。 部品、サプライヤーとの直接 API、電子メールで送信された CSV ファイル、内部 ERP データ。次のアーキテクチャ パターンを採用する 3層ETL (ブロンズ/シルバー/ゴールド) レイクハウスをイメージしたデザイン。
パイプライン アーキテクチャ スコープ 3: ブロンズ / シルバー / ゴールド
| レイヤー | コンテンツ | テクノロジー | 範囲 |
|---|---|---|---|
| ブロンズ (未加工) | サプライヤーからの不変の生データ | S3/GCS、デルタ湖 | 監査証跡、リプレイ、信頼できる情報源 |
| シルバー(標準化) | 単位と通貨で正規化されたデータ | dbt、スパーク、パンダ | 排出量計算、排出係数と結合 |
| ゴールド (レポート) | GHGカテゴリー別の総排出量 | PostgreSQL、BigQuery | ダッシュボード、CSRD レポート、検証者 |
ブロンズ層は必須です: 受信したデータはすべて保存されます そのまま タイムスタンプ付き 取り込みの、コンテンツとソース メタデータの SHA-256 ハッシュ。これは可能性を保証します 排出係数や方法論が変更された場合、損失を与えることなくパイプライン全体を再処理する 元のデータ。
# models/scope3_pipeline.py
# Struttura dati per la pipeline Scope 3
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class DataSource(Enum):
SUPPLIER_QUESTIONNAIRE = "supplier_questionnaire"
CDP_API = "cdp_api"
ECOVADIS_API = "ecovadis_api"
ERP_EXPORT = "erp_export"
MANUAL_UPLOAD = "manual_upload"
CLIMATIQ_API = "climatiq_api"
class CalculationMethod(Enum):
ACTIVITY_BASED = "activity_based"
SPEND_BASED = "spend_based"
HYBRID = "hybrid"
SUPPLIER_SPECIFIC = "supplier_specific"
class DataQualityTier(Enum):
TIER_1 = "primary_data" # Dati primari dal supplier
TIER_2 = "secondary_sector" # Fattori settoriali
TIER_3 = "spend_estimated" # Stima basata su spesa
@dataclass
class RawSupplierData:
"""Layer Bronze: dato grezzo immutabile"""
supplier_id: str
source: DataSource
raw_payload: dict
received_at: datetime
content_hash: str = field(init=False)
def __post_init__(self):
payload_str = json.dumps(self.raw_payload, sort_keys=True)
self.content_hash = hashlib.sha256(
payload_str.encode()
).hexdigest()
@dataclass
class StandardizedActivity:
"""Layer Silver: attività normalizzata"""
activity_id: str
supplier_id: str
scope3_category: int # 1-15
activity_type: str # es. "freight_transport"
quantity: float
unit: str # es. "tonne.km"
reporting_period_start: datetime
reporting_period_end: datetime
source: DataSource
quality_tier: DataQualityTier
emission_factor_id: Optional[str] = None
uncertainty_pct: float = 0.0
raw_data_hash: str = "" # Ref al Bronze layer
@dataclass
class EmissionResult:
"""Layer Gold: emissione calcolata"""
result_id: str
activity_id: str
scope3_category: int
co2e_tonnes: float
calculation_method: CalculationMethod
emission_factor_source: str # es. "climatiq:IPCC_2021"
emission_factor_value: float
quality_tier: DataQualityTier
uncertainty_pct: float
calculated_at: datetime
pipeline_version: str
audit_hash: str = field(init=False)
def __post_init__(self):
audit_data = {
"result_id": self.result_id,
"activity_id": self.activity_id,
"co2e_tonnes": self.co2e_tonnes,
"emission_factor_source": self.emission_factor_source,
"calculated_at": self.calculated_at.isoformat(),
"pipeline_version": self.pipeline_version,
}
self.audit_hash = hashlib.sha256(
json.dumps(audit_data, sort_keys=True).encode()
).hexdigest()
サプライヤーデータの統合: CDP、EcoVadis、および Direct API
サプライヤーからのデータ収集は、さまざまな品質レベルの複数のチャネルを通じて行われます。 非常に異なる自動化。の カーボン・ディスクロージャー・プロジェクト (CDP) ~からデータを収集します 24,000 社を超える企業が利用し、検証済みレポートにアクセスするための API を公開しています。 エコヴァディス 2025 年に 48,000 人以上の GHG レポーターがデータを共有するカーボン データ ネットワークを立ち上げました 標準化された。最後に、多くの大規模ベンダーは、直接共有するための独自の API を公開しています。 その足跡の。
# collectors/supplier_collector.py
# Integrazione con fonti dati supplier
import httpx
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from models.scope3_pipeline import RawSupplierData, DataSource
class ClimatiqEmissionFactors:
"""Client per Climatiq API - emission factors database"""
BASE_URL = "https://beta3.api.climatiq.io"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def get_emission_factor(
self,
activity_id: str,
year: int = 2024,
region: str = "IT"
) -> dict:
"""Recupera fattore di emissione per attività specifica"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/estimate",
headers=self.headers,
json={
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"year": year,
"region": region
},
"parameters": {
"money": 1.0,
"money_unit": "eur"
}
}
)
response.raise_for_status()
return response.json()
async def batch_estimate(
self,
activities: list[dict]
) -> list[dict]:
"""Stima batch per multiple attività - ottimizza le API call"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/batch",
headers=self.headers,
json={"batch": activities},
timeout=30.0
)
response.raise_for_status()
return response.json().get("results", [])
class EcoVadisCollector:
"""Raccoglie dati Scope 3 dalla piattaforma EcoVadis"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
async def fetch_supplier_carbon_data(
self,
supplier_ecovadis_id: str,
reporting_year: int
) -> RawSupplierData:
"""Recupera dati carbonio per un supplier dalla Carbon Data Network"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/v1/suppliers/{supplier_ecovadis_id}/carbon",
headers={"X-API-Key": self.api_key},
params={"year": reporting_year},
timeout=15.0
)
if response.status_code == 404:
# Supplier non ha condiviso dati primari
return self._create_no_data_record(supplier_ecovadis_id)
response.raise_for_status()
payload = response.json()
return RawSupplierData(
supplier_id=supplier_ecovadis_id,
source=DataSource.ECOVADIS_API,
raw_payload=payload,
received_at=datetime.utcnow()
)
def _create_no_data_record(self, supplier_id: str) -> RawSupplierData:
return RawSupplierData(
supplier_id=supplier_id,
source=DataSource.ECOVADIS_API,
raw_payload={"status": "no_data", "supplier_id": supplier_id},
received_at=datetime.utcnow()
)
class CDPCollector:
"""Raccoglie dati da CDP (Carbon Disclosure Project)"""
CDP_API_URL = "https://api.cdp.net/v1"
def __init__(self, api_token: str):
self.api_token = api_token
async def search_supplier(
self,
company_name: str,
year: int = 2024
) -> RawSupplierData | None:
"""Cerca un supplier nel database CDP e recupera dati GHG"""
async with httpx.AsyncClient() as client:
# Ricerca azienda
search_resp = await client.get(
f"{self.CDP_API_URL}/companies/search",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"q": company_name, "year": year}
)
if not search_resp.json().get("results"):
return None
company_id = search_resp.json()["results"][0]["id"]
# Recupera dati GHG disclosure
ghg_resp = await client.get(
f"{self.CDP_API_URL}/companies/{company_id}/ghg-emissions",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"year": year}
)
if ghg_resp.status_code != 200:
return None
return RawSupplierData(
supplier_id=company_name,
source=DataSource.CDP_API,
raw_payload=ghg_resp.json(),
received_at=datetime.utcnow()
)
class BronzeLayerStorage:
"""Salvataggio immutabile nel layer Bronze"""
def __init__(self, storage_client, bucket: str):
self.storage = storage_client
self.bucket = bucket
async def store(self, raw_data: RawSupplierData) -> str:
"""Salva dato grezzo con path deterministico basato su hash"""
path = (
f"scope3/bronze/"
f"{raw_data.received_at.year}/"
f"{raw_data.received_at.month:02d}/"
f"{raw_data.supplier_id}/"
f"{raw_data.content_hash}.json"
)
await self.storage.upload_json(
bucket=self.bucket,
path=path,
data={
"supplier_id": raw_data.supplier_id,
"source": raw_data.source.value,
"received_at": raw_data.received_at.isoformat(),
"content_hash": raw_data.content_hash,
"payload": raw_data.raw_payload
}
)
return path
アクティビティベースと支出ベース: 適切な方法の選択
GHG プロトコルでは、スコープ 3 の 4 つの計算方法が定義されています。 2 つの基本的なアプローチに集約します。 アクティビティベースの e 支出ベースの。 どちらを選択するかは、データの入手可能性とカテゴリの重要性によって決まります。 そしてサプライヤーとの関係の成熟度。
方法論の比較: アクティビティベースと支出ベース
| サイズ | アクティビティベース | 支出ベース |
|---|---|---|
| Formula | 量 × 排出係数 (単位/kg CO2e) | 支出 (EUR) × EEIO 係数 (kg CO2e/EUR) |
| 正確さ | 高 (一次データで ±5 ~ 15%) | 低~中 (±50~100%) |
| 要求されたデータ | 物理量(kg、km、kWh、t) | 会計請求書のみ (EUR、USD) |
| ソースEF | Climatiq、IPCC、DEFRA、ecoinvent | USEEIO、EXIOBASE、WIOD |
| いつ使用するか | 材料カテゴリ、大手サプライヤー | キックオフ、小規模サプライヤー、カタログ。 <1% |
| 収集努力 | 高: サプライヤーの協力が必要 | 低: データはすでに ERP/SAP にあります |
| CSRDの受容性 | 素材カテゴリのお気に入り | 初期プロキシとして受け入れられました |
最適な戦略は 1 つのアプローチです プログレッシブハイブリッド: から始めましょう 支出ベースでバリューチェーン全体で迅速なベースラインを設定し、その後段階的に移行します 特定された材料カテゴリの活動ベースに向けて。 GHG プロトコルは 3 つのレベルを定義します データ品質 (Tier 1、2、3) は、この進歩に正確に対応します。
# calculators/emission_calculator.py
# Calcolo emissioni activity-based e spend-based
from dataclasses import dataclass
from typing import Optional
import math
# ============================================================
# EMISSION FACTORS DATABASE (simplified)
# In produzione: usa Climatiq API o database ecoinvent
# ============================================================
EMISSION_FACTORS: dict[str, dict] = {
# Cat. 1: Purchased goods & services
"cloud_compute_kwh": {
"value": 0.233, # kg CO2e/kWh (IT grid mix 2024)
"unit": "kWh",
"source": "IEA 2024",
"uncertainty_pct": 10.0
},
"hardware_laptop": {
"value": 350.0, # kg CO2e/unit (embodied carbon)
"unit": "unit",
"source": "Dell 2024 PCF",
"uncertainty_pct": 20.0
},
# Cat. 6: Business travel
"flight_economy_short": {
"value": 0.255, # kg CO2e/passenger.km
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
"flight_economy_long": {
"value": 0.195,
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
# Cat. 7: Employee commuting
"car_average": {
"value": 0.170, # kg CO2e/km
"unit": "km",
"source": "DEFRA 2024",
"uncertainty_pct": 12.0
},
"public_transport_it": {
"value": 0.048,
"unit": "passenger.km",
"source": "Ispra 2024",
"uncertainty_pct": 18.0
},
}
# EEIO Spend-based factors (EXIOBASE 3.8)
# kg CO2e per EUR di spesa per categoria merceologica
EEIO_FACTORS: dict[str, float] = {
"it_services": 0.312, # IT e telecomunicazioni
"professional_services": 0.198, # Consulenza, legale, etc.
"office_supplies": 0.445,
"cloud_hosting": 0.287,
"marketing": 0.231,
"utilities": 0.892,
"hr_services": 0.167,
"travel_accommodation": 0.521,
}
def calculate_activity_based(
activity_type: str,
quantity: float,
custom_ef: Optional[float] = None
) -> tuple[float, float]:
"""
Calcola emissioni con metodo activity-based.
Returns:
(co2e_kg, uncertainty_pct)
"""
if custom_ef is not None:
return quantity * custom_ef, 30.0 # alta incertezza EF custom
ef_data = EMISSION_FACTORS.get(activity_type)
if not ef_data:
raise ValueError(f"Emission factor non trovato: {activity_type}")
co2e_kg = quantity * ef_data["value"]
uncertainty = ef_data["uncertainty_pct"]
return co2e_kg, uncertainty
def calculate_spend_based(
spend_eur: float,
procurement_category: str,
inflation_correction: float = 1.0
) -> tuple[float, float]:
"""
Calcola emissioni con metodo spend-based (EEIO).
Args:
spend_eur: importo in EUR
procurement_category: categoria merceologica EEIO
inflation_correction: fattore per correggere inflazione vs anno base EEIO
Returns:
(co2e_kg, uncertainty_pct)
"""
eeio_factor = EEIO_FACTORS.get(procurement_category)
if not eeio_factor:
raise ValueError(f"EEIO factor non trovato: {procurement_category}")
# Corregge per inflazione (EEIO factors spesso in EUR 2015)
adjusted_spend = spend_eur / inflation_correction
co2e_kg = adjusted_spend * eeio_factor
# Lo spend-based ha incertezza intrinsecamente alta
uncertainty = 75.0
return co2e_kg, uncertainty
def propagate_uncertainty(
values: list[float],
uncertainties_pct: list[float]
) -> float:
"""
Propagazione incertezza quadratica (somma in quadratura).
Valida quando le incertezze sono indipendenti.
Returns:
uncertainty_pct sul totale
"""
weighted_variance_sum = sum(
(v * u/100) ** 2
for v, u in zip(values, uncertainties_pct)
)
total = sum(values)
if total == 0:
return 0.0
combined_std = math.sqrt(weighted_variance_sum)
return (combined_std / total) * 100
def calculate_category_total(
activities: list[dict]
) -> dict:
"""
Calcola totale categoria Scope 3 con propagazione incertezza.
activities: lista di {method, value_kg, uncertainty_pct}
"""
if not activities:
return {"total_co2e_kg": 0.0, "uncertainty_pct": 0.0}
values = [a["value_kg"] for a in activities]
uncertainties = [a["uncertainty_pct"] for a in activities]
total_co2e = sum(values)
combined_uncertainty = propagate_uncertainty(values, uncertainties)
# Qualità aggregata: peggiore del gruppo determina il tier
quality_tiers = [a.get("quality_tier", "TIER_3") for a in activities]
dominant_tier = min(quality_tiers) # TIER_1 < TIER_2 < TIER_3 lexicograficamente
return {
"total_co2e_kg": total_co2e,
"total_co2e_tonnes": total_co2e / 1000,
"uncertainty_pct": combined_uncertainty,
"uncertainty_kg": total_co2e * combined_uncertainty / 100,
"dominant_quality_tier": dominant_tier,
"activity_count": len(activities)
}
パイプラインのエアフロー: 自動スコープ 3 計算用の DAG
スコープ 3 のパイプライン オーケストレーションには、適切に構造化された DAG が必要です。 完全なライフサイクル: データ収集、標準化、排出量計算、品質チェック そしてゴールドレイヤーで公開します。 DAG は次のとおりである必要があります 冪等 (実行可能ファイル 数回副作用なし) e リセット可能 の場合 部分的な失敗。
# dags/scope3_pipeline_dag.py
# Apache Airflow DAG per pipeline emissioni Scope 3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import json
import logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURAZIONE DAG
# ============================================================
SCOPE3_DAG_CONFIG = {
"reporting_year": 2024,
"companies": [
{"id": "S001", "name": "AWS", "tier": "TIER_1", "source": "ecovadis"},
{"id": "S002", "name": "Microsoft Azure", "tier": "TIER_1", "source": "cdp"},
{"id": "S003", "name": "Supplier_XYZ", "tier": "TIER_2", "source": "questionnaire"},
# ... altri supplier
],
"categories_enabled": [1, 2, 3, 6, 7, 11, 15],
"quality_threshold_pct": 80.0,
"alert_email": "esg-team@company.com"
}
default_args = {
"owner": "esg-team",
"depends_on_past": False,
"email_on_failure": True,
"email": [SCOPE3_DAG_CONFIG["alert_email"]],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="scope3_emissions_pipeline",
default_args=default_args,
description="Pipeline calcolo emissioni Scope 3 value chain",
schedule_interval="@quarterly", # Esecuzione trimestrale
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["emissions", "scope3", "esg", "ghg-protocol"],
max_active_runs=1, # Serializza: mai due calcoli in parallelo
) as dag:
# ============================================================
# FASE 1: RACCOLTA DATI SUPPLIER (in parallelo per supplier)
# ============================================================
@task_group(group_id="data_collection")
def collect_supplier_data():
@task(task_id="fetch_ecovadis_suppliers")
def fetch_ecovadis() -> list[dict]:
"""Raccoglie dati da EcoVadis Carbon Data Network"""
from collectors.supplier_collector import EcoVadisCollector
import asyncio
api_key = Variable.get("ECOVADIS_API_KEY", deserialize_json=False)
collector = EcoVadisCollector(api_key, "https://api.ecovadis.com")
suppliers_ecovadis = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "ecovadis"
]
results = []
for supplier in suppliers_ecovadis:
raw = asyncio.run(
collector.fetch_supplier_carbon_data(
supplier["id"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
results.append({
"supplier_id": raw.supplier_id,
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": raw.raw_payload.get("status") != "no_data"
})
logger.info(f"EcoVadis - Supplier {supplier['id']}: fetched")
return results
@task(task_id="fetch_cdp_suppliers")
def fetch_cdp() -> list[dict]:
"""Raccoglie dati verificati da CDP"""
from collectors.supplier_collector import CDPCollector
import asyncio
api_token = Variable.get("CDP_API_TOKEN")
collector = CDPCollector(api_token)
suppliers_cdp = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "cdp"
]
results = []
for supplier in suppliers_cdp:
raw = asyncio.run(
collector.search_supplier(
supplier["name"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
if raw:
results.append({
"supplier_id": supplier["id"],
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": True
})
else:
results.append({
"supplier_id": supplier["id"],
"status": "not_found",
"has_data": False
})
return results
@task(task_id="process_manual_questionnaires")
def process_questionnaires() -> list[dict]:
"""Processa questionari manuali caricati in S3"""
# In produzione: legge da bucket S3 o SharePoint
# Qui restituiamo dati di esempio
return [{
"supplier_id": "S003",
"status": "processed",
"has_data": True,
"scope3_cat1_tco2e": 45.2,
"scope3_cat6_tco2e": 12.8
}]
ev = fetch_ecovadis()
cdp = fetch_cdp()
q = process_questionnaires()
return [ev, cdp, q]
# ============================================================
# FASE 2: STANDARDIZZAZIONE E CALCOLO EMISSIONI
# ============================================================
@task(task_id="standardize_activities")
def standardize_activities(collection_results: list) -> list[dict]:
"""Normalizza tutti i dati in unità fisiche standard"""
from normalizers.activity_normalizer import ActivityNormalizer
normalizer = ActivityNormalizer()
standardized = []
for batch in collection_results:
for result in batch:
if result.get("has_data"):
activities = normalizer.normalize(result)
standardized.extend(activities)
logger.info(f"Standardizzate {len(standardized)} attività")
return standardized
@task(task_id="calculate_emissions")
def calculate_emissions(activities: list[dict]) -> list[dict]:
"""Calcola emissioni CO2e per ogni attività standardizzata"""
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based
)
from models.scope3_pipeline import CalculationMethod
results = []
for activity in activities:
if activity["method"] == "activity_based":
co2e_kg, uncertainty = calculate_activity_based(
activity["activity_type"],
activity["quantity"]
)
method = CalculationMethod.ACTIVITY_BASED
else:
co2e_kg, uncertainty = calculate_spend_based(
activity["spend_eur"],
activity["procurement_category"]
)
method = CalculationMethod.SPEND_BASED
results.append({
**activity,
"co2e_kg": co2e_kg,
"co2e_tonnes": co2e_kg / 1000,
"uncertainty_pct": uncertainty,
"calculation_method": method.value,
"calculated_at": datetime.utcnow().isoformat()
})
return results
# ============================================================
# FASE 3: DATA QUALITY CHECK
# ============================================================
@task(task_id="data_quality_check")
def data_quality_check(results: list[dict]) -> dict:
"""Verifica qualità dati e genera score per categoria"""
from quality.data_quality_scorer import DataQualityScorer
scorer = DataQualityScorer()
quality_report = scorer.score_results(results)
if quality_report["overall_score"] < SCOPE3_DAG_CONFIG["quality_threshold_pct"]:
logger.warning(
f"Quality score sotto soglia: {quality_report['overall_score']}%"
)
return quality_report
# ============================================================
# FASE 4: AUDIT TRAIL E PUBBLICAZIONE GOLD LAYER
# ============================================================
@task(task_id="create_audit_trail")
def create_audit_trail(
results: list[dict],
quality_report: dict
) -> str:
"""Crea audit trail immutabile con hash chain"""
from audit.hash_chain import HashChain
chain = HashChain()
chain_id = chain.create_chain(
calculation_results=results,
quality_report=quality_report,
pipeline_version="2.1.0",
methodology="GHG_Protocol_Scope3_2011",
reporting_standard="CSRD_ESRS_E1"
)
logger.info(f"Audit trail creato: {chain_id}")
return chain_id
@task(task_id="publish_gold_layer")
def publish_gold_layer(
results: list[dict],
audit_chain_id: str
) -> None:
"""Pubblica dati aggregati nel Gold layer (PostgreSQL)"""
hook = PostgresHook(postgres_conn_id="emissions_db")
for result in results:
hook.run(
"""
INSERT INTO scope3_emissions_gold (
supplier_id, scope3_category, co2e_tonnes,
calculation_method, uncertainty_pct,
quality_tier, audit_chain_id,
reporting_year, published_at
) VALUES (
%(supplier_id)s, %(scope3_category)s, %(co2e_tonnes)s,
%(calculation_method)s, %(uncertainty_pct)s,
%(quality_tier)s, %(audit_chain_id)s,
%(reporting_year)s, NOW()
)
ON CONFLICT (supplier_id, scope3_category, reporting_year)
DO UPDATE SET
co2e_tonnes = EXCLUDED.co2e_tonnes,
updated_at = NOW()
""",
parameters={
**result,
"audit_chain_id": audit_chain_id,
"reporting_year": SCOPE3_DAG_CONFIG["reporting_year"]
}
)
logger.info(f"Pubblicati {len(results)} record nel Gold layer")
# ============================================================
# WIRING DEL DAG
# ============================================================
collection_results = collect_supplier_data()
standardized = standardize_activities(collection_results)
emission_results = calculate_emissions(standardized)
quality = data_quality_check(emission_results)
chain_id = create_audit_trail(emission_results, quality)
publish_gold_layer(emission_results, chain_id)
データ品質スコアリングと不確実性の伝播
GHG プロトコル スコープ 3 標準では、その猫を明示的に認識しています。 1~15はそうではありません 絶対的な確実性を持って知られることはありません。品質レポートには推定値を含める必要があります の量的不確実性 各カテゴリに関連付けられています。 IPCCは正式に グッドプラクティスガイダンスの不確実性伝播方法。
# quality/data_quality_scorer.py
# Scoring qualità dati Scope 3
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import math
from datetime import datetime, timedelta
class QualityDimension(Enum):
COMPLETENESS = "completeness"
ACCURACY = "accuracy"
TIMELINESS = "timeliness"
VERIFICATION = "verification"
GRANULARITY = "granularity"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0-100
weight: float # peso nel calcolo aggregato
notes: str = ""
def score_supplier_data_quality(
supplier: dict,
reference_date: datetime = None
) -> dict[str, float]:
"""
Calcola score qualità multi-dimensionale per i dati di un supplier.
Basato su GHG Protocol Data Quality Guidance.
"""
if reference_date is None:
reference_date = datetime.utcnow()
scores = []
# 1. COMPLETENESS: quante delle categorie richieste sono presenti?
required_fields = [
"scope3_cat1_tco2e", "scope3_cat6_tco2e", "scope3_cat7_tco2e"
]
present = sum(1 for f in required_fields if supplier.get(f) is not None)
completeness_score = (present / len(required_fields)) * 100
scores.append(QualityScore(
dimension=QualityDimension.COMPLETENESS,
score=completeness_score,
weight=0.30
))
# 2. TIMELINESS: quanto sono recenti i dati?
data_year = supplier.get("reporting_year", 2020)
current_year = reference_date.year
age_years = current_year - data_year
if age_years <= 1:
timeliness_score = 100.0
elif age_years == 2:
timeliness_score = 75.0
elif age_years == 3:
timeliness_score = 50.0
else:
timeliness_score = 20.0
scores.append(QualityScore(
dimension=QualityDimension.TIMELINESS,
score=timeliness_score,
weight=0.20
))
# 3. VERIFICATION: i dati sono stati verificati da terze parti?
verification_level = supplier.get("verification", "none")
verification_score = {
"independent_assured": 100.0, # GHG verificato da auditor indipendente
"limited_assurance": 80.0, # Limited assurance
"internal_reviewed": 60.0, # Solo review interna
"supplier_declared": 40.0, # Auto-dichiarazione
"estimated": 20.0, # Stima spend-based
"none": 0.0
}.get(verification_level, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.VERIFICATION,
score=verification_score,
weight=0.30
))
# 4. GRANULARITY: attività-specifico o aggregato?
data_type = supplier.get("data_type", "aggregated")
granularity_score = {
"site_specific": 100.0, # Dati per sito produttivo
"product_specific": 90.0, # PCF per prodotto/servizio
"supplier_specific": 70.0, # Dato totale supplier
"sector_average": 40.0, # Media settoriale
"aggregated": 20.0
}.get(data_type, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.GRANULARITY,
score=granularity_score,
weight=0.20
))
# Calcolo score aggregato ponderato
overall = sum(s.score * s.weight for s in scores)
# Mappa score a Tier GHG Protocol
if overall >= 80:
tier = "TIER_1"
uncertainty_band_pct = 15.0
elif overall >= 50:
tier = "TIER_2"
uncertainty_band_pct = 40.0
else:
tier = "TIER_3"
uncertainty_band_pct = 75.0
return {
"overall_score": round(overall, 1),
"tier": tier,
"uncertainty_band_pct": uncertainty_band_pct,
"dimension_scores": {
s.dimension.value: round(s.score, 1)
for s in scores
}
}
def monte_carlo_uncertainty(
base_estimate_tco2e: float,
uncertainty_pct: float,
n_simulations: int = 10_000
) -> dict:
"""
Stima intervallo di confidenza con simulazione Monte Carlo.
Per reporting CSRD si raccomanda almeno 1.000 simulazioni.
"""
import random
# Distribuzione log-normale (emissioni non possono essere negative)
sigma = math.log(1 + (uncertainty_pct / 100) ** 2) ** 0.5
mu = math.log(base_estimate_tco2e) - sigma ** 2 / 2
simulated = [
math.exp(random.gauss(mu, sigma))
for _ in range(n_simulations)
]
simulated_sorted = sorted(simulated)
p05 = simulated_sorted[int(n_simulations * 0.05)]
p50 = simulated_sorted[int(n_simulations * 0.50)]
p95 = simulated_sorted[int(n_simulations * 0.95)]
return {
"base_estimate_tco2e": base_estimate_tco2e,
"p05_tco2e": round(p05, 2),
"p50_tco2e": round(p50, 2),
"p95_tco2e": round(p95, 2),
"confidence_interval_90pct": {
"lower": round(p05, 2),
"upper": round(p95, 2)
},
"coefficient_of_variation": round(
(p95 - p05) / (2 * p50) * 100, 1
)
}
ハッシュチェーンによる不変監査証跡
La エンドツーエンドのトレーサビリティ それは最も重要な要件の 1 つです 検証可能なスコープ 3 レポート。外部監査人は、レポート内のすべての数値を追跡できなければなりません すべての変換ステップを経て、最終的にデータのプライマリ ソースに到達します。あ ハッシュチェーン ブロックチェーンテクノロジーからインスピレーションを得た(ただし複雑さはありません) 分散型)監査証跡の不変性を保証します。
# audit/hash_chain.py
# Audit trail immutabile per emissioni Scope 3
import hashlib
import json
import uuid
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class AuditRecord:
"""Singolo record nell'audit chain"""
def __init__(
self,
record_type: str,
payload: dict,
previous_hash: str,
chain_id: str,
sequence: int
):
self.record_id = str(uuid.uuid4())
self.record_type = record_type
self.payload = payload
self.previous_hash = previous_hash
self.chain_id = chain_id
self.sequence = sequence
self.created_at = datetime.utcnow().isoformat()
self.record_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""SHA-256 hash di tutti i campi del record (eccetto il hash stesso)"""
data = {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"created_at": self.created_at,
"payload_hash": hashlib.sha256(
json.dumps(self.payload, sort_keys=True, default=str).encode()
).hexdigest()
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
def to_dict(self) -> dict:
return {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"record_hash": self.record_hash,
"created_at": self.created_at,
"payload": self.payload
}
class HashChain:
"""
Hash chain per audit trail immutabile emissioni Scope 3.
Ogni record contiene l'hash del record precedente,
rendendo impossibile modificare un record senza invalidare
tutti i record successivi.
"""
GENESIS_HASH = "0" * 64 # Hash del primo record della chain
def __init__(self, db_client=None):
self.db = db_client
self.records: list[AuditRecord] = []
def create_chain(
self,
calculation_results: list[dict],
quality_report: dict,
pipeline_version: str,
methodology: str,
reporting_standard: str
) -> str:
"""Crea una nuova chain per un calcolo Scope 3 completo"""
chain_id = str(uuid.uuid4())
previous_hash = self.GENESIS_HASH
# Record 1: Metadati della pipeline
pipeline_record = AuditRecord(
record_type="PIPELINE_METADATA",
payload={
"version": pipeline_version,
"methodology": methodology,
"reporting_standard": reporting_standard,
"calculation_timestamp": datetime.utcnow().isoformat(),
"total_activities": len(calculation_results)
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=0
)
self.records.append(pipeline_record)
previous_hash = pipeline_record.record_hash
# Record 2: Quality report
quality_record = AuditRecord(
record_type="QUALITY_ASSESSMENT",
payload=quality_report,
previous_hash=previous_hash,
chain_id=chain_id,
sequence=1
)
self.records.append(quality_record)
previous_hash = quality_record.record_hash
# Record 3..N: Singoli risultati di emissione
for i, result in enumerate(calculation_results):
emission_record = AuditRecord(
record_type="EMISSION_CALCULATION",
payload={
"supplier_id": result.get("supplier_id"),
"scope3_category": result.get("scope3_category"),
"co2e_tonnes": result.get("co2e_tonnes"),
"calculation_method": result.get("calculation_method"),
"emission_factor_source": result.get("emission_factor_source"),
"uncertainty_pct": result.get("uncertainty_pct"),
"quality_tier": result.get("quality_tier")
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=2 + i
)
self.records.append(emission_record)
previous_hash = emission_record.record_hash
# Persist su DB (o storage immutabile)
if self.db:
self._persist_chain(chain_id)
logger.info(
f"Chain {chain_id} creata con {len(self.records)} record. "
f"Final hash: {previous_hash[:16]}..."
)
return chain_id
def verify_chain_integrity(self, chain_id: str) -> bool:
"""
Verifica che nessun record sia stato alterato.
Percorre la chain ricomputando ogni hash.
"""
records = self._load_chain(chain_id)
if not records:
return False
expected_previous = self.GENESIS_HASH
for record_dict in records:
# Ricomputa hash
record = AuditRecord(
record_type=record_dict["record_type"],
payload=record_dict["payload"],
previous_hash=record_dict["previous_hash"],
chain_id=record_dict["chain_id"],
sequence=record_dict["sequence"]
)
if record_dict["previous_hash"] != expected_previous:
logger.error(
f"Chain corrotta al record {record_dict['sequence']}: "
f"previous_hash non corrisponde"
)
return False
expected_previous = record.record_hash
return True
def _persist_chain(self, chain_id: str) -> None:
"""Salva tutti i record della chain nel DB"""
for record in self.records:
self.db.insert("scope3_audit_chain", record.to_dict())
def _load_chain(self, chain_id: str) -> list[dict]:
"""Carica i record della chain dal DB in ordine di sequenza"""
if not self.db:
return [r.to_dict() for r in self.records]
return self.db.query(
"SELECT * FROM scope3_audit_chain WHERE chain_id = %s ORDER BY sequence",
[chain_id]
)
視覚化: サンキー ダイアグラムとヒートマップ カテゴリ
適切に構築されたスコープ 3 パイプラインは、データをレンダリングする視覚化も生成する必要があります。 技術的な利害関係者にも非技術的な利害関係者にも理解できる。の サンキーダイアグラム それはです バリューチェーンに沿った排出量フローを示す理想的なツールである一方、 ヒートマップ 最も重要なカテゴリをすばやく特定できます データ品質が低いものもあります。
# visualizations/scope3_charts.py
# Generazione Sankey diagram e heatmap Scope 3
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from typing import Optional
def create_scope3_sankey(
emission_data: list[dict],
title: str = "Scope 3 Value Chain Emissions"
) -> go.Figure:
"""
Crea Sankey diagram per visualizzare flussi emissioni Scope 3.
Struttura: Supplier -> Categoria S3 -> Totale Scope 3
"""
# Raccoglie nodi unici
suppliers = list(set(d["supplier_id"] for d in emission_data))
categories = list(set(f"Cat. {d['scope3_category']}" for d in emission_data))
all_nodes = suppliers + categories + ["Scope 3 Total"]
node_index = {node: i for i, node in enumerate(all_nodes)}
# Costruisce link source->target->value
source_indices = []
target_indices = []
values = []
link_labels = []
for record in emission_data:
supplier = record["supplier_id"]
category = f"Cat. {record['scope3_category']}"
tco2e = record["co2e_tonnes"]
# Supplier -> Categoria
source_indices.append(node_index[supplier])
target_indices.append(node_index[category])
values.append(tco2e)
link_labels.append(f"{tco2e:.1f} tCO2e")
# Categoria -> Total
for cat in categories:
cat_total = sum(
d["co2e_tonnes"]
for d in emission_data
if f"Cat. {d['scope3_category']}" == cat
)
source_indices.append(node_index[cat])
target_indices.append(node_index["Scope 3 Total"])
values.append(cat_total)
link_labels.append(f"{cat_total:.1f} tCO2e")
# Colori nodi
node_colors = (
["#2196F3"] * len(suppliers) + # Blu per supplier
["#FF9800"] * len(categories) + # Arancione per categorie
["#4CAF50"] # Verde per totale
)
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=20,
thickness=20,
line=dict(color="white", width=0.5),
label=all_nodes,
color=node_colors,
hovertemplate="{label}
tCO2e: {value:.1f}<extra></extra>"
),
link=dict(
source=source_indices,
target=target_indices,
value=values,
label=link_labels,
color="rgba(100,100,100,0.3)"
)
))
fig.update_layout(
title_text=title,
font_size=12,
height=600,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)"
)
return fig
def create_category_heatmap(
emission_data: list[dict]
) -> go.Figure:
"""
Heatmap: asse X = categoria Scope 3, asse Y = qualità dato.
Colore = tCO2e. Aiuta a prioritizzare effort raccolta dati.
"""
df = pd.DataFrame(emission_data)
# Aggrega per categoria e tier qualità
pivot = df.pivot_table(
values="co2e_tonnes",
index="quality_tier",
columns="scope3_category",
aggfunc="sum",
fill_value=0
)
# Ordina tier (TIER_1 migliore in alto)
tier_order = ["TIER_1", "TIER_2", "TIER_3"]
pivot = pivot.reindex(
[t for t in tier_order if t in pivot.index]
)
fig = go.Figure(go.Heatmap(
z=pivot.values,
x=[f"Cat. {c}" for c in pivot.columns],
y=list(pivot.index),
colorscale="RdYlGn_r", # Rosso = alta emissione (critico)
text=pivot.values.round(1),
texttemplate="%{text} t",
textfont={"size": 11},
hovertemplate="Categoria: %{x}
Tier: %{y}
%{z:.1f} tCO2e<extra></extra>",
colorbar=dict(title="tCO2e")
))
fig.update_layout(
title="Heatmap Scope 3: Emissioni per Categoria e Qualità Dato",
xaxis_title="Categoria GHG Protocol",
yaxis_title="Tier Qualità Dato",
height=350,
margin=dict(l=80, r=20, t=60, b=60)
)
return fig
def generate_scope3_dashboard_html(
emission_data: list[dict],
output_path: str
) -> None:
"""Genera report HTML standalone con tutti i grafici"""
sankey = create_scope3_sankey(emission_data)
heatmap = create_category_heatmap(emission_data)
total_tco2e = sum(d["co2e_tonnes"] for d in emission_data)
by_category = {}
for d in emission_data:
cat = d["scope3_category"]
by_category[cat] = by_category.get(cat, 0) + d["co2e_tonnes"]
top_category = max(by_category, key=by_category.get)
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Scope 3 Emissions Report</title>
<meta charset="utf-8">
</head>
<body>
<h1>Scope 3 Value Chain Emissions Report</h1>
<p>Totale: <strong>{total_tco2e:.1f} tCO2e</strong></p>
<p>Categoria più materiale: Cat. {top_category}
({by_category[top_category]:.1f} tCO2e)</p>
{sankey.to_html(full_html=False)}
{heatmap.to_html(full_html=False)}
</body>
</html>
"""
with open(output_path, "w") as f:
f.write(html_content)
スコープ 3 を報告するための CSRD/ESRS E1 要件
La 企業持続可能性報告指令 (CSRD) および関連規格 ESRS E1 (気候変動) 彼らはスコープ 3 の報告を自発的なものから 何千もの欧州企業に義務付けられています。実施スケジュールは時間差で実施されます そしてすでに進行中です。
CSRD スコープ 3 タイムラインが必要
| FY | 報告します、入ります | 科目 | 注意事項 |
|---|---|---|---|
| 2024年 | 2025 年初頭 | すでにNFRDの対象となっている大規模PIE(従業員500人以上) | 第 1 波: EU 企業約 12,000 社 |
| 2025年 | 2026 年初頭 | すべての大企業 (250 部門以上または 4,000 万ユーロ以上) | ~50,000社のEU企業 |
| 2026年 | 2027 年初頭 | 上場中小企業 | 簡素化された ESRS 標準 |
| 2028年 | 2029 年初頭 | EU に子会社を持つ非 EU 企業 | 重大な世界的影響 |
ESRS E1 では、スコープ 3 の排出に対して特に次のことが要求されます。
- すべての材料カテゴリーの開示: マテリアリティを決定する必要がある 二重のマテリアリティ分析(影響 + 財務リスク)を通じて。ほとんどの場合 ハイテク企業のうち、少なくとも 4 ~ 6 つのカテゴリーが重要です。
- カテゴリ別内訳: 値を単一の合計としてレポートすることはできません 集合体。各物質カテゴリには、tCO2e の独自のデータが必要です。
- 明示的な方法論: カテゴリごとに計算方法を宣言する必要があります (活動ベース、支出ベース、サプライヤー固有)、排出係数の発生源と段階 データの品質。
- 炭素クレジットによる網はありません: 総排出量を報告する必要があります 購入した補償金やカーボン オフセットとは別に。
- 強制保険: 当初は限定的な保証を目的としており、 将来的には合理的な保証に移行します。この記事で説明されている監査証跡 それはまさに査読者が求めるものです。
- 目標と移行計画: 企業は次の目標を宣言する必要があります 削減は中間マイルストーンで 1.5 °C (できれば SBTi によって検証) に合わせられます。
注意: スコープ 3 と二重マテリアリティ
ESRS E1 では、スコープ 3 の 15 のカテゴリーすべてを報告する必要はなく、特定されたカテゴリーのみを報告する必要があります。 どうやって 材料 二重の重要性の分析において。ただし、そのプロセスは、 重要性の決定は文書化され、監査可能でなければなりません。 1 つを除外する 「データ不足による」というカテゴリーは受け入れられる正当化理由ではありません。それを証明する必要があります。 そのカテゴリは特定のビジネスにとって重要ではないこと。
ケーススタディ: 50 社のサプライヤーを抱える SaaS 企業
すべてを具体的な例に置き換えてみましょう。従業員 150 人のイタリアの中規模 SaaS、 収益は 1,500 万ユーロ、AWS 上のインフラストラクチャと 50 のアクティブなサプライヤー。経営陣が決定したのは、 CSRD よりも前にスコープ 3 の計算を開始し、実現までに 3 か月の期間を設ける 監査人が検証できるデータ。
会社概要: SaaS Italia S.r.l.
| パラメータ | 価値 |
|---|---|
| 従業員 | 150 (70% がスマートに働く) |
| 所在地 | ミラノ本社 + ローマオフィス |
| インフラストラクチャー | AWS eu-west-1 (プライマリ)、GCP europe-west1 (バックアップ) |
| アクティブなサプライヤー | 50個(大8個、小・中42個) |
| 調達費 | ~420万ユーロ/年 |
| 年間フライト | 約 380 便 (会議 + 顧客) |
フェーズ 1 – 重要性分析 (第 1 ~ 2 週目): ESGチームは、 スコープ 3 の材料カテゴリーを特定するために迅速な分析を実施しました。からのデータの使用 EEIO 係数を使用した初期代理として ERP (SAP) からの支出を計算すると、次の推定値が得られました。 「スクリーニング」:
スコープ 3 の重要性スクリーニング — SaaS Italia S.r.l.
| 猫。 | 説明 | 支出ベースの推定値 (tCO2e) | 全体の % | 決断 |
|---|---|---|---|---|
| 1 | 購入した商品とサービス (クラウド、SW) | 342 | 54% | マテリアル → アクティビティベース |
| 6 | 出張 | 98 | 15% | マテリアル → アクティビティベース |
| 7 | 従業員の通勤 | 87 | 14% | 資料 → 従業員アンケート |
| 11 | 販売した商品の使用 | 76 | 12% | MATERIAL → SKI測定 |
| 2 | 資本財(ラップトップ、ハードウェア) | 28 | 4% | 材質 → ベンダー PCF |
| 他の | 猫。 3、5、8、15 | 6 | 1% | 非物質→支出ベース |
フェーズ 2 – データ収集 (第 2 ~ 8 週):
- 猫。 1 (クラウド): AWS 顧客二酸化炭素排出量ツールと GCP 二酸化炭素排出量 アカウントごとの月次発行データを提供します。 API 経由で抽出されたデータが Bronze にロードされる 層。品質: TIER 1 (サプライヤー固有、AWS 検証済み)。
- 猫。 1 (ソフトウェアとサービス): 大手サプライヤー 8 社 (>50,000 ユーロ/年) と連絡済み 構造化されたアンケートを使用します。 5 社が一次データ (Microsoft ERP、Slack、 セールスフォース)。 3 つはデータなし → EEIO による支出ベース。
- 猫。 6 (出張): 旅行代理店 (Carlson Wagonlit) から抽出されたデータ API 経由: ルーティングとクラスを含む 380 のフライト。 DEFRA 2024 によるアクティビティベースの計算。
- 猫。 7 (通勤): 全従業員 150 名を対象とした匿名アンケート (回答率 82%)。 交通手段、平均距離、オフィスにいる週の日数。
- 猫。 11(販売商品の使用): SCI (ソフトウェア炭素強度) の計算 実稼働インフラストラクチャで CodeCarbon を使用します。アクティブなセッション/月の数を掛けます。
# case_study/saas_italia_scope3.py
# Calcolo completo Scope 3 per SaaS Italia S.r.l.
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based,
calculate_category_total
)
def calculate_cat1_cloud() -> dict:
"""Cat. 1: Emissioni cloud AWS + GCP (dati primari vendor)"""
# Dati estratti dall'AWS Customer Carbon Footprint API
aws_kwh_year = 187_500 # kWh totali 2024
gcp_kwh_year = 12_300
aws_ef = 0.233 # kg CO2e/kWh IT grid (AWS eu-west-1)
gcp_ef = 0.198 # kg CO2e/kWh GCP europe-west1
aws_co2, aws_unc = calculate_activity_based("cloud_compute_kwh", aws_kwh_year, aws_ef)
gcp_co2, gcp_unc = calculate_activity_based("cloud_compute_kwh", gcp_kwh_year, gcp_ef)
activities = [
{"value_kg": aws_co2, "uncertainty_pct": 8.0, "quality_tier": "TIER_1"},
{"value_kg": gcp_co2, "uncertainty_pct": 10.0, "quality_tier": "TIER_1"},
]
result = calculate_category_total(activities)
result["category"] = 1
result["sub_category"] = "cloud_infrastructure"
return result
def calculate_cat6_business_travel() -> dict:
"""Cat. 6: Business travel (dati agenzia viaggi)"""
# 380 voli totali anno 2024
# 60% corto raggio (<1500km), 40% lungo raggio
short_haul_pkm = 380 * 0.6 * 850 # 850km avg corto raggio
long_haul_pkm = 380 * 0.4 * 3200 # 3200km avg lungo raggio
short_co2, short_unc = calculate_activity_based(
"flight_economy_short", short_haul_pkm
)
long_co2, long_unc = calculate_activity_based(
"flight_economy_long", long_haul_pkm
)
# Radiative forcing factor x1.9 per quota alta
rf_factor = 1.9
short_co2 *= rf_factor
long_co2 *= rf_factor
activities = [
{"value_kg": short_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
{"value_kg": long_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
]
result = calculate_category_total(activities)
result["category"] = 6
return result
def calculate_cat7_commuting() -> dict:
"""Cat. 7: Employee commuting (survey 123/150 dipendenti)"""
# Risultati survey (valori medi per dipendente/anno)
commuters = {
"car_solo": {"count": 38, "km_day": 28, "days_year": 120},
"car_shared": {"count": 12, "km_day": 22, "days_year": 110},
"public_transport": {"count": 52, "km_day": 35, "days_year": 140},
"cycling_walking": {"count": 21, "km_day": 4, "days_year": 150},
}
activities = []
# Auto privata
car_pkm = (
commuters["car_solo"]["count"] *
commuters["car_solo"]["km_day"] *
commuters["car_solo"]["days_year"]
)
co2_car, unc = calculate_activity_based("car_average", car_pkm)
activities.append({"value_kg": co2_car, "uncertainty_pct": 15.0, "quality_tier": "TIER_2"})
# Trasporto pubblico
pt_pkm = (
commuters["public_transport"]["count"] *
commuters["public_transport"]["km_day"] *
commuters["public_transport"]["days_year"]
)
co2_pt, unc = calculate_activity_based("public_transport_it", pt_pkm)
activities.append({"value_kg": co2_pt, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"})
# Ciclismo/piedi: zero emissioni dirette
activities.append({"value_kg": 0.0, "uncertainty_pct": 0.0, "quality_tier": "TIER_1"})
result = calculate_category_total(activities)
result["category"] = 7
result["survey_response_rate"] = 82.0
return result
def calculate_cat11_use_of_products() -> dict:
"""Cat. 11: Energia consumata dai clienti usando il SaaS"""
# SCI = 0.045 gCO2e per ogni API call (misurato con CodeCarbon)
sci_gco2e_per_call = 0.045
avg_calls_per_month = 48_500_000 # 48.5M calls/mese (dati produzione)
months = 12
total_calls = avg_calls_per_month * months
co2e_grams = total_calls * sci_gco2e_per_call
co2e_kg = co2e_grams / 1000
activities = [
{"value_kg": co2e_kg, "uncertainty_pct": 25.0, "quality_tier": "TIER_2"}
]
result = calculate_category_total(activities)
result["category"] = 11
result["metric"] = "API calls"
result["total_calls"] = total_calls
return result
def run_full_scope3_calculation() -> dict:
"""Esegue il calcolo completo Scope 3 per SaaS Italia S.r.l."""
results = {
"cat_1_cloud": calculate_cat1_cloud(),
"cat_6_travel": calculate_cat6_business_travel(),
"cat_7_commuting": calculate_cat7_commuting(),
"cat_11_use": calculate_cat11_use_of_products(),
}
# Categoria residuale (spend-based per tutto il resto)
residual_spend_eur = 210_000 # ~5% della spesa totale
residual_co2_kg, res_unc = calculate_spend_based(
residual_spend_eur, "it_services"
)
results["cat_residual"] = {
"total_co2e_tonnes": residual_co2_kg / 1000,
"uncertainty_pct": res_unc,
"category": "other",
"dominant_quality_tier": "TIER_3"
}
# Totale Scope 3
total_tco2e = sum(
v["total_co2e_tonnes"] for v in results.values()
)
from calculators.emission_calculator import propagate_uncertainty
all_values = [v["total_co2e_tonnes"] for v in results.values()]
all_uncertainties = [v["uncertainty_pct"] for v in results.values()]
overall_uncertainty = propagate_uncertainty(all_values, all_uncertainties)
return {
"company": "SaaS Italia S.r.l.",
"reporting_year": 2024,
"methodology": "GHG Protocol Corporate Value Chain Standard",
"scope3_total_tco2e": round(total_tco2e, 1),
"overall_uncertainty_pct": round(overall_uncertainty, 1),
"categories": results,
"notes": "Cat. 11 include radiative forcing factor per aviation"
}
if __name__ == "__main__":
import json
report = run_full_scope3_calculation()
print("=" * 50)
print(f"SCOPE 3 TOTALE: {report['scope3_total_tco2e']} tCO2e")
print(f"Incertezza: +/- {report['overall_uncertainty_pct']}%")
print("=" * 50)
for name, cat in report["categories"].items():
tco2e = cat.get("total_co2e_tonnes", 0)
unc = cat.get("uncertainty_pct", 0)
pct = tco2e / report["scope3_total_tco2e"] * 100
print(f" {name:25s} {tco2e:6.1f} tCO2e ({pct:.0f}%) ±{unc:.0f}%")
SaaS Italia S.r.l.の計算結果生成されるもの:
スコープ 3 最終結果 — SaaS Italia S.r.l. (2024年度)
| カテゴリ | tCO2e | % 合計 | 不確実性 | 階層 |
|---|---|---|---|---|
| 猫。 1 – クラウドとサービス | 43.7 | 36% | ±9% | ティア1 |
| 猫。 6 – 出張 | 33.5 | 28% | ±20% | ティア2 |
| 猫。 7 – 通勤 | 23.8 | 20% | ±17% | ティア2 |
| 猫。 11 – 製品の使用 | 26.2 | 22% | ±25% | ティア2 |
| 残された猫 | 6.5 | 5% | ±75% | ティア3 |
| トータルスコープ3 | 133.7 | 100% | ±13% | ティア2 |
スコープ 1 (本社ボイラーから約 8 tCO2e) とスコープ 2 (電力から約 12 tCO2e) を追加 オフィスの)、あなたは 総排出量は約 154 tCO2e 2024 年、そのうち 87% がスコープ 3。まさにソフトウェア会社の典型的なパターンです。
スコープ 3 パイプラインのベスト プラクティスとアンチパターン
スコープ 3 パイプライン実装チェックリスト
| エリア | ベストプラクティス | 避けるべきアンチパターン |
|---|---|---|
| データ | 受信したすべてのデータに対して不変のブロンズ レイヤー | 生データを修正されたバージョンで上書きします |
| 計算 | 使用される排出係数のバージョン | 年と出典を示さずに EF を使用する |
| 不確実性 | 常に各カテゴリー全体に不確実性を伝播する | 範囲を指定せずに正確な値のみをレポートします |
| 品質 | 明示的かつ文書化された品質スコア | TIER 1 と TIER 3 を区別せずに混在させる |
| 監査 | 各計算のハッシュ チェーン、検証可能なオフチェーン | Excel レポートはバージョン管理されておらず、追跡できません |
| サプライヤー | 支出/排出量に基づいて上位 20 のサプライヤーに優先順位を付ける | 50 社のサプライヤーすべてを同じように扱う |
| アップデート | 年次品質向上計画の策定 | 恒久的なソリューションとして支出ベースを受け入れる |
| ほうき | 理由のある除外を明示的に文書化する | 正式な正当性のないカテゴリを除外する |
漸進的改善計画 (データ成熟度ロードマップ)
GHG プロトコルは、進歩的なアプローチを明確に奨励しています。 今日の低品質の支出ベースのデータには何もありません。目標は改善することです 毎年の材料カテゴリの品質レベル:
- 1 年目 (ベースライン): 100% 支出ベース、誤差 ±75%、TIER 3
- 2年目: 活動ベース、±40%、TIER 2 の上位 10 位のサプライヤー
- 3年目: 検証済みの一次データを持つ上位 20 サプライヤー、±20%、TIER 2
- 4 年目以降: すべてのサプライヤーに対する EcoVadis/CDP 統合、±10%、TIER 1
この漸進的な改善は、CSRD レポートに次のように文書化できます。 「方法論の進化」と評者から肯定的に評価されています。
ゴールド層のデータベーススキーマ
Gold レイヤーには、高速集計クエリをサポートするように設計されたスキーマが必要です CSRD レポートを作成し、監査チェーンまでの追跡可能性を維持します。
-- schema/scope3_gold.sql
-- Schema PostgreSQL per il Gold Layer Scope 3
-- Tabella principale: emissioni aggregate per categoria
CREATE TABLE scope3_emissions_gold (
id BIGSERIAL PRIMARY KEY,
company_id VARCHAR(50) NOT NULL,
reporting_year INTEGER NOT NULL,
scope3_category INTEGER NOT NULL CHECK (scope3_category BETWEEN 1 AND 15),
supplier_id VARCHAR(100),
-- Valori emissioni
co2e_tonnes DECIMAL(12, 3) NOT NULL,
co2_tonnes DECIMAL(12, 3),
ch4_tonnes_co2e DECIMAL(12, 3),
n2o_tonnes_co2e DECIMAL(12, 3),
-- Metodologia e qualità
calculation_method VARCHAR(30) NOT NULL, -- activity_based, spend_based, etc.
emission_factor_source VARCHAR(100) NOT NULL,
emission_factor_value DECIMAL(10, 6),
quality_tier VARCHAR(10) NOT NULL, -- TIER_1, TIER_2, TIER_3
uncertainty_pct DECIMAL(5, 1) NOT NULL,
uncertainty_tonnes DECIMAL(12, 3) GENERATED ALWAYS AS
(co2e_tonnes * uncertainty_pct / 100) STORED,
-- Tracciabilità
audit_chain_id UUID NOT NULL REFERENCES scope3_audit_chain(chain_id),
pipeline_version VARCHAR(20) NOT NULL,
reporting_standard VARCHAR(50) DEFAULT 'GHG_Protocol_Scope3_2011',
-- Timestamps
published_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Unicità per reporting period
CONSTRAINT uq_emission_period
UNIQUE (company_id, reporting_year, scope3_category, supplier_id)
);
-- Indici per performance query CSRD report
CREATE INDEX idx_scope3_company_year
ON scope3_emissions_gold (company_id, reporting_year);
CREATE INDEX idx_scope3_category
ON scope3_emissions_gold (scope3_category);
CREATE INDEX idx_scope3_quality
ON scope3_emissions_gold (quality_tier, uncertainty_pct);
-- View per report aggregato CSRD
CREATE VIEW v_scope3_csrd_report AS
SELECT
company_id,
reporting_year,
scope3_category,
SUM(co2e_tonnes) AS total_co2e_tonnes,
-- Propagazione incertezza quadratica
SQRT(SUM(POWER(co2e_tonnes * uncertainty_pct / 100, 2))) /
NULLIF(SUM(co2e_tonnes), 0) * 100 AS combined_uncertainty_pct,
-- Qualità aggregata (tier peggiore nella categoria)
MIN(quality_tier) AS data_quality_tier,
-- Metodo più usato
MODE() WITHIN GROUP (ORDER BY calculation_method) AS primary_method,
COUNT(DISTINCT supplier_id) AS supplier_count,
MAX(updated_at) AS last_updated
FROM scope3_emissions_gold
GROUP BY company_id, reporting_year, scope3_category
ORDER BY company_id, reporting_year, scope3_category;
-- Tabella audit chain
CREATE TABLE scope3_audit_chain (
chain_id UUID PRIMARY KEY,
record_id UUID NOT NULL DEFAULT gen_random_uuid(),
sequence INTEGER NOT NULL,
record_type VARCHAR(50) NOT NULL,
previous_hash CHAR(64) NOT NULL,
record_hash CHAR(64) NOT NULL UNIQUE,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_chain_sequence UNIQUE (chain_id, sequence)
);
-- Indice per verifica integrità chain
CREATE INDEX idx_audit_chain_id_seq
ON scope3_audit_chain (chain_id, sequence);
結論と次のステップ
スコープ 3 排出用の堅牢なパイプラインを構築することは、学術的な取り組みではありません。 それは重要なデータインフラストラクチャ これは必須になります この記事で見た重要な原則 企業規模に関係なく適用できます。
- 生データの不変性: SHA-256 ハッシュを保証するブロンズ レイヤー すべてのレビュー担当者はいつでも、たとえ何年経ってもデータを元のソースまで遡ることができます。
- 方法論的な進歩性: 支出ベースから始めて、 物質カテゴリーに対する活動ベースのアプローチは、GHG プロトコルによって推奨されています。 ショートカットではなく、それ自体です。
- 不確実性の定量化: 間隔を空けずに排出量を報告します 信頼性は不完全な情報です。不確実性の二次伝播 これは実装が簡単であり、レポートの信頼性の基礎となります。
- 検証可能な監査証跡: ハッシュ チェーンにより外部検証が可能になります 計算後にデータが変更されていないことを数学的に確認します。
- 生態系の統合: EcoVadis Carbon Data Network や CDP は、特に大規模なサプライ チェーンのデータ収集の負担を大幅に軽減します。
SaaS Italia S.r.l のケーススタディ中堅企業でもできることを示しています。 2 ~ 3 人のチームでスコープ 3 CSRD に準拠したレポートを 3 か月以内に作成します。 材料カテゴリーについては一次データ、残余については支出ベース。鍵となるのは、 優先順位付け: どこにでも完璧を求めるのではなく、努力を集中してください。 排出量が最も多い場所。
役立つリソース
- GHGプロトコルスコープ3規格: ghgprotocol.org/corporate-value-chain-scope-3-standard
- Climatiq API (排出係数データベース): climatiq.io
- EcoVadis カーボン データ ネットワーク: ecovadis.com/solutions/carbon
- ESRS E1 気候変動 (EU の公式文書): EFRAG ESRS E1
- エクシオベース 3.8 (EEIO 支出ベースの要因): exiobase.eu
シリーズの次の記事
次の記事で ESG レポート API: CSRD ワークフローとの統合 この記事で計算したスコープ 3 データの上に REST API レイヤーを構築し、実装します。 欧州指令で要求される形式に準拠し、ワークフローを統合するエンドポイント 監査人のデジタル署名付きレポートの承認。
データを形式で公開する方法についても説明します。 XBRL/iXBRL 提出用 CSRD レポートの必須形式である ESEF (European Single Electronic Format) に準拠 欧州証券取引所に上場。







