OCPP 2.x: エンタープライズ EV 充電システムの構築
世界の電気自動車充電インフラ市場が追いついてきた 2025 年には 402 億 2,000 万ドル 2033 年までは 25% の CAGR で成長すると予想されます。 グランドビューリサーチによると。欧州ではAFIR(代替燃料) インフラ規制)により拘束力のある期限が課されます:TEN-T ネットワークでは 60 km ごと 2025 年末までに少なくとも 150 kW の発電所がなければなりません。イタリアでは、 超えて 2025 年 12 月 31 日までに 73,000 の公共充電ポイント そしてPNRR ハイパワー充電、この分野、および分野に7億ユーロ以上を割り当てています。 完全な爆発。
このインフラストラクチャの中心となるのは、オープンチャージポイントプロトコル(OCPP)、 Open Charge Alliance (OCA) によって開発されたオープン標準であり、その方法を定義します。 充電ステーション(Charging Station)は集中管理システムと通信します (CSMS - 充電ステーション管理システム)。 250 以上の組織で採用されています。 40 か国以上で、OCPP は業界の事実上の標準となっており、 異なるメーカーのハードウェアと異なる管理ソフトウェア間の相互運用性。
この高度な技術記事では、次のことを検討します。 OCPP 2.0.1 および 2.1 で 深さ: WebSocket アーキテクチャ、メッセージ構造、デバイス モデル、プロファイル セキュリティ、スマート充電、およびスケーラブルな運用対応 CSMS バックエンドの構築方法 10 ~ 100,000 ステーション、TypeScript と Python の完全なコード例が含まれています。
何を学ぶか
- 1.2 から 2.1 への OCPP プロトコルの進化と基本的なアーキテクチャの違い
- JSON メッセージ構造 (CALL、CALLRESULT、CALLERROR) と WebSocket トランスポート
- OCPP 2.0.1 の 16 の機能ブロックと階層型デバイス モデル
- 3 つのセキュリティ プロファイル: 基本認証、TLS、X.509 証明書を使用した mTLS
- 高度なスマート充電: SetChargingProfile、負荷管理、ピークシェービング、ソーラー統合
- asyncio と PostgreSQL を使用した Python での CSMS バックエンド実装
- ISO 15118 およびプラグアンドチャージ: PKI、eMAID、双方向 V2G
- スケーラブルなアーキテクチャ: WebSocket クラスタリングと Kafka による 10 ~ 100,000 のチャージ ポイント
- モニタリング、Grafana ダッシュボード、主要な運用指標
- AFIR、PNIRE 規制と充電インフラに対するイタリアのインセンティブ
EnergyTech シリーズ: デジタル エネルギーに関する 10 の記事
この記事は、次のことに特化したシリーズの最初の記事ですエナジーテック: プロトコル、 電力管理に革命をもたらすアーキテクチャとソフトウェア、 EV充電からスマートグリッド、BESSシステムからAIによるエネルギー最適化まで。
| # | アイテム | テクノロジー | レベル |
|---|---|---|---|
| 1 | OCPP 2.x プロトコル: EV 充電システムの構築 (ここにいます) | OCPP、WebSocket、Python、ISO 15118 | 高度な |
| 2 | スマート グリッドと OpenADR: デマンド レスポンスとエネルギーの柔軟性 | OpenADR、IEEE 2030.5、REST、MQTT | 高度な |
| 3 | BESS (バッテリー エネルギー ストレージ): 最適化アルゴリズムと BMS | Python、LP最適化、CANバス、Modbus | 高度な |
| 4 | Kafka と機械学習を使用した電気ネットワークのデジタル ツイン | Kafka、InfluxDB、Grafana、ML、Python | 高度な |
| 5 | 重要なインフラストラクチャ向けの SCADA と ICS: セキュリティとプロトコル | Modbus、DNP3、IEC 61850、OPC UA | 高度な |
| 6 | AI によるエネルギー最適化: 消費量予測と需要予測 | TensorFlow、LSTM、Prophet、FastAPI | 高度な |
| 7 | 仮想発電所: Python と REST API を使用した集約 DER | DER、DERMS、REST、Python、PostgreSQL | 中級 |
| 8 | エネルギー市場とアルゴリズム取引: EPEX SPOT と API | Python、API取引、時系列 | 高度な |
| 9 | 炭素会計ソフトウェア: スコープ 1、2、3 および GHG 報告 | Python、GHG プロトコル、API、レポート | 中級 |
| 10 | マイクログリッドとエネルギーアイランド: 回復力のあるアーキテクチャ | マイクログリッド、EMS、エッジコンピューティング、IoT | 高度な |
プロトコルの進化: OCPP 1.2 から 2.1 へ
アーキテクチャ上の選択を理解するには、OCPP の進化を理解することが重要です バージョン 2.0.1 を導入し、レガシー システムからの移行を計画します。プロトコルが誕生しました 2010 年に次の問題を解決するために相互運用性:各メーカー のステーションには独自のプロトコルがあり、管理が不可能でした マルチベンダー。
| バージョン | Anno | 輸送 | 主な特長 | 導入 |
|---|---|---|---|---|
| OCPP 1.2 | 2010年 | SOAP/XML | 最初の公開バージョン、基本操作: ブート、認証、開始/停止 | 廃止されました |
| OCPP 1.5 | 2012年 | SOAP/XML | 予約、スマート充電ベース、データ転送、リセット | 遺産 |
| OCPP 1.6 | 2015年 | SOAP + JSON/WS | WebSocket、ロードプロファイル、トリガーメッセージ、ローカル認証リスト | 非常に広く普及している |
| OCPP 2.0 | 2018年 | JSON/WS | デバイスモデル、機能ブロック、ISO 15118 ベース (2.0.1 に置き換えられました) | レア |
| OCPP 2.0.1 | 2020年 | JSON/WSのみ | 16 の機能ブロック、デバイス モデル、3 つのセキュリティ プロファイル、高度なスマート充電 | 現在の規格 |
| OCPP 2.1 | 2025年 | JSON/WSのみ | 下位互換性 2.0.1、V2G ISO 15118-20、ネイティブ充電、バッテリー交換 | 新興 |
OCPP 1.6 と 2.0.1 の基本的な違い
OCPP 2.0.1 は単純な増分アップデートではありません。 書き換え 完全な建築 これにより、用語、メッセージ構造、および 概念モデル。この非互換性は「設計上」必要なものでした。 OCPP 1.6 の構造的制限を克服します。
| 待ってます | OCPP 1.6 | OCPP 2.0.1 |
|---|---|---|
| サーバー用語 | 中央システム | CSMS (充電ステーション管理システム) |
| クライアントの用語 | チャージポイント | 充電ステーション |
| 充電ユニット | コネクタ | EVSE(電気自動車供給装置) |
| トランザクション | StartTransaction / StopTransaction | 統合されたトランザクションイベント (開始/更新/終了) |
| 構成 | 固定キー (ChangeConfiguration) | 階層型デバイス モデル (GetVariables/SetVariables) |
| 安全性 | オプション、標準化されていない | 3 つの統合された必須のセキュリティ プロファイル |
| スマート充電 | ベース(コネクタプロファイル) | アドバンスト: EVSE、スタック優先順位、複合スケジュール用 |
| ISO15118 | サポートされていません | ネイティブブロックM(プラグ&チャージ) |
| 特定の組織 | 操作のフラットリスト | ユースケース、要件、図を含む 16 の機能ブロック |
OCPP 2.1: 2025 年の新機能
Open Charge Alliance によって 2025 年 1 月にリリースされた OCPP 2.1 はまだ完全版です 2.0.1 との下位互換性があり、将来に向けて重要な機能が追加されます。
- 高度な V2G (Vehicle-to-Grid)。: 双方向電力伝送による ISO 15118-20 の完全サポートにより、EV を仮想発電所として実現
- DERの統合: 太陽光発電パネルや蓄電システムなどのリソースを使用して分散エネルギーを最適化するための高度なツール
- ネイティブ価格設定: ベンダー固有の拡張機能を使用せずにリアルタイム料金 (kWh、時間、駐車料金) を通信するための標準化されたデータ構造
- バッテリー交換: 二輪車および三輪車のバッテリー交換ステーションのサポート
- トランザクションを再開する: 強制再起動後にデータを失うことなくトランザクションを再開できる可能性
- 現地コスト: オフラインの場合、ステーション上で直接コスト計算
WebSocket 通信アーキテクチャ
OCPP 2.0.1 は排他的に使用します WebSocket 上の JSON プロトコルとして トランスポート、SOAP/XML を完全に放棄します。このアーキテクチャ上の選択により、 永続的な双方向通信、低遅延、軽量ペイロード、互換性 最新の Web インフラストラクチャをネイティブにサポートします。
クライアント/サーバー トポロジ
OCPP モデルでは、 充電ステーション のように動作します クライアント Webソケット そして CSMS として WebSocketサーバー。 充電ステーションは接続を開始し、メカニズムを使用して接続をアクティブに保ちます。 心拍数。 CSMS は同じ接続上でステーションにコマンドを送信できます WebSocket を開くと、リバース接続は必要ありません (ポーリングやプッシュは必要ありません) 別途)。
Charging Station CSMS
| |
|--- WebSocket CONNECT ---------------->|
| wss://csms.example.com/ocpp/CS001 |
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| Authorization: Basic base64(...) |
| |
|<-- HTTP 101 Switching Protocols -------|
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| |
|--- BootNotification.req ------------->|
|<-- BootNotification.conf --------------|
| (interval: 300, status: Accepted) |
| |
|--- StatusNotification.req[EVSE1] ---->|
|<-- StatusNotification.conf ------------|
| |
|--- Heartbeat.req (ogni 300s) -------->|
|<-- Heartbeat.conf --------------------|
| |
| <-- utente avvicina RFID --- |
|--- Authorize.req -------------------->|
|<-- Authorize.conf (Accepted) ----------|
| |
|--- TransactionEvent(Started) -------->|
|<-- TransactionEvent.conf -------------|
| |
|<-- SetChargingProfile.req ------------| (CSMS gestisce load)
|--- SetChargingProfile.conf ---------->|
| |
|--- MeterValues (ogni 60s) ----------->|
|<-- MeterValues.conf ------------------|
| |
|--- TransactionEvent(Ended) ---------->|
|<-- TransactionEvent.conf -------------|
接続URLとサブプロトコル
充電ステーションは、独自の URL を含む URL を使用して CSMS に接続します。
一意の識別子 パスの最後のセグメントとして。
WebSocket サブプロトコル ocpp2.0.1 握手中に交渉される
プロトコル バージョンの互換性を確保するための HTTP。
# Formato URL
wss://csms.example.com/ocpp/{chargingStationId}
# Esempi reali
wss://csms.example.com/ocpp/IT-MIL-STATION-001
wss://csms.example.com/ocpp/EVSE-PARK-NORD-042
wss://csms.example.com/ocpp/CPO-AUTOGRILL-A7-01
# Headers WebSocket obbligatori
Sec-WebSocket-Protocol: ocpp2.0.1
Authorization: Basic {base64(stationId:password)} # Security Profile 1-2
# Con Security Profile 3 (mTLS): nessun header Authorization,
# l'autenticazione avviene tramite certificato client TLS
OCPP メッセージ構造 2.0.1
OCPP 2.0.1 では 3 種類の JSON メッセージが定義されており、すべてフレームとして転送されます。 WebSocketのテキスト。それぞれのメッセージは、 JSON配列 フォーマット付き メッセージの種類に基づいて正確です。この単純な構造により解析が容易になります デバッグと SOAP/XML のオーバーヘッドの比較。
CALL (リクエスト) - MessageTypeId 2
メッセージ 電話 一方の当事者によって送信されたリクエストを表します (充電ステーションまたは CSMS) を別の機器に接続します。一意の ID、アクションの名前が含まれます。 そしてリクエストペイロード。
// Formato: [MessageTypeId, MessageId, Action, Payload]
// Esempio: BootNotification dalla Charging Station
[2, "19223201", "BootNotification", {
"chargingStation": {
"model": "SuperCharger-500",
"vendorName": "EVPower Inc.",
"serialNumber": "SN-2025-00142",
"firmwareVersion": "3.2.1",
"modem": {
"iccid": "8939100000000000001",
"imsi": "310260000000001"
}
},
"reason": "PowerUp"
}]
// Esempio: TransactionEvent dalla Charging Station
[2, "tx-evt-001", "TransactionEvent", {
"eventType": "Started",
"timestamp": "2026-03-09T10:30:00Z",
"triggerReason": "CablePluggedIn",
"seqNo": 0,
"transactionInfo": {
"transactionId": "TXN-2026-0309-001",
"chargingState": "EVConnected"
},
"evse": { "id": 1, "connectorId": 1 },
"idToken": {
"idToken": "RFID-04A2B3C4D5",
"type": "ISO14443"
}
}]
CALLRESULT (応答) - MessageTypeId 3
メッセージ 電話結果 そしてCALLに対する肯定的な反応。の 許可するには、MessageId が元の CALL の MessageId と正確に一致する必要があります。 リクエストとレスポンスの相関関係。
// Formato: [MessageTypeId, MessageId, Payload]
// Risposta a BootNotification
[3, "19223201", {
"currentTime": "2026-03-09T10:00:00Z",
"interval": 300,
"status": "Accepted"
}]
// Risposta a TransactionEvent (Started)
[3, "tx-evt-001", {
"totalCost": 0,
"chargingPriority": 0,
"idTokenInfo": {
"status": "Accepted",
"groupIdToken": {
"idToken": "GROUP-FLEET-01",
"type": "Central"
}
}
}]
CALLERROR (エラー) - MessageTypeId 4
メッセージ 発信者エラー 受信者が送信しない場合に送信されます CALL を処理できます。標準化されたエラーコードと説明が含まれます 読みやすく構造化された詳細。
// Formato: [MessageTypeId, MessageId, ErrorCode, ErrorDescription, ErrorDetails]
[4, "19223201", "FormatViolation",
"Il campo 'vendorName' supera la lunghezza massima di 50 caratteri",
{
"field": "chargingStation.vendorName",
"maxLength": 50,
"actualLength": 67
}
]
// Codici di errore OCPP 2.0.1 standardizzati:
// FormatViolation - messaggio JSON malformato
// GenericError - errore generico non classificabile
// InternalError - errore interno del ricevente
// MessageTypeNotSupported - tipo di messaggio non supportato
// NotImplemented - azione riconosciuta ma non implementata
// NotSupported - azione non supportata dall'implementazione
// OccurrenceConstraintViolation - violazione cardinalita elementi
// PropertyConstraintViolation - vincolo su una proprietà violato
// ProtocolError - violazione del protocollo OCPP
// RpcFrameworkError - errore nel framework RPC di base
// SecurityError - errore di sicurezza o autenticazione
// TypeConstraintViolation - tipo di dato non corretto
リクエストとレスポンスの相関: 重要なルール
各 CALL には 一意のメッセージ ID (最大36文字 英数字)、以前に同じ接続で使用されていないもの 同じ差出人です。 CALLRESULT または CALLERROR はこれを使用する必要があります 同じメッセージID。 送信者はタイムアウト (推奨: 30 秒) を維持する必要があります。 リクエストは失敗したとみなされます。一度に保留できるのは 1 つの CALL だけです 通信の各方向: ステーションは次の CALL を送信することはできません。 最初のものには応答がありませんでした。
OCPP 2.0.1 の 16 の機能ブロック
OCPP 2.0.1 では、すべての機能が次のように整理されています。 16 の機能ブロック (A ~ P)、それぞれに詳細な要件を伴う特定のユースケースが含まれています。 前提条件とシーケンス図。このモジュール構成により、 実装者はサポートするブロックを宣言し、テスターは検証する ブロックごとのコンプライアンス。
| ブロック | 名前 | トップメッセージ | 義務的 |
|---|---|---|---|
| A | 安全 | SecurityEventNotification、SignCertificate、CertificateSigned | Si |
| B | プロビジョニング | BootNotification、SetVariables、GetVariables、NotifyReport | Si |
| C | 認可 | 認可、ClearCache、GetLocalListVersion | Si |
| D | ローカル認証リスト | SendLocalList、GetLocalListVersion | No |
| E | 取引 | TransactionEvent、GetTransactionStatus、MeterValues | Si |
| F | リモコン | RequestStartTransaction、RequestStopTransaction、UnlockConnector | No |
| G | 可用性 | StatusNotification、ChangeAvailability、ハートビート | Si |
| H | 予約 | 今すぐ予約、キャンセル予約 | No |
| I | 料金と費用 | CostUpdated、ShowMessage | No |
| J | 測光 | MeterValues (エネルギー/電力/電流測定) | Si |
| K | スマート充電 | SetChargingProfile、ClearChargingProfile、GetChargingProfiles、ReportChargingProfiles | No |
| L | ファームウェア管理 | UpdateFirmware、FirmwareStatusNotification | No |
| M | ISO 15118認証管理 | Get15118EVCertificate、DeleteCertificate、CertificateSigned | No |
| N | 診断 | GetLog、LogStatusNotification、SetMonitoringBase、SetVariableMonitoring | No |
| O | メッセージを表示する | SetDisplayMessage、GetDisplayMessages、ClearDisplayMessage | No |
| P | データ転送 | DataTransfer (ベンダー固有の拡張機能) | No |
デバイス モデル: OCPP 2.0.1 の核心
Il デバイスモデル そして、OCPP 2.0.1 の主要なアーキテクチャの革新。 OCPP 1.6 のハード構成キー システム (ChangeConfiguration) を置き換えます。 スパナ付き) 柔軟な階層モデル に基づいて コンポーネントと変数。各ステーションはその構造を完全に説明しており、 ベンダーに依存しない方法で構成できます。
ChargingStation (radice)
|
+-- Controller (computer della stazione)
| +-- Variables: Vendor, Model, FirmwareVersion, SerialNumber
|
+-- EVSE[1] (punto di ricarica 1)
| +-- Variables: AvailabilityState, Power, SupplyPhases
| +-- Connector[1] (connettore CCS2 / DC)
| | +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
| +-- Connector[2] (connettore CHAdeMO)
| +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
|
+-- EVSE[2] (punto di ricarica 2)
| +-- Connector[1] (connettore Type2 AC / 22kW)
| +-- Variables: ConnectorType, Phases, MaxCurrent
|
+-- PowerMeter (contatore principale)
| +-- Variables: Energy.Active.Import.Register, Power.Active.Import
|
+-- NetworkInterface (ETH0/LTE)
| +-- Variables: Type, SSID, SignalStrength, ActiveNetworkProfile
|
+-- SecurityCtrlr
| +-- Variables: SecurityProfile, CertificateEntries
|
+-- SmartChargingCtrlr
+-- Variables: ChargingProfileMaxStackLevel, ChargeProfileKindsSupported
// CALL dal CSMS: legge stato EVSE e tipo connettore
[2, "get-var-001", "GetVariables", {
"getVariableData": [
{
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeType": "Actual"
},
{
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeType": "Actual"
},
{
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeType": "Actual"
}
]
}]
// CALLRESULT dalla Charging Station
[3, "get-var-001", {
"getVariableResult": [
{
"attributeStatus": "Accepted",
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeValue": "Available"
},
{
"attributeStatus": "Accepted",
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeValue": "cCCS2"
},
{
"attributeStatus": "Accepted",
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeValue": "5"
}
]
}]
OCPP セキュリティ プロファイル 2.0.1
OCPP 2.0.1 では 3 つの機能が導入されています プログレッシブセキュリティプロファイル 定義するもの 充電ステーションと CSMS 間の通信の保護レベル。プロフィール 導入時に選択する必要があり、メカニズムが自動的に構成されます 認証と暗号化。
| 特性 | プロフィール 1 | プロフィール 2 | プロフィール 3 |
|---|---|---|---|
| WebソケットURL | ws:// (TLSなし) | wss:// (TLS) | wss:// (TLS) |
| 暗号化 | なし | TLS 1.2+ | TLS 1.2+ |
| 認証ステーション | パスワード(基本認証) | パスワード(基本認証) | X.509クライアント証明書 |
| 認証 CSMS | なし | TLSサーバー証明書 | TLSサーバー証明書 |
| MitM 保護 | No | 部分的 (CSMS 認証のみ) | フル (相互 TLS) |
| 証明書の管理 | 必要ありません | デバイス上のルート CA のみ | 完全な PKI: CA + クライアント証明書 |
| 推奨される使用方法 | テスト環境のみ | 標準生産 | クリティカルプロダクション、P&C ISO 15118 |
本番環境での証明書の管理 (セキュリティ プロファイル 3)
セキュリティ プロファイル 3 では、証明書のライフサイクル管理が次のようになります。 重要な操作。 OCPP 2.0.1 には、次の専用メッセージが含まれています。 署名証明書 (ステーションは CSR の署名を必要とします)、 証明書署名済み (CSMS は署名付き証明書をインストールします)、 証明書の削除 (古い証明書を削除します)、 GetInstalledCertificateIds (インストールされている証明書のリスト)。 そしてもう一つは欠かせない 堅牢な PKI 少なくとも自動更新付き 有効期限の 30 日前、有効性と有効性を継続的に監視 CRL/OCSP 失効メカニズム。
スマートな充電と負荷管理
Lo スマート充電 (K ブロック) および最も重要な機能 大規模な設備を備えたオペレーター。 CSMS による動的制御を可能にします ネットワークの制約、エネルギー料金、 ユーザーの優先順位と変圧器の容量。
充電プロファイルの階層
OCPP 2.0.1 では、スタック レベル (優先度) を持つ 4 種類の課金プロファイルが定義されています。
| プロファイルの種類 | 範囲 | 応用 | オーバーライド |
|---|---|---|---|
| 充電ステーション最大プロファイル | ステーション全体の絶対最大値 | 変圧器保護、供給契約 | オーバーライド不可 |
| 充電ステーションの外部制約 | 外部システム (DSO、アグリゲーター) からの制限 | デマンドレスポンス、ネットワークバランシング | より高いプロフィールからのみ |
| Txデフォルトプロファイル | トランザクションのデフォルトプロファイル | 料金政策、基本的なスケジュール、太陽光発電 | 特定の TxProfile から |
| 送信プロファイル | トランザクションの特定のプロファイル | ユーザーの優先順位、個人の好み | 最大の安全性 |
SetChargingProfile: ピークシェービングと太陽光発電の統合
// Strategia: integra produzione solare + peak shaving ore serali
// Scenario: sito con 50kW fotovoltaico, trasformatore 100A, picco 19-21h
// 1. Limite massimo stazione (rispetta contratto di fornitura)
[2, "smart-max-001", "SetChargingProfile", {
"evseId": 0,
"chargingProfile": {
"id": 1,
"stackLevel": 1,
"chargingProfilePurpose": "ChargingStationMaxProfile",
"chargingProfileKind": "Absolute",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 100.0 }
]
}]
}
}]
// 2. Profilo solare + peak shaving per un singolo EVSE
[2, "smart-solar-001", "SetChargingProfile", {
"evseId": 1,
"chargingProfile": {
"id": 100,
"stackLevel": 0,
"chargingProfilePurpose": "TxDefaultProfile",
"chargingProfileKind": "Absolute",
"validFrom": "2026-03-09T00:00:00Z",
"validTo": "2026-03-10T00:00:00Z",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"startSchedule": "2026-03-09T06:00:00Z",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 7200, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 14400, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 43200, "limit": 16.0, "numberPhases": 3 },
{ "startPeriod": 46800, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 54000, "limit": 24.0, "numberPhases": 3 }
]
}]
}
}]
// Orario potenza: 06-08h: 8A (offpeak, bassa produzione solare)
// 08-12h: 32A (piena produzione FV, massima potenza)
// 12-18h: 32A (picco solare, alta produzione)
// 18-19h: 16A (calo solare, riduzione)
// 19-21h: 8A (picco domanda residenziale, min potenza)
// 21-24h: 24A (fine picco, potenza media)
動的負荷分散アルゴリズム
負荷管理アルゴリズムにより、セッション間で利用可能な電力が分配されます。 トランスの制限と優先順位を考慮して、リアルタイムでアクティブになります。 最も一般的なアプローチは、 加重公正シェア 最小/最大制約あり。
interface ChargingSession {
readonly stationId: string;
readonly evseId: number;
readonly transactionId: string;
readonly priority: number; // 0-9 (9 = massima)
readonly minChargingRate: number; // A minimi per caricare
readonly maxChargingRate: number; // A massimi del connettore
readonly currentChargingRate: number;
readonly energyDelivered: number; // Wh totali erogati
readonly targetEnergy?: number; // Wh target (se specificato dall'utente)
readonly isEV3Phase: boolean; // Veicolo trifase
}
interface LoadBalancerConfig {
readonly maxSitePowerAmps: number; // A max del trasformatore
readonly reservedBuildingAmps: number; // A riservati per l'edificio
readonly minSessionAmps: number; // A minimi per sessione (tipico: 6A)
readonly rebalanceIntervalSec: number; // Secondi tra ricalcoli (tipico: 30s)
}
interface Allocation {
readonly stationId: string;
readonly evseId: number;
readonly allocatedAmps: number;
readonly phases: number;
}
function calculateChargingAllocations(
sessions: ReadonlyArray<ChargingSession>,
config: LoadBalancerConfig
): ReadonlyArray<Allocation> {
if (sessions.length === 0) return [];
const availableAmps = config.maxSitePowerAmps
- config.reservedBuildingAmps;
// Step 1: ordina per priorità (desc), poi energia erogata (asc = meno carico prima)
const sorted = [...sessions].sort((a, b) => {
if (b.priority !== a.priority) return b.priority - a.priority;
return a.energyDelivered - b.energyDelivered;
});
// Step 2: garantisci potenza minima a tutti
const minRequired = sorted.length * config.minSessionAmps;
if (minRequired > availableAmps) {
// Caso critico: potenza insufficiente, sospendi sessioni a bassa priorità
return sorted
.slice(0, Math.floor(availableAmps / config.minSessionAmps))
.map((s) => ({
stationId: s.stationId,
evseId: s.evseId,
allocatedAmps: config.minSessionAmps,
phases: s.isEV3Phase ? 3 : 1,
}));
}
// Step 3: distribuzione proporzionale ai pesi di priorità
const totalWeight = sorted.reduce(
(sum, s) => sum + (1 + s.priority), 0
);
const remainingAmps = availableAmps - minRequired;
const allocations = sorted.map((session) => {
const weight = (1 + session.priority) / totalWeight;
const bonus = remainingAmps * weight;
const raw = config.minSessionAmps + bonus;
// Applica vincoli min/max del connettore
const allocatedAmps = Math.max(
config.minSessionAmps,
Math.min(session.maxChargingRate, Math.round(raw * 10) / 10)
);
return {
stationId: session.stationId,
evseId: session.evseId,
allocatedAmps,
phases: session.isEV3Phase ? 3 : 1,
};
});
// Step 4: verifica finale che il totale non superi il limite
const total = allocations.reduce((s, a) => s + a.allocatedAmps, 0);
if (total <= availableAmps) return allocations;
// Riscaling proporzionale
const scale = availableAmps / total;
return allocations.map((a) => ({
...a,
allocatedAmps: Math.max(
config.minSessionAmps,
Math.round(a.allocatedAmps * scale * 10) / 10
),
}));
}
Python と PostgreSQL を使用した CSMS バックエンドの実装
Python ライブラリ ocpp MobilityHouse 著 (オープンソース、GitHub で 2000 個以上のスター)
CSMS の最も一般的なリファレンス実装です。ライブラリを組み合わせます
asyncio, websockets e asyncpg PostgreSQL用
本番環境に対応したバックエンドを構築します。
CSMS の PostgreSQL スキーマ
-- Registro stazioni di ricarica
CREATE TABLE charging_stations (
station_id TEXT PRIMARY KEY,
vendor_name TEXT NOT NULL,
model TEXT NOT NULL,
serial_number TEXT,
firmware_version TEXT,
security_profile SMALLINT NOT NULL DEFAULT 1,
last_boot_reason TEXT,
last_seen_at TIMESTAMPTZ,
is_online BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Stato EVSE e connettori
CREATE TABLE evse_status (
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT NOT NULL,
connector_type TEXT, -- cCCS2, cCHAdeMO, cType2, sType3
status TEXT NOT NULL DEFAULT 'Unknown',
error_code TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (station_id, evse_id, connector_id)
);
-- Transazioni di ricarica
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT,
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'Started',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
meter_start_wh NUMERIC(12, 3),
meter_end_wh NUMERIC(12, 3),
energy_wh NUMERIC(12, 3) GENERATED ALWAYS AS (meter_end_wh - meter_start_wh) STORED,
stop_reason TEXT,
total_cost_cents INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Meter values (time series - usare TimescaleDB in produzione)
CREATE TABLE meter_values (
id BIGSERIAL PRIMARY KEY,
transaction_id TEXT REFERENCES transactions(transaction_id),
station_id TEXT NOT NULL,
evse_id SMALLINT NOT NULL,
sampled_at TIMESTAMPTZ NOT NULL,
energy_wh NUMERIC(12, 3),
power_w NUMERIC(10, 2),
current_a NUMERIC(8, 3),
voltage_v NUMERIC(8, 2),
soc_pct SMALLINT -- State of Charge da ISO 15118
);
CREATE INDEX idx_meter_values_station_time
ON meter_values(station_id, sampled_at DESC);
CREATE INDEX idx_transactions_station_id
ON transactions(station_id, started_at DESC);
-- Token di autorizzazione (lista locale cache)
CREATE TABLE authorization_cache (
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
status TEXT NOT NULL, -- Accepted, Invalid, Blocked, Expired
group_id TEXT,
expiry_date TIMESTAMPTZ,
cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id_token, id_token_type)
);
CSMS 完全な Python バックエンド
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional, Any
import asyncpg
import websockets
from ocpp.routing import on
from ocpp.v201 import ChargePoint as Cp
from ocpp.v201 import call, call_result
from ocpp.v201.enums import (
Action, RegistrationStatusType, AuthorizationStatusType,
ConnectorStatusType
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
log = logging.getLogger('csms')
# Pool globale connessioni PostgreSQL
_db_pool: Optional[asyncpg.Pool] = None
async def get_db() -> asyncpg.Pool:
global _db_pool
if _db_pool is None:
_db_pool = await asyncpg.create_pool(
dsn='postgresql://csms:password@localhost/csms_db',
min_size=5,
max_size=20,
)
return _db_pool
class ChargePointHandler(Cp):
"""
Handler OCPP 2.0.1 per una singola Charging Station.
Un'istanza per ogni connessione WebSocket attiva.
"""
@on(Action.boot_notification)
async def on_boot_notification(
self, charging_station: dict, reason: str, **kwargs
) -> call_result.BootNotification:
log.info(
f"Boot: {self.id} | "
f"{charging_station['vendor_name']} {charging_station['model']} "
f"| reason={reason}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO charging_stations
(station_id, vendor_name, model, serial_number,
firmware_version, last_boot_reason, last_seen_at, is_online)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), TRUE)
ON CONFLICT (station_id) DO UPDATE SET
vendor_name = EXCLUDED.vendor_name,
model = EXCLUDED.model,
firmware_version = EXCLUDED.firmware_version,
last_boot_reason = EXCLUDED.last_boot_reason,
last_seen_at = NOW(),
is_online = TRUE
""",
self.id,
charging_station['vendor_name'],
charging_station['model'],
charging_station.get('serial_number'),
charging_station.get('firmware_version'),
reason,
)
return call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=300,
status=RegistrationStatusType.accepted,
)
@on(Action.heartbeat)
async def on_heartbeat(self) -> call_result.Heartbeat:
db = await get_db()
await db.execute(
"UPDATE charging_stations SET last_seen_at = NOW() WHERE station_id = $1",
self.id
)
return call_result.Heartbeat(
current_time=datetime.now(timezone.utc).isoformat()
)
@on(Action.status_notification)
async def on_status_notification(
self, timestamp: str, connector_status: str,
evse_id: int, connector_id: int, **kwargs
) -> call_result.StatusNotification:
log.info(
f"Status: {self.id} EVSE[{evse_id}]"
f"Connector[{connector_id}] = {connector_status}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO evse_status
(station_id, evse_id, connector_id, status, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (station_id, evse_id, connector_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = NOW()
""",
self.id, evse_id, connector_id, connector_status
)
return call_result.StatusNotification()
@on(Action.authorize)
async def on_authorize(
self, id_token: dict, **kwargs
) -> call_result.Authorize:
token = id_token['id_token']
token_type = id_token['type']
log.info(f"Authorize: {self.id} token={token} type={token_type}")
db = await get_db()
# Controlla prima la cache locale
row = await db.fetchrow(
"""
SELECT status, group_id, expiry_date
FROM authorization_cache
WHERE id_token = $1 AND id_token_type = $2
AND (expiry_date IS NULL OR expiry_date > NOW())
""",
token, token_type
)
status = AuthorizationStatusType.invalid
if row and row['status'] == 'Accepted':
status = AuthorizationStatusType.accepted
return call_result.Authorize(
id_token_info={'status': status}
)
@on(Action.transaction_event)
async def on_transaction_event(
self,
event_type: str,
timestamp: str,
trigger_reason: str,
seq_no: int,
transaction_info: dict,
evse: Optional[dict] = None,
id_token: Optional[dict] = None,
meter_value: Optional[list] = None,
**kwargs
) -> call_result.TransactionEvent:
tx_id = transaction_info['transaction_id']
log.info(
f"TransactionEvent: {self.id} {event_type} "
f"tx={tx_id} trigger={trigger_reason}"
)
db = await get_db()
if event_type == 'Started':
await self._handle_tx_started(
db, tx_id, evse, id_token, timestamp, meter_value
)
elif event_type == 'Updated' and meter_value:
await self._handle_tx_updated(db, tx_id, evse, meter_value)
elif event_type == 'Ended':
await self._handle_tx_ended(
db, tx_id, timestamp, transaction_info, meter_value
)
return call_result.TransactionEvent(
total_cost=0,
charging_priority=0,
id_token_info={'status': AuthorizationStatusType.accepted},
)
async def _handle_tx_started(
self, db, tx_id, evse, id_token, timestamp, meter_value
):
evse_id = evse['id'] if evse else 0
connector_id = evse.get('connector_id') if evse else None
token = id_token['id_token'] if id_token else 'unknown'
token_type = id_token['type'] if id_token else 'Local'
meter_start = self._extract_energy(meter_value)
await db.execute(
"""
INSERT INTO transactions
(transaction_id, station_id, evse_id, connector_id,
id_token, id_token_type, state, started_at, meter_start_wh)
VALUES ($1, $2, $3, $4, $5, $6, 'Started', $7, $8)
ON CONFLICT (transaction_id) DO NOTHING
""",
tx_id, self.id, evse_id, connector_id,
token, token_type, timestamp, meter_start
)
async def _handle_tx_updated(self, db, tx_id, evse, meter_value):
evse_id = evse['id'] if evse else 0
energy = self._extract_energy(meter_value)
power = self._extract_power(meter_value)
if energy is not None:
await db.execute(
"""
INSERT INTO meter_values
(transaction_id, station_id, evse_id, sampled_at, energy_wh, power_w)
VALUES ($1, $2, $3, NOW(), $4, $5)
""",
tx_id, self.id, evse_id, energy, power
)
async def _handle_tx_ended(
self, db, tx_id, timestamp, transaction_info, meter_value
):
meter_end = self._extract_energy(meter_value)
stop_reason = transaction_info.get('stopped_reason')
await db.execute(
"""
UPDATE transactions SET
state = 'Ended',
ended_at = $1,
meter_end_wh = $2,
stop_reason = $3
WHERE transaction_id = $4
""",
timestamp, meter_end, stop_reason, tx_id
)
def _extract_energy(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Energy.Active.Import.Register':
return float(sv['value'])
return None
def _extract_power(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Power.Active.Import':
return float(sv['value'])
return None
# === Comandi CSMS -> Stazione ===
async def send_remote_start(
self, evse_id: int, id_token: str, limit_amps: float = 32.0
) -> str:
"""Avvia una sessione da remoto su EVSE specificato."""
request = call.RequestStartTransaction(
id_token={'id_token': id_token, 'type': 'Central'},
evse_id=evse_id,
charging_profile={
'id': 999,
'stack_level': 0,
'charging_profile_purpose': 'TxProfile',
'charging_profile_kind': 'Relative',
'charging_schedule': [{
'id': 1,
'charging_rate_unit': 'A',
'charging_schedule_period': [
{'start_period': 0, 'limit': limit_amps}
],
}],
},
)
response = await self.call(request)
log.info(f"RemoteStart {self.id} EVSE{evse_id}: {response.status}")
return response.status
async def send_charging_profile(
self, evse_id: int, profile: dict
) -> str:
"""Imposta un profilo di carica per smart charging."""
request = call.SetChargingProfile(
evse_id=evse_id,
charging_profile=profile
)
response = await self.call(request)
log.info(f"ChargingProfile {self.id} EVSE{evse_id}: {response.status}")
return response.status
# Registry globale delle connessioni attive
_connected_stations: dict[str, ChargePointHandler] = {}
async def on_connect(websocket, path: str):
"""Callback per nuove connessioni WebSocket OCPP."""
station_id = path.strip('/').split('/')[-1]
if not station_id:
await websocket.close(1008, 'Missing station ID')
return
log.info(f"Connessione da: {station_id} | path={path}")
cp = ChargePointHandler(station_id, websocket)
_connected_stations[station_id] = cp
try:
await cp.start()
except websockets.exceptions.ConnectionClosed as e:
log.info(f"Disconnesso: {station_id} code={e.code}")
except Exception as e:
log.error(f"Errore: {station_id} - {e}")
finally:
_connected_stations.pop(station_id, None)
db = await get_db()
await db.execute(
"UPDATE charging_stations SET is_online = FALSE WHERE station_id = $1",
station_id
)
async def main():
await get_db() # Inizializza pool DB
log.info("CSMS OCPP 2.0.1 avviato")
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp2.0.1'],
# TLS: aggiungere ssl=ssl_context per Security Profile 2-3
ping_interval=60,
ping_timeout=30,
max_size=1_048_576, # 1MB max message size
)
log.info("In ascolto su ws://0.0.0.0:9000/ocpp/{stationId}")
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())
ISO 15118 とプラグアンドチャージ
ISO15118 車両間の高レベル通信を定義します 電力線通信による電気(EV)および充電ステーション(EVSE) DC 充電ケーブル (CCS) の (PLC)。 OCPP 2.0.1 は ISO 15118 をネイティブに統合します 機能ブロック M を通じて、 プラグアンドチャージ: 車両は X.509 デジタル証明書を介して自動的に認証されます。 RFID またはモバイル アプリ。
V2G (Vehicle-to-Grid) PKI アーキテクチャ
Plug & Charge の証明書システムは PKI (公開鍵) に基づいています。 インフラストラクチャ)電動モビリティの特定の階層:
V2G Root CA (Root of Trust - gestita da OEM o eMSP)
|
+-- V2G Intermediate CA
| |
| +-- EVSE Certificate (installato nella stazione)
| CN = EVSE-IT-MIL-001
|
+-- eMobility Service Provider CA (eMSP)
|
+-- Contract Certificate (installato nel veicolo)
CN = IT.CPO.000001234 (eMAID - e-Mobility Account Identifier)
SubjectAltName = eMAID:IT.CPO.000001234
プラグアンドチャージの完全なフロー
EV EVSE CSMS eMSP
| | | |
|-- Plug cavo DC --> | | |
| | | |
|<= ISO 15118-2 TLS =>| (PLC sul cavo SLAC) | |
| | | |
|-- ContractCert --->| | |
| (eMAID, X.509) | | |
| |--- Authorize req ----->| |
| | idToken.type=eMAID | |
| | |--- OCPI check --->|
| | |<-- Contract OK --|
| | | |
| |<-- Authorize.conf ----| |
| | status: Accepted | |
| | | |
|<= Charging Start ==>| | |
| | | |
| EV invia target | | |
|-- EnergyRequest --->| | |
| SoC: 45% | | |
| Target: 80% | | |
| Departure: 18:30 | | |
| |--- TransactionEvent -->| |
| | ISO15118Trigger | |
| | | |
| |<-- SetChargingProfile--| |
|<= Schedule via PLC =>| | |
ISO 15118 製造状況 (2026)
- ISO 15118-2: プラグ アンド チャージ AC/DC - HPC DC 充電器 (Ionity、Fastned、Tesla Supercharger V3) で広くサポートされています。
- ISO 15118-20: 双方向 V2G サポート - ハードウェア サポート準備完了、ソフトウェアは 2025 ~ 2026 年に展開予定
- AFIR要件: すべての新しい V2G 対応ステーションは、2026 年から ISO 15118 をサポートする必要があります
- AFIR 2027 準拠: 2027 年 1 月 1 日以降に設置された充電器はスマート充電対応でなければなりません
- イタリアの本物の V2G:Enel X WayとNissan LeafでV2H(Vehicle-to-Home)規格の最初のパイロットを実施
スケーラブルなアーキテクチャ: 10 ~ 100,000 充電ポイント
エンタープライズ CSMS は、数万から数十万の接続を管理する必要があります 競合する WebSocket。アーキテクチャは段階的に進化し、コンポーネントが追加されます。 それらはさまざまなスケールで影響を及ぼします。
フェーズ 1: 小規模 (10 ~ 500 ステーション)
+------------------+ WebSocket/OCPP +------------------+
| Charging |------------------------| CSMS Monolitico |
| Stations (10-500)| wss://csms:9000/ocpp | Python/asyncio |
+------------------+ | Port 9000 |
+--------+---------+
|
+-------+-------+
| PostgreSQL |
| Redis (cache) |
+---------------+
Stack: Python asyncio + PostgreSQL + Redis
Deployment: 1 VM (4 vCPU, 8GB RAM), 1 DB managed
Costo: ~$200/mese
フェーズ 2: 中規模 (500 ~ 10,000 ステーション)
+------------+ +------------------+ +--------------+
| Load | | WS Gateway #1 | | Message |
| Balancer +---->| (asyncio CSMS) +---->| Broker |
| (HAProxy) | | Max 2000 conn | | (RabbitMQ) |
| | +------------------+ | |
| Sticky +---->| WS Gateway #2 +---->| |
| Sessions | | (asyncio CSMS) | +--------------+
| | +------------------+ |
+------------+ +------+------+
| Business |
| Services |
| (FastAPI) |
+------+------+
|
+----------+----------+
| PostgreSQL (HA) |
| TimescaleDB |
| Redis Cluster |
+---------------------+
Sticky sessions: basate su station_id nel path URL
Cross-node ops: Redis pub/sub per inviare comandi alle stazioni
Costo: ~$2.000/mese (K8s managed)
フェーズ 3: 大規模 (10,000 ~ 100,000 ステーション)
Global Load Balancer (Anycast)
|
+---------------+---------------+
| |
Region EU-WEST Region EU-SOUTH
+------------------+ +------------------+
| WS Gateway Pool | | WS Gateway Pool |
| (50 pods, 2000 | | (30 pods) |
| conn each = 100K)| +--------+---------+
+--------+---------+ |
| |
+-------------+----------------+
|
+-------+--------+
| Apache Kafka |
| (12 partitions)|
| per topic |
+-------+--------+
|
+-----------------+------------------+
| | |
+------+------+ +-------+------+ +--------+------+
| Transaction | | Smart | | Device |
| Service | | Charging Svc | | Mgmt Svc |
| (10 replicas)| | (5 replicas) | | (3 replicas) |
+------+------+ +-------+------+ +--------+------+
| | |
+--------+--------+------------------+
|
+--------+--------+
| PostgreSQL |
| Citus (sharding) |
| Shard key: |
| station_id hash |
+--------+---------+
|
+--------+--------+
| TimescaleDB |
| (meter values) |
+--------+--------+
Kafka Topics:
- ocpp.boot-notification (chiave: station_id)
- ocpp.transaction-events (chiave: transaction_id)
- ocpp.meter-values (chiave: station_id)
- ocpp.status-notifications (chiave: station_id)
- csms.commands (chiave: station_id)
Throughput target: 1M messaggi/ora, latenza P99 < 200ms
Costo: ~$30.000/mese (multi-region Kubernetes)
Redis を使用したクロスノード接続管理
マルチノード展開では、CSMS はそれぞれがどのゲートウェイ上にあるかを認識する必要があります。 ステーションからコマンド (SetChargingProfile、RemoteStart など) を送信します。 Redis パブ/サブスクライブ 問題を解決します:
import json
import asyncio
import redis.asyncio as aioredis
redis_client = aioredis.from_url(
'redis://redis-cluster:6379',
encoding='utf-8',
decode_responses=True
)
# Registra il nodo della connessione
async def register_connection(station_id: str, gateway_id: str):
await redis_client.setex(
f"csms:gateway:{station_id}",
value=gateway_id,
time=600 # TTL: 10 minuti, rinnovato a ogni heartbeat
)
# Pubblica un comando verso una stazione (qualunque nodo sia)
async def publish_command(station_id: str, action: str, payload: dict):
channel = f"csms:commands:{station_id}"
await redis_client.publish(channel, json.dumps({
'action': action,
'payload': payload
}))
# Su ogni nodo gateway: ascolta i comandi per le stazioni connesse
async def listen_for_commands(connected_stations: dict):
pubsub = redis_client.pubsub()
# Sottoscrivi ai canali delle stazioni connesse a questo nodo
async def subscribe_station(station_id: str):
await pubsub.subscribe(f"csms:commands:{station_id}")
async for message in pubsub.listen():
if message['type'] != 'message':
continue
station_id = message['channel'].split(':')[-1]
cp = connected_stations.get(station_id)
if not cp:
continue # Stazione non su questo nodo, ignora
cmd = json.loads(message['data'])
try:
if cmd['action'] == 'SetChargingProfile':
await cp.send_charging_profile(
cmd['payload']['evse_id'],
cmd['payload']['profile']
)
elif cmd['action'] == 'RemoteStart':
await cp.send_remote_start(
cmd['payload']['evse_id'],
cmd['payload']['id_token'],
)
except Exception as e:
log.error(f"Errore esecuzione comando {cmd['action']}: {e}")
モニタリング、メトリクス、Grafana ダッシュボード
実稼働環境の CSMS には、包括的な可観測性システムが必要です。指標 インフラストラクチャの健全性、インフラストラクチャの品質を監視するための鍵 サービスと運用パフォーマンス。
主要な運用指標
| メトリック | 式/出典 | 目標SLA | アラートしきい値 |
|---|---|---|---|
| 駅の空き状況 | オンラインステーション数 / トータルステーション数×100 | >= 99% | < 95% |
| OCPP メッセージ遅延 P99 | CALL 時間 -> CALLRESULT (95 パーセンタイル) | < 2秒 | > 5秒 |
| トランザクション成功率 | 送信完了 / 送信開始 × 100 | >= 98% | < 95% |
| 供給されるエネルギー (kWh/時間) | 今のところSum MeterValues | ベースライン +10% | < ベースライン -20% |
| 認証拒否率 | 無効な認証 / 合計認証 x 100 | < 2% | > 10% (攻撃の可能性あり) |
| WebSocket 再接続/時間 | ステーションごとに新しい接続をカウンターします | < 2/時間/ステーション | > 10/時間/ステーション |
| スマート充電コンプライアンス | 実際の電力と設定プロファイル | +/- 5% | 偏差 > 15% |
| 認定有効期限日数 | TLS 証明書の有効期限が切れるまでの日数 | > 30日 | < 30 日 (更新アラート) |
CSMS 用 Prometheus エクスポーター
from prometheus_client import (
Counter, Gauge, Histogram, start_http_server
)
# Metriche Prometheus
OCPP_MESSAGES_TOTAL = Counter(
'ocpp_messages_total',
'Numero totale messaggi OCPP processati',
['action', 'direction', 'status'] # direction: inbound/outbound
)
OCPP_MESSAGE_DURATION = Histogram(
'ocpp_message_duration_seconds',
'Latenza elaborazione messaggi OCPP',
['action'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
STATIONS_CONNECTED = Gauge(
'csms_stations_connected_total',
'Numero stazioni connesse al CSMS'
)
ACTIVE_TRANSACTIONS = Gauge(
'csms_active_transactions_total',
'Numero transazioni di ricarica attive'
)
ENERGY_DELIVERED_WH = Counter(
'csms_energy_delivered_wh_total',
'Energia totale erogata in Wh',
['station_id']
)
AUTH_RESULTS = Counter(
'csms_authorization_results_total',
'Risultati delle autorizzazioni OCPP',
['status'] # Accepted, Invalid, Blocked, Expired
)
SMART_CHARGING_EVENTS = Counter(
'csms_smart_charging_events_total',
'Operazioni smart charging',
['action', 'result']
)
def start_metrics_server(port: int = 8001):
"""Avvia il server HTTP Prometheus su porta specificata."""
start_http_server(port)
log.info(f"Prometheus metrics su http://0.0.0.0:{port}/metrics")
# Decorator per misurare latenza handler
import time
import functools
def track_ocpp_handler(action: str):
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.monotonic()
try:
result = await func(*args, **kwargs)
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='success'
).inc()
return result
except Exception as e:
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='error'
).inc()
raise
finally:
OCPP_MESSAGE_DURATION.labels(action=action).observe(
time.monotonic() - start
)
return wrapper
return decorator
// Pannelli principali per dashboard Grafana CSMS
// 1. Stazioni online (Gauge)
{
"title": "Stazioni Connesse",
"type": "gauge",
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 90 },
{ "color": "green", "value": 99 }
]
}
}
},
"targets": [{
"expr": "csms_stations_connected_total / csms_stations_registered_total * 100",
"legendFormat": "Availability %"
}]
}
// 2. Latenza messaggi OCPP P99 (Time Series)
{
"title": "OCPP Message Latency P99",
"type": "timeseries",
"targets": [{
"expr": "histogram_quantile(0.99, rate(ocpp_message_duration_seconds_bucket[5m]))",
"legendFormat": "P99 - {{action}}"
}]
}
// 3. Energia erogata (Stat panel)
{
"title": "Energia Totale Erogata oggi (kWh)",
"type": "stat",
"targets": [{
"expr": "increase(csms_energy_delivered_wh_total[24h]) / 1000",
"legendFormat": "kWh"
}]
}
// 4. Auth rejection rate - alert su attacchi
{
"title": "Rejection Rate Autorizzazioni (%)",
"type": "timeseries",
"targets": [{
"expr": "rate(csms_authorization_results_total{status='Invalid'}[5m]) / rate(csms_authorization_results_total[5m]) * 100"
}]
}
イタリアおよびヨーロッパの規制: AFIR、PNIRE、PNRR
イタリアとヨーロッパのEV充電インフラは厳しく規制されています。 規制遵守は任意ではありません。規制遵守は、次の両方の技術要件に関係します。 ステーション(法定計量、アクセシビリティ、支払い)および通信規格。
AFIR (代替燃料インフラ規制 - EU 2023/1804)
AFIR は 2024 年 4 月に発効し、正確なスケジュールを定義しています。 TEN-T ネットワークおよび都市部のインフラストラクチャへの充電義務:
| 有効期限 | 要件 | 適用性 |
|---|---|---|
| 2025 年 12 月 31 日 | TEN-T コア ネットワーク上で 60 km ごとに 150 kW 以上のステーション | EU全体 |
| 2027 年 12 月 31 日 | ステーション >= 150 kW 60 km ごと TEN-T 包括的なネットワーク | EU全体 |
| 2025 年 4 月 14 日 | 無料の静的および動的データ (場所、コネクタの種類、可用性) | 公共駅 |
| 2027 年 1 月 1 日 | 22 kWを超える新規/改修ステーション向けのスマート充電対応 | 公共駅 |
| すぐに | サブスクリプションなしのアドホック支払い (非接触型銀行カード) | 50 kW を超える公共ステーション |
| 2026年以降 | V2G 対応ステーション用の ISO 15118 | 双方向ステーション |
イタリアのPNIREとPNRR
イタリアでは、AFIR の導入は 2 つの主要なツールを通じて行われます。
- PNIRE (国家充電インフラ計画): MASE (環境エネルギー安全省) によって管理されており、 国家目標: 2025 年に公共駅を 13,755 か所、道路網に重点を置く および都市部
- PNRR ミッション 2、投資 4.3:7億ユーロ以上 高速道路および地域での高出力充電 (HPC >= 150 kW) に割り当てられます。 サービス。 MEMORY によると、PNRR は総額 127 億ユーロを割り当てました。 まだ部分的に使用されています
- MASE コール 2024 ~ 2025: 民間事業者に対するインセンティブ 充電密度が低い地域(イタリア南部、 田舎)。設置費用の最大60%を補助
イタリアの状況: 2025 年に 73,000 の充電ポイント
2025 年 12 月 31 日現在、イタリアがカウントされます 73,000 以上の公共充電ポイント (2024 年と比較して +18%)、全国領土を 93% カバーしています。そのうち: 約 12,000 箇所が急速充電ポイント (> 22 kW)、5,000 箇所が HPC です。 (>= 150 kW)。最も多くのインフラが整備されている地域はロンバルディア州 (23%) で、次に ラツィオ州 (12%) とトスカーナ州 (9%) からの。南部は依然として私にとっての優先地域である 2026 年までにギャップを埋めることを目的とした PNRR の資金提供。
CSMS ソフトウェアの技術的義務
- OCPI (オープンチャージポイントインターフェース): CPO (Charge Point Operator) と eMSP (e-Mobility Service Provider) 間のローミングに必須のプロトコル。 OCPI バージョン 2.2.1 を推奨
- 法定計量学: ドイツ (アイヒレヒト) および EU (MID - 測定機器指令) では、測定システムは認証され、測定値は透過的でユーザーが変更できないものでなければなりません。
- GDPR: 充電セッション データ (RFID、場所、時間) は個人データです。プライバシー ポリシー、データの最小化、忘れられる権利が必要
- CDR (料金明細記録): 相互運用可能な請求書発行のための OCPI 形式に準拠し、少なくとも 5 年間保存する必要があります (イタリアの納税義務)
ケーススタディ: 50 以上のステーションを備えたイタリアの充電ネットワーク
イタリアのオペレーター向けの実際の CSMS システムのアーキテクチャを調べてみましょう。 都市の駐車場に分散された 50 台の DC 充電ステーション (22 ~ 150 kW) を管理 3つの地域にあるショッピングセンター。
動作要件
- 50 ステーション、合計 150 EVSE (ステーションあたり平均 3 EVSE)、300 コネクタ (CCS2 + Type2)
- 毎日のピーク: 400 ~ 600 回の充電セッション (午前 7 ~ 9 時、午後 12 ~ 2 時、午後 5 ~ 9 時)
- スマート充電: ステーションあたり 200A 制限への準拠、SEM (サイト エネルギー マネージャー) との統合
- マルチテナント: 部分的なステーション可視性を備えた 3 つの CPO、Enel による OCPI ローミング
- 稼働時間 SLA: ステーションごとに月間 99.5%、CSMS バックエンドについては 99.9%
選択されたアーキテクチャ
+------------------+ +------------------+ +------------------+
| 50 Stazioni | | HAProxy | | CSMS Primary |
| OCPP 2.0.1 |---->| (WS sticky sess.)|---->| Python asyncio |
| Security Profile 2| | Port 443 (TLS) | | 2 replicas |
+------------------+ +------------------+ +--------+---------+
|
+------------------+ +--------+---------+
| CSMS Worker |<----| Redis Cluster |
| (FastAPI REST) | | (stato sessioni) |
| Dashboard, API | +------------------+
+------------------+
|
+-----------+----------+
| |
+-------+-------+ +---------+------+
| PostgreSQL 16 | | TimescaleDB |
| (transazioni, | | (meter values)|
| auth, device | | 2TB/anno est. |
| model, CDR) | +---------------+
+---------------+
Monitoring: Prometheus + Grafana Cloud
Alerting: PagerDuty (P1: stazione offline >5min, P2: CSMS latency >2s)
CDR/Billing: integrazione ERP via webhook PostgreSQL NOTIFY
実際の運用指標 (通常の月)
| メトリック | 価値 | 注意事項 |
|---|---|---|
| セッション/月 | ~14,000 | 平均 280 セッション/日 |
| 供給されるエネルギー | ~85,000kWh/月 | 平均 6 kWh/セッション |
| 平均的なステーションの可用性 | 99.3% | 0.7% のダウンタイム = ステーションあたり約 5 時間/月 |
| 認証受け入れ率 | 96.8% | 3.2% 拒否 = 期限切れまたは未登録 |
| OCPP レイテンシ P95 | 180ミリ秒 | 往復LTEステーションを含む |
| スマート充電イベント/日 | ~1,200 | 平均 24 回/時間のリバランス |
| ピークシェービング効果 | 92% | 92% の時間の電力 <= 設定された制限値 |
CSMS セキュリティ: OWASP と脅威モデル
CSMS は重要なインフラストラクチャです。侵害は混乱を引き起こす可能性があります 充電、請求操作、または電力網への攻撃 充電負荷を介して。主な脅威は次のように識別できます。 特定の脅威モデル。
CSMS 脅威モデル
| 脅威 | ベクター | インパクト | 緩和 |
|---|---|---|---|
| 不正局 | 盗まれた認証情報との接続 | 誤ったデータの挿入、不正な使用 | セキュリティ プロファイル 3 (mTLS)、認定ピン接続 |
| 中間者 | 安全でないネットワークでの WS インターセプト | RFIDトークンの傍受、コマンド操作 | TLS 1.3 必須、認定された透明性 |
| リプレイアタック | キャプチャされた OCPP メッセージの再送信 | 二重請求、無効な権限 | 一意の MessageId、タイムスタンプ検証、ノンス |
| DDoS Webソケット | 接続またはメッセージの洪水 | CSMS が到達不能、インフラストラクチャ DoS | レート制限、接続スロットル、WAF |
| OCPPペイロードによるSQLインジェクション | idToken フィールドに SQL ペイロードを含む OCPP ペイロード | DB の抽出、権限昇格 | プリペアドステートメント、ORM、入力検証 |
| RFIDのクローン作成 | 正規の RFID カードのクローン作成 | 他のユーザーが支払ったセッション | ISO 15118 P&C、RFID ホワイトリスト、異常検出 |
| 悪意のあるファームウェア | マルウェアによるファームウェアアップデート | ステーションの物理制御、グリッド操作 | ファームウェアデジタル署名、セキュアブート、SBOM |
CSMS 強化: セキュリティ チェックリスト
import ssl
import re
from functools import wraps
# 1. Configurazione TLS sicura (Security Profile 2-3)
def create_tls_context(
certfile: str,
keyfile: str,
cafile: str,
require_client_cert: bool = False
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
if require_client_cert: # Security Profile 3 (mTLS)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=cafile)
# Disabilita cipher suite deboli
ctx.set_ciphers(
'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:!aNULL:!eNULL:!LOW:!EXPORT'
)
return ctx
# 2. Rate limiting per connessioni WebSocket
from collections import defaultdict
import time
_connection_attempts: dict[str, list[float]] = defaultdict(list)
MAX_CONN_PER_MINUTE = 10
def check_rate_limit(client_ip: str) -> bool:
"""Ritorna True se il client può connettersi, False se throttled."""
now = time.monotonic()
window = _connection_attempts[client_ip]
# Rimuovi tentativi più vecchi di 60 secondi
_connection_attempts[client_ip] = [
t for t in window if now - t < 60
]
if len(_connection_attempts[client_ip]) >= MAX_CONN_PER_MINUTE:
log.warning(f"Rate limit superato per IP: {client_ip}")
return False
_connection_attempts[client_ip].append(now)
return True
# 3. Validazione MessageId per prevenire replay attack
_seen_message_ids: set[str] = set()
_message_id_pattern = re.compile(r'^[a-zA-Z0-9\-_\.]{1,36}






