DERMS アーキテクチャ: 数百万の分散リソースを集約する
2025 年には、歴史上初めて、ヨーロッパで生産される電力の半分以上が、 再生可能な資源から。並外れた成果ですが、同時に並外れた課題も伴います。 少数の大規模な発電所に生産が集中しなくなった電力網をどのように管理するか。 屋根上の何百万もの太陽光発電システム、ガレージの蓄電池、電気自動車に分散 ソケット、インテリジェントヒートポンプ、産業用マイクロコージェネレーターに接続されています。
この課題に対する答えには次のような名前があります。 皮膚、分散型エネルギー資源管理システム。 これは、数千ものデータをリアルタイムで集約、監視、最適化、調整するソフトウェア プラットフォームです。 または数百万の分散型エネルギー リソース (DER) を、 柔軟で制御可能なネットワーク資産です。 DERMS がなければ分散型再生可能エネルギーは成長する それは送電網を脱炭素化するのではなく、不安定化させる危険性があります。
DERMS 市場は活況を呈しています。2025 年の予測は次のとおりです。 11億ドルと14億2000万ドル 情報源に応じて、成長予測は最大で 2030年までに22億ドル CAGRで 14〜16%増加します。しかし、本当の革命はアーキテクチャ面で起こります。つまり、デバイスを 1,000 台から 1,000,000 台に拡張することです。 分散型で、リアルタイム ディスパッチのレイテンシーを 500 ミリ秒未満に保つことは、エンジニアリング上の問題です 本番環境で解決したソフトウェアはほとんどありません。
この記事では、通信プロトコルから最新の DERMS のアーキテクチャ全体を詳しく掘り下げます。 フィールド (OpenADR 2.0b、IEEE 2030.5、SunSpec Modbus) から Kafka を使用したイベント駆動型クラウド プラットフォームまで、 付属サービス市場へのディスパッチ最適化の数学 (PuLP による線形計画法) イタリア語 (テルナの MSD/MGP)。動作する Python コードとイタリアの地域 VPP の実際のケーススタディを含みます。
この記事で学べること
- 公益事業エコシステムにおけるDERMSの定義と位置付け(EMS、SCADA、ADMSとの違い)
- DER の種類: 住宅用 PV、BESS、EV/V2G、デマンド レスポンス、コージェネレーション
- 多層アーキテクチャ: フィールド、エッジ、プラットフォーム、マーケット
- 仮想発電所: 数千の DER を単一の市場資産に集約する方法
- Python 実装: FastAPI サービス + PuLP によるディスパッチの最適化
- 通信プロトコル: OpenADR 2.0b、IEEE 2030.5、MQTT、SunSpec Modbus
- Apache Kafka と CQRS による 100 万 DER のイベント駆動型のスケーラビリティ
- デマンドレスポンス: DRプログラム、M&V、OpenADRイベント
- イタリア語のコンテキスト: CER、GSE、MACSE、MSD/MGP Terna
- ケーススタディ: 5,000 PV + 500 BESS の地域 VPP
EnergyTech シリーズ - 10 件の記事
| # | アイテム | Stato |
|---|---|---|
| 1 | スマート グリッドと IoT: 将来の電力網のためのアーキテクチャ | 発行済み |
| 2 | DERMS アーキテクチャ: 数百万の分散リソースを集約する (ここにいます) | 現在 |
| 3 | バッテリー管理システム: BESS の制御アルゴリズム | Prossimo |
| 4 | Python と Pandapower を使用した電力網のデジタル ツイン | 近日公開 |
| 5 | 再生可能エネルギーの予測: 太陽光発電と風力発電の ML | 近日公開 |
| 6 | EV ロード バランシング: V2G と OCPP によるスマート充電 | 近日公開 |
| 7 | リアルタイム エネルギー テレメトリのための MQTT と InfluxDB | 近日公開 |
| 8 | IEC 61850: 変電所における通信 | 近日公開 |
| 9 | 炭素会計ソフトウェア: 排出量の測定と削減 | 近日公開 |
| 10 | CER における P2P エネルギー取引のためのブロックチェーン | 近日公開 |
DERMS とは何ですか、また公共事業エコシステムの中でどのように位置づけられていますか
技術的なアーキテクチャに入る前に、DERMS と他の DERMS の違いを明確にすることが重要です。 電力会社が数十年にわたって使用してきたエネルギー管理システム。この用語の混乱 ベンダーはマーケティング目的でこれらの頭字語を同じ意味で使用することがよくあります。
管理システムの階層
最新の公共事業のエコシステムでは、複数のシステムが共存し、それぞれが特定の責任を負います。
| システム | 頭字語 | ドメイン | 管理されたリソース | 一般的な遅延 |
|---|---|---|---|---|
| エネルギー管理システム | EMS | トランスミッション(HV) | 大規模発電所、連系線 | 秒-分 |
| 有効期限が切れます | 有効期限が切れます | 送電+配電 | スイッチ、変圧器、線路 | 100ミリ秒~1秒 |
| 高度な流通管理システム | ADMS | 配信(MV/LV) | 配送ネットワーク、キャビン | Secondi |
| 分散型エネルギーリソース管理システム | 皮膚 | 流通 + 最終顧客 | FV、BESS、EV、DR、VPP | 100ミリ秒 - 5分 |
| ホームエネルギー管理システム | HEMS | 住宅のお客様 | 単一のホームデバイス | 秒-分 |
DERMS は独特の位置を占めています。メーターの境界を越えた最初のシステムです。 エンド顧客のドメインに入る。これにより、法的影響 (同意、データプライバシー) が生じます。 技術面(数千のデバイスブランド/モデルとの相互運用性)とビジネス面(誰が所有するか) データは?誰が収益を分配しますか?)。
参照規格: IEEE 2030.x および OpenADR
2 つの標準ファミリーが DERMS エコシステムを推進します。
IEEE 2030.5 (スマート エネルギー プロファイル 2.0 / SEP2)
2013 年に発行され、2023 年に更新されました (IEEE 2030.5-2023、2024 年 12 月)。プロトコルを定義します。 ユーティリティと顧客に配布されたデバイス間の通信。 RESTful アーキテクチャに基づいて、 HTTP/HTTPS、セキュリティのために TLS 1.2+ をサポートします。対象: デマンドレスポンス、負荷制御、価格設定 動的、DER 管理 (太陽光発電、ストレージ、EV)。そしてすべての人に対するカリフォルニア州の義務(規則 21) 新しい太陽光発電システムと蓄電システム。 2023 プロファイルでは DER 固有の機能が導入されました。
OpenADR 2.0b
OpenADR Alliance によって開発された Open Automated Demand Response。バージョン 2.0b とプロファイル 高度なサーバーとクライアント (2.0a および単純なデバイス) の場合は完全です。 HTTP 経由で XML/JSON を使用します。 仮想トップ ノード (VTN - DERMS/ユーティリティ) と仮想エンド ノード (VEN - デバイス/アグリゲータ) を定義します。 プッシュ (VTN 開始) モードとプル (VEN が必要) モードをサポートします。 2025年に最初の認証製品が誕生 OpenADR 3.0 (E.ON SWITCH Platform) が発表されましたが、運用上のリファレンスは 2.0b のままです 世界的な展開の大部分に対応します。
DER の種類: 分散リソースの寓話
DERMS は、それぞれが異なる物理的特性を持つ異種テクノロジーの動物園を管理しなければなりません。 異なる通信インターフェースと異なる操作上の制約。それらを徹底的に知ることが前提条件です 効果的な集約システムを設計します。
| DERタイプ | 標準的な容量 | コントロール性 | 通信遅延 | メインプロトコル | 主な制約 |
|---|---|---|---|---|---|
| 住宅用PV | 3~10kWp | 抑制、ランプ率 | 5~60秒 | IEEE 2030.5、サンスペック | 照射にもよるけど |
| FV C&I (商業および産業) | 50~5,000kWp | 抑制、無効電力 | 1~5秒 | Modbus TCP、DNP3 | PPA契約、ネットワーク制約 |
| BESS レジデンシャル | 5~15kWh / 3~10kW | 高(充電/放電/スタンバイ) | 100ミリ秒~2秒 | サンスペック、IEEE 2030.5 | SoC の最小/最大、ライフサイクル |
| BESS C&I / グリッドスケール | 100kWh~1GWh | 非常に高いミリ秒応答 | 50~500ミリ秒 | Modbus TCP、IEC 60870、IEC 61850 | 温度、SoC、劣化 |
| EV (Vehicle-to-Grid V2G) | 7~100kW双方向 | 接続され有効になっている場合は高 | 1 ~ 10 秒 (OCPP 2.0.1) | OCPP 2.0.1、ISO 15118 | ユーザー SoC、充電時間 |
| EV(スマート充電V1G) | 3.7 ~ 22 kW 全世界指向性 | 中(削減のみ) | 5~30秒 | OCPP 1.6/2.0.1 | ユーザー設定、ターゲット SoC |
| デマンドレスポンス(産業負荷) | 50kW~50MW | 事前資格があれば高い | 10~300秒 | OpenADR 2.0b | イベント期間、回復 |
| ヒートポンプ (HP) | 3~20kWのサーマル | 平均(タイムシフト) | 30~300秒 | Modbus、OpenADR | 温熱快適性、設定値 |
| マイクロコージェネレーション(CHP) | 1-1,000 kWe | 高 (ランタブル) | 1~30秒 | Modbus TCP、OPC-UA | 熱効率、ガス |
異質性の複雑さ
実際の DERMS は、何百もの異なるモデルのインバータ、BMS、カラムと接続する必要があります。 産業用充電およびコントローラー。各メーカーはプロトコルの実装方法が若干異なりますが、 特定のバグ、非標準のタイムアウト、機能のサブセットが含まれます。アダプターを備えた堅牢なドライバー層 最適化について考える前に、パターンとアーキテクチャの最優先事項を理解する必要があります。
DERMS ソフトウェア アーキテクチャ: 多層モデル
最新の DERMS は 4 つの異なる層に分割されており、それぞれの層には明確に定義された責任と 特定の技術。レイヤー間の明確な分離は、スケーラビリティと システムの保守性。
# Architettura DERMS - Vista ad alto livello
+================================================================+
| LAYER 4: MARKET |
| Mercati Energia: MGP, MSD, MO, Capacity Market |
| DSO Flexibility Markets, Aggregatori terzi |
| Revenue stacking, Portfolio optimization |
+================================================================+
| |
Bid/Offer API Settlement data
| |
+================================================================+
| LAYER 3: PLATFORM (Cloud DERMS) |
| |
| +------------------+ +------------------+ |
| | Forecasting | | Dispatch Engine | |
| | (ML: FV, load, | | (Optimization: | |
| | EV availability)| | LP/MILP/MPC) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Aggregation | | Market Interface | |
| | Service (VPP | | (Bid builder, | |
| | portfolio mgmt) | | settlement) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Event Bus | | Time-Series DB | |
| | (Apache Kafka) | | (InfluxDB/ | |
| | | | TimescaleDB) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Device Registry | | API Gateway | |
| | (DER catalog, | | (REST, WebSocket,| |
| | metadata, caps) | | gRPC) | |
| +------------------+ +------------------+ |
+================================================================+
| |
Commands (dispatch) Telemetry (status)
| |
+================================================================+
| LAYER 2: EDGE |
| |
| +------------------+ +------------------+ |
| | Site Aggregator | | Protocol Gateway | |
| | (building/plant | | (Modbus->MQTT, | |
| | controller) | | SunSpec->JSON) | |
| +------------------+ +------------------+ |
| |
| Local optimization, failsafe, buffering, compression |
+================================================================+
| |
Device protocols (Modbus, SunSpec, OCPP, BACnet, OPC-UA)
| |
+================================================================+
| LAYER 1: FIELD |
| Inverter FV - BESS BMS - EV Charger - Smart Meter |
| Industrial Loads - CHP Controller - Heat Pump |
+================================================================+
主要なアーキテクチャ原則
CQRS によるイベント駆動型
DERMS の中心となるのは、書き込みコマンドを分離するイベント バス (運用環境では Apache Kafka) です。 (コマンド側 - 命令をディスパッチ) 読み取りクエリ (クエリ側 - ダッシュボード、レポート) から。パターン CQRS (コマンド クエリ責任分離) を使用すると、2 つのパスを個別にスケーリングできます。 テレメトリ クエリは非常に頻繁に行われます (1 時間あたり数百万のメッセージ)。 発送の頻度はそれほど高くありませんが、配送保証 (少なくとも 1 回または正確に 1 回) が必要です。
エッジファーストの復元力
エッジ層は単純なリレーではありません。オフライン モード (グレースフル デグラデーション) で動作する機能があります。 クラウドへの接続が中断されたとき。サイトアグリゲーターはローカル最適化を実行します 簡素化されたルールのサブセットを使用して、バッテリーが最小 SoC を下回って放電しないようにします。 そして、DERMS クラウドの監視なしでも重要な負荷には電力が供給され続けます。
ドライバーアダプターパターン
各タイプのデバイスには、ネイティブ プロトコル (SunSpec Modbus、OpenADR、 OCPP) を標準化された内部データ モデル (デバイス シャドウ、AWS IoT からインスピレーションを得たもの) で作成します。これは孤立している フィールド プロトコルの複雑さからビジネス ロジックを完全に排除し、追加できるようにします。 システムの核心に触れることなく、新しいタイプの DER を実現します。
仮想発電所: DER を市場資産に変える
仮想発電所 (VPP) は、DER アグリゲーションに経済的価値を与える重要な概念です。 VPP は物理的なプラントではありません。外部から見ると、分散リソースのポートフォリオです。 (電力市場または送電ネットワークから) 仮想発電所のように動作します 制御可能かつ予測可能な特性を備えています。
世界の VPP 市場は 2025 年の 57 億ドルから成長し、この規模に達すると予測されています 2035年までに284億ドル (CAGR 17.4%)。アグリゲーションおよびオーケストレーション ソフトウェア 市場シェアの46%を占め圧倒的な地位を占めています。北米における VPP の総容量が上限に達しました 2025 年には 37.5 GW、2030 年の世界目標は V2G によって実現される 500 GW を超えます。
VPP アグリゲーションの仕組み
集計プロセスは、15 分ごとに繰り返される 4 つの連続したフェーズで発生します。 (欧州市場の一般的なスケジュール期間):
- 個別予想: ポートフォリオ内の各 DER について、システムは 今後数時間以内に利用可能になる予定です。太陽光発電システムの場合、それは予想される放射線量によって異なります。 BESS の場合は、現在の充電状態 (SoC) とすでに計画されているサイクル。 EVの場合、から 接続される確率 (過去のユーザー パターンに基づく)。
- ポートフォリオの集約: 個別の予測がレベルに集約されます。 ネットワークの制約 (輻輳管理) とリソース間の相関関係を考慮した VPP の調整。 その結果、VPP が各価格でどれだけの電力を提供できるかを示す「入札曲線」が得られます。
- 最適化と入札: 最適化アルゴリズム (線形計画法または MILP) は、市場 (MGP、MSD、容量市場) での最適な入札戦略を決定します。 ポートフォリオの予想収益を最大化します。
- リアルタイムで発送: 市場で契約が成立すると、 ディスパッチ エンジンは、物理的な制約とバランスを考慮して、設定値を個々の DER に送信します。 市場ターゲットに対する総合的な反応。
Python 実装: DERMS アグリゲーション サービス
FastAPI とディスパッチ最適化を使用して完全な DER アグリゲーション サービスを構築します 線形計画法に基づいています。コードはシステムの現実的なモジュールで構造化されています 生産。
DER データモデル
# models.py - Modello dati per le risorse distribuite
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import datetime
class DERType(Enum):
SOLAR_PV = "solar_pv"
BATTERY_STORAGE = "battery_storage"
EV_CHARGER = "ev_charger"
DEMAND_RESPONSE = "demand_response"
CHP = "chp"
HEAT_PUMP = "heat_pump"
class DERStatus(Enum):
ONLINE = "online"
OFFLINE = "offline"
DISPATCHING = "dispatching"
FAULT = "fault"
STANDBY = "standby"
@dataclass
class DERCapabilities:
"""capacità fisiche di una DER"""
max_power_kw: float # Potenza massima erogabile (kW)
min_power_kw: float # Potenza minima (0 per curtailment FV)
ramp_up_kw_per_sec: float # Velocita rampa salita (kW/s)
ramp_down_kw_per_sec: float # Velocita rampa discesa (kW/s)
# Solo per storage (BESS/EV)
capacity_kwh: Optional[float] = None
min_soc_pct: Optional[float] = None # SoC minimo (es. 10%)
max_soc_pct: Optional[float] = None # SoC massimo (es. 95%)
roundtrip_efficiency: Optional[float] = None # Efficienza ciclo (es. 0.92)
@dataclass
class DERTelemetry:
"""Stato in tempo reale di una DER"""
der_id: str
timestamp: datetime.datetime
active_power_kw: float # Potenza attuale (positiva = generazione)
reactive_power_kvar: float
voltage_v: float
current_a: float
status: DERStatus
# Solo per storage
soc_pct: Optional[float] = None
available_charge_kw: Optional[float] = None
available_discharge_kw: Optional[float] = None
# Solo per FV
irradiance_wm2: Optional[float] = None
temperature_c: Optional[float] = None
@dataclass
class DERAsset:
"""Registro completo di una DER nel portfolio"""
id: str
type: DERType
name: str
site_id: str # Sito fisico di appartenenza
grid_node_id: str # Nodo di rete (per vincoli topologici)
capabilities: DERCapabilities
protocol: str # "sunspec", "openadr", "ocpp", "modbus"
endpoint: str # URL/IP del dispositivo o del gateway
owner_id: str # Proprietario (utente o azienda)
aggregation_vpp_ids: list = field(default_factory=list) # VPP di appartenenza
# Telemetria più recente (aggiornata dal telemetry service)
last_telemetry: Optional[DERTelemetry] = None
FastAPI を使用したアグリゲーション サービス
# aggregation_service.py - Servizio di aggregazione VPP
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, List, Optional
import asyncio
import logging
from datetime import datetime, timezone
# Import interni
from models import DERAsset, DERTelemetry, DERType, DERStatus
from dispatch_optimizer import DispatchOptimizer
from telemetry_store import TelemetryStore
logger = logging.getLogger(__name__)
app = FastAPI(title="DERMS Aggregation Service", version="2.1.0")
# --- Pydantic schemas per la API ---
class VPPPortfolioResponse(BaseModel):
vpp_id: str
der_count: int
total_capacity_kw: float
available_capacity_kw: float
current_dispatch_kw: float
battery_soc_avg_pct: Optional[float]
timestamp: str
class DispatchRequest(BaseModel):
vpp_id: str
target_power_kw: float # Potenza target per la VPP (positiva = generazione)
duration_minutes: int # Durata del dispatch
priority: str = "normal" # "emergency" | "normal" | "economic"
max_deviation_pct: float = 5.0 # Tolleranza deviazione dal target
class DispatchSetpoint(BaseModel):
der_id: str
power_kw: float
duration_minutes: int
timestamp: str
class DispatchResponse(BaseModel):
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: List[DispatchSetpoint]
feasibility_score: float # 0.0 - 1.0 (1.0 = target pienamente raggiunto)
timestamp: str
# --- Registro in-memory (in produzione: database PostgreSQL + cache Redis) ---
_vpp_registry: Dict[str, List[str]] = {} # vpp_id -> [der_id]
_der_registry: Dict[str, DERAsset] = {} # der_id -> DERAsset
_telemetry_store = TelemetryStore()
_optimizer = DispatchOptimizer()
# --- Endpoints ---
@app.get("/health")
async def health_check():
return {"status": "ok", "version": "2.1.0", "timestamp": datetime.now(timezone.utc).isoformat()}
@app.get("/api/v1/vpp/{vpp_id}/portfolio", response_model=VPPPortfolioResponse)
async def get_vpp_portfolio(vpp_id: str):
"""
Ritorna la snapshot aggregata dello stato corrente di una VPP.
Consolida telemetria di tutti i DER nel portfolio.
"""
if vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{vpp_id}' non trovata")
der_ids = _vpp_registry[vpp_id]
der_assets = [_der_registry[did] for did in der_ids if did in _der_registry]
if not der_assets:
raise HTTPException(status_code=503, detail="Nessun DER disponibile nel portfolio")
# Aggregazione metriche
total_capacity = sum(a.capabilities.max_power_kw for a in der_assets)
current_dispatch = 0.0
available_capacity = 0.0
storage_assets = []
for asset in der_assets:
telemetry = _telemetry_store.get_latest(asset.id)
if telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.DISPATCHING]:
current_dispatch += telemetry.active_power_kw
# capacità disponibile = differenza tra massimo e corrente
if telemetry.available_discharge_kw is not None:
available_capacity += telemetry.available_discharge_kw
else:
available_capacity += (asset.capabilities.max_power_kw - telemetry.active_power_kw)
# Raccogli SoC per storage
if telemetry.soc_pct is not None:
storage_assets.append(telemetry.soc_pct)
battery_soc_avg = (sum(storage_assets) / len(storage_assets)) if storage_assets else None
return VPPPortfolioResponse(
vpp_id=vpp_id,
der_count=len(der_assets),
total_capacity_kw=round(total_capacity, 2),
available_capacity_kw=round(max(0, available_capacity), 2),
current_dispatch_kw=round(current_dispatch, 2),
battery_soc_avg_pct=round(battery_soc_avg, 1) if battery_soc_avg else None,
timestamp=datetime.now(timezone.utc).isoformat()
)
@app.post("/api/v1/vpp/dispatch", response_model=DispatchResponse)
async def dispatch_vpp(request: DispatchRequest, background_tasks: BackgroundTasks):
"""
Esegue un dispatch ottimizzato della VPP verso il target di potenza richiesto.
Usa Linear Programming per distribuire il carico tra i DER disponibili.
"""
if request.vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{request.vpp_id}' non trovata")
# Recupero DER disponibili e telemetria aggiornata
der_ids = _vpp_registry[request.vpp_id]
available_ders = []
for der_id in der_ids:
asset = _der_registry.get(der_id)
telemetry = _telemetry_store.get_latest(der_id)
if asset and telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.STANDBY]:
available_ders.append((asset, telemetry))
if not available_ders:
raise HTTPException(status_code=503, detail="Nessun DER disponibile per il dispatch")
# Ottimizzazione del dispatch tramite LP
result = _optimizer.optimize_dispatch(
ders=available_ders,
target_power_kw=request.target_power_kw,
duration_minutes=request.duration_minutes
)
# Invio setpoint in background (async, non bloccante)
background_tasks.add_task(
_send_setpoints_to_ders,
setpoints=result.setpoints,
dispatch_id=result.dispatch_id
)
return result
async def _send_setpoints_to_ders(setpoints: List[DispatchSetpoint], dispatch_id: str):
"""Invia i setpoint a ogni DER in parallelo tramite il rispettivo adapter."""
logger.info(f"Inizio dispatch {dispatch_id} - {len(setpoints)} setpoint")
tasks = [_send_single_setpoint(sp) for sp in setpoints]
results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.warning(f"Dispatch {dispatch_id}: {len(errors)} errori su {len(setpoints)} setpoint")
logger.info(f"Dispatch {dispatch_id} completato")
async def _send_single_setpoint(setpoint: DispatchSetpoint):
"""Stub: in produzione chiama l'adapter specifico del protocollo del DER."""
asset = _der_registry.get(setpoint.der_id)
if not asset:
raise ValueError(f"DER {setpoint.der_id} non trovato nel registro")
logger.debug(f"Setpoint {setpoint.der_id}: {setpoint.power_kw} kW per {setpoint.duration_minutes} min")
# In produzione: chiamata all'adapter (SunSpec/OpenADR/OCPP)
await asyncio.sleep(0.1) # Simulazione latenza rete
線形計画法によるディスパッチオプティマイザー (PuLP)
最適化の中心となるのは、偏差を最小限に抑える線形計画法 (LP) 問題です。 各 DER の物理的制約を考慮した電力目標から:
# dispatch_optimizer.py - Ottimizzazione LP per dispatch DER
import pulp
import uuid
import logging
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import List, Tuple
from models import DERAsset, DERTelemetry, DERType, DERStatus
logger = logging.getLogger(__name__)
@dataclass
class OptimizationResult:
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: list
feasibility_score: float
timestamp: str
solver_status: str
solve_time_ms: float
class DispatchOptimizer:
"""
Ottimizzatore LP per dispatch di portfolio DER.
Problema: data una richiesta di potenza target P_target,
trovare i setpoint p_i per ogni DER i nel portfolio
che minimizzino |sum(p_i) - P_target|, rispettando:
- p_i_min <= p_i <= p_i_max per ogni DER
- Vincoli SoC per storage (BESS/EV)
- Vincoli ramp rate
- Vincoli di topologia di rete (opzionale)
"""
def optimize_dispatch(
self,
ders: List[Tuple[DERAsset, DERTelemetry]],
target_power_kw: float,
duration_minutes: int,
vpp_id: str = "vpp-001"
) -> OptimizationResult:
import time
start_time = time.time()
dispatch_id = f"disp-{uuid.uuid4().hex[:8]}"
# Costruzione del problema LP con PuLP
prob = pulp.LpProblem(
name=f"DER_Dispatch_{dispatch_id}",
sense=pulp.LpMinimize
)
# Variabili decisionali: setpoint per ogni DER (kW)
# Vincolate tra min e max fisico del dispositivo
p_vars = {}
for asset, telemetry in ders:
# Calcola limiti effettivi considerando SoC per storage
p_min, p_max = self._compute_effective_limits(asset, telemetry, duration_minutes)
var_name = f"p_{asset.id.replace('-', '_')}"
p_vars[asset.id] = pulp.LpVariable(
name=var_name,
lowBound=p_min,
upBound=p_max,
cat=pulp.constants.LpContinuous
)
# Variabile di slack per la deviazione dal target (non-negativa)
slack_pos = pulp.LpVariable("slack_pos", lowBound=0) # Eccesso rispetto target
slack_neg = pulp.LpVariable("slack_neg", lowBound=0) # Deficit rispetto target
# Funzione obiettivo: minimizzare la deviazione assoluta dal target
# Pesi differenziati: penalizza di più il deficit (mancata fornitura)
prob += 1.5 * slack_neg + 1.0 * slack_pos, "MinimizeDeviation"
# Vincolo di bilanciamento della potenza
total_power = pulp.lpSum(p_vars[asset.id] for asset, _ in ders)
prob += (total_power - target_power_kw == slack_pos - slack_neg), "PowerBalance"
# Risoluzione (COIN-BC solver, open source)
solver = pulp.COIN_CMD(msg=False, timeLimit=5.0)
status = prob.solve(solver)
solve_time_ms = (time.time() - start_time) * 1000
solver_status = pulp.LpStatus[prob.status]
# Costruzione setpoint dal risultato
setpoints = []
achieved_power = 0.0
if status in [pulp.LpStatusOptimal := 1, -1]: # Optimal or Infeasible
for asset, telemetry in ders:
if asset.id in p_vars:
p_val = pulp.value(p_vars[asset.id]) or 0.0
achieved_power += p_val
if abs(p_val) > 0.1: # Ignora setpoint trascurabili
setpoints.append({
"der_id": asset.id,
"power_kw": round(p_val, 2),
"duration_minutes": duration_minutes,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# Calcolo feasibility score: 1.0 se target raggiunto, < 1.0 se parziale
if abs(target_power_kw) > 0.1:
deviation = abs(achieved_power - target_power_kw) / abs(target_power_kw)
feasibility_score = max(0.0, 1.0 - deviation)
else:
feasibility_score = 1.0
logger.info(
f"Dispatch {dispatch_id}: target={target_power_kw}kW, "
f"achieved={achieved_power:.1f}kW, score={feasibility_score:.3f}, "
f"solver={solver_status}, time={solve_time_ms:.0f}ms"
)
return OptimizationResult(
dispatch_id=dispatch_id,
vpp_id=vpp_id,
target_power_kw=target_power_kw,
achieved_power_kw=round(achieved_power, 2),
setpoints=setpoints,
feasibility_score=round(feasibility_score, 4),
timestamp=datetime.now(timezone.utc).isoformat(),
solver_status=solver_status,
solve_time_ms=round(solve_time_ms, 1)
)
def _compute_effective_limits(
self,
asset: DERAsset,
telemetry: DERTelemetry,
duration_minutes: int
) -> Tuple[float, float]:
"""
Calcola i limiti effettivi di potenza per un DER considerando:
- Limiti fisici dichiarati
- SoC corrente per storage (quanta energia residua disponibile)
- Temperatura (semplificato)
"""
p_min = asset.capabilities.min_power_kw
p_max = asset.capabilities.max_power_kw
# Vincoli aggiuntivi per storage (BESS o EV)
if asset.capabilities.capacity_kwh and telemetry.soc_pct is not None:
cap_kwh = asset.capabilities.capacity_kwh
soc = telemetry.soc_pct / 100.0
min_soc = (asset.capabilities.min_soc_pct or 10.0) / 100.0
max_soc = (asset.capabilities.max_soc_pct or 95.0) / 100.0
efficiency = asset.capabilities.roundtrip_efficiency or 0.92
duration_h = duration_minutes / 60.0
# Energia disponibile in scarica (discharge -> positivo)
energy_available_discharge_kwh = (soc - min_soc) * cap_kwh * efficiency
p_max_soc = energy_available_discharge_kwh / duration_h if duration_h > 0 else 0
p_max = min(p_max, p_max_soc)
# Energia disponibile in carica (charge -> negativo = consumo)
energy_available_charge_kwh = (max_soc - soc) * cap_kwh / efficiency
p_min_soc = -(energy_available_charge_kwh / duration_h) if duration_h > 0 else 0
p_min = max(p_min, p_min_soc)
return p_min, p_max
通信プロトコル: 標準の数珠
通信プロトコルの選択は、遅延、拡張性、コストに大きく影響します DERMS の統合。普遍的なプロトコルはありません。階層のすべてのレベルで使用されます。 特定のニーズに合わせて最適化されたプロトコル。
| プロトコル | レイヤー | 輸送 | 一般的な遅延 | スケーラビリティ | 主な使用例 |
|---|---|---|---|---|---|
| OpenADR 2.0b | プラットフォーム → サイト | HTTP/XML または JSON | 1~30秒 | 平均(ポーリング) | デマンドレスポンス、DRイベント |
| IEEE 2030.5 (SEP2) | プラットフォーム → デバイス | HTTPS/REST | 1~60秒 | 高 (スケーラブルな REST) | 太陽光発電、蓄電、家庭用EV |
| SunSpec Modbus TCP | エッジ→デバイス | TCP/モドバス | 50~500ミリ秒 | 低 (順次ポーリング) | 太陽光発電インバータ、BESS C&I |
| OCPP 2.0.1 | プラットフォーム → 充電器 | WebSocket/JSON | 100ミリ秒~5秒 | 高 (WebSocket) | EVコラム |
| MQTT | エッジ→プラットフォーム | TCP (TLS) | 10~500ミリ秒 | 非常に高い(ブローカー) | 一般的な IoT テレメトリ |
| IEC 60870-5-104 | SCADA → RTU | TCP | 50~200ミリ秒 | 平均 | 変電所、レガシー RTU |
| DNP3 | SCADA → フィールド | シリアル/TCP | 100ms-1s | 低い | 従来の SCADA ユーティリティ |
| IEC 61850 グース | 変電所 | イーサネット(マルチキャスト) | < 4ms | 高(LAN) | 保護、SE自動化 |
OpenADR 2.0b: VTN/VEN アーキテクチャ
# openadr_adapter.py - Client OpenADR 2.0b (VEN - Virtual End Node)
# Implementazione semplificata per illustrare il flusso di comunicazione
import httpx
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class DREvent:
"""Evento di Demand Response ricevuto dal VTN (utility/DERMS)"""
event_id: str
program_id: str
signal_name: str # "SIMPLE" | "ELECTRICITY_PRICE" | "LOAD_DISPATCH"
signal_type: str # "LEVEL" | "PRICE" | "X-LOAD_DISPATCH"
signal_value: float # Valore del segnale (es. livello 1/2/3 o prezzo EUR/MWh)
dtstart: str # ISO 8601 - inizio evento
duration_minutes: int
randomize_start_minutes: int = 0 # Randomizzazione per evitare picchi sincroni
class OpenADRVENClient:
"""
Virtual End Node (VEN): rappresenta un sito/aggregatore che riceve
eventi DR dal Virtual Top Node (VTN) del DERMS o della utility.
Flusso tipico OpenADR 2.0b in modalità PULL:
1. VEN -> VTN: oadrRequestEvent (richiede eventi disponibili)
2. VTN -> VEN: oadrDistributeEvent (lista eventi attivi)
3. VEN -> VTN: oadrCreatedEvent (conferma ricezione con optIn/optOut)
4. VEN -> VTN: oadrUpdateReport (report su energia effettivamente modificata)
"""
def __init__(self, vtn_url: str, ven_id: str, ven_name: str):
self.vtn_url = vtn_url.rstrip("/")
self.ven_id = ven_id
self.ven_name = ven_name
self._client = httpx.AsyncClient(timeout=30.0)
self._registered = False
self._active_events: dict = {}
async def register(self) -> bool:
"""Registrazione del VEN sul VTN - obbligatoria prima di ricevere eventi."""
payload = self._build_register_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiRegisterParty",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
# Parsing della risposta oadrCreatedParty
registration_id = self._extract_registration_id(root)
if registration_id:
self._registered = True
logger.info(f"VEN {self.ven_id} registrato con ID: {registration_id}")
return True
except Exception as e:
logger.error(f"Errore registrazione VEN: {e}")
return False
async def poll_events(self) -> list:
"""Polling degli eventi DR disponibili sul VTN."""
if not self._registered:
raise RuntimeError("VEN non registrato. Chiamare register() prima.")
payload = self._build_request_event_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
events = self._parse_distribute_event(root)
logger.info(f"VEN {self.ven_id}: ricevuti {len(events)} eventi DR")
return events
except Exception as e:
logger.error(f"Errore polling eventi: {e}")
return []
async def confirm_event(self, event_id: str, opt_in: bool = True) -> bool:
"""Conferma ricezione evento e comunicazione optIn/optOut."""
status = "optIn" if opt_in else "optOut"
payload = self._build_created_event_payload(event_id, status)
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
logger.info(f"Evento {event_id}: {status} confermato")
return True
except Exception as e:
logger.error(f"Errore conferma evento {event_id}: {e}")
return False
async def run_polling_loop(self, poll_interval_seconds: int = 60):
"""Loop di polling continuo - eseguito come task asyncio."""
logger.info(f"Avvio polling loop VEN {self.ven_id} ogni {poll_interval_seconds}s")
while True:
events = await self.poll_events()
for event in events:
if event.event_id not in self._active_events:
self._active_events[event.event_id] = event
# OptIn automatico (in produzione: logica di accettazione business)
await self.confirm_event(event.event_id, opt_in=True)
logger.info(
f"Nuovo evento DR: {event.event_id} | "
f"Segnale: {event.signal_name}={event.signal_value} | "
f"Durata: {event.duration_minutes} min"
)
await asyncio.sleep(poll_interval_seconds)
def _build_register_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRegisterReport specificationID="TELEMETRY_STATUS">
<ei:venID>{self.ven_id}</ei:venID>
</oadrRegisterReport>
</oadrSignedObject>
</oadrPayload>"""
def _build_request_event_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRequestEvent>
<ei:eiRequestEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:replyLimit>10</ei:replyLimit>
</ei:eiRequestEvent>
</oadrRequestEvent>
</oadrSignedObject>
</oadrPayload>"""
def _build_created_event_payload(self, event_id: str, status: str) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrCreatedEvent>
<ei:eiCreatedEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:eventResponses>
<ei:eventResponse>
<ei:responseCode>200</ei:responseCode>
<ei:requestID>{event_id}</ei:requestID>
<ei:qualifiedEventID>
<ei:eventID>{event_id}</ei:eventID>
<ei:modificationNumber>0</ei:modificationNumber>
</ei:qualifiedEventID>
<ei:optType>{status}</ei:optType>
</ei:eventResponse>
</ei:eventResponses>
</ei:eiCreatedEvent>
</oadrCreatedEvent>
</oadrSignedObject>
</oadrPayload>"""
def _parse_distribute_event(self, root: ET.Element) -> list:
"""Parser semplificato per oadrDistributeEvent."""
events = []
# In produzione: parsing completo con namespace XML OpenADR
# Qui simuliamo la struttura per chiarezza
return events
def _extract_registration_id(self, root: ET.Element) -> Optional[str]:
"""Estrae l'ID di registrazione dalla risposta VTN."""
return "reg-001" # Semplificato
スケーラビリティ: 1,000 ~ 1,000,000 DER
スケーラビリティは、最新の DERMS の最も重要な技術的課題です。 1,000 の DER を手の届く範囲で管理 適切に設計されたシステム。 1,000,000 の DER を 1 秒未満のディスパッチ遅延で管理 業界の大容量システムからインスピレーションを得た、根本的に異なるアーキテクチャが必要です 金融とソーシャルメディア。
スケーラビリティの数値
テレメトリー負荷が完全に動作可能
- 1,000DER: ~60,000 メッセージ/時間 (60 秒ごとにポーリング) - 単一のマイクロサービスで管理可能
- 100,000DER: ~6,000,000 メッセージ/時間 (100,000 メッセージ/分) - シャーディングと Kafka が必要
- 1,000,000 DER: ~60,000,000 メッセージ/時間 - 専用のイベント ストリーミング アーキテクチャ
メッセージあたりの平均ペイロードが 200 バイトの場合、100 万の DER が約 3.3 GB/時間の生のテレメトリ、圧縮前 (通常は 300 ~ 400 MB/時間になります)。
高スケーラビリティ DERMS のための Kafka アーキテクチャ
# kafka_derms_config.py - Configurazione Kafka per DERMS scalabile
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import json
import logging
from dataclasses import asdict
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# === TOPIC DESIGN ===
# Strategia: topic separati per tipo di dato, partitionati per DER ID
# La key del messaggio = der_id garantisce che tutti i messaggi dello stesso
# DER vadano alla stessa partizione (ordering garantito per dispositivo)
KAFKA_TOPICS = {
# Telemetria (alta frequenza, alta velocità)
"der.telemetry.raw": {
"partitions": 48, # 48 partizioni per parallelismo elevato
"replication_factor": 3,
"retention_ms": 86400000, # 24 ore (poi su time-series DB)
"compression_type": "lz4", # LZ4 per compressione veloce
"config": {"cleanup.policy": "delete"}
},
# Comandi di dispatch (bassa frequenza, alta affidabilità)
"der.dispatch.commands": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000, # 7 giorni
"compression_type": "gzip",
"config": {"cleanup.policy": "delete", "min.insync.replicas": "2"}
},
# Conferme dispatch (ack dai dispositivi)
"der.dispatch.acks": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000,
"config": {"cleanup.policy": "delete"}
},
# Aggregati VPP (output dell'aggregation service)
"vpp.portfolio.snapshots": {
"partitions": 4,
"replication_factor": 3,
"retention_ms": 2592000000, # 30 giorni
"config": {"cleanup.policy": "compact"} # Log compaction: mantieni ultima snapshot
},
# Alert e fault detection
"der.alerts": {
"partitions": 6,
"replication_factor": 3,
"retention_ms": 2592000000,
"config": {"cleanup.policy": "delete"}
}
}
class DERMSTelemetryProducer:
"""
Producer Kafka per la telemetria DER.
Usato dai Gateway Edge per pubblicare telemetria verso il cloud.
"""
def __init__(self, bootstrap_servers: str):
self._producer = Producer({
"bootstrap.servers": bootstrap_servers,
"client.id": "derms-telemetry-producer",
# Affidabilità: ACK da tutti i broker in-sync
"acks": "all",
# Performance: batching aggressivo per throughput
"linger.ms": 20,
"batch.size": 65536,
"compression.type": "lz4",
# Retry per fault tolerance
"retries": 5,
"retry.backoff.ms": 200,
"enable.idempotence": True # Exactly-once semantics
})
def publish_telemetry(self, der_id: str, telemetry: dict) -> None:
"""
Pubblica telemetria su Kafka.
La key = der_id garantisce ordering per dispositivo.
"""
payload = json.dumps({
**telemetry,
"_published_at": datetime.now(timezone.utc).isoformat()
}).encode("utf-8")
self._producer.produce(
topic="der.telemetry.raw",
key=der_id.encode("utf-8"),
value=payload,
on_delivery=self._delivery_callback
)
self._producer.poll(0) # Non-blocking flush
def flush(self, timeout: float = 10.0) -> None:
"""Flush dei messaggi in coda prima dello shutdown."""
pending = self._producer.flush(timeout=timeout)
if pending > 0:
logger.warning(f"{pending} messaggi non ancora consegnati dopo flush")
def _delivery_callback(self, err, msg):
if err:
logger.error(f"Errore consegna messaggio: {err}")
else:
logger.debug(f"Messaggio consegnato: topic={msg.topic()}, partition={msg.partition()}")
class DERMSDispatchConsumer:
"""
Consumer Kafka per i comandi di dispatch.
Ogni Site Aggregator consuma dal topic dispatch.commands
i setpoint relativi ai propri DER.
"""
def __init__(self, bootstrap_servers: str, group_id: str, site_id: str):
self.site_id = site_id
self._consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"client.id": f"site-aggregator-{site_id}",
"auto.offset.reset": "latest", # Solo messaggi recenti (no backlog storico)
"enable.auto.commit": False, # Commit manuale dopo elaborazione
"max.poll.interval.ms": 30000,
"session.timeout.ms": 10000
})
self._consumer.subscribe(["der.dispatch.commands"])
def process_commands(self, timeout_seconds: float = 1.0):
"""Poll e processa comandi di dispatch per questo sito."""
msg = self._consumer.poll(timeout=timeout_seconds)
if msg is None:
return None
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
return None
logger.error(f"Errore consumer: {msg.error()}")
return None
command = json.loads(msg.value().decode("utf-8"))
# Filtra solo i comandi per i DER di questo sito
if command.get("site_id") == self.site_id:
logger.info(
f"Sito {self.site_id}: ricevuto dispatch "
f"der={command['der_id']} power={command['power_kw']}kW"
)
# Commit esplicito dopo elaborazione riuscita (at-least-once)
self._consumer.commit(msg)
return command
return None
DERMS での CQRS とイベント ソーシング
拡張性の高い DERMS では、CQRS パターンは以下を分離します。
コマンド側: Kafka でコマンドをディスパッチします (不変、追加専用)
- 送信された各設定値はログ内の永続的なイベントになります。
クエリ側: Redis で実体化されたプロジェクション (現在の状態キャッシュ)
各 DER の)および InfluxDB/TimescaleDB(予測と M&V の時系列)。
このパターンを使用すると、バグが発生した場合にシステム状態を最初から再計算できます。
イベント ログ (イベント ソーシング) を再度読み取るだけで、投影が行われます。
デマンドレスポンス:理論と実装
デマンドレスポンス (DR) は、信号に応じて電力消費を変更する機能です。 ネットワークまたは市場の。これは VPP が提供できる最も収益性の高いサービスの 1 つであり、現時点では それ自体は、測定要件により正しく実装するのが最も複雑なものの 1 つです および検証 (M&V)。
DR プログラムの種類
| DRプログラム | 信号 | リードタイム | 通常の期間 | 一般的な報酬(IT) |
|---|---|---|---|---|
| ファストリザーブ(FR) | グリッド周波数 (自動) | < 1 秒 | 15分 | ~20-30 ユーロ/MW/時間 |
| セカンダリ リザーブ (RS) | AGCテルナ信号 | Secondi | 15分~数時間 | ~15-25 ユーロ/MW/時間 |
| 三次リザーブ (RT) | Terna 明示的ディスパッチ | 15分 | 1~4時間 | ~5-15 ユーロ/MWh |
| 残高 (MB) | リアルタイム市場でのオファー | 5~30分 | 15分~数時間 | 市場価格(変動) |
| DR の中断可能性 | オペレーターコール | 15~30分 | 1~4時間 | ~30,000~50,000 ユーロ/MW/年 |
| ARERA 解像度 300/2017 | GSE シグナル (CER インセンティブ) | Minuti | 変数 | GSEインセンティブプレミアム |
ベースライン計算と M&V
# mv_service.py - Measurement & Verification per Demand Response
# Calcola la riduzione effettiva di carico rispetto alla baseline
import numpy as np
from typing import List, Tuple
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class MVService:
"""
Measurement & Verification (M&V) per programmi Demand Response.
Metodo: CBL (Customer Baseline Load) - approccio standard FERC/ENTSO-E
La baseline e calcolata come media degli N giorni simili più recenti
prima dell'evento, escludendo giorni con altri eventi DR.
"""
def __init__(self, n_baseline_days: int = 10, exclude_top_bottom: bool = True):
self.n_baseline_days = n_baseline_days
self.exclude_top_bottom = exclude_top_bottom # High-5 / Low-5 exclusion
def calculate_baseline(
self,
site_id: str,
event_date: datetime,
event_hour: int,
historical_consumption: dict # {date_str: {hour: kw}}
) -> dict:
"""
Calcola la Customer Baseline Load (CBL) per il sito.
Algoritmo CBL con High-5/Low-5 exclusion (CAISO/PJM standard):
1. Seleziona N giorni simili recenti (stessa tipologia giorno: lavorativo/festivo)
2. Esclude il top 20% e il bottom 20% dei giorni per consumo nell'ora evento
3. Media i restanti giorni
"""
target_weekday = event_date.weekday() # 0=Lunedi, 6=Domenica
is_target_workday = target_weekday < 5 # Lavorativo vs weekend
# Raccolta dati storici compatibili
similar_days = []
check_date = event_date - timedelta(days=1)
while len(similar_days) < self.n_baseline_days and check_date > event_date - timedelta(days=60):
date_str = check_date.strftime("%Y-%m-%d")
is_check_workday = check_date.weekday() < 5
# Stesso tipo di giorno (lavorativo/festivo)
if is_check_workday == is_target_workday and date_str in historical_consumption:
day_data = historical_consumption[date_str]
if event_hour in day_data:
similar_days.append({
"date": date_str,
"consumption_kw": day_data[event_hour]
})
check_date -= timedelta(days=1)
if len(similar_days) < 3:
logger.warning(f"Dati insufficienti per baseline sito {site_id}: solo {len(similar_days)} giorni")
return {"baseline_kw": None, "method": "insufficient_data", "days_used": len(similar_days)}
consumptions = [d["consumption_kw"] for d in similar_days]
# High-5/Low-5 exclusion (standard CAISO)
if self.exclude_top_bottom and len(consumptions) >= 10:
n_exclude = max(1, len(consumptions) // 5) # 20% per lato
sorted_consumptions = sorted(consumptions)
filtered_consumptions = sorted_consumptions[n_exclude:-n_exclude]
else:
filtered_consumptions = consumptions
baseline_kw = np.mean(filtered_consumptions)
return {
"site_id": site_id,
"baseline_kw": round(baseline_kw, 2),
"method": "CBL_HighLow_Exclusion",
"days_analyzed": len(similar_days),
"days_used": len(filtered_consumptions),
"std_dev_kw": round(np.std(filtered_consumptions), 2)
}
def calculate_demand_reduction(
self,
site_id: str,
baseline_kw: float,
actual_consumption_kw: float,
event_duration_hours: float
) -> dict:
"""
Calcola la riduzione di carico e l'energia risparmiata.
Risultato usato per il settlement del programma DR.
"""
reduction_kw = max(0, baseline_kw - actual_consumption_kw)
reduction_pct = (reduction_kw / baseline_kw * 100) if baseline_kw > 0 else 0
energy_reduced_kwh = reduction_kw * event_duration_hours
return {
"site_id": site_id,
"baseline_kw": baseline_kw,
"actual_kw": actual_consumption_kw,
"reduction_kw": round(reduction_kw, 2),
"reduction_pct": round(reduction_pct, 1),
"energy_reduced_kwh": round(energy_reduced_kwh, 3),
"verified": reduction_pct >= 5.0 # Soglia minima per settlement
}
イタリアの背景: CER、GSE、および補助サービス市場
イタリアは、DERMS および VPP にとってヨーロッパで最も興味深い市場の 1 つです。 急速に進化する規制環境とヨーロッパで最も多くの太陽光発電設置拠点の一つ (2024年末に約37GWの太陽光発電が設置され、2030年までに80GWを目標)。
再生可能エネルギーコミュニティ (CER)
MASE 法令 n. 2025 年 6 月 25 日に発行された 2025 年 5 月 16 日の第 127 号では、重要な事項が紹介されています。 CER 向けのニュース。住民数 50,000 人までの自治体に奨励金を拡大。 CER が表すのは、 DER の分散アグリゲーションを行うイタリアの研究所:
CER インセンティブ構造 (ポスト立法政令 199/2021 および政令 2025)
- インセンティブ率: 共有エネルギーで認識され、地理的エリアとシステムの規模によって異なります。 200 kWp 未満のシステムの場合: 80 ~ 110 ユーロ/MWh (中北部)、90 ~ 120 ユーロ/MWh (南部および諸島)
- アレラ料金: 自家消費エネルギーの評価額 ~8 EUR/MWh (料金要素の償還)
- 間隔:運用開始から20年
- 許容電力: 単一プラントで最大 1 MW (同じ CER 内に複数のプラントの可能性)
- 地理的要件: 消費ポイントは同じ一次変電所の下に接続されている必要があります
イタリアの補助サービス市場 (MSD/MGP)
イタリアの TSO である Terna は、VPP が柔軟性を提供できる市場を管理しています。 2025年には、 UVAM (混合仮想ユニット) と UVAC (仮想ユニット) の漸進的な導入のおかげで 消費が有効になっている場合)、集約された分散リソースも MSD に参加できます。
| 市場 | 頭字語 | 類型学 | 地平線 | VPP/UVAM アクセス |
|---|---|---|---|---|
| マーケット前日 | MGP | 前日のエネルギー | D-1 午前9時 | はい (BSP 経由) |
| 日中市場 | MI | 日中のエネルギー | 6 セッションで D-0 | はい (BSP 経由) |
| 事前補助サービス市場 | MSD 事前 | リザーブ、バランス調整 | D-1~D-0 | はい (UVAM が有効) |
| バランシングマーケット | MB | リアルタイムバランシング | リアルタイムのD-0 | はい (待ち時間が 15 分未満の UVAM) |
| ファーストリザーブ | FR | 超高速リザーブ | 自動 | レイテンシが 1 秒未満の BESS のみ |
| MACSE (ストレージ容量調達メカニズム) | マックセ | ストレージ容量 | 15年 | グリッドスケールストレージのみ |
2025 年の MACSE
2025 年 9 月に開催された最初の MACSE オークションでは、この地域で 10 GWh の容量が落札されました。 中部、南部、諸島では 15 年間の料金契約があり、平均価格は約 13,000 ユーロ/MWh/年です。 このメカニズムは、大規模なグリッド規模の BESS システムの安定した収益を保証しますが、 小規模な集約 VPP からアクセス可能。住宅用/商業用 VPP の場合、ルート メインは UVAM 経由で MSD のままです。
PNRR と柔軟性への投資
PNRR Transition 5.0 では、エネルギーのデジタル化と エネルギーコミュニティ。 CER対策に加え、 配電ネットワークの近代化 (2G スマート メーター、変電所自動化) これらは、新世代の DERMS を可能にするインフラストラクチャを構成します。
ケーススタディ: イタリア地域 VPP (5,000 FV + 500 BESS)
イタリアの地域 VPP の設計とサイジングの具体的な事例を分析します。 2025 年のイタリア市場の現実的なパラメーターに基づいています。
参照シナリオ
VPP ポートフォリオ「SunFlex プーリア」
- 住宅用PV:4,500システム、中型5kWp、合計22.5MWp
- FV C&I: 500 システム、平均サイズ 80 kWp、合計 40 MWp
- 住宅用BESS:450システム、中型10kWh/5kW、合計4.5MWh/2.25MW
- ベス C&I:50システム、中型500kWh/250kW、合計25MWh/12.5MW
- 総容量: 62.5 MWp PV + 29.5 MWh/14.75 MW BESS
- 地理的領域: プーリア州 (南部地域 - 高日射量、高太陽光発電普及率)
VPP の収益源
| 収益の流れ | 市場 | 電力/エネルギー | 推定収益 | 注意事項 |
|---|---|---|---|---|
| 太陽光発電の販売 | MGP前日 | 62.5 MW (ピーク)、~1,750 eq.時間/年 | ~520万ユーロ/年 | 平均スポット価格で ~48 ユーロ/MWh |
| ファーストリザーブ BESS C&I | FRテルナ | 12.5MW (24時間365日) | ~270万ユーロ/年 | ~25 ユーロ/MW/時間 x 8,760 時間 |
| MSDバランシング | MSD/MB | 5MW相当(BESS+DR) | ~080万ユーロ/年 | 入札に応じた支払い、高い変動性 |
| FV 削減 (補助的) | MSD 事前 | 20MWの削減が可能 | ~040万ユーロ/年 | 軽減強化 |
| CER インセンティブ (1 MW の CER 5 個) | GSE - CER | CER 構成で 5 MWp | ~0.600万ユーロ/年 | 南部奨励料金: ~120 ユーロ/MWh |
| 合計 | ~970万ユーロ/年 | プラットフォームとネットワークのコストを除く総コスト |
プラットフォーム技術スタック
# docker-compose.yml - Stack DERMS per VPP regionale
# Configurazione di sviluppo/staging (produzione su Kubernetes)
version: "3.9"
services:
# ========================
# INGESTION LAYER
# ========================
# Broker MQTT per telemetria edge
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT non sicuro (solo LAN interna)
- "8883:8883" # MQTT over TLS (produzione)
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- mosquitto-data:/mosquitto/data
# Apache Kafka (broker eventi principale)
kafka:
image: confluentinc/cp-kafka:7.6.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_NUM_PARTITIONS: 12
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 # 3 in produzione
KAFKA_LOG_RETENTION_HOURS: 168 # 7 giorni
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# ========================
# PROCESSING LAYER
# ========================
# DERMS Aggregation Service (FastAPI)
aggregation-service:
build: ./services/aggregation
ports:
- "8080:8080"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
INFLUXDB_URL: http://influxdb:8086
INFLUXDB_TOKEN: {{INFLUXDB_TOKEN}}
INFLUXDB_ORG: sunflex-puglia
INFLUXDB_BUCKET: der-telemetry
depends_on:
- kafka
- redis
- influxdb
# Dispatch Optimizer Service
dispatch-optimizer:
build: ./services/dispatch
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
SOLVER: COIN_CMD # o CPLEX in produzione per portfolio grandi
depends_on:
- kafka
- redis
# Forecasting Service (ML - produzione FV + carico)
forecasting-service:
build: ./services/forecasting
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
INFLUXDB_URL: http://influxdb:8086
MODEL_REGISTRY_URL: http://mlflow:5000
WEATHER_API_KEY: {{OPENMETEO_API_KEY}}
depends_on:
- kafka
- influxdb
- mlflow
# OpenADR VTN (Virtual Top Node - invia eventi DR ai siti)
openadr-vtn:
build: ./services/openadr-vtn
ports:
- "8081:8081"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
POSTGRES_URL: postgresql://postgres:5432/derms
depends_on:
- kafka
- postgres
# ========================
# STORAGE LAYER
# ========================
# Time-Series Database per telemetria
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
volumes:
- influxdb-data:/var/lib/influxdb2
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_ORG: sunflex-puglia
DOCKER_INFLUXDB_INIT_BUCKET: der-telemetry
DOCKER_INFLUXDB_INIT_RETENTION: 30d
# Cache per stato corrente DER (Device Shadow)
redis:
image: redis:7.2-alpine
command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
# Database relazionale per asset registry, events, settlement
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: derms
POSTGRES_USER: derms_user
POSTGRES_PASSWORD: {{POSTGRES_PASSWORD}}
volumes:
- postgres-data:/var/lib/postgresql/data
# ========================
# OBSERVABILITY
# ========================
# MLflow per tracking modelli forecasting
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: mlflow server --host 0.0.0.0 --backend-store-uri postgresql://mlflow:5432/mlflow
# Grafana per dashboard operativo
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
- ./config/grafana/dashboards:/etc/grafana/provisioning/dashboards
volumes:
influxdb-data:
postgres-data:
redis-data:
grafana-data:
mosquitto-data:
VPP の運用 KPI と SLA
| KPI | ターゲット | 測定 | 尊重されない場合の影響 |
|---|---|---|---|
| ディスパッチ遅延 (95 パーセンタイル) | < 500 ミリ秒 | コマンドからデバイスの設定値までの時間 | ファスト リザーブ テルナの対象外です |
| テレメトリーの鮮度 | 60 秒未満 | ディスパッチのテレメトリ データの最大保存期間 | 古いデータに基づいた最適化 |
| DER の可用性 (オンライン料金) | > 95% | いつでもDERパーセンテージに到達可能 | VPP 容量の削減、市場ペナルティ |
| 派遣精度 | > 90% の実現可能性スコア | 派遣目標からの平均偏差 | MSD/MB市場における罰則 |
| 予報MAPE(1時間先のFV) | < 8% | 平均絶対パーセント誤差 | 市場における不均衡損失 |
| プラットフォームの稼働時間 | > 99.5% | DERMS クラウドの可用性 | 柔軟性の義務の喪失 |
| サイバーセキュリティ - MTTR | 4時間未満 | セキュリティインシデントへの平均応答時間 | NIS2 要件 (2024 年 10 月発効) |
DERMS のベスト プラクティスとアンチパターン
ベストプラクティス
1. デバイスシャドウパターン
各 DER の更新された「シャドウ」を常にキャッシュ (Redis) に保持します。最新の状態が含まれます。 既知のデバイス (SoC、現在の電力、ステータス) とタイムスタンプ。発送は待たされません リアルタイム テレメトリ: 非同期的に更新されるシャドウを使用します。これにより、 ディスパッチ待ち時間は数秒からミリ秒です。
2. レベルの正常な低下
クリア操作レベルを定義します: NORMAL (クラウド + エッジ)、DEGRADED (エッジのみ、最適化) 簡素化されたローカル)、MINIMAL (安全保護のみ - 経済的な派遣なし)。システム どのレベルにあるかを常に把握し、それを通知を通じて市場に伝える必要があります。 更新された可用性。
3. ディスパッチコマンドのべき等性
各ディスパッチ コマンドには一意の ID が必要です。デバイスアダプターは実装する必要があります 冪等性: 同じコマンドを 2 回 (再試行のため) 受信しても、2 回のディスパッチが発生してはなりません。 重複排除のために TTL を使用した最近のコマンドのログを使用します。
4. スケジューリングとリアルタイムの分離
最適化モデルを使用したスケジューリング (市場オファーの 24 時間前計画) 複雑で遅い (解決時間は数分の MILP)。リアルタイムディスパッチは簡素化された LP を使用します ミリ秒以内に解決されます。同じプロセス内でこれら 2 つのパスを決して混合しないでください。
避けるべきアンチパターン
アンチパターン 1: カスケード同期ポーリング
第一世代の DERMS で最も一般的なパターン: 中央サーバーが各 DER をポーリングする 順番に。 1,000 DER とデバイスごとのタイムアウトが 10 秒の場合、ポーリング サイクルは完了します 持続時間は 10,000 秒 (ほぼ 3 時間!)。解決策: 接続プール + イベント駆動による並列ポーリング プッシュをサポートするデバイスの場合。
アンチパターン 2: SoC の制約のない最適化
バッテリーの SoC 制約を無視した積極的なディスパッチにより、充放電サイクルが発生する 深いものはセルパックを急速に劣化させます(数か月で耐用年数が 30 ~ 50% 減少します)。 すべてのオプティマイザは、常に SoC 制約をソフト制約ではなくハード制約として含める必要があります。
アンチパターン 3: テレメトリ用のリレーショナル データベース
PostgreSQL または MySQL を使用して 1 秒あたり数百万のテレメトリ ポイントを保存すると、次のような問題が発生します。 すぐにパフォーマンスの問題が発生し、持続不可能なストレージコストが発生します。時系列データベース (InfluxDB、TimescaleDB、QuestDB) リレーショナル データベースと比較してデータを 10 ~ 50 倍圧縮 最適化された一時クエリ (ウィンドウ関数、自動ダウンサンプリング) をサポートします。
アンチパターン 4: ネットワーク トポロジを無視する
配信ネットワークの物理トポロジを考慮せずに DER を集約すると、次のような問題が発生する可能性があります。 VPP のディスパッチが局所的な輻輳、電圧違反、または 変圧器の過負荷。次に必要なステップは、DSO の ADMS との統合です。 成熟したDERMS向け。欧州の ATTEST プロジェクト (2021 ~ 2024 年) は、この統合の標準を定義しました。
DERMS のセキュリティとコンプライアンス
DERMS は重要なインフラを制御:サイバー攻撃が成功すると停電を引き起こす可能性がある ネットワークを特定したり、ネットワークを不安定にしたりします。 NIS2 指令 (イタリアでは立法 138/2024 により施行、 2024 年 10 月発効)は、DERMS を「必須サービス事業者」として分類し、次の条件を満たします。 厳格なサイバーセキュリティ義務。
DERMS の主要なセキュリティ要件
- デバイス認証: 各 DER は X.509 (PKI) 証明書を使用して認証します - 静的資格情報の共有はありません
- 転送中の暗号化: レガシー プロトコル (MQTT over TLS、OpenADR/IEEE 2030.5 の HTTPS) を含む、すべての通信に TLS 1.3 が必須
- ゼロトラストネットワーク: デバイスなし、デフォルトで「信頼済み」 - すべてのリクエストが認証および許可されます。
- 粒度の高い RBAC: DER オペレーター、アグリゲーター、所有者は厳密に区別された権限を持っています
- 不変の監査ログ: 各ディスパッチコマンドはデジタル署名付きで記録されます (決済のために否認不可能)
- SIEM統合: テレメトリ パターンとコマンドのリアルタイム異常検出
- MTTR < 4 時間: インシデント対応のための NIS2 要件
- ネットワークのセグメンテーション: OT (フィールドプロトコル) と IT (クラウド DERMS) 間の物理的/論理的分離
結論と次のステップ
最新の DERMS のアーキテクチャは、ソフトウェア エンジニアリングにおける最も複雑な課題の 1 つです。 エネルギー分野: 異なる数十年に開発された異種プロトコルの統合が必要 (1979 年の Modbus、2009 年の OpenADR、2022 年の ISO 15118)、数千から数百万までのスケール 分散デバイスは 1 秒未満の遅延を維持し、規制環境で動作します 常に進化しています。
覚えておくべき重要なポイント:
- DERMS は SCADA ではありません。DERMS はメーターの境界を越えてエンド顧客のドメイン内で動作し、そこから派生するすべての法的および同意管理への影響を伴います。
- マルチレイヤー アーキテクチャ (フィールド / エッジ / プラットフォーム / マーケット) と義務的な責任の分離 - オプションではありません
- Kafka + CQRS + イベント ソーシング、および 100,000 DER を超えてスケールするデファクト スタック - リレーショナル データベース アーキテクチャは持ちこたえられない
- LP/MILP によるディスパッチの最適化は数学的に明確に定義されていますが、ハード制約などの物理的制約 (SoC、ランプレート) を常に考慮する必要があります。
- イタリアの状況 (CER、UVAM、MACSE、NIS2) は急速に進化しています。競争上の優位性は、規制スキルと技術スキルを組み合わせることで構築されます。
EnergyTech シリーズの次の記事では、 バッテリー管理システム (BMS): 充電状態 (SoC) 推定からのストレージ システム (BESS) の制御アルゴリズム 熱保護とセルバランスのためのカルマンフィルター付き。必読の書 VPP に統合されたストレージ システムを使用するすべてのユーザー。
リソースと洞察
- OpenADR 2.0bの仕様: openadr.org (登録後無料でダウンロードできます)
- IEEE 2030.5-2023: IEEE Xplore (有料)、無料の概要 スマートグリッド.ieee.org
- CACER/CER 運用ルール: gse.it (付録 1、2025 年 7 月)
- Terna UVAM ドキュメント: テルナイット、派遣課
- PuLP (Python LP ソルバー): Coin-or.github.io/pulp
- Confluent Kafka Python クライアント: github.com/confluentinc/confluent-kafka-python
- FERC Order 2222 (DER アグリゲーションの米国ベンチマーク): ferc.gov
- ATTEST EU プロジェクト (DSO-TSO 調整): attest-project.eu
関連記事
- MLOpsシリーズ: 実稼働環境での PV 予測モデルの展開用 DERMS に統合 - このブログの MLOps シリーズを参照してください
- AIエンジニアリング / RAGシリーズ: オペレーターへの運用支援のための LLM VPP (市場データに関する自然言語クエリ、インテリジェントなアラート)
- PostgreSQL AIシリーズ: 消費パターンの類似性検索用 pgvector 履歴 (CBL と予測に役立ちます)







