EV 充電負荷分散: インテリジェント充電のためのリアルタイム アルゴリズム
火曜日の夜の午後6時30分です。何千人ものイタリアのドライバーが仕事を終えて帰宅します。 彼らは電気自動車を駐車し、充電ケーブルを接続します。数分後には質問が 近所の変圧器の電力が上向きに上昇します。インテリジェントな管理がなければ、 このシナリオは、公園が成長するにつれて毎日ますます激しく繰り返されます EV - ネットワークの過負荷、局所的な停電、インフラストラクチャのアップグレードコストにつながる 数十億ユーロのオーダーで。
2025年にはさらに流通するだろう ヨーロッパでは1,700万台の電気自動車、そのうち約 イタリアでは23万人、2025年だけで9万4,230人の新規登録があった(2024年比46%増)。 2030 年までの予測では、ヨーロッパでは 5,000 万台以上の EV が使用されます。各車両には平均で 5,000 万台以上の EV が必要です 充電時の電力は 7 ~ 22 kW です。リチャージする何百万人ものユーザーを掛け合わせると 同じ夕方の時間帯に、前例のないネットワークの安定性の問題が発生しました。
解決策は、ネットワークをさらに構築することではありません。高価で時間がかかりすぎます。解決策は、 インテリジェントな負荷分散: 電力を分配するリアルタイム アルゴリズム 充電中の車両間で利用可能、ネットワークの制約を尊重し、エネルギーコストを最小限に抑える そしてユーザー満足度を最大化します。この記事では、スタック全体について説明します。 技術: OCPP 2.0.1 プロトコルから最適化アルゴリズム、強化学習まで 動作する Python コードとイタリアの規制コンテキストを使用した V2G 統合。
この記事で学べること
- ダックカーブと管理されていないEVが配電ネットワークに与える影響
- 負荷分散アルゴリズムの分類: 静的、動的、予測
- OCPP 2.0.1 スマート充電: SetChargingProfile、ChargingSchedule、StackLevel
- Python 実装: リアルタイムの動的再計算による均等共有
- ヒープキューを使用した優先順位ベースのアルゴリズム (SoC、デッドライン、レート)
- エネルギーコスト最適化のための強化学習 (PPO)
- Vehicle-to-Grid (V2G): 双方向、ISO 15118-20、周波数規制
- 太陽光発電の統合: 太陽光の余剰電力とEVへのルーティング
- マイクロサービス アーキテクチャ: チャージ コントローラー、エネルギー マネージャー、フォーキャスター
- イタリアの背景: 2 時間ごとの料金、EV 付き CER、ARERA 規制
EnergyTech シリーズ - 10 件の記事
| # | アイテム | Stato |
|---|---|---|
| 1 | スマート グリッドと IoT: 将来の電力網のためのアーキテクチャ | 発行済み |
| 2 | DERMS アーキテクチャ: 数百万の分散リソースを集約する | 発行済み |
| 3 | バッテリー管理システム: BESS の制御アルゴリズム | 発行済み |
| 4 | Python と Pandapower を使用した電力網のデジタル ツイン | 発行済み |
| 5 | 再生可能エネルギーの予測: 太陽光発電と風力発電の ML | 発行済み |
| 6 | EV 充電負荷分散: リアルタイム アルゴリズム (ここにいます) | 現在 |
| 7 | リアルタイム エネルギー テレメトリのための MQTT と InfluxDB | 近日公開 |
| 8 | IEC 61850: 変電所における通信 | 近日公開 |
| 9 | 炭素会計ソフトウェア: 排出量の測定と削減 | 近日公開 |
| 10 | CER における P2P エネルギー取引のためのブロックチェーン | 近日公開 |
ピークの問題: ダックカーブと管理されていない EV
EV 負荷分散がオプションではなく必須である理由を理解するには、次のことから始める必要があります。 電力網の物理学と、電力網管理者が日々恐れている概念から: の アヒルの曲線.
アヒルカーブとその悪化
アヒル曲線は、正味電力需要曲線の形状を表します。 太陽光発電の普及率が高いシステムにおける典型的な日中。それがそう呼ばれています なぜなら、その曲線はアヒルの横顔、つまり日中の低いお腹(昼寝しているとき)に似ているからです。 太陽光発電は豊富で、純需要は低い)、夕方には高いこぶが発生します( 日が沈み、太陽光発電の生産が崩壊し、住宅需要が急増します)。
EV がなければ、イタリアの送電網はすでにこの問題に毎日対処しています。 EVの発展に伴い 管理せずに放置すると、問題は大幅に拡大します。 MDPI 調査 (2025 年) は、 最適化により、ピーク需要はベースラインの 22,000 MW から 500 万台の EV で 35,000 MW になります。 スマート充電を使用すると、同じ車両で夜間に分配される電力は 2,000 ~ 3,000 MW しか追加されません。
| シナリオ | ピークタイム | 追加負荷 | ネットワークリスク |
|---|---|---|---|
| ベースライン (EV なし) | 午後7時~午後8時 | 0MW | 管理可能 |
| 50万台の管理されていないEV | 午後6時30分~午後7時30分 | +3,500MW | 変圧器の局所応力 |
| 150万台の管理されていないEV | 18:00~20:00 | +10,500MW | 広範囲にわたる過負荷 |
| 500万台の管理されていないEV | 午後6時~午後9時 | +35,000MW | 全身停電 |
| スマート充電による 1.5M EV | 午後10時から午前6時までにお届けします | +2,100 MW 希釈 | 無視できる |
AI を活用した充電プロファイルの最適化により、充電器のピーク需要を削減できます。 150万台のEVで16%、300 万の場合は 21%、500 万の場合は 34% 増加します。 (出典: MDPI エレクトロニクス、2025)。管理されたシナリオと管理されていないシナリオの違い それは数パーセントのポイントではなく、ネットワークが安定しているか崩壊しているかの違いです。
配電変圧器への影響
問題は伝送システム レベルだけではなく、何よりもネットワーク レベルにあります。 ローカル配布。 400 kVA の近隣変圧器は通常、80 ~ 120 の電力を供給します。 家族。このうち 15 台の EV が午後 6 時半に一斉に 11 kW で充電を開始すると、 追加負荷は 165 kW、変圧器の定格容量のほぼ 41% です。 国内消費はすでに60~70%に達している可能性がある。結果:過熱、減少 耐用年数が切れる可能性があり、最悪の場合は保護が開放される可能性があります。
何もしないことの代償
Terna 氏によると、イタリアの流通ネットワークを強化してサポートするコストは、 スマート充電なしの EV 移行は次のように推定されます。 100~150億ユーロ 2030 年までに。スマート充電が普及すると、移行自体に投資が必要になります インフラストラクチャのコストは、単に負荷を移動する時間帯に 60 ~ 70% 削減されます。 ネットワークには利用可能な容量があります。
負荷分散アルゴリズムの分類
単一の最適な負荷分散アルゴリズムはありません。選択はサイズによって異なります。 インストールの状況、履歴データの可用性、遅延要件、および マネージャーが維持したいと考えている複雑さ。アプローチを3つに分類します 主な家族。
静的アルゴリズム
静的アルゴリズムは電力分配ルールを事前に定義し、 システムの状態に動的に適応することはありません。実装は簡単です 小規模で同種のインストールに最適です。
- ラウンドロビン: 各コネクタは 1 回転ごとに同じ最大電力を受け取ります。 シンプルですが非効率です。ほぼ充電された EV は、空の EV と同じ電力を使用します。
- 均等シェア: 利用可能な総電力は次の間で均等に分割されます。 すべてのアクティブなコネクタ。同種の設置には効果的ですが、 個々のニーズ。
- 先着順: 最初に接続されたEVが最大電力を受け取ります。 後続のものは剰余を除算します。遅れて到着した人には罰を与えます。
動的アルゴリズム
動的アルゴリズムは状態に基づいてリアルタイムで分布を適応させます。 システム電流: 接続されている EV の数、充電レベル (SoC)、利用可能な電力、 ユーザーの優先順位。
- 比例: 電力はエネルギーに比例して配分される 各EVからのリクエスト。バッテリー残量が最も少ない人が、より多くの電力を受け取ります。
- 優先順位ベース: 各EVには複数のEVで計算された優先度スコアがあります。 要因 (SoC、期限、サブスクリプション)。電力はより高い優先順位に流れます。
- ファジーロジック: 「SoC と low E の期限の場合」などの言語ルール そして閉じると優先度が高くなります。」入力データの不確実性を適切に処理します。
予測アルゴリズム (ML ベース)
予測アルゴリズムは機械学習モデルを使用して将来のイベントを予測します (到着、出発、エネルギー価格の変動) を考慮し、スケジュールを最適化します。 長期的に総コストを最小限に抑えます。
- モデル予測制御 (MPC): 最適化問題を解決します 将来の時間軸(例:4時間)にわたって、各制御間隔(例:15分)で、 最初のステップのみを適用し、次のステップで再定式化します。
- 深層強化学習 (DRL): エージェントは最適なポリシーを学習します シミュレータと対話します。 PPO と SAC は実稼働環境で最もよく使用されます。
- 確率的プログラミング: ~に不確実性を明示的に組み込む EVの到着と出発により、最悪のシナリオに耐えられるスケジュールを生成します。
| アルゴリズム | 複雑 | レイテンシ | 最適性 | 要求されたデータ | 理想的な使用例 |
|---|---|---|---|---|---|
| 均等シェア | 低い | <1ms | 低い | 誰でもない | 住宅用、店舗数 10 未満 |
| 優先順位ベース | 平均 | <10ms | 中~高 | SoC、ユーザー期限 | オフィス、ホテル、車両 |
| MPC | 高い | 100ms-1s | 高い | エネルギー価格、予測 | ハブ >50 ソケット、C&I |
| ディープ RL (PPO) | 非常に高い | <50ms (推定) | 非常に高い | 履歴 6 か月以上 | 優れたハブ、内蔵 V2G |
OCPP 2.0.1 スマート充電: インテリジェント充電プロトコル
OCPP (Open Charge Point Protocol) と充電ポイント間の共通言語 (CP) および中央システム (CSMS - 充電ステーション管理システム)。バージョン 2.0.1 では、 スマート充電は OCPP 1.6 と比較して大幅に改善され、以下の機能が導入されました。 充電プロファイルのより詳細で信頼性の高い管理。
SetChargingProfile メカニズム
CSMS は充電ポイントにメッセージを送信します SetChargingProfileRequest 含む
ある ChargingProfile これは、ステーションが時間の経過とともにどのように電力を供給する必要があるかを定義します。
あ ChargingProfile で構成されています:
-
充電プロファイル目的:
ChargePointMaxProfile(すべてを制限する 駅)、TxDefaultProfile(新規トランザクションのデフォルト)、TxProfile(進行中のトランザクションに固有)。 - スタックレベル: 優先度を定義する整数。プロフィールの場合 重複している場合、スタックレベルが最も高いものが優先されます。
- 充電スケジュール: 期間のリスト (秒単位の startPeriod、 時間の経過に伴う充電曲線を定義する制限値 (A または W)。
クリティカルシーケンス: 増加の前に減少
更新されたプロファイルを複数のステーションに送信する場合は、最初に送信することが重要です 電力削減コマンド、次に増加コマンドです。先に増やしてしまうと、 短期間にネットワーク制限を超えて、 電気的保護や、結果的に契約上の罰金を伴う需要ピークの発生。 このルールは本番環境では交渉できません。
Python 実装: OCPP プロファイル生成 2.0.1
"""
OCPP 2.0.1 Smart Charging Profile Generator
Genera ChargingProfile compatibili con la specifica OCPP 2.0.1.
"""
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
import json
from datetime import datetime, timezone
class ChargingRateUnit(str, Enum):
WATTS = "W"
AMPERES = "A"
class ChargingProfilePurpose(str, Enum):
CHARGE_POINT_MAX = "ChargePointMaxProfile"
TX_DEFAULT = "TxDefaultProfile"
TX = "TxProfile"
class ChargingProfileKind(str, Enum):
ABSOLUTE = "Absolute"
RECURRING = "Recurring"
RELATIVE = "Relative"
@dataclass
class ChargingSchedulePeriod:
"""Singolo intervallo nella schedule di ricarica."""
start_period: int # secondi dall'inizio della schedule
limit: float # limite in W o A
number_phases: Optional[int] = None
@dataclass
class ChargingSchedule:
"""Schedule completa di un profilo di ricarica."""
id: int
charging_rate_unit: ChargingRateUnit
charging_schedule_period: List[ChargingSchedulePeriod]
duration: Optional[int] = None
start_schedule: Optional[str] = None
min_charging_rate: Optional[float] = None
@dataclass
class ChargingProfile:
"""Profilo di ricarica OCPP 2.0.1 completo."""
id: int
stack_level: int
charging_profile_purpose: ChargingProfilePurpose
charging_profile_kind: ChargingProfileKind
charging_schedule: List[ChargingSchedule]
transaction_id: Optional[str] = None
valid_from: Optional[str] = None
valid_to: Optional[str] = None
def build_equal_share_profile(
profile_id: int,
power_limit_watts: float,
duration_seconds: int = 3600,
stack_level: int = 1
) -> ChargingProfile:
"""
Costruisce un profilo OCPP per distribuzione equa (potenza fissa).
Usato dall'algoritmo Equal Share per applicare il limite calcolato.
"""
schedule_period = ChargingSchedulePeriod(
start_period=0,
limit=power_limit_watts,
number_phases=3
)
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=[schedule_period],
duration=duration_seconds,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def build_time_of_use_profile(
profile_id: int,
schedule_periods: List[tuple],
stack_level: int = 2
) -> ChargingProfile:
"""
Costruisce un profilo Time-of-Use con potenza variabile nel tempo.
Esempio: alta potenza di notte (energia economica), bassa di giorno.
Args:
schedule_periods: lista di (start_period_sec, limit_watts)
"""
periods = [
ChargingSchedulePeriod(start_period=start, limit=limit)
for start, limit in schedule_periods
]
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=periods,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX_DEFAULT,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def profile_to_ocpp_dict(profile: ChargingProfile) -> dict:
"""Serializza il profilo nel formato JSON OCPP 2.0.1."""
return {
"id": profile.id,
"stackLevel": profile.stack_level,
"chargingProfilePurpose": profile.charging_profile_purpose.value,
"chargingProfileKind": profile.charging_profile_kind.value,
"chargingSchedule": [
{
"id": sched.id,
"chargingRateUnit": sched.charging_rate_unit.value,
"chargingSchedulePeriod": [
{
"startPeriod": p.start_period,
"limit": p.limit,
}
for p in sched.charging_schedule_period
],
**({"duration": sched.duration} if sched.duration else {}),
**({"startSchedule": sched.start_schedule} if sched.start_schedule else {}),
}
for sched in profile.charging_schedule
],
}
if __name__ == "__main__":
# Profilo Equal Share: 3.3 kW per 2 ore
profile = build_equal_share_profile(
profile_id=101,
power_limit_watts=3300.0,
duration_seconds=7200
)
print(json.dumps(profile_to_ocpp_dict(profile), indent=2))
# Profilo Time-of-Use: 11 kW da adesso, 3.3 kW dopo 4 ore
tou = build_time_of_use_profile(
profile_id=102,
schedule_periods=[
(0, 11000), # da ora: 11 kW (fascia fuori punta)
(4 * 3600, 3300), # dopo 4 ore: 3.3 kW (fascia F1 inizia)
],
stack_level=2
)
print(json.dumps(profile_to_ocpp_dict(tou), indent=2))
動的再計算による均等分配アルゴリズム
Equal Share は、実際のインストールで最も普及している負荷分散アルゴリズムです。 その強みは数学的な優雅さではなく、運用上の堅牢さにあります。シンプルです。 ユーザーに説明しやすく、デバッグが簡単で、十分に効果的です。 ほとんどの場合、最大 30 ~ 40 個のソケットが取り付けられます。本当の複雑さは次のようなものにあります 動的再計算の管理: EV が接続または切断されるたび、または DSO 信号に利用可能な電力が変化すると、システムは再計算し、 クリティカル シーケンスを考慮して、ミリ秒単位で再配布し、その後増加します。
"""
Equal Share Load Balancer
Distribuzione equa con ricalcolo dinamico event-driven.
Thread-safe per deployment in produzione con OCPP WebSocket server.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Callable, Awaitable
from enum import Enum
import time
logger = logging.getLogger(__name__)
class ConnectorStatus(str, Enum):
AVAILABLE = "Available"
CHARGING = "Charging"
SUSPENDED_EV = "SuspendedEV"
FINISHING = "Finishing"
UNAVAILABLE = "Unavailable"
FAULTED = "Faulted"
@dataclass
class ConnectorState:
"""Stato di un singolo connettore nella stazione."""
connector_id: str
status: ConnectorStatus = ConnectorStatus.AVAILABLE
transaction_id: Optional[str] = None
allocated_power_w: float = 0.0
max_power_w: float = 22000.0 # max fisico del connettore (22 kW AC trifase)
min_power_w: float = 1380.0 # min per mantenere la carica (6A x 230V)
soc_percent: Optional[float] = None
connected_at: Optional[float] = None
@property
def is_charging(self) -> bool:
return self.status == ConnectorStatus.CHARGING
@property
def session_duration_min(self) -> float:
if self.connected_at is None:
return 0.0
return (time.time() - self.connected_at) / 60.0
@dataclass
class StationConfig:
"""Configurazione della stazione di ricarica."""
station_id: str
max_station_power_w: float
dynamic_limit_w: Optional[float] = None
rebalance_interval_s: float = 30.0
@property
def available_power_w(self) -> float:
if self.dynamic_limit_w is not None:
return min(self.max_station_power_w, self.dynamic_limit_w)
return self.max_station_power_w
ProfileSendCallback = Callable[[str, str, float], Awaitable[bool]]
class EqualShareBalancer:
"""
Load balancer Equal Share thread-safe e asincrono.
Gestisce la distribuzione equa della potenza disponibile
tra tutti i connettori attivamente in carica.
"""
def __init__(
self,
config: StationConfig,
profile_sender: ProfileSendCallback
):
self._config = config
self._connectors: Dict[str, ConnectorState] = {}
self._send_profile = profile_sender
self._lock = asyncio.Lock()
self._last_allocation: Dict[str, float] = {}
async def register_connector(
self,
connector_id: str,
max_power_w: float = 22000.0,
min_power_w: float = 1380.0
) -> None:
async with self._lock:
self._connectors[connector_id] = ConnectorState(
connector_id=connector_id,
max_power_w=max_power_w,
min_power_w=min_power_w
)
logger.info("Connettore %s registrato (max=%.0f W)", connector_id, max_power_w)
async def on_ev_connected(self, connector_id: str, transaction_id: str) -> None:
"""Chiamato quando un EV si connette. Triggera ricalcolo immediato."""
async with self._lock:
if connector_id not in self._connectors:
logger.warning("Connettore sconosciuto: %s", connector_id)
return
self._connectors[connector_id].status = ConnectorStatus.CHARGING
self._connectors[connector_id].transaction_id = transaction_id
self._connectors[connector_id].connected_at = time.time()
await self._rebalance()
async def on_ev_disconnected(self, connector_id: str) -> None:
"""Chiamato quando un EV si disconnette. Libera la potenza e redistribuisce."""
async with self._lock:
if connector_id not in self._connectors:
return
connector = self._connectors[connector_id]
connector.status = ConnectorStatus.AVAILABLE
connector.transaction_id = None
connector.allocated_power_w = 0.0
connector.connected_at = None
await self._rebalance()
async def update_dynamic_limit(self, limit_w: float) -> None:
"""Aggiorna il limite DSO in tempo reale. Triggera ricalcolo immediato."""
async with self._lock:
self._config.dynamic_limit_w = limit_w
logger.info("Limite DSO aggiornato: %.0f W (stazione %s)", limit_w, self._config.station_id)
await self._rebalance()
async def _rebalance(self) -> None:
"""
Calcola la nuova allocazione Equal Share e invia i profili OCPP.
REGOLA CRITICA: invia prima le riduzioni, poi gli aumenti.
"""
async with self._lock:
active = [c for c in self._connectors.values() if c.is_charging]
if not active:
return
available = self._config.available_power_w
n = len(active)
raw_share = available / n
new_allocations: Dict[str, float] = {}
# Prima passata: applica limiti min/max per connettore
constrained = []
free = []
for connector in active:
if raw_share <= connector.min_power_w:
new_allocations[connector.connector_id] = connector.min_power_w
constrained.append(connector)
elif raw_share >= connector.max_power_w:
new_allocations[connector.connector_id] = connector.max_power_w
constrained.append(connector)
else:
free.append(connector)
# Seconda passata: redistribuisce il residuo tra i connettori liberi
if free:
allocated_constrained = sum(
new_allocations[c.connector_id] for c in constrained
)
residual = available - allocated_constrained
share_free = residual / len(free)
for connector in free:
new_allocations[connector.connector_id] = max(
connector.min_power_w,
min(connector.max_power_w, share_free)
)
# Ordina: prima le riduzioni, poi gli aumenti (regola OCPP)
reductions = []
increases = []
for connector_id, new_power in new_allocations.items():
old_power = self._last_allocation.get(connector_id, float('inf'))
if new_power < old_power:
reductions.append((connector_id, new_power))
else:
increases.append((connector_id, new_power))
for connector_id, power in reductions:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Riduzione: %s -> %.0f W", connector_id, power)
for connector_id, power in increases:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Aumento: %s -> %.0f W", connector_id, power)
logger.info(
"Ribilanciamento: %d EV attivi, %.0f W disponibili, stazione %s",
len(active), self._config.available_power_w, self._config.station_id
)
async def start_periodic_rebalance(self) -> None:
"""Avvia il ricalcolo periodico come safety net per deriva dello stato."""
async def _loop():
while True:
await asyncio.sleep(self._config.rebalance_interval_s)
await self._rebalance()
asyncio.create_task(_loop())
def get_status(self) -> dict:
return {
"station_id": self._config.station_id,
"available_power_w": self._config.available_power_w,
"connectors": [
{
"connector_id": c.connector_id,
"status": c.status.value,
"allocated_power_w": c.allocated_power_w,
"soc_percent": c.soc_percent,
"session_duration_min": round(c.session_duration_min, 1)
}
for c in self._connectors.values()
]
}
async def demo_equal_share():
async def mock_send(station_id, connector_id, power_w) -> bool:
print(f" OCPP -> {connector_id}: {power_w:.0f} W")
return True
config = StationConfig(station_id="STATION_001", max_station_power_w=44000.0)
balancer = EqualShareBalancer(config, mock_send)
for i in range(1, 5):
await balancer.register_connector(f"C0{i}", max_power_w=22000.0, min_power_w=1380.0)
print("\n[EV1 connesso]")
await balancer.on_ev_connected("C01", "TX001") # C01: 44000 W
print("\n[EV2 connesso]")
await balancer.on_ev_connected("C02", "TX002") # C01,C02: 22000 W ciascuno
print("\n[EV3 e EV4 connessi]")
await balancer.on_ev_connected("C03", "TX003")
await balancer.on_ev_connected("C04", "TX004") # tutti: 11000 W
print("\n[DSO riduce limite a 22 kW]")
await balancer.update_dynamic_limit(22000.0) # tutti: 5500 W
print("\n[EV2 si disconnette]")
await balancer.on_ev_disconnected("C02") # C01,C03,C04: 7333 W
if __name__ == "__main__":
asyncio.run(demo_equal_share())
多要素スコアリングを備えた優先順位ベースのアルゴリズム
均等シェアは公平ではありますが、ユーザーの観点からは最適ではありません。バッテリーを搭載したEV 所有者が 30 分以内に出なければならない 5% は、さらに緊急の必要がある 8 時間駐車したままの 60% EV と比較して。優先順位に基づいたバランシング 多要素緊急度スコア e を計算することで、このニーズに対応します。 優先順位に比例して電力を分配します。
優先順位のスコア計算式
スコアは、4 つの次元と構成可能な重みを組み合わせたものです。
- SoC 緊急度 (35%): どれくらいの量か、バッテリーを放電してください。重大なケースを増幅する凹型機能 (例: SoC 5% = 非常に高い緊急性)。
- 時間的プレッシャー (35%): 出発がどれほど近づいているか。期限が近づくと急速に増大する指数関数。
- ユーザー層 (15%): サブスクリプション レベル (ベーシック/プレミアム/プライオリティ)。差別化された SLA を備えたビジネス モデルを作成します。
- エネルギー効率 (15%): 実際にパワーを吸収できる人に報酬を与えます。ほぼフルのバッテリーに電力を割り当てないようにしてください。
"""
Priority-Based Load Balancer.
Distribuisce potenza proporzionalmente ai punteggi di urgenza,
con algoritmo iterativo per rispettare i vincoli min/max.
"""
import math
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
logger = logging.getLogger(__name__)
@dataclass
class ChargingSession:
"""Sessione con tutti i parametri per il calcolo della priorità."""
connector_id: str
transaction_id: str
soc_percent: float
target_soc_percent: float
battery_capacity_kwh: float
departure_time: float # timestamp unix
max_power_w: float
min_power_w: float
user_tier: int = 1 # 1=base, 2=premium, 3=priority
connected_at: float = field(default_factory=time.time)
@property
def energy_needed_kwh(self) -> float:
delta = max(0.0, self.target_soc_percent - self.soc_percent) / 100.0
return self.battery_capacity_kwh * delta
@property
def time_remaining_h(self) -> float:
return max(0.1, (self.departure_time - time.time()) / 3600.0)
class PriorityScoreCalculator:
"""Calcola punteggio di urgenza normalizzato [0, 100]."""
W_SOC = 0.35
W_TIME = 0.35
W_TIER = 0.15
W_EFFICIENCY = 0.15
def calculate(self, session: ChargingSession) -> float:
# 1. Urgenza SoC: concava, amplifica i SoC bassissimi
soc_ratio = session.soc_percent / 100.0
soc_score = (1.0 - soc_ratio) ** 1.5 * 100
# 2. Pressione temporale: esponenziale, cresce rapido vicino alla deadline
time_score = 100.0 * math.exp(-0.3 * session.time_remaining_h)
# 3. Tier: bonus per abbonamenti premium
tier_score = {1: 33.0, 2: 66.0, 3: 100.0}.get(session.user_tier, 33.0)
# 4. Efficienza: premia chi ha batteria da riempire
eff = min(100.0, (session.energy_needed_kwh / max(0.1, session.battery_capacity_kwh)) * 100)
return round(
self.W_SOC * soc_score +
self.W_TIME * time_score +
self.W_TIER * tier_score +
self.W_EFFICIENCY * eff,
2
)
class PriorityBalancer:
"""
Distribuisce potenza proporzionalmente ai punteggi con vincoli min/max.
Algoritmo iterativo: separa i connettori con vincoli attivi e redistribuisce
il residuo tra quelli liberi, ripetendo fino a convergenza.
"""
def __init__(self, station_id: str, max_station_power_w: float, profile_sender):
self._station_id = station_id
self._max_power = max_station_power_w
self._dynamic_limit: Optional[float] = None
self._sessions: Dict[str, ChargingSession] = {}
self._send_profile = profile_sender
self._score_calc = PriorityScoreCalculator()
self._lock = asyncio.Lock()
@property
def available_power_w(self) -> float:
if self._dynamic_limit is not None:
return min(self._max_power, self._dynamic_limit)
return self._max_power
async def add_session(self, session: ChargingSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
await self._rebalance()
async def remove_session(self, connector_id: str) -> None:
async with self._lock:
self._sessions.pop(connector_id, None)
await self._rebalance()
async def update_soc(self, connector_id: str, soc_percent: float) -> None:
async with self._lock:
if connector_id in self._sessions:
self._sessions[connector_id].soc_percent = soc_percent
await self._rebalance()
def _compute_allocations(self) -> Dict[str, float]:
"""Algoritmo iterativo per allocazione con vincoli min/max."""
if not self._sessions:
return {}
sessions = list(self._sessions.values())
scores = {s.connector_id: self._score_calc.calculate(s) for s in sessions}
allocations: Dict[str, float] = {}
remaining_power = self.available_power_w
remaining = sessions[:]
for _ in range(10): # max 10 iterazioni per convergenza
if not remaining:
break
total_score = sum(scores[s.connector_id] for s in remaining)
if total_score == 0:
share = remaining_power / len(remaining)
for s in remaining:
allocations[s.connector_id] = max(s.min_power_w, min(s.max_power_w, share))
break
constrained, free_sessions = [], []
for s in remaining:
ratio = scores[s.connector_id] / total_score
proposed = remaining_power * ratio
if proposed <= s.min_power_w:
allocations[s.connector_id] = s.min_power_w
constrained.append(s)
elif proposed >= s.max_power_w:
allocations[s.connector_id] = s.max_power_w
constrained.append(s)
else:
free_sessions.append(s)
if not constrained:
# Nessun vincolo: allocazione finale proporzionale
for s in remaining:
ratio = scores[s.connector_id] / total_score
allocations[s.connector_id] = remaining_power * ratio
break
for s in constrained:
remaining_power -= allocations[s.connector_id]
remaining = free_sessions
return allocations
async def _rebalance(self) -> None:
async with self._lock:
allocs = self._compute_allocations()
# Stampa riepilogo per logging
for cid, power in sorted(allocs.items()):
s = self._sessions.get(cid)
score = self._score_calc.calculate(s) if s else 0
logger.info(
"Priority: %s score=%.1f -> %.0f W (SoC=%.0f%%, %.1fh)",
cid, score, power,
s.soc_percent if s else 0,
s.time_remaining_h if s else 0
)
await self._send_profile(self._station_id, cid, power)
# Demo
async def priority_demo():
async def mock_send(station_id, cid, power_w) -> bool:
print(f" SET {cid}: {power_w:.0f} W")
return True
balancer = PriorityBalancer("HUB_01", 44000.0, mock_send)
now = time.time()
sessions = [
# Urgente: 5% SoC, parte tra 45 min
ChargingSession("C1", "T1", 5.0, 80.0, 77.0, now + 45*60, 22000.0, 1380.0, user_tier=2),
# Normale: 60% SoC, parte tra 8 ore
ChargingSession("C2", "T2", 60.0, 80.0, 40.0, now + 8*3600, 11000.0, 1380.0, user_tier=1),
# Priority tier: 30% SoC, parte tra 2 ore
ChargingSession("C3", "T3", 30.0, 90.0, 100.0, now + 2*3600, 22000.0, 1380.0, user_tier=3),
]
print("\n--- Sessioni aggiunte (44 kW disponibili) ---")
for s in sessions:
await balancer.add_session(s)
print("\n--- SoC C1 sale al 40% (meno urgente) ---")
await balancer.update_soc("C1", 40.0)
if __name__ == "__main__":
asyncio.run(priority_demo())
エネルギーコストを最小限に抑えるための強化学習
以前のアルゴリズムは現在の状態に反応します。強化学習は次のものを超えます。 エージェントは何百万ものシミュレーションを通じて、最小化する最適なポリシーを学習します。 時間の経過に伴う変動価格、到着予測、および EVの出発、ネットワークの制約。 2025 年の生産では、主要な通信事業者は、 充電ハブは次のバリエーションを使用します。 近接ポリシーの最適化 (PPO) のために リアクティブアルゴリズムと比較してコストを 15 ~ 25% 削減します (出典: ScienceDirect、2025)。
マルコフ決定プロセスとしての定式化
- 状態空間: コネクタごと(SoC、残り時間、現在の電力) 加えて、グリッドのステータス (利用可能な電力、エネルギー価格、時刻、PV 予測)。
- アクションスペース: 正規化された電力レベルの連続ベクトル [0,1] コネクタごとに。
- 報酬: エネルギーコストを最小限に抑え、基準に達しないEVにはペナルティを課します。 期限までにターゲット SoC を設定し、ネットワーク制限の違反にペナルティを課します。
"""
Ambiente Gymnasium per EV Charging RL.
Compatibile con Stable-Baselines3 (PPO, SAC).
Ogni step = 15 minuti. Un episodio = 24 ore (96 step).
Installazione: pip install gymnasium stable-baselines3 numpy
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from dataclasses import dataclass
from typing import List, Optional, Tuple
@dataclass
class EVSession:
soc: float # [0, 1] corrente
target_soc: float # [0, 1] obiettivo
battery_kwh: float
time_remaining_h: float
max_power_kw: float
min_power_kw: float
class EVChargingEnv(gym.Env):
"""
Ambiente Gymnasium per scheduling EV charging.
Obiettivo: minimizzare costo energia rispettando deadline SoC.
"""
metadata = {"render_modes": ["human"]}
def __init__(
self,
n_connectors: int = 4,
max_station_power_kw: float = 44.0,
episode_steps: int = 96,
electricity_prices: Optional[np.ndarray] = None
):
super().__init__()
self.n = n_connectors
self.max_power = max_station_power_kw
self.episode_steps = episode_steps
self.prices = electricity_prices if electricity_prices is not None \
else self._italian_biorario_prices()
# Action: potenza normalizzata [0,1] per ogni connettore
self.action_space = spaces.Box(
low=0.0, high=1.0, shape=(n_connectors,), dtype=np.float32
)
# Observation: 5 feature per connettore + 3 globali
obs_dim = n_connectors * 5 + 3
self.observation_space = spaces.Box(
low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
)
self.sessions: List[Optional[EVSession]] = [None] * n_connectors
self.step_count = 0
def _italian_biorario_prices(self) -> np.ndarray:
"""Prezzi biorari italiani simulati: F1 (08-19) = 0.28, F2/F3 = 0.18 EUR/kWh."""
prices = np.zeros(96)
for i in range(96):
hour = (i * 15) // 60
prices[i] = 0.28 if 8 <= hour < 19 else 0.18
# Variabilità mercato spot
return (prices + np.random.normal(0, 0.02, 96)).clip(0.10, 0.40)
def _get_obs(self) -> np.ndarray:
obs = []
for s in self.sessions:
if s is not None:
obs.extend([
s.soc,
s.target_soc,
min(1.0, s.time_remaining_h / 12.0),
1.0,
s.max_power_kw / self.max_power
])
else:
obs.extend([0.0, 0.0, 0.0, 0.0, 0.0])
step_idx = self.step_count % len(self.prices)
hour = (self.step_count * 15 // 60) % 24
obs.extend([
self.prices[step_idx] / 0.40,
np.sin(2 * np.pi * hour / 24),
np.cos(2 * np.pi * hour / 24)
])
return np.array(obs, dtype=np.float32)
def step(self, action: np.ndarray) -> Tuple:
dt_h = 15 / 60
price = self.prices[self.step_count % len(self.prices)]
# Converti azioni in potenze reali con vincoli
actual_powers = np.zeros(self.n)
for i, s in enumerate(self.sessions):
if s is not None:
power = action[i] * s.max_power_kw
actual_powers[i] = np.clip(power, s.min_power_kw, s.max_power_kw)
# Rispetta limite stazione
total = actual_powers.sum()
if total > self.max_power:
actual_powers *= self.max_power / total
# Calcola reward
energy_kwh = actual_powers.sum() * dt_h
reward = -energy_kwh * price # costo negativo (vogliamo minimizzare)
# Aggiorna SoC e penalita deadline
for i, s in enumerate(self.sessions):
if s is None:
continue
delta_soc = (actual_powers[i] * dt_h * 0.92) / s.battery_kwh
s.soc = min(1.0, s.soc + delta_soc)
s.time_remaining_h -= dt_h
if s.time_remaining_h <= 0:
gap = max(0, s.target_soc - s.soc)
reward -= gap * 50.0 # penalita pesante per deadline mancata
# Penalita violazione rete
if total > self.max_power * 1.01:
reward -= (total - self.max_power) * 5.0
# Bonus ricarica off-peak
hour = (self.step_count * 15 // 60) % 24
if not (8 <= hour < 19) and total > 0:
reward += total * 0.015
self.step_count += 1
obs = self._get_obs()
terminated = self.step_count >= self.episode_steps
return obs, float(reward), terminated, False, {"price": price, "total_kw": total}
def reset(self, seed=None, options=None) -> Tuple:
super().reset(seed=seed)
self.step_count = 0
self._spawn_random_sessions()
return self._get_obs(), {}
def _spawn_random_sessions(self) -> None:
"""Genera sessioni EV casuali realistiche per il training."""
for i in range(self.n):
if np.random.random() > 0.3:
self.sessions[i] = EVSession(
soc=float(np.random.uniform(0.05, 0.7)),
target_soc=float(np.random.uniform(0.7, 0.95)),
battery_kwh=float(np.random.choice([40.0, 60.0, 77.0, 100.0])),
time_remaining_h=float(np.random.uniform(1.0, 10.0)),
max_power_kw=float(np.random.choice([7.4, 11.0, 22.0])),
min_power_kw=1.38
)
else:
self.sessions[i] = None
def train_ppo_agent(total_timesteps: int = 500_000):
"""
Addestra un agente PPO sull'ambiente di EV charging.
Richiede: pip install stable-baselines3
"""
try:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
env = EVChargingEnv(n_connectors=4, max_station_power_kw=44.0)
check_env(env)
model = PPO(
"MlpPolicy", env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=1,
)
model.learn(total_timesteps=total_timesteps, progress_bar=True)
model.save("./models/ppo_ev_charging")
print("Agente PPO salvato in ./models/ppo_ev_charging")
return model
except ImportError:
print("Installa: pip install stable-baselines3")
return None
Vehicle-to-Grid (V2G): ネットワーク リソースとしての車両
V2G は EV パラダイムの質的飛躍を表しており、車両はもはや単独ではありません エネルギーの消費者ですが、 双方向リソース。 エネルギー価格が高騰したり、送電網にストレスがかかると、 EV はエネルギーを送電網に送り返し、所有者に収益をもたらすことができます そして電力システムの安定化。
2025 年の V2G 標準
| 標準 | ほうき | V2G ニュース | 状態 |
|---|---|---|---|
| ISO 15118-2 | EV-EVSE AC/DC通信 | プラグアンドチャージ、スマート充電 | 使用中 |
| ISO 15118-20 | 第2世代フルV2G | 双方向BPT、DER統合、動的スケジューリング | 導入 2025 ~ 2026 年 |
| OCPP 2.0.1 | CP-CSMS通信 | ISO 15118 統合、V2G 充電プロファイル | 使用中 |
| EU AFIR Reg. | インフラ規制 | 2026 年 1 月から新しいシステムに ISO 15118 が必須 | 有効 |
ヨーロッパでは、ユトレヒト (オランダ) が最大の商用 V2G 導入を行っており、500 以上の V2G が導入されています。 カーシェアリングにおける双方向のルノー。どの車両も儲かります 600~1,500ユーロ/年 オランダの TSO TenneT に周波数封じ込め予備 (FCR) を提供します。イタリアの V2G まだ主に実験段階ですが、新しいモデルの日産リーフ e2+、フォルクスワーゲン ID.4 Pro および Tesla Model 3 Highland の一部のバージョンはすでに双方向性をサポートしています。
EV バッテリーのピークカットと周波数応答
"""
V2G Controller: peak shaving e frequency response.
Gestisce la bidirezionalita delle sessioni EV compatibili V2G.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
class ChargingDirection(str, Enum):
G2V = "G2V" # Grid-to-Vehicle (ricarica normale)
V2G = "V2G" # Vehicle-to-Grid (scarica)
IDLE = "IDLE"
@dataclass
class V2GSession:
"""Sessione V2G con capacità bidirezionale."""
connector_id: str
transaction_id: str
soc_percent: float
battery_capacity_kwh: float
max_charge_kw: float # potenza max di ricarica G2V
max_discharge_kw: float # potenza max di scarica V2G
min_soc_percent: float = 20.0 # SoC minimo garantito (non scarica sotto)
departure_soc_target: float = 80.0
@property
def available_discharge_kwh(self) -> float:
usable = max(0.0, self.soc_percent - self.min_soc_percent) / 100.0
return self.battery_capacity_kwh * usable
class V2GController:
"""Controller V2G per peak shaving e frequency response."""
FREQ_NOMINAL = 50.0
FREQ_DEADBAND = 0.05 # +/- 50 mHz: zona morta
FREQ_FULL_ACTIVATION = 0.5 # +/- 500 mHz: attivazione completa FCR
def __init__(
self,
station_id: str,
max_export_kw: float,
profile_sender
):
self._station_id = station_id
self._max_export = max_export_kw
self._send = profile_sender
self._sessions: Dict[str, V2GSession] = {}
self._lock = asyncio.Lock()
async def add_session(self, session: V2GSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
async def peak_shaving(
self,
building_load_kw: float,
peak_threshold_kw: float
) -> dict:
"""
Quando il carico supera la soglia, usa le batterie EV per ridurre l'import.
Restituisce le azioni intraprese per ogni connettore V2G.
"""
if building_load_kw <= peak_threshold_kw:
return {"action": "none", "reason": "under_threshold"}
excess_kw = building_load_kw - peak_threshold_kw
logger.info(
"Peak shaving: %.1f kW > soglia %.1f kW (eccesso %.1f kW)",
building_load_kw, peak_threshold_kw, excess_kw
)
actions = {}
async with self._lock:
sessions = sorted(
self._sessions.values(),
key=lambda s: s.available_discharge_kwh,
reverse=True
)
remaining = excess_kw
for s in sessions:
if remaining <= 0:
break
if s.available_discharge_kwh < 1.0:
continue
discharge = min(s.max_discharge_kw, remaining)
actions[s.connector_id] = {
"direction": ChargingDirection.V2G.value,
"power_kw": discharge
}
# Potenza negativa = V2G discharge in OCPP
await self._send(self._station_id, s.connector_id, -discharge * 1000)
remaining -= discharge
logger.info("V2G: %s scarica %.1f kW (peak shaving)", s.connector_id, discharge)
return {"action": "peak_shaving", "actions": actions, "residual_excess_kw": remaining}
async def frequency_response(self, freq_hz: float) -> dict:
"""
FCR (Frequency Containment Reserve).
Risposta lineare alla deviazione di frequenza, entro 500ms.
Frequenza bassa (< 50 Hz) -> V2G discharge.
Frequenza alta (> 50 Hz) -> aumento ricarica G2V.
"""
deviation = freq_hz - self.FREQ_NOMINAL
abs_dev = abs(deviation)
if abs_dev < self.FREQ_DEADBAND:
return {"action": "none", "frequency_hz": freq_hz}
activation = min(1.0,
(abs_dev - self.FREQ_DEADBAND) / (self.FREQ_FULL_ACTIVATION - self.FREQ_DEADBAND)
)
actions = {}
async with self._lock:
sessions = list(self._sessions.values())
for s in sessions:
if deviation < 0 and s.available_discharge_kwh > 0.5:
# Frequenza bassa: scarica per supportare la rete
power_kw = s.max_discharge_kw * activation
actions[s.connector_id] = {"direction": "V2G", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, -power_kw * 1000)
elif deviation > 0 and s.soc_percent < 90:
# Frequenza alta: aumenta ricarica per assorbire eccesso
power_kw = s.max_charge_kw * activation
actions[s.connector_id] = {"direction": "G2V", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, power_kw * 1000)
return {
"frequency_hz": freq_hz,
"deviation_hz": deviation,
"activation_factor": activation,
"actions": actions
}
太陽光発電の統合: EV への太陽光余剰
太陽光発電システムとEV充電ステーションの統合は、 エネルギー管理における ROI の向上。太陽光発電がそれ以上のものを生み出すとき 建物を消費し、余剰分は電気自動車に供給されるのではなく、電気自動車に送られる。 ネットワーク (~0.07 ~ 0.10 EUR/kWh GSE で報酬) - エネルギーの使用を許可します。 それ以外の場合、送電網からのコストは 0.28 ユーロ/kWh になります。純貯蓄額 0.18~0.21ユーロ/kWh EVによる自家消費量1kWhあたり。
| 戦略 | 論理 | 利点 | 制限 |
|---|---|---|---|
| 超過のみ | PV余剰でEVのみ充電 | 自己消費量は最大、ネットワークコストはゼロ | 可変電力、低速充電 |
| 太陽光発電ファースト | 太陽光発電を優先し、送電網をそのままに | ソーラー最大充電による安定した充電 | 送電網からのエネルギーの一部 |
| グリーンマキシマイザー | 天気予報を使用して 4 時間後の最適化を行う | 再生可能エネルギーの割合を最大化する | 正確な予測が必要です |
| コストオプティマイザー | 太陽光発電 + スポット価格 + V2G の組み合わせ | 絶対的な最小コスト | アルゴリズムの複雑さの高さ |
"""
Solar-EV Integration Controller.
Implementa strategie Excess-Only e Solar-First
con isteresi per evitare oscillazioni frequenti.
"""
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class PowerState:
"""Stato istantaneo del sistema energetico."""
pv_production_kw: float
building_load_kw: float # consumo edificio escluso EV
electricity_price_eur: float = 0.25
feed_in_tariff_eur: float = 0.07
@property
def net_surplus_kw(self) -> float:
return max(0.0, self.pv_production_kw - self.building_load_kw)
@property
def savings_per_kwh(self) -> float:
"""Risparmio per kWh autoconsumato invece di cedere+comprare."""
return self.electricity_price_eur - self.feed_in_tariff_eur
class SolarEVController:
"""
Controlla la ricarica EV in funzione del surplus fotovoltaico.
Supporta Excess-Only e Solar-First con isteresi configurabile.
"""
HYSTERESIS_KW = 0.3 # evita comandi continui per piccole oscillazioni
def __init__(
self,
balancer,
strategy: str = "solar_first",
min_ev_power_kw: float = 1.38,
max_grid_supplement_kw: float = 5.0
):
self._balancer = balancer
self._strategy = strategy
self._min_ev_power = min_ev_power_kw
self._max_grid_supp = max_grid_supplement_kw
self._last_limit_kw = 0.0
async def update(self, power_state: PowerState, n_active_evs: int) -> dict:
"""
Ricalcola e applica il limite di potenza EV.
Chiamato ogni 5-15 secondi dal loop di controllo.
"""
if n_active_evs == 0:
return {"action": "no_ev", "allocated_kw": 0}
if self._strategy == "excess_only":
target_kw = self._excess_only(power_state, n_active_evs)
else:
target_kw = self._solar_first(power_state, n_active_evs)
# Applica isteresi
if abs(target_kw - self._last_limit_kw) > self.HYSTERESIS_KW:
await self._balancer.update_dynamic_limit(target_kw * 1000)
self._last_limit_kw = target_kw
logger.info("Solar routing [%s]: %.1f kW -> EV", self._strategy, target_kw)
hourly_savings = target_kw * power_state.savings_per_kwh
return {
"strategy": self._strategy,
"pv_kw": power_state.pv_production_kw,
"surplus_kw": power_state.net_surplus_kw,
"allocated_ev_kw": target_kw,
"savings_eur_h": round(hourly_savings, 3)
}
def _excess_only(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
return surplus if surplus >= min_total else 0.0
def _solar_first(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
if surplus < min_total:
grid_needed = min_total - surplus
return min_total if grid_needed <= self._max_grid_supp else surplus
return min(surplus + self._max_grid_supp, surplus + self._max_grid_supp)
async def solar_routing_demo():
"""Simula una giornata con produzione FV tipica di aprile in Italia."""
hourly_pv = [0, 0, 0, 0, 0, 0.5, 2, 5, 9, 14, 18, 21,
22, 20, 16, 10, 6, 2, 0.5, 0, 0, 0, 0, 0]
print("Ora | PV kW | Bldg kW | Surplus | EV kW | Risparmio €/h")
print("-" * 62)
for h in range(24):
pv = hourly_pv[h]
bldg = 5.0 + (2.0 if 8 <= h < 18 else 0)
surplus = max(0, pv - bldg)
price = 0.28 if 8 <= h < 19 else 0.18
feed_in = 0.07
ev_alloc = min(surplus, 11.0)
savings = ev_alloc * (price - feed_in)
print(
f"{h:02d}:00 | {pv:5.1f} | {bldg:7.1f} | "
f"{surplus:7.1f} | {ev_alloc:5.1f} | {savings:.3f}"
)
if __name__ == "__main__":
asyncio.run(solar_routing_demo())
チャージ コントローラー マイクロサービス アーキテクチャ
プロダクションEVロードバランシングシステムと分散アーキテクチャ イベントを介して通信する特殊なコンポーネント。主なサービスは次のとおりです。
| サービス | 責任 | スタック | ALS |
|---|---|---|---|
| OCPPゲートウェイ | WebSocket サーバー、OCPP 変換 1.6/2.0.1 | Python ocpp、asyncio | 99.9%、<100ms |
| チャージコントローラー | 負荷分散アルゴリズム、SetChargingProfile | FastAPI、Redis | 99.9%、<500ms |
| エネルギーマネージャー | PV/BESS/ネットワークの測定、可用性の計算 | Modbus TCP、MQTT、Python | 99.5%、ポーリング 1 秒 |
| 予報士 | 発着予報、価格、FV | LightGBM/LSTM、FastAPI | 99%、更新 15 分 |
| DSO ゲートウェイ | OpenADR 信号、動的制限 | Python OpenADR 2.0b | 99%、<2秒 |
| 請求する | メータリング、請求、セッション | FastAPI、PostgreSQL | 99.5% |
FastAPI を使用したチャージ コントローラー API
"""
Charge Controller Service - FastAPI REST API.
Endpoints per load balancing, monitoring e configurazione.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime, timezone
import logging
logger = logging.getLogger(__name__)
app = FastAPI(
title="EV Charge Controller API",
version="2.1.0",
description="Load balancing real-time per stazioni di ricarica EV"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT"],
allow_headers=["*"]
)
# -------------------------------------------------------
# Modelli Pydantic
# -------------------------------------------------------
class ConnectorInfo(BaseModel):
connector_id: str
status: str
allocated_power_w: float
soc_percent: Optional[float]
session_duration_min: float
class StationStatus(BaseModel):
station_id: str
available_power_w: float
algorithm: str
connectors: List[ConnectorInfo]
last_updated: str
class EVConnectedEvent(BaseModel):
connector_id: str
transaction_id: str
initial_soc: Optional[float] = None
battery_capacity_kwh: Optional[float] = None
departure_timestamp: Optional[int] = None
user_tier: int = Field(default=1, ge=1, le=3)
class PowerLimitUpdate(BaseModel):
limit_watts: float = Field(gt=0, le=200_000)
source: str = "manual" # "manual" | "dso" | "solar" | "openadr"
valid_until_ts: Optional[int] = None
class AlgorithmSwitch(BaseModel):
algorithm: str = Field(pattern="^(equal_share|priority|rl_ppo)$")
params: Dict = {}
# In-memory registry (in produzione: Redis)
_station_registry: Dict = {}
_algorithm_map: Dict[str, str] = {}
# -------------------------------------------------------
# Endpoints
# -------------------------------------------------------
@app.get("/health")
async def health():
return {
"status": "healthy",
"ts": datetime.now(timezone.utc).isoformat(),
"version": "2.1.0"
}
@app.get("/stations/{station_id}/status", response_model=StationStatus)
async def get_status(station_id: str):
if station_id not in _station_registry:
raise HTTPException(404, f"Stazione {station_id} non trovata")
balancer = _station_registry[station_id]
raw = balancer.get_status()
return StationStatus(
station_id=station_id,
available_power_w=raw["available_power_w"],
algorithm=_algorithm_map.get(station_id, "equal_share"),
connectors=[ConnectorInfo(**c) for c in raw["connectors"]],
last_updated=datetime.now(timezone.utc).isoformat()
)
@app.post("/stations/{station_id}/events/connected")
async def ev_connected(station_id: str, event: EVConnectedEvent, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(
_station_registry[station_id].on_ev_connected,
event.connector_id,
event.transaction_id
)
return {"status": "accepted", "rebalancing": True}
@app.delete("/stations/{station_id}/connectors/{connector_id}/session")
async def ev_disconnected(station_id: str, connector_id: str, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(_station_registry[station_id].on_ev_disconnected, connector_id)
return {"status": "accepted"}
@app.put("/stations/{station_id}/power-limit")
async def set_power_limit(station_id: str, limit: PowerLimitUpdate, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(_station_registry[station_id].update_dynamic_limit, limit.limit_watts)
logger.info("Limite potenza: %s -> %.0f W (source=%s)", station_id, limit.limit_watts, limit.source)
return {"status": "accepted", "new_limit_w": limit.limit_watts, "source": limit.source}
@app.put("/stations/{station_id}/algorithm")
async def set_algorithm(station_id: str, config: AlgorithmSwitch):
_algorithm_map[station_id] = config.algorithm
return {"status": "ok", "algorithm": config.algorithm}
@app.get("/stations/{station_id}/metrics")
async def get_metrics(station_id: str):
"""KPI della stazione (in produzione: query InfluxDB/TimescaleDB)."""
return {
"station_id": station_id,
"period": "last_24h",
"charging_satisfaction_rate": 0.94,
"peak_reduction_percent": 31.2,
"energy_cost_savings_eur": 47.80,
"solar_share_percent": 42.3,
"grid_stress_index": 0.23,
"total_energy_kwh": 201.6,
"avg_power_kw": 8.4,
"rebalance_latency_ms_p95": 180
}
負荷分散評価のメトリクスと KPI
負荷分散システムの有効性を評価するには、定量化可能な KPI が必要です ユーザー満足度とエネルギー最適化の両方を測定します。
| KPI | 意味 | ターゲット | Formula |
|---|---|---|---|
| 充電満足度(CSR) | 期限までに SoC 目標に到達したセッションの割合 | >90% | ok_sessions / total_sessions |
| ピーク低減率 (%) | ピーク削減と管理なしの比較 | >25% | (ピーク_非管理 - ピーク_管理) / ピーク_非管理 |
| エネルギーコストの節約率 (%) | 即時充電と比較して、kWh コストの節約 | >15% | (基本コスト - 管理コスト) / 基本コスト |
| 太陽光発電の自己消費率 (%) | 地元の太陽光発電からの EV エネルギーの割合 | >40% | ソーラーエネルギー_ev / トータルエネルギー_ev |
| グリッドストレスインデックス (GSI) | 変圧器にかかる平均圧力 [0-1] | <0.3 | avg(P_actual / P_nominal_trafo) |
| リバランスレイテンシ P95 | イベントからOCPPプロファイルが適用されるまでの時間 | <2秒 | t_ocpp_conf - t_event_received (p95) |
イタリアの背景: ARERA、EV およびインセンティブ付き CER 2025
イタリアでの EV 負荷分散システムの導入には規制上の課題が伴う アーキテクチャの選択に直接影響を与える市場。
ARERA の 2 時間ごとの料金とスマート チャージング
イタリアの料金体系は、オフピーク充電に対する自然なインセンティブを生み出します。
- F1バンド (月曜~金曜 8:00~19:00): 約 0.26~0.31 ユーロ/kWh。 EV充電時に避けるべき帯域。
- バンドF2 (月曜~金曜 7~8、19~23、土曜 7~23): 約 0.20~0.24 ユーロ/kWh。
- バンドF3 (夜11時~朝7時、日曜・祝日は終日): 約0.16~0.19ユーロ/kWh。夜間の充電に最適です。
充電の 60% を F1 から F3 にシフトする使用時間アルゴリズムにより、充電時間が短縮されます。 フリート管理者の年間充電コスト 18-25%.
EVによる再生可能エネルギーコミュニティ(CER)
立法令 199/2021 および 2023 年 12 月 7 日の大臣令 MASE により、具体的な機会が開かれました。 イタリアのCERにおけるEV統合向け:
- コミュニティの太陽光発電余剰で充電する EV は GSE インセンティブを生成します のシェアについて 共有エネルギー (運賃の払い戻し + インセンティブ 最大 11 セント/kWh)。
- V2G では、EV バッテリーが仮想 CER ストレージとして機能し、 正午から夕方までの太陽エネルギー(ピークシフト)。
- GSE は、以下を通じて共有されるエネルギーを測定します。 マックセを含む コミュニティアカウンティングにおける V2G EV の双方向フロー。
2025年までにイタリアのEVインフラ
イタリアには 58,000 を超える公共充電ポイント (Motus-E、2024 年末) があり、そのうち 22% の高速 DC (2023 年には 14%)。 2025 年には 94,230 台の BEV が登録されました (+46%)。 市場シェアは6.2%です。 Terna によると、人口の 7% がスマート充電を利用していない状態が蔓延している 配電フィーダーは、2027 年までに、最も密度の高い地域で過負荷になると予想されます。 北イタリア。 PNRRはインフラ整備に7億4000万ユーロ以上を割り当てた 2026 年までに新たに 21,000 の公共ポイントを獲得することを目標としています。
インセンティブに注目
Transition 5.0 に基づくスマート チャージングの税額控除は以下の対象となります。 定期的に検査されます。 GSE ウェブサイトで最新の条件を常に確認してください インセンティブベースの投資を計画する前に、CSEA と CSEA を確認してください。 料金やアクセス条件は異なる場合があります。
結論と実装ロードマップ
EV の負荷分散はアルゴリズムの優雅さの問題ではなく、必要不可欠なものです。 エネルギー転換のための重要なインフラ。データは明白です: 管理されていない EV が 500 万台ある場合、夜のピークは 35,000 MW 増加します。スマートで EV フリート自体の充電は柔軟なリソースとなり、ピークを 34% 削減します。
これらのシステムを運用環境に実装する開発チームのロードマップは、 後続のフェーズに最適で、次のことが可能です。
- フェーズ 1 - 均等シェア: シンプルで堅牢、設置には十分 最大30〜40のトリック。最優先: 再計算を正しく実装する 減少してから増加するシーケンスによるイベント駆動型。推定時間: 2 ~ 3 週間。
-
フェーズ 2 - PV の統合: ML を使用しない即時 ROI。必要なのは、
Modbus/MQTTを介した太陽光余剰の測定とバランサーとの統合
update_dynamic_limit()。 6 ~ 18 か月で回収できる迅速な勝利。 - フェーズ 3 - 優先順位に基づく: 収集するモバイルアプリがある場合に追加します SoC と開始期限。 CSR(充電満足度)の向上は測定可能です そしてサービスを競合他社と差別化します。
- フェーズ 4 - V2G インフラストラクチャ: 双方向アーキテクチャを設計する V2G EV をまだお持ちでない場合でも、今すぐご利用いただけます。 ISO 15118-20 を備えた OCPP 2.0.1 が必須になります 2026 年以降の新規プラント向け (EU AFIR)。改造には5~10倍の費用がかかります。
- フェーズ 5 - 運用中の RL: ディープ RL への投資は 6 か月以上経過した後にのみ行ってください。 高品質のデータと専任の MLOps チームを活用します。コストの増加は実際にあります (15 ~ 25%) しかし、操作の複雑さは非常に高いです。
EnergyTech シリーズの関連記事
- 第 4 条: ネットワーク デジタル ツイン - 導入前にネットワークに対する EV の影響をシミュレーションする
- 第 5 条: 再生可能エネルギーの予測 - 太陽光発電ルートの PV 生産予測
- 第 7 条: MQTT および InfluxDB - 課金パラメータのリアルタイム テレメトリ
- 第 2 条: DERMS アーキテクチャ - EV を DER として分散管理システムに統合する
シリーズ間の洞察
- MLOps シリーズ (条項 306 ~ 315): MLflow を使用した PPO モデルの展開、ドリフト監視および再トレーニング
- AI エンジニアリング シリーズ (条項 316 ~ 325): LLM エンタープライズとの OCPP および ISO 15118 文書に関する RAG
- データおよび AI ビジネス シリーズ (第 267 条から第 280 条): ROI、ビジネス指標およびスマート充電投資のロードマップ
- PostgreSQL AI シリーズ (条項 356 ~ 361): PostgreSQL 上の TimescaleDB を使用したリロード データの時系列







