OCPP 2.x: 기업용 EV 충전 시스템 구축
글로벌 전기차 충전 인프라 시장이 따라잡았다 2025년에는 402억 2천만 달러 2033년까지 CAGR 25%로 성장할 것입니다. 그랜드뷰 리서치에 따르면 유럽에서는 AFIR(대체연료) 인프라 규정)은 구속력 있는 기한을 부과합니다: TEN-T 네트워크에서 60km마다 2025년 말까지 최소 150kW의 발전소가 있어야 합니다. 이탈리아에서는 너머 2025년 12월 31일까지 공공 충전소 73,000개 그리고 PNRR 고출력 충전 부문과 산업 분야에 7억 유로 이상을 할당했습니다. 완전 폭발.
이 인프라의 핵심은OCPP(개방형 충전 포인트 프로토콜), OCA(Open Charge Alliance)에서 개발한 개방형 표준으로, 충전소(Charging Station)는 중앙 관리 시스템과 통신합니다. (CSMS - 충전소 관리 시스템). 250개 이상의 조직에서 채택됨 40개 이상의 국가에서 OCPP는 업계의 사실상 표준이 되었으며, 다양한 제조업체의 하드웨어와 다양한 관리 소프트웨어 간의 상호 운용성.
이 고급 기술 기사에서 우리는 탐구합니다. OCPP 2.0.1 및 2.1 안으로 깊이: WebSocket 아키텍처, 메시지 구조, 장치 모델, 프로필 보안, 스마트 충전 및 확장 가능한 프로덕션 지원 CSMS 백엔드 구축 방법 TypeScript 및 Python의 전체 코드 예제가 포함된 10~100,000개의 스테이션.
무엇을 배울 것인가
- 1.2에서 2.1로의 OCPP 프로토콜의 진화와 근본적인 아키텍처 차이점
- JSON 메시지 구조(CALL, CALLRESULT, CALLERROR) 및 WebSocket 전송
- OCPP 2.0.1의 16개 기능 블록과 계층적 장치 모델
- 3가지 보안 프로필: 기본 인증, TLS, X.509 인증서가 포함된 mTLS
- 고급 스마트 충전: SetChargingProfile, 부하 관리, 피크 절감, 태양광 통합
- asyncio 및 PostgreSQL을 사용하여 Python에서 CSMS 백엔드 구현
- ISO 15118 및 플러그 앤 충전: PKI, eMAID, 양방향 V2G
- 확장 가능한 아키텍처: WebSocket 클러스터링 및 Kafka를 통해 10~100,000개의 충전 지점
- 모니터링, Grafana 대시보드 및 주요 운영 지표
- 충전 인프라에 대한 AFIR, PNIRE 규정 및 이탈리아 인센티브
EnergyTech 시리즈: 디지털 에너지에 관한 10가지 기사
이 기사는 다음과 관련된 시리즈의 첫 번째 기사입니다.에너지테크: 프로토콜, 전력 관리에 혁명을 일으키는 아키텍처와 소프트웨어, EV 충전부터 스마트 그리드까지, BESS 시스템부터 AI를 통한 에너지 최적화까지.
| # | Articolo | 기술 | 수준 |
|---|---|---|---|
| 1 | OCPP 2.x 프로토콜: EV 충전 시스템 구축(현재 위치) | OCPP, 웹소켓, 파이썬, ISO 15118 | 고급의 |
| 2 | 스마트 그리드 및 OpenADR: 수요 대응 및 에너지 유연성 | OpenADR, IEEE 2030.5, REST, MQTT | 고급의 |
| 3 | BESS(배터리 에너지 저장 장치): 최적화 알고리즘 및 BMS | Python, LP 최적화, CAN 버스, Modbus | 고급의 |
| 4 | Kafka 및 머신 러닝을 활용한 전기 네트워크용 디지털 트윈 | Kafka, InfluxDB, Grafana, ML, Python | 고급의 |
| 5 | 중요 인프라를 위한 SCADA 및 ICS: 보안 및 프로토콜 | 모드버스, DNP3, IEC 61850, OPC UA | 고급의 |
| 6 | AI를 통한 에너지 최적화: 소비 예측 및 수요 예측 | TensorFlow, LSTM, Prophet, FastAPI | 고급의 |
| 7 | 가상 발전소: Python 및 REST API를 사용하여 DER 집계 | DER, DERMS, REST, Python, PostgreSQL | 중급 |
| 8 | 에너지 시장 및 알고리즘 거래: EPEX SPOT 및 API | Python, API 거래, 시계열 | 고급의 |
| 9 | 탄소 회계 소프트웨어: 범위 1, 2, 3 및 GHG 보고 | Python, GHG 프로토콜, API, 보고 | 중급 |
| 10 | 마이크로그리드 및 에너지 아일랜드: 탄력적인 아키텍처 | 마이크로그리드, EMS, 엣지 컴퓨팅, IoT | 고급의 |
프로토콜의 진화: OCPP 1.2에서 2.1로
OCPP의 진화를 이해하는 것은 아키텍처 선택을 이해하는 데 필수적입니다. 버전 2.0.1을 사용하고 레거시 시스템에서 마이그레이션을 계획합니다. 프로토콜이 탄생했습니다 2010년에는 이 문제를 해결하기 위해상호 운용성: 제조사마다 의 스테이션이 자체 독점 프로토콜을 갖고 있어 관리가 불가능했습니다. 다중 공급업체.
| 버전 | 년도 | 수송 | 주요 특징 | 전개 |
|---|---|---|---|---|
| OCPP 1.2 | 2010년 | 비누/XML | 최초 공개 버전, 기본 작업: 부팅, 승인, 시작/중지 | 폐기됨 |
| OCPP 1.5 | 2012년 | 비누/XML | 예약, 스마트 충전대, 데이터 전송, 재설정 | 유산 |
| OCPP 1.6 | 2015년 | SOAP + JSON/WS | WebSocket, 프로필 로드, 메시지 트리거, 로컬 인증 목록 | 매우 널리 퍼져 있음 |
| OCPP 2.0 | 2018 | JSON/WS | 장치 모델, 기능 블록, ISO 15118 기반(2.0.1로 대체) | 희귀한 |
| OCPP 2.0.1 | 2020 | JSON/WS 전용 | 16개의 기능 블록, 장치 모델, 3개의 보안 프로필, 고급 스마트 충전 | 현재 표준 |
| OCPP 2.1 | 2025년 | JSON/WS 전용 | 이전 버전과 호환되는 2.0.1, V2G ISO 15118-20, 기본 충전, 배터리 교체 | 신흥 |
OCPP 1.6과 2.0.1의 근본적인 차이점
OCPP 2.0.1은 단순한 증분 업데이트가 아닙니다. 다시 쓰기 완벽한 건축 용어, 메시지 구조 및 개념적 모델. 이러한 비호환성은 "설계상" 필요했습니다. OCPP 1.6의 구조적 한계를 극복합니다.
| 나는 기다린다 | OCPP 1.6 | OCPP 2.0.1 |
|---|---|---|
| 서버 용어 | 중앙시스템 | CSMS(충전소 관리 시스템) |
| 클라이언트 용어 | 충전 포인트 | 충전소 |
| 충전 장치 | 커넥터 | EVSE(전기차 공급 장비) |
| 업무 | StartTransaction / StopTransaction | 통합 트랜잭션 이벤트(시작됨/업데이트됨/종료됨) |
| 구성 | 고정 키(ChangeConfiguration) | 계층적 장치 모델(GetVariables/SetVariables) |
| 안전 | 선택 사항, 표준화되지 않음 | 3개의 통합 및 필수 보안 프로필 |
| 스마트 충전 | 베이스(커넥터 프로파일) | 고급: EVSE용, 스택 우선순위, 복합 일정 |
| ISO 15118 | 지원되지 않음 | 네이티브 블록 M(플러그 앤 충전) |
| 특정 조직 | 단순 작업 목록 | 사용 사례, 요구 사항, 다이어그램이 포함된 16개의 기능 블록 |
OCPP 2.1: 2025년의 새로운 기능
Open Charge Alliance에서 2025년 1월에 출시된 OCPP 2.1은 전체 버전을 유지합니다. 2.0.1과 이전 버전과 호환되며 미래를 위한 중요한 기능을 추가합니다.
- 고급 V2G(Vehicle-to-Grid).: 양방향 전력 전송으로 ISO 15118-20을 완벽하게 지원하여 EV를 가상 발전소로 구현
- DER 통합: 태양광 패널 및 저장 시스템과 같은 리소스를 사용하여 분산 에너지 최적화를 위한 고급 도구
- 기본 가격: 공급업체별 확장 없이 실시간 요금(kWh, 시간, 주차 요금)을 전달하기 위한 표준화된 데이터 구조
- 배터리 교환: 이륜차, 삼륜차 배터리 교환 스테이션 지원
- 거래 재개: 강제 재부팅 후 데이터 손실 없이 트랜잭션 재개 가능
- 현지 비용: 오프라인 건의 경우 스테이션에서 직접 비용 계산
WebSocket 통신 아키텍처
OCPP 2.0.1은 독점적으로 사용합니다. WebSocket을 통한 JSON 프로토콜로 전송, SOAP/XML을 완전히 포기합니다. 이 아키텍처 선택은 다음을 제공합니다. 지속적인 양방향 통신, 낮은 대기 시간, 경량 페이로드 및 호환성 최신 웹 인프라를 기본으로 제공합니다.
클라이언트-서버 토폴로지
OCPP 모델에서는 충전소 다음과 같이 행동한다 클라이언트 웹소켓 그리고 CSMS ~처럼 웹소켓 서버. 충전소는 연결을 시작하고 메커니즘을 통해 이를 활성 상태로 유지합니다. 심장 박동의. CSMS는 동일한 연결의 스테이션에 명령을 보낼 수 있습니다. WebSocket 열기, 역방향 연결 필요 없음(폴링 없음, 푸시 없음) 별도).
Charging Station CSMS
| |
|--- WebSocket CONNECT ---------------->|
| wss://csms.example.com/ocpp/CS001 |
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| Authorization: Basic base64(...) |
| |
|<-- HTTP 101 Switching Protocols -------|
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| |
|--- BootNotification.req ------------->|
|<-- BootNotification.conf --------------|
| (interval: 300, status: Accepted) |
| |
|--- StatusNotification.req[EVSE1] ---->|
|<-- StatusNotification.conf ------------|
| |
|--- Heartbeat.req (ogni 300s) -------->|
|<-- Heartbeat.conf --------------------|
| |
| <-- utente avvicina RFID --- |
|--- Authorize.req -------------------->|
|<-- Authorize.conf (Accepted) ----------|
| |
|--- TransactionEvent(Started) -------->|
|<-- TransactionEvent.conf -------------|
| |
|<-- SetChargingProfile.req ------------| (CSMS gestisce load)
|--- SetChargingProfile.conf ---------->|
| |
|--- MeterValues (ogni 60s) ----------->|
|<-- MeterValues.conf ------------------|
| |
|--- TransactionEvent(Ended) ---------->|
|<-- TransactionEvent.conf -------------|
연결 URL 및 하위 프로토콜
충전소는 자체 URL이 포함된 CSMS에 연결됩니다.
고유 식별자 경로의 마지막 세그먼트로 사용됩니다.
WebSocket 하위 프로토콜 ocpp2.0.1 악수 중에 협상됩니다.
프로토콜 버전 호환성을 보장하기 위한 HTTP입니다.
# Formato URL
wss://csms.example.com/ocpp/{chargingStationId}
# Esempi reali
wss://csms.example.com/ocpp/IT-MIL-STATION-001
wss://csms.example.com/ocpp/EVSE-PARK-NORD-042
wss://csms.example.com/ocpp/CPO-AUTOGRILL-A7-01
# Headers WebSocket obbligatori
Sec-WebSocket-Protocol: ocpp2.0.1
Authorization: Basic {base64(stationId:password)} # Security Profile 1-2
# Con Security Profile 3 (mTLS): nessun header Authorization,
# l'autenticazione avviene tramite certificato client TLS
OCPP 메시지 구조 2.0.1
OCPP 2.0.1은 세 가지 유형의 JSON 메시지를 정의하며 모두 프레임으로 전송됩니다. 웹소켓 텍스트. 각 메시지는 JSON 배열 형식으로 메시지 유형에 따라 정확합니다. 이 간단한 구조로 인해 구문 분석이 더 쉬워집니다. 디버깅과 SOAP/XML 오버헤드 비교.
CALL(요청) - MessageTypeId 2
메시지 부르다 한 당사자가 보낸 요청을 나타냅니다. (충전소 또는 CSMS)를 다른 곳으로. 고유 ID, 작업 이름이 포함되어 있습니다. 그리고 요청 페이로드.
// Formato: [MessageTypeId, MessageId, Action, Payload]
// Esempio: BootNotification dalla Charging Station
[2, "19223201", "BootNotification", {
"chargingStation": {
"model": "SuperCharger-500",
"vendorName": "EVPower Inc.",
"serialNumber": "SN-2025-00142",
"firmwareVersion": "3.2.1",
"modem": {
"iccid": "8939100000000000001",
"imsi": "310260000000001"
}
},
"reason": "PowerUp"
}]
// Esempio: TransactionEvent dalla Charging Station
[2, "tx-evt-001", "TransactionEvent", {
"eventType": "Started",
"timestamp": "2026-03-09T10:30:00Z",
"triggerReason": "CablePluggedIn",
"seqNo": 0,
"transactionInfo": {
"transactionId": "TXN-2026-0309-001",
"chargingState": "EVConnected"
},
"evse": { "id": 1, "connectorId": 1 },
"idToken": {
"idToken": "RFID-04A2B3C4D5",
"type": "ISO14443"
}
}]
CALLRESULT(응답) - MessageTypeId 3
메시지 호출 결과 그리고 CALL에 대한 긍정적인 반응. 는 MessageId는 원래 CALL의 것과 정확히 일치해야 합니다. 요청-응답 상관 관계.
// Formato: [MessageTypeId, MessageId, Payload]
// Risposta a BootNotification
[3, "19223201", {
"currentTime": "2026-03-09T10:00:00Z",
"interval": 300,
"status": "Accepted"
}]
// Risposta a TransactionEvent (Started)
[3, "tx-evt-001", {
"totalCost": 0,
"chargingPriority": 0,
"idTokenInfo": {
"status": "Accepted",
"groupIdToken": {
"idToken": "GROUP-FLEET-01",
"type": "Central"
}
}
}]
CALLERROR(오류) - MessageTypeId 4
메시지 발신자 수신자가 그렇지 않을 때 전송됩니다. CALL을 처리할 수 있습니다. 표준화된 오류 코드, 설명 포함 읽기 쉽고 구조화된 세부정보.
// Formato: [MessageTypeId, MessageId, ErrorCode, ErrorDescription, ErrorDetails]
[4, "19223201", "FormatViolation",
"Il campo 'vendorName' supera la lunghezza massima di 50 caratteri",
{
"field": "chargingStation.vendorName",
"maxLength": 50,
"actualLength": 67
}
]
// Codici di errore OCPP 2.0.1 standardizzati:
// FormatViolation - messaggio JSON malformato
// GenericError - errore generico non classificabile
// InternalError - errore interno del ricevente
// MessageTypeNotSupported - tipo di messaggio non supportato
// NotImplemented - azione riconosciuta ma non implementata
// NotSupported - azione non supportata dall'implementazione
// OccurrenceConstraintViolation - violazione cardinalita elementi
// PropertyConstraintViolation - vincolo su una proprietà violato
// ProtocolError - violazione del protocollo OCPP
// RpcFrameworkError - errore nel framework RPC di base
// SecurityError - errore di sicurezza o autenticazione
// TypeConstraintViolation - tipo di dato non corretto
요청-응답 상관관계: 중요한 규칙
각 통화에는 고유한 메시지 ID (최대 36자 영숫자)는 이전에 동일한 연결에서 사용되지 않았습니다. 같은 발신자. CALLRESULT 또는 CALLERROR는 이를 사용해야 합니다. 동일한 메시지 ID. 발신자는 시간 초과(권장: 30초)를 유지해야 하며 그 이후에는 요청은 실패한 것으로 간주됩니다. 한 번에 하나의 통화만 대기할 수 있습니다. 통신의 각 방향: 스테이션은 두 번째 CALL을 보낼 수 없습니다. 첫 번째 응답을받지 못했습니다.
OCPP 2.0.1의 16가지 기능 블록
OCPP 2.0.1은 모든 기능을 다음과 같이 구성합니다. 16개의 기능 블록 (A~P), 각각은 세부 요구사항이 있는 특정 사용 사례를 포함합니다. 사전 조건 및 시퀀스 다이어그램. 이 모듈식 조직은 다음을 허용합니다. 구현자는 자신이 지원하는 블록을 선언하고 테스터는 검증해야 합니다. 블록별 준수.
| 차단하다 | 이름 | 주요 메시지 | 의무사항 |
|---|---|---|---|
| A | 보안 | SecurityEventNotification, SignCertificate, CertificateSigned | Si |
| B | 프로비저닝 | BootNotification, SetVariables, GetVariables, NotifyReport | Si |
| C | 권한 부여 | 승인, ClearCache, GetLocalListVersion | Si |
| D | 로컬 인증 목록 | SendLocalList, GetLocalListVersion | No |
| E | 거래 | TransactionEvent, GetTransactionStatus, MeterValues | Si |
| F | 원격 제어 | RequestStartTransaction, RequestStopTransaction, UnlockConnector | No |
| G | 유효성 | 상태알림, 변경 가용성, 하트비트 | Si |
| H | 예약 | 지금 예약, 취소예약 | No |
| I | 관세 및 비용 | 비용 업데이트됨, ShowMessage | No |
| J | 측광 | MeterValues (에너지/전력/전류 측정) | Si |
| K | 스마트 충전 | SetChargingProfile, ClearChargingProfile, GetChargingProfiles, ReportChargingProfiles | No |
| L | 펌웨어 관리 | 업데이트펌웨어, 펌웨어상태알림 | No |
| M | ISO 15118 인증서 관리 | Get15118EVCertificate, DeleteCertificate, CertificateSigned | No |
| N | 진단 | GetLog, LogStatusNotification, SetMonitoringBase, SetVariableMonitoring | No |
| O | 메시지 표시 | SetDisplayMessage, GetDisplayMessages, ClearDisplayMessage | No |
| P | 데이터 전송 | DataTransfer(공급업체별 확장) | No |
장치 모델: OCPP 2.0.1의 핵심
Il 장치 모델 OCPP 2.0.1의 주요 아키텍처 혁신입니다. OCPP 1.6의 하드 구성 키 시스템을 대체합니다(ChangeConfiguration 스패너 포함) 유연한 계층적 모델 기반으로 구성 요소 및 변수. 각 스테이션은 해당 스테이션의 구조를 완벽하게 설명하고 공급업체에 구애받지 않는 방식으로 구성합니다.
ChargingStation (radice)
|
+-- Controller (computer della stazione)
| +-- Variables: Vendor, Model, FirmwareVersion, SerialNumber
|
+-- EVSE[1] (punto di ricarica 1)
| +-- Variables: AvailabilityState, Power, SupplyPhases
| +-- Connector[1] (connettore CCS2 / DC)
| | +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
| +-- Connector[2] (connettore CHAdeMO)
| +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
|
+-- EVSE[2] (punto di ricarica 2)
| +-- Connector[1] (connettore Type2 AC / 22kW)
| +-- Variables: ConnectorType, Phases, MaxCurrent
|
+-- PowerMeter (contatore principale)
| +-- Variables: Energy.Active.Import.Register, Power.Active.Import
|
+-- NetworkInterface (ETH0/LTE)
| +-- Variables: Type, SSID, SignalStrength, ActiveNetworkProfile
|
+-- SecurityCtrlr
| +-- Variables: SecurityProfile, CertificateEntries
|
+-- SmartChargingCtrlr
+-- Variables: ChargingProfileMaxStackLevel, ChargeProfileKindsSupported
// CALL dal CSMS: legge stato EVSE e tipo connettore
[2, "get-var-001", "GetVariables", {
"getVariableData": [
{
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeType": "Actual"
},
{
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeType": "Actual"
},
{
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeType": "Actual"
}
]
}]
// CALLRESULT dalla Charging Station
[3, "get-var-001", {
"getVariableResult": [
{
"attributeStatus": "Accepted",
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeValue": "Available"
},
{
"attributeStatus": "Accepted",
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeValue": "cCCS2"
},
{
"attributeStatus": "Accepted",
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeValue": "5"
}
]
}]
OCPP 보안 프로필 2.0.1
OCPP 2.0.1에는 세 가지 기능이 도입되었습니다. 점진적 보안 프로필 정의하는 충전소와 CSMS 간 통신 보호 수준. 프로필 배포 중에 선택해야 하며 메커니즘을 자동으로 구성합니다. 인증 및 암호화.
| 특성 | 프로필 1 | 프로필 2 | 프로필 3 |
|---|---|---|---|
| 웹소켓 URL | ws://(TLS 없음) | wss://(TLS) | wss://(TLS) |
| 암호화 | 없음 | TLS 1.2+ | TLS 1.2+ |
| 인증 스테이션 | 비밀번호(기본인증) | 비밀번호(기본인증) | X.509 클라이언트 인증서 |
| 인증 CSMS | 없음 | TLS 서버 인증서 | TLS 서버 인증서 |
| MitM 보호 | No | 부분(CSMS 인증만 해당) | 전체(상호 TLS) |
| 인증서 관리 | 필요하지 않음 | 장치의 루트 CA만 | 전체 PKI: CA + 클라이언트 인증서 |
| 권장 용도 | 테스트 환경만 | 표준 생산 | 중요 생산, P&C ISO 15118 |
프로덕션 인증서 관리(보안 프로필 3)
보안 프로필 3을 사용하면 인증서 수명주기 관리가 중요한 작업. OCPP 2.0.1에는 전용 메시지가 포함되어 있습니다. 서명증명서 (스테이션에는 CSR 서명이 필요합니다) 인증서서명됨 (CSMS는 서명된 인증서를 설치합니다) 인증서 삭제 (사용되지 않는 인증서 제거) GetInstalledCertificateIds (설치된 인증서 목록). 그리고 하나는 필수 강력한 PKI 최소한 자동 갱신으로 유효기간 30일 전부터 유효성 및 유효성에 대한 지속적인 모니터링 CRL/OCSP 해지 메커니즘.
스마트 충전 및 부하 관리
Lo 스마트 충전 (K 블록) 및 가장 중요한 기능 대규모 설비를 갖춘 운영자. CSMS가 동적으로 제어할 수 있도록 허용 네트워크 제약, 에너지 요금, 사용자 우선순위 및 변압기 용량.
충전 프로필의 계층 구조
OCPP 2.0.1은 스택 수준(우선순위)을 사용하여 네 가지 유형의 충전 프로필을 정의합니다.
| 프로필 유형 | 범위 | 애플리케이션 | 보수 |
|---|---|---|---|
| 충전 스테이션최대 프로필 | 전체 국의 절대 최대 한도 | 변압기 보호, 공급 계약 | 재정의할 수 없음 |
| 충전 스테이션외부 제약 조건 | 외부 시스템(DSO, 수집자)의 제한 | 수요반응, 네트워크 밸런싱 | 더 높은 프로필에서만 |
| TxDefault프로필 | 거래에 대한 기본 프로필 | 관세 정책, 기본 스케줄링, 태양광 | 특정 TxProfile에서 |
| Tx프로필 | 거래에 대한 특정 프로필 | 사용자 우선순위, 개인 선호도 | 최대 보안 |
SetChargingProfile: 피크 감소 및 태양광 통합
// Strategia: integra produzione solare + peak shaving ore serali
// Scenario: sito con 50kW fotovoltaico, trasformatore 100A, picco 19-21h
// 1. Limite massimo stazione (rispetta contratto di fornitura)
[2, "smart-max-001", "SetChargingProfile", {
"evseId": 0,
"chargingProfile": {
"id": 1,
"stackLevel": 1,
"chargingProfilePurpose": "ChargingStationMaxProfile",
"chargingProfileKind": "Absolute",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 100.0 }
]
}]
}
}]
// 2. Profilo solare + peak shaving per un singolo EVSE
[2, "smart-solar-001", "SetChargingProfile", {
"evseId": 1,
"chargingProfile": {
"id": 100,
"stackLevel": 0,
"chargingProfilePurpose": "TxDefaultProfile",
"chargingProfileKind": "Absolute",
"validFrom": "2026-03-09T00:00:00Z",
"validTo": "2026-03-10T00:00:00Z",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"startSchedule": "2026-03-09T06:00:00Z",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 7200, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 14400, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 43200, "limit": 16.0, "numberPhases": 3 },
{ "startPeriod": 46800, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 54000, "limit": 24.0, "numberPhases": 3 }
]
}]
}
}]
// Orario potenza: 06-08h: 8A (offpeak, bassa produzione solare)
// 08-12h: 32A (piena produzione FV, massima potenza)
// 12-18h: 32A (picco solare, alta produzione)
// 18-19h: 16A (calo solare, riduzione)
// 19-21h: 8A (picco domanda residenziale, min potenza)
// 21-24h: 24A (fine picco, potenza media)
동적 로드 밸런싱 알고리즘
부하 관리 알고리즘은 세션 간에 사용 가능한 전력을 분배합니다. 변압기 한계와 우선순위를 존중하면서 실시간으로 활성화됩니다. 가장 일반적인 접근 방식은 가중 공정 공유 최소/최대 제약 조건이 있습니다.
interface ChargingSession {
readonly stationId: string;
readonly evseId: number;
readonly transactionId: string;
readonly priority: number; // 0-9 (9 = massima)
readonly minChargingRate: number; // A minimi per caricare
readonly maxChargingRate: number; // A massimi del connettore
readonly currentChargingRate: number;
readonly energyDelivered: number; // Wh totali erogati
readonly targetEnergy?: number; // Wh target (se specificato dall'utente)
readonly isEV3Phase: boolean; // Veicolo trifase
}
interface LoadBalancerConfig {
readonly maxSitePowerAmps: number; // A max del trasformatore
readonly reservedBuildingAmps: number; // A riservati per l'edificio
readonly minSessionAmps: number; // A minimi per sessione (tipico: 6A)
readonly rebalanceIntervalSec: number; // Secondi tra ricalcoli (tipico: 30s)
}
interface Allocation {
readonly stationId: string;
readonly evseId: number;
readonly allocatedAmps: number;
readonly phases: number;
}
function calculateChargingAllocations(
sessions: ReadonlyArray<ChargingSession>,
config: LoadBalancerConfig
): ReadonlyArray<Allocation> {
if (sessions.length === 0) return [];
const availableAmps = config.maxSitePowerAmps
- config.reservedBuildingAmps;
// Step 1: ordina per priorità (desc), poi energia erogata (asc = meno carico prima)
const sorted = [...sessions].sort((a, b) => {
if (b.priority !== a.priority) return b.priority - a.priority;
return a.energyDelivered - b.energyDelivered;
});
// Step 2: garantisci potenza minima a tutti
const minRequired = sorted.length * config.minSessionAmps;
if (minRequired > availableAmps) {
// Caso critico: potenza insufficiente, sospendi sessioni a bassa priorità
return sorted
.slice(0, Math.floor(availableAmps / config.minSessionAmps))
.map((s) => ({
stationId: s.stationId,
evseId: s.evseId,
allocatedAmps: config.minSessionAmps,
phases: s.isEV3Phase ? 3 : 1,
}));
}
// Step 3: distribuzione proporzionale ai pesi di priorità
const totalWeight = sorted.reduce(
(sum, s) => sum + (1 + s.priority), 0
);
const remainingAmps = availableAmps - minRequired;
const allocations = sorted.map((session) => {
const weight = (1 + session.priority) / totalWeight;
const bonus = remainingAmps * weight;
const raw = config.minSessionAmps + bonus;
// Applica vincoli min/max del connettore
const allocatedAmps = Math.max(
config.minSessionAmps,
Math.min(session.maxChargingRate, Math.round(raw * 10) / 10)
);
return {
stationId: session.stationId,
evseId: session.evseId,
allocatedAmps,
phases: session.isEV3Phase ? 3 : 1,
};
});
// Step 4: verifica finale che il totale non superi il limite
const total = allocations.reduce((s, a) => s + a.allocatedAmps, 0);
if (total <= availableAmps) return allocations;
// Riscaling proporzionale
const scale = availableAmps / total;
return allocations.map((a) => ({
...a,
allocatedAmps: Math.max(
config.minSessionAmps,
Math.round(a.allocatedAmps * scale * 10) / 10
),
}));
}
Python 및 PostgreSQL을 사용한 CSMS 백엔드 구현
파이썬 라이브러리 ocpp MobilityHouse 작성(오픈 소스, GitHub의 별 2000개 이상)
CSMS에 대한 가장 널리 사용되는 참조 구현입니다. 우리는 도서관과 결합합니다.
asyncio, websockets e asyncpg PostgreSQL용
프로덕션에 즉시 사용할 수 있는 백엔드를 구축하세요.
CSMS용 PostgreSQL 스키마
-- Registro stazioni di ricarica
CREATE TABLE charging_stations (
station_id TEXT PRIMARY KEY,
vendor_name TEXT NOT NULL,
model TEXT NOT NULL,
serial_number TEXT,
firmware_version TEXT,
security_profile SMALLINT NOT NULL DEFAULT 1,
last_boot_reason TEXT,
last_seen_at TIMESTAMPTZ,
is_online BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Stato EVSE e connettori
CREATE TABLE evse_status (
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT NOT NULL,
connector_type TEXT, -- cCCS2, cCHAdeMO, cType2, sType3
status TEXT NOT NULL DEFAULT 'Unknown',
error_code TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (station_id, evse_id, connector_id)
);
-- Transazioni di ricarica
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT,
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'Started',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
meter_start_wh NUMERIC(12, 3),
meter_end_wh NUMERIC(12, 3),
energy_wh NUMERIC(12, 3) GENERATED ALWAYS AS (meter_end_wh - meter_start_wh) STORED,
stop_reason TEXT,
total_cost_cents INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Meter values (time series - usare TimescaleDB in produzione)
CREATE TABLE meter_values (
id BIGSERIAL PRIMARY KEY,
transaction_id TEXT REFERENCES transactions(transaction_id),
station_id TEXT NOT NULL,
evse_id SMALLINT NOT NULL,
sampled_at TIMESTAMPTZ NOT NULL,
energy_wh NUMERIC(12, 3),
power_w NUMERIC(10, 2),
current_a NUMERIC(8, 3),
voltage_v NUMERIC(8, 2),
soc_pct SMALLINT -- State of Charge da ISO 15118
);
CREATE INDEX idx_meter_values_station_time
ON meter_values(station_id, sampled_at DESC);
CREATE INDEX idx_transactions_station_id
ON transactions(station_id, started_at DESC);
-- Token di autorizzazione (lista locale cache)
CREATE TABLE authorization_cache (
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
status TEXT NOT NULL, -- Accepted, Invalid, Blocked, Expired
group_id TEXT,
expiry_date TIMESTAMPTZ,
cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id_token, id_token_type)
);
CSMS 전체 Python 백엔드
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional, Any
import asyncpg
import websockets
from ocpp.routing import on
from ocpp.v201 import ChargePoint as Cp
from ocpp.v201 import call, call_result
from ocpp.v201.enums import (
Action, RegistrationStatusType, AuthorizationStatusType,
ConnectorStatusType
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
log = logging.getLogger('csms')
# Pool globale connessioni PostgreSQL
_db_pool: Optional[asyncpg.Pool] = None
async def get_db() -> asyncpg.Pool:
global _db_pool
if _db_pool is None:
_db_pool = await asyncpg.create_pool(
dsn='postgresql://csms:password@localhost/csms_db',
min_size=5,
max_size=20,
)
return _db_pool
class ChargePointHandler(Cp):
"""
Handler OCPP 2.0.1 per una singola Charging Station.
Un'istanza per ogni connessione WebSocket attiva.
"""
@on(Action.boot_notification)
async def on_boot_notification(
self, charging_station: dict, reason: str, **kwargs
) -> call_result.BootNotification:
log.info(
f"Boot: {self.id} | "
f"{charging_station['vendor_name']} {charging_station['model']} "
f"| reason={reason}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO charging_stations
(station_id, vendor_name, model, serial_number,
firmware_version, last_boot_reason, last_seen_at, is_online)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), TRUE)
ON CONFLICT (station_id) DO UPDATE SET
vendor_name = EXCLUDED.vendor_name,
model = EXCLUDED.model,
firmware_version = EXCLUDED.firmware_version,
last_boot_reason = EXCLUDED.last_boot_reason,
last_seen_at = NOW(),
is_online = TRUE
""",
self.id,
charging_station['vendor_name'],
charging_station['model'],
charging_station.get('serial_number'),
charging_station.get('firmware_version'),
reason,
)
return call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=300,
status=RegistrationStatusType.accepted,
)
@on(Action.heartbeat)
async def on_heartbeat(self) -> call_result.Heartbeat:
db = await get_db()
await db.execute(
"UPDATE charging_stations SET last_seen_at = NOW() WHERE station_id = $1",
self.id
)
return call_result.Heartbeat(
current_time=datetime.now(timezone.utc).isoformat()
)
@on(Action.status_notification)
async def on_status_notification(
self, timestamp: str, connector_status: str,
evse_id: int, connector_id: int, **kwargs
) -> call_result.StatusNotification:
log.info(
f"Status: {self.id} EVSE[{evse_id}]"
f"Connector[{connector_id}] = {connector_status}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO evse_status
(station_id, evse_id, connector_id, status, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (station_id, evse_id, connector_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = NOW()
""",
self.id, evse_id, connector_id, connector_status
)
return call_result.StatusNotification()
@on(Action.authorize)
async def on_authorize(
self, id_token: dict, **kwargs
) -> call_result.Authorize:
token = id_token['id_token']
token_type = id_token['type']
log.info(f"Authorize: {self.id} token={token} type={token_type}")
db = await get_db()
# Controlla prima la cache locale
row = await db.fetchrow(
"""
SELECT status, group_id, expiry_date
FROM authorization_cache
WHERE id_token = $1 AND id_token_type = $2
AND (expiry_date IS NULL OR expiry_date > NOW())
""",
token, token_type
)
status = AuthorizationStatusType.invalid
if row and row['status'] == 'Accepted':
status = AuthorizationStatusType.accepted
return call_result.Authorize(
id_token_info={'status': status}
)
@on(Action.transaction_event)
async def on_transaction_event(
self,
event_type: str,
timestamp: str,
trigger_reason: str,
seq_no: int,
transaction_info: dict,
evse: Optional[dict] = None,
id_token: Optional[dict] = None,
meter_value: Optional[list] = None,
**kwargs
) -> call_result.TransactionEvent:
tx_id = transaction_info['transaction_id']
log.info(
f"TransactionEvent: {self.id} {event_type} "
f"tx={tx_id} trigger={trigger_reason}"
)
db = await get_db()
if event_type == 'Started':
await self._handle_tx_started(
db, tx_id, evse, id_token, timestamp, meter_value
)
elif event_type == 'Updated' and meter_value:
await self._handle_tx_updated(db, tx_id, evse, meter_value)
elif event_type == 'Ended':
await self._handle_tx_ended(
db, tx_id, timestamp, transaction_info, meter_value
)
return call_result.TransactionEvent(
total_cost=0,
charging_priority=0,
id_token_info={'status': AuthorizationStatusType.accepted},
)
async def _handle_tx_started(
self, db, tx_id, evse, id_token, timestamp, meter_value
):
evse_id = evse['id'] if evse else 0
connector_id = evse.get('connector_id') if evse else None
token = id_token['id_token'] if id_token else 'unknown'
token_type = id_token['type'] if id_token else 'Local'
meter_start = self._extract_energy(meter_value)
await db.execute(
"""
INSERT INTO transactions
(transaction_id, station_id, evse_id, connector_id,
id_token, id_token_type, state, started_at, meter_start_wh)
VALUES ($1, $2, $3, $4, $5, $6, 'Started', $7, $8)
ON CONFLICT (transaction_id) DO NOTHING
""",
tx_id, self.id, evse_id, connector_id,
token, token_type, timestamp, meter_start
)
async def _handle_tx_updated(self, db, tx_id, evse, meter_value):
evse_id = evse['id'] if evse else 0
energy = self._extract_energy(meter_value)
power = self._extract_power(meter_value)
if energy is not None:
await db.execute(
"""
INSERT INTO meter_values
(transaction_id, station_id, evse_id, sampled_at, energy_wh, power_w)
VALUES ($1, $2, $3, NOW(), $4, $5)
""",
tx_id, self.id, evse_id, energy, power
)
async def _handle_tx_ended(
self, db, tx_id, timestamp, transaction_info, meter_value
):
meter_end = self._extract_energy(meter_value)
stop_reason = transaction_info.get('stopped_reason')
await db.execute(
"""
UPDATE transactions SET
state = 'Ended',
ended_at = $1,
meter_end_wh = $2,
stop_reason = $3
WHERE transaction_id = $4
""",
timestamp, meter_end, stop_reason, tx_id
)
def _extract_energy(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Energy.Active.Import.Register':
return float(sv['value'])
return None
def _extract_power(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Power.Active.Import':
return float(sv['value'])
return None
# === Comandi CSMS -> Stazione ===
async def send_remote_start(
self, evse_id: int, id_token: str, limit_amps: float = 32.0
) -> str:
"""Avvia una sessione da remoto su EVSE specificato."""
request = call.RequestStartTransaction(
id_token={'id_token': id_token, 'type': 'Central'},
evse_id=evse_id,
charging_profile={
'id': 999,
'stack_level': 0,
'charging_profile_purpose': 'TxProfile',
'charging_profile_kind': 'Relative',
'charging_schedule': [{
'id': 1,
'charging_rate_unit': 'A',
'charging_schedule_period': [
{'start_period': 0, 'limit': limit_amps}
],
}],
},
)
response = await self.call(request)
log.info(f"RemoteStart {self.id} EVSE{evse_id}: {response.status}")
return response.status
async def send_charging_profile(
self, evse_id: int, profile: dict
) -> str:
"""Imposta un profilo di carica per smart charging."""
request = call.SetChargingProfile(
evse_id=evse_id,
charging_profile=profile
)
response = await self.call(request)
log.info(f"ChargingProfile {self.id} EVSE{evse_id}: {response.status}")
return response.status
# Registry globale delle connessioni attive
_connected_stations: dict[str, ChargePointHandler] = {}
async def on_connect(websocket, path: str):
"""Callback per nuove connessioni WebSocket OCPP."""
station_id = path.strip('/').split('/')[-1]
if not station_id:
await websocket.close(1008, 'Missing station ID')
return
log.info(f"Connessione da: {station_id} | path={path}")
cp = ChargePointHandler(station_id, websocket)
_connected_stations[station_id] = cp
try:
await cp.start()
except websockets.exceptions.ConnectionClosed as e:
log.info(f"Disconnesso: {station_id} code={e.code}")
except Exception as e:
log.error(f"Errore: {station_id} - {e}")
finally:
_connected_stations.pop(station_id, None)
db = await get_db()
await db.execute(
"UPDATE charging_stations SET is_online = FALSE WHERE station_id = $1",
station_id
)
async def main():
await get_db() # Inizializza pool DB
log.info("CSMS OCPP 2.0.1 avviato")
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp2.0.1'],
# TLS: aggiungere ssl=ssl_context per Security Profile 2-3
ping_interval=60,
ping_timeout=30,
max_size=1_048_576, # 1MB max message size
)
log.info("In ascolto su ws://0.0.0.0:9000/ocpp/{stationId}")
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())
ISO 15118 및 플러그 앤 충전
ISO 15118 차량 간의 높은 수준의 통신을 정의합니다. 전력선통신을 통한 전기(EV) 및 충전소(EVSE) (PLC)를 DC 충전 케이블(CCS)에 연결합니다. OCPP 2.0.1은 기본적으로 ISO 15118을 통합합니다. 기능 블록 M을 통해 플러그 앤 충전: 차량은 X.509 디지털 인증서를 통해 자동으로 인증됩니다. RFID 또는 모바일 앱.
V2G(Vehicle-to-Grid) PKI 아키텍처
Plug & Charge 인증 시스템은 PKI(공개 키)를 기반으로 합니다. 전기 이동성을 위한 인프라) 특정 계층 구조:
V2G Root CA (Root of Trust - gestita da OEM o eMSP)
|
+-- V2G Intermediate CA
| |
| +-- EVSE Certificate (installato nella stazione)
| CN = EVSE-IT-MIL-001
|
+-- eMobility Service Provider CA (eMSP)
|
+-- Contract Certificate (installato nel veicolo)
CN = IT.CPO.000001234 (eMAID - e-Mobility Account Identifier)
SubjectAltName = eMAID:IT.CPO.000001234
완전한 플러그 앤 충전 흐름
EV EVSE CSMS eMSP
| | | |
|-- Plug cavo DC --> | | |
| | | |
|<= ISO 15118-2 TLS =>| (PLC sul cavo SLAC) | |
| | | |
|-- ContractCert --->| | |
| (eMAID, X.509) | | |
| |--- Authorize req ----->| |
| | idToken.type=eMAID | |
| | |--- OCPI check --->|
| | |<-- Contract OK --|
| | | |
| |<-- Authorize.conf ----| |
| | status: Accepted | |
| | | |
|<= Charging Start ==>| | |
| | | |
| EV invia target | | |
|-- EnergyRequest --->| | |
| SoC: 45% | | |
| Target: 80% | | |
| Departure: 18:30 | | |
| |--- TransactionEvent -->| |
| | ISO15118Trigger | |
| | | |
| |<-- SetChargingProfile--| |
|<= Schedule via PLC =>| | |
ISO 15118 생산 현황(2026)
- ISO 15118-2: 플러그 앤 충전 AC/DC - HPC DC 충전기(Ionity, Fastned, Tesla Supercharger V3)에서 널리 지원됩니다.
- ISO 15118-20: 양방향 V2G 지원 - 하드웨어 지원 준비, 소프트웨어 출시 예정(2025~2026년)
- AFIR 요구 사항: 모든 새로운 V2G 지원 스테이션은 2026년부터 ISO 15118을 지원해야 합니다.
- AFIR 2027 준수: 2027년 1월 1일 이후에 설치된 모든 충전기는 스마트 충전이 가능해야 합니다.
- 이탈리아의 실제 V2G: V2H(Vehicle-to-Home) 표준에 Enel X Way 및 Nissan Leaf를 사용한 최초의 파일럿
확장 가능한 아키텍처: 충전 포인트 10~100,000개
엔터프라이즈 CSMS는 수만에서 수십만 개의 연결을 관리해야 합니다. WebSocket과 경쟁합니다. 아키텍처는 추가 구성 요소를 사용하여 단계적으로 발전합니다. 그들은 다른 규모로 작용합니다.
1단계: 소규모(10-500개 스테이션)
+------------------+ WebSocket/OCPP +------------------+
| Charging |------------------------| CSMS Monolitico |
| Stations (10-500)| wss://csms:9000/ocpp | Python/asyncio |
+------------------+ | Port 9000 |
+--------+---------+
|
+-------+-------+
| PostgreSQL |
| Redis (cache) |
+---------------+
Stack: Python asyncio + PostgreSQL + Redis
Deployment: 1 VM (4 vCPU, 8GB RAM), 1 DB managed
Costo: ~$200/mese
2단계: 중간 규모(500-10,000개 스테이션)
+------------+ +------------------+ +--------------+
| Load | | WS Gateway #1 | | Message |
| Balancer +---->| (asyncio CSMS) +---->| Broker |
| (HAProxy) | | Max 2000 conn | | (RabbitMQ) |
| | +------------------+ | |
| Sticky +---->| WS Gateway #2 +---->| |
| Sessions | | (asyncio CSMS) | +--------------+
| | +------------------+ |
+------------+ +------+------+
| Business |
| Services |
| (FastAPI) |
+------+------+
|
+----------+----------+
| PostgreSQL (HA) |
| TimescaleDB |
| Redis Cluster |
+---------------------+
Sticky sessions: basate su station_id nel path URL
Cross-node ops: Redis pub/sub per inviare comandi alle stazioni
Costo: ~$2.000/mese (K8s managed)
3단계: 대규모(10,000-100,000개 스테이션)
Global Load Balancer (Anycast)
|
+---------------+---------------+
| |
Region EU-WEST Region EU-SOUTH
+------------------+ +------------------+
| WS Gateway Pool | | WS Gateway Pool |
| (50 pods, 2000 | | (30 pods) |
| conn each = 100K)| +--------+---------+
+--------+---------+ |
| |
+-------------+----------------+
|
+-------+--------+
| Apache Kafka |
| (12 partitions)|
| per topic |
+-------+--------+
|
+-----------------+------------------+
| | |
+------+------+ +-------+------+ +--------+------+
| Transaction | | Smart | | Device |
| Service | | Charging Svc | | Mgmt Svc |
| (10 replicas)| | (5 replicas) | | (3 replicas) |
+------+------+ +-------+------+ +--------+------+
| | |
+--------+--------+------------------+
|
+--------+--------+
| PostgreSQL |
| Citus (sharding) |
| Shard key: |
| station_id hash |
+--------+---------+
|
+--------+--------+
| TimescaleDB |
| (meter values) |
+--------+--------+
Kafka Topics:
- ocpp.boot-notification (chiave: station_id)
- ocpp.transaction-events (chiave: transaction_id)
- ocpp.meter-values (chiave: station_id)
- ocpp.status-notifications (chiave: station_id)
- csms.commands (chiave: station_id)
Throughput target: 1M messaggi/ora, latenza P99 < 200ms
Costo: ~$30.000/mese (multi-region Kubernetes)
Redis를 사용한 교차 노드 연결 관리
다중 노드 배포에서 CSMS는 각각이 어떤 게이트웨이에 있는지 알아야 합니다. 명령(SetChargingProfile, RemoteStart 등)을 보내는 스테이션입니다. Redis 게시/구독 문제를 해결합니다.
import json
import asyncio
import redis.asyncio as aioredis
redis_client = aioredis.from_url(
'redis://redis-cluster:6379',
encoding='utf-8',
decode_responses=True
)
# Registra il nodo della connessione
async def register_connection(station_id: str, gateway_id: str):
await redis_client.setex(
f"csms:gateway:{station_id}",
value=gateway_id,
time=600 # TTL: 10 minuti, rinnovato a ogni heartbeat
)
# Pubblica un comando verso una stazione (qualunque nodo sia)
async def publish_command(station_id: str, action: str, payload: dict):
channel = f"csms:commands:{station_id}"
await redis_client.publish(channel, json.dumps({
'action': action,
'payload': payload
}))
# Su ogni nodo gateway: ascolta i comandi per le stazioni connesse
async def listen_for_commands(connected_stations: dict):
pubsub = redis_client.pubsub()
# Sottoscrivi ai canali delle stazioni connesse a questo nodo
async def subscribe_station(station_id: str):
await pubsub.subscribe(f"csms:commands:{station_id}")
async for message in pubsub.listen():
if message['type'] != 'message':
continue
station_id = message['channel'].split(':')[-1]
cp = connected_stations.get(station_id)
if not cp:
continue # Stazione non su questo nodo, ignora
cmd = json.loads(message['data'])
try:
if cmd['action'] == 'SetChargingProfile':
await cp.send_charging_profile(
cmd['payload']['evse_id'],
cmd['payload']['profile']
)
elif cmd['action'] == 'RemoteStart':
await cp.send_remote_start(
cmd['payload']['evse_id'],
cmd['payload']['id_token'],
)
except Exception as e:
log.error(f"Errore esecuzione comando {cmd['action']}: {e}")
모니터링, 지표 및 Grafana 대시보드
프로덕션 환경의 CSMS에는 포괄적인 관찰 시스템이 필요합니다. 측정항목 모니터링의 핵심은 인프라 상태, 품질 서비스 및 운영 성과.
주요 운영 지표
| 미터법 | 공식/소스 | 목표 SLA | 경고 임계값 |
|---|---|---|---|
| 스테이션 가용성 | 온라인 방송국 / 총 방송국 x 100 | >= 99% | < 95% |
| OCPP 메시지 지연 시간 P99 | CALL 시간 -> CALLRESULT(95번째 백분위수) | 2초 미만 | > 5초 |
| 거래 성공률 | TX 완료 / TX 시작 x 100 | >= 98% | < 95% |
| 전달된 에너지(kWh/시간) | 현재 MeterValues 합계 | 기준선 +10% | < 기준 -20% |
| 인증 거부율 | 인증이 유효하지 않음 / 총 인증 x 100 | < 2% | > 10% (공격 가능) |
| WebSocket 재연결/시간 | 스테이션 당 새로운 연결 카운터 | < 2/시간/역 | > 10/시간/역 |
| 스마트 충전 규정 준수 | 실제 전력과 설정된 프로필 비교 | +/- 5% | 편차 > 15% |
| 인증된 만료일 | TLS 인증서 만료까지 남은 일수 | > 30일 | < 30일(갱신 알림) |
CSMS용 Prometheus 내보내기
from prometheus_client import (
Counter, Gauge, Histogram, start_http_server
)
# Metriche Prometheus
OCPP_MESSAGES_TOTAL = Counter(
'ocpp_messages_total',
'Numero totale messaggi OCPP processati',
['action', 'direction', 'status'] # direction: inbound/outbound
)
OCPP_MESSAGE_DURATION = Histogram(
'ocpp_message_duration_seconds',
'Latenza elaborazione messaggi OCPP',
['action'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
STATIONS_CONNECTED = Gauge(
'csms_stations_connected_total',
'Numero stazioni connesse al CSMS'
)
ACTIVE_TRANSACTIONS = Gauge(
'csms_active_transactions_total',
'Numero transazioni di ricarica attive'
)
ENERGY_DELIVERED_WH = Counter(
'csms_energy_delivered_wh_total',
'Energia totale erogata in Wh',
['station_id']
)
AUTH_RESULTS = Counter(
'csms_authorization_results_total',
'Risultati delle autorizzazioni OCPP',
['status'] # Accepted, Invalid, Blocked, Expired
)
SMART_CHARGING_EVENTS = Counter(
'csms_smart_charging_events_total',
'Operazioni smart charging',
['action', 'result']
)
def start_metrics_server(port: int = 8001):
"""Avvia il server HTTP Prometheus su porta specificata."""
start_http_server(port)
log.info(f"Prometheus metrics su http://0.0.0.0:{port}/metrics")
# Decorator per misurare latenza handler
import time
import functools
def track_ocpp_handler(action: str):
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.monotonic()
try:
result = await func(*args, **kwargs)
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='success'
).inc()
return result
except Exception as e:
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='error'
).inc()
raise
finally:
OCPP_MESSAGE_DURATION.labels(action=action).observe(
time.monotonic() - start
)
return wrapper
return decorator
// Pannelli principali per dashboard Grafana CSMS
// 1. Stazioni online (Gauge)
{
"title": "Stazioni Connesse",
"type": "gauge",
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 90 },
{ "color": "green", "value": 99 }
]
}
}
},
"targets": [{
"expr": "csms_stations_connected_total / csms_stations_registered_total * 100",
"legendFormat": "Availability %"
}]
}
// 2. Latenza messaggi OCPP P99 (Time Series)
{
"title": "OCPP Message Latency P99",
"type": "timeseries",
"targets": [{
"expr": "histogram_quantile(0.99, rate(ocpp_message_duration_seconds_bucket[5m]))",
"legendFormat": "P99 - {{action}}"
}]
}
// 3. Energia erogata (Stat panel)
{
"title": "Energia Totale Erogata oggi (kWh)",
"type": "stat",
"targets": [{
"expr": "increase(csms_energy_delivered_wh_total[24h]) / 1000",
"legendFormat": "kWh"
}]
}
// 4. Auth rejection rate - alert su attacchi
{
"title": "Rejection Rate Autorizzazioni (%)",
"type": "timeseries",
"targets": [{
"expr": "rate(csms_authorization_results_total{status='Invalid'}[5m]) / rate(csms_authorization_results_total[5m]) * 100"
}]
}
이탈리아 및 유럽 규정: AFIR, PNIRE, PNRR
이탈리아와 유럽의 EV 충전 인프라는 엄격하게 규제됩니다. 규정 준수는 선택 사항이 아닙니다. 이는 다음과 같은 기술적 요구 사항과 관련이 있습니다. 스테이션(법적 계측, 접근성, 결제) 및 통신 표준.
AFIR(대체 연료 인프라 규정 - EU 2023/1804)
AFIR은 2024년 4월에 발효되었으며 정확한 시간표를 정의합니다. TEN-T 네트워크 및 도시 지역의 충전 인프라에 대한 의무:
| 만료 | 요구 사항 | 적용 가능성 |
|---|---|---|
| 2025년 12월 31일 | TEN-T 코어 네트워크에서 60km마다 스테이션 >= 150kW | EU 전체 |
| 2027년 12월 31일 | 스테이션 >= 60km마다 150kW TEN-T 종합 네트워크 | EU 전체 |
| 2025년 4월 14일 | 무료 정적 및 동적 데이터(위치, 커넥터 유형, 가용성) | 공공 방송국 |
| 2027년 1월 1일 | 22kW를 초과하는 신규/개조된 스테이션을 위한 스마트 충전 준비 | 공공 방송국 |
| 즉각적인 | 가입 없이 임시 결제(비접촉식 은행 카드) | 스테이션 > 50kW 공공 |
| 2026+ | V2G 가능 스테이션을 위한 ISO 15118 | 양방향 스테이션 |
이탈리아의 PNIRE 및 PNRR
이탈리아에서는 AFIR 구현이 두 가지 주요 도구를 통해 이루어집니다.
- PNIRE(국가 전기 충전 인프라 계획): MASE(환경에너지안전부)에서 관리하는 국가 목표: 2025년 13,755개 공공역, 도로망 집중 및 도시 지역
- PNRR 미션 2, 투자 4.3: 7억 유로 이상 고속도로 및 지역의 고출력 충전(HPC >= 150kW)에 할당됨 서비스. MEMORY에 따르면 PNRR은 총 127억 유로를 할당했습니다. 아직 부분적으로 사용 중
- MASE 통화 2024-2025: 민간 사업자에 대한 인센티브 충전 밀도가 낮은 지역에 인프라 설치(이탈리아 남부, 농촌 지역). 설치비 최대 60% 지원
이탈리아 상황: 2025년 충전소 73,000개
2025년 12월 31일 기준으로 이탈리아는 73,000개 이상의 공공 충전소 (2024년 대비 +18%), 국토 커버리지는 93%입니다. 이들 중: 약 12,000개는 고속 충전 포인트(> 22kW)이고 5,000개는 HPC입니다. (>= 150kW). 인프라가 가장 많은 지역은 롬바르디아(23%)이며 그 뒤를 이었습니다. 라치오(12%)와 토스카나(9%) 출신입니다. 남부는 여전히 i의 우선순위 지역입니다. 2026년까지 격차를 줄이는 것을 목표로 하는 PNRR 자금 지원.
CSMS 소프트웨어에 대한 기술적 의무
- OCPI(개방형 충전 포인트 인터페이스): CPO(Charge Point Operator)와 eMSP(e-Mobility Service Provider) 간 로밍을 위한 필수 프로토콜입니다. OCPI 버전 2.2.1 권장
- 법적 계측: 독일(Eichrecht) 및 EU(MID - 측정 장비 지침)에서는 측정 시스템이 인증되어야 하며 판독값은 투명하고 사용자가 변경할 수 없어야 합니다.
- GDPR: 충전 데이터(RFID, 위치, 시간)는 개인정보입니다. 개인정보 보호정책, 데이터 최소화, 잊혀질 권리가 필요합니다.
- CDR(청구 내역 기록): 상호 운용 가능한 송장 발행을 위해 OCPI 형식을 준수해야 하며 최소 5년 동안 보관되어야 합니다(이탈리아 납세 의무).
사례 연구: 50개 이상의 충전소를 갖춘 이탈리아 충전 네트워크
이탈리아 운영자를 위한 실제 CSMS 시스템의 아키텍처를 살펴보겠습니다. 도시 주차장에 분산된 50개의 DC 충전소(22~150kW)를 관리합니다. 3개 지역에 쇼핑센터가 있습니다.
운영 요구 사항
- 스테이션 50개, 총 EVSE 150개(스테이션당 평균 EVSE 3개), 커넥터 300개(CCS2 + Type2)
- 일일 최대 사용량: 400~600회 충전 세션(오전 7~9시, 오후 12~2시, 오후 5~9시)
- 스마트 충전: 스테이션당 200A 제한 준수, SEM(Site Energy Manager)과 통합
- 멀티 테넌트: 부분적인 스테이션 가시성을 갖춘 3개의 CPO, Enel을 통한 OCPI 로밍
- 가동 시간 SLA: 스테이션당 월간 99.5%, CSMS 백엔드의 경우 99.9%
선택된 아키텍처
+------------------+ +------------------+ +------------------+
| 50 Stazioni | | HAProxy | | CSMS Primary |
| OCPP 2.0.1 |---->| (WS sticky sess.)|---->| Python asyncio |
| Security Profile 2| | Port 443 (TLS) | | 2 replicas |
+------------------+ +------------------+ +--------+---------+
|
+------------------+ +--------+---------+
| CSMS Worker |<----| Redis Cluster |
| (FastAPI REST) | | (stato sessioni) |
| Dashboard, API | +------------------+
+------------------+
|
+-----------+----------+
| |
+-------+-------+ +---------+------+
| PostgreSQL 16 | | TimescaleDB |
| (transazioni, | | (meter values)|
| auth, device | | 2TB/anno est. |
| model, CDR) | +---------------+
+---------------+
Monitoring: Prometheus + Grafana Cloud
Alerting: PagerDuty (P1: stazione offline >5min, P2: CSMS latency >2s)
CDR/Billing: integrazione ERP via webhook PostgreSQL NOTIFY
실제 운영 지표(일반적인 달)
| 미터법 | Valore | 메모 |
|---|---|---|
| 세션/월 | ~14,000 | 평균 280 세션/일 |
| 에너지 전달 | ~85,000kWh/월 | 평균 6kWh/세션 |
| 평균 스테이션 가용성 | 99.3% | 0.7% 가동 중지 시간 = 스테이션당 ~5시간/월 |
| 인증 승인률 | 96.8% | 3.2% 거부됨 = 만료되었거나 등록되지 않음 |
| OCPP 대기 시간 P95 | 180ms | 왕복 LTE 스테이션 포함 |
| 스마트 충전 이벤트/일 | ~1,200 | 시간당 평균 24번의 재조정 |
| 피크 감소 효능 | 92% | 시간 전력의 92% <= 설정된 한계 |
CSMS 보안: OWASP 및 위협 모델
CSMS는 중요한 인프라입니다. 손상되면 중단될 수 있습니다. 충전, 요금 조작 또는 전력망 공격 충전 부하를 통해. 주요 위협은 다음과 같이 식별할 수 있습니다. 특정 위협 모델.
CSMS 위협 모델
| 위협 | 벡터 | 영향 | 완화 |
|---|---|---|---|
| 비인가 역 | 도난당한 자격 증명으로 연결 | 허위 데이터 주입, 사기 소비 | mTLS(보안 프로필 3), 인증된 고정 |
| 중간자 | 안전하지 않은 네트워크에서 WS 가로채기 | RFID 토큰 가로채기, 명령 조작 | TLS 1.3 필수, 투명성 인증 |
| 재생 공격 | 캡처된 OCPP 메시지 재전송 | 이중 청구, 잘못된 권한 | 고유 MessageId, 타임스탬프 유효성 검사, nonce |
| DDoS 웹소켓 | 연결 또는 메시지의 홍수 | CSMS에 연결할 수 없음, 인프라 DoS | 속도 제한, 연결 제한, WAF |
| OCPP 페이로드를 통한 SQL 주입 | idToken 필드에 SQL 페이로드가 포함된 OCPP 페이로드 | DB 유출, 권한 상승 | 준비된 진술, ORM, 입력 검증 |
| RFID 복제 | 합법적인 RFID 카드 복제 | 다른 사용자가 지불한 세션 | ISO 15118 P&C, RFID 화이트리스트, 이상 탐지 |
| 악성 펌웨어 | 악성코드로 펌웨어 업데이트 | 스테이션의 물리적 제어, 그리드 조작 | 펌웨어 디지털 서명, 보안 부팅, SBOM |
CSMS 강화: 보안 체크리스트
import ssl
import re
from functools import wraps
# 1. Configurazione TLS sicura (Security Profile 2-3)
def create_tls_context(
certfile: str,
keyfile: str,
cafile: str,
require_client_cert: bool = False
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
if require_client_cert: # Security Profile 3 (mTLS)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=cafile)
# Disabilita cipher suite deboli
ctx.set_ciphers(
'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:!aNULL:!eNULL:!LOW:!EXPORT'
)
return ctx
# 2. Rate limiting per connessioni WebSocket
from collections import defaultdict
import time
_connection_attempts: dict[str, list[float]] = defaultdict(list)
MAX_CONN_PER_MINUTE = 10
def check_rate_limit(client_ip: str) -> bool:
"""Ritorna True se il client può connettersi, False se throttled."""
now = time.monotonic()
window = _connection_attempts[client_ip]
# Rimuovi tentativi più vecchi di 60 secondi
_connection_attempts[client_ip] = [
t for t in window if now - t < 60
]
if len(_connection_attempts[client_ip]) >= MAX_CONN_PER_MINUTE:
log.warning(f"Rate limit superato per IP: {client_ip}")
return False
_connection_attempts[client_ip].append(now)
return True
# 3. Validazione MessageId per prevenire replay attack
_seen_message_ids: set[str] = set()
_message_id_pattern = re.compile(r'^[a-zA-Z0-9\-_\.]{1,36}






