MQTT から InfluxDB へ: リアルタイム エネルギー IoT プラットフォーム
世界の産業用エネルギーIoT市場は、 2025年には220億ドル そして、2030 年までに 540 億に向けて 19.8% の CAGR で成長します。各 10 MW の太陽光発電システム 超えて生み出す 1 日あたり 400 万回の測定 インバーター、放射線センサー、 ストリングメーターと気象データロガー。各風力発電所は振動データを追加します。 ナセルからのトルクと温度。各EV充電ステーションは接続状況を公表しており、 電力出力とコネクタの健康状態を 30 秒ごとに表示します。
課題は、このデータを収集することではなく、それを確実に、スケーラブルに、待ち時間を抑えて収集することです。 1 秒未満、何年にもわたって履歴化を維持し、分析クエリを作成する機能 リアルタイム。 MQTT プロトコルは、帯域幅が限られた産業用ネットワーク用に作成され、 エネルギー IIoT の共通言語。 InfluxDB と時系列ストレージ ソリューション このドメインで最も多く採用されています。 Telegraf をブリッジとして、Grafana をレイヤーとして使用して連携 可視化、小規模な家庭用システムからユーティリティまでを管理するスタックを形成 マルチギガワット。
この記事では、アーキテクチャから完全なエネルギー IoT プラットフォームを構築します。 エンドツーエンド、各コンポーネントの詳細な構成を経て、Docker に至るまで 200 台のインバーターを備えた 10 MW 太陽光発電パークの作業構成と実際のケーススタディ。 各セクションには、テストされたコードと運用戦略が含まれています。
この記事で学べること
- エンドツーエンドのアーキテクチャ: センサー → ゲートウェイ → MQTT → Telegraf → InfluxDB → Grafana
- MQTT の詳細: QoS、保持、メッセージ、エネルギーのためのトピック名前空間設計
- ブローカーの比較: スケーラビリティとクラスタリングに関する Mosquitto、EMQX、HiveMQ
- Telegraf: MQTT コンシューマ構成、プロセッサ パイプライン、InfluxDB 出力
- InfluxDB 3.x: バケット設計、保持ポリシー、Flux クエリ、タスク スケジューリング
- Python および pymodbus を使用した Modbus RTU/TCP → MQTT ブリッジ
- 本番スタック用の完全な Docker Compose
- セキュリティ: TLS 相互、MQTT 認証、InfluxDB トークン、ネットワーク セグメンテーション
- ケーススタディ: 200 個のインバータを備えた 10 MW 太陽光発電所、100K メッセージ/秒
- 高度なアラート: Grafana、PagerDuty、Slack の統合
EnergyTech シリーズ - 記事の場所
| # | アイテム | レベル | Stato |
|---|---|---|---|
| 1 | OCPP 2.x プロトコル: EV 充電システムの構築 | 高度な | 発行済み |
| 2 | DERMS アーキテクチャ: 数百万の分散リソースを集約する | 高度な | 発行済み |
| 3 | ML を使用した再生可能エネルギー予測: Python LSTM | 高度な | 発行済み |
| 4 | グリッドスケールストレージ向けバッテリー管理システム | 高度な | 発行済み |
| 5 | ソフトウェアエンジニア向け IEC 61850: スマートグリッド通信 | 高度な | 発行済み |
| 6 | EV 充電負荷分散: リアルタイム アルゴリズム | 高度な | 発行済み |
| 7 | 現在位置 - MQTT から InfluxDB へ: リアルタイム エネルギー IoT プラットフォーム | 高度な | 現在 |
| 8 | 炭素会計ソフトウェア アーキテクチャ: ESG プラットフォーム | 高度な | Prossimo |
| 9 | エネルギーインフラ向けデジタルツイン: リアルタイムシミュレーション | 高度な | 近日公開 |
| 10 | P2P エネルギー取引のためのブロックチェーン: スマート コントラクトと制約 | 高度な | 近日公開 |
エンドツーエンドのアーキテクチャ: センサーからダッシュボードまで
リアルタイム エネルギー IoT プラットフォームのアーキテクチャは 5 つの異なるレベルに分かれています。 それぞれに特定の責任と異なる信頼性要件があります。理解する この分離は、最初の構成行を記述する前に不可欠です。
レベル 1: フィールド (フィールド層)
最下位レベルには、太陽光発電インバータ、風速計、日射計などの物理デバイスがあります。 ストリングメーター、PMU (フェーザー測定ユニット)、スマートメーター、EV 充電ステーション。 これらのデバイスのほとんどは、産業用フィールド プロトコルを通信します。 Modbus RTU RS-485では、 Modbus TCP イーサネット経由で、 IEC 61850 変電所では、 DNP3 ユーティリティグリッドの場合、またはパワーコンディショナ用の SunSpec などの独自のプロトコル。
これらのプロトコルは、最新の IP やクラウドとネイティブ互換性がありません。彼らの投票 一般的なサイクルの範囲は 100 ミリ秒 (PMU) から 60 秒 (従来のデータ ロガー) です。最適な周波数は、 太陽光発電インバーターと電気量 (電圧、電流、電力) の場合は 1 ~ 5 秒 熱量 (モジュール温度、インバーター温度) の場合は 30 ~ 60 秒。
レベル 2: ゲートウェイ / エッジ (エッジ層)
エッジ ゲートウェイとトランスレーター: フィールド プロトコルを読み取り、MQTT 経由でブローカーにパブリッシュします。 中央。組み込みデバイス (Raspberry Pi 4、BeagleBone、Moxa ioThinx) にすることもできます。 統合された MQTT スタックを備えた PLC、または複数のインストール用のローカル エッジ サーバー (NUC、ODROID) 大きい。ゲートウェイは 3 つの重要な機能を実行します。
- プロトコルの翻訳: Modbus → MQTT、IEC 61850 → MQTT、DNP3 → MQTT
- ローカルバッファリング: WAN切断中にデータを蓄積(ストアアンドフォワード)
- エッジの前処理: フィルタリング、集約、ローカル異常検出
レイヤ 3: メッセージ ブローカ (トランスポート層)
MQTT ブローカーはトランスポート システムの中心です。エッジゲートウェイからメッセージを受信します。 加入者 (主に Telegraf ですが、システムなどの他の消費者にも配布します) SCADA またはリアルタイム通知)。ブローカーの選択は規模によって異なります: Mosquitto 単一または小規模インストール (接続数 10,000 未満)、デプロイメントごとに EMQX または HiveMQ エンタープライズおよびマルチサイト。
レベル 4: 取り込みとストレージ (データ層)
Telegraf は MQTT トピックをサブスクライブし、次を使用してメッセージを InfluxDB メトリクスに変換します。 構成可能なプロセッサ パイプライン。 InfluxDB は保持付きの時系列を受信してインデックスを作成します 差別化されたポリシー: 30 日間の高解像度生データ、1 時間ごとに集計 1 年間、5 年間の毎日の集計。 Flux タスクのスケジュール ダウンサンプリング 自動アラートと計算されたアラート。
レベル 5: 視覚化とアラート (プレゼンテーション層)
Grafana はデータソース プラグイン経由で InfluxDB に接続し、リアルタイム ダッシュボードをレンダリングします 5 ~ 30 秒ごとに更新します。 Grafana のアラート システムはクエリを評価します 定期的に、PagerDuty、Slack、電子メール、またはカスタム Webhook に通知を送信します。 しきい値を超えています。
完全なアーキテクチャ図
データ フローは次のパスに従います。 インバータ/センサー (Modbus/SunSpec) → ゲートウェイエッジ (Raspberry Pi/NUC と pymodbus) → MQTT ブローカー (モスキート/EMQX) → 電信 (MQTT コンシューマー + JSON パーサー) → 流入DB (時系列DB) → グラファナ (ダッシュボード + アラート)。並行して、 アラートマネージャー InfluxDB では Flux タスクを毎分評価し、次のように書き込むことができます。 専用バケット上のイベント、または HTTP エンドポイントを呼び出します。
MQTT の詳細: エネルギーのためのプロトコルと設計
MQTT 5.0 の基礎
MQTT (Message Queuing Telemetry Transport) は、設計されたパブリッシュ/サブスクライブ プロトコルです。 帯域幅が限られ、遅延が大きいネットワーク向け。バージョン 5.0 (OASIS 標準 RFC 2019) エンタープライズ アプリケーションに重要な機能を追加します: セッションの有効期限間隔、メッセージ 有効期限間隔、拡張理由コード、メッセージ内のユーザー プロパティ、フロー制御 受信最大値とトピック エイリアスを使用してヘッダーのサイズを削減します。
QoS 0、1、2: エネルギー データの正しい選択
サービス品質は、エネルギー MQTT システムの設計において最も重要な選択です。 3 つのオプションは、スループット、ネットワーク オーバーヘッド、および 配送保証:
| QoS | 保証 | オーバーヘッド | ユースケースのエネルギー |
|---|---|---|---|
| QoS0 | 最大 1 回 (ファイアアンドフォーゲット) | 最小 (1 RTT) | 高周波テレメトリー (1Hz+)、時折損失が許容されるデータ: 放射照度、周囲温度 |
| QoS1 | 少なくとも 1 回 (重複あり) | 中 (2 RTT) | 生産メトリクス (kWh、kW)、アラーム、デバイスの状態 - タイムスタンプを介して InfluxDB によって管理される重複 |
| QoS2 | 正確にオンス | 高 (4 RTT) | 制御コマンド (インバーターの設定値、スイッチの開閉)、財務データ (送電網で販売されたエネルギー) |
一般的な PV システムの実際的な推奨事項は次のとおりです。 テレメトリの QoS 0 気象(日射量、温度、風)、生産および警報用の QoS 1、 QoS 2 は制御コマンドのみ。この選択により、信頼性とスループットのバランスが取れます。 これにより、ブローカーは WAN 帯域幅を飽和させることなく 50 ~ 100,000 メッセージ/秒を管理できるようになります。
メッセージと遺言メッセージを保持する
過小評価されがちですが、エネルギー システムにとって重要な 2 つの MQTT 機能:
-
メッセージを保持する: ブローカーは最後のメッセージをretain=trueで保存します。
トピックごとに。新しい加入者が接続すると、すぐに値を受け取ります
次の出版物を待たずに最新の情報を入手できます。ステータス トピックの基本:
plant/PV001/status保持を使用すると、新しいダッシュボードごとに確実に システムのステータスをすぐに確認できます。 -
遺言メッセージ (遺言と遺言): ゲートウェイはメッセージを構成します
これは、クライアントが異常に切断した場合にブローカーが自動的に公開します。
ゲートウェイの切断を知らせるために使用されます。リモート サイトの Raspberry Pi が落ちた場合、
ブローカーが発行する
plant/PV001/connectivity offlineすぐに、 セッションタイムアウトを待たずに。
トピック エネルギー システムの名前空間設計
トピック名前空間の設計は、あらゆる側面に影響を与えるアーキテクチャ上の決定です。 システム: ルーティング、フィルタリング、ACL セキュリティ、InfluxDB の構成。 適切に設計された階層構造は、次のパターンに従います。
# Schema topic namespace per impianto energetico
# Struttura: {tipo_impianto}/{plant_id}/{subsystem}/{device_id}/{metric_type}
# === FOTOVOLTAICO ===
# Inverter string-level metrics
plant/PV001/inverter/INV001/metrics
plant/PV001/inverter/INV001/alarms
plant/PV001/inverter/INV001/status
# String-level measurements
plant/PV001/string/STR001/metrics
plant/PV001/string/STR001/metrics/voltage
plant/PV001/string/STR001/metrics/current
# Meteorological sensors
plant/PV001/weather/WS001/metrics
# Plant-level aggregates (calcolati dal gateway)
plant/PV001/totals/power_ac
plant/PV001/totals/energy_today
# === EOLICO ===
wind/WF001/turbine/T001/nacelle/metrics
wind/WF001/turbine/T001/blade/metrics
wind/WF001/turbine/T001/gearbox/temperature
wind/WF001/substation/SS001/metrics
# === STAZIONI DI RICARICA EV ===
ev/SITE001/charger/CHRG001/connector/1/metrics
ev/SITE001/charger/CHRG001/session/current
ev/SITE001/charger/CHRG001/status
ev/SITE001/totals/energy_dispensed
# === SISTEMA ===
# Heartbeat e connectivity
plant/PV001/gateway/GW001/heartbeat # ogni 60s
plant/PV001/gateway/GW001/status # retain=true
plant/PV001/gateway/GW001/version # retain=true
# === WILDCARD SUBSCRIPTIONS ===
# Telegraf sottoscrive tutti i metrics:
plant/+/inverter/+/metrics # tutti gli inverter di tutti gli impianti
plant/PV001/+/+/metrics # tutti i metrics dell'impianto PV001
plant/# # tutto il plant PV001 (usare con cautela)
名前空間の基本的なルール: 各レベルには意味的な意味がなければなりません。 制御トピック (コマンド) はテレメトリ トピック (データ) から分離する必要があります。 一貫性を保つためにsnake_caseを使用し、特殊文字とスペースを避け、深さを制限します さまざまなブローカーやクライアントとの互換性を実現するために、最大 6 レベルでサポートされています。
MQTT ブローカーの比較: Mosquitto、EMQX、HiveMQ
MQTT ブローカーの選択は、システムのスケーラビリティと信頼性の基礎となります。 エネルギー分野で最もよく利用されている 3 つのブローカーは、非常に異なる特性を持っています。
| 特徴 | モスキート 2.x | EMQX 5.x | HiveMQ 4.x |
|---|---|---|---|
| 最大。接続 | 100K (単一ノード) | 100M (23 ノードクラスター) | 200M (クラスター) |
| クラスタリング | ネイティブではありません | はい (Erlang 分散) | はい(エンタープライズ) |
| MQTTのバージョン | 3.1、3.1.1、5.0 | 3.1、3.1.1、5.0 | 3.1、3.1.1、5.0 |
| ルールエンジン | No | はい (SQL ベース) | はい(エンタープライズ) |
| Bridge | Si | Si | Si |
| 認証 | ファイル、TLS、プラグイン | JWT、OAuth2、LDAP、mTLS | JWT、OAuth2、エンタープライズ |
| ライセンス | EPL/EDL (オープンソース) | Apache 2.0 / エンタープライズ | コミュニティ / 企業 |
| RAM (100K 接続) | ~500MB | ~2GB | ~3GB |
| ユースケースのエネルギー | 単一のプラント、エッジゲートウェイ、ラボ | マルチサイト、ユーティリティ規模、エンタープライズ | 公共事業規模のエンタープライズコンプライアンス |
実稼働用の Mosquitto 構成
# /etc/mosquitto/mosquitto.conf - Produzione con TLS e ACL
# Network listeners
listener 1883 localhost
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true # mutual TLS
use_identity_as_username true
# WebSocket listener per dashboard web
listener 9001
protocol websockets
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# Autenticazione
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistenza
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 60
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S
# Performance tuning
max_queued_messages 10000
max_packet_size 1048576 # 1MB
message_size_limit 262144 # 256KB per messaggio
max_connections 10000
max_keepalive 300
# /etc/mosquitto/acl - Access Control List per ruoli
# Telegraf: legge tutto
user telegraf_reader
topic read plant/#
topic read wind/#
topic read ev/#
# Gateway PV001: scrive solo per il suo impianto
user gateway_PV001
topic write plant/PV001/#
topic read plant/PV001/commands/#
# Gateway EV SITE001: scrive solo per il suo sito
user gateway_SITE001
topic write ev/SITE001/#
# Dashboard/SCADA: solo lettura
user dashboard_user
topic read plant/#
topic read wind/#
topic read ev/#
# Admin: tutto (solo per maintenance)
user mqtt_admin
topic #
ゲートウェイ エッジ: Python を使用したブリッジ Modbus → MQTT
エッジ ゲートウェイは、産業フィールド プロトコルの世界を接続するコンポーネントです MQTT エコシステムを使用します。これは、太陽光発電インバーターの完全な実装です。 Modbus TCP プロトコル (WAN 切断を処理するためのローカル バッファリングを含む)。
#!/usr/bin/env python3
"""
gateway_modbus_mqtt.py
Gateway edge per inverter fotovoltaici Modbus TCP → MQTT
Compatibile con registro SunSpec (standard de facto per inverter PV)
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional
import aiofiles
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import aiomqtt
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('gateway')
@dataclass
class InverterMetrics:
"""Metriche SunSpec inverter (Model 103 - Three Phase Inverter)"""
timestamp: str
plant_id: str
inverter_id: str
# AC output
ac_power_w: float # Potenza AC totale (W)
ac_voltage_l1_v: float # Tensione fase L1 (V)
ac_voltage_l2_v: float # Tensione fase L2 (V)
ac_voltage_l3_v: float # Tensione fase L3 (V)
ac_current_a: float # Corrente totale AC (A)
ac_frequency_hz: float # Frequenza rete (Hz)
# DC input
dc_voltage_v: float # Tensione DC ingresso (V)
dc_current_a: float # Corrente DC ingresso (A)
dc_power_w: float # Potenza DC ingresso (W)
# Energy
energy_wh_total: float # Energia prodotta totale (Wh)
energy_wh_today: float # Energia prodotta oggi (Wh)
# Thermal
temp_cabinet_c: float # Temperatura cabinet inverter (C)
temp_heatsink_c: float # Temperatura heatsink (C)
# Status
operating_state: int # 1=Off, 2=Sleeping, 3=Starting, 4=MPPT, 5=Throttled
error_code: int # Codice errore (0=no error)
# Efficiency
efficiency_pct: float # Efficienza conversione (%)
class ModbusMqttGateway:
"""
Gateway che legge metriche Modbus da inverter SunSpec
e le pubblica su MQTT con buffering locale per disconnessioni WAN
"""
def __init__(self, config: dict):
self.plant_id = config['plant_id']
self.inverters = config['inverters'] # lista di {id, host, port, unit_id}
self.mqtt_host = config['mqtt']['host']
self.mqtt_port = config['mqtt']['port']
self.mqtt_user = config['mqtt']['username']
self.mqtt_password = config['mqtt']['password']
self.poll_interval = config.get('poll_interval_sec', 5)
self.buffer_file = config.get('buffer_file', '/var/lib/gateway/buffer.jsonl')
self.clients: dict[str, AsyncModbusTcpClient] = {}
self._buffer: list[dict] = []
self._mqtt_connected = False
async def connect_inverter(self, inverter_cfg: dict) -> AsyncModbusTcpClient:
"""Crea e connette client Modbus TCP"""
client = AsyncModbusTcpClient(
host=inverter_cfg['host'],
port=inverter_cfg.get('port', 502),
timeout=5,
retries=3,
retry_on_empty=True,
)
await client.connect()
if not client.connected:
raise ConnectionError(
f"Cannot connect to inverter {inverter_cfg['id']} "
f"at {inverter_cfg['host']}:{inverter_cfg.get('port', 502)}"
)
logger.info(f"Connected to inverter {inverter_cfg['id']}")
return client
async def read_sunspec_model103(
self,
client: AsyncModbusTcpClient,
unit_id: int,
plant_id: str,
inverter_id: str
) -> Optional[InverterMetrics]:
"""
Legge registri SunSpec Model 103 (Three Phase Inverter)
SunSpec base address: 40001 (Modbus address 40000)
Model 103 start: 40070 (tipico, dipende da inverter)
"""
try:
# Leggi blocco principale Model 103: 40069 - 40094 (26 registri)
result = await client.read_holding_registers(
address=40069,
count=26,
slave=unit_id
)
if result.isError():
logger.error(f"Modbus error reading INV {inverter_id}: {result}")
return None
regs = result.registers
# Scala con SF (scale factors) SunSpec
ac_current_sf = self._sf(regs[10])
ac_voltage_sf = self._sf(regs[14])
ac_power_sf = self._sf(regs[17])
ac_freq_sf = self._sf(regs[19])
dc_current_sf = self._sf(regs[21])
dc_voltage_sf = self._sf(regs[23])
dc_power_sf = self._sf(regs[25])
# Leggi energia prodotta (registri 40093-40096)
energy_result = await client.read_holding_registers(
address=40093, count=4, slave=unit_id
)
energy_regs = energy_result.registers
energy_total_wh = ((energy_regs[0] << 16) | energy_regs[1]) * 1.0
# Leggi temperatura (registri 40103-40108)
temp_result = await client.read_holding_registers(
address=40103, count=3, slave=unit_id
)
temp_regs = temp_result.registers
ac_power_raw = self._signed(regs[16])
dc_power_w = self._signed(regs[24]) * (10 ** dc_power_sf)
efficiency = (ac_power_raw / dc_power_w * 100) if dc_power_w > 0 else 0.0
return InverterMetrics(
timestamp=datetime.now(timezone.utc).isoformat(),
plant_id=plant_id,
inverter_id=inverter_id,
ac_power_w=ac_power_raw * (10 ** ac_power_sf),
ac_voltage_l1_v=regs[11] * (10 ** ac_voltage_sf),
ac_voltage_l2_v=regs[12] * (10 ** ac_voltage_sf),
ac_voltage_l3_v=regs[13] * (10 ** ac_voltage_sf),
ac_current_a=(regs[7] + regs[8] + regs[9]) * (10 ** ac_current_sf),
ac_frequency_hz=regs[18] * (10 ** ac_freq_sf),
dc_voltage_v=self._signed(regs[22]) * (10 ** dc_voltage_sf),
dc_current_a=self._signed(regs[20]) * (10 ** dc_current_sf),
dc_power_w=dc_power_w,
energy_wh_total=energy_total_wh,
energy_wh_today=0.0, # da registro specifico vendor
temp_cabinet_c=self._signed(temp_regs[0]) * 0.01,
temp_heatsink_c=self._signed(temp_regs[1]) * 0.01,
operating_state=regs[0],
error_code=regs[1],
efficiency_pct=round(efficiency, 2),
)
except ModbusException as e:
logger.error(f"Modbus exception for {inverter_id}: {e}")
return None
def _signed(self, val: int) -> int:
"""Converte uint16 in int16 (signed)"""
return val if val < 32768 else val - 65536
def _sf(self, val: int) -> int:
"""Scale factor SunSpec: int16 nel range -10..10"""
s = self._signed(val)
return s if -10 <= s <= 10 else 0
async def publish_metrics(
self,
mqtt_client: aiomqtt.Client,
metrics: InverterMetrics
) -> None:
"""Pubblica metriche su MQTT con QoS 1"""
topic = f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/metrics"
payload = json.dumps(asdict(metrics), default=str)
await mqtt_client.publish(
topic=topic,
payload=payload.encode(),
qos=1,
retain=False,
)
# Pubblica anche stato con retain
status = {
"timestamp": metrics.timestamp,
"operating_state": metrics.operating_state,
"error_code": metrics.error_code,
"online": True,
}
await mqtt_client.publish(
topic=f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/status",
payload=json.dumps(status).encode(),
qos=1,
retain=True, # retain per status
)
async def run(self):
"""Loop principale del gateway"""
# Carica buffer da disco se esiste
await self._load_buffer()
# Will message: pubblica offline status se il gateway crolla
will = aiomqtt.Will(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": False, "reason": "unexpected_disconnect"}).encode(),
qos=1,
retain=True,
)
async with aiomqtt.Client(
hostname=self.mqtt_host,
port=self.mqtt_port,
username=self.mqtt_user,
password=self.mqtt_password,
will=will,
keepalive=60,
tls_params=aiomqtt.TLSParameters(
ca_certs='/etc/gateway/certs/ca.crt',
certfile='/etc/gateway/certs/gateway.crt',
keyfile='/etc/gateway/certs/gateway.key',
),
) as mqtt:
self._mqtt_connected = True
logger.info("MQTT connected")
# Pubblica online status con retain
await mqtt.publish(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": True}).encode(),
qos=1, retain=True,
)
# Svuota buffer offline
await self._flush_buffer(mqtt)
# Connetti a tutti gli inverter
for inv_cfg in self.inverters:
try:
self.clients[inv_cfg['id']] = await self.connect_inverter(inv_cfg)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
# Loop principale di polling
while True:
poll_start = time.monotonic()
tasks = [
self._poll_and_publish(inv_cfg, mqtt)
for inv_cfg in self.inverters
]
await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - poll_start
sleep_time = max(0, self.poll_interval - elapsed)
await asyncio.sleep(sleep_time)
async def _poll_and_publish(self, inv_cfg: dict, mqtt: aiomqtt.Client):
"""Polling singolo inverter e pubblicazione"""
inv_id = inv_cfg['id']
client = self.clients.get(inv_id)
if client is None or not client.connected:
try:
client = await self.connect_inverter(inv_cfg)
self.clients[inv_id] = client
except ConnectionError:
return
metrics = await self.read_sunspec_model103(
client, inv_cfg.get('unit_id', 1),
self.plant_id, inv_id
)
if metrics:
try:
await self.publish_metrics(mqtt, metrics)
except Exception as e:
logger.warning(f"MQTT publish failed, buffering: {e}")
self._buffer.append(asdict(metrics))
await self._save_buffer()
async def _load_buffer(self):
"""Carica buffer dal disco"""
try:
async with aiofiles.open(self.buffer_file) as f:
async for line in f:
self._buffer.append(json.loads(line))
logger.info(f"Loaded {len(self._buffer)} buffered messages")
except FileNotFoundError:
pass
async def _save_buffer(self):
"""Salva buffer su disco"""
async with aiofiles.open(self.buffer_file, 'w') as f:
for item in self._buffer:
await f.write(json.dumps(item) + '\n')
async def _flush_buffer(self, mqtt: aiomqtt.Client):
"""Pubblica messaggi bufferizzati"""
if not self._buffer:
return
logger.info(f"Flushing {len(self._buffer)} buffered messages")
published = []
for item in self._buffer:
try:
metrics = InverterMetrics(**item)
await self.publish_metrics(mqtt, metrics)
published.append(item)
except Exception as e:
logger.error(f"Failed to flush buffered message: {e}")
break
self._buffer = [i for i in self._buffer if i not in published]
await self._save_buffer()
if __name__ == '__main__':
config = {
'plant_id': 'PV001',
'poll_interval_sec': 5,
'mqtt': {
'host': '10.0.1.10',
'port': 8883,
'username': 'gateway_PV001',
'password': 'secret',
},
'inverters': [
{'id': 'INV001', 'host': '192.168.1.101', 'port': 502, 'unit_id': 1},
{'id': 'INV002', 'host': '192.168.1.102', 'port': 502, 'unit_id': 1},
# ... fino a INV200 per impianto da 10 MW
],
'buffer_file': '/var/lib/gateway/buffer.jsonl',
}
asyncio.run(ModbusMqttGateway(config).run())
Telegraf: MQTT ブリッジ → 変換パイプラインを使用した InfluxDB
Telegraf は、Go で書かれた InfluxData のデータ コレクターであり、300 を超えるプラグインが含まれています。 私たちのスタックでは、Telegraf は MQTT トピックをサブスクライブし、JSON ペイロードをデコードし、 プロセッサ経由で変換を適用し、InfluxDB に書き込みます。バージョン1.30以降 MQTT 5.0 とタグとしてのトピック解析をネイティブにサポートします。
Telegraf 構成を完了する
# telegraf.conf - Configurazione completa per piattaforma IoT energetica
# ============================================================
# AGENT SETTINGS
# ============================================================
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "1s"
precision = "1ns"
debug = false
quiet = false
logtarget = "file"
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "50MB"
logfile_rotation_max_archives = 5
# ============================================================
# OUTPUT: InfluxDB 2.x / 3.x
# ============================================================
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "plant_metrics_raw"
timeout = "5s"
# Tag per routing - usato per bucket separati per plant
[outputs.influxdb_v2.tagpass]
data_type = ["inverter_metrics", "weather_metrics", "string_metrics"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "ev_metrics_raw"
timeout = "5s"
[outputs.influxdb_v2.tagpass]
data_type = ["ev_metrics"]
# Output di backup su file locale (per audit e recovery)
[[outputs.file]]
files = ["/var/log/telegraf/metrics.jsonl"]
rotation_interval = "1h"
rotation_max_size = "500MB"
data_format = "json"
[outputs.file.tagpass]
error_code = ["*"] # solo metriche con errori
# ============================================================
# INPUT: MQTT Consumer - Inverter Fotovoltaici
# ============================================================
[[inputs.mqtt_consumer]]
name_override = "pv_inverter"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/inverter/+/metrics"]
qos = 1
connection_timeout = "30s"
max_undelivered_messages = 5000
# Autenticazione mTLS
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-energy-01"
persistent_session = true # QoS 1+ richiede sessione persistente
# Parsing topic come tag
# topic: plant/PV001/inverter/INV001/metrics
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/inverter/+/metrics"
measurement = "_/plant_id/_/inverter_id/_"
tags = "_/plant_id/_/inverter_id/_"
# Estrae plant_id=PV001, inverter_id=INV001 come tag
# Formato payload
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l1_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l2_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l3_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_frequency_hz"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_wh_total"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_cabinet_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_heatsink_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "operating_state"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "error_code"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "efficiency_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "plant_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "inverter_id"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05.999999999Z07:00" # RFC3339Nano
# INPUT: MQTT Consumer - Stazioni Meteo
[[inputs.mqtt_consumer]]
name_override = "pv_weather"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/weather/+/metrics"]
qos = 0 # QoS 0 per dati meteo (perdita occasionale accettabile)
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-weather-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/weather/+/metrics"
tags = "_/plant_id/_/station_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "irradiance_wm2"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temperature_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "wind_speed_ms"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "humidity_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05Z07:00"
# INPUT: MQTT Consumer - Stazioni EV
[[inputs.mqtt_consumer]]
name_override = "ev_charger"
servers = ["ssl://mosquitto:8883"]
topics = ["ev/+/charger/+/connector/+/metrics"]
qos = 1
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-ev-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "ev/+/charger/+/connector/+/metrics"
tags = "_/site_id/_/charger_id/_/connector_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "power_kw"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_kwh"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "status"
type = "string"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "site_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "charger_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "connector_id"
# ============================================================
# PROCESSORS: Trasformazione dati
# ============================================================
# Aggiungi tag data_type per routing output
[[processors.converter]]
namepass = ["pv_inverter"]
[processors.converter.tags]
string = ["data_type"]
[[processors.override]]
namepass = ["pv_inverter"]
[processors.override.tags]
data_type = "inverter_metrics"
[[processors.override]]
namepass = ["ev_charger"]
[processors.override.tags]
data_type = "ev_metrics"
# Filtra metriche con valori anomali (sanity check)
[[processors.dedup]]
dedup_interval = "1s" # evita duplicati QoS 1
[[processors.enum]]
namepass = ["pv_inverter"]
[[processors.enum.mapping]]
field = "operating_state"
dest = "operating_state_str"
[processors.enum.mapping.value_mappings]
1 = "off"
2 = "sleeping"
3 = "starting"
4 = "mppt"
5 = "throttled"
6 = "shutting_down"
7 = "fault"
8 = "standby"
# ============================================================
# MONITORING: Telegraf self-monitoring
# ============================================================
[[inputs.internal]]
collect_memstats = true
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "telegraf_monitoring"
[outputs.influxdb_v2.namepass]
internal = ["*"]
InfluxDB: 時系列ストレージとエネルギーのクエリ
エネルギー システム向け InfluxDB 2.x / 3.x アーキテクチャ
InfluxDB は、エネルギー IoT 用の参照時系列データベースです。バージョン 2.x (2025 年現在でも運用環境で広く使用されています) クエリ言語として Flux を使用し、 の概念 バケツ 統合された保持ポリシーを使用します。バージョン 3.x、 Apache Arrow と DataFusion を使用して Rust で書き直され、主言語として SQL が導入されました また、高いカーディナリティに対するスケーラビリティが大幅に向上します。
5 秒ごとにサンプリングする 200 台のインバータを備えた 10 MW システムの場合、データ量は 各インバーターは約 15 フィールドを生成し、5 回ごとに合計 3,000 フィールドを生成します。 秒、または 1 分あたり 216,000 データ ポイント。 1日で約3億1100万 データポイント。 30 日間の保存には、約 93 億の生データ ポイントが必要です。 ダウンサンプリング戦略は不可欠です。
初期セットアップ: バケットと構成
#!/bin/bash
# setup_influxdb.sh - Configurazione iniziale InfluxDB per piattaforma energetica
INFLUX_HOST="http://localhost:8086"
INFLUX_TOKEN="my-super-secret-admin-token"
INFLUX_ORG="energy-corp"
# Setup iniziale (solo prima volta)
influx setup \
--host "$INFLUX_HOST" \
--username admin \
--password "SecurePassword123!" \
--org "$INFLUX_ORG" \
--bucket plant_metrics_raw \
--retention 720h \ # 30 giorni dati grezzi
--token "$INFLUX_TOKEN" \
--force
# Bucket dati grezzi PV (alta risoluzione: 30 giorni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_raw" \
--retention "720h" \
--shard-group-duration "24h" # shard giornalieri per query efficienti
# Bucket downsampled 1 ora (retention 1 anno)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1h" \
--retention "8760h" \
--shard-group-duration "168h" # shard settimanali
# Bucket downsampled 1 giorno (retention 5 anni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1d" \
--retention "43800h" \ # ~5 anni
--shard-group-duration "720h" # shard mensili
# Bucket per alerting e eventi
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "energy_alerts" \
--retention "8760h" # 1 anno per audit
# Bucket per stazioni EV
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "ev_metrics_raw" \
--retention "720h"
# Token con permessi limitati per Telegraf (write only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Telegraf write-only token" \
--write-bucket plant_metrics_raw \
--write-bucket ev_metrics_raw \
--write-bucket telegraf_monitoring
# Token per Grafana (read only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Grafana read-only token" \
--read-bucket plant_metrics_raw \
--read-bucket plant_metrics_1h \
--read-bucket plant_metrics_1d \
--read-bucket ev_metrics_raw \
--read-bucket energy_alerts
echo "InfluxDB setup completato"
Flux タスク: ダウンサンプリングと自動アラート
Flux タスクは、Telegraf が InfluxDB 内で定期的に実行するスケジュールされたジョブです。 生データの自動ダウンサンプリング、KPI 計算の 3 つの目的に使用します。 集計、およびバケットへの書き込みによる異常検出アラート。
// task_downsample_1h.flux
// Task: Downsampling dati inverter da 5s a 1h
// Schedulato ogni ora, processa ultima ora
option task = {
name: "downsample_pv_inverter_1h",
every: 1h,
offset: 5m, // aspetta 5 min per assicurarsi che tutti i dati siano arrivati
}
from(bucket: "plant_metrics_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) =>
r._field == "ac_power_w" or
r._field == "dc_power_w" or
r._field == "energy_wh_total" or
r._field == "ac_voltage_l1_v" or
r._field == "temp_cabinet_c" or
r._field == "efficiency_pct"
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) => tables
|> mean(column: column), // media oraria
createEmpty: false
)
// Aggiungi statistiche aggiuntive per power
|> map(fn: (r) => ({r with _measurement: "pv_inverter_1h"}))
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_plant_kpi.flux
// Task: Calcolo KPI giornalieri per impianto
// Produccion, efficienza, performance ratio
option task = {
name: "calculate_plant_daily_kpi",
every: 15m,
}
// Potenza attiva totale per impianto (somma tutti inverter)
totalPower = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "total_ac_power_w",
}))
// Energia prodotta totale (max energy_wh_total - min energy_wh_total per periodo)
energyDelta = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "energy_wh_total")
|> difference()
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "energy_delta_wh",
}))
// Temperatura media inverter
avgTemp = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> mean()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "avg_inverter_temp_c",
}))
union(tables: [totalPower, energyDelta, avgTemp])
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_thermal_alert.flux
// Task: Rilevamento surriscaldamento inverter
// Alert se temperatura > 75°C per più di 5 minuti
option task = {
name: "thermal_alert_inverter",
every: 1m,
}
threshold = 75.0
from(bucket: "plant_metrics_raw")
|> range(start: -6m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> filter(fn: (r) => r._value > threshold)
// Conta quanti valori sopra soglia negli ultimi 6 min (= almeno 5 min)
|> aggregateWindow(every: 6m, fn: count, createEmpty: false)
|> filter(fn: (r) => r._value >= 60) // 60 campioni * 5sec = 5 min
|> map(fn: (r) => ({r with
_measurement: "energy_alert",
_field: "alert_type",
_value: "thermal_overtemperature",
severity: "high",
description: "Inverter temperature exceeds " + string(v: threshold) + "°C for > 5 minutes",
}))
|> to(bucket: "energy_alerts", org: "energy-corp")
太陽光発電分析用のクエリフラックス
// query_production_analysis.flux
// Analisi produzione giornaliera per impianto PV001
// Confronto con irraggiamento (Performance Ratio)
import "math"
// Produzione AC ogni 5 minuti
production = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
|> group(columns: ["plant_id"])
|> sum() // somma tutti gli inverter per potenza totale impianto
// Irraggiamento dalla stazione meteo
irradiance = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// Join per calcolo Performance Ratio
// PR = (energia prodotta / (irraggiamento * superficie)) * 100
join(
tables: {production: production, irradiance: irradiance},
on: ["_time", "plant_id"]
)
// PV001: 10 MW peak, superficie pannelli ~50.000 m2
|> map(fn: (r) => ({r with
performance_ratio: if r.irradiance_irradiance_wm2 > 50.0 then
(r.production__value / (r.irradiance_irradiance_wm2 * 50000.0)) * 100.0
else
0.0,
}))
|> keep(columns: ["_time", "production__value", "irradiance_irradiance_wm2", "performance_ratio"])
Docker Compose: 完全な運用スタック
次の Docker Compose は、実稼働デプロイメントの開始点です。 フルスタック。各サービスにはヘルスチェック、再起動ポリシー、およびボリューム構成があります。 セキュリティのための永続的かつ分離されたネットワーク。
# docker-compose.yml
# Stack IoT Energetico: Mosquitto + Telegraf + InfluxDB + Grafana
# Produzione-ready con TLS, health checks e persistent volumes
version: '3.9'
networks:
iot_network:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
# Network separata per accesso esterno (solo Grafana e Mosquitto esposti)
external_network:
driver: bridge
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
influxdb_config:
grafana_data:
grafana_provisioning:
telegraf_buffer:
services:
# ============================================================
# MOSQUITTO MQTT BROKER
# ============================================================
mosquitto:
image: eclipse-mosquitto:2.0.20
container_name: mosquitto
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "1883:1883" # MQTT plain (solo per sviluppo/testing locale)
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket over TLS
volumes:
- ./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./config/mosquitto/passwd:/mosquitto/config/passwd:ro
- ./config/mosquitto/acl:/mosquitto/config/acl:ro
- ./certs:/etc/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
healthcheck:
test: ["CMD", "mosquitto_pub", "-h", "localhost", "-p", "1883",
"-t", "health", "-m", "ping", "--quiet"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 512m
cpus: '1.0'
# ============================================================
# INFLUXDB 2.7 (stable production)
# ============================================================
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
networks:
- iot_network
ports:
- "8086:8086" # solo su rete interna, proxy via Nginx se necessario
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: {{ "$INFLUXDB_ADMIN_PASSWORD" }}
DOCKER_INFLUXDB_INIT_ORG: energy-corp
DOCKER_INFLUXDB_INIT_BUCKET: plant_metrics_raw
DOCKER_INFLUXDB_INIT_RETENTION: 720h
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: {{ "$INFLUXDB_TOKEN" }}
# Performance tuning
INFLUXD_STORAGE_CACHE_MAX_MEMORY_SIZE: 1073741824 # 1GB cache
INFLUXD_STORAGE_COMPACT_THROUGHPUT_BURST: 50331648 # 48MB/s compaction
INFLUXD_HTTP_MAX_BODY_SIZE: 104857600 # 100MB max body
INFLUXD_QUERY_MEMORY_BYTES: 2147483648 # 2GB query memory limit
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
limits:
memory: 4g
cpus: '2.0'
# ============================================================
# TELEGRAF
# ============================================================
telegraf:
image: telegraf:1.34-alpine
container_name: telegraf
restart: unless-stopped
networks:
- iot_network
- external_network # per raggiungere MQTT su rete esterna
depends_on:
influxdb:
condition: service_healthy
mosquitto:
condition: service_healthy
environment:
INFLUXDB_TOKEN: {{ "$INFLUXDB_TOKEN" }}
MQTT_PASSWORD: {{ "$MQTT_PASSWORD" }}
HOSTNAME: telegraf-energy-01
volumes:
- ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
- ./certs:/etc/telegraf/certs:ro
- telegraf_buffer:/var/lib/telegraf
- /var/log/telegraf:/var/log/telegraf
healthcheck:
test: ["CMD", "telegraf", "--test", "--config", "/etc/telegraf/telegraf.conf"]
interval: 60s
timeout: 30s
retries: 3
deploy:
resources:
limits:
memory: 1g
cpus: '1.0'
# ============================================================
# GRAFANA
# ============================================================
grafana:
image: grafana/grafana-oss:11.4.0
container_name: grafana
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: {{ "$GRAFANA_ADMIN_PASSWORD" }}
GF_SECURITY_SECRET_KEY: {{ "$GRAFANA_SECRET_KEY" }}
GF_SERVER_ROOT_URL: "https://grafana.energy-corp.it"
GF_SERVER_DOMAIN: "grafana.energy-corp.it"
# Analytics disabilitate per privacy
GF_ANALYTICS_REPORTING_ENABLED: "false"
GF_ANALYTICS_CHECK_FOR_UPDATES: "false"
# SMTP per alerting
GF_SMTP_ENABLED: "true"
GF_SMTP_HOST: {{ "$SMTP_HOST" }}
GF_SMTP_USER: {{ "$SMTP_USER" }}
GF_SMTP_PASSWORD: {{ "$SMTP_PASSWORD" }}
GF_SMTP_FROM_ADDRESS: "alerts@energy-corp.it"
# InfluxDB datasource
GF_DATASOURCES_DEFAULT: InfluxDB
INFLUXDB_TOKEN_GRAFANA: {{ "$INFLUXDB_TOKEN_GRAFANA" }}
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning:ro
- ./config/grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
influxdb:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3000/api/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
memory: 512m
cpus: '0.5'
# ============================================================
# NGINX REVERSE PROXY (opzionale ma raccomandato)
# ============================================================
nginx:
image: nginx:1.27-alpine
container_name: nginx
restart: unless-stopped
networks:
- external_network
ports:
- "80:80"
- "443:443"
volumes:
- ./config/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
- /var/log/nginx:/var/log/nginx
depends_on:
- grafana
- mosquitto
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
Grafana の自動プロビジョニング
Grafana は、YAML ファイルを介したデータソースとダッシュボードの自動プロビジョニングをサポートしています。
これは、コンテナが再作成されるコンテナ化された環境にとって非常に重要です。
ファイル config/grafana/provisioning/datasources/influxdb.yml を含める必要があります
InfluxDB データソースの構成、JSON 形式のダッシュボードは
config/grafana/dashboards/.
# config/grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
- name: InfluxDB-Raw
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_raw
tlsSkipVerify: false
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-1h
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_1h
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-Alerts
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: energy_alerts
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
Grafana: 太陽光発電システムのダッシュボードとアラート
太陽光発電システムのダッシュボード構造
太陽光発電システムの効果的なダッシュボードは、運用に関する質問に答える必要があります 3 秒以内に視覚的に読み取れます。推奨される構造は次の行にまとめられています。
- 行 1 - 日次 KPI: 今日の生産量 (kWh) を示す 4 つの「統計」パネル、 現在の電力 (kW)、性能比 (%)、障害のあるインバーターの数。 30代更新です。
- 行 2 - 一時的な生産: AC 電力の合計を示す面グラフ 過去 24 時間の照度を正規化した放射照度に重ね合わせます。 InfluxDB-Raw を使用してクエリを実行します 5 分間の集計ウィンドウ。
- 行 3 - インバーターのヒートマップ: X 軸 = 時間、Y 軸 = inverter_id のヒートマップ 値 = AC 電力。発電量の少ないインバータを視覚的に識別できます。 1分更新。
- 行 4 - 温度とアラーム: インバーターの最高温度のゲージチャート、 タイムスタンプ順に並べ替えられた、energy_alerts バケットからの最後の 50 件のアラームを含むテーブル パネル。
- 5 行目 - ネットワークの効率と品質: 散布図効率対温度、 主電源周波数と相電圧の時系列。
高度なアラート: Grafana + PagerDuty + Slack
# config/grafana/provisioning/alerting/rules.yml
apiVersion: 1
groups:
- name: "PV Plant Alerts"
folder: "Energy Alerts"
interval: 1m
rules:
# Alert: Produzione impianto drasticamente ridotta
- uid: pv001-production-drop
title: "PV001 - Produzione ridotta >30%"
condition: C
data:
# Query A: Produzione attuale (15 min rolling mean)
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> mean()
|> sum()
# Query B: Irraggiamento attuale (per normalizzare)
- refId: B
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> mean()
# Condition C: se irraggiamento > 200 W/m2 (giorno chiaro)
# e produzione normalizzata < 70% del teorico
- refId: C
datasourceUid: __expr__
model:
type: math
expression: "$A / ($B * 50000) < 0.70 and $B > 200"
for: 10m # alert solo se condizione persiste 10 min
labels:
severity: "warning"
plant_id: "PV001"
category: "production"
annotations:
summary: "PV001: produzione sotto il 70% del teorico"
description: |
Produzione attuale: {{ $A }} W
Irraggiamento: {{ $B }} W/m2
Performance attesa: > 70%
runbook_url: "https://wiki.energy-corp.it/runbook/pv-production-drop"
# Alert: Inverter in fault
- uid: pv001-inverter-fault
title: "PV001 - Inverter Fault Rilevato"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "operating_state")
|> filter(fn: (r) => r._value == 7) // stato fault
|> count()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [0]
type: gt
for: 1m
labels:
severity: "critical"
plant_id: "PV001"
annotations:
summary: "Inverter in stato FAULT rilevato"
# Alert: Temperatura critica
- uid: pv001-thermal-critical
title: "PV001 - Temperatura Inverter Critica"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> max()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [85]
type: gt
for: 5m
labels:
severity: "critical"
category: "thermal"
# Contact points
contactPoints:
- name: "PagerDuty Critical"
receivers:
- uid: pagerduty-critical
type: pagerduty
settings:
integrationKey: $PAGERDUTY_INTEGRATION_KEY
severity: critical
class: "EnergyAlerts"
component: "PV Plant"
- name: "Slack Operations"
receivers:
- uid: slack-ops
type: slack
settings:
url: $SLACK_WEBHOOK_URL
channel: "#energy-ops-alerts"
username: "Grafana Alert Bot"
iconEmoji: ":zap:"
title: "{{ template \"slack.title\" . }}"
text: "{{ template \"slack.message\" . }}"
# Routing
policies:
- receiver: "Slack Operations"
group_by: ["plant_id", "severity"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- receiver: "PagerDuty Critical"
matchers:
- "severity = critical"
continue: true # invia anche a Slack
セキュリティ: TLS、認証、ネットワークセグメンテーション
エネルギーおよび重要な IoT プラットフォームのセキュリティ: 生産データは 商業上の機密情報、およびインバータにコマンドを送信する機能 不正なアクセスから保護する必要があります。多層防御戦略 4つのレベルに分かれています。
スクリプトによるTLS証明書の生成
#!/bin/bash
# generate_certs.sh - Genera CA e certificati per mutual TLS
set -euo pipefail
CERTS_DIR="./certs"
mkdir -p "$CERTS_DIR"
# 1. CA (Certificate Authority) - Root of trust
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -x509 -new -nodes \
-key "$CERTS_DIR/ca.key" \
-sha256 -days 3650 \
-out "$CERTS_DIR/ca.crt" \
-subj "/C=IT/ST=Puglia/L=Bari/O=EnergyCorp/CN=Energy IoT CA"
# 2. Server certificate (Mosquitto)
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new \
-key "$CERTS_DIR/server.key" \
-out "$CERTS_DIR/server.csr" \
-subj "/C=IT/O=EnergyCorp/CN=mosquitto"
# SAN per tutti i possibili hostname
cat > "$CERTS_DIR/server-ext.cnf" <<EOF
[req]
req_extensions = v3_req
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = mosquitto
DNS.2 = mqtt.energy-corp.it
DNS.3 = localhost
IP.1 = 127.0.0.1
IP.2 = 172.20.0.10
EOF
openssl x509 -req \
-in "$CERTS_DIR/server.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/server.crt" \
-days 825 \
-sha256 \
-extfile "$CERTS_DIR/server-ext.cnf" \
-extensions v3_req
# 3. Client certificate per Telegraf
openssl genrsa -out "$CERTS_DIR/telegraf.key" 2048
openssl req -new \
-key "$CERTS_DIR/telegraf.key" \
-out "$CERTS_DIR/telegraf.csr" \
-subj "/C=IT/O=EnergyCorp/CN=telegraf"
openssl x509 -req \
-in "$CERTS_DIR/telegraf.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/telegraf.crt" \
-days 825 -sha256
# 4. Client certificate per gateway PV001
openssl genrsa -out "$CERTS_DIR/gateway_PV001.key" 2048
openssl req -new \
-key "$CERTS_DIR/gateway_PV001.key" \
-out "$CERTS_DIR/gateway_PV001.csr" \
-subj "/C=IT/O=EnergyCorp/CN=gateway_PV001"
openssl x509 -req \
-in "$CERTS_DIR/gateway_PV001.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/gateway_PV001.crt" \
-days 365 -sha256 # 1 anno per gateway (rotazione annuale)
echo "Certificati generati in $CERTS_DIR/"
echo "CA: ca.crt"
echo "Server (Mosquitto): server.crt + server.key"
echo "Client Telegraf: telegraf.crt + telegraf.key"
echo "Client Gateway PV001: gateway_PV001.crt + gateway_PV001.key"
安全性: 生産における必須の慣行
- 相互 TLS は常に次のことを行います。 クライアントとサーバーの両方が証明書を提示する必要があります。 ユーザー名/パスワードを知っている場合でも、未承認のゲートウェイからの接続を阻止します。
- 環境変数またはシークレットマネージャーを介したシークレット: 決してハードコードしないでください コードまたは構成ファイル内の InfluxDB トークン、MQTT パスワード、または API キー git にコミットしました。 Docker シークレット、HashiCorp Vault、または AWS Secrets Manager を使用します。
- ネットワークのセグメンテーション: MQTT ブローカー、InfluxDB、および Telegraf は必要ありません。 インターネット上に直接公開されることはありません。 Grafana は HTTPS リバース プロキシ経由のみ。 エッジ ゲートウェイの MQTT ブローカーは、専用の VPN 経由で公開できます。
- 証明書のローテーション: ゲートウェイ証明書の有効期間は 1 年間です (365日)。 Grafana アラームが 30 日前に期限切れになるように設定します。
- MQTT 監査ログ: すべての接続とトピックのログを有効にする モスキートで。異常アクセスを検出するために SIEM と統合します。
パフォーマンス: 高スケール向けのベンチマークと最適化
10 MW システムのスタック容量
10 MW の太陽光発電所には通常、それぞれ 50 kW のインバーターが 200 台あります。 5 秒ごとのポーリングの場合、MQTT メッセージの量は次のようになります。
| 成分 | メトリクス | 音量 |
|---|---|---|
| インバータ200台×5秒 | 15 フィールド/メッセージ | 40 メッセージ/秒、600 データ ポイント/秒 |
| 10 個の気象観測所 x 10 秒 | 8 フィールド/メッセージ | 1 メッセージ/秒、8 データポイント/秒 |
| プラント合計 x 15 分 | フラックス凝集体 | DB内で計算 |
| 合計 | - | ~650 データポイント/秒、~5,600 万データポイント/日 |
| 生の保管 (30 日間) | ~16億8,000万のデータポイント | ~8 ~ 15 GB (InfluxDB 圧縮あり) |
このボリュームは、単一の InfluxDB 2.7 ノードで十分に管理できます。 8 GB RAM と NVMe SSD。 4 vCPU と 8 GB RAM を備えた VM 上の EMQX は 100K の接続を処理します 1M メッセージ/秒のスループットを持つ競合他社。単体設置の場合 200 台のインバーター、Raspberry Pi 4 上の単一の Mosquitto ブローカー、そして十分以上のものから (実際のテスト: 10K msg/秒、500 MB RAM)。
MQTT ベンチマーク スクリプト
#!/usr/bin/env python3
"""
mqtt_benchmark.py
Benchmark del broker MQTT per simulare 200 inverter
Verifica throughput, latenza e affidabilità
"""
import asyncio
import json
import time
import random
import statistics
from datetime import datetime, timezone
import aiomqtt
async def simulate_inverter(
plant_id: str,
inverter_id: str,
broker_host: str,
broker_port: int,
duration_sec: int,
poll_interval: float = 5.0
) -> dict:
"""Simula un inverter che pubblica metriche su MQTT"""
messages_sent = 0
latencies = []
errors = 0
start_time = time.monotonic()
async with aiomqtt.Client(
hostname=broker_host,
port=broker_port,
client_id=f"bench_{plant_id}_{inverter_id}",
keepalive=30,
) as client:
while time.monotonic() - start_time < duration_sec:
# Simula metriche realistiche
base_power = 50000 * random.uniform(0.3, 1.0) # 0-50 kW
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"plant_id": plant_id,
"inverter_id": inverter_id,
"ac_power_w": base_power,
"ac_voltage_l1_v": random.uniform(225, 235),
"dc_voltage_v": random.uniform(550, 650),
"dc_current_a": base_power / random.uniform(550, 650),
"temp_cabinet_c": random.uniform(35, 65),
"efficiency_pct": random.uniform(94, 98.5),
"operating_state": 4, # MPPT
"error_code": 0,
"energy_wh_total": messages_sent * base_power * poll_interval / 3600,
}
topic = f"plant/{plant_id}/inverter/{inverter_id}/metrics"
t_send = time.monotonic()
try:
await client.publish(
topic=topic,
payload=json.dumps(payload).encode(),
qos=1,
)
latencies.append((time.monotonic() - t_send) * 1000) # ms
messages_sent += 1
except Exception:
errors += 1
await asyncio.sleep(poll_interval)
return {
"inverter_id": inverter_id,
"messages_sent": messages_sent,
"errors": errors,
"avg_latency_ms": statistics.mean(latencies) if latencies else 0,
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
async def run_benchmark(
num_inverters: int = 200,
duration_sec: int = 60,
broker_host: str = "localhost",
broker_port: int = 1883,
) -> None:
"""Esegui benchmark con N inverter simulati"""
print(f"Avvio benchmark: {num_inverters} inverter per {duration_sec}s")
print(f"Broker: {broker_host}:{broker_port}")
print("-" * 60)
start = time.monotonic()
tasks = [
simulate_inverter(
plant_id="BENCH001",
inverter_id=f"INV{i:03d}",
broker_host=broker_host,
broker_port=broker_port,
duration_sec=duration_sec,
poll_interval=5.0,
)
for i in range(1, num_inverters + 1)
]
results = await asyncio.gather(*tasks)
elapsed = time.monotonic() - start
# Aggregazione risultati
total_messages = sum(r["messages_sent"] for r in results)
total_errors = sum(r["errors"] for r in results)
all_latencies_avg = [r["avg_latency_ms"] for r in results]
all_latencies_p99 = [r["p99_latency_ms"] for r in results]
print(f"Risultati benchmark:")
print(f" Durata: {elapsed:.1f}s")
print(f" Messaggi inviati: {total_messages:,}")
print(f" Errori: {total_errors} ({total_errors/total_messages*100:.2f}%)")
print(f" Throughput: {total_messages/elapsed:.1f} msg/sec")
print(f" Latenza media: {statistics.mean(all_latencies_avg):.2f} ms")
print(f" Latenza P99: {statistics.mean(all_latencies_p99):.2f} ms")
print(f" Inverter senza errori: {sum(1 for r in results if r['errors'] == 0)}/{num_inverters}")
if __name__ == "__main__":
asyncio.run(run_benchmark(
num_inverters=200,
duration_sec=120,
broker_host="localhost",
broker_port=1883,
))
ケーススタディ: 10 MW 太陽光発電パークのモニタリング
このケーススタディでは、エネルギー IoT プラットフォームの実際の実装について説明します。 太陽光発電パーク向け 10MW 南イタリアで 50 kW の SMA Sunny Tripower インバーター 200 台 8つの気象観測所。
初期セットアップと課題
サイトは 4G WAN ネットワーク上にあり、帯域幅は 10 Mbps で、遅延は可変です。 30ミリ秒と200ミリ秒。各インバータはローカル LAN ネットワーク (192.168.x.x) 経由で Modbus TCP を通信します。 産業用スイッチ。エッジ ゲートウェイは、256 GB SSD を搭載した Raspberry Pi 4 (8 GB RAM) です。 ローカル バッファリング用に、専用データ SIM を備えた産業用 4G ルーター経由で WAN に接続します。
| パラメータ | 価値 |
|---|---|
| システム電源 | 10.2 MW ピーク (SMA STP50-US-40) |
| インバータ番号 | 200 (SMA サニー トライパワー、50 kW) |
| 気象観測所 | 8 (日射量、T、HR、風) |
| エッジゲートウェイ | ラズベリーパイ4 8GB + 256GB SSD |
| ポーリング間隔インバータ | 5秒 |
| 天気のポーリング間隔 | 10秒 |
| MQTTメッセージ量 | ~41 msg/秒 (200 受信 x 1/5 秒 + 8 天気 x 1/10 秒) |
| 使用されているWAN帯域幅 | ~180 KB/秒 (利用可能な 10 Mbps の 2%) |
| MQTT ブローカー | クラウド VM 上の EMQX 5.8 (4 vCPU、8 GB) |
| 流入DB | 専用 VM 上の 2.7.10 (8 vCPU、32 GB RAM、NVMe) |
6ヶ月の手術後の結果
- プラットフォームの可用性: 99.94% (合計ダウンタイム: 6 か月で 2.6 時間、 すべてクラウドサーバーの定期メンテナンスのため)
- WAN 切断によるデータ損失: 0.002% (ローカルバッファリングのおかげで) ゲートウェイと MQTT 永続セッションの接続)
- 自動的に検出されたインバータ障害アラート: 6か月で47のイベント、 即時アラートのおかげで、MTTR (平均修理時間) が 8 時間から 2.3 時間に短縮されました PagerDuty 上 (翌日の手動検出と比較)
- 平均パフォーマンス率: 82.3% vs 前期 79.1% (なし) リアルタイム監視): +3.2 パーセント ポイント = +320 追加 MWh/年
- 予知保全による節約: 12台のインバータを検出 故障前の早期劣化(ヒートシンク温度は平均値と比較して +8°C) と宣言した。予防的介入により、緊急交換と比較して 45,000 ユーロの節約効果が得られると推定されています。
- 6 か月後の InfluxDB ストレージ: 生データの場合は 18.7 GB (30 日ローリング)、 1 時間集計の場合は 4.2 GB (6 か月)、1 日集計の場合は 0.8 GB (合計 180 日)
教訓: バッファリングエッジは交渉の余地がない
運用開始から最初の 1 か月間、4G 接続は 3 回中断されました 電話オペレーターの問題により延長されました (それぞれ 2 ~ 6 時間)。なしで Raspberry Pi の SSD でローカル バッファリングを行うと、すべての測定値が失われます。 それらの窓の間に。 64 GB のバッファとストア アンド フォワード ロジックのおかげで、 接続が戻るたびに、蓄積されたメッセージはすべて削除されます。 元のタイムスタンプを使用して時系列順に MQTT で公開され、InfluxDB で公開されます。 順不同の挿入に対して正しく受け入れられました。
ベストプラクティスとアンチパターン
ベストプラクティス
- ブローカー内ではなく、ペイロード内のタイムスタンプ: ゲートウェイには常に次のものが含まれている必要があります JSON ペイロード内のキャプチャの正確なタイムスタンプ。 MQTT ブローカーの受信。オフラインバッファリングの場合、メッセージの到着が遅くなります ただし、データは時間的には正しいです。 InfluxDB は順不同の挿入を受け入れます。
-
テレメトリ チャネルをコマンド チャネルから分離します。 テレメトリーのトピック
(
plant/+/inverter/+/metrics) とコマンドのもの (plant/+/commands/#) これらは、別個の ACL を持つ別個の名前空間上に存在する必要があります。コマンドには QoS 2 e が必要です より厳格な認証。 - 履歴データの積極的なダウンサンプリング: 5秒の生データ これらは事後の故障分析には価値がありますが、履歴傾向の集計には十分です。 15分または回。 Flux のダウンサンプリングと保持タスクをすぐに実装します。
- ゲートウェイの健全性監視: ゲートウェイの MQTT ハートビートを使用する (専用のトピックで 60 秒ごとに公開)、Grafana で監視します。心拍が失われた場合 3 期間にわたって、重大な警告: ブローカーの問題ではなく、ゲートウェイが故障している可能性があります。
- InfluxDB のゲートウェイ システム メトリクス: エッジゲートウェイ上の Telegraf CPU、RAM、CPU 温度、バッファーに使用可能なディスク容量などのメトリクスを収集できます。 そしてそれらを MQTT で公開します。 Raspberry Pi にストレスがかかっているかどうかを知るために不可欠です。
避けるべきアンチパターン
重大なアンチパターン
-
集約しないとトピックが細分化しすぎます: シングルごとにリリースする
Modbus レジスタについては別のトピックで説明します (例:
plant/PV001/INV001/register/40001) ブローカー内で数万のトピックと膨大なメタデータのオーバーヘッドが生成されます。 関連するログは常に、デバイスごとに 1 つの JSON ペイロードに集約してください。 - 高周波テレメトリの QoS 2: 四者握手は、 QoS 2 では、制御メッセージの数が 4 倍になります。 1 ~ 5Hz データの場合、QoS 1 を使用します。 (重複はタイムスタンプ冪等性を介して InfluxDB によって処理されます)。コマンドのみの QoS 2。
-
テレメトリのトピックを保持します。 保持はステータスのトピックに役立ちます
(最後の値は重要です)、ただし高周波テレメトリのトピックに関するものです。
ブローカーは、オーバーヘッドを伴い、パブリケーションごとに保持されたメッセージを更新する必要があります。
ストレージとCPUの。保持のみを使用する
/statuse/config. -
InfluxDB の無制限のカーディナリティ: 高い値のタグを避ける
カーディナリティ (例: 使用
session_idEVステーションのタグとして新規作成 各セッションのシリーズ)。タグではなく、代わりにフィールドとして使用します。 - InfluxDB にはレート制限なし: 同時クエリの制限なし、 単一の重い Grafana クエリ (6 か月分の生データのエクスポートなど) が飽和する可能性があります。 DB メモリが原因でキラー OOM が発生します。常にクエリ制限を構成します。
リソースと技術リファレンス
- MQTT 5.0 仕様 (OASIS): 公式の MQTT 5.0 プロトコル仕様では、QoS、保持、 メッセージ、セッション管理、ユーザー プロパティを含みます。実装の基本 企業。
- Telegraf MQTT コンシューマ プラグイン: InfluxData の公式ドキュメントには、すべての構成パラメータが説明されています mqtt_consumer プラグインの詳細 (トピック解析およびサポートされるデータ形式を含む)。 URL: docs.influxdata.com/telegraf
- InfluxDB Flux ドキュメント: Flux 言語は強力ですが、学習には時間がかかります。ドキュメント 公式には、aggregateWindow を含むすべての関数を備えたリファレンスが含まれています。 結合、ピボット、マップ。
- サンスペックアライアンス: SunSpec 標準は、太陽光発電インバータの Modbus レジスタ マップを定義しています。 バッテリーとメーター。 sunspec.org で入手できます。間の相互運用性を確保します。 さまざまなメーカーのデバイス。
- EMQX ドキュメント: EMQX ドキュメントには、クラスタリング、ルール エンジン、セキュリティ、および InfluxDB などの外部ストレージ システムとの統合。
- Grafana アラート: 統合アラート システム (v9 以降) の Grafana ドキュメントには次のものが含まれます。 YAML によるプロビジョニング、コンタクト ポイントの構成、ルーティングに関するガイド。
- IIoT セキュリティ: IEC 62443: IEC 62443 規格は、オートメーション システムの安全要件を定義しています。 エネルギーシステムを含む産業用制御。認定には必須 NIS2 (イタリアでは立法令 138/2024 により導入)。
結論と次のステップ
私たちは、ブリッジから、すぐに生産可能な完全なエネルギー IoT プラットフォームを構築しました。 Grafana、PagerDuty、およびローカル バッファリングによるマルチチャネル アラートを備えた Modbus-MQTT たるみ。 MQTT + Telegraf + InfluxDB + Grafana スタックはテストされ、文書化されており、スケーラブルです。 10 MW システムの 650 データ ポイント/秒を豊富なマージンで管理します。 クラスタリングおよび複数ノードの EMQX InfluxDB はユーティリティ規模の展開に直線的に拡張します 数百MW。
ケーススタディの数字がそれを物語っています: 99.94% の可用性、MTTR の削減 2.3 時間で 8、パフォーマンス比ポイント +3.2、メンテナンス費用の 45,000 ユーロの節約 6か月以内の予測。リアルタイム監視はエネルギー システムにとって贅沢ではありません 再生可能エネルギー: 収益と資産の耐用年数の直接的な乗数。
EnergyTech シリーズの次の記事では、次のトピックを取り上げます。 カーボン 会計: 回避排出量を計算する ESG プラットフォームを構築する方法、 収集された生産データから始まるグリーン証明書と炭素クレジット 構築したばかりの IoT プラットフォームから。
関連記事
- エナジーテックシリーズ: この記事は専用シリーズの一部です 再生可能エネルギーのソフトウェアエンジニアリングまで。前回の記事は、 EV ロード バランシングとそれに続くカーボン アカウンティングによって全体像が完成します エネルギー移行のためのソフトウェア インフラストラクチャの構築。
- MLOps シリーズ: ダウンサンプリングと異常検出の実装 Flux の場合、それらは最初のステップにすぎません。 MLOps シリーズでは、モデルを統合する方法について説明します。 ML (生産予測には LSTM、異常検出には Isolation Forest) データは同じ InfluxDB プラットフォームから取得されます。
- データ&AIビジネスシリーズ: MQTT-InfluxDB アーキテクチャは、 IoT データ レイクハウスの例。データと AI ビジネス シリーズでは、その方法を詳しく掘り下げます。 DBT と Airflow を使用して、この時系列データをエンタープライズ分析パイプラインに取り込みます。
- PostgreSQL AI シリーズ: より複雑な分析の場合は、 リレーショナル データとの JOIN (例: 契約データ、プラント マスター データ、履歴) メンテナンス)、Foreign Data Wrapper を介した InfluxDB と PostgreSQL の統合 PostgreSQL AI シリーズでも取り上げられています。







