배출 파이프라인 범위 3 가치 사슬: 원시 데이터에서 감사 추적까지
Il 전체 배출량의 70~90% 기술이나 소프트웨어 회사가 자사 외부에 숨어 있는 경우 경계: 서비스로 구매한 클라우드 서버, 직원 노트북, 비즈니스 항공편, 코드 고객이 자신의 장치에서 실행하는 것입니다. 이는 배출 범위 3, 그리고 대부분의 경우 디지털 조직의 일부는 현재 존재하는 가장 복잡한 측정 문제를 나타냅니다. ESG 영역.
Scope 1(연료를 태운다)과 Scope 2(전기를 구입한다)와는 달리, Scope 3에서는 수백 개의 공급업체로부터 데이터를 수집하고, 이질적인 배출 계수를 적용하고, 매우 높은 수준의 불확실성을 관리하고 외부 감사자가 검증할 수 있는 감사 추적을 생성합니다. 와 함께 CSRD/ESRS E1 이는 대기업에 대해 Scope 3 보고를 의무화합니다. 2025~2026년, 그리고 2028년까지 중소기업이 십자선에 있는 상황에서 문제는 더 이상 학술적인 것이 아닙니다. 공학.
이 기사에서는 하나를 만듭니다. 완전한 파이프라인 Scope 3 배출량 계산을 위해 가치 사슬: 공급업체 데이터 수집을 위한 ETL 아키텍처부터 CDP와 같은 플랫폼과의 통합까지 활동 기반 및 지출 기반 계산부터 자동화를 위한 Airflow DAG까지 EcoVadis 검증자를 위한 불변의 감사 추적. 각 섹션에는 작업 Python 코드와 모범 사례가 포함되어 있습니다. 실제 상황에서 작동 테스트를 거쳤습니다.
무엇을 배울 것인가
- 15가지 GHG 프로토콜 범위 3 카테고리 및 소프트웨어/SaaS 회사와 관련된 카테고리
- 공급업체 데이터 수집을 위한 ETL/ELT 아키텍처: 설문지, CDP, EcoVadis 및 직접 API
- 활동 기반 vs 지출 기반: 공식, 정확성 및 언제 어떤 접근 방식을 사용해야 하는지
- Apache Airflow를 사용한 Python 파이프라인: 자동화되고 확장 가능한 범위 3 계산을 위한 DAG
- 데이터 품질 채점 및 추정치의 통계적 불확실성 전파
- 외부 검증자 추적성을 위한 SHA-256 해시 체인을 사용한 불변 감사 추적
- 가치 사슬 및 히트맵 우선 순위 범주에 대한 Sankey 다이어그램 시각화
- CSRD/ESRS E1 요구사항: 공개해야 할 내용과 세부사항
- 전체 사례 연구: 공급업체가 50개 있는 SaaS 회사, 엔드투엔드 범위 3 계산
- 배출계수에 대한 EcoVadis 탄소 데이터 네트워크 및 Climatiq API와의 통합
그린 소프트웨어 시리즈 — 10개 기사
| # | Articolo | 주제 |
|---|---|---|
| 1 | 그린 소프트웨어 기반 원칙 | 탄소효율, GSF, SCI |
| 2 | CodeCarbon: 코드 측정 | 측정, 대시보드, 최적화 |
| 3 | Climatiq API: 탄소 계산 | REST API, GHG 프로토콜, 범위 1-3 |
| 4 | 탄소 인식 SDK | 시간 이동, 위치 이동 |
| 5 | 범위 1-2-3: ESG 데이터 모델링 | 데이터 구조, 계산, 집계 |
| 6 | GreenOps: 탄소 인식 Kubernetes | 스케줄링, 확장, 모니터링 |
| 7 | 배출 파이프라인 범위 3 가치 사슬 | 이 기사 |
| 8 | ESG 보고 API: CSRD | API, 워크플로, 규정 준수 |
| 9 | 지속 가능한 아키텍처 패턴 | 저장, 캐싱, 배치 |
| 10 | AI 및 탄소: ML 교육 | ML 훈련, 최적화, Green AI |
15가지 GHG 프로토콜 범위 3 카테고리
Il GHG 프로토콜 기업 가치 사슬(범위 3) 표준 그것은 국제적인 틀이다 참고문헌은 2011년에 출판되었으며 현재 개정 중이며 업데이트는 2026년에 예상됩니다. 가치사슬의 간접배출 15개의 개별 카테고리, 두 가지로 정리 매크로그룹: 상류 (생산/서비스 제공 이전의 활동) e 하류 (고객에게 판매 후 활동).
15 범위 3 카테고리: 업스트림 및 다운스트림
| 고양이. | 이름 | 흐름 | SaaS/기술 관련성 |
|---|---|---|---|
| 1 | 구매한 상품 및 서비스 | 업스트림 | 높음: 서버 하드웨어, 소프트웨어 라이선스, 컨설팅 서비스 |
| 2 | 자본재 | 업스트림 | 미디어: 데이터 센터 장비, 노트북, 회사 전화 |
| 3 | 연료 및 에너지 관련 활동 | 업스트림 | 평균: 구매한 에너지 생산으로 인한 배출량(업스트림 범위 2) |
| 4 | 업스트림 운송 및 유통 | 업스트림 | 낮음: 사무실 및 데이터 센터로의 하드웨어 배송 |
| 5 | 운영 중 발생하는 폐기물 | 업스트림 | 낮음: WEEE, 종이, 사무 폐기물 |
| 6 | 출장 | 업스트림 | 높음: 분산된 팀을 위한 항공편, 호텔, 기차 |
| 7 | 직원 통근 | 업스트림 | 높음: 특히 하이브리드 팀의 경우 홈 오피스 여행 |
| 8 | 업스트림 임대 자산 | 업스트림 | 미디어: 임대 사무실(범위 1/2에 포함되지 않은 경우) |
| 9 | 다운스트림 운송 및 유통 | 하류 | 낮음: 물리적 미디어에 소프트웨어 배포(드물게) |
| 10 | 판매된 제품의 처리 | 하류 | N/A: 순수 소프트웨어에는 해당되지 않음 |
| 11 | 판매된 제품의 사용 | 하류 | 매우 높음: SaaS를 사용하는 고객이 소비하는 에너지 |
| 12 | 판매된 제품의 수명 종료 처리 | 하류 | 낮음: 수명이 다한 사용자 장치 |
| 13 | 다운스트림 임대 자산 | 하류 | 미디어: 고객에게 임대된 하드웨어 |
| 14 | 프랜차이즈 | 하류 | 해당 없음: 해당 없음 |
| 15 | 투자 | 하류 | 높음: 기업 포트폴리오, 스타트업에 대한 지분 투자 |
SaaS 또는 소프트웨어 개발 회사의 경우 일반적으로 가장 관련성이 높은 카테고리는 다음과 같습니다. 고양이. 1 (구매한 상품 및 서비스, 종종 가장 큰 품목), 고양이. 6 (출장), 고양이. 7 (직원 출퇴근) e 고양이. 11 (판매된 제품의 사용). 거기 이중 물질성 요청 CSRD에서는 영향의 관점에서 어떤 카테고리가 중요한지 식별해야 합니다. 회사의 환경적, 재정적 위험.
흔한 실수: Cat을 생략합니다. SaaS의 경우 11
많은 소프트웨어 회사에서는 카테고리 11("판매된 제품의 사용")이 적용되지 않는다는 가정하에 제외합니다. 실제로 모든 API 호출, 모든 쿼리, 고객이 귀하의 서비스를 실행하는 데 소비하는 모든 와트 소프트웨어는 Scope 3 Cat입니다. 11가지 배출은 귀하의 책임입니다. 수백만 명이 참여하는 SaaS의 경우 사용자 중 이것이 지배적인 카테고리일 수 있습니다. 계산 방법은 다음을 사용합니다. 탄소 강도(SCI) 소프트웨어 제공된 기능 단위를 곱합니다.
데이터 수집 파이프라인 아키텍처
가치 사슬 전체에서 신뢰할 수 있는 데이터를 수집하는 것은 모든 비즈니스의 가장 큰 병목 현상입니다. 범위 3 프로젝트. 파이프라인은 수동 설문지, 타사 ESG 플랫폼 등 이기종 소스를 관리해야 합니다. 부품, 공급업체와의 직접 API, 이메일을 통해 전송된 CSV 파일, 내부 ERP 데이터. 다음 아키텍처 패턴을 채택하다 3계층 ETL (브론즈/실버/골드) 레이크하우스에서 영감을 받은 디자인입니다.
파이프라인 아키텍처 범위 3: 브론즈/실버/골드
| 레이어 | 콘텐츠 | 기술 | 범위 |
|---|---|---|---|
| 브론즈(원시) | 공급업체의 불변 원시 데이터 | S3/GCS, 델타 레이크 | 감사 추적, 재생, 정보 소스 |
| 은(표준화) | 단위 및 통화별로 정규화된 데이터 | dbt, 스파크, 판다 | 배출량 계산, 배출계수와 결합 |
| 골드(보고) | 온실가스 카테고리별 총 배출량 | PostgreSQL, 빅쿼리 | 대시보드, CSRD 보고서, 검증자 |
브론즈 레이어는 필수입니다. 수신된 모든 데이터가 저장됩니다. 있는 그대로 타임스탬프 포함 수집, 콘텐츠의 SHA-256 해시 및 소스 메타데이터. 이는 가능성을 보장합니다. 배출 요인이나 방법론이 변경되면 손실 없이 전체 파이프라인을 재처리합니다. 원본 데이터.
# models/scope3_pipeline.py
# Struttura dati per la pipeline Scope 3
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class DataSource(Enum):
SUPPLIER_QUESTIONNAIRE = "supplier_questionnaire"
CDP_API = "cdp_api"
ECOVADIS_API = "ecovadis_api"
ERP_EXPORT = "erp_export"
MANUAL_UPLOAD = "manual_upload"
CLIMATIQ_API = "climatiq_api"
class CalculationMethod(Enum):
ACTIVITY_BASED = "activity_based"
SPEND_BASED = "spend_based"
HYBRID = "hybrid"
SUPPLIER_SPECIFIC = "supplier_specific"
class DataQualityTier(Enum):
TIER_1 = "primary_data" # Dati primari dal supplier
TIER_2 = "secondary_sector" # Fattori settoriali
TIER_3 = "spend_estimated" # Stima basata su spesa
@dataclass
class RawSupplierData:
"""Layer Bronze: dato grezzo immutabile"""
supplier_id: str
source: DataSource
raw_payload: dict
received_at: datetime
content_hash: str = field(init=False)
def __post_init__(self):
payload_str = json.dumps(self.raw_payload, sort_keys=True)
self.content_hash = hashlib.sha256(
payload_str.encode()
).hexdigest()
@dataclass
class StandardizedActivity:
"""Layer Silver: attività normalizzata"""
activity_id: str
supplier_id: str
scope3_category: int # 1-15
activity_type: str # es. "freight_transport"
quantity: float
unit: str # es. "tonne.km"
reporting_period_start: datetime
reporting_period_end: datetime
source: DataSource
quality_tier: DataQualityTier
emission_factor_id: Optional[str] = None
uncertainty_pct: float = 0.0
raw_data_hash: str = "" # Ref al Bronze layer
@dataclass
class EmissionResult:
"""Layer Gold: emissione calcolata"""
result_id: str
activity_id: str
scope3_category: int
co2e_tonnes: float
calculation_method: CalculationMethod
emission_factor_source: str # es. "climatiq:IPCC_2021"
emission_factor_value: float
quality_tier: DataQualityTier
uncertainty_pct: float
calculated_at: datetime
pipeline_version: str
audit_hash: str = field(init=False)
def __post_init__(self):
audit_data = {
"result_id": self.result_id,
"activity_id": self.activity_id,
"co2e_tonnes": self.co2e_tonnes,
"emission_factor_source": self.emission_factor_source,
"calculated_at": self.calculated_at.isoformat(),
"pipeline_version": self.pipeline_version,
}
self.audit_hash = hashlib.sha256(
json.dumps(audit_data, sort_keys=True).encode()
).hexdigest()
공급업체 데이터 통합: CDP, EcoVadis 및 Direct API
공급업체로부터의 데이터 수집은 품질 수준과 품질 수준에 따라 다양한 채널을 통해 이루어집니다. 매우 다른 자동화. 그만큼 탄소정보공개프로젝트(CDP) 에서 데이터를 수집합니다. 24,000개 이상의 회사를 보유하고 있으며 검증된 보고서에 액세스할 수 있는 API를 공개합니다. 에코바디스 2025년 48,000명 이상의 GHG 기자들이 데이터를 공유하는 탄소 데이터 네트워크(Carbon Data Network)를 시작했습니다. 표준화. 마지막으로, 많은 대규모 공급업체는 직접 공유를 위해 독점 API를 공개합니다. 그 발자국.
# collectors/supplier_collector.py
# Integrazione con fonti dati supplier
import httpx
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from models.scope3_pipeline import RawSupplierData, DataSource
class ClimatiqEmissionFactors:
"""Client per Climatiq API - emission factors database"""
BASE_URL = "https://beta3.api.climatiq.io"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def get_emission_factor(
self,
activity_id: str,
year: int = 2024,
region: str = "IT"
) -> dict:
"""Recupera fattore di emissione per attività specifica"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/estimate",
headers=self.headers,
json={
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"year": year,
"region": region
},
"parameters": {
"money": 1.0,
"money_unit": "eur"
}
}
)
response.raise_for_status()
return response.json()
async def batch_estimate(
self,
activities: list[dict]
) -> list[dict]:
"""Stima batch per multiple attività - ottimizza le API call"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/batch",
headers=self.headers,
json={"batch": activities},
timeout=30.0
)
response.raise_for_status()
return response.json().get("results", [])
class EcoVadisCollector:
"""Raccoglie dati Scope 3 dalla piattaforma EcoVadis"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
async def fetch_supplier_carbon_data(
self,
supplier_ecovadis_id: str,
reporting_year: int
) -> RawSupplierData:
"""Recupera dati carbonio per un supplier dalla Carbon Data Network"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/v1/suppliers/{supplier_ecovadis_id}/carbon",
headers={"X-API-Key": self.api_key},
params={"year": reporting_year},
timeout=15.0
)
if response.status_code == 404:
# Supplier non ha condiviso dati primari
return self._create_no_data_record(supplier_ecovadis_id)
response.raise_for_status()
payload = response.json()
return RawSupplierData(
supplier_id=supplier_ecovadis_id,
source=DataSource.ECOVADIS_API,
raw_payload=payload,
received_at=datetime.utcnow()
)
def _create_no_data_record(self, supplier_id: str) -> RawSupplierData:
return RawSupplierData(
supplier_id=supplier_id,
source=DataSource.ECOVADIS_API,
raw_payload={"status": "no_data", "supplier_id": supplier_id},
received_at=datetime.utcnow()
)
class CDPCollector:
"""Raccoglie dati da CDP (Carbon Disclosure Project)"""
CDP_API_URL = "https://api.cdp.net/v1"
def __init__(self, api_token: str):
self.api_token = api_token
async def search_supplier(
self,
company_name: str,
year: int = 2024
) -> RawSupplierData | None:
"""Cerca un supplier nel database CDP e recupera dati GHG"""
async with httpx.AsyncClient() as client:
# Ricerca azienda
search_resp = await client.get(
f"{self.CDP_API_URL}/companies/search",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"q": company_name, "year": year}
)
if not search_resp.json().get("results"):
return None
company_id = search_resp.json()["results"][0]["id"]
# Recupera dati GHG disclosure
ghg_resp = await client.get(
f"{self.CDP_API_URL}/companies/{company_id}/ghg-emissions",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"year": year}
)
if ghg_resp.status_code != 200:
return None
return RawSupplierData(
supplier_id=company_name,
source=DataSource.CDP_API,
raw_payload=ghg_resp.json(),
received_at=datetime.utcnow()
)
class BronzeLayerStorage:
"""Salvataggio immutabile nel layer Bronze"""
def __init__(self, storage_client, bucket: str):
self.storage = storage_client
self.bucket = bucket
async def store(self, raw_data: RawSupplierData) -> str:
"""Salva dato grezzo con path deterministico basato su hash"""
path = (
f"scope3/bronze/"
f"{raw_data.received_at.year}/"
f"{raw_data.received_at.month:02d}/"
f"{raw_data.supplier_id}/"
f"{raw_data.content_hash}.json"
)
await self.storage.upload_json(
bucket=self.bucket,
path=path,
data={
"supplier_id": raw_data.supplier_id,
"source": raw_data.source.value,
"received_at": raw_data.received_at.isoformat(),
"content_hash": raw_data.content_hash,
"payload": raw_data.raw_payload
}
)
return path
활동 기반과 지출 기반: 올바른 방법 선택
GHG 프로토콜은 Scope 3에 대한 네 가지 계산 방법을 정의합니다. 실제로는 그렇습니다. 두 가지 근본적인 접근 방식으로 축소됩니다. 활동 기반 e 지출 기반. 선택은 데이터의 가용성과 카테고리의 중요성에 따라 달라집니다. 그리고 공급자와의 관계의 성숙도.
방법론적 비교: 활동 기반과 지출 기반
| 크기 | 활동 기반 | 지출 기반 |
|---|---|---|
| 공식 | 수량 × 배출계수(단위/kg CO2e) | 지출(EUR) × EEIO 계수(kg CO2e/EUR) |
| 정확성 | 높음(기본 데이터의 경우 ±5-15%) | 낮음-중간(±50-100%) |
| 요청된 데이터 | 물리량(kg, km, kWh, t) | 회계 송장만(EUR, USD) |
| 소스 EF | Climatiq, IPCC, DEFRA, 에코인벤트 | USEEIO, EXIOBASE, WIOD |
| 언제 사용하는가 | 자재 카테고리, 대형 공급업체 | 킥오프, 소규모 공급업체, Cat. <1% |
| 수집 노력 | 높음: 공급업체 협력 필요 | 낮음: 데이터가 이미 ERP/SAP에 있음 |
| CSRD 수용성 | 재료 카테고리에서 선호되는 항목 | 초기 프록시로 승인됨 |
최적의 전략은 하나의 접근 방식입니다 프로그레시브 하이브리드: 시작해 볼까요 지출 기반으로 전체 가치 사슬에 대한 빠른 기준선을 확보한 다음 점진적으로 마이그레이션됩니다. 식별된 자재 카테고리에 대한 활동 기반을 지향합니다. GHG 프로토콜은 세 가지 수준을 정의합니다. 이러한 진행 상황과 정확히 일치하는 데이터 품질(계층 1, 2, 3)입니다.
# calculators/emission_calculator.py
# Calcolo emissioni activity-based e spend-based
from dataclasses import dataclass
from typing import Optional
import math
# ============================================================
# EMISSION FACTORS DATABASE (simplified)
# In produzione: usa Climatiq API o database ecoinvent
# ============================================================
EMISSION_FACTORS: dict[str, dict] = {
# Cat. 1: Purchased goods & services
"cloud_compute_kwh": {
"value": 0.233, # kg CO2e/kWh (IT grid mix 2024)
"unit": "kWh",
"source": "IEA 2024",
"uncertainty_pct": 10.0
},
"hardware_laptop": {
"value": 350.0, # kg CO2e/unit (embodied carbon)
"unit": "unit",
"source": "Dell 2024 PCF",
"uncertainty_pct": 20.0
},
# Cat. 6: Business travel
"flight_economy_short": {
"value": 0.255, # kg CO2e/passenger.km
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
"flight_economy_long": {
"value": 0.195,
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
# Cat. 7: Employee commuting
"car_average": {
"value": 0.170, # kg CO2e/km
"unit": "km",
"source": "DEFRA 2024",
"uncertainty_pct": 12.0
},
"public_transport_it": {
"value": 0.048,
"unit": "passenger.km",
"source": "Ispra 2024",
"uncertainty_pct": 18.0
},
}
# EEIO Spend-based factors (EXIOBASE 3.8)
# kg CO2e per EUR di spesa per categoria merceologica
EEIO_FACTORS: dict[str, float] = {
"it_services": 0.312, # IT e telecomunicazioni
"professional_services": 0.198, # Consulenza, legale, etc.
"office_supplies": 0.445,
"cloud_hosting": 0.287,
"marketing": 0.231,
"utilities": 0.892,
"hr_services": 0.167,
"travel_accommodation": 0.521,
}
def calculate_activity_based(
activity_type: str,
quantity: float,
custom_ef: Optional[float] = None
) -> tuple[float, float]:
"""
Calcola emissioni con metodo activity-based.
Returns:
(co2e_kg, uncertainty_pct)
"""
if custom_ef is not None:
return quantity * custom_ef, 30.0 # alta incertezza EF custom
ef_data = EMISSION_FACTORS.get(activity_type)
if not ef_data:
raise ValueError(f"Emission factor non trovato: {activity_type}")
co2e_kg = quantity * ef_data["value"]
uncertainty = ef_data["uncertainty_pct"]
return co2e_kg, uncertainty
def calculate_spend_based(
spend_eur: float,
procurement_category: str,
inflation_correction: float = 1.0
) -> tuple[float, float]:
"""
Calcola emissioni con metodo spend-based (EEIO).
Args:
spend_eur: importo in EUR
procurement_category: categoria merceologica EEIO
inflation_correction: fattore per correggere inflazione vs anno base EEIO
Returns:
(co2e_kg, uncertainty_pct)
"""
eeio_factor = EEIO_FACTORS.get(procurement_category)
if not eeio_factor:
raise ValueError(f"EEIO factor non trovato: {procurement_category}")
# Corregge per inflazione (EEIO factors spesso in EUR 2015)
adjusted_spend = spend_eur / inflation_correction
co2e_kg = adjusted_spend * eeio_factor
# Lo spend-based ha incertezza intrinsecamente alta
uncertainty = 75.0
return co2e_kg, uncertainty
def propagate_uncertainty(
values: list[float],
uncertainties_pct: list[float]
) -> float:
"""
Propagazione incertezza quadratica (somma in quadratura).
Valida quando le incertezze sono indipendenti.
Returns:
uncertainty_pct sul totale
"""
weighted_variance_sum = sum(
(v * u/100) ** 2
for v, u in zip(values, uncertainties_pct)
)
total = sum(values)
if total == 0:
return 0.0
combined_std = math.sqrt(weighted_variance_sum)
return (combined_std / total) * 100
def calculate_category_total(
activities: list[dict]
) -> dict:
"""
Calcola totale categoria Scope 3 con propagazione incertezza.
activities: lista di {method, value_kg, uncertainty_pct}
"""
if not activities:
return {"total_co2e_kg": 0.0, "uncertainty_pct": 0.0}
values = [a["value_kg"] for a in activities]
uncertainties = [a["uncertainty_pct"] for a in activities]
total_co2e = sum(values)
combined_uncertainty = propagate_uncertainty(values, uncertainties)
# Qualità aggregata: peggiore del gruppo determina il tier
quality_tiers = [a.get("quality_tier", "TIER_3") for a in activities]
dominant_tier = min(quality_tiers) # TIER_1 < TIER_2 < TIER_3 lexicograficamente
return {
"total_co2e_kg": total_co2e,
"total_co2e_tonnes": total_co2e / 1000,
"uncertainty_pct": combined_uncertainty,
"uncertainty_kg": total_co2e * combined_uncertainty / 100,
"dominant_quality_tier": dominant_tier,
"activity_count": len(activities)
}
파이프라인 공기 흐름: 자동화된 범위 3 계산을 위한 DAG
범위 3 파이프라인 오케스트레이션에는 다음을 관리하는 잘 구성된 DAG가 필요합니다. 전체 수명주기: 데이터 수집, 표준화, 배출량 계산, 품질 점검 골드 레이어에 게시합니다. DAG는 다음과 같아야 합니다. 멱등성 (실행 가능 부작용 없이 여러 번) e 재설정 가능 경우에 부분적인 실패.
# dags/scope3_pipeline_dag.py
# Apache Airflow DAG per pipeline emissioni Scope 3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import json
import logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURAZIONE DAG
# ============================================================
SCOPE3_DAG_CONFIG = {
"reporting_year": 2024,
"companies": [
{"id": "S001", "name": "AWS", "tier": "TIER_1", "source": "ecovadis"},
{"id": "S002", "name": "Microsoft Azure", "tier": "TIER_1", "source": "cdp"},
{"id": "S003", "name": "Supplier_XYZ", "tier": "TIER_2", "source": "questionnaire"},
# ... altri supplier
],
"categories_enabled": [1, 2, 3, 6, 7, 11, 15],
"quality_threshold_pct": 80.0,
"alert_email": "esg-team@company.com"
}
default_args = {
"owner": "esg-team",
"depends_on_past": False,
"email_on_failure": True,
"email": [SCOPE3_DAG_CONFIG["alert_email"]],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="scope3_emissions_pipeline",
default_args=default_args,
description="Pipeline calcolo emissioni Scope 3 value chain",
schedule_interval="@quarterly", # Esecuzione trimestrale
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["emissions", "scope3", "esg", "ghg-protocol"],
max_active_runs=1, # Serializza: mai due calcoli in parallelo
) as dag:
# ============================================================
# FASE 1: RACCOLTA DATI SUPPLIER (in parallelo per supplier)
# ============================================================
@task_group(group_id="data_collection")
def collect_supplier_data():
@task(task_id="fetch_ecovadis_suppliers")
def fetch_ecovadis() -> list[dict]:
"""Raccoglie dati da EcoVadis Carbon Data Network"""
from collectors.supplier_collector import EcoVadisCollector
import asyncio
api_key = Variable.get("ECOVADIS_API_KEY", deserialize_json=False)
collector = EcoVadisCollector(api_key, "https://api.ecovadis.com")
suppliers_ecovadis = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "ecovadis"
]
results = []
for supplier in suppliers_ecovadis:
raw = asyncio.run(
collector.fetch_supplier_carbon_data(
supplier["id"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
results.append({
"supplier_id": raw.supplier_id,
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": raw.raw_payload.get("status") != "no_data"
})
logger.info(f"EcoVadis - Supplier {supplier['id']}: fetched")
return results
@task(task_id="fetch_cdp_suppliers")
def fetch_cdp() -> list[dict]:
"""Raccoglie dati verificati da CDP"""
from collectors.supplier_collector import CDPCollector
import asyncio
api_token = Variable.get("CDP_API_TOKEN")
collector = CDPCollector(api_token)
suppliers_cdp = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "cdp"
]
results = []
for supplier in suppliers_cdp:
raw = asyncio.run(
collector.search_supplier(
supplier["name"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
if raw:
results.append({
"supplier_id": supplier["id"],
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": True
})
else:
results.append({
"supplier_id": supplier["id"],
"status": "not_found",
"has_data": False
})
return results
@task(task_id="process_manual_questionnaires")
def process_questionnaires() -> list[dict]:
"""Processa questionari manuali caricati in S3"""
# In produzione: legge da bucket S3 o SharePoint
# Qui restituiamo dati di esempio
return [{
"supplier_id": "S003",
"status": "processed",
"has_data": True,
"scope3_cat1_tco2e": 45.2,
"scope3_cat6_tco2e": 12.8
}]
ev = fetch_ecovadis()
cdp = fetch_cdp()
q = process_questionnaires()
return [ev, cdp, q]
# ============================================================
# FASE 2: STANDARDIZZAZIONE E CALCOLO EMISSIONI
# ============================================================
@task(task_id="standardize_activities")
def standardize_activities(collection_results: list) -> list[dict]:
"""Normalizza tutti i dati in unità fisiche standard"""
from normalizers.activity_normalizer import ActivityNormalizer
normalizer = ActivityNormalizer()
standardized = []
for batch in collection_results:
for result in batch:
if result.get("has_data"):
activities = normalizer.normalize(result)
standardized.extend(activities)
logger.info(f"Standardizzate {len(standardized)} attività")
return standardized
@task(task_id="calculate_emissions")
def calculate_emissions(activities: list[dict]) -> list[dict]:
"""Calcola emissioni CO2e per ogni attività standardizzata"""
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based
)
from models.scope3_pipeline import CalculationMethod
results = []
for activity in activities:
if activity["method"] == "activity_based":
co2e_kg, uncertainty = calculate_activity_based(
activity["activity_type"],
activity["quantity"]
)
method = CalculationMethod.ACTIVITY_BASED
else:
co2e_kg, uncertainty = calculate_spend_based(
activity["spend_eur"],
activity["procurement_category"]
)
method = CalculationMethod.SPEND_BASED
results.append({
**activity,
"co2e_kg": co2e_kg,
"co2e_tonnes": co2e_kg / 1000,
"uncertainty_pct": uncertainty,
"calculation_method": method.value,
"calculated_at": datetime.utcnow().isoformat()
})
return results
# ============================================================
# FASE 3: DATA QUALITY CHECK
# ============================================================
@task(task_id="data_quality_check")
def data_quality_check(results: list[dict]) -> dict:
"""Verifica qualità dati e genera score per categoria"""
from quality.data_quality_scorer import DataQualityScorer
scorer = DataQualityScorer()
quality_report = scorer.score_results(results)
if quality_report["overall_score"] < SCOPE3_DAG_CONFIG["quality_threshold_pct"]:
logger.warning(
f"Quality score sotto soglia: {quality_report['overall_score']}%"
)
return quality_report
# ============================================================
# FASE 4: AUDIT TRAIL E PUBBLICAZIONE GOLD LAYER
# ============================================================
@task(task_id="create_audit_trail")
def create_audit_trail(
results: list[dict],
quality_report: dict
) -> str:
"""Crea audit trail immutabile con hash chain"""
from audit.hash_chain import HashChain
chain = HashChain()
chain_id = chain.create_chain(
calculation_results=results,
quality_report=quality_report,
pipeline_version="2.1.0",
methodology="GHG_Protocol_Scope3_2011",
reporting_standard="CSRD_ESRS_E1"
)
logger.info(f"Audit trail creato: {chain_id}")
return chain_id
@task(task_id="publish_gold_layer")
def publish_gold_layer(
results: list[dict],
audit_chain_id: str
) -> None:
"""Pubblica dati aggregati nel Gold layer (PostgreSQL)"""
hook = PostgresHook(postgres_conn_id="emissions_db")
for result in results:
hook.run(
"""
INSERT INTO scope3_emissions_gold (
supplier_id, scope3_category, co2e_tonnes,
calculation_method, uncertainty_pct,
quality_tier, audit_chain_id,
reporting_year, published_at
) VALUES (
%(supplier_id)s, %(scope3_category)s, %(co2e_tonnes)s,
%(calculation_method)s, %(uncertainty_pct)s,
%(quality_tier)s, %(audit_chain_id)s,
%(reporting_year)s, NOW()
)
ON CONFLICT (supplier_id, scope3_category, reporting_year)
DO UPDATE SET
co2e_tonnes = EXCLUDED.co2e_tonnes,
updated_at = NOW()
""",
parameters={
**result,
"audit_chain_id": audit_chain_id,
"reporting_year": SCOPE3_DAG_CONFIG["reporting_year"]
}
)
logger.info(f"Pubblicati {len(results)} record nel Gold layer")
# ============================================================
# WIRING DEL DAG
# ============================================================
collection_results = collect_supplier_data()
standardized = standardize_activities(collection_results)
emission_results = calculate_emissions(standardized)
quality = data_quality_check(emission_results)
chain_id = create_audit_trail(emission_results, quality)
publish_gold_layer(emission_results, chain_id)
데이터 품질 점수 및 불확실성 전파
GHG 프로토콜 범위 3 표준은 해당 고양이를 명시적으로 인식합니다. 1~15는 아님 결코 확실하게 알려지지 않았습니다. 품질 보고에는 추정치가 포함되어야 합니다. ~의양적 불확실성 각 카테고리와 연관되어 있습니다. IPCC는 공식화했습니다. 모범 사례 지침의 불확실성 전파 방법.
# quality/data_quality_scorer.py
# Scoring qualità dati Scope 3
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import math
from datetime import datetime, timedelta
class QualityDimension(Enum):
COMPLETENESS = "completeness"
ACCURACY = "accuracy"
TIMELINESS = "timeliness"
VERIFICATION = "verification"
GRANULARITY = "granularity"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0-100
weight: float # peso nel calcolo aggregato
notes: str = ""
def score_supplier_data_quality(
supplier: dict,
reference_date: datetime = None
) -> dict[str, float]:
"""
Calcola score qualità multi-dimensionale per i dati di un supplier.
Basato su GHG Protocol Data Quality Guidance.
"""
if reference_date is None:
reference_date = datetime.utcnow()
scores = []
# 1. COMPLETENESS: quante delle categorie richieste sono presenti?
required_fields = [
"scope3_cat1_tco2e", "scope3_cat6_tco2e", "scope3_cat7_tco2e"
]
present = sum(1 for f in required_fields if supplier.get(f) is not None)
completeness_score = (present / len(required_fields)) * 100
scores.append(QualityScore(
dimension=QualityDimension.COMPLETENESS,
score=completeness_score,
weight=0.30
))
# 2. TIMELINESS: quanto sono recenti i dati?
data_year = supplier.get("reporting_year", 2020)
current_year = reference_date.year
age_years = current_year - data_year
if age_years <= 1:
timeliness_score = 100.0
elif age_years == 2:
timeliness_score = 75.0
elif age_years == 3:
timeliness_score = 50.0
else:
timeliness_score = 20.0
scores.append(QualityScore(
dimension=QualityDimension.TIMELINESS,
score=timeliness_score,
weight=0.20
))
# 3. VERIFICATION: i dati sono stati verificati da terze parti?
verification_level = supplier.get("verification", "none")
verification_score = {
"independent_assured": 100.0, # GHG verificato da auditor indipendente
"limited_assurance": 80.0, # Limited assurance
"internal_reviewed": 60.0, # Solo review interna
"supplier_declared": 40.0, # Auto-dichiarazione
"estimated": 20.0, # Stima spend-based
"none": 0.0
}.get(verification_level, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.VERIFICATION,
score=verification_score,
weight=0.30
))
# 4. GRANULARITY: attività-specifico o aggregato?
data_type = supplier.get("data_type", "aggregated")
granularity_score = {
"site_specific": 100.0, # Dati per sito produttivo
"product_specific": 90.0, # PCF per prodotto/servizio
"supplier_specific": 70.0, # Dato totale supplier
"sector_average": 40.0, # Media settoriale
"aggregated": 20.0
}.get(data_type, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.GRANULARITY,
score=granularity_score,
weight=0.20
))
# Calcolo score aggregato ponderato
overall = sum(s.score * s.weight for s in scores)
# Mappa score a Tier GHG Protocol
if overall >= 80:
tier = "TIER_1"
uncertainty_band_pct = 15.0
elif overall >= 50:
tier = "TIER_2"
uncertainty_band_pct = 40.0
else:
tier = "TIER_3"
uncertainty_band_pct = 75.0
return {
"overall_score": round(overall, 1),
"tier": tier,
"uncertainty_band_pct": uncertainty_band_pct,
"dimension_scores": {
s.dimension.value: round(s.score, 1)
for s in scores
}
}
def monte_carlo_uncertainty(
base_estimate_tco2e: float,
uncertainty_pct: float,
n_simulations: int = 10_000
) -> dict:
"""
Stima intervallo di confidenza con simulazione Monte Carlo.
Per reporting CSRD si raccomanda almeno 1.000 simulazioni.
"""
import random
# Distribuzione log-normale (emissioni non possono essere negative)
sigma = math.log(1 + (uncertainty_pct / 100) ** 2) ** 0.5
mu = math.log(base_estimate_tco2e) - sigma ** 2 / 2
simulated = [
math.exp(random.gauss(mu, sigma))
for _ in range(n_simulations)
]
simulated_sorted = sorted(simulated)
p05 = simulated_sorted[int(n_simulations * 0.05)]
p50 = simulated_sorted[int(n_simulations * 0.50)]
p95 = simulated_sorted[int(n_simulations * 0.95)]
return {
"base_estimate_tco2e": base_estimate_tco2e,
"p05_tco2e": round(p05, 2),
"p50_tco2e": round(p50, 2),
"p95_tco2e": round(p95, 2),
"confidence_interval_90pct": {
"lower": round(p05, 2),
"upper": round(p95, 2)
},
"coefficient_of_variation": round(
(p95 - p05) / (2 * p50) * 100, 1
)
}
해시 체인을 사용한 불변 감사 추적
La 엔드투엔드 추적성 이는 가장 중요한 요구 사항 중 하나입니다. 검증 가능한 Scope 3 보고. 외부 감사자는 보고서의 모든 숫자를 추적할 수 있어야 합니다. 모든 변환 단계를 거쳐 데이터의 기본 소스에 도달합니다. 에이 해시 체인 블록체인 기술에서 영감을 얻었지만 복잡하지는 않습니다. 분산)은 감사 추적의 불변성을 보장합니다.
# audit/hash_chain.py
# Audit trail immutabile per emissioni Scope 3
import hashlib
import json
import uuid
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class AuditRecord:
"""Singolo record nell'audit chain"""
def __init__(
self,
record_type: str,
payload: dict,
previous_hash: str,
chain_id: str,
sequence: int
):
self.record_id = str(uuid.uuid4())
self.record_type = record_type
self.payload = payload
self.previous_hash = previous_hash
self.chain_id = chain_id
self.sequence = sequence
self.created_at = datetime.utcnow().isoformat()
self.record_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""SHA-256 hash di tutti i campi del record (eccetto il hash stesso)"""
data = {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"created_at": self.created_at,
"payload_hash": hashlib.sha256(
json.dumps(self.payload, sort_keys=True, default=str).encode()
).hexdigest()
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
def to_dict(self) -> dict:
return {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"record_hash": self.record_hash,
"created_at": self.created_at,
"payload": self.payload
}
class HashChain:
"""
Hash chain per audit trail immutabile emissioni Scope 3.
Ogni record contiene l'hash del record precedente,
rendendo impossibile modificare un record senza invalidare
tutti i record successivi.
"""
GENESIS_HASH = "0" * 64 # Hash del primo record della chain
def __init__(self, db_client=None):
self.db = db_client
self.records: list[AuditRecord] = []
def create_chain(
self,
calculation_results: list[dict],
quality_report: dict,
pipeline_version: str,
methodology: str,
reporting_standard: str
) -> str:
"""Crea una nuova chain per un calcolo Scope 3 completo"""
chain_id = str(uuid.uuid4())
previous_hash = self.GENESIS_HASH
# Record 1: Metadati della pipeline
pipeline_record = AuditRecord(
record_type="PIPELINE_METADATA",
payload={
"version": pipeline_version,
"methodology": methodology,
"reporting_standard": reporting_standard,
"calculation_timestamp": datetime.utcnow().isoformat(),
"total_activities": len(calculation_results)
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=0
)
self.records.append(pipeline_record)
previous_hash = pipeline_record.record_hash
# Record 2: Quality report
quality_record = AuditRecord(
record_type="QUALITY_ASSESSMENT",
payload=quality_report,
previous_hash=previous_hash,
chain_id=chain_id,
sequence=1
)
self.records.append(quality_record)
previous_hash = quality_record.record_hash
# Record 3..N: Singoli risultati di emissione
for i, result in enumerate(calculation_results):
emission_record = AuditRecord(
record_type="EMISSION_CALCULATION",
payload={
"supplier_id": result.get("supplier_id"),
"scope3_category": result.get("scope3_category"),
"co2e_tonnes": result.get("co2e_tonnes"),
"calculation_method": result.get("calculation_method"),
"emission_factor_source": result.get("emission_factor_source"),
"uncertainty_pct": result.get("uncertainty_pct"),
"quality_tier": result.get("quality_tier")
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=2 + i
)
self.records.append(emission_record)
previous_hash = emission_record.record_hash
# Persist su DB (o storage immutabile)
if self.db:
self._persist_chain(chain_id)
logger.info(
f"Chain {chain_id} creata con {len(self.records)} record. "
f"Final hash: {previous_hash[:16]}..."
)
return chain_id
def verify_chain_integrity(self, chain_id: str) -> bool:
"""
Verifica che nessun record sia stato alterato.
Percorre la chain ricomputando ogni hash.
"""
records = self._load_chain(chain_id)
if not records:
return False
expected_previous = self.GENESIS_HASH
for record_dict in records:
# Ricomputa hash
record = AuditRecord(
record_type=record_dict["record_type"],
payload=record_dict["payload"],
previous_hash=record_dict["previous_hash"],
chain_id=record_dict["chain_id"],
sequence=record_dict["sequence"]
)
if record_dict["previous_hash"] != expected_previous:
logger.error(
f"Chain corrotta al record {record_dict['sequence']}: "
f"previous_hash non corrisponde"
)
return False
expected_previous = record.record_hash
return True
def _persist_chain(self, chain_id: str) -> None:
"""Salva tutti i record della chain nel DB"""
for record in self.records:
self.db.insert("scope3_audit_chain", record.to_dict())
def _load_chain(self, chain_id: str) -> list[dict]:
"""Carica i record della chain dal DB in ordine di sequenza"""
if not self.db:
return [r.to_dict() for r in self.records]
return self.db.query(
"SELECT * FROM scope3_audit_chain WHERE chain_id = %s ORDER BY sequence",
[chain_id]
)
시각화: Sankey 다이어그램 및 히트맵 카테고리
잘 구축된 Scope 3 파이프라인은 데이터를 렌더링하는 시각화도 생성해야 합니다. 기술적, 비기술적 이해관계자 모두가 이해할 수 있습니다. 그만큼 생키 다이어그램 그것은이다 가치 사슬을 따라 배출 흐름을 보여주는 이상적인 도구입니다. 히트맵 대부분의 재료 카테고리를 신속하게 식별할 수 있습니다. 데이터 품질이 낮은 사람들.
# visualizations/scope3_charts.py
# Generazione Sankey diagram e heatmap Scope 3
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from typing import Optional
def create_scope3_sankey(
emission_data: list[dict],
title: str = "Scope 3 Value Chain Emissions"
) -> go.Figure:
"""
Crea Sankey diagram per visualizzare flussi emissioni Scope 3.
Struttura: Supplier -> Categoria S3 -> Totale Scope 3
"""
# Raccoglie nodi unici
suppliers = list(set(d["supplier_id"] for d in emission_data))
categories = list(set(f"Cat. {d['scope3_category']}" for d in emission_data))
all_nodes = suppliers + categories + ["Scope 3 Total"]
node_index = {node: i for i, node in enumerate(all_nodes)}
# Costruisce link source->target->value
source_indices = []
target_indices = []
values = []
link_labels = []
for record in emission_data:
supplier = record["supplier_id"]
category = f"Cat. {record['scope3_category']}"
tco2e = record["co2e_tonnes"]
# Supplier -> Categoria
source_indices.append(node_index[supplier])
target_indices.append(node_index[category])
values.append(tco2e)
link_labels.append(f"{tco2e:.1f} tCO2e")
# Categoria -> Total
for cat in categories:
cat_total = sum(
d["co2e_tonnes"]
for d in emission_data
if f"Cat. {d['scope3_category']}" == cat
)
source_indices.append(node_index[cat])
target_indices.append(node_index["Scope 3 Total"])
values.append(cat_total)
link_labels.append(f"{cat_total:.1f} tCO2e")
# Colori nodi
node_colors = (
["#2196F3"] * len(suppliers) + # Blu per supplier
["#FF9800"] * len(categories) + # Arancione per categorie
["#4CAF50"] # Verde per totale
)
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=20,
thickness=20,
line=dict(color="white", width=0.5),
label=all_nodes,
color=node_colors,
hovertemplate="{label}
tCO2e: {value:.1f}<extra></extra>"
),
link=dict(
source=source_indices,
target=target_indices,
value=values,
label=link_labels,
color="rgba(100,100,100,0.3)"
)
))
fig.update_layout(
title_text=title,
font_size=12,
height=600,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)"
)
return fig
def create_category_heatmap(
emission_data: list[dict]
) -> go.Figure:
"""
Heatmap: asse X = categoria Scope 3, asse Y = qualità dato.
Colore = tCO2e. Aiuta a prioritizzare effort raccolta dati.
"""
df = pd.DataFrame(emission_data)
# Aggrega per categoria e tier qualità
pivot = df.pivot_table(
values="co2e_tonnes",
index="quality_tier",
columns="scope3_category",
aggfunc="sum",
fill_value=0
)
# Ordina tier (TIER_1 migliore in alto)
tier_order = ["TIER_1", "TIER_2", "TIER_3"]
pivot = pivot.reindex(
[t for t in tier_order if t in pivot.index]
)
fig = go.Figure(go.Heatmap(
z=pivot.values,
x=[f"Cat. {c}" for c in pivot.columns],
y=list(pivot.index),
colorscale="RdYlGn_r", # Rosso = alta emissione (critico)
text=pivot.values.round(1),
texttemplate="%{text} t",
textfont={"size": 11},
hovertemplate="Categoria: %{x}
Tier: %{y}
%{z:.1f} tCO2e<extra></extra>",
colorbar=dict(title="tCO2e")
))
fig.update_layout(
title="Heatmap Scope 3: Emissioni per Categoria e Qualità Dato",
xaxis_title="Categoria GHG Protocol",
yaxis_title="Tier Qualità Dato",
height=350,
margin=dict(l=80, r=20, t=60, b=60)
)
return fig
def generate_scope3_dashboard_html(
emission_data: list[dict],
output_path: str
) -> None:
"""Genera report HTML standalone con tutti i grafici"""
sankey = create_scope3_sankey(emission_data)
heatmap = create_category_heatmap(emission_data)
total_tco2e = sum(d["co2e_tonnes"] for d in emission_data)
by_category = {}
for d in emission_data:
cat = d["scope3_category"]
by_category[cat] = by_category.get(cat, 0) + d["co2e_tonnes"]
top_category = max(by_category, key=by_category.get)
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Scope 3 Emissions Report</title>
<meta charset="utf-8">
</head>
<body>
<h1>Scope 3 Value Chain Emissions Report</h1>
<p>Totale: <strong>{total_tco2e:.1f} tCO2e</strong></p>
<p>Categoria più materiale: Cat. {top_category}
({by_category[top_category]:.1f} tCO2e)</p>
{sankey.to_html(full_html=False)}
{heatmap.to_html(full_html=False)}
</body>
</html>
"""
with open(output_path, "w") as f:
f.write(html_content)
CSRD/ESRS E1 보고 범위 3 요구 사항
La 기업 지속가능성 보고 지침(CSRD) 그리고 관련 표준 ESRS E1(기후변화) Scope 3 보고를 자발적 보고에서 수천 개의 유럽 기업에 필수입니다. 시행 일정은 시차를 두고 있다 그리고 이미 진행 중입니다.
CSRD 범위 3 일정 필요
| FY | 신고하세요 들어갈게요 | 과목 | 메모 |
|---|---|---|---|
| 2024년 | 2025년 초 | 이미 NFRD의 적용을 받는 대형 PIE(직원 500명 이상) | 첫 번째 물결: ~12,000개의 EU 기업 |
| 2025년 | 2026년 초 | 모든 대기업(>250개 부서 또는 >4천만 EUR) | ~50,000개의 EU 기업 |
| 2026년 | 2027년 초 | 상장 중소기업 | 단순화된 ESRS 표준 |
| 2028년 | 2029년 초 | EU 자회사가 있는 비 EU 회사 | 전 세계적으로 중요한 영향 |
ESRS E1에서는 Scope 3 배출에 대해 구체적으로 요구합니다.
- 모든 재료 카테고리 공개: 중요성이 결정되어야 함 이중 중요성 분석(영향 + 재무 리스크)을 통해 대부분의 경우 기술 기업의 경우 최소 4~6개의 카테고리가 중요합니다.
- 카테고리별 분류: 값은 단일 합계로 보고될 수 없습니다. 집계; 각 재료 카테고리에는 tCO2e에 대한 자체 데이터가 있어야 합니다.
- 명시적 방법론: 각 카테고리에 대해 계산 방법을 선언해야 합니다. (활동 기반, 지출 기반, 공급업체별), 배출 계수 소스 및 계층 데이터 품질.
- 탄소배출권으로 상계 없음: 총 배출량을 보고해야 합니다. 구매한 보상이나 탄소 상쇄와는 별도로.
- 필수 보험: 초기에는 제한적 보증을 목적으로 앞으로는 합리적인 확신으로 나아가세요. 이 문서에 설명된 감사 추적 그것이 바로 리뷰어들이 요구하는 것입니다.
- 목표 및 전환 계획: 기업은 다음의 목표를 선언해야 합니다. 감소는 중간 이정표와 함께 1.5°C(SBTi에 의해 검증됨)로 정렬됩니다.
주의: 범위 3 및 이중 중요성
ESRS E1에서는 15개의 Scope 3 범주를 모두 보고하도록 요구하지 않고 식별된 범주만 보고합니다. 어떻게 재료 이중 중요성 분석에서. 그러나, 그 과정은 중요성 결정은 문서화되고 감사 가능해야 합니다. 하나 제외 "데이터 부족으로 인한"이라는 범주는 허용되는 정당성이 아닙니다. 입증되어야 합니다. 해당 카테고리는 특정 비즈니스에 중요하지 않습니다.
사례 연구: 공급업체가 50개 있는 SaaS 회사
모든 것을 구체적인 예로 바꿔보겠습니다. 직원이 150명인 이탈리아의 중간 규모 SaaS, 수익 1,500만 유로, AWS 기반 인프라 및 50개의 활성 공급업체. 경영진은 다음과 같이 결정했다. CSRD에 앞서 Scope 3 계산을 시작하고 제공까지 3개월의 기간을 갖습니다. 감사자가 검증할 수 있는 데이터.
회사 프로필: SaaS Italia S.r.l.
| 매개변수 | Valore |
|---|---|
| 직원 | 150(70% 스마트워킹) |
| 위치 | 밀라노 본사 + 로마 사무소 |
| 하부 구조 | AWS eu-west-1(기본), GCP Europe-west1(백업) |
| 활동적인 공급업체 | 50개(대형 8개, 소형/중형 42개) |
| 조달비용 | ~420만 유로/년 |
| 연간 항공편 | ~380편(컨퍼런스 + 고객) |
1단계 – 중요성 분석(1~2주차): ESG팀은 Scope 3 물질 범주를 식별하기 위해 신속한 분석을 수행했습니다. 다음의 데이터 사용 EEIO 요소를 포함하는 초기 프록시로 ERP(SAP) 지출을 통해 다음 추정치를 얻었습니다. "검사":
범위 3 중요성 심사 — SaaS Italia S.r.l.
| 고양이. | 설명 | 지출 기반 추정치(tCO2e) | 전체 대비 % | 결정 |
|---|---|---|---|---|
| 1 | 구매한 상품 및 서비스(클라우드, SW) | 342 | 54% | 소재 → 활동 기반 |
| 6 | 출장 | 98 | 15% | 소재 → 활동 기반 |
| 7 | 직원 통근 | 87 | 14% | MATERIAL → 직원 설문조사 |
| 11 | 판매된 제품의 사용 | 76 | 12% | MATERIAL → SKI 측정 |
| 2 | 자본재(노트북, 하드웨어) | 28 | 4% | 소재 → 공급업체 PCF |
| 다른 | 고양이. 3, 5, 8, 15 | 6 | 1% | 비물질적 → 지출 기반 |
2단계 – 데이터 수집(2~8주):
- 고양이. 1(클라우드): AWS 고객 탄소 발자국 도구 및 GCP 탄소 발자국 계좌별 월별 발행 데이터를 제공합니다. API를 통해 추출된 데이터가 Bronze에 로드됨 레이어. 품질: TIER 1(공급업체별, AWS 인증)
- 고양이. 1(소프트웨어 및 서비스): 8개의 대규모 공급업체(>50K EUR/년)와 접촉 체계적인 설문지를 통해 5개는 기본 데이터(Microsoft ERP, Slack 포함)로 응답했습니다. 세일즈포스). 3개는 데이터가 없습니다. → EEIO를 사용한 지출 기반입니다.
- 고양이. 6(출장): 여행사(Carlson Wagonlit)에서 추출한 데이터 API를 통해: 노선과 클래스가 포함된 380편의 항공편. DEFRA 2024를 사용한 활동 기반 계산.
- 고양이. 7(통근): 전체 직원 150명을 대상으로 한 익명 설문조사(응답률 82%) 교통 수단, 평균 거리, 사무실에 있는 주당 일수.
- 고양이. 11 (판매된 제품의 사용): SCI(소프트웨어 탄소 강도) 계산 생산 인프라에 CodeCarbon을 사용합니다. 월별 활성 세션 수를 곱합니다.
# case_study/saas_italia_scope3.py
# Calcolo completo Scope 3 per SaaS Italia S.r.l.
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based,
calculate_category_total
)
def calculate_cat1_cloud() -> dict:
"""Cat. 1: Emissioni cloud AWS + GCP (dati primari vendor)"""
# Dati estratti dall'AWS Customer Carbon Footprint API
aws_kwh_year = 187_500 # kWh totali 2024
gcp_kwh_year = 12_300
aws_ef = 0.233 # kg CO2e/kWh IT grid (AWS eu-west-1)
gcp_ef = 0.198 # kg CO2e/kWh GCP europe-west1
aws_co2, aws_unc = calculate_activity_based("cloud_compute_kwh", aws_kwh_year, aws_ef)
gcp_co2, gcp_unc = calculate_activity_based("cloud_compute_kwh", gcp_kwh_year, gcp_ef)
activities = [
{"value_kg": aws_co2, "uncertainty_pct": 8.0, "quality_tier": "TIER_1"},
{"value_kg": gcp_co2, "uncertainty_pct": 10.0, "quality_tier": "TIER_1"},
]
result = calculate_category_total(activities)
result["category"] = 1
result["sub_category"] = "cloud_infrastructure"
return result
def calculate_cat6_business_travel() -> dict:
"""Cat. 6: Business travel (dati agenzia viaggi)"""
# 380 voli totali anno 2024
# 60% corto raggio (<1500km), 40% lungo raggio
short_haul_pkm = 380 * 0.6 * 850 # 850km avg corto raggio
long_haul_pkm = 380 * 0.4 * 3200 # 3200km avg lungo raggio
short_co2, short_unc = calculate_activity_based(
"flight_economy_short", short_haul_pkm
)
long_co2, long_unc = calculate_activity_based(
"flight_economy_long", long_haul_pkm
)
# Radiative forcing factor x1.9 per quota alta
rf_factor = 1.9
short_co2 *= rf_factor
long_co2 *= rf_factor
activities = [
{"value_kg": short_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
{"value_kg": long_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
]
result = calculate_category_total(activities)
result["category"] = 6
return result
def calculate_cat7_commuting() -> dict:
"""Cat. 7: Employee commuting (survey 123/150 dipendenti)"""
# Risultati survey (valori medi per dipendente/anno)
commuters = {
"car_solo": {"count": 38, "km_day": 28, "days_year": 120},
"car_shared": {"count": 12, "km_day": 22, "days_year": 110},
"public_transport": {"count": 52, "km_day": 35, "days_year": 140},
"cycling_walking": {"count": 21, "km_day": 4, "days_year": 150},
}
activities = []
# Auto privata
car_pkm = (
commuters["car_solo"]["count"] *
commuters["car_solo"]["km_day"] *
commuters["car_solo"]["days_year"]
)
co2_car, unc = calculate_activity_based("car_average", car_pkm)
activities.append({"value_kg": co2_car, "uncertainty_pct": 15.0, "quality_tier": "TIER_2"})
# Trasporto pubblico
pt_pkm = (
commuters["public_transport"]["count"] *
commuters["public_transport"]["km_day"] *
commuters["public_transport"]["days_year"]
)
co2_pt, unc = calculate_activity_based("public_transport_it", pt_pkm)
activities.append({"value_kg": co2_pt, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"})
# Ciclismo/piedi: zero emissioni dirette
activities.append({"value_kg": 0.0, "uncertainty_pct": 0.0, "quality_tier": "TIER_1"})
result = calculate_category_total(activities)
result["category"] = 7
result["survey_response_rate"] = 82.0
return result
def calculate_cat11_use_of_products() -> dict:
"""Cat. 11: Energia consumata dai clienti usando il SaaS"""
# SCI = 0.045 gCO2e per ogni API call (misurato con CodeCarbon)
sci_gco2e_per_call = 0.045
avg_calls_per_month = 48_500_000 # 48.5M calls/mese (dati produzione)
months = 12
total_calls = avg_calls_per_month * months
co2e_grams = total_calls * sci_gco2e_per_call
co2e_kg = co2e_grams / 1000
activities = [
{"value_kg": co2e_kg, "uncertainty_pct": 25.0, "quality_tier": "TIER_2"}
]
result = calculate_category_total(activities)
result["category"] = 11
result["metric"] = "API calls"
result["total_calls"] = total_calls
return result
def run_full_scope3_calculation() -> dict:
"""Esegue il calcolo completo Scope 3 per SaaS Italia S.r.l."""
results = {
"cat_1_cloud": calculate_cat1_cloud(),
"cat_6_travel": calculate_cat6_business_travel(),
"cat_7_commuting": calculate_cat7_commuting(),
"cat_11_use": calculate_cat11_use_of_products(),
}
# Categoria residuale (spend-based per tutto il resto)
residual_spend_eur = 210_000 # ~5% della spesa totale
residual_co2_kg, res_unc = calculate_spend_based(
residual_spend_eur, "it_services"
)
results["cat_residual"] = {
"total_co2e_tonnes": residual_co2_kg / 1000,
"uncertainty_pct": res_unc,
"category": "other",
"dominant_quality_tier": "TIER_3"
}
# Totale Scope 3
total_tco2e = sum(
v["total_co2e_tonnes"] for v in results.values()
)
from calculators.emission_calculator import propagate_uncertainty
all_values = [v["total_co2e_tonnes"] for v in results.values()]
all_uncertainties = [v["uncertainty_pct"] for v in results.values()]
overall_uncertainty = propagate_uncertainty(all_values, all_uncertainties)
return {
"company": "SaaS Italia S.r.l.",
"reporting_year": 2024,
"methodology": "GHG Protocol Corporate Value Chain Standard",
"scope3_total_tco2e": round(total_tco2e, 1),
"overall_uncertainty_pct": round(overall_uncertainty, 1),
"categories": results,
"notes": "Cat. 11 include radiative forcing factor per aviation"
}
if __name__ == "__main__":
import json
report = run_full_scope3_calculation()
print("=" * 50)
print(f"SCOPE 3 TOTALE: {report['scope3_total_tco2e']} tCO2e")
print(f"Incertezza: +/- {report['overall_uncertainty_pct']}%")
print("=" * 50)
for name, cat in report["categories"].items():
tco2e = cat.get("total_co2e_tonnes", 0)
unc = cat.get("uncertainty_pct", 0)
pct = tco2e / report["scope3_total_tco2e"] * 100
print(f" {name:25s} {tco2e:6.1f} tCO2e ({pct:.0f}%) ±{unc:.0f}%")
SaaS Italia S.r.l.에 대한 계산 결과 다음을 생산합니다:
범위 3 최종 결과 - SaaS Italia S.r.l. (2024년)
| 범주 | tCO2e | % 총 | 불확실성 | 계층 |
|---|---|---|---|---|
| 고양이. 1 – 클라우드 및 서비스 | 43.7 | 36% | ±9% | 1단계 |
| 고양이. 6 – 출장 | 33.5 | 28% | ±20% | 2단계 |
| 고양이. 7 – 통근 | 23.8 | 20% | ±17% | 2단계 |
| 고양이. 11 – 제품 사용 | 26.2 | 22% | ±25% | 2단계 |
| 잔여 고양이 | 6.5 | 5% | ±75% | 3단계 |
| 전체 범위 3 | 133.7 | 100% | ±13% | 2단계 |
Scope 1(본사 보일러에서 ~8tCO2e) 및 Scope 2(전기에서 ~12tCO2e) 추가 사무실), 당신은 총 발자국 약 154tCO2e 2024년에는 87%는 범위 3입니다.. 정확히는 소프트웨어 회사의 전형적인 패턴입니다.
Scope 3 파이프라인의 모범 사례 및 안티 패턴
범위 3 파이프라인 구현 체크리스트
| 영역 | 모범 사례 | 피해야 할 안티패턴 |
|---|---|---|
| 데이터 | 수신된 모든 데이터에 대해 불변의 브론즈 레이어 | 수정된 버전으로 원시 데이터 덮어쓰기 |
| 계산 | 사용된 배출계수 버전 지정 | 연도 및 소스를 표시하지 않고 EF를 사용하십시오. |
| 불확실성 | 항상 각 범주 전체에 불확실성을 전파합니다. | 범위 없이 정확한 값만 보고 |
| 품질 | 명시적이고 문서화된 품질평가점수 | TIER 1과 TIER 3를 구분 없이 혼합 |
| 감사 | 각 계산에 대한 해시 체인, 검증 가능한 오프체인 | 버전이 지정되지 않았으며 추적할 수 없는 Excel 보고서 |
| 공급자 | 지출/배출 기준 상위 20개 공급업체 우선순위 지정 | 50개 공급업체를 모두 동일하게 대우 |
| 업데이트 | 연간 품질 개선 계획 부여 | 지출 기반을 영구적인 솔루션으로 받아들입니다. |
| 빗자루 | 합리적인 제외 사항을 명시적으로 문서화하세요. | 공식적인 정당성 없이 카테고리 제외 |
점진적 개선 계획(데이터 성숙도 로드맵)
GHG 프로토콜은 명시적으로 진보적인 접근 방식을 권장합니다. 오늘날 아무것도 없는 품질이 낮은 지출 기반 데이터입니다. 개선하는 것이 목표 매년 재료 카테고리의 품질 등급:
- 1년차(기준선): 100% 지출 기반, 오류 ±75%, TIER 3
- 2년차: 활동 기반, ±40%, TIER 2 상위 10개 공급업체
- 3년차: 검증된 기본 데이터가 있는 상위 20개 공급업체, ±20%, TIER 2
- 4년 이상: 모든 공급업체를 위한 EcoVadis/CDP 통합, ±10%, TIER 1
이러한 점진적인 개선은 CSRD 보고서에 다음과 같이 기록될 수 있습니다. "방법론의 진화"이며 리뷰어들에 의해 긍정적으로 평가됩니다.
골드 레이어의 데이터베이스 스키마
Gold 계층에는 빠른 집계 쿼리를 지원하도록 설계된 스키마가 필요합니다. CSRD 보고를 위해 감사 체인에 대한 추적성을 유지합니다.
-- schema/scope3_gold.sql
-- Schema PostgreSQL per il Gold Layer Scope 3
-- Tabella principale: emissioni aggregate per categoria
CREATE TABLE scope3_emissions_gold (
id BIGSERIAL PRIMARY KEY,
company_id VARCHAR(50) NOT NULL,
reporting_year INTEGER NOT NULL,
scope3_category INTEGER NOT NULL CHECK (scope3_category BETWEEN 1 AND 15),
supplier_id VARCHAR(100),
-- Valori emissioni
co2e_tonnes DECIMAL(12, 3) NOT NULL,
co2_tonnes DECIMAL(12, 3),
ch4_tonnes_co2e DECIMAL(12, 3),
n2o_tonnes_co2e DECIMAL(12, 3),
-- Metodologia e qualità
calculation_method VARCHAR(30) NOT NULL, -- activity_based, spend_based, etc.
emission_factor_source VARCHAR(100) NOT NULL,
emission_factor_value DECIMAL(10, 6),
quality_tier VARCHAR(10) NOT NULL, -- TIER_1, TIER_2, TIER_3
uncertainty_pct DECIMAL(5, 1) NOT NULL,
uncertainty_tonnes DECIMAL(12, 3) GENERATED ALWAYS AS
(co2e_tonnes * uncertainty_pct / 100) STORED,
-- Tracciabilità
audit_chain_id UUID NOT NULL REFERENCES scope3_audit_chain(chain_id),
pipeline_version VARCHAR(20) NOT NULL,
reporting_standard VARCHAR(50) DEFAULT 'GHG_Protocol_Scope3_2011',
-- Timestamps
published_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Unicità per reporting period
CONSTRAINT uq_emission_period
UNIQUE (company_id, reporting_year, scope3_category, supplier_id)
);
-- Indici per performance query CSRD report
CREATE INDEX idx_scope3_company_year
ON scope3_emissions_gold (company_id, reporting_year);
CREATE INDEX idx_scope3_category
ON scope3_emissions_gold (scope3_category);
CREATE INDEX idx_scope3_quality
ON scope3_emissions_gold (quality_tier, uncertainty_pct);
-- View per report aggregato CSRD
CREATE VIEW v_scope3_csrd_report AS
SELECT
company_id,
reporting_year,
scope3_category,
SUM(co2e_tonnes) AS total_co2e_tonnes,
-- Propagazione incertezza quadratica
SQRT(SUM(POWER(co2e_tonnes * uncertainty_pct / 100, 2))) /
NULLIF(SUM(co2e_tonnes), 0) * 100 AS combined_uncertainty_pct,
-- Qualità aggregata (tier peggiore nella categoria)
MIN(quality_tier) AS data_quality_tier,
-- Metodo più usato
MODE() WITHIN GROUP (ORDER BY calculation_method) AS primary_method,
COUNT(DISTINCT supplier_id) AS supplier_count,
MAX(updated_at) AS last_updated
FROM scope3_emissions_gold
GROUP BY company_id, reporting_year, scope3_category
ORDER BY company_id, reporting_year, scope3_category;
-- Tabella audit chain
CREATE TABLE scope3_audit_chain (
chain_id UUID PRIMARY KEY,
record_id UUID NOT NULL DEFAULT gen_random_uuid(),
sequence INTEGER NOT NULL,
record_type VARCHAR(50) NOT NULL,
previous_hash CHAR(64) NOT NULL,
record_hash CHAR(64) NOT NULL UNIQUE,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_chain_sequence UNIQUE (chain_id, sequence)
);
-- Indice per verifica integrità chain
CREATE INDEX idx_audit_chain_id_seq
ON scope3_audit_chain (chain_id, sequence);
결론 및 다음 단계
Scope 3 배출을 위한 강력한 파이프라인을 구축하는 것은 학술적인 활동이 아닙니다. 그것은중요 데이터 인프라 이는 필수 사항이 될 것입니다. 2028년까지 수천 개의 유럽 기업. 이 기사에서 살펴본 주요 원칙 회사 규모에 관계없이 적용 가능합니다.
- 원시 데이터의 불변성: SHA-256 해시를 보장하는 브론즈 레이어 모든 검토자는 몇 년이 지난 후에도 항상 원본 소스로 데이터를 추적할 수 있습니다.
- 방법론적 진보성: 지출 기반으로 시작하여 다음으로 이전하세요. 물질 범주에 대한 활동 기반 접근 방식은 GHG 프로토콜에서 권장하는 접근 방식입니다. 그 자체는 지름길이 아닙니다.
- 불확실성의 정량화: 간격 없이 배출을 보고합니다. 자신감은 불완전한 정보입니다. 불확실성의 2차 전파 이는 구현이 간단하고 보고서의 신뢰성에 기본입니다.
- 검증 가능한 감사 추적: 해시 체인은 외부 검증자를 허용합니다. 계산 후 데이터가 변경되지 않았음을 수학적으로 확인합니다.
- 생태계 통합: EcoVadis Carbon Data Network 등의 플랫폼 및 CDP는 특히 대규모 공급망의 경우 데이터 수집 부담을 크게 줄여줍니다.
SaaS Italia S.r.l.의 사례 연구 중소기업도 할 수 있다는 걸 보여주네요 2~3명의 팀으로 3개월 안에 Scope 3 CSRD 준수 보고서를 작성합니다. 자재 카테고리에 대한 기본 데이터와 잔여에 대한 지출 기반 데이터입니다. 핵심은 우선순위: 어디에서나 완벽함을 추구하지 말고 집중하세요. 배출량이 가장 높은 곳.
유용한 리소스
- GHG 프로토콜 범위 3 표준: gghgprotocol.org/corporate-value-chain-scope-3-standard
- Climatiq API (배출계수 데이터베이스): climatiq.io
- EcoVadis 탄소 데이터 네트워크: ecovadis.com/solutions/carbon
- ESRS E1 기후 변화 (공식 EU 텍스트): EFRAG ESRS E1
- 엑시오베이스 3.8 (EEIO 지출 기반 요소): exiobase.eu
시리즈의 다음 기사
다음 기사에서 ESG 보고 API: CSRD 워크플로와 통합 이 문서에서 계산된 Scope 3 데이터 위에 REST API 계층을 구축하여 다음을 구현하겠습니다. 유럽 지침에서 요구하는 형식을 준수하고 워크플로우를 통합하는 엔드포인트 감사자의 디지털 서명으로 승인을 보고합니다.
또한 데이터를 형식으로 노출하는 방법도 살펴보겠습니다. XBRL/iXBRL 제출용 CSRD 보고서의 필수 형식인 ESEF(European Single Electronic Format) 유럽 증권 거래소에 상장되었습니다.







