EV Charging Load Balancing: Real-Time Algorithms for Smart Charging
It is 6:30 PM on a Tuesday evening. Thousands of commuters return home after work, park their electric vehicles and plug in the charging cable. Within minutes, power demand on the neighbourhood transformer spikes sharply upward. Without intelligent management, this scenario — which repeats every day with growing intensity as the EV fleet expands — leads to grid overloads, localised blackouts, and infrastructure upgrade costs running into the billions of euros.
In 2025 more than 17 million electric vehicles circulate in Europe, of which approximately 230,000 in Italy with 94,230 new registrations in 2025 alone (+46% compared to 2024). Projections to 2030 speak of more than 50 million EVs in Europe: each vehicle requires on average between 7 and 22 kW of power during charging. Multiplied across millions of users charging during the same evening hours, we face an unprecedented grid stability challenge.
The solution is not to build more grid: it is too expensive and too slow. The solution is intelligent load balancing: real-time algorithms that distribute available power among charging vehicles while respecting grid constraints, minimising energy costs and maximising user satisfaction. In this article we explore the full technology stack: from the OCPP 2.0.1 protocol to optimisation algorithms, from reinforcement learning to V2G integration, with working Python code and the Italian regulatory context.
What You Will Learn in This Article
- Duck curve and the impact of unmanaged EVs on the distribution grid
- Taxonomy of load balancing algorithms: Static, Dynamic, Predictive
- OCPP 2.0.1 Smart Charging: SetChargingProfile, ChargingSchedule, StackLevel
- Python implementation: Equal Share with real-time dynamic recalculation
- Priority-Based algorithm with heap queue (SoC, deadline, tariff)
- Reinforcement Learning (PPO) for energy cost optimisation
- Vehicle-to-Grid (V2G): bidirectionality, ISO 15118-20, frequency regulation
- Photovoltaic integration: solar surplus routing to EVs
- Microservices architecture: Charge Controller, Energy Manager, Forecaster
- Italian context: time-of-use tariffs, CER with EVs, ARERA regulation
EnergyTech Series - 10 Articles
| # | Article | Status |
|---|---|---|
| 1 | Smart Grid and IoT: Architecture for the Electricity Grid of the Future | Published |
| 2 | DERMS Architecture: Aggregating Millions of Distributed Resources | Published |
| 3 | Battery Management System: Control Algorithms for BESS | Published |
| 4 | Digital Twin of the Electricity Grid with Python and Pandapower | Published |
| 5 | Renewable Energy Forecasting: ML for Solar and Wind | Published |
| 6 | EV Charging Load Balancing: Real-Time Algorithms (you are here) | Current |
| 7 | MQTT and InfluxDB for Real-Time Energy Telemetry | Coming soon |
| 8 | IEC 61850: Communication in the Electrical Substation | Coming soon |
| 9 | Carbon Accounting Software: Measuring and Reducing Emissions | Coming soon |
| 10 | Blockchain for P2P Energy Trading in Renewable Energy Communities | Coming soon |
The Peak Problem: Duck Curve and Unmanaged EVs
To understand why EV load balancing is not an optional feature but a necessity, we must start from the physics of the electricity grid and a concept that grid operators fear every day: the duck curve.
The Duck Curve and Its Worsening
The duck curve describes the shape of the net electricity demand curve throughout a typical day in a system with high solar photovoltaic penetration. It takes its name from the fact that the curve resembles the profile of a duck: a low belly in the middle of the day (when solar produces abundantly and net demand is low) and a high hump in the evening (when the sun sets, PV production collapses and residential demand shoots upward).
Without EVs, the Italian grid already manages this problem every day. With the growth of unmanaged EVs, the problem is dramatically amplified. MDPI research (2025) shows that without optimisation the demand peak rises from a 22,000 MW baseline to 35,000 MW with 5 million EVs. With smart charging the same fleet adds only 2,000–3,000 MW distributed across overnight hours.
| Scenario | Peak hour | Additional load | Grid risk |
|---|---|---|---|
| Baseline (no EVs) | 19:00–20:00 | 0 MW | Manageable |
| 500,000 unmanaged EVs | 18:30–19:30 | +3,500 MW | Local transformer stress |
| 1.5M unmanaged EVs | 18:00–20:00 | +10,500 MW | Widespread overload |
| 5M unmanaged EVs | 18:00–21:00 | +35,000 MW | Systemic blackouts |
| 1.5M EVs with smart charging | Distributed 22:00–06:00 | +2,100 MW diluted | Negligible |
AI-driven optimisation of the charging profile can reduce the demand peak by 16% with 1.5 million EVs, by 21% with 3 million, and by 34% with 5 million (source: MDPI Electronics, 2025). The difference between a managed and an unmanaged scenario is not a few percentage points: it is the difference between a stable grid and a collapse.
Impact on Distribution Transformers
The problem does not only arise at the transmission level: it is above all at the level of the local distribution grid. A neighbourhood transformer rated at 400 kVA typically serves 80–120 households. If 15 of these have an EV that starts charging at 11 kW simultaneously at 6:30 PM, the additional load is 165 kW — almost 41% of the transformer's rated capacity, which may already be at 60–70% from domestic consumption. The result: overheating, reduced service life, and in the worst cases tripping of protective devices.
The Cost of Inaction
According to Terna, the cost of upgrading Italy's distribution grid to support the EV transition without smart charging is estimated at 10–15 billion euros by 2030. With widespread smart charging, the same transition requires infrastructure investments 60–70% lower, simply by shifting load to hours when the grid has available capacity.
Taxonomy of Load Balancing Algorithms
There is no single optimal load balancing algorithm: the choice depends on installation size, availability of historical data, latency requirements and the complexity the operator is willing to maintain. We classify approaches into three main families.
Static Algorithms
Static algorithms define power distribution rules upfront without dynamically adapting to the system state. They are simple to implement and debug, ideal for small and homogeneous installations.
- Round-Robin: each connector receives the same maximum power in rotation. Simple but inefficient: a nearly full EV occupies the same power as a depleted one.
- Equal Share: total available power is divided equally among all active connectors. Effective for homogeneous installations but does not consider individual needs.
- First-Come-First-Served: the first connected EVs receive maximum power; subsequent ones share the remainder. Penalises late arrivals.
Dynamic Algorithms
Dynamic algorithms adapt distribution in real time based on the current system state: number of connected EVs, state of charge (SoC), available power, and user priorities.
- Proportional: power is distributed proportionally to the energy required by each EV. A more depleted battery receives more power.
- Priority-Based: each EV has a priority score calculated from multiple factors (SoC, deadline, subscription tier). Power flows toward the highest priorities.
- Fuzzy Logic: linguistic rules such as "if SoC is low AND deadline is close THEN priority is high". Handles uncertainty in input data well.
Predictive (ML-based) Algorithms
Predictive algorithms use machine learning models to anticipate future events (arrivals, departures, energy price changes) and optimise scheduling ahead of time, minimising total costs over the long term.
- Model Predictive Control (MPC): solves an optimisation problem over a future time horizon (e.g. 4 hours) at each control interval (e.g. 15 min), applying only the first step and reformulating at the next step.
- Deep Reinforcement Learning (DRL): an agent learns the optimal policy by interacting with a simulator. PPO and SAC are the most used in production.
- Stochastic Programming: explicitly incorporates uncertainty in EV arrivals and departures, generating schedules robust to worst-case scenarios.
| Algorithm | Complexity | Latency | Optimality | Data required | Ideal use case |
|---|---|---|---|---|---|
| Equal Share | Low | <1ms | Low | None | Residential, <10 outlets |
| Priority-Based | Medium | <10ms | Medium-High | SoC, user deadline | Offices, hotels, fleets |
| MPC | High | 100ms–1s | High | Energy prices, forecasts | Hub >50 outlets, C&I |
| Deep RL (PPO) | Very high | <50ms (inference) | Very high | 6+ months history | Large hubs, integrated V2G |
OCPP 2.0.1 Smart Charging: The Protocol for Intelligent Charging
OCPP (Open Charge Point Protocol) is the universal language between charge points (CP) and the central system (CSMS — Charging Station Management System). In version 2.0.1, Smart Charging was significantly improved compared to OCPP 1.6, introducing more granular and reliable management of charging profiles.
The SetChargingProfile Mechanism
The CSMS sends the charge point a SetChargingProfileRequest message containing
a ChargingProfile that defines how the station should deliver power over time.
A ChargingProfile consists of:
-
chargingProfilePurpose:
ChargePointMaxProfile(limits the whole station),TxDefaultProfile(default for new transactions),TxProfile(specific to an active transaction). - stackLevel: integer that defines priority. When profiles overlap, the one with the highest stackLevel wins.
- chargingSchedule: list of periods (startPeriod in seconds, limit in A or W) that define the charging curve over time.
Critical Sequence: Reduce Before Increasing
When sending updated profiles to multiple stations, it is essential to send reduction commands first and increase commands afterwards. If you increase first, there is a risk of briefly exceeding the grid limit, triggering electrical protections or generating demand spikes with resulting contractual penalties. This rule is non-negotiable in production.
Python Implementation: OCPP 2.0.1 Profile Generation
"""
OCPP 2.0.1 Smart Charging Profile Generator
Generates ChargingProfiles compatible with the OCPP 2.0.1 specification.
"""
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
import json
from datetime import datetime, timezone
class ChargingRateUnit(str, Enum):
WATTS = "W"
AMPERES = "A"
class ChargingProfilePurpose(str, Enum):
CHARGE_POINT_MAX = "ChargePointMaxProfile"
TX_DEFAULT = "TxDefaultProfile"
TX = "TxProfile"
class ChargingProfileKind(str, Enum):
ABSOLUTE = "Absolute"
RECURRING = "Recurring"
RELATIVE = "Relative"
@dataclass
class ChargingSchedulePeriod:
"""Single interval in the charging schedule."""
start_period: int # seconds from schedule start
limit: float # limit in W or A
number_phases: Optional[int] = None
@dataclass
class ChargingSchedule:
"""Complete schedule of a charging profile."""
id: int
charging_rate_unit: ChargingRateUnit
charging_schedule_period: List[ChargingSchedulePeriod]
duration: Optional[int] = None
start_schedule: Optional[str] = None
min_charging_rate: Optional[float] = None
@dataclass
class ChargingProfile:
"""Complete OCPP 2.0.1 charging profile."""
id: int
stack_level: int
charging_profile_purpose: ChargingProfilePurpose
charging_profile_kind: ChargingProfileKind
charging_schedule: List[ChargingSchedule]
transaction_id: Optional[str] = None
valid_from: Optional[str] = None
valid_to: Optional[str] = None
def build_equal_share_profile(
profile_id: int,
power_limit_watts: float,
duration_seconds: int = 3600,
stack_level: int = 1
) -> ChargingProfile:
"""
Builds an OCPP profile for equal share distribution (fixed power).
Used by the Equal Share algorithm to apply the calculated limit.
"""
schedule_period = ChargingSchedulePeriod(
start_period=0,
limit=power_limit_watts,
number_phases=3
)
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=[schedule_period],
duration=duration_seconds,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def build_time_of_use_profile(
profile_id: int,
schedule_periods: List[tuple],
stack_level: int = 2
) -> ChargingProfile:
"""
Builds a Time-of-Use profile with variable power over time.
Example: high power at night (cheap energy), lower during the day.
Args:
schedule_periods: list of (start_period_sec, limit_watts)
"""
periods = [
ChargingSchedulePeriod(start_period=start, limit=limit)
for start, limit in schedule_periods
]
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=periods,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX_DEFAULT,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def profile_to_ocpp_dict(profile: ChargingProfile) -> dict:
"""Serialises the profile into OCPP 2.0.1 JSON format."""
return {
"id": profile.id,
"stackLevel": profile.stack_level,
"chargingProfilePurpose": profile.charging_profile_purpose.value,
"chargingProfileKind": profile.charging_profile_kind.value,
"chargingSchedule": [
{
"id": sched.id,
"chargingRateUnit": sched.charging_rate_unit.value,
"chargingSchedulePeriod": [
{
"startPeriod": p.start_period,
"limit": p.limit,
}
for p in sched.charging_schedule_period
],
**({"duration": sched.duration} if sched.duration else {}),
**({"startSchedule": sched.start_schedule} if sched.start_schedule else {}),
}
for sched in profile.charging_schedule
],
}
if __name__ == "__main__":
# Equal Share profile: 3.3 kW for 2 hours
profile = build_equal_share_profile(
profile_id=101,
power_limit_watts=3300.0,
duration_seconds=7200
)
print(json.dumps(profile_to_ocpp_dict(profile), indent=2))
# Time-of-Use profile: 11 kW now, 3.3 kW after 4 hours
tou = build_time_of_use_profile(
profile_id=102,
schedule_periods=[
(0, 11000), # now: 11 kW (off-peak period)
(4 * 3600, 3300), # after 4 hours: 3.3 kW (F1 period starts)
],
stack_level=2
)
print(json.dumps(profile_to_ocpp_dict(tou), indent=2))
Equal Share Algorithm with Dynamic Recalculation
Equal Share is the most widespread load balancing algorithm in real installations. Its strength is not mathematical elegance but operational robustness: it is simple to explain to users, easy to debug, and sufficiently effective for most installations up to 30–40 outlets. The real complexity lies in managing dynamic recalculations: every time an EV connects or disconnects, or when available power changes due to a DSO signal, the system must recalculate and redistribute in milliseconds, respecting the critical reductions-then-increases sequence.
"""
Equal Share Load Balancer
Equal distribution with event-driven dynamic recalculation.
Thread-safe for production deployment with OCPP WebSocket server.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Callable, Awaitable
from enum import Enum
import time
logger = logging.getLogger(__name__)
class ConnectorStatus(str, Enum):
AVAILABLE = "Available"
CHARGING = "Charging"
SUSPENDED_EV = "SuspendedEV"
FINISHING = "Finishing"
UNAVAILABLE = "Unavailable"
FAULTED = "Faulted"
@dataclass
class ConnectorState:
"""State of a single connector at the station."""
connector_id: str
status: ConnectorStatus = ConnectorStatus.AVAILABLE
transaction_id: Optional[str] = None
allocated_power_w: float = 0.0
max_power_w: float = 22000.0 # physical max of the connector (22 kW AC three-phase)
min_power_w: float = 1380.0 # minimum to maintain charging (6A x 230V)
soc_percent: Optional[float] = None
connected_at: Optional[float] = None
@property
def is_charging(self) -> bool:
return self.status == ConnectorStatus.CHARGING
@property
def session_duration_min(self) -> float:
if self.connected_at is None:
return 0.0
return (time.time() - self.connected_at) / 60.0
@dataclass
class StationConfig:
"""Charging station configuration."""
station_id: str
max_station_power_w: float
dynamic_limit_w: Optional[float] = None
rebalance_interval_s: float = 30.0
@property
def available_power_w(self) -> float:
if self.dynamic_limit_w is not None:
return min(self.max_station_power_w, self.dynamic_limit_w)
return self.max_station_power_w
ProfileSendCallback = Callable[[str, str, float], Awaitable[bool]]
class EqualShareBalancer:
"""
Thread-safe asynchronous Equal Share load balancer.
Manages equal distribution of available power
among all actively charging connectors.
"""
def __init__(
self,
config: StationConfig,
profile_sender: ProfileSendCallback
):
self._config = config
self._connectors: Dict[str, ConnectorState] = {}
self._send_profile = profile_sender
self._lock = asyncio.Lock()
self._last_allocation: Dict[str, float] = {}
async def register_connector(
self,
connector_id: str,
max_power_w: float = 22000.0,
min_power_w: float = 1380.0
) -> None:
async with self._lock:
self._connectors[connector_id] = ConnectorState(
connector_id=connector_id,
max_power_w=max_power_w,
min_power_w=min_power_w
)
logger.info("Connector %s registered (max=%.0f W)", connector_id, max_power_w)
async def on_ev_connected(self, connector_id: str, transaction_id: str) -> None:
"""Called when an EV connects. Triggers immediate recalculation."""
async with self._lock:
if connector_id not in self._connectors:
logger.warning("Unknown connector: %s", connector_id)
return
self._connectors[connector_id].status = ConnectorStatus.CHARGING
self._connectors[connector_id].transaction_id = transaction_id
self._connectors[connector_id].connected_at = time.time()
await self._rebalance()
async def on_ev_disconnected(self, connector_id: str) -> None:
"""Called when an EV disconnects. Releases power and redistributes."""
async with self._lock:
if connector_id not in self._connectors:
return
connector = self._connectors[connector_id]
connector.status = ConnectorStatus.AVAILABLE
connector.transaction_id = None
connector.allocated_power_w = 0.0
connector.connected_at = None
await self._rebalance()
async def update_dynamic_limit(self, limit_w: float) -> None:
"""Updates the DSO limit in real time. Triggers immediate recalculation."""
async with self._lock:
self._config.dynamic_limit_w = limit_w
logger.info("DSO limit updated: %.0f W (station %s)", limit_w, self._config.station_id)
await self._rebalance()
async def _rebalance(self) -> None:
"""
Calculates the new Equal Share allocation and sends OCPP profiles.
CRITICAL RULE: send reductions first, then increases.
"""
async with self._lock:
active = [c for c in self._connectors.values() if c.is_charging]
if not active:
return
available = self._config.available_power_w
n = len(active)
raw_share = available / n
new_allocations: Dict[str, float] = {}
# First pass: apply per-connector min/max limits
constrained = []
free = []
for connector in active:
if raw_share <= connector.min_power_w:
new_allocations[connector.connector_id] = connector.min_power_w
constrained.append(connector)
elif raw_share >= connector.max_power_w:
new_allocations[connector.connector_id] = connector.max_power_w
constrained.append(connector)
else:
free.append(connector)
# Second pass: redistribute residual among free connectors
if free:
allocated_constrained = sum(
new_allocations[c.connector_id] for c in constrained
)
residual = available - allocated_constrained
share_free = residual / len(free)
for connector in free:
new_allocations[connector.connector_id] = max(
connector.min_power_w,
min(connector.max_power_w, share_free)
)
# Sort: reductions first, then increases (OCPP rule)
reductions = []
increases = []
for connector_id, new_power in new_allocations.items():
old_power = self._last_allocation.get(connector_id, float('inf'))
if new_power < old_power:
reductions.append((connector_id, new_power))
else:
increases.append((connector_id, new_power))
for connector_id, power in reductions:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Reduction: %s -> %.0f W", connector_id, power)
for connector_id, power in increases:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Increase: %s -> %.0f W", connector_id, power)
logger.info(
"Rebalance: %d active EVs, %.0f W available, station %s",
len(active), self._config.available_power_w, self._config.station_id
)
async def start_periodic_rebalance(self) -> None:
"""Starts periodic recalculation as a safety net for state drift."""
async def _loop():
while True:
await asyncio.sleep(self._config.rebalance_interval_s)
await self._rebalance()
asyncio.create_task(_loop())
def get_status(self) -> dict:
return {
"station_id": self._config.station_id,
"available_power_w": self._config.available_power_w,
"connectors": [
{
"connector_id": c.connector_id,
"status": c.status.value,
"allocated_power_w": c.allocated_power_w,
"soc_percent": c.soc_percent,
"session_duration_min": round(c.session_duration_min, 1)
}
for c in self._connectors.values()
]
}
async def demo_equal_share():
async def mock_send(station_id, connector_id, power_w) -> bool:
print(f" OCPP -> {connector_id}: {power_w:.0f} W")
return True
config = StationConfig(station_id="STATION_001", max_station_power_w=44000.0)
balancer = EqualShareBalancer(config, mock_send)
for i in range(1, 5):
await balancer.register_connector(f"C0{i}", max_power_w=22000.0, min_power_w=1380.0)
print("\n[EV1 connected]")
await balancer.on_ev_connected("C01", "TX001") # C01: 44000 W
print("\n[EV2 connected]")
await balancer.on_ev_connected("C02", "TX002") # C01,C02: 22000 W each
print("\n[EV3 and EV4 connected]")
await balancer.on_ev_connected("C03", "TX003")
await balancer.on_ev_connected("C04", "TX004") # all: 11000 W
print("\n[DSO reduces limit to 22 kW]")
await balancer.update_dynamic_limit(22000.0) # all: 5500 W
print("\n[EV2 disconnects]")
await balancer.on_ev_disconnected("C02") # C01,C03,C04: 7333 W
if __name__ == "__main__":
asyncio.run(demo_equal_share())
Priority-Based Algorithm with Multi-Factor Scoring
Equal Share is fair but not optimal from the user perspective. An EV with a 5% battery whose owner must leave in 30 minutes has a far more urgent need than an EV at 60% that will remain parked for 8 hours. Priority-Based Balancing addresses this by calculating a multi-factor urgency score and distributing power proportionally to priorities.
Priority Score Formula
The score combines four dimensions with configurable weights:
- SoC urgency (35%): how depleted the battery is. Concave function to amplify critical cases (e.g. SoC 5% = very high urgency).
- Time pressure (35%): how close the departure is. Exponential function that grows rapidly as the deadline approaches.
- User tier (15%): subscription level (basic/premium/priority). Creates a business model with differentiated SLAs.
- Energy efficiency (15%): rewards those who can actually absorb power. Avoids allocating power to nearly full batteries.
"""
Priority-Based Load Balancer.
Distributes power proportionally to urgency scores,
with an iterative algorithm to respect min/max constraints.
"""
import math
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
logger = logging.getLogger(__name__)
@dataclass
class ChargingSession:
"""Session with all parameters for priority calculation."""
connector_id: str
transaction_id: str
soc_percent: float
target_soc_percent: float
battery_capacity_kwh: float
departure_time: float # unix timestamp
max_power_w: float
min_power_w: float
user_tier: int = 1 # 1=basic, 2=premium, 3=priority
connected_at: float = field(default_factory=time.time)
@property
def energy_needed_kwh(self) -> float:
delta = max(0.0, self.target_soc_percent - self.soc_percent) / 100.0
return self.battery_capacity_kwh * delta
@property
def time_remaining_h(self) -> float:
return max(0.1, (self.departure_time - time.time()) / 3600.0)
class PriorityScoreCalculator:
"""Calculates normalised urgency score [0, 100]."""
W_SOC = 0.35
W_TIME = 0.35
W_TIER = 0.15
W_EFFICIENCY = 0.15
def calculate(self, session: ChargingSession) -> float:
# 1. SoC urgency: concave, amplifies very low SoC values
soc_ratio = session.soc_percent / 100.0
soc_score = (1.0 - soc_ratio) ** 1.5 * 100
# 2. Time pressure: exponential, grows fast near deadline
time_score = 100.0 * math.exp(-0.3 * session.time_remaining_h)
# 3. Tier: bonus for premium subscriptions
tier_score = {1: 33.0, 2: 66.0, 3: 100.0}.get(session.user_tier, 33.0)
# 4. Efficiency: rewards sessions with battery to fill
eff = min(100.0, (session.energy_needed_kwh / max(0.1, session.battery_capacity_kwh)) * 100)
return round(
self.W_SOC * soc_score +
self.W_TIME * time_score +
self.W_TIER * tier_score +
self.W_EFFICIENCY * eff,
2
)
class PriorityBalancer:
"""
Distributes power proportionally to scores with min/max constraints.
Iterative algorithm: separates constrained connectors and redistributes
the residual among free ones, repeating until convergence.
"""
def __init__(self, station_id: str, max_station_power_w: float, profile_sender):
self._station_id = station_id
self._max_power = max_station_power_w
self._dynamic_limit: Optional[float] = None
self._sessions: Dict[str, ChargingSession] = {}
self._send_profile = profile_sender
self._score_calc = PriorityScoreCalculator()
self._lock = asyncio.Lock()
@property
def available_power_w(self) -> float:
if self._dynamic_limit is not None:
return min(self._max_power, self._dynamic_limit)
return self._max_power
async def add_session(self, session: ChargingSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
await self._rebalance()
async def remove_session(self, connector_id: str) -> None:
async with self._lock:
self._sessions.pop(connector_id, None)
await self._rebalance()
async def update_soc(self, connector_id: str, soc_percent: float) -> None:
async with self._lock:
if connector_id in self._sessions:
self._sessions[connector_id].soc_percent = soc_percent
await self._rebalance()
def _compute_allocations(self) -> Dict[str, float]:
"""Iterative algorithm for allocation with min/max constraints."""
if not self._sessions:
return {}
sessions = list(self._sessions.values())
scores = {s.connector_id: self._score_calc.calculate(s) for s in sessions}
allocations: Dict[str, float] = {}
remaining_power = self.available_power_w
remaining = sessions[:]
for _ in range(10): # max 10 iterations for convergence
if not remaining:
break
total_score = sum(scores[s.connector_id] for s in remaining)
if total_score == 0:
share = remaining_power / len(remaining)
for s in remaining:
allocations[s.connector_id] = max(s.min_power_w, min(s.max_power_w, share))
break
constrained, free_sessions = [], []
for s in remaining:
ratio = scores[s.connector_id] / total_score
proposed = remaining_power * ratio
if proposed <= s.min_power_w:
allocations[s.connector_id] = s.min_power_w
constrained.append(s)
elif proposed >= s.max_power_w:
allocations[s.connector_id] = s.max_power_w
constrained.append(s)
else:
free_sessions.append(s)
if not constrained:
# No constraints: final proportional allocation
for s in remaining:
ratio = scores[s.connector_id] / total_score
allocations[s.connector_id] = remaining_power * ratio
break
for s in constrained:
remaining_power -= allocations[s.connector_id]
remaining = free_sessions
return allocations
async def _rebalance(self) -> None:
async with self._lock:
allocs = self._compute_allocations()
# Print summary for logging
for cid, power in sorted(allocs.items()):
s = self._sessions.get(cid)
score = self._score_calc.calculate(s) if s else 0
logger.info(
"Priority: %s score=%.1f -> %.0f W (SoC=%.0f%%, %.1fh)",
cid, score, power,
s.soc_percent if s else 0,
s.time_remaining_h if s else 0
)
await self._send_profile(self._station_id, cid, power)
# Demo
async def priority_demo():
async def mock_send(station_id, cid, power_w) -> bool:
print(f" SET {cid}: {power_w:.0f} W")
return True
balancer = PriorityBalancer("HUB_01", 44000.0, mock_send)
now = time.time()
sessions = [
# Urgent: 5% SoC, leaving in 45 min
ChargingSession("C1", "T1", 5.0, 80.0, 77.0, now + 45*60, 22000.0, 1380.0, user_tier=2),
# Normal: 60% SoC, leaving in 8 hours
ChargingSession("C2", "T2", 60.0, 80.0, 40.0, now + 8*3600, 11000.0, 1380.0, user_tier=1),
# Priority tier: 30% SoC, leaving in 2 hours
ChargingSession("C3", "T3", 30.0, 90.0, 100.0, now + 2*3600, 22000.0, 1380.0, user_tier=3),
]
print("\n--- Sessions added (44 kW available) ---")
for s in sessions:
await balancer.add_session(s)
print("\n--- C1 SoC rises to 40% (less urgent) ---")
await balancer.update_soc("C1", 40.0)
if __name__ == "__main__":
asyncio.run(priority_demo())
Reinforcement Learning for Energy Cost Minimisation
The previous algorithms react to the current state. Reinforcement Learning goes further: an agent learns, through millions of simulations, the optimal policy that minimises total energy cost while considering time-varying prices, EV arrival and departure forecasts, and grid constraints. In production in 2025, leading charging hub operators use variants of Proximal Policy Optimization (PPO) to reduce costs by 15–25% compared to reactive algorithms (source: ScienceDirect, 2025).
Formulation as a Markov Decision Process
- State space: per connector (SoC, remaining time, current power) plus grid state (available power, energy price, time of day, PV forecast).
- Action space: continuous vector of normalised power levels [0,1] for each connector.
- Reward: minimises energy cost, penalises EVs that do not reach target SoC by deadline, penalises grid limit violations.
"""
Gymnasium Environment for EV Charging RL.
Compatible with Stable-Baselines3 (PPO, SAC).
Each step = 15 minutes. One episode = 24 hours (96 steps).
Installation: pip install gymnasium stable-baselines3 numpy
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from dataclasses import dataclass
from typing import List, Optional, Tuple
@dataclass
class EVSession:
soc: float # [0, 1] current
target_soc: float # [0, 1] target
battery_kwh: float
time_remaining_h: float
max_power_kw: float
min_power_kw: float
class EVChargingEnv(gym.Env):
"""
Gymnasium environment for EV charging scheduling.
Objective: minimise energy cost while respecting SoC deadlines.
"""
metadata = {"render_modes": ["human"]}
def __init__(
self,
n_connectors: int = 4,
max_station_power_kw: float = 44.0,
episode_steps: int = 96,
electricity_prices: Optional[np.ndarray] = None
):
super().__init__()
self.n = n_connectors
self.max_power = max_station_power_kw
self.episode_steps = episode_steps
self.prices = electricity_prices if electricity_prices is not None \
else self._italian_biorario_prices()
# Action: normalised power [0,1] for each connector
self.action_space = spaces.Box(
low=0.0, high=1.0, shape=(n_connectors,), dtype=np.float32
)
# Observation: 5 features per connector + 3 global
obs_dim = n_connectors * 5 + 3
self.observation_space = spaces.Box(
low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
)
self.sessions: List[Optional[EVSession]] = [None] * n_connectors
self.step_count = 0
def _italian_biorario_prices(self) -> np.ndarray:
"""Simulated Italian time-of-use prices: F1 (08-19) = 0.28, F2/F3 = 0.18 EUR/kWh."""
prices = np.zeros(96)
for i in range(96):
hour = (i * 15) // 60
prices[i] = 0.28 if 8 <= hour < 19 else 0.18
# Spot market variability
return (prices + np.random.normal(0, 0.02, 96)).clip(0.10, 0.40)
def _get_obs(self) -> np.ndarray:
obs = []
for s in self.sessions:
if s is not None:
obs.extend([
s.soc,
s.target_soc,
min(1.0, s.time_remaining_h / 12.0),
1.0,
s.max_power_kw / self.max_power
])
else:
obs.extend([0.0, 0.0, 0.0, 0.0, 0.0])
step_idx = self.step_count % len(self.prices)
hour = (self.step_count * 15 // 60) % 24
obs.extend([
self.prices[step_idx] / 0.40,
np.sin(2 * np.pi * hour / 24),
np.cos(2 * np.pi * hour / 24)
])
return np.array(obs, dtype=np.float32)
def step(self, action: np.ndarray) -> Tuple:
dt_h = 15 / 60
price = self.prices[self.step_count % len(self.prices)]
# Convert actions to real powers with constraints
actual_powers = np.zeros(self.n)
for i, s in enumerate(self.sessions):
if s is not None:
power = action[i] * s.max_power_kw
actual_powers[i] = np.clip(power, s.min_power_kw, s.max_power_kw)
# Respect station limit
total = actual_powers.sum()
if total > self.max_power:
actual_powers *= self.max_power / total
# Calculate reward
energy_kwh = actual_powers.sum() * dt_h
reward = -energy_kwh * price # negative cost (we want to minimise)
# Update SoC and deadline penalties
for i, s in enumerate(self.sessions):
if s is None:
continue
delta_soc = (actual_powers[i] * dt_h * 0.92) / s.battery_kwh
s.soc = min(1.0, s.soc + delta_soc)
s.time_remaining_h -= dt_h
if s.time_remaining_h <= 0:
gap = max(0, s.target_soc - s.soc)
reward -= gap * 50.0 # heavy penalty for missed deadline
# Grid violation penalty
if total > self.max_power * 1.01:
reward -= (total - self.max_power) * 5.0
# Off-peak charging bonus
hour = (self.step_count * 15 // 60) % 24
if not (8 <= hour < 19) and total > 0:
reward += total * 0.015
self.step_count += 1
obs = self._get_obs()
terminated = self.step_count >= self.episode_steps
return obs, float(reward), terminated, False, {"price": price, "total_kw": total}
def reset(self, seed=None, options=None) -> Tuple:
super().reset(seed=seed)
self.step_count = 0
self._spawn_random_sessions()
return self._get_obs(), {}
def _spawn_random_sessions(self) -> None:
"""Generates realistic random EV sessions for training."""
for i in range(self.n):
if np.random.random() > 0.3:
self.sessions[i] = EVSession(
soc=float(np.random.uniform(0.05, 0.7)),
target_soc=float(np.random.uniform(0.7, 0.95)),
battery_kwh=float(np.random.choice([40.0, 60.0, 77.0, 100.0])),
time_remaining_h=float(np.random.uniform(1.0, 10.0)),
max_power_kw=float(np.random.choice([7.4, 11.0, 22.0])),
min_power_kw=1.38
)
else:
self.sessions[i] = None
def train_ppo_agent(total_timesteps: int = 500_000):
"""
Trains a PPO agent on the EV charging environment.
Requires: pip install stable-baselines3
"""
try:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
env = EVChargingEnv(n_connectors=4, max_station_power_kw=44.0)
check_env(env)
model = PPO(
"MlpPolicy", env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=1,
)
model.learn(total_timesteps=total_timesteps, progress_bar=True)
model.save("./models/ppo_ev_charging")
print("PPO agent saved to ./models/ppo_ev_charging")
return model
except ImportError:
print("Install: pip install stable-baselines3")
return None
Vehicle-to-Grid (V2G): The Vehicle as a Grid Resource
V2G represents the qualitative leap in the EV paradigm: the vehicle is no longer just an energy consumer but becomes a bidirectional resource. When electricity prices are high or the grid is under stress, EV batteries can return energy to the grid, earning money for the owner while stabilising the electrical system.
V2G Standards in 2025
| Standard | Scope | V2G features | Status |
|---|---|---|---|
| ISO 15118-2 | EV-EVSE AC/DC communication | Plug & Charge, smart charging | In use |
| ISO 15118-20 | 2nd generation full V2G | BPT bidirectional, DER integration, dynamic scheduling | Adoption 2025–2026 |
| OCPP 2.0.1 | CP-CSMS communication | ISO 15118 integration, V2G charging profiles | In use |
| EU AFIR Reg. | Infrastructure regulation | ISO 15118 mandatory for new installations from Jan 2026 | In force |
In Europe, Utrecht (Netherlands) has the largest commercial V2G deployment with over 500 bidirectional Renaults in a car-sharing scheme. Each vehicle earns 600–1,500 EUR/year by providing Frequency Containment Reserve (FCR) to Dutch TSO TenneT. V2G in Italy is still predominantly experimental, but the new Nissan Leaf e2+, Volkswagen ID.4 Pro and some versions of the Tesla Model 3 Highland already support bidirectionality.
Peak Shaving and Frequency Response with EV Batteries
"""
V2G Controller: peak shaving and frequency response.
Manages the bidirectionality of V2G-capable EV sessions.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
class ChargingDirection(str, Enum):
G2V = "G2V" # Grid-to-Vehicle (normal charging)
V2G = "V2G" # Vehicle-to-Grid (discharging)
IDLE = "IDLE"
@dataclass
class V2GSession:
"""V2G session with bidirectional capability."""
connector_id: str
transaction_id: str
soc_percent: float
battery_capacity_kwh: float
max_charge_kw: float # max G2V charging power
max_discharge_kw: float # max V2G discharging power
min_soc_percent: float = 20.0 # guaranteed minimum SoC (will not discharge below)
departure_soc_target: float = 80.0
@property
def available_discharge_kwh(self) -> float:
usable = max(0.0, self.soc_percent - self.min_soc_percent) / 100.0
return self.battery_capacity_kwh * usable
class V2GController:
"""V2G controller for peak shaving and frequency response."""
FREQ_NOMINAL = 50.0
FREQ_DEADBAND = 0.05 # +/- 50 mHz: dead band
FREQ_FULL_ACTIVATION = 0.5 # +/- 500 mHz: full FCR activation
def __init__(
self,
station_id: str,
max_export_kw: float,
profile_sender
):
self._station_id = station_id
self._max_export = max_export_kw
self._send = profile_sender
self._sessions: Dict[str, V2GSession] = {}
self._lock = asyncio.Lock()
async def add_session(self, session: V2GSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
async def peak_shaving(
self,
building_load_kw: float,
peak_threshold_kw: float
) -> dict:
"""
When load exceeds the threshold, use EV batteries to reduce import.
Returns the actions taken for each V2G connector.
"""
if building_load_kw <= peak_threshold_kw:
return {"action": "none", "reason": "under_threshold"}
excess_kw = building_load_kw - peak_threshold_kw
logger.info(
"Peak shaving: %.1f kW > threshold %.1f kW (excess %.1f kW)",
building_load_kw, peak_threshold_kw, excess_kw
)
actions = {}
async with self._lock:
sessions = sorted(
self._sessions.values(),
key=lambda s: s.available_discharge_kwh,
reverse=True
)
remaining = excess_kw
for s in sessions:
if remaining <= 0:
break
if s.available_discharge_kwh < 1.0:
continue
discharge = min(s.max_discharge_kw, remaining)
actions[s.connector_id] = {
"direction": ChargingDirection.V2G.value,
"power_kw": discharge
}
# Negative power = V2G discharge in OCPP
await self._send(self._station_id, s.connector_id, -discharge * 1000)
remaining -= discharge
logger.info("V2G: %s discharging %.1f kW (peak shaving)", s.connector_id, discharge)
return {"action": "peak_shaving", "actions": actions, "residual_excess_kw": remaining}
async def frequency_response(self, freq_hz: float) -> dict:
"""
FCR (Frequency Containment Reserve).
Linear response to frequency deviation, within 500ms.
Low frequency (< 50 Hz) -> V2G discharge.
High frequency (> 50 Hz) -> increased G2V charging.
"""
deviation = freq_hz - self.FREQ_NOMINAL
abs_dev = abs(deviation)
if abs_dev < self.FREQ_DEADBAND:
return {"action": "none", "frequency_hz": freq_hz}
activation = min(1.0,
(abs_dev - self.FREQ_DEADBAND) / (self.FREQ_FULL_ACTIVATION - self.FREQ_DEADBAND)
)
actions = {}
async with self._lock:
sessions = list(self._sessions.values())
for s in sessions:
if deviation < 0 and s.available_discharge_kwh > 0.5:
# Low frequency: discharge to support the grid
power_kw = s.max_discharge_kw * activation
actions[s.connector_id] = {"direction": "V2G", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, -power_kw * 1000)
elif deviation > 0 and s.soc_percent < 90:
# High frequency: increase charging to absorb excess
power_kw = s.max_charge_kw * activation
actions[s.connector_id] = {"direction": "G2V", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, power_kw * 1000)
return {
"frequency_hz": freq_hz,
"deviation_hz": deviation,
"activation_factor": activation,
"actions": actions
}
Photovoltaic Integration: Solar Surplus Routing to EVs
The integration between a PV system and an EV charging station is one of the use cases with the fastest ROI in energy management. When photovoltaic produces more than the building consumes, the surplus is routed to EVs instead of being exported to the grid (remunerated at approximately 0.07–0.10 EUR/kWh by GSE) — enabling the use of energy that would otherwise cost 0.28 EUR/kWh from the grid. The net saving is 0.18–0.21 EUR/kWh for every kWh self-consumed via EV.
| Strategy | Logic | Benefit | Limitation |
|---|---|---|---|
| Excess-Only | Charges EVs ONLY with PV surplus | Maximum self-consumption, zero grid cost | Variable power, slow charging |
| Solar-First | Solar priority, supplements from grid | Stable charging with maximum solar | Some energy from grid |
| Green-Maximizer | Optimises over 4h horizon with weather forecast | Maximises renewable energy percentage | Requires accurate forecasts |
| Cost-Optimizer | Solar + spot prices + V2G combined | Absolute minimum cost | High algorithmic complexity |
"""
Solar-EV Integration Controller.
Implements Excess-Only and Solar-First strategies
with hysteresis to avoid frequent oscillations.
"""
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class PowerState:
"""Instantaneous state of the energy system."""
pv_production_kw: float
building_load_kw: float # building consumption excluding EVs
electricity_price_eur: float = 0.25
feed_in_tariff_eur: float = 0.07
@property
def net_surplus_kw(self) -> float:
return max(0.0, self.pv_production_kw - self.building_load_kw)
@property
def savings_per_kwh(self) -> float:
"""Saving per kWh self-consumed instead of exporting and buying."""
return self.electricity_price_eur - self.feed_in_tariff_eur
class SolarEVController:
"""
Controls EV charging based on photovoltaic surplus.
Supports Excess-Only and Solar-First with configurable hysteresis.
"""
HYSTERESIS_KW = 0.3 # avoids continuous commands for small oscillations
def __init__(
self,
balancer,
strategy: str = "solar_first",
min_ev_power_kw: float = 1.38,
max_grid_supplement_kw: float = 5.0
):
self._balancer = balancer
self._strategy = strategy
self._min_ev_power = min_ev_power_kw
self._max_grid_supp = max_grid_supplement_kw
self._last_limit_kw = 0.0
async def update(self, power_state: PowerState, n_active_evs: int) -> dict:
"""
Recalculates and applies the EV power limit.
Called every 5-15 seconds by the control loop.
"""
if n_active_evs == 0:
return {"action": "no_ev", "allocated_kw": 0}
if self._strategy == "excess_only":
target_kw = self._excess_only(power_state, n_active_evs)
else:
target_kw = self._solar_first(power_state, n_active_evs)
# Apply hysteresis
if abs(target_kw - self._last_limit_kw) > self.HYSTERESIS_KW:
await self._balancer.update_dynamic_limit(target_kw * 1000)
self._last_limit_kw = target_kw
logger.info("Solar routing [%s]: %.1f kW -> EV", self._strategy, target_kw)
hourly_savings = target_kw * power_state.savings_per_kwh
return {
"strategy": self._strategy,
"pv_kw": power_state.pv_production_kw,
"surplus_kw": power_state.net_surplus_kw,
"allocated_ev_kw": target_kw,
"savings_eur_h": round(hourly_savings, 3)
}
def _excess_only(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
return surplus if surplus >= min_total else 0.0
def _solar_first(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
if surplus < min_total:
grid_needed = min_total - surplus
return min_total if grid_needed <= self._max_grid_supp else surplus
return min(surplus + self._max_grid_supp, surplus + self._max_grid_supp)
async def solar_routing_demo():
"""Simulates a day with typical April PV production in Italy."""
hourly_pv = [0, 0, 0, 0, 0, 0.5, 2, 5, 9, 14, 18, 21,
22, 20, 16, 10, 6, 2, 0.5, 0, 0, 0, 0, 0]
print("Hour | PV kW | Bldg kW | Surplus | EV kW | Savings EUR/h")
print("-" * 62)
for h in range(24):
pv = hourly_pv[h]
bldg = 5.0 + (2.0 if 8 <= h < 18 else 0)
surplus = max(0, pv - bldg)
price = 0.28 if 8 <= h < 19 else 0.18
feed_in = 0.07
ev_alloc = min(surplus, 11.0)
savings = ev_alloc * (price - feed_in)
print(
f"{h:02d}:00 | {pv:5.1f} | {bldg:7.1f} | "
f"{surplus:7.1f} | {ev_alloc:5.1f} | {savings:.3f}"
)
if __name__ == "__main__":
asyncio.run(solar_routing_demo())
Charge Controller Microservices Architecture
A production EV load balancing system is a distributed architecture with specialised components communicating via events. Here are the main services:
| Service | Responsibility | Stack | SLA |
|---|---|---|---|
| OCPP Gateway | WebSocket server, OCPP 1.6/2.0.1 translation | Python ocpp, asyncio | 99.9%, <100ms |
| Charge Controller | Load balancing algorithm, SetChargingProfile | FastAPI, Redis | 99.9%, <500ms |
| Energy Manager | Measures PV/BESS/grid, calculates availability | Modbus TCP, MQTT, Python | 99.5%, 1s polling |
| Forecaster | Arrival/departure, price and PV forecasting | LightGBM/LSTM, FastAPI | 99%, 15min update |
| DSO Gateway | OpenADR signals, dynamic limits | Python OpenADR 2.0b | 99%, <2s |
| Billing | Metering, invoicing, sessions | FastAPI, PostgreSQL | 99.5% |
Charge Controller API with FastAPI
"""
Charge Controller Service - FastAPI REST API.
Endpoints for load balancing, monitoring and configuration.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime, timezone
import logging
logger = logging.getLogger(__name__)
app = FastAPI(
title="EV Charge Controller API",
version="2.1.0",
description="Real-time load balancing for EV charging stations"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT"],
allow_headers=["*"]
)
# -------------------------------------------------------
# Pydantic Models
# -------------------------------------------------------
class ConnectorInfo(BaseModel):
connector_id: str
status: str
allocated_power_w: float
soc_percent: Optional[float]
session_duration_min: float
class StationStatus(BaseModel):
station_id: str
available_power_w: float
algorithm: str
connectors: List[ConnectorInfo]
last_updated: str
class EVConnectedEvent(BaseModel):
connector_id: str
transaction_id: str
initial_soc: Optional[float] = None
battery_capacity_kwh: Optional[float] = None
departure_timestamp: Optional[int] = None
user_tier: int = Field(default=1, ge=1, le=3)
class PowerLimitUpdate(BaseModel):
limit_watts: float = Field(gt=0, le=200_000)
source: str = "manual" # "manual" | "dso" | "solar" | "openadr"
valid_until_ts: Optional[int] = None
class AlgorithmSwitch(BaseModel):
algorithm: str = Field(pattern="^(equal_share|priority|rl_ppo)$")
params: Dict = {}
# In-memory registry (in production: Redis)
_station_registry: Dict = {}
_algorithm_map: Dict[str, str] = {}
# -------------------------------------------------------
# Endpoints
# -------------------------------------------------------
@app.get("/health")
async def health():
return {
"status": "healthy",
"ts": datetime.now(timezone.utc).isoformat(),
"version": "2.1.0"
}
@app.get("/stations/{station_id}/status", response_model=StationStatus)
async def get_status(station_id: str):
if station_id not in _station_registry:
raise HTTPException(404, f"Station {station_id} not found")
balancer = _station_registry[station_id]
raw = balancer.get_status()
return StationStatus(
station_id=station_id,
available_power_w=raw["available_power_w"],
algorithm=_algorithm_map.get(station_id, "equal_share"),
connectors=[ConnectorInfo(**c) for c in raw["connectors"]],
last_updated=datetime.now(timezone.utc).isoformat()
)
@app.post("/stations/{station_id}/events/connected")
async def ev_connected(station_id: str, event: EVConnectedEvent, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Station not found")
bg.add_task(
_station_registry[station_id].on_ev_connected,
event.connector_id,
event.transaction_id
)
return {"status": "accepted", "rebalancing": True}
@app.delete("/stations/{station_id}/connectors/{connector_id}/session")
async def ev_disconnected(station_id: str, connector_id: str, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Station not found")
bg.add_task(_station_registry[station_id].on_ev_disconnected, connector_id)
return {"status": "accepted"}
@app.put("/stations/{station_id}/power-limit")
async def set_power_limit(station_id: str, limit: PowerLimitUpdate, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Station not found")
bg.add_task(_station_registry[station_id].update_dynamic_limit, limit.limit_watts)
logger.info("Power limit: %s -> %.0f W (source=%s)", station_id, limit.limit_watts, limit.source)
return {"status": "accepted", "new_limit_w": limit.limit_watts, "source": limit.source}
@app.put("/stations/{station_id}/algorithm")
async def set_algorithm(station_id: str, config: AlgorithmSwitch):
_algorithm_map[station_id] = config.algorithm
return {"status": "ok", "algorithm": config.algorithm}
@app.get("/stations/{station_id}/metrics")
async def get_metrics(station_id: str):
"""Station KPIs (in production: InfluxDB/TimescaleDB query)."""
return {
"station_id": station_id,
"period": "last_24h",
"charging_satisfaction_rate": 0.94,
"peak_reduction_percent": 31.2,
"energy_cost_savings_eur": 47.80,
"solar_share_percent": 42.3,
"grid_stress_index": 0.23,
"total_energy_kwh": 201.6,
"avg_power_kw": 8.4,
"rebalance_latency_ms_p95": 180
}
Metrics and KPIs for Load Balancing Evaluation
To assess the effectiveness of a load balancing system, quantifiable KPIs are needed that measure both user satisfaction and energy optimisation.
| KPI | Definition | Target | Formula |
|---|---|---|---|
| Charging Satisfaction Rate (CSR) | % sessions reaching target SoC by deadline | >90% | successful_sessions / total_sessions |
| Peak Reduction % | Peak reduction vs no-management | >25% | (peak_unmanaged - peak_managed) / peak_unmanaged |
| Energy Cost Savings % | Saving on kWh cost vs immediate charging | >15% | (base_cost - managed_cost) / base_cost |
| Solar Self-Consumption % | % EV energy from local PV | >40% | solar_energy_ev / total_energy_ev |
| Grid Stress Index (GSI) | Average transformer pressure [0-1] | <0.3 | avg(P_actual / P_nominal_trafo) |
| Rebalance Latency P95 | Time from event to OCPP profile applied | <2s | t_ocpp_conf - t_event_received (p95) |
Italian Context: ARERA, CER with EVs and 2025 Incentives
Implementing EV load balancing systems in Italy involves regulatory and market specifics that directly influence architectural choices.
ARERA Time-of-Use Tariffs and Smart Charging
The Italian tariff system creates a natural incentive for off-peak charging:
- Band F1 (Mon–Fri 08:00–19:00): approximately 0.26–0.31 EUR/kWh. The band to avoid for EV charging.
- Band F2 (Mon–Fri 07:00–08:00 and 19:00–23:00; Sat 07:00–23:00): approximately 0.20–0.24 EUR/kWh.
- Band F3 (nights 23:00–07:00; Sundays and public holidays all day): approximately 0.16–0.19 EUR/kWh. Ideal for overnight charging.
A Time-of-Use algorithm that shifts 60% of charging from F1 to F3 reduces the annual charging costs for a fleet manager by 18–25%.
Renewable Energy Communities (CER) with EVs
Legislative Decree 199/2021 and the MASE Ministerial Decree of 7 December 2023 have opened concrete opportunities for EV integration in Italian renewable energy communities:
- EVs that charge using PV surplus from the community generate a GSE incentive on the shared energy share (tariff rebate plus incentive of up to 11 cents/kWh).
- With V2G, EV batteries act as virtual storage for the CER, shifting solar energy from midday to the evening (peak shift).
- The GSE measures shared energy via MACSE, including the bidirectional flows from V2G EVs in the community's accounting.
Italian EV Infrastructure as of 2025
Italy has more than 58,000 public charging points (Motus-E, end of 2024), of which 22% are fast DC (vs 14% in 2023). In 2025, 94,230 BEVs were registered (+46%), with a market share of 6.2%. According to Terna, without widespread smart charging 7% of distribution feeders will be overloaded by 2027 in the denser areas of Northern Italy. The PNRR has allocated more than 740 million euros for charging infrastructure, with a target of 21,000 new public points by 2026.
Note on Incentives
Tax credits for smart charging under Transition 5.0 are subject to periodic review. Always verify the updated conditions on the GSE and CSEA websites before planning investments based on incentives. Rates and access conditions may change.
Conclusions and Implementation Roadmap
EV load balancing is not a problem of algorithmic elegance: it is a critical infrastructure necessity for the energy transition. The data are unambiguous: with 5 million unmanaged EVs the evening peak increases by 35,000 MW; with smart charging the same fleet becomes a flexible resource that reduces the peak by 34%.
For a development team implementing these systems in production, the optimal phased roadmap is:
- Phase 1 - Equal Share: simple, robust, sufficient for installations up to 30–40 outlets. Absolute priority: correctly implement event-driven recalculation with the reductions-then-increases sequence. Estimated time: 2–3 weeks.
-
Phase 2 - PV Integration: immediate ROI without ML. Requires only
solar surplus measurement via Modbus/MQTT and integration with the balancer via
update_dynamic_limit(). Quick win with payback in 6–18 months. - Phase 3 - Priority-Based: add when you have a mobile app to collect SoC and departure deadlines. The gain in CSR (Charging Satisfaction Rate) is measurable and differentiates the service from the competition.
- Phase 4 - V2G Infrastructure: design the bidirectional architecture now, even if you do not yet have V2G EVs. OCPP 2.0.1 with ISO 15118-20 becomes mandatory for new installations from 2026 (EU AFIR). Retrofitting costs 5–10x more.
- Phase 5 - RL in Production: invest in deep RL only after 6+ months of quality data and with a dedicated MLOps team. The cost gain is real (15–25%) but operational complexity is high.
Related Articles in the EnergyTech Series
- Article 4: Digital Twin of the Grid - simulate the impact of EVs on the grid before deployment
- Article 5: Renewable Energy Forecasting - PV production forecasting for solar routing
- Article 7: MQTT and InfluxDB - real-time telemetry of charging parameters
- Article 2: DERMS Architecture - integrating EVs as DERs in the distributed management system
Cross-Series Further Reading
- MLOps Series (art. 306–315): PPO model deployment with MLflow, drift monitoring and retraining
- AI Engineering Series (art. 316–325): RAG on OCPP and ISO 15118 documentation with enterprise LLM
- Data & AI Business Series (art. 267–280): ROI, business metrics and roadmap for smart charging investments
- PostgreSQL AI Series (art. 356–361): charging data timeseries with TimescaleDB on PostgreSQL







