EV 충전 부하 분산: 지능형 충전을 위한 실시간 알고리즘
화요일 저녁 6시 30분입니다. 수천 명의 이탈리아 운전자들이 퇴근 후 집으로 돌아옵니다. 전기차를 주차하고 충전 케이블을 연결합니다. 몇 분 후에 질문이 나옵니다. 인근 변압기의 전력이 위쪽으로 쏠립니다. 지능형 관리가 없으면 이 시나리오는 공원이 성장함에 따라 매일 점점 더 강렬하게 반복됩니다. EV - 네트워크 과부하, 국지적 정전 및 인프라 업그레이드 비용 발생 수십억 유로 정도.
2025년에는 더 많이 유통될 것입니다. 유럽의 전기차 1,700만 대, 그 중 대략 이탈리아에서는 230,000명, 2025년에만 94,230명의 신규 등록이 이루어졌습니다(2024년 대비 +46%). 2030년까지의 예측에 따르면 유럽에서는 5천만 대 이상의 EV가 필요합니다. 각 차량에는 평균적으로 충전 시 전력은 7~22kW입니다. 재충전하는 수백만 명의 사용자를 곱함 같은 날 저녁 시간에 전례 없는 네트워크 안정성 문제가 발생했습니다.
해결책은 더 많은 네트워크를 구축하는 것이 아닙니다. 비용이 너무 많이 들고 느립니다. 해결책은 지능형 로드 밸런싱: 전력을 분배하는 실시간 알고리즘 충전 차량 간에 사용 가능, 네트워크 제약 사항 존중, 에너지 비용 최소화 사용자 만족도를 극대화합니다. 이 기사에서는 전체 스택을 살펴봅니다. 기술: OCPP 2.0.1 프로토콜부터 최적화 알고리즘, 강화 학습까지 작동하는 Python 코드와 이탈리아 규제 상황을 통해 V2G 통합까지.
이 기사에서 배울 내용
- 오리 곡선과 관리되지 않는 EV가 유통 네트워크에 미치는 영향
- 로드 밸런싱 알고리즘 분류: 정적, 동적, 예측
- OCPP 2.0.1 스마트 충전: SetChargingProfile, ChargerSchedule, StackLevel
- Python 구현: 실시간 동적 재계산을 통한 Equal Share
- 힙 큐(SoC, 마감일, 속도)를 사용한 우선순위 기반 알고리즘
- 에너지 비용 최적화를 위한 강화 학습(PPO)
- V2G(Vehicle-to-Grid): 양방향, ISO 15118-20, 주파수 조정
- 태양광 통합: 태양열 과잉 및 EV로의 전환
- 마이크로서비스 아키텍처: 충전 컨트롤러, 에너지 관리자, 예측자
- 이탈리아 상황: 2시간당 관세, EV가 포함된 CER, ARERA 규정
EnergyTech 시리즈 - 10개 기사
| # | Articolo | 상태 |
|---|---|---|
| 1 | 스마트 그리드와 IoT: 미래의 전력망을 위한 아키텍처 | 게시됨 |
| 2 | DERMS 아키텍처: 수백만 개의 분산 리소스 수집 | 게시됨 |
| 3 | 배터리 관리 시스템: BESS 제어 알고리즘 | 게시됨 |
| 4 | Python과 Pandapower를 사용한 전력망의 디지털 트윈 | 게시됨 |
| 5 | 재생 에너지 예측: PV 및 풍력을 위한 ML | 게시됨 |
| 6 | EV 충전 부하 분산: 실시간 알고리즘(현재 위치) | 현재의 |
| 7 | 실시간 에너지 원격 측정을 위한 MQTT 및 InfluxDB | 곧 출시 예정 |
| 8 | IEC 61850: 전기 변전소에서의 통신 | 곧 출시 예정 |
| 9 | 탄소 회계 소프트웨어: 배출량 측정 및 감소 | 곧 출시 예정 |
| 10 | CER의 P2P 에너지 거래를 위한 블록체인 | 곧 출시 예정 |
최대 문제: 오리 커브(Duck Curve)와 관리되지 않는 EV
EV 로드 밸런싱이 선택이 아닌 필수인 이유를 이해하려면 먼저 시작해야 합니다. 전력망의 물리학과 전력망 관리자가 매일 두려워하는 개념에서: 는 오리 곡선.
오리 곡선(Duck Curve)과 그 악화
오리곡선은 순전력수요곡선의 모양을 나타낸다. 태양광 침투율이 높은 시스템의 일반적인 하루 동안. 그게 바로 그거야 곡선이 오리의 옆모습과 비슷하기 때문입니다. 한낮의 배가 낮습니다( 태양광 발전은 풍부하고 순 수요가 낮음) 그리고 저녁에 높은 혹(때 해가 지고, PV 생산이 붕괴되고, 주택 수요가 급증합니다.
EV가 없으면 이탈리아 그리드는 이미 이 문제를 매일 처리하고 있습니다. 전기차의 성장과 함께 관리하지 않으면 문제가 급격히 증폭됩니다. MDPI 연구(2025)에 따르면 최적화를 통해 최대 수요는 500만 대의 EV를 기준으로 22,000MW에서 35,000MW로 증가합니다. 스마트 충전을 사용하면 동일한 차량에서 야간 시간 동안 분배되는 전력이 2,000~3,000MW만 추가됩니다.
| 대본 | 피크타임 | 추가하중 | 네트워크 위험 |
|---|---|---|---|
| 기준(EV 없음) | 오후 7시~8시 | 0MW | 다루기 쉬운 |
| 관리되지 않는 EV 500,000대 | 오후 6시 30분 - 오후 7시 30분 | +3,500MW | 변압기의 국부적 스트레스 |
| 150만 대의 관리되지 않는 EV | 18:00-20:00 | +10,500MW | 광범위한 과부하 |
| 관리되지 않는 EV 500만 대 | 오후 6시 - 오후 9시 | +35,000MW | 전신 정전 |
| 스마트 충전 기능을 갖춘 150만 EV | 오후 10시~오전 6시 배달됨 | +2,100MW 희석 | 무시할 만한 |
AI 기반 충전 프로필 최적화를 통해 최대 수요를 줄일 수 있습니다. 150만 대의 EV로 16%, 300만개로 21%, 500만개로 34% 증가 (출처: MDPI 전자, 2025). 관리되는 시나리오와 관리되지 않는 시나리오의 차이점 몇 퍼센트 포인트가 아닙니다. 안정적인 네트워크와 붕괴의 차이입니다.
배전 변압기에 미치는 영향
문제는 전송 시스템 수준뿐만 아니라 무엇보다도 네트워크 수준에 있습니다. 현지 유통. 400kVA 인접 변압기는 일반적으로 80-120을 제공합니다. 가족. 이 중 15개가 오후 6시 30분에 모두 11kW로 충전을 시작하는 EV를 가지고 있다면, 추가 부하는 165kW로 변압기 정격 용량의 거의 41%에 해당합니다. 국내 소비의 경우 이미 60-70%에 도달했을 수 있습니다. 결과: 과열, 감소 수명을 연장하고 최악의 경우 보호 장치를 개방합니다.
무활동의 대가
Terna에 따르면 이탈리아 유통망을 강화하는 데 드는 비용은 다음과 같습니다. 스마트 충전이 없는 EV 전환은 다음과 같이 추정됩니다. 100억~150억 유로 2030년까지. 스마트 충전이 널리 보급되면 전환 자체에 투자가 필요함 해당 시간 동안 로드를 이동하기만 하면 인프라 비용이 60~70% 낮아집니다. 네트워크에 사용 가능한 용량이 있습니다.
로드 밸런싱 알고리즘 분류
최적의 단일 로드 밸런싱 알고리즘은 없습니다. 선택은 크기에 따라 다릅니다. 설치, 기록 데이터의 가용성, 대기 시간 요구 사항 및 관리자가 기꺼이 유지하려는 복잡성. 우리는 접근 방식을 세 가지로 분류합니다. 주요 가족.
정적 알고리즘
정적 알고리즘은 선험적으로 전력 분배 규칙을 정의합니다. 시스템 상태에 동적으로 적응하지 않고. 구현이 간단합니다. 디버깅 기능이 있어 소규모 동종 설치에 이상적입니다.
- 라운드 로빈: 각 커넥터는 회전당 동일한 최대 전력을 받습니다. 간단하지만 비효율적입니다. 거의 충전된 EV는 빈 EV와 동일한 전력을 사용합니다.
- 균등 공유: 총 가용 전력은 다음과 같이 균등하게 나뉩니다. 모든 활성 커넥터. 동종 설치에 효과적이지만 다음 사항을 고려하지 않습니다. 개인의 필요.
- 선착순: 연결된 첫 번째 EV는 최대 전력을 받습니다. 후속 것들은 잔여물을 나눕니다. 늦게 도착하는 사람은 처벌을 받습니다.
동적 알고리즘
동적 알고리즘은 상태에 따라 실시간으로 분포를 조정합니다. 시스템 전류: 연결된 EV 수, 충전 수준(SoC), 사용 가능한 전력, 사용자 우선순위.
- 비례항: 전력은 에너지에 비례하여 분배됩니다. 각 EV에서 요청한 것입니다. 배터리가 가장 낮은 사람이 가장 많은 전력을 받습니다.
- 우선순위 기반: 각 EV에는 여러 항목에 대해 계산된 우선순위 점수가 있습니다. 요소(SoC, 마감일, 구독). 권력은 더 높은 우선순위로 흐른다.
- 퍼지 논리: "SoC 및 낮은 E 마감일 경우"와 같은 언어 규칙 닫으면 우선순위가 높습니다." 입력 데이터의 불확실성을 잘 처리합니다.
예측 알고리즘(ML 기반)
예측 알고리즘은 기계 학습 모델을 사용하여 미래의 이벤트를 예측합니다. (도착, 출발, 에너지 가격 변동) 및 일정 최적화 장기적으로 총 비용을 최소화합니다.
- 모델 예측 제어(MPC): 최적화 문제를 해결합니다 각 제어 간격(예: 15분)에서 미래 시간 범위(예: 4시간)에 걸쳐, 첫 번째 단계만 적용하고 다음 단계에서 재구성합니다.
- 심층 강화 학습(DRL): 에이전트는 최적의 정책을 학습합니다. 시뮬레이터와 상호 작용합니다. PPO와 SAC는 생산에 가장 많이 사용됩니다.
- 확률론적 프로그래밍: 불확실성을 명시적으로 통합합니다. EV 도착 및 출발은 최악의 시나리오에 견고한 일정을 생성합니다.
| 연산 | 복잡성 | 숨어 있음 | 최적성 | 요청된 데이터 | 이상적인 사용 사례 |
|---|---|---|---|---|---|
| 균등 공유 | 낮은 | <1ms | 낮은 | 아무도 | 주거용, 매장 10개 미만 |
| 우선순위 기반 | 평균 | <10ms | 중간-높음 | SoC, 사용자 마감일 | 사무실, 호텔, 차량 |
| MPC | 높은 | 100ms-1s | 높은 | 에너지 가격, 예측 | 허브 >50소켓, C&I |
| 딥 RL(PPO) | 매우 높음 | <50ms(추론) | 매우 높음 | 역사 6개월 이상 | 훌륭한 허브, V2G 내장 |
OCPP 2.0.1 스마트 충전: 지능형 충전 프로토콜
OCPP(Open Charge Point Protocol)와 충전 포인트 간 공용 언어(CP) 및 중앙 시스템(CSMS - 충전소 관리 시스템). 버전 2.0.1에서는 스마트 충전은 OCPP 1.6에 비해 크게 개선되었습니다. 더욱 세밀하고 안정적인 충전 프로필 관리.
SetChargingProfile 메커니즘
CSMS는 충전소에 메시지를 보냅니다. SetChargingProfileRequest 포함하는
에 ChargingProfile 이는 스테이션이 시간이 지남에 따라 어떻게 전력을 공급해야 하는지를 정의합니다.
에이 ChargingProfile 그리고 다음으로 구성됩니다:
-
충전 프로필목적:
ChargePointMaxProfile(모두 제한 역),TxDefaultProfile(새 거래의 경우 기본값),TxProfile(진행 중인 거래에만 해당) - 스택레벨: 우선순위를 정의하는 정수입니다. 프로필의 경우 겹치는 경우 stacklevel이 가장 높은 사람이 승리합니다.
- 충전 일정: 기간 목록(초 단위의 startPeriod, 시간 경과에 따른 충전 곡선을 정의하는 A 또는 W 제한.
중요 순서: 증가 전 감소
업데이트된 프로필을 여러 스테이션에 보낼 때 먼저 보내는 것이 중요합니다. 전력 감소 명령을 내린 다음 증가 명령을 내립니다. 먼저 늘리면 짧은 간격 동안 네트워크 제한을 초과하여 전기 보호 또는 그에 따른 계약상 처벌로 인한 수요 피크 발생. 이 규칙은 프로덕션 환경에서 협상할 수 없습니다.
Python 구현: OCPP 프로필 생성 2.0.1
"""
OCPP 2.0.1 Smart Charging Profile Generator
Genera ChargingProfile compatibili con la specifica OCPP 2.0.1.
"""
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
import json
from datetime import datetime, timezone
class ChargingRateUnit(str, Enum):
WATTS = "W"
AMPERES = "A"
class ChargingProfilePurpose(str, Enum):
CHARGE_POINT_MAX = "ChargePointMaxProfile"
TX_DEFAULT = "TxDefaultProfile"
TX = "TxProfile"
class ChargingProfileKind(str, Enum):
ABSOLUTE = "Absolute"
RECURRING = "Recurring"
RELATIVE = "Relative"
@dataclass
class ChargingSchedulePeriod:
"""Singolo intervallo nella schedule di ricarica."""
start_period: int # secondi dall'inizio della schedule
limit: float # limite in W o A
number_phases: Optional[int] = None
@dataclass
class ChargingSchedule:
"""Schedule completa di un profilo di ricarica."""
id: int
charging_rate_unit: ChargingRateUnit
charging_schedule_period: List[ChargingSchedulePeriod]
duration: Optional[int] = None
start_schedule: Optional[str] = None
min_charging_rate: Optional[float] = None
@dataclass
class ChargingProfile:
"""Profilo di ricarica OCPP 2.0.1 completo."""
id: int
stack_level: int
charging_profile_purpose: ChargingProfilePurpose
charging_profile_kind: ChargingProfileKind
charging_schedule: List[ChargingSchedule]
transaction_id: Optional[str] = None
valid_from: Optional[str] = None
valid_to: Optional[str] = None
def build_equal_share_profile(
profile_id: int,
power_limit_watts: float,
duration_seconds: int = 3600,
stack_level: int = 1
) -> ChargingProfile:
"""
Costruisce un profilo OCPP per distribuzione equa (potenza fissa).
Usato dall'algoritmo Equal Share per applicare il limite calcolato.
"""
schedule_period = ChargingSchedulePeriod(
start_period=0,
limit=power_limit_watts,
number_phases=3
)
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=[schedule_period],
duration=duration_seconds,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def build_time_of_use_profile(
profile_id: int,
schedule_periods: List[tuple],
stack_level: int = 2
) -> ChargingProfile:
"""
Costruisce un profilo Time-of-Use con potenza variabile nel tempo.
Esempio: alta potenza di notte (energia economica), bassa di giorno.
Args:
schedule_periods: lista di (start_period_sec, limit_watts)
"""
periods = [
ChargingSchedulePeriod(start_period=start, limit=limit)
for start, limit in schedule_periods
]
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=periods,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX_DEFAULT,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def profile_to_ocpp_dict(profile: ChargingProfile) -> dict:
"""Serializza il profilo nel formato JSON OCPP 2.0.1."""
return {
"id": profile.id,
"stackLevel": profile.stack_level,
"chargingProfilePurpose": profile.charging_profile_purpose.value,
"chargingProfileKind": profile.charging_profile_kind.value,
"chargingSchedule": [
{
"id": sched.id,
"chargingRateUnit": sched.charging_rate_unit.value,
"chargingSchedulePeriod": [
{
"startPeriod": p.start_period,
"limit": p.limit,
}
for p in sched.charging_schedule_period
],
**({"duration": sched.duration} if sched.duration else {}),
**({"startSchedule": sched.start_schedule} if sched.start_schedule else {}),
}
for sched in profile.charging_schedule
],
}
if __name__ == "__main__":
# Profilo Equal Share: 3.3 kW per 2 ore
profile = build_equal_share_profile(
profile_id=101,
power_limit_watts=3300.0,
duration_seconds=7200
)
print(json.dumps(profile_to_ocpp_dict(profile), indent=2))
# Profilo Time-of-Use: 11 kW da adesso, 3.3 kW dopo 4 ore
tou = build_time_of_use_profile(
profile_id=102,
schedule_periods=[
(0, 11000), # da ora: 11 kW (fascia fuori punta)
(4 * 3600, 3300), # dopo 4 ore: 3.3 kW (fascia F1 inizia)
],
stack_level=2
)
print(json.dumps(profile_to_ocpp_dict(tou), indent=2))
동적 재계산을 사용한 균등 공유 알고리즘
Equal Share는 실제 설치에서 가장 널리 사용되는 로드 밸런싱 알고리즘입니다. 그 강점은 수학적 우아함이 아니라 운영 견고성에 있습니다. 사용자에게 설명하기 쉽고 디버그하기 쉽고 충분히 효과적입니다. 대부분의 설치에는 최대 30-40개의 소켓이 있습니다. 실제 복잡성은 다음과 같습니다. 동적 재계산 관리: EV가 연결되거나 연결이 끊길 때마다, 또는 DSO 신호에 대해 사용 가능한 전력 변경이 있는 경우 시스템은 다시 계산하고 중요한 시퀀스 감소 후 증가를 고려하여 밀리초 단위로 재분배합니다.
"""
Equal Share Load Balancer
Distribuzione equa con ricalcolo dinamico event-driven.
Thread-safe per deployment in produzione con OCPP WebSocket server.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Callable, Awaitable
from enum import Enum
import time
logger = logging.getLogger(__name__)
class ConnectorStatus(str, Enum):
AVAILABLE = "Available"
CHARGING = "Charging"
SUSPENDED_EV = "SuspendedEV"
FINISHING = "Finishing"
UNAVAILABLE = "Unavailable"
FAULTED = "Faulted"
@dataclass
class ConnectorState:
"""Stato di un singolo connettore nella stazione."""
connector_id: str
status: ConnectorStatus = ConnectorStatus.AVAILABLE
transaction_id: Optional[str] = None
allocated_power_w: float = 0.0
max_power_w: float = 22000.0 # max fisico del connettore (22 kW AC trifase)
min_power_w: float = 1380.0 # min per mantenere la carica (6A x 230V)
soc_percent: Optional[float] = None
connected_at: Optional[float] = None
@property
def is_charging(self) -> bool:
return self.status == ConnectorStatus.CHARGING
@property
def session_duration_min(self) -> float:
if self.connected_at is None:
return 0.0
return (time.time() - self.connected_at) / 60.0
@dataclass
class StationConfig:
"""Configurazione della stazione di ricarica."""
station_id: str
max_station_power_w: float
dynamic_limit_w: Optional[float] = None
rebalance_interval_s: float = 30.0
@property
def available_power_w(self) -> float:
if self.dynamic_limit_w is not None:
return min(self.max_station_power_w, self.dynamic_limit_w)
return self.max_station_power_w
ProfileSendCallback = Callable[[str, str, float], Awaitable[bool]]
class EqualShareBalancer:
"""
Load balancer Equal Share thread-safe e asincrono.
Gestisce la distribuzione equa della potenza disponibile
tra tutti i connettori attivamente in carica.
"""
def __init__(
self,
config: StationConfig,
profile_sender: ProfileSendCallback
):
self._config = config
self._connectors: Dict[str, ConnectorState] = {}
self._send_profile = profile_sender
self._lock = asyncio.Lock()
self._last_allocation: Dict[str, float] = {}
async def register_connector(
self,
connector_id: str,
max_power_w: float = 22000.0,
min_power_w: float = 1380.0
) -> None:
async with self._lock:
self._connectors[connector_id] = ConnectorState(
connector_id=connector_id,
max_power_w=max_power_w,
min_power_w=min_power_w
)
logger.info("Connettore %s registrato (max=%.0f W)", connector_id, max_power_w)
async def on_ev_connected(self, connector_id: str, transaction_id: str) -> None:
"""Chiamato quando un EV si connette. Triggera ricalcolo immediato."""
async with self._lock:
if connector_id not in self._connectors:
logger.warning("Connettore sconosciuto: %s", connector_id)
return
self._connectors[connector_id].status = ConnectorStatus.CHARGING
self._connectors[connector_id].transaction_id = transaction_id
self._connectors[connector_id].connected_at = time.time()
await self._rebalance()
async def on_ev_disconnected(self, connector_id: str) -> None:
"""Chiamato quando un EV si disconnette. Libera la potenza e redistribuisce."""
async with self._lock:
if connector_id not in self._connectors:
return
connector = self._connectors[connector_id]
connector.status = ConnectorStatus.AVAILABLE
connector.transaction_id = None
connector.allocated_power_w = 0.0
connector.connected_at = None
await self._rebalance()
async def update_dynamic_limit(self, limit_w: float) -> None:
"""Aggiorna il limite DSO in tempo reale. Triggera ricalcolo immediato."""
async with self._lock:
self._config.dynamic_limit_w = limit_w
logger.info("Limite DSO aggiornato: %.0f W (stazione %s)", limit_w, self._config.station_id)
await self._rebalance()
async def _rebalance(self) -> None:
"""
Calcola la nuova allocazione Equal Share e invia i profili OCPP.
REGOLA CRITICA: invia prima le riduzioni, poi gli aumenti.
"""
async with self._lock:
active = [c for c in self._connectors.values() if c.is_charging]
if not active:
return
available = self._config.available_power_w
n = len(active)
raw_share = available / n
new_allocations: Dict[str, float] = {}
# Prima passata: applica limiti min/max per connettore
constrained = []
free = []
for connector in active:
if raw_share <= connector.min_power_w:
new_allocations[connector.connector_id] = connector.min_power_w
constrained.append(connector)
elif raw_share >= connector.max_power_w:
new_allocations[connector.connector_id] = connector.max_power_w
constrained.append(connector)
else:
free.append(connector)
# Seconda passata: redistribuisce il residuo tra i connettori liberi
if free:
allocated_constrained = sum(
new_allocations[c.connector_id] for c in constrained
)
residual = available - allocated_constrained
share_free = residual / len(free)
for connector in free:
new_allocations[connector.connector_id] = max(
connector.min_power_w,
min(connector.max_power_w, share_free)
)
# Ordina: prima le riduzioni, poi gli aumenti (regola OCPP)
reductions = []
increases = []
for connector_id, new_power in new_allocations.items():
old_power = self._last_allocation.get(connector_id, float('inf'))
if new_power < old_power:
reductions.append((connector_id, new_power))
else:
increases.append((connector_id, new_power))
for connector_id, power in reductions:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Riduzione: %s -> %.0f W", connector_id, power)
for connector_id, power in increases:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Aumento: %s -> %.0f W", connector_id, power)
logger.info(
"Ribilanciamento: %d EV attivi, %.0f W disponibili, stazione %s",
len(active), self._config.available_power_w, self._config.station_id
)
async def start_periodic_rebalance(self) -> None:
"""Avvia il ricalcolo periodico come safety net per deriva dello stato."""
async def _loop():
while True:
await asyncio.sleep(self._config.rebalance_interval_s)
await self._rebalance()
asyncio.create_task(_loop())
def get_status(self) -> dict:
return {
"station_id": self._config.station_id,
"available_power_w": self._config.available_power_w,
"connectors": [
{
"connector_id": c.connector_id,
"status": c.status.value,
"allocated_power_w": c.allocated_power_w,
"soc_percent": c.soc_percent,
"session_duration_min": round(c.session_duration_min, 1)
}
for c in self._connectors.values()
]
}
async def demo_equal_share():
async def mock_send(station_id, connector_id, power_w) -> bool:
print(f" OCPP -> {connector_id}: {power_w:.0f} W")
return True
config = StationConfig(station_id="STATION_001", max_station_power_w=44000.0)
balancer = EqualShareBalancer(config, mock_send)
for i in range(1, 5):
await balancer.register_connector(f"C0{i}", max_power_w=22000.0, min_power_w=1380.0)
print("\n[EV1 connesso]")
await balancer.on_ev_connected("C01", "TX001") # C01: 44000 W
print("\n[EV2 connesso]")
await balancer.on_ev_connected("C02", "TX002") # C01,C02: 22000 W ciascuno
print("\n[EV3 e EV4 connessi]")
await balancer.on_ev_connected("C03", "TX003")
await balancer.on_ev_connected("C04", "TX004") # tutti: 11000 W
print("\n[DSO riduce limite a 22 kW]")
await balancer.update_dynamic_limit(22000.0) # tutti: 5500 W
print("\n[EV2 si disconnette]")
await balancer.on_ev_disconnected("C02") # C01,C03,C04: 7333 W
if __name__ == "__main__":
asyncio.run(demo_equal_share())
다단계 점수 매기기를 사용한 우선순위 기반 알고리즘
Equal Share는 공정하지만 사용자 관점에서는 최적이 아닙니다. 배터리가 장착된 EV 주인이 30분 안에 떠나야 하는 5%는 훨씬 더 긴급한 필요가 있습니다. 8시간 동안 주차된 상태로 유지되는 60% EV와 비교됩니다. 우선순위 기반 밸런싱 다단계 긴급성 점수 e를 계산하여 이러한 요구에 대응합니다. 우선순위에 비례하여 권력을 분배합니다.
우선순위 점수 공식
점수는 4가지 차원과 구성 가능한 가중치를 결합합니다.
- SoC 긴급성(35%): 얼마나 많은 배터리를 방전. 중요한 사례를 증폭시키는 오목 기능(예: SoC 5% = 매우 높은 긴급성)
- 시간 압박(35%): 출발이 얼마나 가까워졌는지. 마감일이 다가올수록 빠르게 증가하는 지수 함수입니다.
- 사용자 계층(15%): 구독 수준(기본/프리미엄/우선순위). 차별화된 SLA로 비즈니스 모델을 만들어보세요.
- 에너지 효율성(15%): 실제로 힘을 흡수할 수 있는 사람에게 보상을 줍니다. 거의 가득 찬 배터리에 전력을 할당하지 마십시오.
"""
Priority-Based Load Balancer.
Distribuisce potenza proporzionalmente ai punteggi di urgenza,
con algoritmo iterativo per rispettare i vincoli min/max.
"""
import math
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
logger = logging.getLogger(__name__)
@dataclass
class ChargingSession:
"""Sessione con tutti i parametri per il calcolo della priorità."""
connector_id: str
transaction_id: str
soc_percent: float
target_soc_percent: float
battery_capacity_kwh: float
departure_time: float # timestamp unix
max_power_w: float
min_power_w: float
user_tier: int = 1 # 1=base, 2=premium, 3=priority
connected_at: float = field(default_factory=time.time)
@property
def energy_needed_kwh(self) -> float:
delta = max(0.0, self.target_soc_percent - self.soc_percent) / 100.0
return self.battery_capacity_kwh * delta
@property
def time_remaining_h(self) -> float:
return max(0.1, (self.departure_time - time.time()) / 3600.0)
class PriorityScoreCalculator:
"""Calcola punteggio di urgenza normalizzato [0, 100]."""
W_SOC = 0.35
W_TIME = 0.35
W_TIER = 0.15
W_EFFICIENCY = 0.15
def calculate(self, session: ChargingSession) -> float:
# 1. Urgenza SoC: concava, amplifica i SoC bassissimi
soc_ratio = session.soc_percent / 100.0
soc_score = (1.0 - soc_ratio) ** 1.5 * 100
# 2. Pressione temporale: esponenziale, cresce rapido vicino alla deadline
time_score = 100.0 * math.exp(-0.3 * session.time_remaining_h)
# 3. Tier: bonus per abbonamenti premium
tier_score = {1: 33.0, 2: 66.0, 3: 100.0}.get(session.user_tier, 33.0)
# 4. Efficienza: premia chi ha batteria da riempire
eff = min(100.0, (session.energy_needed_kwh / max(0.1, session.battery_capacity_kwh)) * 100)
return round(
self.W_SOC * soc_score +
self.W_TIME * time_score +
self.W_TIER * tier_score +
self.W_EFFICIENCY * eff,
2
)
class PriorityBalancer:
"""
Distribuisce potenza proporzionalmente ai punteggi con vincoli min/max.
Algoritmo iterativo: separa i connettori con vincoli attivi e redistribuisce
il residuo tra quelli liberi, ripetendo fino a convergenza.
"""
def __init__(self, station_id: str, max_station_power_w: float, profile_sender):
self._station_id = station_id
self._max_power = max_station_power_w
self._dynamic_limit: Optional[float] = None
self._sessions: Dict[str, ChargingSession] = {}
self._send_profile = profile_sender
self._score_calc = PriorityScoreCalculator()
self._lock = asyncio.Lock()
@property
def available_power_w(self) -> float:
if self._dynamic_limit is not None:
return min(self._max_power, self._dynamic_limit)
return self._max_power
async def add_session(self, session: ChargingSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
await self._rebalance()
async def remove_session(self, connector_id: str) -> None:
async with self._lock:
self._sessions.pop(connector_id, None)
await self._rebalance()
async def update_soc(self, connector_id: str, soc_percent: float) -> None:
async with self._lock:
if connector_id in self._sessions:
self._sessions[connector_id].soc_percent = soc_percent
await self._rebalance()
def _compute_allocations(self) -> Dict[str, float]:
"""Algoritmo iterativo per allocazione con vincoli min/max."""
if not self._sessions:
return {}
sessions = list(self._sessions.values())
scores = {s.connector_id: self._score_calc.calculate(s) for s in sessions}
allocations: Dict[str, float] = {}
remaining_power = self.available_power_w
remaining = sessions[:]
for _ in range(10): # max 10 iterazioni per convergenza
if not remaining:
break
total_score = sum(scores[s.connector_id] for s in remaining)
if total_score == 0:
share = remaining_power / len(remaining)
for s in remaining:
allocations[s.connector_id] = max(s.min_power_w, min(s.max_power_w, share))
break
constrained, free_sessions = [], []
for s in remaining:
ratio = scores[s.connector_id] / total_score
proposed = remaining_power * ratio
if proposed <= s.min_power_w:
allocations[s.connector_id] = s.min_power_w
constrained.append(s)
elif proposed >= s.max_power_w:
allocations[s.connector_id] = s.max_power_w
constrained.append(s)
else:
free_sessions.append(s)
if not constrained:
# Nessun vincolo: allocazione finale proporzionale
for s in remaining:
ratio = scores[s.connector_id] / total_score
allocations[s.connector_id] = remaining_power * ratio
break
for s in constrained:
remaining_power -= allocations[s.connector_id]
remaining = free_sessions
return allocations
async def _rebalance(self) -> None:
async with self._lock:
allocs = self._compute_allocations()
# Stampa riepilogo per logging
for cid, power in sorted(allocs.items()):
s = self._sessions.get(cid)
score = self._score_calc.calculate(s) if s else 0
logger.info(
"Priority: %s score=%.1f -> %.0f W (SoC=%.0f%%, %.1fh)",
cid, score, power,
s.soc_percent if s else 0,
s.time_remaining_h if s else 0
)
await self._send_profile(self._station_id, cid, power)
# Demo
async def priority_demo():
async def mock_send(station_id, cid, power_w) -> bool:
print(f" SET {cid}: {power_w:.0f} W")
return True
balancer = PriorityBalancer("HUB_01", 44000.0, mock_send)
now = time.time()
sessions = [
# Urgente: 5% SoC, parte tra 45 min
ChargingSession("C1", "T1", 5.0, 80.0, 77.0, now + 45*60, 22000.0, 1380.0, user_tier=2),
# Normale: 60% SoC, parte tra 8 ore
ChargingSession("C2", "T2", 60.0, 80.0, 40.0, now + 8*3600, 11000.0, 1380.0, user_tier=1),
# Priority tier: 30% SoC, parte tra 2 ore
ChargingSession("C3", "T3", 30.0, 90.0, 100.0, now + 2*3600, 22000.0, 1380.0, user_tier=3),
]
print("\n--- Sessioni aggiunte (44 kW disponibili) ---")
for s in sessions:
await balancer.add_session(s)
print("\n--- SoC C1 sale al 40% (meno urgente) ---")
await balancer.update_soc("C1", 40.0)
if __name__ == "__main__":
asyncio.run(priority_demo())
에너지 비용을 최소화하기 위한 강화 학습
이전 알고리즘은 현재 상태에 반응합니다. 강화 학습은 그 이상입니다. 에이전트는 수백만 번의 시뮬레이션을 통해 최소화하는 최적의 정책을 학습합니다. 시간에 따른 변동 가격, 도착 예측 및 EV 출발 및 네트워크 제약. 2025년 생산 예정인 주요 통신사 충전 허브는 다양한 변형을 사용합니다. PPO(근위 정책 최적화) 에 대한 반응형 알고리즘에 비해 비용을 15~25% 절감합니다(출처: ScienceDirect, 2025).
마르코프 결정 과정으로 공식화
- 상태 공간: 커넥터별(SoC, 남은시간, 현재전력) 플러스 그리드 상태(가용 전력, 에너지 가격, 시간대, PV 예측).
- 행동 공간: 정규화된 전력 수준의 연속 벡터 [0,1] 각 커넥터마다.
- 보상: 에너지 비용을 최소화하고, 도달하지 못한 EV에 불이익을 줍니다. 마감일까지 SoC를 타겟팅하고 네트워크 제한 위반에 대해 처벌합니다.
"""
Ambiente Gymnasium per EV Charging RL.
Compatibile con Stable-Baselines3 (PPO, SAC).
Ogni step = 15 minuti. Un episodio = 24 ore (96 step).
Installazione: pip install gymnasium stable-baselines3 numpy
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from dataclasses import dataclass
from typing import List, Optional, Tuple
@dataclass
class EVSession:
soc: float # [0, 1] corrente
target_soc: float # [0, 1] obiettivo
battery_kwh: float
time_remaining_h: float
max_power_kw: float
min_power_kw: float
class EVChargingEnv(gym.Env):
"""
Ambiente Gymnasium per scheduling EV charging.
Obiettivo: minimizzare costo energia rispettando deadline SoC.
"""
metadata = {"render_modes": ["human"]}
def __init__(
self,
n_connectors: int = 4,
max_station_power_kw: float = 44.0,
episode_steps: int = 96,
electricity_prices: Optional[np.ndarray] = None
):
super().__init__()
self.n = n_connectors
self.max_power = max_station_power_kw
self.episode_steps = episode_steps
self.prices = electricity_prices if electricity_prices is not None \
else self._italian_biorario_prices()
# Action: potenza normalizzata [0,1] per ogni connettore
self.action_space = spaces.Box(
low=0.0, high=1.0, shape=(n_connectors,), dtype=np.float32
)
# Observation: 5 feature per connettore + 3 globali
obs_dim = n_connectors * 5 + 3
self.observation_space = spaces.Box(
low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
)
self.sessions: List[Optional[EVSession]] = [None] * n_connectors
self.step_count = 0
def _italian_biorario_prices(self) -> np.ndarray:
"""Prezzi biorari italiani simulati: F1 (08-19) = 0.28, F2/F3 = 0.18 EUR/kWh."""
prices = np.zeros(96)
for i in range(96):
hour = (i * 15) // 60
prices[i] = 0.28 if 8 <= hour < 19 else 0.18
# Variabilità mercato spot
return (prices + np.random.normal(0, 0.02, 96)).clip(0.10, 0.40)
def _get_obs(self) -> np.ndarray:
obs = []
for s in self.sessions:
if s is not None:
obs.extend([
s.soc,
s.target_soc,
min(1.0, s.time_remaining_h / 12.0),
1.0,
s.max_power_kw / self.max_power
])
else:
obs.extend([0.0, 0.0, 0.0, 0.0, 0.0])
step_idx = self.step_count % len(self.prices)
hour = (self.step_count * 15 // 60) % 24
obs.extend([
self.prices[step_idx] / 0.40,
np.sin(2 * np.pi * hour / 24),
np.cos(2 * np.pi * hour / 24)
])
return np.array(obs, dtype=np.float32)
def step(self, action: np.ndarray) -> Tuple:
dt_h = 15 / 60
price = self.prices[self.step_count % len(self.prices)]
# Converti azioni in potenze reali con vincoli
actual_powers = np.zeros(self.n)
for i, s in enumerate(self.sessions):
if s is not None:
power = action[i] * s.max_power_kw
actual_powers[i] = np.clip(power, s.min_power_kw, s.max_power_kw)
# Rispetta limite stazione
total = actual_powers.sum()
if total > self.max_power:
actual_powers *= self.max_power / total
# Calcola reward
energy_kwh = actual_powers.sum() * dt_h
reward = -energy_kwh * price # costo negativo (vogliamo minimizzare)
# Aggiorna SoC e penalita deadline
for i, s in enumerate(self.sessions):
if s is None:
continue
delta_soc = (actual_powers[i] * dt_h * 0.92) / s.battery_kwh
s.soc = min(1.0, s.soc + delta_soc)
s.time_remaining_h -= dt_h
if s.time_remaining_h <= 0:
gap = max(0, s.target_soc - s.soc)
reward -= gap * 50.0 # penalita pesante per deadline mancata
# Penalita violazione rete
if total > self.max_power * 1.01:
reward -= (total - self.max_power) * 5.0
# Bonus ricarica off-peak
hour = (self.step_count * 15 // 60) % 24
if not (8 <= hour < 19) and total > 0:
reward += total * 0.015
self.step_count += 1
obs = self._get_obs()
terminated = self.step_count >= self.episode_steps
return obs, float(reward), terminated, False, {"price": price, "total_kw": total}
def reset(self, seed=None, options=None) -> Tuple:
super().reset(seed=seed)
self.step_count = 0
self._spawn_random_sessions()
return self._get_obs(), {}
def _spawn_random_sessions(self) -> None:
"""Genera sessioni EV casuali realistiche per il training."""
for i in range(self.n):
if np.random.random() > 0.3:
self.sessions[i] = EVSession(
soc=float(np.random.uniform(0.05, 0.7)),
target_soc=float(np.random.uniform(0.7, 0.95)),
battery_kwh=float(np.random.choice([40.0, 60.0, 77.0, 100.0])),
time_remaining_h=float(np.random.uniform(1.0, 10.0)),
max_power_kw=float(np.random.choice([7.4, 11.0, 22.0])),
min_power_kw=1.38
)
else:
self.sessions[i] = None
def train_ppo_agent(total_timesteps: int = 500_000):
"""
Addestra un agente PPO sull'ambiente di EV charging.
Richiede: pip install stable-baselines3
"""
try:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
env = EVChargingEnv(n_connectors=4, max_station_power_kw=44.0)
check_env(env)
model = PPO(
"MlpPolicy", env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=1,
)
model.learn(total_timesteps=total_timesteps, progress_bar=True)
model.save("./models/ppo_ev_charging")
print("Agente PPO salvato in ./models/ppo_ev_charging")
return model
except ImportError:
print("Installa: pip install stable-baselines3")
return None
V2G(Vehicle-to-Grid): 네트워크 리소스로서의 차량
V2G는 EV 패러다임의 질적 도약을 나타냅니다. 차량은 더 이상 혼자가 아닙니다. 에너지 소비자가 되지만 양방향 리소스. 에너지 가격이 높거나 전력망에 스트레스가 가해지면 배터리는 EV는 에너지를 다시 그리드로 보내 소유자에게 돈을 벌 수 있습니다. 전력 시스템을 안정화하는 것입니다.
2025년 V2G 표준
| 기준 | 빗자루 | V2G 뉴스 | 상태 |
|---|---|---|---|
| ISO 15118-2 | EV-EVSE AC/DC 통신 | 플러그 앤 충전, 스마트 충전 | 사용 중 |
| ISO 15118-20 | 2세대 풀 V2G | 양방향 BPT, DER 통합, 동적 스케줄링 | 채택 2025-2026 |
| OCPP 2.0.1 | CP-CSMS 통신 | ISO 15118 통합, V2G 충전 프로필 | 사용 중 |
| EU AFIR 등록. | 인프라 규제 | 2026년 1월부터 새로운 시스템에 ISO 15118 의무화 | 시행중 |
유럽에서는 위트레흐트(네덜란드)가 500개 이상의 가장 큰 상용 V2G 배포를 보유하고 있습니다. 자동차 공유의 양방향 르노. 모든 자동차는 돈을 번다 600-1,500 EUR/년 네덜란드 TSO TenneT에 FCR(Frequency Containment Reserve)을 제공합니다. 이탈리아의 V2G 여전히 주로 실험적이지만 새로운 모델인 Nissan Leaf e2+, Volkswagen ID.4 Pro와 Tesla Model 3 Highland의 일부 버전은 이미 양방향성을 지원합니다.
EV 배터리를 사용한 피크 감소 및 주파수 응답
"""
V2G Controller: peak shaving e frequency response.
Gestisce la bidirezionalita delle sessioni EV compatibili V2G.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
class ChargingDirection(str, Enum):
G2V = "G2V" # Grid-to-Vehicle (ricarica normale)
V2G = "V2G" # Vehicle-to-Grid (scarica)
IDLE = "IDLE"
@dataclass
class V2GSession:
"""Sessione V2G con capacità bidirezionale."""
connector_id: str
transaction_id: str
soc_percent: float
battery_capacity_kwh: float
max_charge_kw: float # potenza max di ricarica G2V
max_discharge_kw: float # potenza max di scarica V2G
min_soc_percent: float = 20.0 # SoC minimo garantito (non scarica sotto)
departure_soc_target: float = 80.0
@property
def available_discharge_kwh(self) -> float:
usable = max(0.0, self.soc_percent - self.min_soc_percent) / 100.0
return self.battery_capacity_kwh * usable
class V2GController:
"""Controller V2G per peak shaving e frequency response."""
FREQ_NOMINAL = 50.0
FREQ_DEADBAND = 0.05 # +/- 50 mHz: zona morta
FREQ_FULL_ACTIVATION = 0.5 # +/- 500 mHz: attivazione completa FCR
def __init__(
self,
station_id: str,
max_export_kw: float,
profile_sender
):
self._station_id = station_id
self._max_export = max_export_kw
self._send = profile_sender
self._sessions: Dict[str, V2GSession] = {}
self._lock = asyncio.Lock()
async def add_session(self, session: V2GSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
async def peak_shaving(
self,
building_load_kw: float,
peak_threshold_kw: float
) -> dict:
"""
Quando il carico supera la soglia, usa le batterie EV per ridurre l'import.
Restituisce le azioni intraprese per ogni connettore V2G.
"""
if building_load_kw <= peak_threshold_kw:
return {"action": "none", "reason": "under_threshold"}
excess_kw = building_load_kw - peak_threshold_kw
logger.info(
"Peak shaving: %.1f kW > soglia %.1f kW (eccesso %.1f kW)",
building_load_kw, peak_threshold_kw, excess_kw
)
actions = {}
async with self._lock:
sessions = sorted(
self._sessions.values(),
key=lambda s: s.available_discharge_kwh,
reverse=True
)
remaining = excess_kw
for s in sessions:
if remaining <= 0:
break
if s.available_discharge_kwh < 1.0:
continue
discharge = min(s.max_discharge_kw, remaining)
actions[s.connector_id] = {
"direction": ChargingDirection.V2G.value,
"power_kw": discharge
}
# Potenza negativa = V2G discharge in OCPP
await self._send(self._station_id, s.connector_id, -discharge * 1000)
remaining -= discharge
logger.info("V2G: %s scarica %.1f kW (peak shaving)", s.connector_id, discharge)
return {"action": "peak_shaving", "actions": actions, "residual_excess_kw": remaining}
async def frequency_response(self, freq_hz: float) -> dict:
"""
FCR (Frequency Containment Reserve).
Risposta lineare alla deviazione di frequenza, entro 500ms.
Frequenza bassa (< 50 Hz) -> V2G discharge.
Frequenza alta (> 50 Hz) -> aumento ricarica G2V.
"""
deviation = freq_hz - self.FREQ_NOMINAL
abs_dev = abs(deviation)
if abs_dev < self.FREQ_DEADBAND:
return {"action": "none", "frequency_hz": freq_hz}
activation = min(1.0,
(abs_dev - self.FREQ_DEADBAND) / (self.FREQ_FULL_ACTIVATION - self.FREQ_DEADBAND)
)
actions = {}
async with self._lock:
sessions = list(self._sessions.values())
for s in sessions:
if deviation < 0 and s.available_discharge_kwh > 0.5:
# Frequenza bassa: scarica per supportare la rete
power_kw = s.max_discharge_kw * activation
actions[s.connector_id] = {"direction": "V2G", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, -power_kw * 1000)
elif deviation > 0 and s.soc_percent < 90:
# Frequenza alta: aumenta ricarica per assorbire eccesso
power_kw = s.max_charge_kw * activation
actions[s.connector_id] = {"direction": "G2V", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, power_kw * 1000)
return {
"frequency_hz": freq_hz,
"deviation_hz": deviation,
"activation_factor": activation,
"actions": actions
}
태양광 통합: EV를 향한 태양광 잉여분
PV 시스템과 EV 충전소 간의 통합은 다음과 같은 사용 사례 중 하나입니다. 에너지 관리의 ROI가 더욱 빨라집니다. 광전지가 무엇보다 더 많은 것을 생산할 때 건물을 소비하고 잉여분은 자동차에 공급되는 대신 EV로 전달됩니다. 네트워크(~0.07-0.10 EUR/kWh GSE로 보상) - 다음과 같은 에너지 사용을 허용합니다. 그렇지 않으면 그리드에서 0.28 EUR/kWh의 비용이 발생합니다. 순 절감액 e 0.18-0.21유로/kWh EV를 통해 자체 소비되는 각 kWh당.
| 전략 | 논리 | 혜택 | 한정 |
|---|---|---|---|
| 초과분만 | PV 잉여분으로 EV 전용 충전 | 최대 자체 소비, 네트워크 비용 없음 | 가변 전력, 느린 충전 |
| 태양광 우선 | 태양광 우선순위, 그리드에서 손상되지 않음 | 태양광 최대 충전으로 안정적인 충전 | 그리드 에너지의 일부 |
| 그린 맥시마이저 | 일기 예보를 통해 4시간 동안 최적화 | % 재생 에너지 최대화 | 정확한 예측이 필요합니다 |
| 비용 최적화 | 태양광 + 현물 가격 + V2G 결합 | 절대 최소 비용 | 높은 알고리즘 복잡성 |
"""
Solar-EV Integration Controller.
Implementa strategie Excess-Only e Solar-First
con isteresi per evitare oscillazioni frequenti.
"""
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class PowerState:
"""Stato istantaneo del sistema energetico."""
pv_production_kw: float
building_load_kw: float # consumo edificio escluso EV
electricity_price_eur: float = 0.25
feed_in_tariff_eur: float = 0.07
@property
def net_surplus_kw(self) -> float:
return max(0.0, self.pv_production_kw - self.building_load_kw)
@property
def savings_per_kwh(self) -> float:
"""Risparmio per kWh autoconsumato invece di cedere+comprare."""
return self.electricity_price_eur - self.feed_in_tariff_eur
class SolarEVController:
"""
Controlla la ricarica EV in funzione del surplus fotovoltaico.
Supporta Excess-Only e Solar-First con isteresi configurabile.
"""
HYSTERESIS_KW = 0.3 # evita comandi continui per piccole oscillazioni
def __init__(
self,
balancer,
strategy: str = "solar_first",
min_ev_power_kw: float = 1.38,
max_grid_supplement_kw: float = 5.0
):
self._balancer = balancer
self._strategy = strategy
self._min_ev_power = min_ev_power_kw
self._max_grid_supp = max_grid_supplement_kw
self._last_limit_kw = 0.0
async def update(self, power_state: PowerState, n_active_evs: int) -> dict:
"""
Ricalcola e applica il limite di potenza EV.
Chiamato ogni 5-15 secondi dal loop di controllo.
"""
if n_active_evs == 0:
return {"action": "no_ev", "allocated_kw": 0}
if self._strategy == "excess_only":
target_kw = self._excess_only(power_state, n_active_evs)
else:
target_kw = self._solar_first(power_state, n_active_evs)
# Applica isteresi
if abs(target_kw - self._last_limit_kw) > self.HYSTERESIS_KW:
await self._balancer.update_dynamic_limit(target_kw * 1000)
self._last_limit_kw = target_kw
logger.info("Solar routing [%s]: %.1f kW -> EV", self._strategy, target_kw)
hourly_savings = target_kw * power_state.savings_per_kwh
return {
"strategy": self._strategy,
"pv_kw": power_state.pv_production_kw,
"surplus_kw": power_state.net_surplus_kw,
"allocated_ev_kw": target_kw,
"savings_eur_h": round(hourly_savings, 3)
}
def _excess_only(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
return surplus if surplus >= min_total else 0.0
def _solar_first(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
if surplus < min_total:
grid_needed = min_total - surplus
return min_total if grid_needed <= self._max_grid_supp else surplus
return min(surplus + self._max_grid_supp, surplus + self._max_grid_supp)
async def solar_routing_demo():
"""Simula una giornata con produzione FV tipica di aprile in Italia."""
hourly_pv = [0, 0, 0, 0, 0, 0.5, 2, 5, 9, 14, 18, 21,
22, 20, 16, 10, 6, 2, 0.5, 0, 0, 0, 0, 0]
print("Ora | PV kW | Bldg kW | Surplus | EV kW | Risparmio €/h")
print("-" * 62)
for h in range(24):
pv = hourly_pv[h]
bldg = 5.0 + (2.0 if 8 <= h < 18 else 0)
surplus = max(0, pv - bldg)
price = 0.28 if 8 <= h < 19 else 0.18
feed_in = 0.07
ev_alloc = min(surplus, 11.0)
savings = ev_alloc * (price - feed_in)
print(
f"{h:02d}:00 | {pv:5.1f} | {bldg:7.1f} | "
f"{surplus:7.1f} | {ev_alloc:5.1f} | {savings:.3f}"
)
if __name__ == "__main__":
asyncio.run(solar_routing_demo())
충전 컨트롤러 마이크로서비스 아키텍처
생산 EV 로드 밸런싱 시스템 및 분산 아키텍처 이벤트를 통해 통신하는 특수 구성 요소입니다. 주요 서비스는 다음과 같습니다.
| 서비스 | 책임 | 스택 | 루게릭병 |
|---|---|---|---|
| OCPP 게이트웨이 | WebSocket 서버, OCPP 번역 1.6/2.0.1 | Python ocpp, 비동기 | 99.9%, <100ms |
| 충전 컨트롤러 | 로드 밸런싱 알고리즘, SetChargingProfile | FastAPI, 레디스 | 99.9%, <500ms |
| 에너지 관리자 | PV/BESS/네트워크 측정, 가용성 계산 | 모드버스 TCP, MQTT, 파이썬 | 99.5%, 폴링 1초 |
| 예측가 | 도착/출발 예상, 가격, FV | LightGBM/LSTM, FastAPI | 99%, 업데이트 15분 |
| DSO 게이트웨이 | OpenADR 신호, 동적 제한 | 파이썬 오픈ADR 2.0b | 99%, 2초 미만 |
| 청구 | 측정, 청구, 세션 | FastAPI, PostgreSQL | 99.5% |
FastAPI를 사용한 충전 컨트롤러 API
"""
Charge Controller Service - FastAPI REST API.
Endpoints per load balancing, monitoring e configurazione.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime, timezone
import logging
logger = logging.getLogger(__name__)
app = FastAPI(
title="EV Charge Controller API",
version="2.1.0",
description="Load balancing real-time per stazioni di ricarica EV"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT"],
allow_headers=["*"]
)
# -------------------------------------------------------
# Modelli Pydantic
# -------------------------------------------------------
class ConnectorInfo(BaseModel):
connector_id: str
status: str
allocated_power_w: float
soc_percent: Optional[float]
session_duration_min: float
class StationStatus(BaseModel):
station_id: str
available_power_w: float
algorithm: str
connectors: List[ConnectorInfo]
last_updated: str
class EVConnectedEvent(BaseModel):
connector_id: str
transaction_id: str
initial_soc: Optional[float] = None
battery_capacity_kwh: Optional[float] = None
departure_timestamp: Optional[int] = None
user_tier: int = Field(default=1, ge=1, le=3)
class PowerLimitUpdate(BaseModel):
limit_watts: float = Field(gt=0, le=200_000)
source: str = "manual" # "manual" | "dso" | "solar" | "openadr"
valid_until_ts: Optional[int] = None
class AlgorithmSwitch(BaseModel):
algorithm: str = Field(pattern="^(equal_share|priority|rl_ppo)$")
params: Dict = {}
# In-memory registry (in produzione: Redis)
_station_registry: Dict = {}
_algorithm_map: Dict[str, str] = {}
# -------------------------------------------------------
# Endpoints
# -------------------------------------------------------
@app.get("/health")
async def health():
return {
"status": "healthy",
"ts": datetime.now(timezone.utc).isoformat(),
"version": "2.1.0"
}
@app.get("/stations/{station_id}/status", response_model=StationStatus)
async def get_status(station_id: str):
if station_id not in _station_registry:
raise HTTPException(404, f"Stazione {station_id} non trovata")
balancer = _station_registry[station_id]
raw = balancer.get_status()
return StationStatus(
station_id=station_id,
available_power_w=raw["available_power_w"],
algorithm=_algorithm_map.get(station_id, "equal_share"),
connectors=[ConnectorInfo(**c) for c in raw["connectors"]],
last_updated=datetime.now(timezone.utc).isoformat()
)
@app.post("/stations/{station_id}/events/connected")
async def ev_connected(station_id: str, event: EVConnectedEvent, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(
_station_registry[station_id].on_ev_connected,
event.connector_id,
event.transaction_id
)
return {"status": "accepted", "rebalancing": True}
@app.delete("/stations/{station_id}/connectors/{connector_id}/session")
async def ev_disconnected(station_id: str, connector_id: str, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(_station_registry[station_id].on_ev_disconnected, connector_id)
return {"status": "accepted"}
@app.put("/stations/{station_id}/power-limit")
async def set_power_limit(station_id: str, limit: PowerLimitUpdate, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(_station_registry[station_id].update_dynamic_limit, limit.limit_watts)
logger.info("Limite potenza: %s -> %.0f W (source=%s)", station_id, limit.limit_watts, limit.source)
return {"status": "accepted", "new_limit_w": limit.limit_watts, "source": limit.source}
@app.put("/stations/{station_id}/algorithm")
async def set_algorithm(station_id: str, config: AlgorithmSwitch):
_algorithm_map[station_id] = config.algorithm
return {"status": "ok", "algorithm": config.algorithm}
@app.get("/stations/{station_id}/metrics")
async def get_metrics(station_id: str):
"""KPI della stazione (in produzione: query InfluxDB/TimescaleDB)."""
return {
"station_id": station_id,
"period": "last_24h",
"charging_satisfaction_rate": 0.94,
"peak_reduction_percent": 31.2,
"energy_cost_savings_eur": 47.80,
"solar_share_percent": 42.3,
"grid_stress_index": 0.23,
"total_energy_kwh": 201.6,
"avg_power_kw": 8.4,
"rebalance_latency_ms_p95": 180
}
로드 밸런싱 평가를 위한 지표 및 KPI
로드 밸런싱 시스템의 효율성을 평가하려면 정량화 가능한 KPI가 필요합니다. 사용자 만족도와 에너지 최적화를 모두 측정합니다.
| KPI | 정의 | 목표 | 공식 |
|---|---|---|---|
| 충전만족도(CSR) | 마감일까지 SoC 목표에 도달한 세션 비율 | >90% | ok_sessions / total_sessions |
| 피크 감소 % | 피크 감소 대 관리 없음 | >25% | (peak_unmanaged - peak_managed) / peak_unmanaged |
| 에너지 비용 절감 % | kWh 비용 절감 vs 즉시 충전 | >15% | (base_cost - 관리_비용) / base_cost |
| 태양광 자체 소비량(%) | 지역 PV의 EV 에너지 % | >40% | Solar_energy_ev / total_energy_ev |
| 그리드 스트레스 지수(GSI) | 변압기의 평균 압력 [0-1] | <0.3 | 평균(P_actual / P_nominal_trafo) |
| 재조정 대기 시간 P95 | 이벤트부터 OCPP 프로필 적용까지의 시간 | 2초 미만 | t_ocpp_conf - t_event_received (p95) |
이탈리아어 맥락: ARERA, CER(EV 및 인센티브 포함) 2025
이탈리아에서 EV 로드 밸런싱 시스템을 구현하면 규제 문제가 발생합니다. 건축 선택에 직접적인 영향을 미치는 시장.
ARERA 2시간 요금제 및 스마트 충전
이탈리아 요금 체계는 피크 외 충전에 대한 자연스러운 인센티브를 제공합니다.
- F1 밴드 (월~금 8:00~19:00): 약 0.26~0.31 EUR/kWh. EV 충전 시 피해야 할 밴드.
- 밴드 F2 (월~금 7~8, 19~23, 토요일 7~23): 약 0.20~0.24 EUR/kWh.
- 밴드 F3 (밤 11시~오전 7시, 일요일 및 공휴일 종일): 약 0.16-0.19 EUR/kWh. 야간 충전에 이상적입니다.
충전량의 60%를 F1에서 F3으로 전환하는 Time-of-Use 알고리즘은 차량 관리자의 연간 청구 비용 18-25%.
EV를 이용한 재생에너지 커뮤니티(CER)
입법령 199/2021과 2023년 12월 7일의 MASE 장관령은 구체적인 기회를 열었습니다. 이탈리아 CER의 EV 통합:
- 커뮤니티 PV 잉여금으로 충전하는 EV는 GSE 인센티브를 생성합니다. ~의 몫으로 공유에너지 (요금 환급 + 인센티브 최대 11센트/kWh).
- V2G를 통해 EV 배터리는 가상 CER 저장소 역할을 합니다. 정오부터 저녁까지의 태양 에너지(피크 시프트).
- GSE는 다음을 통해 공유되는 에너지를 측정합니다. 맥세, 포함 커뮤니티 회계에서 V2G EV의 양방향 흐름.
2025년까지 이탈리아 EV 인프라
이탈리아에는 58,000개 이상의 공공 충전소(Motus-E, 2024년 말)가 있으며, 그 중 22% 빠른 DC(2023년 14% 대비). 2025년에는 BEV가 94,230대(+46%) 등록되었으며, 시장점유율은 6.2%다. Terna에 따르면 스마트 충전이 없는 인구의 7%가 널리 보급되어 있습니다. 2027년까지 가장 밀집된 지역의 분배 공급 장치에 과부하가 걸릴 것입니다. 북부 이탈리아. PNRR은 인프라에 7억 4천만 유로 이상을 할당했습니다. 2026년까지 21,000개의 새로운 공개 포인트를 목표로 충전합니다.
인센티브에 주목하라
전환 5.0에 따른 스마트 충전에 대한 세금 공제는 다음과 같습니다. 정기적으로 검사를 받습니다. GSE 웹사이트에서 업데이트된 조건을 항상 확인하세요. 인센티브 기반 투자를 계획하기 전에 CSEA. 요금과 이용 조건은 다를 수 있습니다.
결론 및 구현 로드맵
EV 로드 밸런싱은 알고리즘의 우아함의 문제가 아니라 필수입니다. 에너지 전환을 위한 핵심 인프라입니다. 데이터는 명확합니다. 관리되지 않는 EV가 500만 대인 경우 저녁 피크는 35,000MW 증가합니다. 스마트하게 EV 차량을 충전하는 것 자체가 피크를 34% 줄이는 유연한 자원이 됩니다.
프로덕션 환경에서 이러한 시스템을 구현하는 개발 팀의 경우 로드맵 후속 단계에 최적이며:
- 1단계 - 균등 공유: 간단하고 견고하며 설치에 충분함 최대 30-40개의 트릭. 최우선 사항: 재계산을 올바르게 구현 감소 후 증가 순서로 이벤트 구동됩니다. 예상 시간: 2~3주.
-
2단계 - PV 통합: ML 없이 즉각적인 ROI를 얻을 수 있습니다. 그것은 단지
Modbus/MQTT를 통한 태양열 과잉 측정 및 밸런서와의 통합
update_dynamic_limit(). 6~18개월 내에 투자금을 회수하여 빠르게 승리할 수 있습니다. - 3단계 - 우선순위 기반: 수집할 모바일 앱이 있을 때 추가하세요. SoC 및 시작 마감일. CSR(Charging Satisfaction Rate)의 이득은 측정 가능합니다. 경쟁사와 서비스를 차별화합니다.
- 4단계 - V2G 인프라: 양방향 아키텍처 설계 이제 아직 V2G EV가 없더라도 말이죠. ISO 15118-20이 포함된 OCPP 2.0.1이 필수가 되었습니다. 2026년 신규 발전소(EU AFIR). 개조 비용은 5~10배 더 비쌉니다.
- 5단계 - 프로덕션 중인 RL: 6개월 이상 이후에만 Deep RL에 투자 고품질 데이터와 전담 MLOps 팀이 함께합니다. 비용 이득은 실제적입니다(15-25%). 그러나 운영 복잡성이 높습니다.
EnergyTech 시리즈 관련 기사
- 기사 4: 네트워크 디지털 트윈 - 배포 전에 EV가 네트워크에 미치는 영향을 시뮬레이션합니다.
- 기사 5: 재생 에너지 예측 - 태양광 라우팅을 위한 PV 생산 예측
- 기사 7: MQTT 및 InfluxDB - 충전 매개변수의 실시간 원격 측정
- 기사 2: DERMS 아키텍처 - EV를 DER로 분산 관리 시스템에 통합
시리즈 간 통찰력
- MLOps 시리즈(306-315조): MLflow, 드리프트 모니터링 및 재교육을 갖춘 PPO 모델 배포
- AI 엔지니어링 시리즈(316-325조): LLM 기업의 OCPP 및 ISO 15118 문서에 대한 RAG
- 데이터 및 AI 비즈니스 시리즈(267-280조): 스마트 충전 투자를 위한 ROI, 비즈니스 지표 및 로드맵
- PostgreSQL AI 시리즈(356-361조): PostgreSQL에서 TimescaleDB를 사용하여 데이터를 다시 로드하는 시계열







