Climatiq API: GHG 排出量の計算をバックエンドに統合
2025 年、炭素排出量監視ソリューションの市場は成長 まで 173億ドルますます厳しくなる規制によって推進される 欧州の CSRD、米国の SEC 気候情報開示、ISO 14064 など。 Excel シートや大まかな見積もりに頼る必要はもうありません。便利です。 自動計算、 検証可能でオペレーティング システムに統合されている.
技術的な課題は現実的です: GHG (温室効果ガス) 排出量の計算にはデータベースへのアクセスが必要です 排出係数 更新、GHG プロトコルによって検証された方法論、および変換 何百もの異なる測定単位の間で。これらすべてを社内で構築および保守する必要があります 数か月にわたる専門的な作業。ここで登場します 気候.
Climatiq は、以上のアクセスを提供する REST API です。 190,000 の排出係数 40 を超える検証済み情報源 (EPA、DEFRA/BEIS、IEA、ecoinvent) からの、以下の内容 300 以上の地理的地域、スコープ 1、2、および 3 の GHG プロトコルに準拠した計算。 この記事では、 完全な FastAPI バックエンド Climatiqを統合したもの TypeScript クライアントとリアルタイム計算機を備えた生産排出量計算用 SaaS アプリケーション向け。
何を学ぶか
- Climatiq API アーキテクチャ: エンドポイント、認証、レート制限、データ モデル
- 排出係数データベース: 正しい係数を検索、フィルタリング、選択する方法
- 活動量ベースの推定:具体的な活動量(kWh、km、kg)に基づいて計算
- 支出ベースの見積もり: 一次データが欠落している場合の通貨支出の計算
- GHG プロトコルのスコープ 1、2、3: カテゴリのマッピングと準拠した計算
- 再試行、Redis キャッシュ、エラー処理を備えた堅牢な Python クライアント
- フロントエンド統合のための Axios および型を備えた TypeScript/Node.js クライアント
- 製品に炭素ラベルを付けた SaaS 向けリアルタイム炭素計算ツール
- CI/CD 環境のモック API を使用したテスト
- Climatiq の代替手段と機能の比較
グリーン ソフトウェア エンジニアリング シリーズ
この記事は、グリーン ソフトウェア エンジニアリングに関する全シリーズの一部です。すべてのアイテム デジタル持続可能性の特定の側面に対処します。
| # | アイテム | メイントピック |
|---|---|---|
| 1 | グリーン ソフトウェア エンジニアリングの原則 | GSF、SCI仕様、8つの基本原則 |
| 2 | CodeCarbon: コードエミッションの測定 | Python ライブラリ、ダッシュボード、CI/CD 統合 |
| 3 | Climatiq API: バックエンドでの GHG 計算 | REST API、スコープ 1 ~ 3、FastAPI + TypeScript の統合 |
| 4 | カーボンアウェア SDK | ワークロードのシフト、グリッドの強度、タイムシフト |
| 5 | スコープ 1、2、3: ESG レポートのためのデータ モデリング | データ構造、計算、集計、レポート作成 |
| 6 | GreenOps: カーボンを意識したインフラストラクチャ | Kubernetes のスケジューリング、問題に基づいたスケーリング |
| 7 | 排出パイプライン スコープ 3 バリューチェーン | サプライヤーのデータ収集、計算、監査証跡 |
| 8 | ESG レポート API: CSRD の統合 | CSRD ワークフロー、レポート自動化、コンプライアンス |
| 9 | 持続可能な建築パターン | ストレージ、インテリジェントなキャッシュ、二酸化炭素を意識したバッチ |
| 10 | AI とカーボン: ML トレーニングのフットプリント | LLM排出量、最適化、グリーンAI |
1. GHG プロトコルと自動計算の必要性
Il GHGプロトコル企業基準 世界で最も多く採用されているフレームワークです 企業の排出量を計算するため。排出量は次の 3 つの領域に分類されます。
- スコープ 1 (直接排出): 社用車の燃料の燃焼、 生産工場、暖房。これらは会社の直接の管理下にあります。
- スコープ 2 (間接エネルギー): 電気、蒸気、熱を購入しました。 それらは次のように分けられます。 ロケーションベースの (ローカルネットワークミックス) e 市場ベースの (エネルギー証明書、PPA)。
- スコープ 3 (バリューチェーン): 商品の購入を含む 15 カテゴリー およびサービス、上流/下流の輸送、製品の使用、耐用年数の終了、出張、 従業員の通勤など。それらは通常、次のことを表します。 70-90% 総排出量のうち 会社の。
各計算には、 排出係数:換算する係数 活動(ディーゼルのリットル、電力のkWh、購入のユーロ) CO₂e kg 単位。これらの要因は次によって異なります。
- 基準年: 電力網の構成は毎年変化する
- 地理的地域: イタリアのkWhはドイツのkWhとは異なります
- データソース: 米国の場合は EPA、英国の場合は DEFRA、イタリアの場合は ISPRA
- 周囲長: 上流、下流、ゆりかごから門まで、ゆりかごから墓場まで
最新の排出係数データベースを維持するには、専任のチームが必要です。 Climatiq が私たちの代わりにそれを行い、さらに集約します 40の検証済み情報源 と 継続的なアップデート。
2.Climatiq API の概要
アーキテクチャと主要なエンドポイント
Climatiq API は URL ベースの JSON REST API です https://beta3.api.climatiq.io。
認証は次の方法で行われます 無記名トークン HTTPヘッダーにあります。
利用可能なプランは次のとおりです。
| Piano | 通話数/月 | 機能性 | 一般的な使用方法 |
|---|---|---|---|
| コミュニティ | 250 | すべてのエンドポイント | プロトタイピング、テスト |
| スターター | 5,000 | + プライベートな要素 | 中小企業、MVP |
| 成長 | 50,000 | + SLA、サポート | 成長する企業 |
| 企業 | カスタム | + 監査証跡、SSO | 大組織 |
主なエンドポイントは次のとおりです。
| エンドポイント | 方法 | 説明 |
|---|---|---|
/search |
得る | データベース内の排出係数を検索する |
/estimate |
役職 | 活動による推定単一排出量 |
/batch/estimate |
役職 | 複数の見積もり(リクエストごとに最大 100 件) |
/travel/flights |
役職 | 航空会社の排出量(スコープ 3.6) |
/freight |
役職 | 複合一貫貨物輸送の排出量 |
/procurement |
役職 | 購入の問題 (スコープ 3.1、支出ベース) |
/energy |
役職 | エネルギー消費排出量(スコープ2) |
/compute |
役職 | クラウドコンピューティングの排出量 |
見積り依頼の構成
# Esempio di richiesta POST /estimate
{
"emission_factor": {
"activity_id": "electricity-supply_grid-source_residual_mix",
"data_version": "^21",
"region": "IT"
},
"parameters": {
"energy": 1000,
"energy_unit": "kWh"
}
}
# Risposta
{
"co2e": 0.415,
"co2e_unit": "kg",
"co2e_calculation_method": "ar5",
"co2e_calculation_origin": "source",
"emission_factor": {
"activity_id": "electricity-supply_grid-source_residual_mix",
"source": "IEA",
"year": 2022,
"region": "IT",
"category": "Electricity",
"lca_activity": "electricity_generation",
"data_quality_flags": []
},
"constituent_gases": {
"co2e_total": 0.415,
"co2e_other": null,
"co2": 0.415,
"ch4": null,
"n2o": null
}
}
データ バージョン: セマンティクスとベスト プラクティス
フィールド data_version 使用するデータベースのバージョンを制御します。
キャレット (^21) 互換性のあるバージョン 21 以降を使用し、確実に
排出係数の自動更新。生産では、 ブロック
正確なバージョン (例えば。 "21") 再現性のため
計算と監査証跡。新しいソフトウェアバージョンに意図的にアップグレードする
過去の値を明示的に再計算します。
3. データモデル: 排出係数データベース
排出係数の構造
Climatiq の排出係数は次のように一意に識別されます。
activity_id, source, region, year
e lca_activity。この構造を理解することが重要です
をクリックして正しい係数を選択します。
# Struttura di un Emission Factor nel database Climatiq
{
"activity_id": "fuel_type-diesel",
"uuid": "94de5038-8b06-4e24-8e8c-1b87e1e0",
"name": "Diesel",
"category": "Fuel",
"sector": "Transport",
"source": "DEFRA",
"source_link": "https://www.gov.uk/guidance/ghg-conversion-factors-for-company-reporting",
"source_dataset": "DEFRA 2023",
"year": 2023,
"year_released": 2023,
"region": "GB",
"region_name": "United Kingdom",
"description": "Diesel combustion emission factor",
"unit_type": ["Volume", "Weight"],
"supported_calculation_methods": ["ar5"],
"factor": 2.5179,
"factor_calculation_method": "ar5",
"factor_calculation_origin": "source",
"constituent_gases": {
"co2e_total": 2.5179,
"co2": 2.5148,
"ch4": 0.0009,
"n2o": 0.0022
}
}
主なデータソース
| ソース | 国/地域 | アップデート | 対象分野 |
|---|---|---|---|
| DEFRA/BEIS | UK | 年間 | エネルギー、輸送、購入、資材 |
| EPA | アメリカ合衆国 | 年間 | エネルギー、産業プロセス、農業 |
| IEA | グローバル | 年間 | 国別の電力、一次エネルギー |
| エコ発明 | グローバル | 半年ごと | 完全なLCA、サプライチェーン、材料 |
| アデメ | フランス | 年間 | 石炭ベース、輸送、FR エネルギー |
| EEA | EU | 年間 | ヨーロッパのグリッド電力、部門別排出量 |
| イスプラ | イタリア | 年間 | 国家 GHG インベントリ、IT 電力構成 |
4. 活動ベースの推定:具体的な活動に関する計算
アプローチ アクティビティベースの が最も正確です: に基づいています 消費された燃料のリットル、電力のkWh、 移動キロメートル、または購入した大量の資材。運用データの収集が必要 詳細ですが、科学的に堅牢な結果が得られます。
例: 自社車両の排出量の計算 (スコープ 1)
# activity_based_estimation.py
import httpx
from dataclasses import dataclass
from enum import Enum
class FuelType(str, Enum):
DIESEL = "fuel_type-diesel"
PETROL = "fuel_type-petrol"
HVO = "fuel_type-hvo_biodiesel"
LPG = "fuel_type-lpg"
CNG = "fuel_type-cng"
@dataclass
class VehicleTrip:
vehicle_id: str
fuel_type: FuelType
litres_consumed: float
region: str = "IT"
async def estimate_fleet_scope1(
api_key: str,
trips: list[VehicleTrip],
data_version: str = "^21"
) -> dict:
"""
Calcola Scope 1 per flotta aziendale con activity-based estimation.
Returns aggregato e dettaglio per veicolo.
"""
BASE_URL = "https://beta3.api.climatiq.io"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Costruisce le richieste batch (max 100 per chiamata)
batch_requests = [
{
"emission_factor": {
"activity_id": trip.fuel_type.value,
"data_version": data_version,
"region": trip.region
},
"parameters": {
"volume": trip.litres_consumed,
"volume_unit": "l"
}
}
for trip in trips
]
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{BASE_URL}/batch/estimate",
headers=headers,
json={"requests": batch_requests}
)
response.raise_for_status()
results = response.json()["results"]
# Aggrega risultati per veicolo
vehicle_emissions = {}
total_co2e_kg = 0.0
for trip, result in zip(trips, results):
if "error" in result:
print(f"Errore per veicolo {trip.vehicle_id}: {result['error']}")
continue
co2e_kg = result["co2e"]
total_co2e_kg += co2e_kg
if trip.vehicle_id not in vehicle_emissions:
vehicle_emissions[trip.vehicle_id] = {
"total_co2e_kg": 0.0,
"fuel_type": trip.fuel_type.value,
"trips": 0
}
vehicle_emissions[trip.vehicle_id]["total_co2e_kg"] += co2e_kg
vehicle_emissions[trip.vehicle_id]["trips"] += 1
return {
"scope": "scope_1",
"total_co2e_kg": total_co2e_kg,
"total_co2e_tco2e": total_co2e_kg / 1000,
"vehicles": vehicle_emissions,
"vehicle_count": len(vehicle_emissions)
}
# Utilizzo
import asyncio
async def main():
trips = [
VehicleTrip("VAN-001", FuelType.DIESEL, 120.5),
VehicleTrip("VAN-002", FuelType.DIESEL, 98.3),
VehicleTrip("TRUCK-001", FuelType.HVO, 245.0),
VehicleTrip("CAR-001", FuelType.PETROL, 42.1),
]
result = await estimate_fleet_scope1(
api_key="clq_live_your_key_here",
trips=trips
)
print(f"Scope 1 totale: {result['total_co2e_tco2e']:.2f} tCO2e")
for vid, data in result["vehicles"].items():
print(f" {vid}: {data['total_co2e_kg']:.1f} kg CO2e")
asyncio.run(main())
電力排出量の算出(スコープ2)
スコープ 2 では、ロケーション ベース (ネットワーク ミックス) と 市場ベース (残留ミックスファクターまたは PPA)。両方の値をレポートで報告する必要があります。 GHGプロトコルの開示。
# scope2_electricity.py
from enum import Enum
class Scope2Method(str, Enum):
LOCATION_BASED = "location_based"
MARKET_BASED = "market_based"
async def estimate_scope2_electricity(
client: httpx.AsyncClient,
api_key: str,
kwh_consumed: float,
region: str,
method: Scope2Method,
renewable_percentage: float = 0.0,
data_version: str = "^21"
) -> dict:
"""
Scope 2 con metodo location-based o market-based.
- location_based: usa il mix elettrico della rete locale
- market_based: usa residual mix o certificati (IRECs, GOs)
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Market-based: se 100% rinnovabile certificato, le emissioni sono zero
if method == Scope2Method.MARKET_BASED and renewable_percentage >= 100.0:
return {
"co2e_kg": 0.0,
"co2e_tco2e": 0.0,
"method": method.value,
"note": "100% energie rinnovabili certificate - emissioni zero"
}
# Activity ID diverso per location-based vs market-based
if method == Scope2Method.LOCATION_BASED:
activity_id = "electricity-supply_grid-source_residual_mix"
else:
# Market-based usa residual mix (esclude RES certificate)
activity_id = "electricity-supply_grid-source_residual_mix"
# Riduzione proporzionale per rinnovabili parziali (market-based)
effective_kwh = kwh_consumed
if method == Scope2Method.MARKET_BASED and renewable_percentage > 0:
effective_kwh = kwh_consumed * (1 - renewable_percentage / 100.0)
payload = {
"emission_factor": {
"activity_id": activity_id,
"data_version": data_version,
"region": region
},
"parameters": {
"energy": effective_kwh,
"energy_unit": "kWh"
}
}
response = await client.post(
"https://beta3.api.climatiq.io/estimate",
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
return {
"co2e_kg": data["co2e"],
"co2e_tco2e": data["co2e"] / 1000,
"co2e_unit": data["co2e_unit"],
"method": method.value,
"kwh_consumed": kwh_consumed,
"effective_kwh": effective_kwh,
"renewable_percentage": renewable_percentage,
"emission_factor": data["emission_factor"],
"region": region
}
5. 支出ベースの推定: 支出データからの計算
正確なアクティビティ データがない場合のアプローチ 支出ベースの アクティビティの代理として通貨支出を使用します。 Climatiq は排出係数を適用します データに基づく、購入カテゴリー別の経済性(支出したユーロ/ドルあたりの CO₂e kg) OECD または EXIOBASE テーブルの入出力。スコープ 3 の最も一般的な方法です。 カテゴリ 1 (購入した商品およびサービス)。
支出ベースの精度とアクティビティベースの精度
支出ベースの手法では、不確実性のある推定値が生成されます。 30~100%、 アクティビティベースのメソッドの 5 ~ 15% と比較して。データがない場合にのみ使用してください 予備選挙。 GHG プロトコルはこれらを出発点として受け入れますが、進歩が必要です 活動データの改善。
# scope3_spend_based.py
from typing import Optional
import httpx
# Mappa categorie NACE su activity_id Climatiq per spend-based
CATEGORY_TO_ACTIVITY_ID = {
# Settore IT e servizi digitali
"it_services": "professional_services-type_professional_services",
"cloud_hosting": "professional_services-type_professional_services",
"software_licenses": "professional_services-type_professional_services",
# Logistica e trasporti
"freight_road": "transport-type_freight_vehicle",
"freight_air": "transport-type_air_freight",
"courier_services": "transport-type_freight_vehicle",
# Produzione e manifattura
"raw_materials_steel": "steel-type_steel_products",
"raw_materials_plastic": "plastics-type_plastic_products",
"packaging": "paper-type_paper_products",
# Servizi professionali
"legal_services": "professional_services-type_professional_services",
"consulting": "professional_services-type_professional_services",
"marketing": "professional_services-type_professional_services",
# Utilities e energia
"electricity_bill": "electricity-supply_grid-source_residual_mix",
"gas_bill": "fuel_type-natural_gas",
}
async def calculate_scope3_spend_based(
api_key: str,
purchases: list[dict],
region: str = "IT",
currency: str = "EUR",
data_version: str = "^21"
) -> dict:
"""
Calcola Scope 3.1 con metodo spend-based.
purchases: [{"category": "it_services", "amount": 50000}, ...]
"""
BASE_URL = "https://beta3.api.climatiq.io"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
batch_requests = []
for purchase in purchases:
activity_id = CATEGORY_TO_ACTIVITY_ID.get(purchase["category"])
if not activity_id:
print(f"Categoria non mappata: {purchase['category']}")
continue
batch_requests.append({
"emission_factor": {
"activity_id": activity_id,
"data_version": data_version,
"region": region
},
"parameters": {
"money": purchase["amount"],
"money_unit": currency.lower()
}
})
# Chunking per batch limite 100
results_all = []
chunk_size = 100
async with httpx.AsyncClient(timeout=60.0) as client:
for i in range(0, len(batch_requests), chunk_size):
chunk = batch_requests[i:i + chunk_size]
response = await client.post(
f"{BASE_URL}/batch/estimate",
headers=headers,
json={"requests": chunk}
)
response.raise_for_status()
results_all.extend(response.json()["results"])
# Aggrega per categoria
total_co2e_kg = 0.0
category_breakdown = {}
for purchase, result in zip(purchases, results_all):
if "error" in result:
category_breakdown[purchase["category"]] = {
"error": result["error"],
"amount": purchase["amount"]
}
continue
co2e_kg = result.get("co2e", 0.0)
total_co2e_kg += co2e_kg
cat = purchase["category"]
if cat not in category_breakdown:
category_breakdown[cat] = {
"total_co2e_kg": 0.0,
"total_spend": 0.0,
"intensity_kg_per_eur": 0.0
}
category_breakdown[cat]["total_co2e_kg"] += co2e_kg
category_breakdown[cat]["total_spend"] += purchase["amount"]
# Calcola intensità per categoria
for cat_data in category_breakdown.values():
if "total_spend" in cat_data and cat_data["total_spend"] > 0:
cat_data["intensity_kg_per_eur"] = (
cat_data["total_co2e_kg"] / cat_data["total_spend"]
)
return {
"scope": "scope_3.1",
"method": "spend_based",
"total_co2e_kg": total_co2e_kg,
"total_co2e_tco2e": total_co2e_kg / 1000,
"currency": currency,
"categories": category_breakdown,
"uncertainty_note": "Incertezza stimata 30-100% (metodo spend-based)"
}
6. GHG プロトコルの統合: マッピングスコープ 1、2、3
完全な炭素会計システムを構築するには、すべての炭素会計をマッピングする必要があります。 特定のエンドポイントと Climatiq activity_id への GHG プロトコル カテゴリ。このセクション 最も関連性の高いカテゴリの完全なマッピングを提供します。
| スコープ GHG プロトコル | カテゴリ | エンドポイント気候 | 方法 |
|---|---|---|---|
| スコープ1 | 定常燃焼(加熱) | /estimate |
活動量(量) |
| スコープ1 | 移動式燃焼(フリート) | /batch/estimate |
活動量(量) |
| スコープ1 | 逃散的排出物(冷媒) | /estimate |
活動量(体重) |
| スコープ2 | 電気(ロケーションベース) | /energy |
活動量(kWh) |
| スコープ2 | 電力(市場ベース) | /energy |
活動量 (kWh、残留混合) |
| スコープ3.1 | 購入した商品やサービス | /procurement |
支出ベース (EUR) |
| スコープ3.4 | 上流輸送 | /freight |
活動量(トンキロ) |
| スコープ3.6 | 出張(飛行機) | /travel/flights |
アクティビティ (IATA コード) |
| スコープ3.6 | 出張(車・電車) | /estimate |
アクティビティ (km) |
| スコープ3.7 | 従業員の通勤 | /batch/estimate |
アクティビティ (km、ハーフ) |
| スコープ3.11 | 販売した商品の使用 | /estimate |
アクティビティ/支出 |
# ghg_protocol_calculator.py
# Sistema unificato per calcolo GHG Protocol completo
from dataclasses import dataclass, field
from typing import Optional
import asyncio
import httpx
@dataclass
class GHGProtocolReport:
"""Report GHG Protocol completo per anno fiscale."""
year: int
company_name: str
reporting_boundary: str = "operational_control"
# Scope 1
scope1_combustion_kg: float = 0.0
scope1_mobile_kg: float = 0.0
scope1_fugitive_kg: float = 0.0
# Scope 2
scope2_location_based_kg: float = 0.0
scope2_market_based_kg: float = 0.0
# Scope 3 (categorie principali)
scope3_cat1_purchased_goods_kg: float = 0.0
scope3_cat4_upstream_transport_kg: float = 0.0
scope3_cat6_business_travel_kg: float = 0.0
scope3_cat7_employee_commuting_kg: float = 0.0
# Metadata per audit trail
calculation_date: Optional[str] = None
data_version: str = ""
sources: list[str] = field(default_factory=list)
@property
def scope1_total_kg(self) -> float:
return (self.scope1_combustion_kg +
self.scope1_mobile_kg +
self.scope1_fugitive_kg)
@property
def scope2_selected_kg(self) -> float:
"""Market-based se disponibile, altrimenti location-based."""
return (self.scope2_market_based_kg
if self.scope2_market_based_kg > 0
else self.scope2_location_based_kg)
@property
def scope3_total_kg(self) -> float:
return (self.scope3_cat1_purchased_goods_kg +
self.scope3_cat4_upstream_transport_kg +
self.scope3_cat6_business_travel_kg +
self.scope3_cat7_employee_commuting_kg)
@property
def grand_total_tco2e(self) -> float:
return (self.scope1_total_kg +
self.scope2_selected_kg +
self.scope3_total_kg) / 1000
def to_csrd_dict(self) -> dict:
"""Output formato CSRD/ESRS E1-6."""
return {
"reporting_year": self.year,
"entity": self.company_name,
"ghg_emissions_location_based": {
"scope_1": self.scope1_total_kg / 1000,
"scope_2_location": self.scope2_location_based_kg / 1000,
"scope_3": self.scope3_total_kg / 1000,
"unit": "tCO2e"
},
"ghg_emissions_market_based": {
"scope_1": self.scope1_total_kg / 1000,
"scope_2_market": self.scope2_market_based_kg / 1000,
"scope_3": self.scope3_total_kg / 1000,
"unit": "tCO2e"
},
"data_version": self.data_version,
"methodology": "GHG Protocol Corporate Standard"
}
7. 堅牢な Python クライアント: 再試行、キャッシュ、エラー処理
運用環境では、API 呼び出しは一時的なエラーに対する回復力を備えている必要があります。 レート制限を設定し、効果的なキャッシュにより通話を最小限に抑えます。これは本番環境に対応したクライアントです。
# climatiq_client.py
import asyncio
import hashlib
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Optional
import httpx
import redis.asyncio as redis
logger = logging.getLogger(__name__)
class ClimatiqAPIError(Exception):
"""Errore API Climatiq con context."""
def __init__(self, message: str, status_code: int, response_body: dict):
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
class RateLimitError(ClimatiqAPIError):
"""Rate limit superato - aspetta prima di riprovare."""
pass
class ClimatiqClient:
"""
Client asincrono per Climatiq API con:
- Retry automatico con exponential backoff
- Cache Redis per ridurre le chiamate API
- Logging strutturato per audit trail
- Gestione rate limit con rispetto dei retry-after header
"""
BASE_URL = "https://beta3.api.climatiq.io"
MAX_RETRIES = 3
BATCH_SIZE = 100 # Limite Climatiq
def __init__(
self,
api_key: str,
redis_client: Optional[redis.Redis] = None,
cache_ttl: int = 86400, # 24 ore - i fattori cambiano raramente
data_version: str = "^21"
):
self.api_key = api_key
self.redis = redis_client
self.cache_ttl = cache_ttl
self.data_version = data_version
self._http_client: Optional[httpx.AsyncClient] = None
async def __aenter__(self):
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=10.0),
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
)
return self
async def __aexit__(self, *args):
if self._http_client:
await self._http_client.aclose()
def _cache_key(self, payload: dict) -> str:
"""Genera chiave cache deterministica da payload."""
payload_str = json.dumps(payload, sort_keys=True)
return f"climatiq:{hashlib.sha256(payload_str.encode()).hexdigest()[:16]}"
async def _get_cached(self, key: str) -> Optional[dict]:
"""Recupera risultato dalla cache Redis."""
if not self.redis:
return None
try:
cached = await self.redis.get(key)
if cached:
logger.debug(f"Cache HIT: {key}")
return json.loads(cached)
except Exception as e:
logger.warning(f"Errore cache GET: {e}")
return None
async def _set_cached(self, key: str, value: dict) -> None:
"""Salva risultato nella cache Redis."""
if not self.redis:
return
try:
await self.redis.setex(key, self.cache_ttl, json.dumps(value))
logger.debug(f"Cache SET: {key} (TTL: {self.cache_ttl}s)")
except Exception as e:
logger.warning(f"Errore cache SET: {e}")
async def _request_with_retry(
self, method: str, endpoint: str, payload: dict
) -> dict:
"""HTTP request con retry e exponential backoff."""
url = f"{self.BASE_URL}{endpoint}"
last_error = None
for attempt in range(self.MAX_RETRIES):
try:
response = await self._http_client.request(
method, url, json=payload
)
if response.status_code == 429: # Rate limit
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(
f"Rate limit superato. Aspetto {retry_after}s..."
)
await asyncio.sleep(retry_after)
continue
if response.status_code >= 400:
body = response.json() if response.content else {}
raise ClimatiqAPIError(
f"Errore API {response.status_code}: {body.get('error', 'Unknown')}",
status_code=response.status_code,
response_body=body
)
return response.json()
except httpx.NetworkError as e:
wait_time = 2 ** attempt
logger.warning(
f"Network error (attempt {attempt+1}/{self.MAX_RETRIES}), "
f"retry in {wait_time}s: {e}"
)
last_error = e
await asyncio.sleep(wait_time)
raise ConnectionError(
f"Falliti {self.MAX_RETRIES} tentativi: {last_error}"
)
async def estimate(
self,
activity_id: str,
parameters: dict,
region: Optional[str] = None
) -> dict:
"""
Stima singola con cache automatica.
activity_id: es. "electricity-supply_grid-source_residual_mix"
parameters: es. {"energy": 1000, "energy_unit": "kWh"}
"""
payload = {
"emission_factor": {
"activity_id": activity_id,
"data_version": self.data_version,
**({"region": region} if region else {})
},
"parameters": parameters
}
cache_key = self._cache_key(payload)
if cached := await self._get_cached(cache_key):
return cached
result = await self._request_with_retry("POST", "/estimate", payload)
await self._set_cached(cache_key, result)
logger.info(
"Estimate: activity=%(activity)s region=%(region)s "
"co2e=%(co2e).4f kg",
{
"activity": activity_id,
"region": region or "global",
"co2e": result.get("co2e", 0)
}
)
return result
async def batch_estimate(
self, requests: list[dict]
) -> list[dict]:
"""
Batch estimation con chunking automatico (100 per batch max).
"""
all_results = []
for i in range(0, len(requests), self.BATCH_SIZE):
chunk = requests[i:i + self.BATCH_SIZE]
payload = {"requests": chunk}
response = await self._request_with_retry(
"POST", "/batch/estimate", payload
)
all_results.extend(response.get("results", []))
# Piccola pausa tra chunk grandi per rispettare rate limit
if len(requests) > self.BATCH_SIZE:
await asyncio.sleep(0.5)
return all_results
8. Axios を使用した TypeScript/Node.js クライアント
Node.js、TypeScript、または NestJS バックエンド アプリケーションの場合、型指定されたクライアントは次のとおりです。 これにより、完全な型安全性を備えた Python クライアントと同じ機能が公開されます。
// climatiq-client.ts
import axios, { AxiosInstance, AxiosError } from 'axios';
// Types per la Climatiq API
export interface EmissionFactor {
activity_id: string;
data_version: string;
region?: string;
}
export interface EstimateParameters {
energy?: number;
energy_unit?: 'kWh' | 'MWh' | 'GJ';
volume?: number;
volume_unit?: 'l' | 'gallon' | 'm3';
weight?: number;
weight_unit?: 'kg' | 't' | 'lb';
money?: number;
money_unit?: 'eur' | 'usd' | 'gbp';
distance?: number;
distance_unit?: 'km' | 'mi';
}
export interface EstimateRequest {
emission_factor: EmissionFactor;
parameters: EstimateParameters;
}
export interface ConstituentGases {
co2e_total: number;
co2?: number;
ch4?: number;
n2o?: number;
}
export interface EstimateResponse {
co2e: number;
co2e_unit: string;
co2e_calculation_method: string;
emission_factor: {
activity_id: string;
source: string;
year: number;
region: string;
category: string;
data_quality_flags: string[];
};
constituent_gases: ConstituentGases;
}
export interface BatchEstimateResult {
co2e?: number;
co2e_unit?: string;
error?: string;
constituent_gases?: ConstituentGases;
}
export class ClimatiqAPIError extends Error {
constructor(
message: string,
public readonly statusCode: number,
public readonly responseBody: unknown
) {
super(message);
this.name = 'ClimatiqAPIError';
}
}
export class ClimatiqClient {
private readonly http: AxiosInstance;
private readonly cache = new Map<string, { data: unknown; expiresAt: number }>();
private readonly cacheTtlMs: number;
constructor(
private readonly apiKey: string,
private readonly dataVersion = '^21',
cacheTtlSeconds = 3600
) {
this.cacheTtlMs = cacheTtlSeconds * 1000;
this.http = axios.create({
baseURL: 'https://beta3.api.climatiq.io',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
timeout: 30_000,
});
}
private cacheKey(payload: unknown): string {
return JSON.stringify(payload);
}
private getFromCache<T>(key: string): T | null {
const entry = this.cache.get(key);
if (!entry) return null;
if (Date.now() > entry.expiresAt) {
this.cache.delete(key);
return null;
}
return entry.data as T;
}
private setCache(key: string, data: unknown): void {
this.cache.set(key, {
data,
expiresAt: Date.now() + this.cacheTtlMs,
});
}
private handleAxiosError(error: AxiosError): never {
if (error.response) {
const status = error.response.status;
const body = error.response.data as Record<string, unknown>;
if (status === 429) {
throw new ClimatiqAPIError(
'Rate limit superato. Riprova tra qualche momento.',
429,
body
);
}
throw new ClimatiqAPIError(
`Climatiq API error ${status}: ${body['error'] ?? 'Unknown'}`,
status,
body
);
}
throw new ClimatiqAPIError(
`Network error: ${error.message}`,
0,
null
);
}
async estimate(
activityId: string,
parameters: EstimateParameters,
region?: string
): Promise<EstimateResponse> {
const payload: EstimateRequest = {
emission_factor: {
activity_id: activityId,
data_version: this.dataVersion,
...(region ? { region } : {}),
},
parameters,
};
const key = this.cacheKey(payload);
const cached = this.getFromCache<EstimateResponse>(key);
if (cached) return cached;
try {
const { data } = await this.http.post<EstimateResponse>(
'/estimate',
payload
);
this.setCache(key, data);
return data;
} catch (error) {
if (axios.isAxiosError(error)) this.handleAxiosError(error);
throw error;
}
}
async batchEstimate(
requests: EstimateRequest[]
): Promise<BatchEstimateResult[]> {
const CHUNK_SIZE = 100;
const allResults: BatchEstimateResult[] = [];
for (let i = 0; i < requests.length; i += CHUNK_SIZE) {
const chunk = requests.slice(i, i + CHUNK_SIZE);
try {
const { data } = await this.http.post<{ results: BatchEstimateResult[] }>(
'/batch/estimate',
{ requests: chunk }
);
allResults.push(...data.results);
} catch (error) {
if (axios.isAxiosError(error)) this.handleAxiosError(error);
throw error;
}
// Throttle tra chunk multipli
if (i + CHUNK_SIZE < requests.length) {
await new Promise(resolve => setTimeout(resolve, 200));
}
}
return allResults;
}
async estimateFlight(
originIata: string,
destinationIata: string,
passengers: number,
cabinClass: 'economy' | 'business' | 'first' = 'economy'
): Promise<EstimateResponse> {
try {
const { data } = await this.http.post<EstimateResponse>(
'/travel/flights',
{
legs: [{
from: originIata,
to: destinationIata,
passengers,
cabin_class: cabinClass,
}],
}
);
return data;
} catch (error) {
if (axios.isAxiosError(error)) this.handleAxiosError(error);
throw error;
}
}
}
TypeScript クライアントの使用
// usage-example.ts
import { ClimatiqClient } from './climatiq-client';
const client = new ClimatiqClient(
process.env['CLIMATIQ_API_KEY'] ?? '',
'^21',
3600
);
// Calcolo emissioni elettricità ufficio
async function calculateOfficeElectricity() {
const result = await client.estimate(
'electricity-supply_grid-source_residual_mix',
{ energy: 5000, energy_unit: 'kWh' },
'IT'
);
console.log(`Emissioni ufficio: ${result.co2e.toFixed(2)} kg CO2e`);
console.log(`Fonte: ${result.emission_factor.source} (${result.emission_factor.year})`);
return result;
}
// Calcolo batch per fleet management
async function calculateFleetEmissions(
vehicles: Array<{ id: string; litres: number; fuel: string }>
) {
const requests = vehicles.map(v => ({
emission_factor: {
activity_id: `fuel_type-${v.fuel}`,
data_version: '^21',
region: 'IT',
},
parameters: {
volume: v.litres,
volume_unit: 'l' as const,
},
}));
const results = await client.batchEstimate(requests);
return vehicles.map((v, i) => ({
vehicleId: v.id,
co2eKg: results[i].co2e ?? 0,
error: results[i].error,
}));
}
// Calcolo volo business travel
async function calculateBusinessFlight() {
const result = await client.estimateFlight('MXP', 'LHR', 2, 'economy');
console.log(`Volo MXP-LHR (2 pax): ${result.co2e.toFixed(1)} kg CO2e`);
}
9. SaaS 用リアルタイム炭素計算ツール
Climatiq の強力な使用例は、 カーボンラベル 電子商取引または SaaS 製品へ: ユーザーに二酸化炭素への影響を示す 確認する前に注文やアクションの見積もりを行うこと。これにより透明度が高まります 情報に基づいた消費者の選択をサポートします。
アーキテクチャ: 電子商取引用の Carbon Label API
# carbon_label_api.py
# FastAPI endpoint per carbon label real-time su prodotti e-commerce
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Optional
import asyncio
app = FastAPI(title="Carbon Label API", version="1.0.0")
class ProductCarbonRequest(BaseModel):
"""Richiesta calcolo carbon label per carrello."""
items: list[dict] = Field(
description="Lista prodotti con categoria e peso",
example=[
{"product_id": "SKU-123", "category": "electronics", "weight_kg": 0.5, "quantity": 1},
{"product_id": "SKU-456", "category": "clothing", "weight_kg": 0.3, "quantity": 2}
]
)
shipping_method: str = Field(
default="road",
description="Metodo spedizione: road, air, sea"
)
destination_region: str = Field(
default="IT",
description="Regione destinazione ISO 3166-2"
)
origin_region: str = Field(
default="CN",
description="Regione origine/produzione"
)
class CarbonLabelResponse(BaseModel):
"""Risposta con carbon label completa."""
total_co2e_kg: float
breakdown: dict
label: str # "A", "B", "C", "D", "E" come etichetta energetica EU
label_color: str
offset_cost_eur: float # Stima costo compensazione
trees_equivalent: float # Equivalente alberi annui
km_car_equivalent: float # Equivalente km auto
# Mappatura categoria prodotto su activity_id per produzione
PRODUCT_CATEGORY_PRODUCTION = {
"electronics": "electrical_equipment-type_small_electronics",
"clothing": "textiles-type_clothing",
"food": "food-type_mixed",
"furniture": "furniture-type_mixed",
"books": "paper-type_books",
"plastics": "plastics-type_plastic_products",
}
# Mappatura metodo spedizione su activity_id
SHIPPING_ACTIVITY_ID = {
"road": "transport-type_freight_vehicle-fuel_source_diesel-vehicle_type_hgv",
"air": "transport-type_air_freight",
"sea": "transport-type_sea_freight-route_type_container_ship",
}
def calculate_carbon_label(co2e_kg: float) -> tuple[str, str]:
"""
Calcola etichetta A-E basata su impatto carbonico.
Soglie ispirate alla proposta EU eco-label per e-commerce.
"""
if co2e_kg < 0.5:
return "A", "#2ecc71" # Verde - impatto molto basso
elif co2e_kg < 1.5:
return "B", "#27ae60" # Verde scuro - impatto basso
elif co2e_kg < 5.0:
return "C", "#f39c12" # Arancione - impatto medio
elif co2e_kg < 15.0:
return "D", "#e67e22" # Arancione scuro - impatto alto
else:
return "E", "#e74c3c" # Rosso - impatto molto alto
@app.post("/api/v1/carbon-label", response_model=CarbonLabelResponse)
async def get_carbon_label(
request: ProductCarbonRequest,
climatiq: ClimatiqClient = Depends(get_climatiq_client)
):
"""
Calcola carbon label real-time per un carrello e-commerce.
Considera produzione + imballaggio + spedizione.
"""
batch_requests = []
# 1. Emissioni produzione per ciascun prodotto
for item in request.items:
activity_id = PRODUCT_CATEGORY_PRODUCTION.get(
item["category"],
"manufactured_goods-type_mixed" # Fallback generico
)
# Calcolo per peso totale (qty * peso)
total_weight = item["weight_kg"] * item.get("quantity", 1)
batch_requests.append({
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"region": request.origin_region
},
"parameters": {
"weight": total_weight,
"weight_unit": "kg"
}
})
# 2. Emissioni spedizione (distanza stimata)
total_weight_kg = sum(
i["weight_kg"] * i.get("quantity", 1)
for i in request.items
)
# Stima distanza basata su regioni (semplificata)
estimated_distance_km = _estimate_distance(
request.origin_region,
request.destination_region
)
shipping_activity = SHIPPING_ACTIVITY_ID.get(
request.shipping_method,
SHIPPING_ACTIVITY_ID["road"]
)
# Freight: tonne * km = tonne-km
tonne_km = (total_weight_kg / 1000) * estimated_distance_km
batch_requests.append({
"emission_factor": {
"activity_id": shipping_activity,
"data_version": "^21"
},
"parameters": {
"weight": total_weight_kg,
"weight_unit": "kg",
"distance": estimated_distance_km,
"distance_unit": "km"
}
})
# Chiamata batch a Climatiq
try:
results = await climatiq.batch_estimate(batch_requests)
except ClimatiqAPIError as e:
raise HTTPException(
status_code=502,
detail=f"Errore calcolo emissioni: {str(e)}"
)
# Aggrega risultati
production_co2e = sum(
r.get("co2e", 0) for r in results[:-1] # Tutti tranne ultimo (spedizione)
)
shipping_co2e = results[-1].get("co2e", 0) if results else 0
total_co2e = production_co2e + shipping_co2e
label, color = calculate_carbon_label(total_co2e)
# Calcola equivalenze intuitive per l'utente
# Offset market rate ~15 EUR/tCO2e (mercato volontario 2025)
offset_cost = (total_co2e / 1000) * 15.0
# Un albero assorbe ~22 kg CO2/anno
trees_equivalent = total_co2e / 22.0
# Auto media emette ~0.21 kg CO2/km
km_car = total_co2e / 0.21
return CarbonLabelResponse(
total_co2e_kg=round(total_co2e, 3),
breakdown={
"production_kg": round(production_co2e, 3),
"shipping_kg": round(shipping_co2e, 3),
"shipping_method": request.shipping_method,
"distance_km": estimated_distance_km
},
label=label,
label_color=color,
offset_cost_eur=round(offset_cost, 2),
trees_equivalent=round(trees_equivalent, 1),
km_car_equivalent=round(km_car, 1)
)
def _estimate_distance(origin: str, destination: str) -> float:
"""Stima distanza tra regioni in km (lookup semplificato)."""
DISTANCES = {
("CN", "IT"): 9_000,
("DE", "IT"): 950,
("IT", "IT"): 300,
("US", "IT"): 8_500,
("IN", "IT"): 7_200,
}
key = (origin[:2].upper(), destination[:2].upper())
return DISTANCES.get(key, 5_000) # Default 5000km se sconosciuto
カーボンラベルを表示するフロントエンドウィジェット
// carbon-label.component.ts (Angular/TypeScript)
import { Component, Input, OnChanges, SimpleChanges } from '@angular/core';
import { HttpClient } from '@angular/common/http';
interface CarbonLabel {
total_co2e_kg: number;
label: string;
label_color: string;
trees_equivalent: number;
km_car_equivalent: number;
offset_cost_eur: number;
}
@Component({
selector: 'app-carbon-label',
template: `
<div class="carbon-label" *ngIf="carbonData">
<div class="label-badge" [style.background]="carbonData.label_color">
{{ carbonData.label }}
</div>
<div class="carbon-info">
<span class="co2e">{{ carbonData.total_co2e_kg | number:'1.1-2' }} kg CO₂e</span>
<span class="equivalent">
= {{ carbonData.km_car_equivalent | number:'1.0-0' }} km in auto
</span>
<button (click)="offsetCarbon()" class="offset-btn">
Compensa per €{{ carbonData.offset_cost_eur | number:'1.2-2' }}
</button>
</div>
</div>
`
})
export class CarbonLabelComponent implements OnChanges {
@Input() cartItems: Array<{ product_id: string; category: string; weight_kg: number; quantity: number }> = [];
@Input() shippingMethod = 'road';
carbonData: CarbonLabel | null = null;
loading = false;
constructor(private http: HttpClient) {}
ngOnChanges(changes: SimpleChanges): void {
if (changes['cartItems'] || changes['shippingMethod']) {
this.loadCarbonLabel();
}
}
private loadCarbonLabel(): void {
if (!this.cartItems.length) return;
this.loading = true;
this.http.post<CarbonLabel>('/api/v1/carbon-label', {
items: this.cartItems,
shipping_method: this.shippingMethod,
destination_region: 'IT',
origin_region: 'CN',
}).subscribe({
next: (data) => {
this.carbonData = data;
this.loading = false;
},
error: (err) => {
console.error('Errore carbon label:', err);
this.loading = false;
},
});
}
offsetCarbon(): void {
// Integrazione con piattaforma di offsetting
window.open('https://example.com/offset', '_blank');
}
}
10. APIのテストとモック
CI/CD では、Climatiq API を実際に呼び出す必要はありません。構造化する方法は次のとおりです 成功と失敗の両方の応答をシミュレートする堅牢なモックを使用したテスト。
# tests/conftest.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.climatiq_client import ClimatiqClient
@pytest.fixture
def mock_climatiq_client():
"""
Mock del client Climatiq per test unitari.
Simula risposte realistiche senza chiamate API reali.
"""
client = AsyncMock(spec=ClimatiqClient)
# Risposta standard per stima elettricità italiana
electricity_response = {
"co2e": 415.0,
"co2e_unit": "kg",
"co2e_calculation_method": "ar5",
"emission_factor": {
"activity_id": "electricity-supply_grid-source_residual_mix",
"source": "IEA",
"year": 2022,
"region": "IT",
"category": "Electricity",
"data_quality_flags": []
},
"constituent_gases": {
"co2e_total": 415.0,
"co2": 415.0,
"ch4": None,
"n2o": None
}
}
# Risposta per diesel
diesel_response = {
"co2e": 302.15,
"co2e_unit": "kg",
"co2e_calculation_method": "ar5",
"emission_factor": {
"activity_id": "fuel_type-diesel",
"source": "DEFRA",
"year": 2023,
"region": "IT",
"category": "Fuel"
},
"constituent_gases": {
"co2e_total": 302.15,
"co2": 301.5,
"ch4": 0.02,
"n2o": 0.63
}
}
# Configura mock per rispondere in base all'activity_id
async def mock_estimate(activity_id, parameters, region=None):
if "electricity" in activity_id:
kwh = parameters.get("energy", 1000)
return {**electricity_response, "co2e": kwh * 0.415}
elif "diesel" in activity_id:
litres = parameters.get("volume", 100)
return {**diesel_response, "co2e": litres * 2.52}
return electricity_response
client.estimate = AsyncMock(side_effect=mock_estimate)
client.batch_estimate = AsyncMock(return_value=[electricity_response])
return client
# tests/test_activity_based.py
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_fleet_scope1_calculates_correctly(mock_climatiq_client):
"""Test che il calcolo Scope 1 flotta produca risultati corretti."""
from app.services.fleet_calculator import calculate_fleet_scope1
result = await calculate_fleet_scope1(
client=mock_climatiq_client,
litres_diesel=100.0,
region="IT"
)
assert result["co2e_kg"] == pytest.approx(252.0, rel=0.01)
assert result["scope"] == "scope_1"
mock_climatiq_client.estimate.assert_called_once()
@pytest.mark.asyncio
async def test_rate_limit_error_is_handled(mock_climatiq_client):
"""Test che rate limit error non lasci il sistema in stato inconsistente."""
from app.climatiq_client import ClimatiqAPIError
mock_climatiq_client.estimate = AsyncMock(
side_effect=ClimatiqAPIError("Rate limit", 429, {"error": "rate_limit"})
)
with pytest.raises(ClimatiqAPIError) as exc_info:
await mock_climatiq_client.estimate(
"electricity-supply_grid-source_residual_mix",
{"energy": 1000, "energy_unit": "kWh"},
"IT"
)
assert exc_info.value.status_code == 429
@pytest.mark.asyncio
async def test_scope2_100_renewable_returns_zero(mock_climatiq_client):
"""Test che 100% rinnovabile dia emissioni zero (market-based)."""
from app.services.scope2_calculator import (
calculate_scope2_electricity, Scope2Method
)
result = await calculate_scope2_electricity(
client=mock_climatiq_client,
kwh_consumed=100_000,
region="IT",
method=Scope2Method.MARKET_BASED,
renewable_percentage=100.0
)
assert result["co2e_kg"] == 0.0
# Non deve chiamare l'API (nessun consumo da rete)
mock_climatiq_client.estimate.assert_not_called()
11. Scala のバッチ推定とキャッシュ戦略
数千のレコード (月次レポート、履歴分析) を処理するアプリケーションの場合、 コストを削減するには、バッチ API 呼び出しと Redis キャッシュの組み合わせが不可欠になります。 処理時間と API コストの両方。
# batch_processor.py
# Elaborazione batch per calcoli di emissioni su larga scala
import asyncio
from datetime import datetime
from typing import AsyncIterator
import redis.asyncio as redis
async def process_monthly_emissions_report(
api_key: str,
records: list[dict],
redis_url: str = "redis://localhost:6379"
) -> dict:
"""
Elabora report mensile emissioni per grandi dataset.
records: lista di attività (flotta, energia, acquisti, etc.)
Restituisce: aggregato mensile Scope 1, 2, 3.
"""
redis_client = await redis.from_url(redis_url)
async with ClimatiqClient(
api_key=api_key,
redis_client=redis_client,
cache_ttl=86400 * 7, # 7 giorni per fattori stabili
data_version="21" # Versione fissa per riproducibilità
) as client:
# Raggruppa per tipo di emissione
scope1_records = [r for r in records if r["scope"] == "scope1"]
scope2_records = [r for r in records if r["scope"] == "scope2"]
scope3_records = [r for r in records if r["scope"] == "scope3"]
# Elabora in parallelo i tre scope
scope1_result, scope2_result, scope3_result = await asyncio.gather(
_process_scope1_batch(client, scope1_records),
_process_scope2_batch(client, scope2_records),
_process_scope3_batch(client, scope3_records)
)
total = (
scope1_result["total_co2e_kg"] +
scope2_result["total_co2e_kg"] +
scope3_result["total_co2e_kg"]
)
await redis_client.aclose()
return {
"report_generated_at": datetime.utcnow().isoformat(),
"total_co2e_kg": total,
"total_co2e_tco2e": total / 1000,
"scope1": scope1_result,
"scope2": scope2_result,
"scope3": scope3_result,
"record_count": len(records)
}
async def _process_scope1_batch(
client: ClimatiqClient, records: list[dict]
) -> dict:
"""Elabora batch Scope 1 con chunking automatico."""
if not records:
return {"total_co2e_kg": 0.0, "record_count": 0}
batch_requests = [
{
"emission_factor": {
"activity_id": r["activity_id"],
"data_version": "21",
"region": r.get("region", "IT")
},
"parameters": {
r["param_key"]: r["param_value"],
f"{r['param_key']}_unit": r["param_unit"]
}
}
for r in records
]
results = await client.batch_estimate(batch_requests)
total = sum(r.get("co2e", 0) for r in results if "error" not in r)
errors = [r for r in results if "error" in r]
if errors:
import logging
logging.warning(f"{len(errors)} errori nel batch Scope 1")
return {
"total_co2e_kg": total,
"record_count": len(records),
"error_count": len(errors)
}
12. Climatiq の代替案: 比較
Climatiq が唯一の選択肢ではありません。主なものを比較するとこんな感じ 適切なソリューションを選択するのに役立つ代替案:
| 解決 | 主力 | 限界 | 理想的な使用例 | 無料プラン |
|---|---|---|---|---|
| 気候 | 190,000 以上の要素、40 以上のソース、堅牢な API | コミュニティプランでは250通話/月 | エンタープライズ、マルチスコープのプロダクション | 250 件の通話/月 |
| カーボンインターフェース | シンプルな API、優れた開発者エクスペリエンス | 要素を減らし、米国に焦点を当てる | スタートアップ、eコマース、配送 | はい(限定的) |
| オープン排出係数 | 無料、オープンソース、Climatiq によって保守されています | データセットのみ、直接 REST API なし | 研究、生データへのアクセス | はい (データセット) |
| 流域API | 監査対応のエンタープライズ機能 | エンタープライズのみ、高価 | 大企業、CSRDレポート | No |
| EPAフライトデータベース | 無料、米国政府 | 米国のみ、REST API は対象外 | 米国の報道、調査 | はい (データセット) |
| カスタム DB (DEFRA/IEA) | フルコントロール、API コストなし | 高価な内部メンテナンス | 専任チームを擁する大規模組織 | はい (公開データ) |
カスタム データベースを使用する場合
場合によっては、排出係数の社内データベースを構築することが合理的です。 これを処理する PostgreSQL スキーマは次のとおりです。
-- Schema PostgreSQL per database fattori di emissione personalizzato
CREATE TABLE IF NOT EXISTS emission_factors (
id SERIAL PRIMARY KEY,
activity_category VARCHAR(200) NOT NULL,
activity_name VARCHAR(500),
region_code VARCHAR(10) NOT NULL,
reference_year INTEGER NOT NULL,
unit_type VARCHAR(50) NOT NULL, -- 'litre', 'kWh', 'tonne', 'EUR'
factor_kg_co2e_per_unit DECIMAL(12, 6) NOT NULL,
source VARCHAR(100) NOT NULL, -- 'DEFRA', 'ISPRA', 'EPA'
source_version VARCHAR(50),
source_priority INTEGER DEFAULT 1,
constituent_co2 DECIMAL(12, 6),
constituent_ch4 DECIMAL(12, 6),
constituent_n2o DECIMAL(12, 6),
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_ef_lookup ON emission_factors
(activity_category, region_code, reference_year, unit_type);
-- Query di lookup con fallback regionale
SELECT factor_kg_co2e_per_unit, source, reference_year
FROM emission_factors
WHERE activity_category = $1
AND reference_year = $2
AND unit_type = $3
AND region_code IN ($4, 'EU', 'GLOBAL') -- Fallback a EU poi globale
ORDER BY
CASE region_code
WHEN $4 THEN 1 -- Regione specifica prima
WHEN 'EU' THEN 2 -- Poi EU
ELSE 3 -- Poi globale
END,
source_priority DESC,
reference_year DESC
LIMIT 1;
カスタム データベースが正しい選択ではない場合
排出係数のデータベースを構築および維持するには、以下が必要です。 2~3ヶ月間専属担当者 初期構築のために、 必須の半年ごとの更新(要素は毎年変化します)、 監査証跡のバージョン管理、および地理的な範囲 非 OECD 地域では欠落しています。ほとんどの企業にとって、 Climatiq は優れた ROI を提供します APIコストを考慮しても。
13. ケーススタディ: カーボンラベルを使用した電子商取引プラットフォーム
エコショップ・イタリア 50,000 点の商品を扱う電子商取引プラットフォームです 1 か月あたり 200,000 件の注文。要件: すべての製品にカーボンラベルを追加する オフセット オプションを使用して、チェックアウト時に推定排出量を表示します。
実装されたアーキテクチャ
- 製品カタログの充実: を事前計算する夜間のジョブ バッチエンドポイントを使用した各 SKU の生産排出量。結果が保存されました 30 日間の TTL で製品データベースに保存されます。
- リアルタイムチェックアウト: チェックアウト時に排出量を動的に計算 カートの重量、配送先地域、選択した方法に基づいて配送します。 目標レイテンシ: 200 ミリ秒未満 (標準出荷の Redis キャッシュを使用)。
- カーボンダッシュボード販売者: 販売者向けの月次レポート スコープ 3.4 (上流輸送) および推定スコープ 3.11 (販売された製品の使用)。
# ecoshop_carbon_service.py
# Servizio di calcolo emissioni per EcoShop Italia
from dataclasses import dataclass
from typing import Optional
import asyncio
from datetime import datetime, timedelta
@dataclass
class ProductEmissionProfile:
"""Profilo emissioni di un prodotto nel catalogo."""
sku: str
production_co2e_kg: float
packaging_co2e_kg: float
category: str
origin_region: str
calculated_at: datetime
climatiq_data_version: str
@property
def total_product_co2e_kg(self) -> float:
return self.production_co2e_kg + self.packaging_co2e_kg
@property
def is_stale(self) -> bool:
"""Profilo è obsoleto se più vecchio di 30 giorni."""
return datetime.utcnow() - self.calculated_at > timedelta(days=30)
class EcoShopCarbonService:
"""
Servizio carbon per EcoShop: gestisce calcoli produzione,
spedizione e report venditori.
"""
def __init__(
self,
climatiq_client: ClimatiqClient,
db_pool, # asyncpg pool
):
self.climatiq = climatiq_client
self.db = db_pool
async def get_product_emission_profile(
self, sku: str
) -> Optional[ProductEmissionProfile]:
"""
Recupera profilo emissioni dal DB.
Se assente o obsoleto, ricalcola via Climatiq.
"""
# Controlla DB
profile = await self._load_from_db(sku)
if profile and not profile.is_stale:
return profile
# Ricalcola
product = await self._get_product_data(sku)
if not product:
return None
new_profile = await self._calculate_product_emissions(product)
await self._save_to_db(new_profile)
return new_profile
async def calculate_checkout_carbon(
self,
cart_items: list[dict],
destination_region: str,
shipping_method: str
) -> dict:
"""
Calcola carbon label per checkout in tempo reale.
Target: < 200ms con cache.
"""
# Recupera profili prodotti (con cache)
profiles = await asyncio.gather(*[
self.get_product_emission_profile(item["sku"])
for item in cart_items
])
# Calcola emissioni produzione
production_co2e = sum(
(profile.total_product_co2e_kg * item.get("quantity", 1))
for profile, item in zip(profiles, cart_items)
if profile is not None
)
# Calcola emissioni spedizione
total_weight_kg = sum(
item.get("weight_kg", 0.5) * item.get("quantity", 1)
for item in cart_items
)
shipping_co2e = await self._estimate_shipping(
total_weight_kg, destination_region, shipping_method
)
total = production_co2e + shipping_co2e
label, color = calculate_carbon_label(total)
return {
"total_co2e_kg": round(total, 3),
"production_co2e_kg": round(production_co2e, 3),
"shipping_co2e_kg": round(shipping_co2e, 3),
"label": label,
"label_color": color,
"offset_price_eur": round((total / 1000) * 15.0, 2),
"generated_at": datetime.utcnow().isoformat()
}
# Risultati dopo 6 mesi
ECOSHOP_METRICS = {
"prodotti_con_carbon_label": 47_832,
"ordini_con_label_mese": 180_000,
"percentuale_utenti_offset": 8.3, # % utenti che comprano offset
"revenue_offset_mensile_eur": 4_200,
"latenza_media_ms": 45, # ms (grazie a cache Redis)
"api_calls_risparmiate_cache": "92%",
"co2e_totale_calcolato_mese_tco2e": 1_240,
"nps_incremento": +12 # punti NPS grazie a trasparenza
}
14. 本番環境での展開と安全な構成
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
class Settings(BaseSettings):
"""Configurazione applicazione da variabili d'ambiente."""
# Climatiq
climatiq_api_key: str
climatiq_data_version: str = "^21"
# Cache
cache_ttl_seconds: int = 3600
redis_url: str = "redis://localhost:6379"
# API Security
api_secret_key: str
allowed_origins: list[str] = ["https://dashboard.tuaazienda.it"]
# Database (per audit trail)
database_url: str = "postgresql://user:pass@localhost/ghg_db"
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
@lru_cache
def get_settings() -> Settings:
return Settings()
# .env (NON committare in git - usare secrets manager in produzione)
# CLIMATIQ_API_KEY=clq_live_xxxxxxxxxxxxxxxxxxxxxxx
# CLIMATIQ_DATA_VERSION=^21
# API_SECRET_KEY=generato-con-openssl-rand-hex-32
# CACHE_TTL_SECONDS=3600
# REDIS_URL=redis://redis:6379
# DATABASE_URL=postgresql://ghg_user:password@postgres:5432/ghg_db
Climatiq API キーを決して公開しないでください
Climatiq API キーはソース コードやログ、 応答 API またはクライアント側 JavaScript 変数で。常に使用します: 環境変数、シークレットマネージャー (AWS Secrets Manager、HashiCorp Vault、 Azure Key Vault)、または保存時の暗号化を備えた Kubernetes Secret。 誤ってキーが露出した場合は、すぐにキーを回してください。
15. ベストプラクティスとアンチパターン
ベストプラクティス
-
本番環境で data_version をロックする: アメリカ合衆国
"21"の代わりに"^21"計算の再現性を確保するため。 明示的な再計算を使用して意図的に更新します。 - 積極的なキャッシュ: 排出係数はほとんど変化しません。 24 ~ 168 時間のキャッシュにより、特に API コールが大幅に削減されます 反復計算用 (毎月のフリート、製品カタログ)。
- 常に監査証跡を記録します。 各計算を activity_id で保存します。 使用される要素、データのバージョン、タイムスタンプ、およびソース。 CSRDには必須です。
- 可能な場合はバッチを使用します。 単一のバッチ呼び出し 100 件の項目は、100 件の個別の呼び出しよりもはるかに効率的です。 レート制限を尊重し、総遅延を削減します。
- data_quality_flags を検証します。 Climatiq が報告するのは、 因子のデータ品質が低いです。代替要素を使用してこれらのケースに対処する またはレポート内の不確実性のメモ。
- スコープ 2 の二重レポート: GHG プロトコルでは次のことが求められます。 場所ベースの方法と市場ベースの方法の両方。両方を計算して保存します。
- TypeScript のインメモリ キャッシュ: TTL を使用してマップを実装する TypeScript クライアントで、アプリケーションでの冗長な呼び出しを回避します。 Redis を使用しない長命の Node.js。
避けるべきアンチパターン
- フロントエンドの API キー: Climatiq キーを公開しないでください。 クライアント側の JavaScript。常にバックエンドを経由します。
- 構成ガスを無視します。 正確な CSRD レポートを作成するには、 CO₂、CH₄、N₂O は、総 CO₂e に加えて個別に報告する必要があります。
- 間違った測定単位: Climatiq は、次の間の変換を受け入れます。 単位は指定しますが、エンドポイントが kWh を期待している場合にリットルを渡すと結果が生成されます。 数値的にはもっともらしいが、科学的には間違っています。常に有効です。
- バッチ > 100 アイテム: API はエラー 422 を返します。 大規模なデータセットには常にチャンク ロジックを実装してください。
-
領域の一致を無視します。 電力の要因
リージョンを指定しない場合は、グローバルのデフォルトを使用します。イタリアの場合は常に使用します
"IT". -
同期呼び出しのブロック: 同期 HTTP ライブラリは絶対に使用しないでください
非同期エンドポイントで。常に使用する
httpx.AsyncClientPython またはaxiosconasync/awaitTypeScriptで。
結論と次のステップ
Climatiq は、自動炭素会計の最も困難な問題を解決します。 の 排出係数データベース。 190,000 以上の検証済み要素により、 coverage of 300 regions and continuous updates, allows you to build production-ready GHG calculation systems in days instead of months.
この記事では以下を構築しました。
- Un 堅牢な Python クライアント 再試行、Redis キャッシュ、型指定されたエラー処理付き
- Un TypeScript/Node.js クライアント Axios とフロントエンド統合のためのフルタイプセーフティを搭載
- の計算 スコープ1 (ディーゼル/HVO フリート)、 スコープ2 (デュアルメソッド) そして主なカテゴリー スコープ3
- Un Carbon Label API リアルタイム A-E ラベルとオフセットを使用した電子商取引用
- によるテスト モックAPI 実際の呼び出しを行わない CI/CD 環境の場合
- の戦略 バッチ処理とキャッシュ エンタープライズ規模向け
グリーン ソフトウェア シリーズは継続します
- 前の記事: CodeCarbon - コードエミッションを測定する オープンソースの Python ライブラリで実行されます。
- 次の記事: Carbon Aware SDK - ワークロードをシフトする方法 グリッド強度予測を使用して、最もクリーンなエネルギーを使用して時間内に電力を供給します。
- 関連記事 (MLOps シリーズ): 最適化 二酸化炭素排出量を削減するために ML モデルをトレーニングします。
- 関連記事(データ&AIビジネスシリーズ): データガバナンス 信頼性の高い AI のため - 持続可能性の指標をデータ カタログに統合する方法。
次の実際的なステップは、Climatiq のコミュニティ プランにサインアップすることです。 (250 回の無料通話/月)、 データエクスプローラー あなたの業界に関連する要素を見つけて実装する 最初のマイルストーンとして、最も単純なユースケースのスコープ 1 を計算します。
CSRDの規制圧力(2025年からEUの大企業に義務化)により、 2026 年から中小企業にも拡大)、ESG 指標に対する投資家の注目が高まっていること、 自動化された炭素会計のための技術的インフラストラクチャはもはや存在しません 競争上の優位性: それは 運用上の必要性.







