Climatiq API: GHG 배출 계산을 백엔드에 통합
2025년 탄소배출량 모니터링 솔루션 시장 성장 까지 173억 달러점점 엄격해지는 규제로 인해 예를 들어 유럽 CSRD, 미국의 SEC 기후 공개 및 ISO 14064 등이 있습니다. 더 이상 Excel 시트와 대략적인 견적에 의존하지 않아도 됩니다. 유용합니다. 자동 계산, 검증 가능하고 운영 체제에 통합됨.
기술적 과제는 현실적입니다. GHG(온실가스) 배출량을 계산하려면 데이터베이스에 대한 액세스가 필요합니다. 배출계수 업데이트, GHG 프로토콜에 의해 검증된 방법론 및 전환 수백 가지의 다양한 측정 단위 중 하나입니다. 이 모든 것을 사내에서 구축하고 유지관리하려면 다음이 필요합니다. 수개월의 전문 작업. 이것이 들어오는 곳입니다 기후.
Climatiq은 이상의 액세스를 제공하는 REST API입니다. 190,000개의 배출계수 40개 이상의 검증된 소스(EPA, DEFRA/BEIS, IEA, ecoinvent)에서 300개 이상의 지역, 범위 1, 2, 3에 대한 GHG 프로토콜을 준수하는 계산입니다. 이 글에서 우리는 FastAPI 백엔드 완성 Climatiq을 통합한 제품 TypeScript 클라이언트 및 실시간 계산기와 함께 생산 배출량 계산용 SaaS 애플리케이션용.
무엇을 배울 것인가
- Climatiq API 아키텍처: 엔드포인트, 인증, 속도 제한 및 데이터 모델
- 배출계수 데이터베이스: 올바른 계수를 검색, 필터링 및 선택하는 방법
- 활동 기반 추정: 구체적인 활동(kWh, km, kg)을 기반으로 계산
- 지출 기반 추정: 기본 데이터가 누락된 경우 통화 지출 계산
- GHG 프로토콜 범위 1, 2, 3: 카테고리 매핑 및 준수 계산
- 재시도, Redis 캐싱 및 오류 처리 기능을 갖춘 강력한 Python 클라이언트
- 프런트엔드 통합을 위한 Axios 및 유형이 포함된 TypeScript/Node.js 클라이언트
- 제품에 탄소 라벨이 부착된 SaaS용 실시간 탄소 계산기
- CI/CD 환경을 위한 모의 API를 사용한 테스트
- Climatiq의 대안 및 기능 비교
그린 소프트웨어 엔지니어링 시리즈
이 기사는 그린 소프트웨어 엔지니어링에 관한 전체 시리즈의 일부입니다. 모든 항목 디지털 지속 가능성의 특정 측면을 다룹니다.
| # | Articolo | 주요 주제 |
|---|---|---|
| 1 | 그린 소프트웨어 엔지니어링 원칙 | GSF, SCI 스펙, 8가지 기본 원칙 |
| 2 | CodeCarbon: 코드 방출 측정 | Python 라이브러리, 대시보드, CI/CD 통합 |
| 3 | Climatiq API: 백엔드의 GHG 계산 | REST API, 범위 1-3, FastAPI + TypeScript 통합 |
| 4 | 탄소 인식 SDK | 워크로드 이동, 그리드 강도, 시간 이동 |
| 5 | 범위 1, 2, 3: ESG 보고를 위한 데이터 모델링 | 데이터 구조, 계산, 집계, 보고 |
| 6 | GreenOps: 탄소 인식 인프라 | Kubernetes 스케줄링, 문제에 따른 확장 |
| 7 | 배출 파이프라인 범위 3 가치 사슬 | 공급업체 데이터 수집, 계산, 감사 추적 |
| 8 | ESG 보고 API: CSRD 통합 | CSRD 워크플로우, 보고서 자동화, 규정 준수 |
| 9 | 지속 가능한 아키텍처 패턴 | 스토리지, 지능형 캐싱, 탄소 인식 배치 |
| 10 | AI 및 Carbon: ML 훈련 공간 | LLM 배출, 최적화, Green AI |
1. 온실가스 프로토콜과 자동 계산의 필요성
Il GHG 프로토콜 기업 표준 세계에서 가장 많이 채택되는 프레임워크입니다. 회사의 배출량을 계산하기 위한 것입니다. 배출량을 세 가지 영역으로 분류합니다.
- 범위 1(직접 배출): 회사 차량의 연료 연소, 생산 공장, 난방. 그들은 회사의 직접적인 통제를 받습니다.
- 범위 2(간접 에너지): 전기, 증기, 열을 구입했습니다. 그들은 다음과 같이 나누어진다 위치 기반 (로컬 네트워크 혼합) e 시장 기반 (에너지 인증서, PPA).
- 범위 3(가치 사슬): 상품 구매를 포함한 15개 카테고리 및 서비스, 업스트림/다운스트림 운송, 제품 사용, 수명 종료, 출장, 직원 통근 등. 그들은 일반적으로 70-90% 총 배출량 대비 회사의.
각 계산마다 다음이 필요합니다. 배출계수: 변환하는 계수 활동(디젤 리터, 전기 kWh, 구매 유로) kg CO2e 단위. 이러한 요소는 다음에 따라 다릅니다.
- 참고 연도: 전기 네트워크는 매년 구성을 변경합니다.
- 지리적 지역: 이탈리아 kWh는 독일 kWh와 다릅니다.
- 데이터 소스: 미국은 EPA, 영국은 DEFRA, 이탈리아는 ISPRA
- 둘레: 업스트림, 다운스트림, 요람에서 게이트까지, 요람에서 무덤까지
업데이트된 배출계수 데이터베이스를 유지하려면 전담팀이 필요합니다. Climatiq은 우리를 대신하여 더 많은 정보를 수집합니다. 40개의 검증된 소스 와 지속적인 업데이트.
2. Climatiq API 개요
아키텍처 및 주요 엔드포인트
Climatiq API는 URL 기반 JSON REST API입니다. https://beta3.api.climatiq.io.
인증은 다음을 통해 이루어집니다. 무기명 토큰 HTTP 헤더에.
사용 가능한 계획은 다음과 같습니다.
| 바닥 | 통화/월 | 기능성 | 일반적인 사용 |
|---|---|---|---|
| 지역 사회 | 250 | 모든 엔드포인트 | 프로토타이핑, 테스트 |
| 기동기 | 5,000 | + 개인적인 요인 | SME, MVP |
| 성장 | 50,000 | + SLA, 지원 | 성장하는 기업 |
| 기업 | 관습 | + 감사 추적, SSO | 대규모 조직 |
주요 끝점은 다음과 같습니다.
| 엔드포인트 | 방법 | 설명 |
|---|---|---|
/search |
얻다 | 데이터베이스에서 배출계수 검색 |
/estimate |
우편 | 활동으로 인한 예상 단일 배출량 |
/batch/estimate |
우편 | 여러 견적(요청당 최대 100개) |
/travel/flights |
우편 | 항공사 배출(범위 3.6) |
/freight |
우편 | 복합 화물 운송 배출 |
/procurement |
우편 | 구매 문제(범위 3.1, 지출 기반) |
/energy |
우편 | 에너지 소비 배출량(Scope 2) |
/compute |
우편 | 클라우드 컴퓨팅 배출량 |
견적 요청의 구조
# Esempio di richiesta POST /estimate
{
"emission_factor": {
"activity_id": "electricity-supply_grid-source_residual_mix",
"data_version": "^21",
"region": "IT"
},
"parameters": {
"energy": 1000,
"energy_unit": "kWh"
}
}
# Risposta
{
"co2e": 0.415,
"co2e_unit": "kg",
"co2e_calculation_method": "ar5",
"co2e_calculation_origin": "source",
"emission_factor": {
"activity_id": "electricity-supply_grid-source_residual_mix",
"source": "IEA",
"year": 2022,
"region": "IT",
"category": "Electricity",
"lca_activity": "electricity_generation",
"data_quality_flags": []
},
"constituent_gases": {
"co2e_total": 0.415,
"co2e_other": null,
"co2": 0.415,
"ch4": null,
"n2o": null
}
}
데이터 버전: 의미론 및 모범 사례
분야 data_version 사용할 데이터베이스 버전을 제어합니다.
캐럿(^21) 호환 가능한 버전 21 이상을 사용하십시오.
배출계수 자동 업데이트. 생산에서는 블록
정확한 버전 (예: "21")의 재현성을 위해
계산 및 감사 추적. 의도적으로 새 소프트웨어 버전으로 업그레이드
역사적 가치를 명시적으로 재계산합니다.
3. 데이터 모델: 배출계수 데이터베이스
배출계수의 구조
Climatiq의 배출계수는 다음과 같이 고유하게 식별됩니다.
activity_id, source, region, year
e lca_activity. 이 구조를 이해하는 것이 중요합니다
올바른 요소를 선택합니다.
# Struttura di un Emission Factor nel database Climatiq
{
"activity_id": "fuel_type-diesel",
"uuid": "94de5038-8b06-4e24-8e8c-1b87e1e0",
"name": "Diesel",
"category": "Fuel",
"sector": "Transport",
"source": "DEFRA",
"source_link": "https://www.gov.uk/guidance/ghg-conversion-factors-for-company-reporting",
"source_dataset": "DEFRA 2023",
"year": 2023,
"year_released": 2023,
"region": "GB",
"region_name": "United Kingdom",
"description": "Diesel combustion emission factor",
"unit_type": ["Volume", "Weight"],
"supported_calculation_methods": ["ar5"],
"factor": 2.5179,
"factor_calculation_method": "ar5",
"factor_calculation_origin": "source",
"constituent_gases": {
"co2e_total": 2.5179,
"co2": 2.5148,
"ch4": 0.0009,
"n2o": 0.0022
}
}
주요 데이터 소스
| 원천 | 국가/지역 | 업데이트 | 다루는 분야 |
|---|---|---|---|
| 데프라/BEIS | UK | 연간 | 에너지, 운송, 구매, 자재 |
| EPA | 미국 | 연간 | 에너지, 산업 공정, 농업 |
| IEA | 글로벌 | 연간 | 국가별 전기, 1차에너지 |
| 에코벤트 | 글로벌 | 반년마다 | 완전한 LCA, 공급망, 자재 |
| 아데메 | 프랑스 | 연간 | 석탄 기반, 운송, FR 에너지 |
| EEA | EU | 연간 | 그리드 전력 유럽, 부문별 배출 |
| 이스프라 | 이탈리아 | 연간 | 국가 온실가스 인벤토리, IT 전력 믹스 |
4. 활동 기반 추정: 구체적인 활동에 대한 계산
접근 방식 활동 기반 가장 정확합니다. 소비된 연료 리터, 전기 kWh, 킬로미터를 여행했거나 구매한 재료의 톤. 운영 데이터 수집 필요 상세하지만 과학적으로 확고한 결과를 만들어냅니다.
예: 회사 차량 배출량 계산(범위 1)
# activity_based_estimation.py
import httpx
from dataclasses import dataclass
from enum import Enum
class FuelType(str, Enum):
DIESEL = "fuel_type-diesel"
PETROL = "fuel_type-petrol"
HVO = "fuel_type-hvo_biodiesel"
LPG = "fuel_type-lpg"
CNG = "fuel_type-cng"
@dataclass
class VehicleTrip:
vehicle_id: str
fuel_type: FuelType
litres_consumed: float
region: str = "IT"
async def estimate_fleet_scope1(
api_key: str,
trips: list[VehicleTrip],
data_version: str = "^21"
) -> dict:
"""
Calcola Scope 1 per flotta aziendale con activity-based estimation.
Returns aggregato e dettaglio per veicolo.
"""
BASE_URL = "https://beta3.api.climatiq.io"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Costruisce le richieste batch (max 100 per chiamata)
batch_requests = [
{
"emission_factor": {
"activity_id": trip.fuel_type.value,
"data_version": data_version,
"region": trip.region
},
"parameters": {
"volume": trip.litres_consumed,
"volume_unit": "l"
}
}
for trip in trips
]
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{BASE_URL}/batch/estimate",
headers=headers,
json={"requests": batch_requests}
)
response.raise_for_status()
results = response.json()["results"]
# Aggrega risultati per veicolo
vehicle_emissions = {}
total_co2e_kg = 0.0
for trip, result in zip(trips, results):
if "error" in result:
print(f"Errore per veicolo {trip.vehicle_id}: {result['error']}")
continue
co2e_kg = result["co2e"]
total_co2e_kg += co2e_kg
if trip.vehicle_id not in vehicle_emissions:
vehicle_emissions[trip.vehicle_id] = {
"total_co2e_kg": 0.0,
"fuel_type": trip.fuel_type.value,
"trips": 0
}
vehicle_emissions[trip.vehicle_id]["total_co2e_kg"] += co2e_kg
vehicle_emissions[trip.vehicle_id]["trips"] += 1
return {
"scope": "scope_1",
"total_co2e_kg": total_co2e_kg,
"total_co2e_tco2e": total_co2e_kg / 1000,
"vehicles": vehicle_emissions,
"vehicle_count": len(vehicle_emissions)
}
# Utilizzo
import asyncio
async def main():
trips = [
VehicleTrip("VAN-001", FuelType.DIESEL, 120.5),
VehicleTrip("VAN-002", FuelType.DIESEL, 98.3),
VehicleTrip("TRUCK-001", FuelType.HVO, 245.0),
VehicleTrip("CAR-001", FuelType.PETROL, 42.1),
]
result = await estimate_fleet_scope1(
api_key="clq_live_your_key_here",
trips=trips
)
print(f"Scope 1 totale: {result['total_co2e_tco2e']:.2f} tCO2e")
for vid, data in result["vehicles"].items():
print(f" {vid}: {data['total_co2e_kg']:.1f} kg CO2e")
asyncio.run(main())
전기 배출량 계산(Scope 2)
Scope 2에는 위치 기반(네트워크 혼합) 및 시장 기반(잔여 혼합 계수 또는 PPA). 두 값 모두에 보고되어야 합니다. 공개 GHG 프로토콜.
# scope2_electricity.py
from enum import Enum
class Scope2Method(str, Enum):
LOCATION_BASED = "location_based"
MARKET_BASED = "market_based"
async def estimate_scope2_electricity(
client: httpx.AsyncClient,
api_key: str,
kwh_consumed: float,
region: str,
method: Scope2Method,
renewable_percentage: float = 0.0,
data_version: str = "^21"
) -> dict:
"""
Scope 2 con metodo location-based o market-based.
- location_based: usa il mix elettrico della rete locale
- market_based: usa residual mix o certificati (IRECs, GOs)
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Market-based: se 100% rinnovabile certificato, le emissioni sono zero
if method == Scope2Method.MARKET_BASED and renewable_percentage >= 100.0:
return {
"co2e_kg": 0.0,
"co2e_tco2e": 0.0,
"method": method.value,
"note": "100% energie rinnovabili certificate - emissioni zero"
}
# Activity ID diverso per location-based vs market-based
if method == Scope2Method.LOCATION_BASED:
activity_id = "electricity-supply_grid-source_residual_mix"
else:
# Market-based usa residual mix (esclude RES certificate)
activity_id = "electricity-supply_grid-source_residual_mix"
# Riduzione proporzionale per rinnovabili parziali (market-based)
effective_kwh = kwh_consumed
if method == Scope2Method.MARKET_BASED and renewable_percentage > 0:
effective_kwh = kwh_consumed * (1 - renewable_percentage / 100.0)
payload = {
"emission_factor": {
"activity_id": activity_id,
"data_version": data_version,
"region": region
},
"parameters": {
"energy": effective_kwh,
"energy_unit": "kWh"
}
}
response = await client.post(
"https://beta3.api.climatiq.io/estimate",
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
return {
"co2e_kg": data["co2e"],
"co2e_tco2e": data["co2e"] / 1000,
"co2e_unit": data["co2e_unit"],
"method": method.value,
"kwh_consumed": kwh_consumed,
"effective_kwh": effective_kwh,
"renewable_percentage": renewable_percentage,
"emission_factor": data["emission_factor"],
"region": region
}
5. 지출 기반 추정: 지출 데이터를 이용한 계산
정확한 활동 데이터가 없는 경우 접근 방식은 다음과 같습니다. 지출 기반 통화 지출을 활동의 대용으로 사용합니다. Climatiq은 배출계수를 적용합니다. 데이터를 기반으로 한 구매 카테고리별 경제성(지출된 유로/달러당 kg CO2e) OECD 또는 EXIOBASE 테이블의 입출력. Scope 3의 가장 일반적인 방법입니다. 카테고리 1(구매한 상품 및 서비스).
지출 기반 정확도와 활동 기반 정확도
지출 기반 방법은 불확실성이 있는 추정치를 생성합니다. 30-100%, 활동 기반 방법의 5-15%와 비교됩니다. 데이터가 없을 때만 사용하세요 예비 선거. GHG 프로토콜은 이를 출발점으로 받아들이지만 발전이 필요합니다. 활동 데이터 개선.
# scope3_spend_based.py
from typing import Optional
import httpx
# Mappa categorie NACE su activity_id Climatiq per spend-based
CATEGORY_TO_ACTIVITY_ID = {
# Settore IT e servizi digitali
"it_services": "professional_services-type_professional_services",
"cloud_hosting": "professional_services-type_professional_services",
"software_licenses": "professional_services-type_professional_services",
# Logistica e trasporti
"freight_road": "transport-type_freight_vehicle",
"freight_air": "transport-type_air_freight",
"courier_services": "transport-type_freight_vehicle",
# Produzione e manifattura
"raw_materials_steel": "steel-type_steel_products",
"raw_materials_plastic": "plastics-type_plastic_products",
"packaging": "paper-type_paper_products",
# Servizi professionali
"legal_services": "professional_services-type_professional_services",
"consulting": "professional_services-type_professional_services",
"marketing": "professional_services-type_professional_services",
# Utilities e energia
"electricity_bill": "electricity-supply_grid-source_residual_mix",
"gas_bill": "fuel_type-natural_gas",
}
async def calculate_scope3_spend_based(
api_key: str,
purchases: list[dict],
region: str = "IT",
currency: str = "EUR",
data_version: str = "^21"
) -> dict:
"""
Calcola Scope 3.1 con metodo spend-based.
purchases: [{"category": "it_services", "amount": 50000}, ...]
"""
BASE_URL = "https://beta3.api.climatiq.io"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
batch_requests = []
for purchase in purchases:
activity_id = CATEGORY_TO_ACTIVITY_ID.get(purchase["category"])
if not activity_id:
print(f"Categoria non mappata: {purchase['category']}")
continue
batch_requests.append({
"emission_factor": {
"activity_id": activity_id,
"data_version": data_version,
"region": region
},
"parameters": {
"money": purchase["amount"],
"money_unit": currency.lower()
}
})
# Chunking per batch limite 100
results_all = []
chunk_size = 100
async with httpx.AsyncClient(timeout=60.0) as client:
for i in range(0, len(batch_requests), chunk_size):
chunk = batch_requests[i:i + chunk_size]
response = await client.post(
f"{BASE_URL}/batch/estimate",
headers=headers,
json={"requests": chunk}
)
response.raise_for_status()
results_all.extend(response.json()["results"])
# Aggrega per categoria
total_co2e_kg = 0.0
category_breakdown = {}
for purchase, result in zip(purchases, results_all):
if "error" in result:
category_breakdown[purchase["category"]] = {
"error": result["error"],
"amount": purchase["amount"]
}
continue
co2e_kg = result.get("co2e", 0.0)
total_co2e_kg += co2e_kg
cat = purchase["category"]
if cat not in category_breakdown:
category_breakdown[cat] = {
"total_co2e_kg": 0.0,
"total_spend": 0.0,
"intensity_kg_per_eur": 0.0
}
category_breakdown[cat]["total_co2e_kg"] += co2e_kg
category_breakdown[cat]["total_spend"] += purchase["amount"]
# Calcola intensità per categoria
for cat_data in category_breakdown.values():
if "total_spend" in cat_data and cat_data["total_spend"] > 0:
cat_data["intensity_kg_per_eur"] = (
cat_data["total_co2e_kg"] / cat_data["total_spend"]
)
return {
"scope": "scope_3.1",
"method": "spend_based",
"total_co2e_kg": total_co2e_kg,
"total_co2e_tco2e": total_co2e_kg / 1000,
"currency": currency,
"categories": category_breakdown,
"uncertainty_note": "Incertezza stimata 30-100% (metodo spend-based)"
}
6. GHG 프로토콜 통합: 매핑 범위 1, 2, 3
완전한 탄소 회계 시스템을 구축하려면 모든 탄소 배출량을 매핑해야 합니다. 특정 엔드포인트 및 Climatiq 활동_ID에 대한 GHG 프로토콜 카테고리입니다. 이 섹션 가장 관련성이 높은 카테고리에 대한 완전한 매핑을 제공합니다.
| 범위 GHG 프로토콜 | 범주 | 엔드포인트 클라이마틱 | 방법 |
|---|---|---|---|
| 범위 1 | 고정연소(가열) | /estimate |
활동(볼륨) |
| 범위 1 | 이동연소(함대) | /batch/estimate |
활동(볼륨) |
| 범위 1 | 비산 배출(냉매) | /estimate |
활동량(체중) |
| 범위 2 | 전기(위치 기반) | /energy |
활동량(kWh) |
| 범위 2 | 전기(시장 기반) | /energy |
활동량(kWh, 잔여 혼합량) |
| 범위 3.1 | 구매한 상품 및 서비스 | /procurement |
지출 기반(EUR) |
| 범위 3.4 | 업스트림 운송 | /freight |
활동량(톤-km) |
| 범위 3.6 | 출장(비행기) | /travel/flights |
활동(IATA 코드) |
| 범위 3.6 | 출장(자동차/기차) | /estimate |
활동(km) |
| 범위 3.7 | 직원 통근 | /batch/estimate |
활동(km, 반) |
| 범위 3.11 | 판매된 제품의 사용 | /estimate |
활동/지출 |
# ghg_protocol_calculator.py
# Sistema unificato per calcolo GHG Protocol completo
from dataclasses import dataclass, field
from typing import Optional
import asyncio
import httpx
@dataclass
class GHGProtocolReport:
"""Report GHG Protocol completo per anno fiscale."""
year: int
company_name: str
reporting_boundary: str = "operational_control"
# Scope 1
scope1_combustion_kg: float = 0.0
scope1_mobile_kg: float = 0.0
scope1_fugitive_kg: float = 0.0
# Scope 2
scope2_location_based_kg: float = 0.0
scope2_market_based_kg: float = 0.0
# Scope 3 (categorie principali)
scope3_cat1_purchased_goods_kg: float = 0.0
scope3_cat4_upstream_transport_kg: float = 0.0
scope3_cat6_business_travel_kg: float = 0.0
scope3_cat7_employee_commuting_kg: float = 0.0
# Metadata per audit trail
calculation_date: Optional[str] = None
data_version: str = ""
sources: list[str] = field(default_factory=list)
@property
def scope1_total_kg(self) -> float:
return (self.scope1_combustion_kg +
self.scope1_mobile_kg +
self.scope1_fugitive_kg)
@property
def scope2_selected_kg(self) -> float:
"""Market-based se disponibile, altrimenti location-based."""
return (self.scope2_market_based_kg
if self.scope2_market_based_kg > 0
else self.scope2_location_based_kg)
@property
def scope3_total_kg(self) -> float:
return (self.scope3_cat1_purchased_goods_kg +
self.scope3_cat4_upstream_transport_kg +
self.scope3_cat6_business_travel_kg +
self.scope3_cat7_employee_commuting_kg)
@property
def grand_total_tco2e(self) -> float:
return (self.scope1_total_kg +
self.scope2_selected_kg +
self.scope3_total_kg) / 1000
def to_csrd_dict(self) -> dict:
"""Output formato CSRD/ESRS E1-6."""
return {
"reporting_year": self.year,
"entity": self.company_name,
"ghg_emissions_location_based": {
"scope_1": self.scope1_total_kg / 1000,
"scope_2_location": self.scope2_location_based_kg / 1000,
"scope_3": self.scope3_total_kg / 1000,
"unit": "tCO2e"
},
"ghg_emissions_market_based": {
"scope_1": self.scope1_total_kg / 1000,
"scope_2_market": self.scope2_market_based_kg / 1000,
"scope_3": self.scope3_total_kg / 1000,
"unit": "tCO2e"
},
"data_version": self.data_version,
"methodology": "GHG Protocol Corporate Standard"
}
7. 강력한 Python 클라이언트: 재시도, 캐시 및 오류 처리
프로덕션에서 API 호출은 일시적인 오류에 대해 복원력이 있어야 합니다. 효과적인 캐싱으로 속도를 제한하고 호출을 최소화합니다. 다음은 프로덕션 준비가 완료된 클라이언트입니다.
# climatiq_client.py
import asyncio
import hashlib
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Optional
import httpx
import redis.asyncio as redis
logger = logging.getLogger(__name__)
class ClimatiqAPIError(Exception):
"""Errore API Climatiq con context."""
def __init__(self, message: str, status_code: int, response_body: dict):
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
class RateLimitError(ClimatiqAPIError):
"""Rate limit superato - aspetta prima di riprovare."""
pass
class ClimatiqClient:
"""
Client asincrono per Climatiq API con:
- Retry automatico con exponential backoff
- Cache Redis per ridurre le chiamate API
- Logging strutturato per audit trail
- Gestione rate limit con rispetto dei retry-after header
"""
BASE_URL = "https://beta3.api.climatiq.io"
MAX_RETRIES = 3
BATCH_SIZE = 100 # Limite Climatiq
def __init__(
self,
api_key: str,
redis_client: Optional[redis.Redis] = None,
cache_ttl: int = 86400, # 24 ore - i fattori cambiano raramente
data_version: str = "^21"
):
self.api_key = api_key
self.redis = redis_client
self.cache_ttl = cache_ttl
self.data_version = data_version
self._http_client: Optional[httpx.AsyncClient] = None
async def __aenter__(self):
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=10.0),
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
)
return self
async def __aexit__(self, *args):
if self._http_client:
await self._http_client.aclose()
def _cache_key(self, payload: dict) -> str:
"""Genera chiave cache deterministica da payload."""
payload_str = json.dumps(payload, sort_keys=True)
return f"climatiq:{hashlib.sha256(payload_str.encode()).hexdigest()[:16]}"
async def _get_cached(self, key: str) -> Optional[dict]:
"""Recupera risultato dalla cache Redis."""
if not self.redis:
return None
try:
cached = await self.redis.get(key)
if cached:
logger.debug(f"Cache HIT: {key}")
return json.loads(cached)
except Exception as e:
logger.warning(f"Errore cache GET: {e}")
return None
async def _set_cached(self, key: str, value: dict) -> None:
"""Salva risultato nella cache Redis."""
if not self.redis:
return
try:
await self.redis.setex(key, self.cache_ttl, json.dumps(value))
logger.debug(f"Cache SET: {key} (TTL: {self.cache_ttl}s)")
except Exception as e:
logger.warning(f"Errore cache SET: {e}")
async def _request_with_retry(
self, method: str, endpoint: str, payload: dict
) -> dict:
"""HTTP request con retry e exponential backoff."""
url = f"{self.BASE_URL}{endpoint}"
last_error = None
for attempt in range(self.MAX_RETRIES):
try:
response = await self._http_client.request(
method, url, json=payload
)
if response.status_code == 429: # Rate limit
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(
f"Rate limit superato. Aspetto {retry_after}s..."
)
await asyncio.sleep(retry_after)
continue
if response.status_code >= 400:
body = response.json() if response.content else {}
raise ClimatiqAPIError(
f"Errore API {response.status_code}: {body.get('error', 'Unknown')}",
status_code=response.status_code,
response_body=body
)
return response.json()
except httpx.NetworkError as e:
wait_time = 2 ** attempt
logger.warning(
f"Network error (attempt {attempt+1}/{self.MAX_RETRIES}), "
f"retry in {wait_time}s: {e}"
)
last_error = e
await asyncio.sleep(wait_time)
raise ConnectionError(
f"Falliti {self.MAX_RETRIES} tentativi: {last_error}"
)
async def estimate(
self,
activity_id: str,
parameters: dict,
region: Optional[str] = None
) -> dict:
"""
Stima singola con cache automatica.
activity_id: es. "electricity-supply_grid-source_residual_mix"
parameters: es. {"energy": 1000, "energy_unit": "kWh"}
"""
payload = {
"emission_factor": {
"activity_id": activity_id,
"data_version": self.data_version,
**({"region": region} if region else {})
},
"parameters": parameters
}
cache_key = self._cache_key(payload)
if cached := await self._get_cached(cache_key):
return cached
result = await self._request_with_retry("POST", "/estimate", payload)
await self._set_cached(cache_key, result)
logger.info(
"Estimate: activity=%(activity)s region=%(region)s "
"co2e=%(co2e).4f kg",
{
"activity": activity_id,
"region": region or "global",
"co2e": result.get("co2e", 0)
}
)
return result
async def batch_estimate(
self, requests: list[dict]
) -> list[dict]:
"""
Batch estimation con chunking automatico (100 per batch max).
"""
all_results = []
for i in range(0, len(requests), self.BATCH_SIZE):
chunk = requests[i:i + self.BATCH_SIZE]
payload = {"requests": chunk}
response = await self._request_with_retry(
"POST", "/batch/estimate", payload
)
all_results.extend(response.get("results", []))
# Piccola pausa tra chunk grandi per rispettare rate limit
if len(requests) > self.BATCH_SIZE:
await asyncio.sleep(0.5)
return all_results
8. Axios를 사용하는 TypeScript/Node.js 클라이언트
Node.js, TypeScript 또는 NestJS 백엔드 애플리케이션의 경우 다음은 유형화된 클라이언트입니다. 이는 완전한 유형 안전성을 갖춘 Python 클라이언트와 동일한 기능을 제공합니다.
// climatiq-client.ts
import axios, { AxiosInstance, AxiosError } from 'axios';
// Types per la Climatiq API
export interface EmissionFactor {
activity_id: string;
data_version: string;
region?: string;
}
export interface EstimateParameters {
energy?: number;
energy_unit?: 'kWh' | 'MWh' | 'GJ';
volume?: number;
volume_unit?: 'l' | 'gallon' | 'm3';
weight?: number;
weight_unit?: 'kg' | 't' | 'lb';
money?: number;
money_unit?: 'eur' | 'usd' | 'gbp';
distance?: number;
distance_unit?: 'km' | 'mi';
}
export interface EstimateRequest {
emission_factor: EmissionFactor;
parameters: EstimateParameters;
}
export interface ConstituentGases {
co2e_total: number;
co2?: number;
ch4?: number;
n2o?: number;
}
export interface EstimateResponse {
co2e: number;
co2e_unit: string;
co2e_calculation_method: string;
emission_factor: {
activity_id: string;
source: string;
year: number;
region: string;
category: string;
data_quality_flags: string[];
};
constituent_gases: ConstituentGases;
}
export interface BatchEstimateResult {
co2e?: number;
co2e_unit?: string;
error?: string;
constituent_gases?: ConstituentGases;
}
export class ClimatiqAPIError extends Error {
constructor(
message: string,
public readonly statusCode: number,
public readonly responseBody: unknown
) {
super(message);
this.name = 'ClimatiqAPIError';
}
}
export class ClimatiqClient {
private readonly http: AxiosInstance;
private readonly cache = new Map<string, { data: unknown; expiresAt: number }>();
private readonly cacheTtlMs: number;
constructor(
private readonly apiKey: string,
private readonly dataVersion = '^21',
cacheTtlSeconds = 3600
) {
this.cacheTtlMs = cacheTtlSeconds * 1000;
this.http = axios.create({
baseURL: 'https://beta3.api.climatiq.io',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
timeout: 30_000,
});
}
private cacheKey(payload: unknown): string {
return JSON.stringify(payload);
}
private getFromCache<T>(key: string): T | null {
const entry = this.cache.get(key);
if (!entry) return null;
if (Date.now() > entry.expiresAt) {
this.cache.delete(key);
return null;
}
return entry.data as T;
}
private setCache(key: string, data: unknown): void {
this.cache.set(key, {
data,
expiresAt: Date.now() + this.cacheTtlMs,
});
}
private handleAxiosError(error: AxiosError): never {
if (error.response) {
const status = error.response.status;
const body = error.response.data as Record<string, unknown>;
if (status === 429) {
throw new ClimatiqAPIError(
'Rate limit superato. Riprova tra qualche momento.',
429,
body
);
}
throw new ClimatiqAPIError(
`Climatiq API error ${status}: ${body['error'] ?? 'Unknown'}`,
status,
body
);
}
throw new ClimatiqAPIError(
`Network error: ${error.message}`,
0,
null
);
}
async estimate(
activityId: string,
parameters: EstimateParameters,
region?: string
): Promise<EstimateResponse> {
const payload: EstimateRequest = {
emission_factor: {
activity_id: activityId,
data_version: this.dataVersion,
...(region ? { region } : {}),
},
parameters,
};
const key = this.cacheKey(payload);
const cached = this.getFromCache<EstimateResponse>(key);
if (cached) return cached;
try {
const { data } = await this.http.post<EstimateResponse>(
'/estimate',
payload
);
this.setCache(key, data);
return data;
} catch (error) {
if (axios.isAxiosError(error)) this.handleAxiosError(error);
throw error;
}
}
async batchEstimate(
requests: EstimateRequest[]
): Promise<BatchEstimateResult[]> {
const CHUNK_SIZE = 100;
const allResults: BatchEstimateResult[] = [];
for (let i = 0; i < requests.length; i += CHUNK_SIZE) {
const chunk = requests.slice(i, i + CHUNK_SIZE);
try {
const { data } = await this.http.post<{ results: BatchEstimateResult[] }>(
'/batch/estimate',
{ requests: chunk }
);
allResults.push(...data.results);
} catch (error) {
if (axios.isAxiosError(error)) this.handleAxiosError(error);
throw error;
}
// Throttle tra chunk multipli
if (i + CHUNK_SIZE < requests.length) {
await new Promise(resolve => setTimeout(resolve, 200));
}
}
return allResults;
}
async estimateFlight(
originIata: string,
destinationIata: string,
passengers: number,
cabinClass: 'economy' | 'business' | 'first' = 'economy'
): Promise<EstimateResponse> {
try {
const { data } = await this.http.post<EstimateResponse>(
'/travel/flights',
{
legs: [{
from: originIata,
to: destinationIata,
passengers,
cabin_class: cabinClass,
}],
}
);
return data;
} catch (error) {
if (axios.isAxiosError(error)) this.handleAxiosError(error);
throw error;
}
}
}
TypeScript 클라이언트 사용
// usage-example.ts
import { ClimatiqClient } from './climatiq-client';
const client = new ClimatiqClient(
process.env['CLIMATIQ_API_KEY'] ?? '',
'^21',
3600
);
// Calcolo emissioni elettricità ufficio
async function calculateOfficeElectricity() {
const result = await client.estimate(
'electricity-supply_grid-source_residual_mix',
{ energy: 5000, energy_unit: 'kWh' },
'IT'
);
console.log(`Emissioni ufficio: ${result.co2e.toFixed(2)} kg CO2e`);
console.log(`Fonte: ${result.emission_factor.source} (${result.emission_factor.year})`);
return result;
}
// Calcolo batch per fleet management
async function calculateFleetEmissions(
vehicles: Array<{ id: string; litres: number; fuel: string }>
) {
const requests = vehicles.map(v => ({
emission_factor: {
activity_id: `fuel_type-${v.fuel}`,
data_version: '^21',
region: 'IT',
},
parameters: {
volume: v.litres,
volume_unit: 'l' as const,
},
}));
const results = await client.batchEstimate(requests);
return vehicles.map((v, i) => ({
vehicleId: v.id,
co2eKg: results[i].co2e ?? 0,
error: results[i].error,
}));
}
// Calcolo volo business travel
async function calculateBusinessFlight() {
const result = await client.estimateFlight('MXP', 'LHR', 2, 'economy');
console.log(`Volo MXP-LHR (2 pax): ${result.co2e.toFixed(1)} kg CO2e`);
}
9. SaaS용 실시간 탄소 계산기
Climatiq의 강력한 사용 사례는 탄소 라벨 전자상거래 또는 SaaS 제품: 사용자에게 탄소 영향을 보여줍니다. 주문이나 조치를 확인하기 전에 견적을 내보세요. 이렇게 하면 투명성이 높아집니다. 소비자의 정보에 입각한 선택을 지원합니다.
아키텍처: 전자상거래용 탄소 라벨 API
# carbon_label_api.py
# FastAPI endpoint per carbon label real-time su prodotti e-commerce
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Optional
import asyncio
app = FastAPI(title="Carbon Label API", version="1.0.0")
class ProductCarbonRequest(BaseModel):
"""Richiesta calcolo carbon label per carrello."""
items: list[dict] = Field(
description="Lista prodotti con categoria e peso",
example=[
{"product_id": "SKU-123", "category": "electronics", "weight_kg": 0.5, "quantity": 1},
{"product_id": "SKU-456", "category": "clothing", "weight_kg": 0.3, "quantity": 2}
]
)
shipping_method: str = Field(
default="road",
description="Metodo spedizione: road, air, sea"
)
destination_region: str = Field(
default="IT",
description="Regione destinazione ISO 3166-2"
)
origin_region: str = Field(
default="CN",
description="Regione origine/produzione"
)
class CarbonLabelResponse(BaseModel):
"""Risposta con carbon label completa."""
total_co2e_kg: float
breakdown: dict
label: str # "A", "B", "C", "D", "E" come etichetta energetica EU
label_color: str
offset_cost_eur: float # Stima costo compensazione
trees_equivalent: float # Equivalente alberi annui
km_car_equivalent: float # Equivalente km auto
# Mappatura categoria prodotto su activity_id per produzione
PRODUCT_CATEGORY_PRODUCTION = {
"electronics": "electrical_equipment-type_small_electronics",
"clothing": "textiles-type_clothing",
"food": "food-type_mixed",
"furniture": "furniture-type_mixed",
"books": "paper-type_books",
"plastics": "plastics-type_plastic_products",
}
# Mappatura metodo spedizione su activity_id
SHIPPING_ACTIVITY_ID = {
"road": "transport-type_freight_vehicle-fuel_source_diesel-vehicle_type_hgv",
"air": "transport-type_air_freight",
"sea": "transport-type_sea_freight-route_type_container_ship",
}
def calculate_carbon_label(co2e_kg: float) -> tuple[str, str]:
"""
Calcola etichetta A-E basata su impatto carbonico.
Soglie ispirate alla proposta EU eco-label per e-commerce.
"""
if co2e_kg < 0.5:
return "A", "#2ecc71" # Verde - impatto molto basso
elif co2e_kg < 1.5:
return "B", "#27ae60" # Verde scuro - impatto basso
elif co2e_kg < 5.0:
return "C", "#f39c12" # Arancione - impatto medio
elif co2e_kg < 15.0:
return "D", "#e67e22" # Arancione scuro - impatto alto
else:
return "E", "#e74c3c" # Rosso - impatto molto alto
@app.post("/api/v1/carbon-label", response_model=CarbonLabelResponse)
async def get_carbon_label(
request: ProductCarbonRequest,
climatiq: ClimatiqClient = Depends(get_climatiq_client)
):
"""
Calcola carbon label real-time per un carrello e-commerce.
Considera produzione + imballaggio + spedizione.
"""
batch_requests = []
# 1. Emissioni produzione per ciascun prodotto
for item in request.items:
activity_id = PRODUCT_CATEGORY_PRODUCTION.get(
item["category"],
"manufactured_goods-type_mixed" # Fallback generico
)
# Calcolo per peso totale (qty * peso)
total_weight = item["weight_kg"] * item.get("quantity", 1)
batch_requests.append({
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"region": request.origin_region
},
"parameters": {
"weight": total_weight,
"weight_unit": "kg"
}
})
# 2. Emissioni spedizione (distanza stimata)
total_weight_kg = sum(
i["weight_kg"] * i.get("quantity", 1)
for i in request.items
)
# Stima distanza basata su regioni (semplificata)
estimated_distance_km = _estimate_distance(
request.origin_region,
request.destination_region
)
shipping_activity = SHIPPING_ACTIVITY_ID.get(
request.shipping_method,
SHIPPING_ACTIVITY_ID["road"]
)
# Freight: tonne * km = tonne-km
tonne_km = (total_weight_kg / 1000) * estimated_distance_km
batch_requests.append({
"emission_factor": {
"activity_id": shipping_activity,
"data_version": "^21"
},
"parameters": {
"weight": total_weight_kg,
"weight_unit": "kg",
"distance": estimated_distance_km,
"distance_unit": "km"
}
})
# Chiamata batch a Climatiq
try:
results = await climatiq.batch_estimate(batch_requests)
except ClimatiqAPIError as e:
raise HTTPException(
status_code=502,
detail=f"Errore calcolo emissioni: {str(e)}"
)
# Aggrega risultati
production_co2e = sum(
r.get("co2e", 0) for r in results[:-1] # Tutti tranne ultimo (spedizione)
)
shipping_co2e = results[-1].get("co2e", 0) if results else 0
total_co2e = production_co2e + shipping_co2e
label, color = calculate_carbon_label(total_co2e)
# Calcola equivalenze intuitive per l'utente
# Offset market rate ~15 EUR/tCO2e (mercato volontario 2025)
offset_cost = (total_co2e / 1000) * 15.0
# Un albero assorbe ~22 kg CO2/anno
trees_equivalent = total_co2e / 22.0
# Auto media emette ~0.21 kg CO2/km
km_car = total_co2e / 0.21
return CarbonLabelResponse(
total_co2e_kg=round(total_co2e, 3),
breakdown={
"production_kg": round(production_co2e, 3),
"shipping_kg": round(shipping_co2e, 3),
"shipping_method": request.shipping_method,
"distance_km": estimated_distance_km
},
label=label,
label_color=color,
offset_cost_eur=round(offset_cost, 2),
trees_equivalent=round(trees_equivalent, 1),
km_car_equivalent=round(km_car, 1)
)
def _estimate_distance(origin: str, destination: str) -> float:
"""Stima distanza tra regioni in km (lookup semplificato)."""
DISTANCES = {
("CN", "IT"): 9_000,
("DE", "IT"): 950,
("IT", "IT"): 300,
("US", "IT"): 8_500,
("IN", "IT"): 7_200,
}
key = (origin[:2].upper(), destination[:2].upper())
return DISTANCES.get(key, 5_000) # Default 5000km se sconosciuto
탄소 라벨을 표시하는 프런트엔드 위젯
// carbon-label.component.ts (Angular/TypeScript)
import { Component, Input, OnChanges, SimpleChanges } from '@angular/core';
import { HttpClient } from '@angular/common/http';
interface CarbonLabel {
total_co2e_kg: number;
label: string;
label_color: string;
trees_equivalent: number;
km_car_equivalent: number;
offset_cost_eur: number;
}
@Component({
selector: 'app-carbon-label',
template: `
<div class="carbon-label" *ngIf="carbonData">
<div class="label-badge" [style.background]="carbonData.label_color">
{{ carbonData.label }}
</div>
<div class="carbon-info">
<span class="co2e">{{ carbonData.total_co2e_kg | number:'1.1-2' }} kg CO₂e</span>
<span class="equivalent">
= {{ carbonData.km_car_equivalent | number:'1.0-0' }} km in auto
</span>
<button (click)="offsetCarbon()" class="offset-btn">
Compensa per €{{ carbonData.offset_cost_eur | number:'1.2-2' }}
</button>
</div>
</div>
`
})
export class CarbonLabelComponent implements OnChanges {
@Input() cartItems: Array<{ product_id: string; category: string; weight_kg: number; quantity: number }> = [];
@Input() shippingMethod = 'road';
carbonData: CarbonLabel | null = null;
loading = false;
constructor(private http: HttpClient) {}
ngOnChanges(changes: SimpleChanges): void {
if (changes['cartItems'] || changes['shippingMethod']) {
this.loadCarbonLabel();
}
}
private loadCarbonLabel(): void {
if (!this.cartItems.length) return;
this.loading = true;
this.http.post<CarbonLabel>('/api/v1/carbon-label', {
items: this.cartItems,
shipping_method: this.shippingMethod,
destination_region: 'IT',
origin_region: 'CN',
}).subscribe({
next: (data) => {
this.carbonData = data;
this.loading = false;
},
error: (err) => {
console.error('Errore carbon label:', err);
this.loading = false;
},
});
}
offsetCarbon(): void {
// Integrazione con piattaforma di offsetting
window.open('https://example.com/offset', '_blank');
}
}
10. 테스트 및 모의 API
CI/CD에서는 Climatiq API에 대한 실제 호출을 원하지 않습니다. 구조화하는 방법은 다음과 같습니다. 성공과 실패 응답을 모두 시뮬레이션하는 강력한 모의 테스트를 수행합니다.
# tests/conftest.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.climatiq_client import ClimatiqClient
@pytest.fixture
def mock_climatiq_client():
"""
Mock del client Climatiq per test unitari.
Simula risposte realistiche senza chiamate API reali.
"""
client = AsyncMock(spec=ClimatiqClient)
# Risposta standard per stima elettricità italiana
electricity_response = {
"co2e": 415.0,
"co2e_unit": "kg",
"co2e_calculation_method": "ar5",
"emission_factor": {
"activity_id": "electricity-supply_grid-source_residual_mix",
"source": "IEA",
"year": 2022,
"region": "IT",
"category": "Electricity",
"data_quality_flags": []
},
"constituent_gases": {
"co2e_total": 415.0,
"co2": 415.0,
"ch4": None,
"n2o": None
}
}
# Risposta per diesel
diesel_response = {
"co2e": 302.15,
"co2e_unit": "kg",
"co2e_calculation_method": "ar5",
"emission_factor": {
"activity_id": "fuel_type-diesel",
"source": "DEFRA",
"year": 2023,
"region": "IT",
"category": "Fuel"
},
"constituent_gases": {
"co2e_total": 302.15,
"co2": 301.5,
"ch4": 0.02,
"n2o": 0.63
}
}
# Configura mock per rispondere in base all'activity_id
async def mock_estimate(activity_id, parameters, region=None):
if "electricity" in activity_id:
kwh = parameters.get("energy", 1000)
return {**electricity_response, "co2e": kwh * 0.415}
elif "diesel" in activity_id:
litres = parameters.get("volume", 100)
return {**diesel_response, "co2e": litres * 2.52}
return electricity_response
client.estimate = AsyncMock(side_effect=mock_estimate)
client.batch_estimate = AsyncMock(return_value=[electricity_response])
return client
# tests/test_activity_based.py
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_fleet_scope1_calculates_correctly(mock_climatiq_client):
"""Test che il calcolo Scope 1 flotta produca risultati corretti."""
from app.services.fleet_calculator import calculate_fleet_scope1
result = await calculate_fleet_scope1(
client=mock_climatiq_client,
litres_diesel=100.0,
region="IT"
)
assert result["co2e_kg"] == pytest.approx(252.0, rel=0.01)
assert result["scope"] == "scope_1"
mock_climatiq_client.estimate.assert_called_once()
@pytest.mark.asyncio
async def test_rate_limit_error_is_handled(mock_climatiq_client):
"""Test che rate limit error non lasci il sistema in stato inconsistente."""
from app.climatiq_client import ClimatiqAPIError
mock_climatiq_client.estimate = AsyncMock(
side_effect=ClimatiqAPIError("Rate limit", 429, {"error": "rate_limit"})
)
with pytest.raises(ClimatiqAPIError) as exc_info:
await mock_climatiq_client.estimate(
"electricity-supply_grid-source_residual_mix",
{"energy": 1000, "energy_unit": "kWh"},
"IT"
)
assert exc_info.value.status_code == 429
@pytest.mark.asyncio
async def test_scope2_100_renewable_returns_zero(mock_climatiq_client):
"""Test che 100% rinnovabile dia emissioni zero (market-based)."""
from app.services.scope2_calculator import (
calculate_scope2_electricity, Scope2Method
)
result = await calculate_scope2_electricity(
client=mock_climatiq_client,
kwh_consumed=100_000,
region="IT",
method=Scope2Method.MARKET_BASED,
renewable_percentage=100.0
)
assert result["co2e_kg"] == 0.0
# Non deve chiamare l'API (nessun consumo da rete)
mock_climatiq_client.estimate.assert_not_called()
11. Scala의 배치 추정 및 캐싱 전략
수천 건의 기록을 처리하는 애플리케이션(월별 보고서, 이력 분석)의 경우, 이를 위해서는 배치 API 호출과 Redis 캐시의 조합이 필수적입니다. 처리 시간과 API 비용 모두.
# batch_processor.py
# Elaborazione batch per calcoli di emissioni su larga scala
import asyncio
from datetime import datetime
from typing import AsyncIterator
import redis.asyncio as redis
async def process_monthly_emissions_report(
api_key: str,
records: list[dict],
redis_url: str = "redis://localhost:6379"
) -> dict:
"""
Elabora report mensile emissioni per grandi dataset.
records: lista di attività (flotta, energia, acquisti, etc.)
Restituisce: aggregato mensile Scope 1, 2, 3.
"""
redis_client = await redis.from_url(redis_url)
async with ClimatiqClient(
api_key=api_key,
redis_client=redis_client,
cache_ttl=86400 * 7, # 7 giorni per fattori stabili
data_version="21" # Versione fissa per riproducibilità
) as client:
# Raggruppa per tipo di emissione
scope1_records = [r for r in records if r["scope"] == "scope1"]
scope2_records = [r for r in records if r["scope"] == "scope2"]
scope3_records = [r for r in records if r["scope"] == "scope3"]
# Elabora in parallelo i tre scope
scope1_result, scope2_result, scope3_result = await asyncio.gather(
_process_scope1_batch(client, scope1_records),
_process_scope2_batch(client, scope2_records),
_process_scope3_batch(client, scope3_records)
)
total = (
scope1_result["total_co2e_kg"] +
scope2_result["total_co2e_kg"] +
scope3_result["total_co2e_kg"]
)
await redis_client.aclose()
return {
"report_generated_at": datetime.utcnow().isoformat(),
"total_co2e_kg": total,
"total_co2e_tco2e": total / 1000,
"scope1": scope1_result,
"scope2": scope2_result,
"scope3": scope3_result,
"record_count": len(records)
}
async def _process_scope1_batch(
client: ClimatiqClient, records: list[dict]
) -> dict:
"""Elabora batch Scope 1 con chunking automatico."""
if not records:
return {"total_co2e_kg": 0.0, "record_count": 0}
batch_requests = [
{
"emission_factor": {
"activity_id": r["activity_id"],
"data_version": "21",
"region": r.get("region", "IT")
},
"parameters": {
r["param_key"]: r["param_value"],
f"{r['param_key']}_unit": r["param_unit"]
}
}
for r in records
]
results = await client.batch_estimate(batch_requests)
total = sum(r.get("co2e", 0) for r in results if "error" not in r)
errors = [r for r in results if "error" in r]
if errors:
import logging
logging.warning(f"{len(errors)} errori nel batch Scope 1")
return {
"total_co2e_kg": total,
"record_count": len(records),
"error_count": len(errors)
}
12. Climatiq의 대안: 비교
Climatiq이 유일한 옵션은 아닙니다. 주요 내용을 비교해 보겠습니다. 올바른 솔루션을 선택하는 데 도움이 되는 대안:
| 해결책 | 주력 | 한계 | 이상적인 사용 사례 | 무료 플랜 |
|---|---|---|---|---|
| 기후 | 190,000개 이상의 요소, 40개 이상의 소스, 강력한 API | 커뮤니티 플랜에서는 월 250통화 | 기업, 다중 범위 생산 | 250 통화/월 |
| 카본 인터페이스 | 간단한 API, 훌륭한 개발자 경험 | 요인 감소, 미국에 집중 | 스타트업, 전자상거래, 배송 | 예(제한적) |
| 개방형 배출계수 | Climatiq에서 관리하는 무료 오픈 소스 | 데이터 세트만 해당, 직접 REST API 없음 | 연구, 원시 데이터 액세스 | 예(데이터 세트) |
| 유역 API | 감사 가능한 엔터프라이즈 기능 | 기업 전용, 비용이 많이 듦 | 대기업, CSRD 보고 | No |
| EPA 비행 데이터베이스 | 무료, 미국 정부 | 미국에만 해당(REST API 아님) | 미국 보고, 연구 | 예(데이터 세트) |
| 맞춤형 DB(DEFRA/IEA) | 완전한 제어, API 비용 없음 | 비용이 많이 드는 내부 유지 관리 | 전담팀을 갖춘 대규모 조직 | 예(공공 데이터) |
사용자 정의 데이터베이스를 사용해야 하는 경우
어떤 경우에는 배출계수에 대한 내부 데이터베이스를 구축하는 것이 합리적입니다. 이를 처리하는 PostgreSQL 스키마는 다음과 같습니다.
-- Schema PostgreSQL per database fattori di emissione personalizzato
CREATE TABLE IF NOT EXISTS emission_factors (
id SERIAL PRIMARY KEY,
activity_category VARCHAR(200) NOT NULL,
activity_name VARCHAR(500),
region_code VARCHAR(10) NOT NULL,
reference_year INTEGER NOT NULL,
unit_type VARCHAR(50) NOT NULL, -- 'litre', 'kWh', 'tonne', 'EUR'
factor_kg_co2e_per_unit DECIMAL(12, 6) NOT NULL,
source VARCHAR(100) NOT NULL, -- 'DEFRA', 'ISPRA', 'EPA'
source_version VARCHAR(50),
source_priority INTEGER DEFAULT 1,
constituent_co2 DECIMAL(12, 6),
constituent_ch4 DECIMAL(12, 6),
constituent_n2o DECIMAL(12, 6),
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_ef_lookup ON emission_factors
(activity_category, region_code, reference_year, unit_type);
-- Query di lookup con fallback regionale
SELECT factor_kg_co2e_per_unit, source, reference_year
FROM emission_factors
WHERE activity_category = $1
AND reference_year = $2
AND unit_type = $3
AND region_code IN ($4, 'EU', 'GLOBAL') -- Fallback a EU poi globale
ORDER BY
CASE region_code
WHEN $4 THEN 1 -- Regione specifica prima
WHEN 'EU' THEN 2 -- Poi EU
ELSE 3 -- Poi globale
END,
source_priority DESC,
reference_year DESC
LIMIT 1;
사용자 정의 데이터베이스가 올바른 선택이 아닌 경우
배출계수 데이터베이스를 구축하고 유지하려면 다음이 필요합니다. 2~3개월 동안 헌신적인 사람 초기 건설을 위해, 필수 반기별 업데이트(요소는 매년 변경됨), 감사 추적을 위한 버전 관리 및 자주 발생하는 지리적 범위 비OECD 지역에서는 누락되었습니다. 대부분의 기업의 경우, Climatiq은 탁월한 ROI를 제공합니다. API 비용을 고려하더라도 말이죠.
13. 사례 연구: 카본 라벨을 사용한 전자상거래 플랫폼
에코샵 이탈리아 50,000개의 제품을 보유한 전자상거래 플랫폼입니다. 200,000 주문/월. 요구사항: 모든 제품에 탄소 라벨을 추가하세요. 오프셋 옵션을 사용하여 결제 시 예상 배출량을 표시합니다.
구현된 아키텍처
- 제품 카탈로그 강화: 사전 계산하는 야간 작업 배치 엔드포인트를 사용하여 각 SKU에 대한 생산 배출량입니다. 결과가 저장되었습니다. TTL이 30일인 제품 데이터베이스에 있습니다.
- 실시간 결제: 결제 시 배출량을 동적으로 계산합니다. 장바구니 무게, 목적지 지역 및 선택한 방법에 따라 배송됩니다. 목표 지연 시간: 200ms 미만(표준 배송의 경우 Redis 캐시 사용)
- 카본 대시보드 판매자: 판매자를 위한 월간 보고서 범위 3.4(업스트림 운송) 및 추정 범위 3.11(판매된 제품의 사용).
# ecoshop_carbon_service.py
# Servizio di calcolo emissioni per EcoShop Italia
from dataclasses import dataclass
from typing import Optional
import asyncio
from datetime import datetime, timedelta
@dataclass
class ProductEmissionProfile:
"""Profilo emissioni di un prodotto nel catalogo."""
sku: str
production_co2e_kg: float
packaging_co2e_kg: float
category: str
origin_region: str
calculated_at: datetime
climatiq_data_version: str
@property
def total_product_co2e_kg(self) -> float:
return self.production_co2e_kg + self.packaging_co2e_kg
@property
def is_stale(self) -> bool:
"""Profilo è obsoleto se più vecchio di 30 giorni."""
return datetime.utcnow() - self.calculated_at > timedelta(days=30)
class EcoShopCarbonService:
"""
Servizio carbon per EcoShop: gestisce calcoli produzione,
spedizione e report venditori.
"""
def __init__(
self,
climatiq_client: ClimatiqClient,
db_pool, # asyncpg pool
):
self.climatiq = climatiq_client
self.db = db_pool
async def get_product_emission_profile(
self, sku: str
) -> Optional[ProductEmissionProfile]:
"""
Recupera profilo emissioni dal DB.
Se assente o obsoleto, ricalcola via Climatiq.
"""
# Controlla DB
profile = await self._load_from_db(sku)
if profile and not profile.is_stale:
return profile
# Ricalcola
product = await self._get_product_data(sku)
if not product:
return None
new_profile = await self._calculate_product_emissions(product)
await self._save_to_db(new_profile)
return new_profile
async def calculate_checkout_carbon(
self,
cart_items: list[dict],
destination_region: str,
shipping_method: str
) -> dict:
"""
Calcola carbon label per checkout in tempo reale.
Target: < 200ms con cache.
"""
# Recupera profili prodotti (con cache)
profiles = await asyncio.gather(*[
self.get_product_emission_profile(item["sku"])
for item in cart_items
])
# Calcola emissioni produzione
production_co2e = sum(
(profile.total_product_co2e_kg * item.get("quantity", 1))
for profile, item in zip(profiles, cart_items)
if profile is not None
)
# Calcola emissioni spedizione
total_weight_kg = sum(
item.get("weight_kg", 0.5) * item.get("quantity", 1)
for item in cart_items
)
shipping_co2e = await self._estimate_shipping(
total_weight_kg, destination_region, shipping_method
)
total = production_co2e + shipping_co2e
label, color = calculate_carbon_label(total)
return {
"total_co2e_kg": round(total, 3),
"production_co2e_kg": round(production_co2e, 3),
"shipping_co2e_kg": round(shipping_co2e, 3),
"label": label,
"label_color": color,
"offset_price_eur": round((total / 1000) * 15.0, 2),
"generated_at": datetime.utcnow().isoformat()
}
# Risultati dopo 6 mesi
ECOSHOP_METRICS = {
"prodotti_con_carbon_label": 47_832,
"ordini_con_label_mese": 180_000,
"percentuale_utenti_offset": 8.3, # % utenti che comprano offset
"revenue_offset_mensile_eur": 4_200,
"latenza_media_ms": 45, # ms (grazie a cache Redis)
"api_calls_risparmiate_cache": "92%",
"co2e_totale_calcolato_mese_tco2e": 1_240,
"nps_incremento": +12 # punti NPS grazie a trasparenza
}
14. 프로덕션 환경에서의 배포 및 보안 구성
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
class Settings(BaseSettings):
"""Configurazione applicazione da variabili d'ambiente."""
# Climatiq
climatiq_api_key: str
climatiq_data_version: str = "^21"
# Cache
cache_ttl_seconds: int = 3600
redis_url: str = "redis://localhost:6379"
# API Security
api_secret_key: str
allowed_origins: list[str] = ["https://dashboard.tuaazienda.it"]
# Database (per audit trail)
database_url: str = "postgresql://user:pass@localhost/ghg_db"
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
@lru_cache
def get_settings() -> Settings:
return Settings()
# .env (NON committare in git - usare secrets manager in produzione)
# CLIMATIQ_API_KEY=clq_live_xxxxxxxxxxxxxxxxxxxxxxx
# CLIMATIQ_DATA_VERSION=^21
# API_SECRET_KEY=generato-con-openssl-rand-hex-32
# CACHE_TTL_SECONDS=3600
# REDIS_URL=redis://redis:6379
# DATABASE_URL=postgresql://ghg_user:password@postgres:5432/ghg_db
Climatiq API 키를 노출하지 마세요.
Climatiq API 키는 소스 코드, 로그, 응답 API 또는 클라이언트 측 JavaScript 변수에서. 항상 다음을 사용하십시오. 환경 변수, 비밀 관리자(AWS Secrets Manager, HashiCorp Vault, Azure Key Vault) 또는 미사용 암호화가 포함된 Kubernetes 비밀. 실수로 키를 노출한 경우 즉시 키를 돌리십시오.
15. 모범 사례 및 안티 패턴
모범 사례
-
프로덕션에서 data_version을 잠급니다. 미국
"21"대신에"^21"계산의 재현성을 보장합니다. 명시적 재계산을 통해 의도적으로 업데이트합니다. - 적극적인 캐싱: 배출계수는 거의 변하지 않습니다. 24~168시간 캐시는 API 호출을 크게 줄여줍니다. 반복 계산용(월별 차량, 제품 카탈로그)
- 항상 감사 추적: Activity_id로 각 계산을 저장합니다. 사용된 요소, 데이터 버전, 타임스탬프 및 소스. CSRD에 필수적입니다.
- 가능하면 배치를 사용하세요. 단일 일괄 호출 100개의 항목은 100개의 개별 호출보다 훨씬 더 효율적입니다. 속도 제한을 준수하고 총 대기 시간을 줄입니다.
- data_quality_flags 검증: Climatiq은 다음과 같은 경우를 보고합니다. 요인의 데이터 품질이 낮습니다. 대체 요인으로 이러한 사례를 처리합니다. 또는 보고서의 불확실성에 대한 메모.
- 범위 2에 대한 이중 보고: GHG 프로토콜은 다음을 요구합니다. 위치 기반 방법과 시장 기반 방법 모두. 둘 다 계산하고 저장하세요.
- TypeScript용 인메모리 캐시: TTL을 사용하여 지도 구현 애플리케이션에서 중복 호출을 방지하기 위해 TypeScript 클라이언트에서 Redis 없이 수명이 긴 Node.js.
피해야 할 안티패턴
- 프런트엔드의 API 키: Climatiq 키를 노출시키지 마십시오. 클라이언트 측 JavaScript. 항상 백엔드를 통과합니다.
- 구성 가스를 무시하십시오. 정확한 CSRD 보고를 위해 총 CO2e 외에 CO2, CH₄, N2O를 별도로 보고해야 합니다.
- 잘못된 측정 단위: Climatiq은 다음 사이의 변환을 허용합니다. 단, 엔드포인트에서 kWh가 결과를 생성할 것으로 예상할 때 리터를 전달합니다. 수치적으로는 그럴듯하지만 과학적으로는 틀렸습니다. 항상 유효합니다.
- 배치 > 100개 항목: API가 오류 422를 반환합니다. 대규모 데이터세트의 경우 항상 청크 논리를 구현하세요.
-
지역 일치 무시: 전기에 대한 요소
지역을 지정하지 않고 전역 기본값을 사용합니다. 이탈리아의 경우 항상 사용하십시오.
"IT". -
동기 호출 차단: 동기식 HTTP 라이브러리를 사용하지 마세요
비동기 끝점에서. 항상 사용
httpx.AsyncClient파이썬에서 또는axios~와 함께async/await타입스크립트에서.
결론 및 다음 단계
Climatiq은 자동화된 탄소 회계의 가장 어려운 문제를 해결합니다. 는 배출계수 데이터베이스. 190,000개 이상의 검증된 요소로, 300개 지역을 커버하고 지속적인 업데이트를 통해 몇 달이 아닌 며칠 만에 생산 준비가 완료된 GHG 계산 시스템.
이 기사에서는 다음을 구축했습니다.
- Un 강력한 Python 클라이언트 재시도, Redis 캐시 및 입력된 오류 처리 포함
- Un TypeScript/Node.js 클라이언트 프런트엔드 통합을 위한 Axios 및 전체 유형 안전성 포함
- 다음에 대한 계산 범위 1 (디젤/HVO 차량), 범위 2 (이중 방법) 그리고 주요 카테고리 범위 3
- Un 탄소라벨 API 실시간 A-E 라벨 및 오프셋을 사용한 전자상거래용
- 테스트 모의 API 실제 호출이 없는 CI/CD 환경용
- 전략 일괄 처리 및 캐싱 엔터프라이즈 규모용
그린 소프트웨어 시리즈는 계속됩니다
- 이전 기사: CodeCarbon - 코드 방출 측정 오픈 소스 Python 라이브러리로 실행 중입니다.
- 다음 기사: Carbon Aware SDK - 워크로드를 전환하는 방법 그리드 강도 예측을 사용하여 몇 시간 안에 가장 깨끗한 에너지를 사용합니다.
- 관련 기사(MLOps 시리즈): 최적화 탄소 배출을 줄이기 위해 ML 모델을 교육합니다.
- 관련 기사(데이터 및 AI 비즈니스 시리즈): 데이터 거버넌스 신뢰할 수 있는 AI를 위해 - 지속 가능성 지표를 데이터 카탈로그에 통합하는 방법.
다음 실제 단계는 Climatiq의 커뮤니티 플랜에 가입하는 것입니다. (월 250통화 무료) 데이터 탐색기 귀하의 산업과 관련된 요소를 찾아 구현하십시오. 가장 간단한 사용 사례에 대한 범위 1을 첫 번째 마일스톤으로 계산합니다.
CSRD(2025년부터 EU 대기업에 의무화)의 규제 압력으로 인해 2026년부터 중소기업으로 확대) 및 ESG 지표에 대한 투자자의 관심이 높아지고 있습니다. 자동화된 탄소 회계를 위한 기술 인프라를 갖추는 것은 더 이상 불가능합니다. 경쟁 우위: 그것은 운영상의 필요성.







