MQTT에서 InfluxDB까지: 실시간 에너지 IoT 플랫폼
글로벌 산업용 에너지 IoT 시장은 2025년에는 220억 달러 2030년까지 연평균 성장률(CAGR) 19.8%로 성장하여 2030년까지 540억개 규모로 성장할 것입니다. 각 10MW 태양광 발전 시스템 이상으로 생성 하루에 400만 건의 측정 인버터, 방사선 센서, 스트링 미터 및 기상 데이터 로거. 각 풍력 발전소는 다음과 같은 진동 데이터를 추가합니다. 나셀의 토크와 온도. 각 EV 충전소는 연결 상태를 게시하고, 30초마다 전원 출력 및 커넥터 상태를 확인합니다.
문제는 이 데이터를 수집하는 것이 아니라 지연 시간을 두고 안정적이고 확장 가능하게 수행하는 것입니다. 1초 미만, 수년에 걸쳐 기록을 유지하고 분석 쿼리를 작성하는 능력 실시간. 대역폭이 제한된 산업 네트워크를 위해 만들어진 MQTT 프로토콜은 에너지 IIoT의 공통 언어입니다. InfluxDB 및 시계열 스토리지 솔루션 이 도메인에 가장 많이 채택되었습니다. Telegraf를 브리지로 사용하고 Grafana를 레이어로 사용 시각화, 소규모 가정용 시스템부터 유틸리티까지 관리하는 스택 형성 멀티 기가와트.
이 기사에서는 아키텍처를 통해 완전한 에너지 IoT 플랫폼을 구축합니다. 각 구성 요소의 세부 구성을 통해 Docker까지 End-to-End 200개의 인버터를 갖춘 10MW 태양광 단지에 대한 작업 구성 및 실제 사례 연구. 각 섹션에는 테스트된 코드와 생산 전략이 포함되어 있습니다.
이 기사에서 배울 내용
- 엔드투엔드 아키텍처: 센서 → 게이트웨이 → MQTT → Telegraf → InfluxDB → Grafana
- MQTT 심층 분석: QoS, 메시지 유지, 의지, 에너지를 위한 주제 네임스페이스 디자인
- 브로커 비교: 확장성과 클러스터링을 위한 Mosquitto vs EMQX vs HiveMQ
- Telegraf: MQTT 소비자 구성, 프로세서 파이프라인, InfluxDB 출력
- InfluxDB 3.x: 버킷 설계, 보존 정책, Flux 쿼리 및 작업 예약
- Modbus RTU/TCP → Python 및 pymodbus를 사용한 MQTT 브리지
- 프로덕션 스택을 위한 Docker Compose 완성
- 보안: TLS 상호, MQTT 인증, InfluxDB 토큰, 네트워크 세분화
- 사례 연구: 200개의 인버터, 100K msg/sec를 갖춘 10MW 태양광 발전소
- 고급 경고: Grafana, PagerDuty 및 Slack 통합
EnergyTech 시리즈 - 기사 위치
| # | Articolo | 수준 | 상태 |
|---|---|---|---|
| 1 | OCPP 2.x 프로토콜: EV 충전 시스템 구축 | 고급의 | 게시됨 |
| 2 | DERMS 아키텍처: 수백만 개의 분산 리소스 수집 | 고급의 | 게시됨 |
| 3 | ML을 사용한 재생 에너지 예측: Python LSTM | 고급의 | 게시됨 |
| 4 | 그리드 규모 스토리지를 위한 배터리 관리 시스템 | 고급의 | 게시됨 |
| 5 | 소프트웨어 엔지니어를 위한 IEC 61850: 스마트 그리드 통신 | 고급의 | 게시됨 |
| 6 | EV 충전 부하 분산: 실시간 알고리즘 | 고급의 | 게시됨 |
| 7 | 현재 위치 - MQTT에서 InfluxDB까지: 실시간 에너지 IoT 플랫폼 | 고급의 | 현재의 |
| 8 | 탄소 회계 소프트웨어 아키텍처: ESG 플랫폼 | 고급의 | 다음 |
| 9 | 에너지 인프라를 위한 디지털 트윈: 실시간 시뮬레이션 | 고급의 | 곧 출시 예정 |
| 10 | P2P 에너지 거래를 위한 블록체인: 스마트 계약 및 제약 | 고급의 | 곧 출시 예정 |
엔드투엔드 아키텍처: 센서에서 대시보드까지
실시간 에너지 IoT 플랫폼의 아키텍처는 5가지 수준으로 구분됩니다. 각각 특정 책임과 다양한 신뢰성 요구 사항이 있습니다. 이해 이러한 분리는 첫 번째 구성 라인을 작성하기 전에 필수적입니다.
레벨 1: 필드(필드 레이어)
가장 낮은 수준에서는 태양광 인버터, 풍속계, 일사계 등의 물리적 장치를 찾습니다. 스트링 미터, PMU(위상 측정 장치), 스마트 미터 및 EV 충전소. 이러한 장치의 대부분은 산업 현장 프로토콜을 사용합니다. 모드버스 RTU RS-485에서, 모드버스 TCP 이더넷을 통해, IEC 61850 변전소에서, DNP3 유틸리티 그리드용 또는 인버터용 SunSpec과 같은 독점 프로토콜입니다.
이러한 프로토콜은 기본적으로 최신 IP 또는 클라우드와 호환되지 않습니다. 그들의 여론조사 일반적인 주기 범위는 100ms(PMU)에서 60초(레거시 데이터 로거)입니다. 최적의 빈도 태양광 인버터 및 전기량(전압, 전류, 전력)에 1~5초 소요 열량(모듈 온도, 인버터 온도)은 30~60초입니다.
레벨 2: 게이트웨이/에지(에지 레이어)
에지 게이트웨이 및 변환기: 필드 프로토콜을 읽고 MQTT를 통해 브로커에 게시합니다. 중앙. 임베디드 장치(Raspberry Pi 4, BeagleBone, Moxa ioThinx)일 수 있으며, MQTT 스택이 통합된 PLC 또는 다중 설치를 위한 로컬 에지 서버(NUC, ODROID) 크다. 게이트웨이는 세 가지 중요한 기능을 수행합니다.
- 프로토콜 번역: Modbus → MQTT, IEC 61850 → MQTT, DNP3 → MQTT
- 로컬 버퍼링: WAN 연결이 끊긴 동안 데이터 축적(저장 및 전달)
- 엣지 전처리: 필터링, 집계, 국소 이상 탐지
계층 3: 메시지 브로커(전송 계층)
MQTT 브로커는 전송 시스템의 핵심입니다. Edge 게이트웨이로부터 메시지를 수신합니다.li 가입자에게 배포합니다(주로 Telegraf뿐 아니라 시스템과 같은 다른 소비자에게도 배포). SCADA 또는 실시간 알림). 브로커 선택은 규모에 따라 다릅니다. 단일 또는 소규모 설치(10K 연결 미만), 배포당 EMQX 또는 HiveMQ 기업 및 다중 사이트.
수준 4: 수집 및 저장(데이터 영역)
Telegraf는 MQTT 주제를 구독하고 다음을 사용하여 메시지를 InfluxDB 측정항목으로 변환합니다. 구성 가능한 프로세서 파이프라인. InfluxDB는 보존을 통해 시계열을 수신하고 인덱싱합니다. 차별화된 정책: 30일 동안의 고해상도 원시 데이터, 1년, 5년 동안의 일일 집계입니다. Flux 작업 일정 다운샘플링 자동 및 계산된 경고.
레벨 5: 시각화 및 경고(프레젠테이션 계층)
Grafana는 데이터 소스 플러그인을 통해 InfluxDB에 연결하고 실시간 대시보드를 렌더링합니다. 5~30초마다 업데이트됩니다. Grafana의 알림 시스템은 쿼리를 평가합니다. 주기적으로 PagerDuty, Slack, 이메일 또는 사용자 정의 웹후크에 알림을 보냅니다. 임계값이 초과되었습니다.
완전한 아키텍처 다이어그램
데이터 흐름은 다음 경로를 따릅니다. 인버터/센서 (모드버스/썬스펙) → 게이트웨이 엣지 (pymodbus가 있는 라즈베리 파이/NUC) → MQTT 브로커 (모기/EMQX) → 전신 (MQTT 소비자 + JSON 파서) → 인플럭스DB (시계열 DB) → 그라파나 (대시보드 + 알림). 동시에, 경보 관리자 InfluxDB에서는 매분마다 Flux 작업을 평가하고 쓸 수 있습니다. 전용 버킷의 이벤트 또는 HTTP 엔드포인트 호출.
MQTT 심층 분석: 에너지 프로토콜 및 설계
MQTT 5.0 기본 사항
MQTT(Message Queuing Telemetry Transport)는 설계된 게시-구독 프로토콜입니다. 대역폭이 제한되고 대기 시간이 긴 네트워크의 경우. 버전 5.0(OASIS 표준 RFC 2019) 엔터프라이즈 애플리케이션에 중요한 기능 추가: 세션 만료 간격, 메시지 만료 간격, 확장된 이유 코드, 메시지의 사용자 속성, 흐름 제어 헤더 크기를 줄이기 위해 최대 및 주제 별칭을 수신합니다.
QoS 0, 1, 2: 에너지 데이터에 대한 올바른 선택
서비스 품질은 에너지 MQTT 시스템 설계에서 가장 중요한 선택입니다. 세 가지 옵션은 처리량, 네트워크 오버헤드 및 성능 측면에서 매우 다른 영향을 미칩니다. 배송 보장:
| QoS | 보증 | 간접비 | 사용 사례 에너지 |
|---|---|---|---|
| QoS 0 | 최대 한 번(실행 후 잊어버리기) | 최소(1RTT) | 고주파 원격 측정(1Hz+), 간헐적인 손실이 허용되는 데이터: 방사조도, 주변 온도 |
| QoS 1 | 최소 1회(중복 포함) | 중간(RTT 2개) | 생산 지표(kWh, kW), 경보, 장치 상태 - 타임스탬프를 통해 InfluxDB에서 관리되는 중복 항목 |
| QoS 2 | 정확히 온스 | 높음(4RTT) | 제어 명령(인버터 설정값, 스위치 열기/닫기), 금융 데이터(그리드에서 판매되는 에너지) |
일반적인 PV 시스템의 경우 실제 권장 사항은 다음과 같습니다. 원격 측정의 경우 QoS 0 기상(조사, 온도, 바람), 생산 및 경보용 QoS 1, 제어 명령에만 QoS 2. 이 선택은 안정성과 처리량의 균형을 유지합니다. 브로커는 WAN 대역폭을 포화시키지 않고 초당 50~100K 메시지를 관리할 수 있습니다.
메시지 및 유언장 메시지 유지
종종 과소평가되지만 에너지 시스템을 위한 중요한 MQTT 기능 두 가지는 다음과 같습니다.
-
메시지 보관: 브로커는 keep=true로 마지막 메시지를 저장합니다.
각 주제마다. 새로운 구독자가 연결되면 즉시 값을 받습니다.
다음 출판을 기다리지 않고 최신 상태로 유지됩니다. 상태 항목의 기본 사항:
plant/PV001/status유지를 사용하면 각각의 새 대시보드에서 시스템 상태를 즉시 확인할 수 있습니다. -
유언장 메시지(유언장 및 유언장): 게이트웨이가 메시지를 구성합니다.
클라이언트 연결이 비정상적으로 끊어지면 브로커가 자동으로 게시합니다.
게이트웨이 연결 끊김을 알리는 데 사용됩니다. 원격 사이트의 Raspberry Pi가 떨어지면
브로커가 게시
plant/PV001/connectivity offline즉시, 세션 시간 초과를 기다리지 않고.
주제 에너지 시스템을 위한 네임스페이스 설계
토픽 네임스페이스의 디자인은 모든 측면에 영향을 미치는 아키텍처 결정입니다. 시스템: InfluxDB의 라우팅, 필터링, ACL 보안 및 구성. 잘 설계된 계층 구조는 다음 패턴을 따릅니다.
# Schema topic namespace per impianto energetico
# Struttura: {tipo_impianto}/{plant_id}/{subsystem}/{device_id}/{metric_type}
# === FOTOVOLTAICO ===
# Inverter string-level metrics
plant/PV001/inverter/INV001/metrics
plant/PV001/inverter/INV001/alarms
plant/PV001/inverter/INV001/status
# String-level measurements
plant/PV001/string/STR001/metrics
plant/PV001/string/STR001/metrics/voltage
plant/PV001/string/STR001/metrics/current
# Meteorological sensors
plant/PV001/weather/WS001/metrics
# Plant-level aggregates (calcolati dal gateway)
plant/PV001/totals/power_ac
plant/PV001/totals/energy_today
# === EOLICO ===
wind/WF001/turbine/T001/nacelle/metrics
wind/WF001/turbine/T001/blade/metrics
wind/WF001/turbine/T001/gearbox/temperature
wind/WF001/substation/SS001/metrics
# === STAZIONI DI RICARICA EV ===
ev/SITE001/charger/CHRG001/connector/1/metrics
ev/SITE001/charger/CHRG001/session/current
ev/SITE001/charger/CHRG001/status
ev/SITE001/totals/energy_dispensed
# === SISTEMA ===
# Heartbeat e connectivity
plant/PV001/gateway/GW001/heartbeat # ogni 60s
plant/PV001/gateway/GW001/status # retain=true
plant/PV001/gateway/GW001/version # retain=true
# === WILDCARD SUBSCRIPTIONS ===
# Telegraf sottoscrive tutti i metrics:
plant/+/inverter/+/metrics # tutti gli inverter di tutti gli impianti
plant/PV001/+/+/metrics # tutti i metrics dell'impianto PV001
plant/# # tutto il plant PV001 (usare con cautela)
네임스페이스의 기본 규칙: 각 수준은 의미론적 의미를 가져야 합니다. 제어 주제(명령)는 원격 측정 주제(데이터)와 분리되어야 합니다. 일관성을 위해 snake_case를 사용하고 특수 문자와 공백을 피하고 깊이를 제한하십시오. 다양한 브로커 및 클라이언트와의 호환성을 위해 최대 6개 레벨로 구성됩니다.
MQTT 브로커 비교: Mosquitto vs EMQX vs HiveMQ
MQTT 브로커의 선택은 시스템의 확장성과 안정성을 위한 기본입니다. 에너지 부문에서 가장 많이 사용되는 세 가지 브로커는 매우 다른 특성을 가지고 있습니다.
| 특징 | 모기 2.x | EMQX 5.x | HiveMQ 4.x |
|---|---|---|---|
| 최대. 사이 | 100K(단일 노드) | 100M(23노드 클러스터) | 200M(클러스터) |
| 클러스터링 | 네이티브 없음 | 예(Erlang 배포) | 예(기업) |
| MQTT 버전 | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 |
| 규칙 엔진 | No | 예(SQL 기반) | 예(기업) |
| 다리 | Si | Si | Si |
| 입증 | 파일, TLS, 플러그인 | JWT, OAuth2, LDAP, mTLS | JWT, OAuth2, 엔터프라이즈 |
| 특허 | EPL/EDL(오픈 소스) | 아파치 2.0 / 엔터프라이즈 | 커뮤니티 / 기업 |
| RAM(100K 연결) | ~500MB | ~2GB | ~3GB |
| 사용 사례 에너지 | 단일 공장, 엣지 게이트웨이, 연구소 | 다중 사이트, 유틸리티 규모, 엔터프라이즈 | 유틸리티 규모, 기업 규정 준수 |
생산을 위한 모기 구성
# /etc/mosquitto/mosquitto.conf - Produzione con TLS e ACL
# Network listeners
listener 1883 localhost
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true # mutual TLS
use_identity_as_username true
# WebSocket listener per dashboard web
listener 9001
protocol websockets
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# Autenticazione
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistenza
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 60
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S
# Performance tuning
max_queued_messages 10000
max_packet_size 1048576 # 1MB
message_size_limit 262144 # 256KB per messaggio
max_connections 10000
max_keepalive 300
# /etc/mosquitto/acl - Access Control List per ruoli
# Telegraf: legge tutto
user telegraf_reader
topic read plant/#
topic read wind/#
topic read ev/#
# Gateway PV001: scrive solo per il suo impianto
user gateway_PV001
topic write plant/PV001/#
topic read plant/PV001/commands/#
# Gateway EV SITE001: scrive solo per il suo sito
user gateway_SITE001
topic write ev/SITE001/#
# Dashboard/SCADA: solo lettura
user dashboard_user
topic read plant/#
topic read wind/#
topic read ev/#
# Admin: tutto (solo per maintenance)
user mqtt_admin
topic #
게이트웨이 엣지: 브리지 Modbus → Python을 사용한 MQTT
엣지 게이트웨이는 산업 현장 프로토콜의 세계를 연결하는 구성 요소입니다. MQTT 생태계를 통해 다음은 광전지 인버터에 대한 완전한 구현입니다. WAN 연결 끊김을 처리하기 위한 로컬 버퍼링을 포함한 Modbus TCP 프로토콜.
#!/usr/bin/env python3
"""
gateway_modbus_mqtt.py
Gateway edge per inverter fotovoltaici Modbus TCP → MQTT
Compatibile con registro SunSpec (standard de facto per inverter PV)
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional
import aiofiles
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import aiomqtt
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('gateway')
@dataclass
class InverterMetrics:
"""Metriche SunSpec inverter (Model 103 - Three Phase Inverter)"""
timestamp: str
plant_id: str
inverter_id: str
# AC output
ac_power_w: float # Potenza AC totale (W)
ac_voltage_l1_v: float # Tensione fase L1 (V)
ac_voltage_l2_v: float # Tensione fase L2 (V)
ac_voltage_l3_v: float # Tensione fase L3 (V)
ac_current_a: float # Corrente totale AC (A)
ac_frequency_hz: float # Frequenza rete (Hz)
# DC input
dc_voltage_v: float # Tensione DC ingresso (V)
dc_current_a: float # Corrente DC ingresso (A)
dc_power_w: float # Potenza DC ingresso (W)
# Energy
energy_wh_total: float # Energia prodotta totale (Wh)
energy_wh_today: float # Energia prodotta oggi (Wh)
# Thermal
temp_cabinet_c: float # Temperatura cabinet inverter (C)
temp_heatsink_c: float # Temperatura heatsink (C)
# Status
operating_state: int # 1=Off, 2=Sleeping, 3=Starting, 4=MPPT, 5=Throttled
error_code: int # Codice errore (0=no error)
# Efficiency
efficiency_pct: float # Efficienza conversione (%)
class ModbusMqttGateway:
"""
Gateway che legge metriche Modbus da inverter SunSpec
e le pubblica su MQTT con buffering locale per disconnessioni WAN
"""
def __init__(self, config: dict):
self.plant_id = config['plant_id']
self.inverters = config['inverters'] # lista di {id, host, port, unit_id}
self.mqtt_host = config['mqtt']['host']
self.mqtt_port = config['mqtt']['port']
self.mqtt_user = config['mqtt']['username']
self.mqtt_password = config['mqtt']['password']
self.poll_interval = config.get('poll_interval_sec', 5)
self.buffer_file = config.get('buffer_file', '/var/lib/gateway/buffer.jsonl')
self.clients: dict[str, AsyncModbusTcpClient] = {}
self._buffer: list[dict] = []
self._mqtt_connected = False
async def connect_inverter(self, inverter_cfg: dict) -> AsyncModbusTcpClient:
"""Crea e connette client Modbus TCP"""
client = AsyncModbusTcpClient(
host=inverter_cfg['host'],
port=inverter_cfg.get('port', 502),
timeout=5,
retries=3,
retry_on_empty=True,
)
await client.connect()
if not client.connected:
raise ConnectionError(
f"Cannot connect to inverter {inverter_cfg['id']} "
f"at {inverter_cfg['host']}:{inverter_cfg.get('port', 502)}"
)
logger.info(f"Connected to inverter {inverter_cfg['id']}")
return client
async def read_sunspec_model103(
self,
client: AsyncModbusTcpClient,
unit_id: int,
plant_id: str,
inverter_id: str
) -> Optional[InverterMetrics]:
"""
Legge registri SunSpec Model 103 (Three Phase Inverter)
SunSpec base address: 40001 (Modbus address 40000)
Model 103 start: 40070 (tipico, dipende da inverter)
"""
try:
# Leggi blocco principale Model 103: 40069 - 40094 (26 registri)
result = await client.read_holding_registers(
address=40069,
count=26,
slave=unit_id
)
if result.isError():
logger.error(f"Modbus error reading INV {inverter_id}: {result}")
return None
regs = result.registers
# Scala con SF (scale factors) SunSpec
ac_current_sf = self._sf(regs[10])
ac_voltage_sf = self._sf(regs[14])
ac_power_sf = self._sf(regs[17])
ac_freq_sf = self._sf(regs[19])
dc_current_sf = self._sf(regs[21])
dc_voltage_sf = self._sf(regs[23])
dc_power_sf = self._sf(regs[25])
# Leggi energia prodotta (registri 40093-40096)
energy_result = await client.read_holding_registers(
address=40093, count=4, slave=unit_id
)
energy_regs = energy_result.registers
energy_total_wh = ((energy_regs[0] << 16) | energy_regs[1]) * 1.0
# Leggi temperatura (registri 40103-40108)
temp_result = await client.read_holding_registers(
address=40103, count=3, slave=unit_id
)
temp_regs = temp_result.registers
ac_power_raw = self._signed(regs[16])
dc_power_w = self._signed(regs[24]) * (10 ** dc_power_sf)
efficiency = (ac_power_raw / dc_power_w * 100) if dc_power_w > 0 else 0.0
return InverterMetrics(
timestamp=datetime.now(timezone.utc).isoformat(),
plant_id=plant_id,
inverter_id=inverter_id,
ac_power_w=ac_power_raw * (10 ** ac_power_sf),
ac_voltage_l1_v=regs[11] * (10 ** ac_voltage_sf),
ac_voltage_l2_v=regs[12] * (10 ** ac_voltage_sf),
ac_voltage_l3_v=regs[13] * (10 ** ac_voltage_sf),
ac_current_a=(regs[7] + regs[8] + regs[9]) * (10 ** ac_current_sf),
ac_frequency_hz=regs[18] * (10 ** ac_freq_sf),
dc_voltage_v=self._signed(regs[22]) * (10 ** dc_voltage_sf),
dc_current_a=self._signed(regs[20]) * (10 ** dc_current_sf),
dc_power_w=dc_power_w,
energy_wh_total=energy_total_wh,
energy_wh_today=0.0, # da registro specifico vendor
temp_cabinet_c=self._signed(temp_regs[0]) * 0.01,
temp_heatsink_c=self._signed(temp_regs[1]) * 0.01,
operating_state=regs[0],
error_code=regs[1],
efficiency_pct=round(efficiency, 2),
)
except ModbusException as e:
logger.error(f"Modbus exception for {inverter_id}: {e}")
return None
def _signed(self, val: int) -> int:
"""Converte uint16 in int16 (signed)"""
return val if val < 32768 else val - 65536
def _sf(self, val: int) -> int:
"""Scale factor SunSpec: int16 nel range -10..10"""
s = self._signed(val)
return s if -10 <= s <= 10 else 0
async def publish_metrics(
self,
mqtt_client: aiomqtt.Client,
metrics: InverterMetrics
) -> None:
"""Pubblica metriche su MQTT con QoS 1"""
topic = f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/metrics"
payload = json.dumps(asdict(metrics), default=str)
await mqtt_client.publish(
topic=topic,
payload=payload.encode(),
qos=1,
retain=False,
)
# Pubblica anche stato con retain
status = {
"timestamp": metrics.timestamp,
"operating_state": metrics.operating_state,
"error_code": metrics.error_code,
"online": True,
}
await mqtt_client.publish(
topic=f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/status",
payload=json.dumps(status).encode(),
qos=1,
retain=True, # retain per status
)
async def run(self):
"""Loop principale del gateway"""
# Carica buffer da disco se esiste
await self._load_buffer()
# Will message: pubblica offline status se il gateway crolla
will = aiomqtt.Will(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": False, "reason": "unexpected_disconnect"}).encode(),
qos=1,
retain=True,
)
async with aiomqtt.Client(
hostname=self.mqtt_host,
port=self.mqtt_port,
username=self.mqtt_user,
password=self.mqtt_password,
will=will,
keepalive=60,
tls_params=aiomqtt.TLSParameters(
ca_certs='/etc/gateway/certs/ca.crt',
certfile='/etc/gateway/certs/gateway.crt',
keyfile='/etc/gateway/certs/gateway.key',
),
) as mqtt:
self._mqtt_connected = True
logger.info("MQTT connected")
# Pubblica online status con retain
await mqtt.publish(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": True}).encode(),
qos=1, retain=True,
)
# Svuota buffer offline
await self._flush_buffer(mqtt)
# Connetti a tutti gli inverter
for inv_cfg in self.inverters:
try:
self.clients[inv_cfg['id']] = await self.connect_inverter(inv_cfg)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
# Loop principale di polling
while True:
poll_start = time.monotonic()
tasks = [
self._poll_and_publish(inv_cfg, mqtt)
for inv_cfg in self.inverters
]
await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - poll_start
sleep_time = max(0, self.poll_interval - elapsed)
await asyncio.sleep(sleep_time)
async def _poll_and_publish(self, inv_cfg: dict, mqtt: aiomqtt.Client):
"""Polling singolo inverter e pubblicazione"""
inv_id = inv_cfg['id']
client = self.clients.get(inv_id)
if client is None or not client.connected:
try:
client = await self.connect_inverter(inv_cfg)
self.clients[inv_id] = client
except ConnectionError:
return
metrics = await self.read_sunspec_model103(
client, inv_cfg.get('unit_id', 1),
self.plant_id, inv_id
)
if metrics:
try:
await self.publish_metrics(mqtt, metrics)
except Exception as e:
logger.warning(f"MQTT publish failed, buffering: {e}")
self._buffer.append(asdict(metrics))
await self._save_buffer()
async def _load_buffer(self):
"""Carica buffer dal disco"""
try:
async with aiofiles.open(self.buffer_file) as f:
async for line in f:
self._buffer.append(json.loads(line))
logger.info(f"Loaded {len(self._buffer)} buffered messages")
except FileNotFoundError:
pass
async def _save_buffer(self):
"""Salva buffer su disco"""
async with aiofiles.open(self.buffer_file, 'w') as f:
for item in self._buffer:
await f.write(json.dumps(item) + '\n')
async def _flush_buffer(self, mqtt: aiomqtt.Client):
"""Pubblica messaggi bufferizzati"""
if not self._buffer:
return
logger.info(f"Flushing {len(self._buffer)} buffered messages")
published = []
for item in self._buffer:
try:
metrics = InverterMetrics(**item)
await self.publish_metrics(mqtt, metrics)
published.append(item)
except Exception as e:
logger.error(f"Failed to flush buffered message: {e}")
break
self._buffer = [i for i in self._buffer if i not in published]
await self._save_buffer()
if __name__ == '__main__':
config = {
'plant_id': 'PV001',
'poll_interval_sec': 5,
'mqtt': {
'host': '10.0.1.10',
'port': 8883,
'username': 'gateway_PV001',
'password': 'secret',
},
'inverters': [
{'id': 'INV001', 'host': '192.168.1.101', 'port': 502, 'unit_id': 1},
{'id': 'INV002', 'host': '192.168.1.102', 'port': 502, 'unit_id': 1},
# ... fino a INV200 per impianto da 10 MW
],
'buffer_file': '/var/lib/gateway/buffer.jsonl',
}
asyncio.run(ModbusMqttGateway(config).run())
Telegraf: MQTT Bridge → 변환 파이프라인이 있는 InfluxDB
Telegraf는 Go로 작성되었으며 300개 이상의 플러그인을 갖춘 InfluxData의 데이터 수집기입니다. 우리 스택에서 Telegraf는 MQTT 주제를 구독하고 JSON 페이로드를 디코딩합니다. 프로세서를 통해 변환을 적용하고 InfluxDB에 씁니다. 버전 1.30+ 기본적으로 MQTT 5.0 및 주제 구문 분석을 태그로 지원합니다.
Telegraf 구성 완료
# telegraf.conf - Configurazione completa per piattaforma IoT energetica
# ============================================================
# AGENT SETTINGS
# ============================================================
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "1s"
precision = "1ns"
debug = false
quiet = false
logtarget = "file"
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "50MB"
logfile_rotation_max_archives = 5
# ============================================================
# OUTPUT: InfluxDB 2.x / 3.x
# ============================================================
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "plant_metrics_raw"
timeout = "5s"
# Tag per routing - usato per bucket separati per plant
[outputs.influxdb_v2.tagpass]
data_type = ["inverter_metrics", "weather_metrics", "string_metrics"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "ev_metrics_raw"
timeout = "5s"
[outputs.influxdb_v2.tagpass]
data_type = ["ev_metrics"]
# Output di backup su file locale (per audit e recovery)
[[outputs.file]]
files = ["/var/log/telegraf/metrics.jsonl"]
rotation_interval = "1h"
rotation_max_size = "500MB"
data_format = "json"
[outputs.file.tagpass]
error_code = ["*"] # solo metriche con errori
# ============================================================
# INPUT: MQTT Consumer - Inverter Fotovoltaici
# ============================================================
[[inputs.mqtt_consumer]]
name_override = "pv_inverter"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/inverter/+/metrics"]
qos = 1
connection_timeout = "30s"
max_undelivered_messages = 5000
# Autenticazione mTLS
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-energy-01"
persistent_session = true # QoS 1+ richiede sessione persistente
# Parsing topic come tag
# topic: plant/PV001/inverter/INV001/metrics
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/inverter/+/metrics"
measurement = "_/plant_id/_/inverter_id/_"
tags = "_/plant_id/_/inverter_id/_"
# Estrae plant_id=PV001, inverter_id=INV001 come tag
# Formato payload
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l1_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l2_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l3_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_frequency_hz"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_wh_total"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_cabinet_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_heatsink_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "operating_state"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "error_code"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "efficiency_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "plant_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "inverter_id"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05.999999999Z07:00" # RFC3339Nano
# INPUT: MQTT Consumer - Stazioni Meteo
[[inputs.mqtt_consumer]]
name_override = "pv_weather"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/weather/+/metrics"]
qos = 0 # QoS 0 per dati meteo (perdita occasionale accettabile)
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-weather-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/weather/+/metrics"
tags = "_/plant_id/_/station_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "irradiance_wm2"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temperature_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "wind_speed_ms"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "humidity_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05Z07:00"
# INPUT: MQTT Consumer - Stazioni EV
[[inputs.mqtt_consumer]]
name_override = "ev_charger"
servers = ["ssl://mosquitto:8883"]
topics = ["ev/+/charger/+/connector/+/metrics"]
qos = 1
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-ev-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "ev/+/charger/+/connector/+/metrics"
tags = "_/site_id/_/charger_id/_/connector_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "power_kw"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_kwh"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "status"
type = "string"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "site_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "charger_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "connector_id"
# ============================================================
# PROCESSORS: Trasformazione dati
# ============================================================
# Aggiungi tag data_type per routing output
[[processors.converter]]
namepass = ["pv_inverter"]
[processors.converter.tags]
string = ["data_type"]
[[processors.override]]
namepass = ["pv_inverter"]
[processors.override.tags]
data_type = "inverter_metrics"
[[processors.override]]
namepass = ["ev_charger"]
[processors.override.tags]
data_type = "ev_metrics"
# Filtra metriche con valori anomali (sanity check)
[[processors.dedup]]
dedup_interval = "1s" # evita duplicati QoS 1
[[processors.enum]]
namepass = ["pv_inverter"]
[[processors.enum.mapping]]
field = "operating_state"
dest = "operating_state_str"
[processors.enum.mapping.value_mappings]
1 = "off"
2 = "sleeping"
3 = "starting"
4 = "mppt"
5 = "throttled"
6 = "shutting_down"
7 = "fault"
8 = "standby"
# ============================================================
# MONITORING: Telegraf self-monitoring
# ============================================================
[[inputs.internal]]
collect_memstats = true
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "telegraf_monitoring"
[outputs.influxdb_v2.namepass]
internal = ["*"]
InfluxDB: 시계열 저장 및 에너지 쿼리
에너지 시스템을 위한 InfluxDB 2.x / 3.x 아키텍처
InfluxDB는 에너지 IoT용 참조 시계열 데이터베이스입니다. 버전 2.x (2025년에도 여전히 널리 사용됨) Flux를 쿼리 언어로 사용하고 개념 버킷 통합 보존 정책으로 버전 3.x, Apache Arrow 및 DataFusion을 사용하여 Rust로 다시 작성하고 SQL을 기본 언어로 도입합니다. 높은 카디널리티에 대한 확장성을 대폭 향상합니다.
5초마다 샘플링하는 200개의 인버터를 갖춘 10MW 시스템의 경우 데이터 양은 상당함: 각 인버터는 약 15개의 필드를 생성하므로 5개마다 총 3,000개의 필드가 생성됩니다. 초 또는 분당 216,000개의 데이터 포인트입니다. 하루에 약 3억 1천 1백만 명이 발생합니다. 데이터 포인트. 30일 보존에는 약 93억 개의 원시 데이터 포인트가 필요합니다. 다운샘플링 전략은 필수적입니다.
초기 설정: 버킷 및 조직
#!/bin/bash
# setup_influxdb.sh - Configurazione iniziale InfluxDB per piattaforma energetica
INFLUX_HOST="http://localhost:8086"
INFLUX_TOKEN="my-super-secret-admin-token"
INFLUX_ORG="energy-corp"
# Setup iniziale (solo prima volta)
influx setup \
--host "$INFLUX_HOST" \
--username admin \
--password "SecurePassword123!" \
--org "$INFLUX_ORG" \
--bucket plant_metrics_raw \
--retention 720h \ # 30 giorni dati grezzi
--token "$INFLUX_TOKEN" \
--force
# Bucket dati grezzi PV (alta risoluzione: 30 giorni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_raw" \
--retention "720h" \
--shard-group-duration "24h" # shard giornalieri per query efficienti
# Bucket downsampled 1 ora (retention 1 anno)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1h" \
--retention "8760h" \
--shard-group-duration "168h" # shard settimanali
# Bucket downsampled 1 giorno (retention 5 anni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1d" \
--retention "43800h" \ # ~5 anni
--shard-group-duration "720h" # shard mensili
# Bucket per alerting e eventi
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "energy_alerts" \
--retention "8760h" # 1 anno per audit
# Bucket per stazioni EV
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "ev_metrics_raw" \
--retention "720h"
# Token con permessi limitati per Telegraf (write only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Telegraf write-only token" \
--write-bucket plant_metrics_raw \
--write-bucket ev_metrics_raw \
--write-bucket telegraf_monitoring
# Token per Grafana (read only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Grafana read-only token" \
--read-bucket plant_metrics_raw \
--read-bucket plant_metrics_1h \
--read-bucket plant_metrics_1d \
--read-bucket ev_metrics_raw \
--read-bucket energy_alerts
echo "InfluxDB setup completato"
Flux 작업: 다운샘플링 및 자동 경고
Flux 작업은 Telegraf가 InfluxDB 내에서 주기적으로 실행하는 예약된 작업입니다. 원시 데이터 자동 다운샘플링, KPI 계산이라는 세 가지 목적으로 사용합니다. 버킷 경고 쓰기를 통한 집계 및 이상 탐지.
// task_downsample_1h.flux
// Task: Downsampling dati inverter da 5s a 1h
// Schedulato ogni ora, processa ultima ora
option task = {
name: "downsample_pv_inverter_1h",
every: 1h,
offset: 5m, // aspetta 5 min per assicurarsi che tutti i dati siano arrivati
}
from(bucket: "plant_metrics_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) =>
r._field == "ac_power_w" or
r._field == "dc_power_w" or
r._field == "energy_wh_total" or
r._field == "ac_voltage_l1_v" or
r._field == "temp_cabinet_c" or
r._field == "efficiency_pct"
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) => tables
|> mean(column: column), // media oraria
createEmpty: false
)
// Aggiungi statistiche aggiuntive per power
|> map(fn: (r) => ({r with _measurement: "pv_inverter_1h"}))
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_plant_kpi.flux
// Task: Calcolo KPI giornalieri per impianto
// Produccion, efficienza, performance ratio
option task = {
name: "calculate_plant_daily_kpi",
every: 15m,
}
// Potenza attiva totale per impianto (somma tutti inverter)
totalPower = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "total_ac_power_w",
}))
// Energia prodotta totale (max energy_wh_total - min energy_wh_total per periodo)
energyDelta = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "energy_wh_total")
|> difference()
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "energy_delta_wh",
}))
// Temperatura media inverter
avgTemp = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> mean()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "avg_inverter_temp_c",
}))
union(tables: [totalPower, energyDelta, avgTemp])
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_thermal_alert.flux
// Task: Rilevamento surriscaldamento inverter
// Alert se temperatura > 75°C per più di 5 minuti
option task = {
name: "thermal_alert_inverter",
every: 1m,
}
threshold = 75.0
from(bucket: "plant_metrics_raw")
|> range(start: -6m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> filter(fn: (r) => r._value > threshold)
// Conta quanti valori sopra soglia negli ultimi 6 min (= almeno 5 min)
|> aggregateWindow(every: 6m, fn: count, createEmpty: false)
|> filter(fn: (r) => r._value >= 60) // 60 campioni * 5sec = 5 min
|> map(fn: (r) => ({r with
_measurement: "energy_alert",
_field: "alert_type",
_value: "thermal_overtemperature",
severity: "high",
description: "Inverter temperature exceeds " + string(v: threshold) + "°C for > 5 minutes",
}))
|> to(bucket: "energy_alerts", org: "energy-corp")
태양광 분석을 위한 쿼리 Flux
// query_production_analysis.flux
// Analisi produzione giornaliera per impianto PV001
// Confronto con irraggiamento (Performance Ratio)
import "math"
// Produzione AC ogni 5 minuti
production = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
|> group(columns: ["plant_id"])
|> sum() // somma tutti gli inverter per potenza totale impianto
// Irraggiamento dalla stazione meteo
irradiance = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// Join per calcolo Performance Ratio
// PR = (energia prodotta / (irraggiamento * superficie)) * 100
join(
tables: {production: production, irradiance: irradiance},
on: ["_time", "plant_id"]
)
// PV001: 10 MW peak, superficie pannelli ~50.000 m2
|> map(fn: (r) => ({r with
performance_ratio: if r.irradiance_irradiance_wm2 > 50.0 then
(r.production__value / (r.irradiance_irradiance_wm2 * 50000.0)) * 100.0
else
0.0,
}))
|> keep(columns: ["_time", "production__value", "irradiance_irradiance_wm2", "performance_ratio"])
Docker Compose: 완전한 프로덕션 스택
다음 Docker Compose는 프로덕션 배포의 시작점입니다. 풀 스택. 각 서비스에는 상태 확인, 재시작 정책 및 볼륨 구성이 있습니다. 보안을 위한 지속적이고 격리된 네트워킹.
# docker-compose.yml
# Stack IoT Energetico: Mosquitto + Telegraf + InfluxDB + Grafana
# Produzione-ready con TLS, health checks e persistent volumes
version: '3.9'
networks:
iot_network:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
# Network separata per accesso esterno (solo Grafana e Mosquitto esposti)
external_network:
driver: bridge
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
influxdb_config:
grafana_data:
grafana_provisioning:
telegraf_buffer:
services:
# ============================================================
# MOSQUITTO MQTT BROKER
# ============================================================
mosquitto:
image: eclipse-mosquitto:2.0.20
container_name: mosquitto
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "1883:1883" # MQTT plain (solo per sviluppo/testing locale)
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket over TLS
volumes:
- ./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./config/mosquitto/passwd:/mosquitto/config/passwd:ro
- ./config/mosquitto/acl:/mosquitto/config/acl:ro
- ./certs:/etc/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
healthcheck:
test: ["CMD", "mosquitto_pub", "-h", "localhost", "-p", "1883",
"-t", "health", "-m", "ping", "--quiet"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 512m
cpus: '1.0'
# ============================================================
# INFLUXDB 2.7 (stable production)
# ============================================================
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
networks:
- iot_network
ports:
- "8086:8086" # solo su rete interna, proxy via Nginx se necessario
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: {{ "$INFLUXDB_ADMIN_PASSWORD" }}
DOCKER_INFLUXDB_INIT_ORG: energy-corp
DOCKER_INFLUXDB_INIT_BUCKET: plant_metrics_raw
DOCKER_INFLUXDB_INIT_RETENTION: 720h
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: {{ "$INFLUXDB_TOKEN" }}
# Performance tuning
INFLUXD_STORAGE_CACHE_MAX_MEMORY_SIZE: 1073741824 # 1GB cache
INFLUXD_STORAGE_COMPACT_THROUGHPUT_BURST: 50331648 # 48MB/s compaction
INFLUXD_HTTP_MAX_BODY_SIZE: 104857600 # 100MB max body
INFLUXD_QUERY_MEMORY_BYTES: 2147483648 # 2GB query memory limit
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
limits:
memory: 4g
cpus: '2.0'
# ============================================================
# TELEGRAF
# ============================================================
telegraf:
image: telegraf:1.34-alpine
container_name: telegraf
restart: unless-stopped
networks:
- iot_network
- external_network # per raggiungere MQTT su rete esterna
depends_on:
influxdb:
condition: service_healthy
mosquitto:
condition: service_healthy
environment:
INFLUXDB_TOKEN: {{ "$INFLUXDB_TOKEN" }}
MQTT_PASSWORD: {{ "$MQTT_PASSWORD" }}
HOSTNAME: telegraf-energy-01
volumes:
- ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
- ./certs:/etc/telegraf/certs:ro
- telegraf_buffer:/var/lib/telegraf
- /var/log/telegraf:/var/log/telegraf
healthcheck:
test: ["CMD", "telegraf", "--test", "--config", "/etc/telegraf/telegraf.conf"]
interval: 60s
timeout: 30s
retries: 3
deploy:
resources:
limits:
memory: 1g
cpus: '1.0'
# ============================================================
# GRAFANA
# ============================================================
grafana:
image: grafana/grafana-oss:11.4.0
container_name: grafana
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: {{ "$GRAFANA_ADMIN_PASSWORD" }}
GF_SECURITY_SECRET_KEY: {{ "$GRAFANA_SECRET_KEY" }}
GF_SERVER_ROOT_URL: "https://grafana.energy-corp.it"
GF_SERVER_DOMAIN: "grafana.energy-corp.it"
# Analytics disabilitate per privacy
GF_ANALYTICS_REPORTING_ENABLED: "false"
GF_ANALYTICS_CHECK_FOR_UPDATES: "false"
# SMTP per alerting
GF_SMTP_ENABLED: "true"
GF_SMTP_HOST: {{ "$SMTP_HOST" }}
GF_SMTP_USER: {{ "$SMTP_USER" }}
GF_SMTP_PASSWORD: {{ "$SMTP_PASSWORD" }}
GF_SMTP_FROM_ADDRESS: "alerts@energy-corp.it"
# InfluxDB datasource
GF_DATASOURCES_DEFAULT: InfluxDB
INFLUXDB_TOKEN_GRAFANA: {{ "$INFLUXDB_TOKEN_GRAFANA" }}
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning:ro
- ./config/grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
influxdb:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3000/api/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
memory: 512m
cpus: '0.5'
# ============================================================
# NGINX REVERSE PROXY (opzionale ma raccomandato)
# ============================================================
nginx:
image: nginx:1.27-alpine
container_name: nginx
restart: unless-stopped
networks:
- external_network
ports:
- "80:80"
- "443:443"
volumes:
- ./config/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
- /var/log/nginx:/var/log/nginx
depends_on:
- grafana
- mosquitto
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
자동 Grafana 프로비저닝
Grafana는 YAML 파일을 통해 데이터 소스 및 대시보드의 자동 프로비저닝을 지원합니다.
이는 컨테이너가 다시 생성되는 컨테이너화된 환경에 매우 중요합니다.
파일 config/grafana/provisioning/datasources/influxdb.yml 반드시 포함해야 함
InfluxDB 데이터 소스의 구성, JSON 형식의 대시보드는
config/grafana/dashboards/.
# config/grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
- name: InfluxDB-Raw
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_raw
tlsSkipVerify: false
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-1h
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_1h
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-Alerts
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: energy_alerts
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
Grafana: 태양광 발전 시스템을 위한 대시보드 및 경고
PV 시스템 대시보드 구조
태양광 발전 시스템을 위한 효과적인 대시보드는 운영 관련 질문에 답해야 합니다. 시각적으로 읽는 데 3초도 채 걸리지 않습니다. 권장되는 구조는 행으로 구성됩니다.
- 1행 - 일일 KPI: 오늘의 생산량(kWh)이 포함된 4개의 "상태" 패널, 현재 전력(kW), 성능 비율(%), 결함이 있는 인버터 수. 30년대 업데이트.
- 라인 2 - 임시 생산: 총 AC 전력이 표시된 면적 차트 지난 24시간 동안의 정규화된 방사조도에 중첩됩니다. InfluxDB-Raw 쿼리 5분 집계 창.
- 라인 3 - 인버터 히트맵: X축 = 시간, Y축 = 인버터_id로 구성된 히트맵 값 = AC 전원. 이를 통해 생산량이 적은 인버터를 시각적으로 식별할 수 있습니다. 1분 업데이트.
- 라인 4 - 온도 및 경보: 최대 인버터 온도에 대한 게이지 차트, 타임스탬프별로 정렬된 Energy_alerts 버킷의 최근 50개 알람이 포함된 테이블 패널.
- 라인 5 - 네트워크 효율성 및 품질: 산점도 효율성과 온도, 주전원 주파수 및 위상 전압에 대한 시계열.
고급 경고: Grafana + PagerDuty + Slack
# config/grafana/provisioning/alerting/rules.yml
apiVersion: 1
groups:
- name: "PV Plant Alerts"
folder: "Energy Alerts"
interval: 1m
rules:
# Alert: Produzione impianto drasticamente ridotta
- uid: pv001-production-drop
title: "PV001 - Produzione ridotta >30%"
condition: C
data:
# Query A: Produzione attuale (15 min rolling mean)
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> mean()
|> sum()
# Query B: Irraggiamento attuale (per normalizzare)
- refId: B
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> mean()
# Condition C: se irraggiamento > 200 W/m2 (giorno chiaro)
# e produzione normalizzata < 70% del teorico
- refId: C
datasourceUid: __expr__
model:
type: math
expression: "$A / ($B * 50000) < 0.70 and $B > 200"
for: 10m # alert solo se condizione persiste 10 min
labels:
severity: "warning"
plant_id: "PV001"
category: "production"
annotations:
summary: "PV001: produzione sotto il 70% del teorico"
description: |
Produzione attuale: {{ $A }} W
Irraggiamento: {{ $B }} W/m2
Performance attesa: > 70%
runbook_url: "https://wiki.energy-corp.it/runbook/pv-production-drop"
# Alert: Inverter in fault
- uid: pv001-inverter-fault
title: "PV001 - Inverter Fault Rilevato"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "operating_state")
|> filter(fn: (r) => r._value == 7) // stato fault
|> count()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [0]
type: gt
for: 1m
labels:
severity: "critical"
plant_id: "PV001"
annotations:
summary: "Inverter in stato FAULT rilevato"
# Alert: Temperatura critica
- uid: pv001-thermal-critical
title: "PV001 - Temperatura Inverter Critica"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> max()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [85]
type: gt
for: 5m
labels:
severity: "critical"
category: "thermal"
# Contact points
contactPoints:
- name: "PagerDuty Critical"
receivers:
- uid: pagerduty-critical
type: pagerduty
settings:
integrationKey: $PAGERDUTY_INTEGRATION_KEY
severity: critical
class: "EnergyAlerts"
component: "PV Plant"
- name: "Slack Operations"
receivers:
- uid: slack-ops
type: slack
settings:
url: $SLACK_WEBHOOK_URL
channel: "#energy-ops-alerts"
username: "Grafana Alert Bot"
iconEmoji: ":zap:"
title: "{{ template \"slack.title\" . }}"
text: "{{ template \"slack.message\" . }}"
# Routing
policies:
- receiver: "Slack Operations"
group_by: ["plant_id", "severity"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- receiver: "PagerDuty Critical"
matchers:
- "severity = critical"
continue: true # invia anche a Slack
보안: TLS, 인증 및 네트워크 분할
에너지 및 중요한 IoT 플랫폼의 보안: 생산 데이터는 민감한 상업정보, 인버터에 명령을 보내는 기능 무단 접근으로부터 보호되어야 합니다. 심층 방어 전략 4가지 레벨로 나누어져 있습니다.
스크립트를 사용한 TLS 인증서 생성
#!/bin/bash
# generate_certs.sh - Genera CA e certificati per mutual TLS
set -euo pipefail
CERTS_DIR="./certs"
mkdir -p "$CERTS_DIR"
# 1. CA (Certificate Authority) - Root of trust
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -x509 -new -nodes \
-key "$CERTS_DIR/ca.key" \
-sha256 -days 3650 \
-out "$CERTS_DIR/ca.crt" \
-subj "/C=IT/ST=Puglia/L=Bari/O=EnergyCorp/CN=Energy IoT CA"
# 2. Server certificate (Mosquitto)
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new \
-key "$CERTS_DIR/server.key" \
-out "$CERTS_DIR/server.csr" \
-subj "/C=IT/O=EnergyCorp/CN=mosquitto"
# SAN per tutti i possibili hostname
cat > "$CERTS_DIR/server-ext.cnf" <<EOF
[req]
req_extensions = v3_req
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = mosquitto
DNS.2 = mqtt.energy-corp.it
DNS.3 = localhost
IP.1 = 127.0.0.1
IP.2 = 172.20.0.10
EOF
openssl x509 -req \
-in "$CERTS_DIR/server.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/server.crt" \
-days 825 \
-sha256 \
-extfile "$CERTS_DIR/server-ext.cnf" \
-extensions v3_req
# 3. Client certificate per Telegraf
openssl genrsa -out "$CERTS_DIR/telegraf.key" 2048
openssl req -new \
-key "$CERTS_DIR/telegraf.key" \
-out "$CERTS_DIR/telegraf.csr" \
-subj "/C=IT/O=EnergyCorp/CN=telegraf"
openssl x509 -req \
-in "$CERTS_DIR/telegraf.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/telegraf.crt" \
-days 825 -sha256
# 4. Client certificate per gateway PV001
openssl genrsa -out "$CERTS_DIR/gateway_PV001.key" 2048
openssl req -new \
-key "$CERTS_DIR/gateway_PV001.key" \
-out "$CERTS_DIR/gateway_PV001.csr" \
-subj "/C=IT/O=EnergyCorp/CN=gateway_PV001"
openssl x509 -req \
-in "$CERTS_DIR/gateway_PV001.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/gateway_PV001.crt" \
-days 365 -sha256 # 1 anno per gateway (rotazione annuale)
echo "Certificati generati in $CERTS_DIR/"
echo "CA: ca.crt"
echo "Server (Mosquitto): server.crt + server.key"
echo "Client Telegraf: telegraf.crt + telegraf.key"
echo "Client Gateway PV001: gateway_PV001.crt + gateway_PV001.key"
안전: 생산 시 필수 관행
- 상호 TLS는 항상: 클라이언트와 서버 모두 인증서를 제시해야 합니다. 사용자 이름/비밀번호를 알고 있더라도 승인되지 않은 게이트웨이로부터의 연결을 방지합니다.
- 환경 변수 또는 비밀 관리자를 통한 비밀: 절대 하드 코딩하지 마세요 코드 또는 구성 파일의 InfluxDB 토큰, MQTT 비밀번호 또는 API 키 git에 전념했습니다. Docker 비밀, HashiCorp Vault 또는 AWS Secrets Manager를 사용하십시오.
- 네트워크 세분화: MQTT 브로커, InfluxDB 및 Telegraf는 그럴 필요가 없습니다. 절대 인터넷에 직접 노출되지 마세요. Grafana는 HTTPS 역방향 프록시를 통해서만 가능합니다. Edge 게이트웨이용 MQTT 브로커는 전용 VPN을 통해 노출될 수 있습니다.
- 인증서 순환: 게이트웨이 인증서는 1년 동안 유효합니다. (365일). Grafana 경보가 30일 전에 만료되도록 설정하세요.
- MQTT 감사 로그: 모든 연결 및 주제에 대한 로깅 활성화 모기에서. 비정상적인 액세스 감지를 위해 SIEM과 통합합니다.
성능: 대규모를 위한 벤치마킹 및 최적화
10MW 시스템의 스택 용량
10MW 태양광 발전소에는 일반적으로 각각 50kW의 인버터 200개가 있습니다. 5초마다 폴링하면 MQTT 메시지의 양은 다음과 같습니다.
| 요소 | 측정항목 | 용량 |
|---|---|---|
| 인버터 200개 x 5s | 15개 필드/메시지 | 40 메시지/초, 600 데이터 포인트/초 |
| 기상 관측소 10개 x 10개 | 8개 필드/메시지 | 1msg/초, 8개 데이터 포인트/초 |
| 식물 전체 x 15분 | 플럭스 집합체 | DB 내에서 계산됨 |
| Totale | - | ~650개 데이터 포인트/초, ~56M 데이터 포인트/일 |
| 원시 저장(30일) | ~16억 8천만 개의 데이터 포인트 | ~8-15GB(InfluxDB 압축 사용) |
이 볼륨은 단일 InfluxDB 2.7 노드로 풍부하게 관리할 수 있습니다. 8GB RAM 및 NVMe SSD. vCPU가 4개이고 RAM이 8GB인 VM의 EMQX는 100,000개의 연결을 처리합니다. 초당 100만 메시지 처리량을 자랑하는 경쟁업체입니다. 단일 설치의 경우 200개의 인버터, Raspberry Pi 4의 단일 Mosquitto 브로커 및 그 이상 (실제 테스트: 10K msg/sec, 500MB RAM)
MQTT 벤치마킹 스크립트
#!/usr/bin/env python3
"""
mqtt_benchmark.py
Benchmark del broker MQTT per simulare 200 inverter
Verifica throughput, latenza e affidabilità
"""
import asyncio
import json
import time
import random
import statistics
from datetime import datetime, timezone
import aiomqtt
async def simulate_inverter(
plant_id: str,
inverter_id: str,
broker_host: str,
broker_port: int,
duration_sec: int,
poll_interval: float = 5.0
) -> dict:
"""Simula un inverter che pubblica metriche su MQTT"""
messages_sent = 0
latencies = []
errors = 0
start_time = time.monotonic()
async with aiomqtt.Client(
hostname=broker_host,
port=broker_port,
client_id=f"bench_{plant_id}_{inverter_id}",
keepalive=30,
) as client:
while time.monotonic() - start_time < duration_sec:
# Simula metriche realistiche
base_power = 50000 * random.uniform(0.3, 1.0) # 0-50 kW
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"plant_id": plant_id,
"inverter_id": inverter_id,
"ac_power_w": base_power,
"ac_voltage_l1_v": random.uniform(225, 235),
"dc_voltage_v": random.uniform(550, 650),
"dc_current_a": base_power / random.uniform(550, 650),
"temp_cabinet_c": random.uniform(35, 65),
"efficiency_pct": random.uniform(94, 98.5),
"operating_state": 4, # MPPT
"error_code": 0,
"energy_wh_total": messages_sent * base_power * poll_interval / 3600,
}
topic = f"plant/{plant_id}/inverter/{inverter_id}/metrics"
t_send = time.monotonic()
try:
await client.publish(
topic=topic,
payload=json.dumps(payload).encode(),
qos=1,
)
latencies.append((time.monotonic() - t_send) * 1000) # ms
messages_sent += 1
except Exception:
errors += 1
await asyncio.sleep(poll_interval)
return {
"inverter_id": inverter_id,
"messages_sent": messages_sent,
"errors": errors,
"avg_latency_ms": statistics.mean(latencies) if latencies else 0,
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
async def run_benchmark(
num_inverters: int = 200,
duration_sec: int = 60,
broker_host: str = "localhost",
broker_port: int = 1883,
) -> None:
"""Esegui benchmark con N inverter simulati"""
print(f"Avvio benchmark: {num_inverters} inverter per {duration_sec}s")
print(f"Broker: {broker_host}:{broker_port}")
print("-" * 60)
start = time.monotonic()
tasks = [
simulate_inverter(
plant_id="BENCH001",
inverter_id=f"INV{i:03d}",
broker_host=broker_host,
broker_port=broker_port,
duration_sec=duration_sec,
poll_interval=5.0,
)
for i in range(1, num_inverters + 1)
]
results = await asyncio.gather(*tasks)
elapsed = time.monotonic() - start
# Aggregazione risultati
total_messages = sum(r["messages_sent"] for r in results)
total_errors = sum(r["errors"] for r in results)
all_latencies_avg = [r["avg_latency_ms"] for r in results]
all_latencies_p99 = [r["p99_latency_ms"] for r in results]
print(f"Risultati benchmark:")
print(f" Durata: {elapsed:.1f}s")
print(f" Messaggi inviati: {total_messages:,}")
print(f" Errori: {total_errors} ({total_errors/total_messages*100:.2f}%)")
print(f" Throughput: {total_messages/elapsed:.1f} msg/sec")
print(f" Latenza media: {statistics.mean(all_latencies_avg):.2f} ms")
print(f" Latenza P99: {statistics.mean(all_latencies_p99):.2f} ms")
print(f" Inverter senza errori: {sum(1 for r in results if r['errors'] == 0)}/{num_inverters}")
if __name__ == "__main__":
asyncio.run(run_benchmark(
num_inverters=200,
duration_sec=120,
broker_host="localhost",
broker_port=1883,
))
사례 연구: 10MW 태양광 발전소 모니터링
이 사례 연구에서는 에너지 IoT 플랫폼의 실제 구현을 설명합니다. 태양광발전소를 위해 10MW 남부 이탈리아에서 50kW SMA Sunny Tripower 인버터 200대 8개의 기상 관측소가 있습니다.
초기 설정 및 과제
이 사이트는 10Mbps의 대역폭과 다양한 대기 시간을 보장하는 4G WAN 네트워크에 있습니다. 30 및 200ms. 각 인버터는 다음을 통해 로컬 LAN 네트워크(192.168.x.x)에서 Modbus TCP를 사용합니다. 산업용 스위치. 엣지 게이트웨이는 256GB SSD를 갖춘 Raspberry Pi 4(8GB RAM)입니다. 로컬 버퍼링을 위해 전용 데이터 SIM이 있는 산업용 4G 라우터를 통해 WAN에 연결됩니다.
| 매개변수 | Valore |
|---|---|
| 시스템 전원 | 10.2MW 피크(SMA STP50-US-40) |
| 인버터 번호 | 200(SMA Sunny Tripower, 50kW) |
| 기상 관측소 | 8(조도, T, HR, 바람) |
| 엣지 게이트웨이 | 라즈베리 파이 4 8GB + 256GB SSD |
| 폴링 간격 인버터 | 5초 |
| 날씨 폴링 간격 | 10초 |
| MQTT 메시지 볼륨 | ~41 msg/초(200 inv x 1/5s + 8 날씨 x 1/10s) |
| 사용된 WAN 대역폭 | ~180KB/s(사용 가능한 10Mbps의 2%) |
| MQTT 브로커 | 클라우드 VM의 EMQX 5.8(vCPU 4개, 8GB) |
| 인플럭스DB | 전용 VM(8 vCPU, 32GB RAM, NVMe)의 2.7.10 |
6개월간 수술한 결과
- 플랫폼 가용성: 99.94% (총 다운타임 : 6개월간 2.6시간, 클라우드 서버의 예정된 유지 관리를 위해 모두)
- WAN 연결 끊김으로 인해 데이터 손실: 0.002% (로컬 버퍼링 덕분에) 게이트웨이 및 MQTT 영구 세션)
- 자동으로 감지된 인버터 오류 경고: 6개월 동안 47개의 이벤트, 즉각적인 경고 덕분에 MTTR(평균 수리 시간)이 8시간에서 2.3시간으로 단축되었습니다. PagerDuty에서(다음날 수동 검색과 비교)
- 평균 성과 비율: 82.3% vs. 이전 기간 79.1%(제외) 실시간 모니터링): +3.2% 포인트 = +320 추가 MWh/년
- 예측 유지 관리로 인한 절감: 12개의 인버터 감지 고장 전 조기 성능 저하(평균 대비 방열판 온도 +8°C) 선언했다. 예방적 개입으로 응급 교체 대비 45,000 EUR 절감 효과가 있는 것으로 추산됩니다.
- 6개월 후 InfluxDB 스토리지: 원시 데이터용 18.7GB(30일 연속), 1시간 집계용 4.2GB(6개월), 1d 집계용 0.8GB(총 180일)
교훈: 버퍼링 에지는 협상 불가능합니다.
운영 첫 달 동안 4G 연결이 3번 중단되었습니다. 전화 교환원 문제로 인해 연장(각 2~6시간)됩니다. 없이 Raspberry Pi SSD의 로컬 버퍼링으로 인해 모든 측정값이 손실되었을 수 있습니다. 그 창 동안. 64GB 버퍼와 저장 및 전달 논리 덕분에 연결이 다시 이루어질 때마다 누적된 메시지는 모두 삭제됩니다. 원래 타임스탬프와 함께 시간순으로 MQTT에 게시되고 InfluxDB 잘못된 순서로 삽입되도록 올바르게 승인했습니다.
모범 사례 및 안티 패턴
모범 사례
- 브로커가 아닌 페이로드의 타임스탬프: 게이트웨이에는 항상 다음이 포함되어야 합니다. JSON 페이로드의 정확한 캡처 타임스탬프를 확인하려면 MQTT 브로커 수신. 오프라인 버퍼링의 경우 메시지가 늦게 도착합니다. 하지만 데이터는 일시적으로 정확합니다. InfluxDB는 순서가 잘못된 삽입을 허용합니다.
-
원격 측정 채널을 명령 채널과 분리합니다. 원격 측정 주제
(
plant/+/inverter/+/metrics) 및 명령 (plant/+/commands/#) 별도의 ACL이 있는 별도의 네임스페이스에 있어야 합니다. 명령에는 QoS 2가 필요합니다. 인증이 더욱 엄격해졌습니다. - 기록 데이터에 대한 공격적인 다운샘플링: 5초의 원시 데이터 사후 실패 분석에는 유용하지만 과거 추세 집계에는 충분합니다. 15분 또는 여러 번. Flux 다운샘플링 및 보존 작업을 즉시 구현합니다.
- 게이트웨이 상태 모니터링: 게이트웨이의 MQTT 하트비트 사용 (전용 주제에 대해 60초마다 게시) Grafana에서 모니터링합니다. 심장박동이 누락된 경우 3개 기간 동안 중요한 경고: 브로커 문제가 아니라 게이트웨이가 손상된 것일 수 있습니다.
- InfluxDB의 게이트웨이 시스템 메트릭: 엣지 게이트웨이의 Telegraf 메트릭 CPU, RAM, CPU 온도, 버퍼에 사용 가능한 디스크 공간을 수집할 수 있습니다. MQTT에 게시합니다. Raspberry Pi가 스트레스를 받고 있는지 확인하는 것이 중요합니다.
피해야 할 안티패턴
중요한 안티 패턴
-
집계 없이 너무 세분화된 주제: 하나하나 발매해
Modbus는 별도의 주제에 등록합니다(예:
plant/PV001/INV001/register/40001) 브로커에서 수만 개의 주제와 엄청난 메타데이터 오버헤드를 생성합니다. 항상 관련 로그를 장치당 단일 JSON 페이로드로 집계합니다. - 고주파 원격 측정을 위한 QoS 2: 4자간 악수는 QoS 2는 제어 메시지 수를 4배로 늘립니다. 1~5Hz 데이터의 경우 QoS 1을 사용하세요. (타임스탬프 멱등성을 통해 InfluxDB에서 처리하는 중복) 명령에만 QoS 2.
-
원격 측정 항목 유지: 유지는 상태 주제에 유용합니다.
(마지막 값이 중요함), 고주파 원격 측정 주제에 대해서는
브로커는 게시할 때마다 보유된 메시지를 오버헤드와 함께 업데이트해야 합니다.
스토리지와 CPU의 다음에만 유지를 사용하세요.
/statuse/config. -
InfluxDB의 무제한 카디널리티: 값이 높은 태그는 피하세요.
카디널리티(예:
session_idEV 스테이션의 태그로 새로운 생성 각 세션의 시리즈). 대신 태그가 아닌 필드로 사용하세요. - InfluxDB에는 속도 제한이 없습니다. 동시 쿼리 제한 없이, 단일 무거운 Grafana 쿼리(예: 6개월치의 원시 데이터 내보내기)가 포화될 수 있습니다. DB 메모리로 인해 킬러 OOM이 발생합니다. 항상 쿼리 제한을 구성하십시오.
리소스 및 기술 참조
- MQTT 5.0 사양(OASIS): 공식 MQTT 5.0 프로토콜 사양에는 QoS, 유지, 메시지, 세션 관리 및 사용자 속성이 있습니다. 구현을 위한 기본 사항 기업.
- Telegraf MQTT 소비자 플러그인: 공식 InfluxData 문서는 모든 구성 매개변수를 설명합니다. 주제 구문 분석 및 지원되는 데이터 형식을 포함한 mqtt_consumer 플러그인. URL: docs.influxdata.com/telegraf
- InfluxDB 플럭스 문서: Flux 언어는 강력하지만 학습 곡선이 있습니다. 문서 공식에는 AggregateWindow를 포함한 모든 기능을 갖춘 참조가 포함되어 있습니다. 조인, 피벗 및 매핑.
- SunSpec 동맹: SunSpec 표준은 광전지 인버터에 대한 Modbus 레지스터 맵을 정의합니다. 배터리와 미터. sunspec.org에서 이용 가능합니다. 간의 상호 운용성을 보장합니다. 다양한 제조업체의 장치.
- EMQX 문서: EMQX 문서에는 클러스터링, 규칙 엔진, 보안 및 InfluxDB를 포함한 외부 스토리지 시스템과 통합.
- Grafana 경고: 통합 경고 시스템(v9+)에 대한 Grafana 문서에는 다음이 포함됩니다. YAML을 통한 프로비저닝, 접점 구성 및 라우팅에 대한 가이드입니다.
- IIoT 보안: IEC 62443: IEC 62443 표준은 자동화 시스템에 대한 안전 요구 사항을 정의합니다. 에너지 시스템을 포함한 산업 제어. 인증 필수 NIS2(이탈리아에서 입법령 138/2024로 시행).
결론 및 다음 단계
우리는 완벽하고 생산 준비가 완료된 에너지 IoT 플랫폼을 구축했습니다. Grafana, PagerDuty를 통한 다중 채널 경고까지 로컬 버퍼링 기능을 갖춘 Modbus-MQTT 여유. MQTT + Telegraf + InfluxDB + Grafana 스택은 테스트되고 문서화되었으며 확장 가능합니다. 풍부한 마진을 갖춘 10MW 시스템에 대해 초당 650개의 데이터 포인트를 관리합니다. 클러스터링 및 다중 노드의 EMQX InfluxDB는 유틸리티 규모 배포에 선형적으로 확장됩니다. 수백MW.
사례 연구 수치는 99.94% 가용성, MTTR 감소를 말해줍니다. 2.3시간에 8, +3.2 성능 비율 포인트, 45,000 EUR 유지 관리 비용 절감 6개월 후 예측. 실시간 모니터링은 에너지 시스템의 사치품이 아닙니다. 재생 가능 에너지: 수익과 자산의 유효 수명이 직접적으로 증가합니다.
EnergyTech 시리즈의 다음 기사에서는 다음 주제를 다룹니다. 탄소 회계: 배출 저감을 계산하는 ESG 플랫폼을 구축하는 방법, 수집된 생산 데이터를 바탕으로 친환경 인증 및 탄소 배출권 취득 우리가 방금 구축한 IoT 플랫폼에서 말이죠.
관련 기사
- 에너지테크 시리즈: 이 기사는 전용 시리즈의 일부입니다. 재생 에너지를 위한 소프트웨어 공학에. 에 대한 이전 기사 EV 로드 밸런싱과 탄소 회계에 대한 후속 로드 밸런싱이 그림을 완성합니다. 에너지 전환을 위한 소프트웨어 인프라 구축
- MLOps 시리즈: 다운샘플링 및 이상 탐지 구현 Flux를 사용하면 이는 단지 첫 번째 단계일 뿐입니다. MLOps 시리즈는 모델 통합 방법을 설명합니다. ML(생산 예측을 위한 LSTM, 이상 탐지를 위한 Isolation Forest) 동일한 InfluxDB 플랫폼에서 나오는 데이터입니다.
- 데이터 및 AI 비즈니스 시리즈: MQTT-InfluxDB 아키텍처는 IoT 데이터 레이크하우스 예시. 데이터 & AI 비즈니스 시리즈는 어떻게 DBT 및 Airflow를 사용하여 이 시계열 데이터를 엔터프라이즈 분석 파이프라인으로 가져옵니다.
- PostgreSQL AI 시리즈: 보다 복잡한 분석이 필요한 경우 관계형 데이터(예: 계약 데이터, 플랜트 마스터 데이터, 이력)와 결합 유지 관리), 외부 데이터 래퍼를 통한 InfluxDB-PostgreSQL 통합 PostgreSQL AI 시리즈에 등장했습니다.







