Scope 3 Value Chain Emissions Pipeline: From Raw Data to Audit Trail
70-90% of total emissions for a tech or software company hide outside its own boundaries: in cloud servers purchased as a service, in employee laptops, in business flights, in code that customers run on their devices. These are Scope 3 emissions, and for most digital organizations they represent the most complex measurement challenge in the ESG domain.
Unlike Scope 1 (we burn the fuel) and Scope 2 (we purchase the electricity), Scope 3 requires collecting data from hundreds of suppliers, applying heterogeneous emission factors, managing extremely high degrees of uncertainty, and producing an audit trail verifiable by external auditors. With CSRD/ESRS E1 making Scope 3 reporting mandatory for large companies starting in 2025-2026, and SMEs in scope by 2028, the question is no longer academic: it is engineering.
In this article we build a complete pipeline for calculating Scope 3 value chain emissions: from the ETL architecture for supplier data collection, to integration with platforms like CDP and EcoVadis, from activity-based vs spend-based calculation, to the Airflow DAG for automation and the immutable audit trail for auditors. Every section includes working Python code and operational best practices tested in real-world contexts.
What You Will Learn
- The 15 GHG Protocol Scope 3 categories and which ones are relevant for software/SaaS companies
- ETL/ELT architecture for supplier data collection: questionnaires, CDP, EcoVadis, and direct APIs
- Activity-based vs spend-based: formulas, accuracy, and when to use which approach
- Python pipeline with Apache Airflow: DAG for automated and scalable Scope 3 calculation
- Data quality scoring and statistical uncertainty propagation in estimates
- Immutable audit trail with SHA-256 hash chain for external auditor traceability
- Sankey diagram visualization for value chain and category priority heatmap
- CSRD/ESRS E1 requirements: what you must disclose and at what granularity
- Complete case study: SaaS company with 50 suppliers, end-to-end Scope 3 calculation
- Integration with EcoVadis Carbon Data Network and Climatiq API for emission factors
Green Software Series — 10 Articles
| # | Article | Topic |
|---|---|---|
| 1 | Green Software Foundation Principles | Carbon efficiency, GSF, SCI |
| 2 | CodeCarbon: Measuring Code | Measurement, dashboard, optimization |
| 3 | Climatiq API: Carbon Calculations | REST API, GHG Protocol, Scope 1-3 |
| 4 | Carbon Aware SDK | Time shifting, location shifting |
| 5 | Scope 1-2-3: ESG Data Modeling | Data structure, calculations, aggregation |
| 6 | GreenOps: Carbon-Aware Kubernetes | Scheduling, scaling, monitoring |
| 7 | Scope 3 Value Chain Emissions Pipeline | This article |
| 8 | ESG Reporting API: CSRD | API, workflow, compliance |
| 9 | Sustainable Architectural Patterns | Storage, caching, batch |
| 10 | AI and Carbon: ML Training | ML training, optimization, Green AI |
The 15 GHG Protocol Scope 3 Categories
The GHG Protocol Corporate Value Chain (Scope 3) Standard is the international reference framework published in 2011 and currently under revision with updates expected in 2026. It divides indirect value chain emissions into 15 distinct categories, organized into two macro-groups: upstream (activities before service production/delivery) and downstream (activities after the sale to the customer).
15 Scope 3 Categories: Upstream and Downstream
| Cat. | Name | Flow | SaaS/Tech Relevance |
|---|---|---|---|
| 1 | Purchased goods & services | Upstream | HIGH: server hardware, software licenses, consulting services |
| 2 | Capital goods | Upstream | MEDIUM: datacenter equipment, laptops, company phones |
| 3 | Fuel & energy-related activities | Upstream | MEDIUM: emissions from production of purchased energy (upstream Scope 2) |
| 4 | Upstream transportation & distribution | Upstream | LOW: hardware shipments to offices and datacenters |
| 5 | Waste generated in operations | Upstream | LOW: WEEE, paper, office waste |
| 6 | Business travel | Upstream | HIGH: flights, hotels, trains for distributed teams |
| 7 | Employee commuting | Upstream | HIGH: home-to-office commutes, especially for hybrid teams |
| 8 | Upstream leased assets | Upstream | MEDIUM: rented offices (if not accounted for in Scope 1/2) |
| 9 | Downstream transportation & distribution | Downstream | LOW: software distribution on physical media (rare) |
| 10 | Processing of sold products | Downstream | N/A: not applicable for pure software |
| 11 | Use of sold products | Downstream | VERY HIGH: energy consumed by customers using the SaaS |
| 12 | End-of-life treatment of sold products | Downstream | LOW: end-user devices at end of life |
| 13 | Downstream leased assets | Downstream | MEDIUM: hardware leased to customers |
| 14 | Franchises | Downstream | N/A: not applicable |
| 15 | Investments | Downstream | HIGH: corporate portfolio, startup equity holdings |
For a SaaS or software development company, the most relevant categories are typically: Cat. 1 (purchased goods & services, often the largest item), Cat. 6 (business travel), Cat. 7 (employee commuting), and Cat. 11 (use of sold products). The double materiality required by CSRD mandates identifying which categories are material from both environmental impact and financial risk perspectives.
Common Mistake: Omitting Cat. 11 for SaaS
Many software companies exclude Category 11 ("Use of sold products") assuming it does not apply. In reality, every API call, every query, every watt consumed by customers running your software is a Scope 3 Cat. 11 emission under your responsibility. For a SaaS with millions of users, this can be the dominant category. The calculation method uses the software carbon intensity (SCI) multiplied by the functional units delivered.
Data Collection Pipeline Architecture
Collecting reliable data from across the entire value chain is the number one bottleneck of any Scope 3 project. The pipeline must handle heterogeneous sources: manual questionnaires, third-party ESG platforms, direct APIs with suppliers, CSV files sent via email, internal ERP data. The following architecture adopts a three-layer ETL pattern (Bronze/Silver/Gold) inspired by the Lakehouse approach.
Scope 3 Pipeline Architecture: Bronze / Silver / Gold
| Layer | Content | Technology | Purpose |
|---|---|---|---|
| Bronze (Raw) | Immutable raw data from suppliers | S3/GCS, Delta Lake | Audit trail, replay, source of truth |
| Silver (Standardized) | Data normalized by unit and currency | dbt, Spark, Pandas | Emission calculation, join with emission factors |
| Gold (Reporting) | Emissions aggregated by GHG category | PostgreSQL, BigQuery | Dashboard, CSRD report, auditors |
The Bronze layer is fundamental: all received data is saved as-is with ingestion timestamp, SHA-256 content hash, and source metadata. This guarantees the ability to reprocess the entire pipeline if emission factors or methodology change, without losing the original data.
# models/scope3_pipeline.py
# Data structures for the Scope 3 pipeline
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class DataSource(Enum):
SUPPLIER_QUESTIONNAIRE = "supplier_questionnaire"
CDP_API = "cdp_api"
ECOVADIS_API = "ecovadis_api"
ERP_EXPORT = "erp_export"
MANUAL_UPLOAD = "manual_upload"
CLIMATIQ_API = "climatiq_api"
class CalculationMethod(Enum):
ACTIVITY_BASED = "activity_based"
SPEND_BASED = "spend_based"
HYBRID = "hybrid"
SUPPLIER_SPECIFIC = "supplier_specific"
class DataQualityTier(Enum):
TIER_1 = "primary_data" # Primary data from supplier
TIER_2 = "secondary_sector" # Sector-level factors
TIER_3 = "spend_estimated" # Spend-based estimate
@dataclass
class RawSupplierData:
"""Bronze layer: immutable raw data"""
supplier_id: str
source: DataSource
raw_payload: dict
received_at: datetime
content_hash: str = field(init=False)
def __post_init__(self):
payload_str = json.dumps(self.raw_payload, sort_keys=True)
self.content_hash = hashlib.sha256(
payload_str.encode()
).hexdigest()
@dataclass
class StandardizedActivity:
"""Silver layer: normalized activity"""
activity_id: str
supplier_id: str
scope3_category: int # 1-15
activity_type: str # e.g. "freight_transport"
quantity: float
unit: str # e.g. "tonne.km"
reporting_period_start: datetime
reporting_period_end: datetime
source: DataSource
quality_tier: DataQualityTier
emission_factor_id: Optional[str] = None
uncertainty_pct: float = 0.0
raw_data_hash: str = "" # Ref to Bronze layer
@dataclass
class EmissionResult:
"""Gold layer: calculated emission"""
result_id: str
activity_id: str
scope3_category: int
co2e_tonnes: float
calculation_method: CalculationMethod
emission_factor_source: str # e.g. "climatiq:IPCC_2021"
emission_factor_value: float
quality_tier: DataQualityTier
uncertainty_pct: float
calculated_at: datetime
pipeline_version: str
audit_hash: str = field(init=False)
def __post_init__(self):
audit_data = {
"result_id": self.result_id,
"activity_id": self.activity_id,
"co2e_tonnes": self.co2e_tonnes,
"emission_factor_source": self.emission_factor_source,
"calculated_at": self.calculated_at.isoformat(),
"pipeline_version": self.pipeline_version,
}
self.audit_hash = hashlib.sha256(
json.dumps(audit_data, sort_keys=True).encode()
).hexdigest()
Supplier Data Integration: CDP, EcoVadis, and Direct APIs
Supplier data collection occurs through multiple channels with very different levels of quality and automation. The Carbon Disclosure Project (CDP) collects data from over 24,000 companies and exposes an API to access verified reports. EcoVadis launched the Carbon Data Network in 2025 with 48,000+ GHG reporters sharing data in a standardized format. Finally, many large suppliers expose proprietary APIs for direct footprint data sharing.
# collectors/supplier_collector.py
# Integration with supplier data sources
import httpx
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from models.scope3_pipeline import RawSupplierData, DataSource
class ClimatiqEmissionFactors:
"""Client for Climatiq API - emission factors database"""
BASE_URL = "https://beta3.api.climatiq.io"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def get_emission_factor(
self,
activity_id: str,
year: int = 2024,
region: str = "IT"
) -> dict:
"""Retrieve emission factor for a specific activity"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/estimate",
headers=self.headers,
json={
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"year": year,
"region": region
},
"parameters": {
"money": 1.0,
"money_unit": "eur"
}
}
)
response.raise_for_status()
return response.json()
async def batch_estimate(
self,
activities: list[dict]
) -> list[dict]:
"""Batch estimate for multiple activities - optimizes API calls"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/batch",
headers=self.headers,
json={"batch": activities},
timeout=30.0
)
response.raise_for_status()
return response.json().get("results", [])
class EcoVadisCollector:
"""Collects Scope 3 data from the EcoVadis platform"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
async def fetch_supplier_carbon_data(
self,
supplier_ecovadis_id: str,
reporting_year: int
) -> RawSupplierData:
"""Retrieve carbon data for a supplier from the Carbon Data Network"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/v1/suppliers/{supplier_ecovadis_id}/carbon",
headers={"X-API-Key": self.api_key},
params={"year": reporting_year},
timeout=15.0
)
if response.status_code == 404:
# Supplier has not shared primary data
return self._create_no_data_record(supplier_ecovadis_id)
response.raise_for_status()
payload = response.json()
return RawSupplierData(
supplier_id=supplier_ecovadis_id,
source=DataSource.ECOVADIS_API,
raw_payload=payload,
received_at=datetime.utcnow()
)
def _create_no_data_record(self, supplier_id: str) -> RawSupplierData:
return RawSupplierData(
supplier_id=supplier_id,
source=DataSource.ECOVADIS_API,
raw_payload={"status": "no_data", "supplier_id": supplier_id},
received_at=datetime.utcnow()
)
class CDPCollector:
"""Collects data from CDP (Carbon Disclosure Project)"""
CDP_API_URL = "https://api.cdp.net/v1"
def __init__(self, api_token: str):
self.api_token = api_token
async def search_supplier(
self,
company_name: str,
year: int = 2024
) -> RawSupplierData | None:
"""Search for a supplier in the CDP database and retrieve GHG data"""
async with httpx.AsyncClient() as client:
# Search company
search_resp = await client.get(
f"{self.CDP_API_URL}/companies/search",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"q": company_name, "year": year}
)
if not search_resp.json().get("results"):
return None
company_id = search_resp.json()["results"][0]["id"]
# Retrieve GHG disclosure data
ghg_resp = await client.get(
f"{self.CDP_API_URL}/companies/{company_id}/ghg-emissions",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"year": year}
)
if ghg_resp.status_code != 200:
return None
return RawSupplierData(
supplier_id=company_name,
source=DataSource.CDP_API,
raw_payload=ghg_resp.json(),
received_at=datetime.utcnow()
)
class BronzeLayerStorage:
"""Immutable storage in the Bronze layer"""
def __init__(self, storage_client, bucket: str):
self.storage = storage_client
self.bucket = bucket
async def store(self, raw_data: RawSupplierData) -> str:
"""Save raw data with deterministic path based on hash"""
path = (
f"scope3/bronze/"
f"{raw_data.received_at.year}/"
f"{raw_data.received_at.month:02d}/"
f"{raw_data.supplier_id}/"
f"{raw_data.content_hash}.json"
)
await self.storage.upload_json(
bucket=self.bucket,
path=path,
data={
"supplier_id": raw_data.supplier_id,
"source": raw_data.source.value,
"received_at": raw_data.received_at.isoformat(),
"content_hash": raw_data.content_hash,
"payload": raw_data.raw_payload
}
)
return path
Activity-Based vs Spend-Based: Choosing the Right Method
The GHG Protocol defines four calculation methods for Scope 3, which in practice reduce to two fundamental approaches: activity-based and spend-based. The choice depends on data availability, category materiality, and the maturity of the supplier relationship.
Methodological Comparison: Activity-Based vs Spend-Based
| Dimension | Activity-Based | Spend-Based |
|---|---|---|
| Formula | Quantity × Emission Factor (unit/kg CO2e) | Spend (EUR) × EEIO Factor (kg CO2e/EUR) |
| Accuracy | High (±5-15% with primary data) | Low-Medium (±50-100%) |
| Required data | Physical quantities (kg, km, kWh, t) | Only accounting invoices (EUR, USD) |
| EF source | Climatiq, IPCC, DEFRA, ecoinvent | USEEIO, EXIOBASE, WIOD |
| When to use | Material categories, large suppliers | Kickoff, small suppliers, Cat. <1% |
| Collection effort | High: requires supplier collaboration | Low: data already in ERP/SAP |
| CSRD acceptability | Preferred for material categories | Accepted as initial proxy |
The optimal strategy is a progressive hybrid approach: start with spend-based to get a quick baseline across the entire value chain, then progressively migrate to activity-based for identified material categories. GHG Protocol defines three data quality tiers (Tier 1, 2, 3) that correspond exactly to this progression.
# calculators/emission_calculator.py
# Activity-based and spend-based emission calculations
from dataclasses import dataclass
from typing import Optional
import math
# ============================================================
# EMISSION FACTORS DATABASE (simplified)
# In production: use Climatiq API or ecoinvent database
# ============================================================
EMISSION_FACTORS: dict[str, dict] = {
# Cat. 1: Purchased goods & services
"cloud_compute_kwh": {
"value": 0.233, # kg CO2e/kWh (IT grid mix 2024)
"unit": "kWh",
"source": "IEA 2024",
"uncertainty_pct": 10.0
},
"hardware_laptop": {
"value": 350.0, # kg CO2e/unit (embodied carbon)
"unit": "unit",
"source": "Dell 2024 PCF",
"uncertainty_pct": 20.0
},
# Cat. 6: Business travel
"flight_economy_short": {
"value": 0.255, # kg CO2e/passenger.km
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
"flight_economy_long": {
"value": 0.195,
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
# Cat. 7: Employee commuting
"car_average": {
"value": 0.170, # kg CO2e/km
"unit": "km",
"source": "DEFRA 2024",
"uncertainty_pct": 12.0
},
"public_transport_it": {
"value": 0.048,
"unit": "passenger.km",
"source": "Ispra 2024",
"uncertainty_pct": 18.0
},
}
# EEIO Spend-based factors (EXIOBASE 3.8)
# kg CO2e per EUR of spend per procurement category
EEIO_FACTORS: dict[str, float] = {
"it_services": 0.312, # IT and telecommunications
"professional_services": 0.198, # Consulting, legal, etc.
"office_supplies": 0.445,
"cloud_hosting": 0.287,
"marketing": 0.231,
"utilities": 0.892,
"hr_services": 0.167,
"travel_accommodation": 0.521,
}
def calculate_activity_based(
activity_type: str,
quantity: float,
custom_ef: Optional[float] = None
) -> tuple[float, float]:
"""
Calculate emissions using the activity-based method.
Returns:
(co2e_kg, uncertainty_pct)
"""
if custom_ef is not None:
return quantity * custom_ef, 30.0 # high uncertainty for custom EF
ef_data = EMISSION_FACTORS.get(activity_type)
if not ef_data:
raise ValueError(f"Emission factor not found: {activity_type}")
co2e_kg = quantity * ef_data["value"]
uncertainty = ef_data["uncertainty_pct"]
return co2e_kg, uncertainty
def calculate_spend_based(
spend_eur: float,
procurement_category: str,
inflation_correction: float = 1.0
) -> tuple[float, float]:
"""
Calculate emissions using the spend-based method (EEIO).
Args:
spend_eur: amount in EUR
procurement_category: EEIO procurement category
inflation_correction: factor to correct for inflation vs EEIO base year
Returns:
(co2e_kg, uncertainty_pct)
"""
eeio_factor = EEIO_FACTORS.get(procurement_category)
if not eeio_factor:
raise ValueError(f"EEIO factor not found: {procurement_category}")
# Correct for inflation (EEIO factors often in EUR 2015)
adjusted_spend = spend_eur / inflation_correction
co2e_kg = adjusted_spend * eeio_factor
# Spend-based has inherently high uncertainty
uncertainty = 75.0
return co2e_kg, uncertainty
def propagate_uncertainty(
values: list[float],
uncertainties_pct: list[float]
) -> float:
"""
Quadratic uncertainty propagation (sum in quadrature).
Valid when uncertainties are independent.
Returns:
uncertainty_pct on the total
"""
weighted_variance_sum = sum(
(v * u/100) ** 2
for v, u in zip(values, uncertainties_pct)
)
total = sum(values)
if total == 0:
return 0.0
combined_std = math.sqrt(weighted_variance_sum)
return (combined_std / total) * 100
def calculate_category_total(
activities: list[dict]
) -> dict:
"""
Calculate Scope 3 category total with uncertainty propagation.
activities: list of {method, value_kg, uncertainty_pct}
"""
if not activities:
return {"total_co2e_kg": 0.0, "uncertainty_pct": 0.0}
values = [a["value_kg"] for a in activities]
uncertainties = [a["uncertainty_pct"] for a in activities]
total_co2e = sum(values)
combined_uncertainty = propagate_uncertainty(values, uncertainties)
# Aggregate quality: worst in the group determines the tier
quality_tiers = [a.get("quality_tier", "TIER_3") for a in activities]
dominant_tier = min(quality_tiers) # TIER_1 < TIER_2 < TIER_3 lexicographically
return {
"total_co2e_kg": total_co2e,
"total_co2e_tonnes": total_co2e / 1000,
"uncertainty_pct": combined_uncertainty,
"uncertainty_kg": total_co2e * combined_uncertainty / 100,
"dominant_quality_tier": dominant_tier,
"activity_count": len(activities)
}
Airflow Pipeline: DAG for Automated Scope 3 Calculation
Orchestrating the Scope 3 pipeline requires a well-structured DAG that manages the complete lifecycle: data collection, standardization, emission calculation, quality checks, and publication to the Gold layer. The DAG must be idempotent (executable multiple times without side effects) and recoverable in case of partial failure.
# dags/scope3_pipeline_dag.py
# Apache Airflow DAG for Scope 3 emissions pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import json
import logging
logger = logging.getLogger(__name__)
# ============================================================
# DAG CONFIGURATION
# ============================================================
SCOPE3_DAG_CONFIG = {
"reporting_year": 2024,
"companies": [
{"id": "S001", "name": "AWS", "tier": "TIER_1", "source": "ecovadis"},
{"id": "S002", "name": "Microsoft Azure", "tier": "TIER_1", "source": "cdp"},
{"id": "S003", "name": "Supplier_XYZ", "tier": "TIER_2", "source": "questionnaire"},
# ... other suppliers
],
"categories_enabled": [1, 2, 3, 6, 7, 11, 15],
"quality_threshold_pct": 80.0,
"alert_email": "esg-team@company.com"
}
default_args = {
"owner": "esg-team",
"depends_on_past": False,
"email_on_failure": True,
"email": [SCOPE3_DAG_CONFIG["alert_email"]],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="scope3_emissions_pipeline",
default_args=default_args,
description="Pipeline for Scope 3 value chain emissions calculation",
schedule_interval="@quarterly", # Quarterly execution
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["emissions", "scope3", "esg", "ghg-protocol"],
max_active_runs=1, # Serialize: never two calculations in parallel
) as dag:
# ============================================================
# PHASE 1: SUPPLIER DATA COLLECTION (parallel per supplier)
# ============================================================
@task_group(group_id="data_collection")
def collect_supplier_data():
@task(task_id="fetch_ecovadis_suppliers")
def fetch_ecovadis() -> list[dict]:
"""Collect data from EcoVadis Carbon Data Network"""
from collectors.supplier_collector import EcoVadisCollector
import asyncio
api_key = Variable.get("ECOVADIS_API_KEY", deserialize_json=False)
collector = EcoVadisCollector(api_key, "https://api.ecovadis.com")
suppliers_ecovadis = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "ecovadis"
]
results = []
for supplier in suppliers_ecovadis:
raw = asyncio.run(
collector.fetch_supplier_carbon_data(
supplier["id"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
results.append({
"supplier_id": raw.supplier_id,
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": raw.raw_payload.get("status") != "no_data"
})
logger.info(f"EcoVadis - Supplier {supplier['id']}: fetched")
return results
@task(task_id="fetch_cdp_suppliers")
def fetch_cdp() -> list[dict]:
"""Collect verified data from CDP"""
from collectors.supplier_collector import CDPCollector
import asyncio
api_token = Variable.get("CDP_API_TOKEN")
collector = CDPCollector(api_token)
suppliers_cdp = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "cdp"
]
results = []
for supplier in suppliers_cdp:
raw = asyncio.run(
collector.search_supplier(
supplier["name"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
if raw:
results.append({
"supplier_id": supplier["id"],
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": True
})
else:
results.append({
"supplier_id": supplier["id"],
"status": "not_found",
"has_data": False
})
return results
@task(task_id="process_manual_questionnaires")
def process_questionnaires() -> list[dict]:
"""Process manual questionnaires uploaded to S3"""
# In production: reads from S3 bucket or SharePoint
# Here we return sample data
return [{
"supplier_id": "S003",
"status": "processed",
"has_data": True,
"scope3_cat1_tco2e": 45.2,
"scope3_cat6_tco2e": 12.8
}]
ev = fetch_ecovadis()
cdp = fetch_cdp()
q = process_questionnaires()
return [ev, cdp, q]
# ============================================================
# PHASE 2: STANDARDIZATION AND EMISSION CALCULATION
# ============================================================
@task(task_id="standardize_activities")
def standardize_activities(collection_results: list) -> list[dict]:
"""Normalize all data into standard physical units"""
from normalizers.activity_normalizer import ActivityNormalizer
normalizer = ActivityNormalizer()
standardized = []
for batch in collection_results:
for result in batch:
if result.get("has_data"):
activities = normalizer.normalize(result)
standardized.extend(activities)
logger.info(f"Standardized {len(standardized)} activities")
return standardized
@task(task_id="calculate_emissions")
def calculate_emissions(activities: list[dict]) -> list[dict]:
"""Calculate CO2e emissions for each standardized activity"""
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based
)
from models.scope3_pipeline import CalculationMethod
results = []
for activity in activities:
if activity["method"] == "activity_based":
co2e_kg, uncertainty = calculate_activity_based(
activity["activity_type"],
activity["quantity"]
)
method = CalculationMethod.ACTIVITY_BASED
else:
co2e_kg, uncertainty = calculate_spend_based(
activity["spend_eur"],
activity["procurement_category"]
)
method = CalculationMethod.SPEND_BASED
results.append({
**activity,
"co2e_kg": co2e_kg,
"co2e_tonnes": co2e_kg / 1000,
"uncertainty_pct": uncertainty,
"calculation_method": method.value,
"calculated_at": datetime.utcnow().isoformat()
})
return results
# ============================================================
# PHASE 3: DATA QUALITY CHECK
# ============================================================
@task(task_id="data_quality_check")
def data_quality_check(results: list[dict]) -> dict:
"""Verify data quality and generate per-category scores"""
from quality.data_quality_scorer import DataQualityScorer
scorer = DataQualityScorer()
quality_report = scorer.score_results(results)
if quality_report["overall_score"] < SCOPE3_DAG_CONFIG["quality_threshold_pct"]:
logger.warning(
f"Quality score below threshold: {quality_report['overall_score']}%"
)
return quality_report
# ============================================================
# PHASE 4: AUDIT TRAIL AND GOLD LAYER PUBLICATION
# ============================================================
@task(task_id="create_audit_trail")
def create_audit_trail(
results: list[dict],
quality_report: dict
) -> str:
"""Create immutable audit trail with hash chain"""
from audit.hash_chain import HashChain
chain = HashChain()
chain_id = chain.create_chain(
calculation_results=results,
quality_report=quality_report,
pipeline_version="2.1.0",
methodology="GHG_Protocol_Scope3_2011",
reporting_standard="CSRD_ESRS_E1"
)
logger.info(f"Audit trail created: {chain_id}")
return chain_id
@task(task_id="publish_gold_layer")
def publish_gold_layer(
results: list[dict],
audit_chain_id: str
) -> None:
"""Publish aggregated data to the Gold layer (PostgreSQL)"""
hook = PostgresHook(postgres_conn_id="emissions_db")
for result in results:
hook.run(
"""
INSERT INTO scope3_emissions_gold (
supplier_id, scope3_category, co2e_tonnes,
calculation_method, uncertainty_pct,
quality_tier, audit_chain_id,
reporting_year, published_at
) VALUES (
%(supplier_id)s, %(scope3_category)s, %(co2e_tonnes)s,
%(calculation_method)s, %(uncertainty_pct)s,
%(quality_tier)s, %(audit_chain_id)s,
%(reporting_year)s, NOW()
)
ON CONFLICT (supplier_id, scope3_category, reporting_year)
DO UPDATE SET
co2e_tonnes = EXCLUDED.co2e_tonnes,
updated_at = NOW()
""",
parameters={
**result,
"audit_chain_id": audit_chain_id,
"reporting_year": SCOPE3_DAG_CONFIG["reporting_year"]
}
)
logger.info(f"Published {len(results)} records to Gold layer")
# ============================================================
# DAG WIRING
# ============================================================
collection_results = collect_supplier_data()
standardized = standardize_activities(collection_results)
emission_results = calculate_emissions(standardized)
quality = data_quality_check(emission_results)
chain_id = create_audit_trail(emission_results, quality)
publish_gold_layer(emission_results, chain_id)
Data Quality Scoring and Uncertainty Propagation
The GHG Protocol Scope 3 Standard explicitly acknowledges that emissions for categories 1-15 are never known with absolute certainty. Quality reporting must include a quantitative uncertainty estimate associated with each category. The IPCC has formalized the uncertainty propagation method in its Good Practice Guidance.
# quality/data_quality_scorer.py
# Scope 3 data quality scoring
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import math
from datetime import datetime, timedelta
class QualityDimension(Enum):
COMPLETENESS = "completeness"
ACCURACY = "accuracy"
TIMELINESS = "timeliness"
VERIFICATION = "verification"
GRANULARITY = "granularity"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0-100
weight: float # weight in aggregate calculation
notes: str = ""
def score_supplier_data_quality(
supplier: dict,
reference_date: datetime = None
) -> dict[str, float]:
"""
Calculate multi-dimensional quality score for supplier data.
Based on GHG Protocol Data Quality Guidance.
"""
if reference_date is None:
reference_date = datetime.utcnow()
scores = []
# 1. COMPLETENESS: how many of the required categories are present?
required_fields = [
"scope3_cat1_tco2e", "scope3_cat6_tco2e", "scope3_cat7_tco2e"
]
present = sum(1 for f in required_fields if supplier.get(f) is not None)
completeness_score = (present / len(required_fields)) * 100
scores.append(QualityScore(
dimension=QualityDimension.COMPLETENESS,
score=completeness_score,
weight=0.30
))
# 2. TIMELINESS: how recent is the data?
data_year = supplier.get("reporting_year", 2020)
current_year = reference_date.year
age_years = current_year - data_year
if age_years <= 1:
timeliness_score = 100.0
elif age_years == 2:
timeliness_score = 75.0
elif age_years == 3:
timeliness_score = 50.0
else:
timeliness_score = 20.0
scores.append(QualityScore(
dimension=QualityDimension.TIMELINESS,
score=timeliness_score,
weight=0.20
))
# 3. VERIFICATION: has the data been verified by third parties?
verification_level = supplier.get("verification", "none")
verification_score = {
"independent_assured": 100.0, # GHG verified by independent auditor
"limited_assurance": 80.0, # Limited assurance
"internal_reviewed": 60.0, # Internal review only
"supplier_declared": 40.0, # Self-declaration
"estimated": 20.0, # Spend-based estimate
"none": 0.0
}.get(verification_level, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.VERIFICATION,
score=verification_score,
weight=0.30
))
# 4. GRANULARITY: activity-specific or aggregated?
data_type = supplier.get("data_type", "aggregated")
granularity_score = {
"site_specific": 100.0, # Data per production site
"product_specific": 90.0, # PCF per product/service
"supplier_specific": 70.0, # Supplier-level total
"sector_average": 40.0, # Sector average
"aggregated": 20.0
}.get(data_type, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.GRANULARITY,
score=granularity_score,
weight=0.20
))
# Weighted aggregate score calculation
overall = sum(s.score * s.weight for s in scores)
# Map score to GHG Protocol Tier
if overall >= 80:
tier = "TIER_1"
uncertainty_band_pct = 15.0
elif overall >= 50:
tier = "TIER_2"
uncertainty_band_pct = 40.0
else:
tier = "TIER_3"
uncertainty_band_pct = 75.0
return {
"overall_score": round(overall, 1),
"tier": tier,
"uncertainty_band_pct": uncertainty_band_pct,
"dimension_scores": {
s.dimension.value: round(s.score, 1)
for s in scores
}
}
def monte_carlo_uncertainty(
base_estimate_tco2e: float,
uncertainty_pct: float,
n_simulations: int = 10_000
) -> dict:
"""
Estimate confidence interval with Monte Carlo simulation.
For CSRD reporting, at least 1,000 simulations are recommended.
"""
import random
# Log-normal distribution (emissions cannot be negative)
sigma = math.log(1 + (uncertainty_pct / 100) ** 2) ** 0.5
mu = math.log(base_estimate_tco2e) - sigma ** 2 / 2
simulated = [
math.exp(random.gauss(mu, sigma))
for _ in range(n_simulations)
]
simulated_sorted = sorted(simulated)
p05 = simulated_sorted[int(n_simulations * 0.05)]
p50 = simulated_sorted[int(n_simulations * 0.50)]
p95 = simulated_sorted[int(n_simulations * 0.95)]
return {
"base_estimate_tco2e": base_estimate_tco2e,
"p05_tco2e": round(p05, 2),
"p50_tco2e": round(p50, 2),
"p95_tco2e": round(p95, 2),
"confidence_interval_90pct": {
"lower": round(p05, 2),
"upper": round(p95, 2)
},
"coefficient_of_variation": round(
(p95 - p05) / (2 * p50) * 100, 1
)
}
Immutable Audit Trail with Hash Chain
End-to-end traceability is one of the most critical requirements for verifiable Scope 3 reporting. External auditors must be able to trace back from every number in the final report to the primary data source, through all transformation steps. A hash chain inspired by blockchain technology (but without the distributed complexity) guarantees the immutability of the audit trail.
# audit/hash_chain.py
# Immutable audit trail for Scope 3 emissions
import hashlib
import json
import uuid
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class AuditRecord:
"""Single record in the audit chain"""
def __init__(
self,
record_type: str,
payload: dict,
previous_hash: str,
chain_id: str,
sequence: int
):
self.record_id = str(uuid.uuid4())
self.record_type = record_type
self.payload = payload
self.previous_hash = previous_hash
self.chain_id = chain_id
self.sequence = sequence
self.created_at = datetime.utcnow().isoformat()
self.record_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""SHA-256 hash of all record fields (except the hash itself)"""
data = {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"created_at": self.created_at,
"payload_hash": hashlib.sha256(
json.dumps(self.payload, sort_keys=True, default=str).encode()
).hexdigest()
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
def to_dict(self) -> dict:
return {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"record_hash": self.record_hash,
"created_at": self.created_at,
"payload": self.payload
}
class HashChain:
"""
Hash chain for immutable Scope 3 emissions audit trail.
Each record contains the hash of the previous record,
making it impossible to modify a record without invalidating
all subsequent records.
"""
GENESIS_HASH = "0" * 64 # Hash of the first record in the chain
def __init__(self, db_client=None):
self.db = db_client
self.records: list[AuditRecord] = []
def create_chain(
self,
calculation_results: list[dict],
quality_report: dict,
pipeline_version: str,
methodology: str,
reporting_standard: str
) -> str:
"""Create a new chain for a complete Scope 3 calculation"""
chain_id = str(uuid.uuid4())
previous_hash = self.GENESIS_HASH
# Record 1: Pipeline metadata
pipeline_record = AuditRecord(
record_type="PIPELINE_METADATA",
payload={
"version": pipeline_version,
"methodology": methodology,
"reporting_standard": reporting_standard,
"calculation_timestamp": datetime.utcnow().isoformat(),
"total_activities": len(calculation_results)
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=0
)
self.records.append(pipeline_record)
previous_hash = pipeline_record.record_hash
# Record 2: Quality report
quality_record = AuditRecord(
record_type="QUALITY_ASSESSMENT",
payload=quality_report,
previous_hash=previous_hash,
chain_id=chain_id,
sequence=1
)
self.records.append(quality_record)
previous_hash = quality_record.record_hash
# Records 3..N: Individual emission results
for i, result in enumerate(calculation_results):
emission_record = AuditRecord(
record_type="EMISSION_CALCULATION",
payload={
"supplier_id": result.get("supplier_id"),
"scope3_category": result.get("scope3_category"),
"co2e_tonnes": result.get("co2e_tonnes"),
"calculation_method": result.get("calculation_method"),
"emission_factor_source": result.get("emission_factor_source"),
"uncertainty_pct": result.get("uncertainty_pct"),
"quality_tier": result.get("quality_tier")
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=2 + i
)
self.records.append(emission_record)
previous_hash = emission_record.record_hash
# Persist to DB (or immutable storage)
if self.db:
self._persist_chain(chain_id)
logger.info(
f"Chain {chain_id} created with {len(self.records)} records. "
f"Final hash: {previous_hash[:16]}..."
)
return chain_id
def verify_chain_integrity(self, chain_id: str) -> bool:
"""
Verify that no record has been tampered with.
Traverses the chain recomputing every hash.
"""
records = self._load_chain(chain_id)
if not records:
return False
expected_previous = self.GENESIS_HASH
for record_dict in records:
# Recompute hash
record = AuditRecord(
record_type=record_dict["record_type"],
payload=record_dict["payload"],
previous_hash=record_dict["previous_hash"],
chain_id=record_dict["chain_id"],
sequence=record_dict["sequence"]
)
if record_dict["previous_hash"] != expected_previous:
logger.error(
f"Chain corrupted at record {record_dict['sequence']}: "
f"previous_hash does not match"
)
return False
expected_previous = record.record_hash
return True
def _persist_chain(self, chain_id: str) -> None:
"""Save all chain records to DB"""
for record in self.records:
self.db.insert("scope3_audit_chain", record.to_dict())
def _load_chain(self, chain_id: str) -> list[dict]:
"""Load chain records from DB in sequence order"""
if not self.db:
return [r.to_dict() for r in self.records]
return self.db.query(
"SELECT * FROM scope3_audit_chain WHERE chain_id = %s ORDER BY sequence",
[chain_id]
)
Visualization: Sankey Diagram and Category Heatmap
A well-built Scope 3 pipeline must also produce visualizations that make data comprehensible to both technical and non-technical stakeholders. The Sankey diagram is the ideal tool for showing emission flows along the value chain, while a heatmap enables rapid identification of the most material categories and those with the lowest data quality.
# visualizations/scope3_charts.py
# Scope 3 Sankey diagram and heatmap generation
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from typing import Optional
def create_scope3_sankey(
emission_data: list[dict],
title: str = "Scope 3 Value Chain Emissions"
) -> go.Figure:
"""
Create Sankey diagram to visualize Scope 3 emission flows.
Structure: Supplier -> S3 Category -> Scope 3 Total
"""
# Collect unique nodes
suppliers = list(set(d["supplier_id"] for d in emission_data))
categories = list(set(f"Cat. {d['scope3_category']}" for d in emission_data))
all_nodes = suppliers + categories + ["Scope 3 Total"]
node_index = {node: i for i, node in enumerate(all_nodes)}
# Build source->target->value links
source_indices = []
target_indices = []
values = []
link_labels = []
for record in emission_data:
supplier = record["supplier_id"]
category = f"Cat. {record['scope3_category']}"
tco2e = record["co2e_tonnes"]
# Supplier -> Category
source_indices.append(node_index[supplier])
target_indices.append(node_index[category])
values.append(tco2e)
link_labels.append(f"{tco2e:.1f} tCO2e")
# Category -> Total
for cat in categories:
cat_total = sum(
d["co2e_tonnes"]
for d in emission_data
if f"Cat. {d['scope3_category']}" == cat
)
source_indices.append(node_index[cat])
target_indices.append(node_index["Scope 3 Total"])
values.append(cat_total)
link_labels.append(f"{cat_total:.1f} tCO2e")
# Node colors
node_colors = (
["#2196F3"] * len(suppliers) + # Blue for suppliers
["#FF9800"] * len(categories) + # Orange for categories
["#4CAF50"] # Green for total
)
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=20,
thickness=20,
line=dict(color="white", width=0.5),
label=all_nodes,
color=node_colors,
hovertemplate="{label}
tCO2e: {value:.1f}<extra></extra>"
),
link=dict(
source=source_indices,
target=target_indices,
value=values,
label=link_labels,
color="rgba(100,100,100,0.3)"
)
))
fig.update_layout(
title_text=title,
font_size=12,
height=600,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)"
)
return fig
def create_category_heatmap(
emission_data: list[dict]
) -> go.Figure:
"""
Heatmap: X axis = Scope 3 category, Y axis = data quality.
Color = tCO2e. Helps prioritize data collection effort.
"""
df = pd.DataFrame(emission_data)
# Aggregate by category and quality tier
pivot = df.pivot_table(
values="co2e_tonnes",
index="quality_tier",
columns="scope3_category",
aggfunc="sum",
fill_value=0
)
# Sort tiers (TIER_1 best at top)
tier_order = ["TIER_1", "TIER_2", "TIER_3"]
pivot = pivot.reindex(
[t for t in tier_order if t in pivot.index]
)
fig = go.Figure(go.Heatmap(
z=pivot.values,
x=[f"Cat. {c}" for c in pivot.columns],
y=list(pivot.index),
colorscale="RdYlGn_r", # Red = high emission (critical)
text=pivot.values.round(1),
texttemplate="%{text} t",
textfont={"size": 11},
hovertemplate="Category: %{x}
Tier: %{y}
%{z:.1f} tCO2e<extra></extra>",
colorbar=dict(title="tCO2e")
))
fig.update_layout(
title="Scope 3 Heatmap: Emissions by Category and Data Quality",
xaxis_title="GHG Protocol Category",
yaxis_title="Data Quality Tier",
height=350,
margin=dict(l=80, r=20, t=60, b=60)
)
return fig
def generate_scope3_dashboard_html(
emission_data: list[dict],
output_path: str
) -> None:
"""Generate standalone HTML report with all charts"""
sankey = create_scope3_sankey(emission_data)
heatmap = create_category_heatmap(emission_data)
total_tco2e = sum(d["co2e_tonnes"] for d in emission_data)
by_category = {}
for d in emission_data:
cat = d["scope3_category"]
by_category[cat] = by_category.get(cat, 0) + d["co2e_tonnes"]
top_category = max(by_category, key=by_category.get)
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Scope 3 Emissions Report</title>
<meta charset="utf-8">
</head>
<body>
<h1>Scope 3 Value Chain Emissions Report</h1>
<p>Total: <strong>{total_tco2e:.1f} tCO2e</strong></p>
<p>Most material category: Cat. {top_category}
({by_category[top_category]:.1f} tCO2e)</p>
{sankey.to_html(full_html=False)}
{heatmap.to_html(full_html=False)}
</body>
</html>
"""
with open(output_path, "w") as f:
f.write(html_content)
CSRD/ESRS E1 Requirements for Scope 3 Reporting
The Corporate Sustainability Reporting Directive (CSRD) and the related standard ESRS E1 (Climate Change) transform Scope 3 reporting from voluntary to mandatory for thousands of European companies. The implementation timeline is staggered and already underway.
Mandatory Scope 3 CSRD Timeline
| FY | Report By | Entities | Notes |
|---|---|---|---|
| 2024 | Early 2025 | Large PIEs already subject to NFRD (>500 emp.) | First wave: ~12,000 EU companies |
| 2025 | Early 2026 | All large companies (>250 emp. or >40M EUR) | ~50,000 EU companies |
| 2026 | Early 2027 | Listed SMEs | Simplified ESRS standard |
| 2028 | Early 2029 | Non-EU companies with EU subsidiaries | Significant global impact |
ESRS E1 specifically requires for Scope 3 emissions:
- Disclosure of all material categories: materiality must be determined through double materiality analysis (impact + financial risk). For most tech companies, at least 4-6 categories turn out to be material.
- Breakdown by category: values cannot be reported as a single aggregated total; each material category must have its own figure in tCO2e.
- Explicit methodology: for each category, the calculation method must be declared (activity-based, spend-based, supplier-specific), along with the emission factor source and data quality tier.
- No netting with carbon credits: gross emissions must be reported separately from any purchased offsets or carbon credits.
- Mandatory assurance: limited assurance initially, with the goal of transitioning to reasonable assurance in the future. The audit trail described in this article is exactly what auditors will require.
- Targets and transition plan: companies must declare reduction targets aligned with 1.5°C (preferably SBTi-validated) with intermediate milestones.
Warning: Scope 3 and Double Materiality
ESRS E1 does not require reporting all 15 Scope 3 categories, but only those identified as material in the double materiality assessment. However, the materiality determination process must be documented and auditable. Excluding a category "due to lack of data" is not an acceptable justification: you must demonstrate that the category is not material for the specific business.
Case Study: SaaS Company with 50 Suppliers
Let us translate everything into a concrete example: a mid-size Italian SaaS company with 150 employees, 15M EUR revenue, infrastructure on AWS, and 50 active suppliers. Management has decided to start Scope 3 calculation in preparation for CSRD and has a 3-month window to deliver verifiable data to their auditor.
Company Profile: SaaS Italia S.r.l.
| Parameter | Value |
|---|---|
| Employees | 150 (70% remote working) |
| Offices | Milan HQ + Rome office |
| Infrastructure | AWS eu-west-1 (primary), GCP europe-west1 (backup) |
| Active suppliers | 50 (8 large, 42 small/medium) |
| Procurement spend | ~4.2M EUR/year |
| Annual flights | ~380 flights (conferences + clients) |
Phase 1 – Materiality Analysis (Week 1-2): The ESG team conducted a rapid analysis to identify material Scope 3 categories. Using spend data from the ERP (SAP) as an initial proxy with EEIO factors, they obtained this "screening" estimate:
Scope 3 Materiality Screening — SaaS Italia S.r.l.
| Cat. | Description | Spend-based Estimate (tCO2e) | % of Total | Decision |
|---|---|---|---|---|
| 1 | Purchased goods & services (cloud, SW) | 342 | 54% | MATERIAL → Activity-based |
| 6 | Business travel | 98 | 15% | MATERIAL → Activity-based |
| 7 | Employee commuting | 87 | 14% | MATERIAL → Employee survey |
| 11 | Use of sold products | 76 | 12% | MATERIAL → SCI measurement |
| 2 | Capital goods (laptops, hardware) | 28 | 4% | MATERIAL → Vendor PCF |
| Other | Cat. 3, 5, 8, 15 | 6 | 1% | Not material → Spend-based |
Phase 2 – Data Collection (Week 2-8):
- Cat. 1 (Cloud): AWS Customer Carbon Footprint Tool and GCP Carbon Footprint provide monthly emission data per account. Data extracted via API and loaded into the Bronze layer. Quality: TIER 1 (supplier-specific, AWS-verified).
- Cat. 1 (Software and services): 8 large suppliers (>50K EUR/year) contacted with a structured questionnaire. 5 responded with primary data (including Microsoft ERP, Slack, Salesforce). 3 had no data → spend-based with EEIO.
- Cat. 6 (Business travel): data extracted from the travel agency (Carlson Wagonlit) via API: 380 flights with routing and class. Activity-based calculation with DEFRA 2024.
- Cat. 7 (Commuting): anonymous survey to all 150 employees (82% response rate). Mode of transport, average distance, days per week in the office.
- Cat. 11 (Use of sold products): SCI (Software Carbon Intensity) calculated with CodeCarbon on production infrastructure. Multiplied by number of active sessions/month.
# case_study/saas_italia_scope3.py
# Complete Scope 3 calculation for SaaS Italia S.r.l.
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based,
calculate_category_total
)
def calculate_cat1_cloud() -> dict:
"""Cat. 1: Cloud emissions AWS + GCP (primary vendor data)"""
# Data extracted from the AWS Customer Carbon Footprint API
aws_kwh_year = 187_500 # Total kWh 2024
gcp_kwh_year = 12_300
aws_ef = 0.233 # kg CO2e/kWh IT grid (AWS eu-west-1)
gcp_ef = 0.198 # kg CO2e/kWh GCP europe-west1
aws_co2, aws_unc = calculate_activity_based("cloud_compute_kwh", aws_kwh_year, aws_ef)
gcp_co2, gcp_unc = calculate_activity_based("cloud_compute_kwh", gcp_kwh_year, gcp_ef)
activities = [
{"value_kg": aws_co2, "uncertainty_pct": 8.0, "quality_tier": "TIER_1"},
{"value_kg": gcp_co2, "uncertainty_pct": 10.0, "quality_tier": "TIER_1"},
]
result = calculate_category_total(activities)
result["category"] = 1
result["sub_category"] = "cloud_infrastructure"
return result
def calculate_cat6_business_travel() -> dict:
"""Cat. 6: Business travel (travel agency data)"""
# 380 total flights in 2024
# 60% short-haul (<1500km), 40% long-haul
short_haul_pkm = 380 * 0.6 * 850 # 850km avg short-haul
long_haul_pkm = 380 * 0.4 * 3200 # 3200km avg long-haul
short_co2, short_unc = calculate_activity_based(
"flight_economy_short", short_haul_pkm
)
long_co2, long_unc = calculate_activity_based(
"flight_economy_long", long_haul_pkm
)
# Radiative forcing factor x1.9 for high altitude
rf_factor = 1.9
short_co2 *= rf_factor
long_co2 *= rf_factor
activities = [
{"value_kg": short_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
{"value_kg": long_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
]
result = calculate_category_total(activities)
result["category"] = 6
return result
def calculate_cat7_commuting() -> dict:
"""Cat. 7: Employee commuting (survey 123/150 employees)"""
# Survey results (average values per employee/year)
commuters = {
"car_solo": {"count": 38, "km_day": 28, "days_year": 120},
"car_shared": {"count": 12, "km_day": 22, "days_year": 110},
"public_transport": {"count": 52, "km_day": 35, "days_year": 140},
"cycling_walking": {"count": 21, "km_day": 4, "days_year": 150},
}
activities = []
# Private car
car_pkm = (
commuters["car_solo"]["count"] *
commuters["car_solo"]["km_day"] *
commuters["car_solo"]["days_year"]
)
co2_car, unc = calculate_activity_based("car_average", car_pkm)
activities.append({"value_kg": co2_car, "uncertainty_pct": 15.0, "quality_tier": "TIER_2"})
# Public transport
pt_pkm = (
commuters["public_transport"]["count"] *
commuters["public_transport"]["km_day"] *
commuters["public_transport"]["days_year"]
)
co2_pt, unc = calculate_activity_based("public_transport_it", pt_pkm)
activities.append({"value_kg": co2_pt, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"})
# Cycling/walking: zero direct emissions
activities.append({"value_kg": 0.0, "uncertainty_pct": 0.0, "quality_tier": "TIER_1"})
result = calculate_category_total(activities)
result["category"] = 7
result["survey_response_rate"] = 82.0
return result
def calculate_cat11_use_of_products() -> dict:
"""Cat. 11: Energy consumed by customers using the SaaS"""
# SCI = 0.045 gCO2e per API call (measured with CodeCarbon)
sci_gco2e_per_call = 0.045
avg_calls_per_month = 48_500_000 # 48.5M calls/month (production data)
months = 12
total_calls = avg_calls_per_month * months
co2e_grams = total_calls * sci_gco2e_per_call
co2e_kg = co2e_grams / 1000
activities = [
{"value_kg": co2e_kg, "uncertainty_pct": 25.0, "quality_tier": "TIER_2"}
]
result = calculate_category_total(activities)
result["category"] = 11
result["metric"] = "API calls"
result["total_calls"] = total_calls
return result
def run_full_scope3_calculation() -> dict:
"""Execute the complete Scope 3 calculation for SaaS Italia S.r.l."""
results = {
"cat_1_cloud": calculate_cat1_cloud(),
"cat_6_travel": calculate_cat6_business_travel(),
"cat_7_commuting": calculate_cat7_commuting(),
"cat_11_use": calculate_cat11_use_of_products(),
}
# Residual category (spend-based for everything else)
residual_spend_eur = 210_000 # ~5% of total spend
residual_co2_kg, res_unc = calculate_spend_based(
residual_spend_eur, "it_services"
)
results["cat_residual"] = {
"total_co2e_tonnes": residual_co2_kg / 1000,
"uncertainty_pct": res_unc,
"category": "other",
"dominant_quality_tier": "TIER_3"
}
# Scope 3 total
total_tco2e = sum(
v["total_co2e_tonnes"] for v in results.values()
)
from calculators.emission_calculator import propagate_uncertainty
all_values = [v["total_co2e_tonnes"] for v in results.values()]
all_uncertainties = [v["uncertainty_pct"] for v in results.values()]
overall_uncertainty = propagate_uncertainty(all_values, all_uncertainties)
return {
"company": "SaaS Italia S.r.l.",
"reporting_year": 2024,
"methodology": "GHG Protocol Corporate Value Chain Standard",
"scope3_total_tco2e": round(total_tco2e, 1),
"overall_uncertainty_pct": round(overall_uncertainty, 1),
"categories": results,
"notes": "Cat. 11 includes radiative forcing factor for aviation"
}
if __name__ == "__main__":
import json
report = run_full_scope3_calculation()
print("=" * 50)
print(f"SCOPE 3 TOTAL: {report['scope3_total_tco2e']} tCO2e")
print(f"Uncertainty: +/- {report['overall_uncertainty_pct']}%")
print("=" * 50)
for name, cat in report["categories"].items():
tco2e = cat.get("total_co2e_tonnes", 0)
unc = cat.get("uncertainty_pct", 0)
pct = tco2e / report["scope3_total_tco2e"] * 100
print(f" {name:25s} {tco2e:6.1f} tCO2e ({pct:.0f}%) +/-{unc:.0f}%")
The calculation result for SaaS Italia S.r.l. produces:
Final Scope 3 Results — SaaS Italia S.r.l. (FY 2024)
| Category | tCO2e | % of Total | Uncertainty | Tier |
|---|---|---|---|---|
| Cat. 1 – Cloud & Services | 43.7 | 36% | ±9% | TIER 1 |
| Cat. 6 – Business Travel | 33.5 | 28% | ±20% | TIER 2 |
| Cat. 7 – Commuting | 23.8 | 20% | ±17% | TIER 2 |
| Cat. 11 – Use of Products | 26.2 | 22% | ±25% | TIER 2 |
| Residual Categories | 6.5 | 5% | ±75% | TIER 3 |
| SCOPE 3 TOTAL | 133.7 | 100% | ±13% | TIER 2 |
Adding Scope 1 (~8 tCO2e from the HQ boiler) and Scope 2 (~12 tCO2e from office electricity), the result is a total footprint of ~154 tCO2e for 2024, of which 87% is Scope 3. Exactly the typical pattern for software companies.
Best Practices and Anti-Patterns in the Scope 3 Pipeline
Scope 3 Pipeline Implementation Checklist
| Area | Best Practice | Anti-Pattern to Avoid |
|---|---|---|
| Data | Immutable Bronze layer for every piece of received data | Overwriting raw data with corrected versions |
| Calculation | Version the emission factors used | Using EFs without specifying year and source |
| Uncertainty | Always propagate uncertainty across every category | Reporting only point values without ranges |
| Quality | Explicit and documented quality score | Mixing TIER 1 and TIER 3 without distinction |
| Audit | Hash chain for every calculation, verifiable off-chain | Unversioned and untraceable Excel reports |
| Suppliers | Prioritize top 20 suppliers by spend/emissions | Treating all 50 suppliers the same way |
| Updates | Annual data quality improvement plan | Accepting spend-based as a permanent solution |
| Scope | Explicitly document justified exclusions | Excluding categories without formal justification |
Progressive Improvement Plan (Data Maturity Roadmap)
The GHG Protocol explicitly encourages a progressive approach: having low-quality spend-based data today is better than having nothing. The goal is to improve the quality tier of material categories every year:
- Year 1 (baseline): 100% spend-based, ±75% error, TIER 3
- Year 2: top 10 suppliers with activity-based, ±40%, TIER 2
- Year 3: top 20 suppliers with verified primary data, ±20%, TIER 2
- Year 4+: EcoVadis/CDP integration for all suppliers, ±10%, TIER 1
This progressive improvement is documentable in the CSRD report as "methodology evolution" and is positively evaluated by auditors.
Database Schema for the Gold Layer
The Gold layer requires a schema designed to support fast aggregation queries for CSRD reporting while maintaining traceability to the audit chain.
-- schema/scope3_gold.sql
-- PostgreSQL schema for the Scope 3 Gold Layer
-- Main table: emissions aggregated by category
CREATE TABLE scope3_emissions_gold (
id BIGSERIAL PRIMARY KEY,
company_id VARCHAR(50) NOT NULL,
reporting_year INTEGER NOT NULL,
scope3_category INTEGER NOT NULL CHECK (scope3_category BETWEEN 1 AND 15),
supplier_id VARCHAR(100),
-- Emission values
co2e_tonnes DECIMAL(12, 3) NOT NULL,
co2_tonnes DECIMAL(12, 3),
ch4_tonnes_co2e DECIMAL(12, 3),
n2o_tonnes_co2e DECIMAL(12, 3),
-- Methodology and quality
calculation_method VARCHAR(30) NOT NULL, -- activity_based, spend_based, etc.
emission_factor_source VARCHAR(100) NOT NULL,
emission_factor_value DECIMAL(10, 6),
quality_tier VARCHAR(10) NOT NULL, -- TIER_1, TIER_2, TIER_3
uncertainty_pct DECIMAL(5, 1) NOT NULL,
uncertainty_tonnes DECIMAL(12, 3) GENERATED ALWAYS AS
(co2e_tonnes * uncertainty_pct / 100) STORED,
-- Traceability
audit_chain_id UUID NOT NULL REFERENCES scope3_audit_chain(chain_id),
pipeline_version VARCHAR(20) NOT NULL,
reporting_standard VARCHAR(50) DEFAULT 'GHG_Protocol_Scope3_2011',
-- Timestamps
published_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Uniqueness for reporting period
CONSTRAINT uq_emission_period
UNIQUE (company_id, reporting_year, scope3_category, supplier_id)
);
-- Indexes for CSRD report query performance
CREATE INDEX idx_scope3_company_year
ON scope3_emissions_gold (company_id, reporting_year);
CREATE INDEX idx_scope3_category
ON scope3_emissions_gold (scope3_category);
CREATE INDEX idx_scope3_quality
ON scope3_emissions_gold (quality_tier, uncertainty_pct);
-- View for aggregated CSRD report
CREATE VIEW v_scope3_csrd_report AS
SELECT
company_id,
reporting_year,
scope3_category,
SUM(co2e_tonnes) AS total_co2e_tonnes,
-- Quadratic uncertainty propagation
SQRT(SUM(POWER(co2e_tonnes * uncertainty_pct / 100, 2))) /
NULLIF(SUM(co2e_tonnes), 0) * 100 AS combined_uncertainty_pct,
-- Aggregate quality (worst tier in the category)
MIN(quality_tier) AS data_quality_tier,
-- Most used method
MODE() WITHIN GROUP (ORDER BY calculation_method) AS primary_method,
COUNT(DISTINCT supplier_id) AS supplier_count,
MAX(updated_at) AS last_updated
FROM scope3_emissions_gold
GROUP BY company_id, reporting_year, scope3_category
ORDER BY company_id, reporting_year, scope3_category;
-- Audit chain table
CREATE TABLE scope3_audit_chain (
chain_id UUID PRIMARY KEY,
record_id UUID NOT NULL DEFAULT gen_random_uuid(),
sequence INTEGER NOT NULL,
record_type VARCHAR(50) NOT NULL,
previous_hash CHAR(64) NOT NULL,
record_hash CHAR(64) NOT NULL UNIQUE,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_chain_sequence UNIQUE (chain_id, sequence)
);
-- Index for chain integrity verification
CREATE INDEX idx_audit_chain_id_seq
ON scope3_audit_chain (chain_id, sequence);
Conclusions and Next Steps
Building a robust Scope 3 emissions pipeline is not an academic exercise: it is critical data infrastructure that will become mandatory for thousands of European companies by 2028. The key principles covered in this article are applicable regardless of company size:
- Raw data immutability: the Bronze layer with SHA-256 hashing ensures that any auditor can always trace back to the original data source, even years later.
- Methodological progressiveness: starting with spend-based and migrating to activity-based for material categories is the approach recommended by the GHG Protocol itself, not a shortcut.
- Uncertainty quantification: reporting emissions without a confidence interval is incomplete information. Quadratic uncertainty propagation is simple to implement and fundamental for report credibility.
- Verifiable audit trail: the hash chain allows an external auditor to mathematically confirm that no data has been altered post-calculation.
- Ecosystem integration: platforms like EcoVadis Carbon Data Network and CDP enormously reduce the data collection burden, especially for large supply chains.
The SaaS Italia S.r.l. case study shows that even a medium-sized company can produce a CSRD-compliant Scope 3 report in 3 months with a team of 2-3 people, combining primary data for material categories and spend-based for the residual. The key is prioritization: do not seek perfection everywhere, but concentrate effort where emissions are highest.
Useful Resources
- GHG Protocol Scope 3 Standard: ghgprotocol.org/corporate-value-chain-scope-3-standard
- Climatiq API (emission factors database): climatiq.io
- EcoVadis Carbon Data Network: ecovadis.com/solutions/carbon
- ESRS E1 Climate Change (official EU text): EFRAG ESRS E1
- EXIOBASE 3.8 (EEIO spend-based factors): exiobase.eu
Next Article in the Series
In the next article ESG Reporting API: CSRD Workflow Integration we will build a REST API layer on top of the Scope 3 data calculated in this article, implementing endpoints compliant with the formats required by the European directive and integrating the report approval workflow with the auditor's digital signature.
We will also see how to expose data in XBRL/iXBRL format for submission to ESEF (European Single Electronic Format), the mandatory format for CSRD reports of companies listed on European stock exchanges.







