Python と MQTT を使用した精密農業用の IoT パイプライン
プーリア州の小麦畑。 3,000 ヘクタール、さまざまな深さに埋められた 40 個のセンサー、からのデータ 30秒ごとに届く温度、土壌湿度、pH、電気伝導度。なし 構造化されたデータ パイプライン、これは単なるデジタル ノイズです。適切なパイプラインがあれば、それがエンジンになります 水消費量を 40% 削減し、収量を 15% 増加させ、コストを削減する意思決定の割合 肥料を4分の1に減らします。
精密農業は将来の約束ではありません。それは 2025 年にも有効な産業上の現実です。 147.7億ドル 世界的に、市場は次のようになると予測されています。 2030年までに268.6億人 CAGRは12.7%です。イタリアでは、28.5%の企業が ISTAT 2024 データによると、農業ではすでに精密農業技術が使用されており、ピークでは 100ヘクタール以上のUAAを持つ企業では41.1%。イタリアの農業セクターは価値を生み出してきた によって追加されました 2024年に424億ユーロイタリアがヨーロッパで最初であることを確認し、 そしてデジタル化がこのパフォーマンスの主な原動力です。
しかし、現場のセンサーと正しい農学上の決定の間には、複雑な技術的パスがあります。 低電力無線プロトコル、MQTT ブローカー、スキーム検証、強化パイプライン、 データレイク上のメダリオンアーキテクチャ、リアルタイムダッシュボード、アラートシステム。この記事 動作する Python コード、実際のアーキテクチャ、ベスト プラクティスを使用して、このチェーンのすべてのステップをカバーします。 本番環境で検証済み。
この記事で学べること
- 精密農業向けの IoT システムのエンドツーエンド アーキテクチャ
- 農業用センサーと無線プロトコルの種類: MQTT、LoRaWAN、Zigbee の比較
- MQTT の詳細: QoS 0/1/2、保持されるメッセージ、遺言、トピックの設計
- paho-mqtt を使用した完全な Python 実装: センサー パブリッシャーとコンシューマー パイプライン
- Pydantic を使用した IoT データの検証とスキーマの適用
- 時系列のための InfluxDB とストリーム処理のための Apache Kafka との統合
- 農業データ用のメダリオン アーキテクチャ (ブロンズ/シルバー/ゴールド)
- Grafana ダッシュボードとクリティカルしきい値アラート システム
- イタリアの背景: CAP、PNRR Transition 5.0、AgriTech 2025 インセンティブ
フードテックシリーズ - すべての記事
| # | アイテム | レベル | Stato |
|---|---|---|---|
| 1 | 精密農業のための IoT パイプライン (ここにいます) | 高度な | 現在 |
| 2 | 作物監視のための ML Edge: 圃場でのコンピュータ ビジョン | 高度な | 近日公開 |
| 3 | 衛星 API と植生インデックス: Python と Sentinel-2 を使用した NDVI | 中級 | 近日公開 |
| 4 | 食品におけるブロックチェーンのトレーサビリティ: 現場からスーパーマーケットまで | 中級 | 近日公開 |
| 5 | 食品産業における品質管理のためのコンピュータービジョン | 高度な | 近日公開 |
| 6 | FSMA とデジタル コンプライアンス: 規制プロセスの自動化 | 中級 | 近日公開 |
| 7 | 垂直農法: IoT と ML による環境制御 | 高度な | 近日公開 |
| 8 | Prophet と LightGBM を使用した食品小売の需要予測 | 中級 | 近日公開 |
| 9 | ファーム インテリジェンス ダッシュボード: Grafana を使用したリアルタイム分析 | 中級 | 近日公開 |
| 10 | サプライチェーンの食品の最適化: 廃棄物削減のための ML | 中級 | 近日公開 |
2025 年のアグリテック市場: 数字と傾向
わずか数年で、精密農業はニッチな技術から戦略的推進力へと成長しました。 第一次産業の競争力。数字がそれを明確に裏付けています。 精密農業が適用される 2025年には147.7億ドル そしてそれは次のように成長します 2030 年までに 268 億 6,000 万。しかし、アグリテックの全体像はさらに広範囲に及びます。 管理ソフトウェア、農業用ドローン、ロボット工学、デジタルバイオインプットを含む拡張は、 さまざまな調査情報によると、2025 年には 300 億人に達し、2031 年までの CAGR は 16 ~ 23% になると予想されています。
イタリアでは、2024 年が転換点となりました。ISTAT によると、イタリアの農業は次の目標に達しました。 付加価値額は424億ユーロ(2023年比9%増)で欧州第1位。 農業生産は量で1.4%増加し、付加価値は3.5%増加した。 同時に、イタリアの農業企業の 28.5% はすでに精密な技術を使用しています。 北東部 (33%) と北西部 (32.1%) および大手通信事業者に集中しています。 (UAA が 100 ヘクタールを超える企業では 41.1%)。
2025 年に精密農業技術を実現
| テクノロジー | 主な用途 | イタリアの養子縁組 | 平均ROI |
|---|---|---|---|
| IoT土壌センサー | 可変的な水やり、施肥 | アルタ (北イタリア) | 投入コストの 15 ~ 25% 削減 |
| 農業用ドローン | マッピング、処理、葉の分析 | 平均 | 農薬を 30 ~ 40% 節約 |
| 衛星画像 | NDVI、水ストレス、予測収量 | 中~高 | 5 ~ 10% の収率の最適化 |
| IoT気象観測所 | 病気予測モデル、灌漑 | 高い | 治療回数が 10 ~ 20% 削減 |
| 可変レート技術 | 播種、可変施肥 | 低~中 | 8 ~ 15% の入力削減 |
| フィールドデータの機械学習 | 収量予測、農業最適化 | 低い | +10~20%の収率、-15%のインプット |
PNRR はその加速において決定的な役割を果たしました: ミッション 2「緑の革命と 「エコロジカル・トランジション」では車両群の近代化に4億ユーロを割り当てた 農業は 4.0 テクノロジーに向けて。移行計画 5.0、総予算は 2024~2025年の2年間で127億ユーロ (特に63億 移行 5.0)、農業にも適用可能: 2025 年予算法 (L. 207/2024) 適用範囲を拡大し、農業事業者も税額控除の対象となる デジタルおよびエネルギー効率の高いテクノロジーへの投資に最適です。
農業向け IoT アーキテクチャ: センサーからデータレイクまで
コードを 1 行書く前に、システム アーキテクチャを理解することが重要です 完了しました。初期の設計ミスは、次からスケールするときにコストのかかる書き直しにつながります。 10,000 個のセンサー。ここで説明するアーキテクチャは、主要なアーキテクチャで採用されているものです。 セクター内のオペレーターを対象としており、実際の運用環境で検証されています。
エンドツーエンドのアーキテクチャ: レイヤーとコンポーネント
┌─────────────────────────────────────────────────────────────────────┐
│ FIELD LAYER (Campo) │
│ [Sensore Suolo] [Stazione Meteo] [Sensore pH] [Drone Mapping] │
│ │ │ │ │ │
│ └────────────────┴────────────────┘ │ │
│ │ LoRaWAN / Zigbee / RS-485 │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ GATEWAY LAYER (Edge) │
│ [Gateway LoRaWAN / Raspberry Pi 4] │
│ - Aggregazione dati multi-sensore │
│ - Pre-elaborazione e filtro outlier │
│ - Buffer locale (offline tolerance) │
│ - Protocollo: MQTT publish su broker locale │
└────────────────────────┼────────────────────────────────────────────┘
│ MQTT / TLS
┌────────────────────────┼────────────────────────────────────────────┐
│ BROKER LAYER (Fog/Cloud) │
│ [EMQX / HiveMQ / Eclipse Mosquitto] │
│ - Topic management gerarchico │
│ - Autenticazione mTLS / JWT │
│ - QoS management e message persistence │
│ - Bridge verso cloud (AWS IoT / Azure IoT Hub) │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ PROCESSING LAYER (Cloud) │
│ [Apache Kafka] ──► [Stream Processor] ──► [InfluxDB] │
│ - Ingestion stream - Validazione schema - Time-series store │
│ - Partitioning - Enrichment - Retention policy │
│ - Consumer groups - Alerting real-time - Downsampling │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ DATA LAKE (Medallion Architecture) │
│ [Bronze: Raw S3] ──► [Silver: Cleaned] ──► [Gold: Analytics] │
│ - Dati grezzi MQTT - Schema validato - Aggregazioni │
│ - Immutabile - Outlier rimossi - ML features │
│ - Formato: Parquet - Formato: Delta/Iceberg - Formato: Parquet │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ SERVING LAYER (Applicazioni) │
│ [Grafana Dashboard] [Alert Manager] [ML Models] [Mobile App] │
│ - Real-time monitoring - SMS/Email/Push - Previsioni rese │
│ - Mappa campo - Soglie critiche - Ottimizzazione input │
└─────────────────────────────────────────────────────────────────────┘
各レベルには個別の分離された責任があります。この分離と批判により、次のことが可能になります。 コンポーネントに触れずにコンポーネントを置き換えます (例: スケーリング時に Mosquitto から EMQX に切り替える)。 他は。また、オフライン耐性を実装することもできます。クラウドへの接続が切断された場合、ゲートウェイは データをローカルで収集してバッファリングし続けます。
農業用センサー: 種類、プロトコル、展開
フィールド用センサーの主なカテゴリ
適切なセンサーの選択は、作物、土壌の状態、目的によって異なります。 農学者。 2025 年の技術的特徴と予想コストを含む主なカテゴリは次のとおりです。
精密農業用センサー
| カテゴリ | パラメータ | テクノロジー | 単価 | プロトコル |
|---|---|---|---|---|
| 土壌水分 (VWC) | 体積水分含量 | FDR、TDR、容量性 | 30~150ユーロ | SDI-12、RS-485、LoRa |
| 地温 | Tグランド 10/30/50cm | PT100、NTC | 20~80ユーロ | SDI-12、I2C、1-Wire |
| 土壌pH | 現場での酸性度 | ISE電極 | 80~300ユーロ | RS-485、Modbus |
| 電気伝導率(EC) | 塩分濃度、肥沃度 | 誘導、接触 | 60~200ユーロ | SDI-12、RS-485 |
| 気象観測所 | T、HR、風、雨、放射線 | 統合されたマルチセンサー | 200~800ユーロ | RS-485、WiFi、LoRa |
| リーフセンサー | 葉の湿度、温度 | 容量性、IR | 40~120ユーロ | SDI-12、I2C |
| 灌漑流量計 | 水流量 | 超音波、プロペラ | 80~350ユーロ | パルス、RS-485 |
| ポータブルNDVIセンサー | 植物指数 | マルチスペクトル | 300~1500ユーロ | ブルートゥース、WiFi |
無線通信プロトコル: 究極の比較
ワイヤレス プロトコルの選択は、おそらくシステム アーキテクチャにおいて最も重要な決定です。 農業IoT。フィールドには、最大 10km の距離、物理的な障害物 (列、列、 樹木、田舎の建物)、主電源なし、温度は摂氏 -20 ~ +60 度です。
農業IoT向け無線プロトコルの比較
| プロトコル | 範囲 | バンド | ドラム | インフラコスト | 使用事例 |
|---|---|---|---|---|---|
| LoRaWAN | 3~15km | 0.3~50kbps | 5~10年 | 中(ゲートウェイ) | 土壌センサー、遠隔地気象 |
| NB-IoT | 10km以上 | 20~250kbps | 3~8年 | 低 (SIM カード) | 4G/5G通信可能エリア |
| ジグビー | 10~100メートル | 250kbps | 1~3年 | 低(メッシュ) | 温室、自動灌漑システム |
| WiFi 6 | 100~200メートル | 高 (Gbps) | 時間/日 | 中(AP) | カメラシステム、ビデオ品質分析 |
| 4G/LTE | 無制限 | 高い | 1~5年 | 中(SIM) | 農業機械、モバイルゲートウェイ |
| RS-485(有線) | 1200メートル | 10Mbps | 該当なし | ベース(リード) | 管理された温室、固定システム |
典型的なイタリアの農場 (50 ~ 500 ヘクタールの露地) の場合、最も一般的なソリューションは、 2025 年とハイブリッド アーキテクチャ: 土壌センサー用 LoRaWAN 人里離れた野原で、 農機向けNB-IoT 動いている、e 温室用 WiFi/有線 ここで、精度とサンプリングレートは最大になります。中央ゲートウェイ (多くの場合、 Raspberry Pi 4 または Dragino 産業用ゲートウェイ) はすべてを集約し、MQTT 経由でクラウドに公開します。
MQTT の詳細: アーキテクチャ、QoS、ベスト プラクティス
MQTT (Message Queuing Telemetry Transport) は、IoT の事実上のプロトコルです。 IBMによって作成されました 1990 年代に衛星経由で石油パイプラインを監視することが ISO/IEC 20922 規格となり、心臓部 あらゆる本格的な IoT システムに対応します。そのシンプルさと強力さにより、次のような環境に最適です。 限られた帯域幅と低消費電力のデバイス。
パブリッシュ/サブスクライブ モデル
HTTP の典型的なリクエスト/レスポンス モデルとは異なり、MQTT は次のパラダイムを使用します。 パブリッシュ/サブスクライブ: データ作成者 (発行者) は、誰がデータを読むのかを知りません。 消費者(購読者)は誰が発行しているのか知りません。デカップリングは完全であり、次のものによって媒介されます。 と呼ばれる中心コンポーネント ブローカ.
データは次のように整理されています。 トピック、スラッシュで区切られた階層文字列。 データの性質を説明します。適切に設計されたトピックは、スケーラブルなアーキテクチャの鍵です。 複数の区画の農場の場合、次の構造が推奨されます。
# Struttura topic MQTT consigliata per agricoltura di precisione
# Pattern: azienda/appezzamento/dispositivo/tipo-sensore/metrica
# Esempi concreti:
farm/campo-nord/sensor-001/soil/moisture # Umidita suolo sensore 001
farm/campo-nord/sensor-001/soil/temperature # Temperatura suolo sensore 001
farm/campo-nord/sensor-001/soil/ph # pH suolo sensore 001
farm/campo-nord/sensor-001/soil/ec # Conducibilita elettrica
farm/campo-sud/weather-station/air/temperature # Temperatura aria stazione meteo
farm/campo-sud/weather-station/air/humidity # Umidita aria
farm/campo-sud/weather-station/wind/speed # Velocita vento
farm/campo-sud/weather-station/rain/mm # Precipitazioni
farm/+/+/soil/moisture # Wildcard: umidita suolo da TUTTI i campi e sensori
farm/campo-nord/# # Wildcard: TUTTI i dati dal campo nord
farm/# # Wildcard: TUTTI i dati dell'azienda
# Topic di sistema (prefisso $)
$SYS/brokers/emqx/connections/count # Statistiche broker
farm/campo-nord/sensor-001/$status # Status device (LWT)
farm/campo-nord/sensor-001/$command # Comandi al dispositivo
サービス品質 (QoS): 3 つのレベル
MQTT QoS は、クライアントとブローカー間のメッセージ配信の保証を定義します。 適切なレベルを選択すると、バッテリー、帯域幅、システムの信頼性に直接影響します。
MQTT QoS: 詳細な比較
| レベル | 名前 | 保証 | オーバーヘッド | 農業での使用 |
|---|---|---|---|---|
| QoS0 | せいぜい1回 | なし (ファイアアンドフォーゲット) | 最小(1パッケージ) | 高周波テレメトリー (T 5 秒ごと)、許容損失 |
| QoS1 | 少なくとも一度は | 少なくとも 1 つの配信 (重複の可能性あり) | 低 (2 パケット、ACK) | 湿度/pHセンサーの測定値、灌水ログ |
| QoS2 | 必ず 1 回 | 保証された配信は 1 回だけです | ハイ(4パック) | 灌水バルブ制御、重大警報、肥料投与 |
残されたメッセージと遺言書
農業に特に役立つ 2 つの MQTT 機能:
- 保持されるメッセージ: ブローカーはトピックに関する最後のメッセージを保存し、 すべての新規加入者に即座に配信されます。センサーの現在の状態にとって重要: 接続したダッシュボードは、次の値を待たずにすぐに最新の値を受け取ります。 出版サイクル。
- 遺言書 (LWT): 接続すると各機器が ブローカーが自動的に公開する「遺言」メッセージを設定できます。 接続が異常に失敗します。ポーリングなしでオフラインセンサーを検出するために不可欠 アクティブ: センサーが正しく切断されない場合 (バッテリー残量低下、干渉)、 ブローカーはステータス トピックで「オフライン」ステータスを自動的に公開します。
完全な Python 実装: Sensor Publisher
湿度を伴う現実的な農業センサーノードをシミュレートする Python パブリッシャーを実装します。
土壌、温度、pH、EC。コードは使用します paho-mqtt 2.x (コールバックを備えた最新の API
更新済み)、すべてのベスト プラクティスを実装します: LWT、保持メッセージ、適切な QoS、
自動再接続と構造化された JSON スキーマ。
# sensor_node.py
# Nodo sensore MQTT per agricoltura di precisione
# Dipendenze: pip install paho-mqtt pydantic
import paho.mqtt.client as mqtt
import json
import time
import random
import math
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
import ssl
# ── Configurazione ────────────────────────────────────────────────────────────
BROKER_HOST = "emqx.azienda-agricola.it"
BROKER_PORT = 8883 # TLS
KEEPALIVE = 60 # secondi
CLIENT_ID = "sensor-campo-nord-001"
FARM_ID = "farm-001"
FIELD_ID = "campo-nord"
SENSOR_ID = "sensor-001"
# Topic base
TOPIC_BASE = f"{FARM_ID}/{FIELD_ID}/{SENSOR_ID}"
TOPIC_SOIL = f"{TOPIC_BASE}/soil"
TOPIC_STATUS = f"{TOPIC_BASE}/$status"
TOPIC_COMMAND = f"{TOPIC_BASE}/$command"
# Intervallo di pubblicazione in secondi
PUBLISH_INTERVAL = 30
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)
# ── Modello dati sensore ───────────────────────────────────────────────────────
@dataclass
class SoilReading:
"""Lettura completa da un nodo sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str # ISO 8601 UTC
latitude: float
longitude: float
depth_cm: int # Profondità di installazione
# Misurazioni suolo
moisture_pct: float # Umidita volumetrica (VWC) %
temperature_c: float # Temperatura suolo gradi C
ph: float # pH suolo (4.0 - 9.0)
ec_ds_m: float # Conducibilita elettrica dS/m
# Metadata dispositivo
battery_pct: int # Livello batteria
rssi_dbm: int # Signal strength in dBm
firmware_version: str
# Flag qualità
quality_flag: str # "OK", "WARN", "ERROR"
quality_notes: Optional[str] = None
def read_sensors_from_hardware() -> SoilReading:
"""
In produzione: legge i sensori reali via SDI-12 o RS-485.
Qui: simula dati realistici con variazione temporale.
"""
now = datetime.now(timezone.utc)
# Ciclo circadiano per temperatura (più alta nelle ore centrali)
hour = now.hour
temp_base = 18.0
temp_variation = 6.0 * math.sin(math.pi * (hour - 6) / 12) if 6 <= hour <= 18 else -2.0
temperature = temp_base + temp_variation + random.gauss(0, 0.3)
# Umidita: degrada lentamente senza pioggia, segue ciclo stagionale
base_moisture = 35.0 # % VWC campo (35% = campo saturo irrigazione)
moisture = base_moisture + random.gauss(0, 1.5)
moisture = max(5.0, min(60.0, moisture)) # clamp fisico
# pH relativamente stabile
ph = 6.8 + random.gauss(0, 0.1)
ph = max(4.0, min(9.0, ph))
# EC: correlata alla salinita e fertilizzazione
ec = 1.2 + random.gauss(0, 0.05)
ec = max(0.1, min(5.0, ec))
# Battery che decresce lentamente (simulazione)
battery = max(10, 95 - int(time.time() / 3600) % 85)
# Quality flag automatico
quality = "OK"
notes = None
if moisture < 10.0:
quality = "WARN"
notes = "Umidita sotto soglia minima critica"
elif moisture > 55.0:
quality = "WARN"
notes = "Umidita sopra soglia saturazione"
if battery < 15:
quality = "WARN"
notes = (notes or "") + " | Batteria in esaurimento"
return SoilReading(
sensor_id = SENSOR_ID,
farm_id = FARM_ID,
field_id = FIELD_ID,
timestamp = now.isoformat(),
latitude = 40.4164,
longitude = 17.9308,
depth_cm = 30,
moisture_pct = round(moisture, 2),
temperature_c = round(temperature, 2),
ph = round(ph, 2),
ec_ds_m = round(ec, 3),
battery_pct = battery,
rssi_dbm = random.randint(-95, -45),
firmware_version = "2.4.1",
quality_flag = quality,
quality_notes = notes,
)
# ── Client MQTT ───────────────────────────────────────────────────────────────
class AgriSensorNode:
"""Nodo sensore MQTT con auto-reconnect e LWT"""
def __init__(self):
self.client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
self._setup_auth()
self._setup_tls()
self._setup_callbacks()
self._setup_lwt()
self.connected = False
def _setup_auth(self):
self.client.username_pw_set(
username="sensor-user",
password="<TOKEN_SEGRETO>"
)
def _setup_tls(self):
"""TLS mutuo con certificato dispositivo"""
self.client.tls_set(
ca_certs = "/certs/ca.crt",
certfile = "/certs/sensor.crt",
keyfile = "/certs/sensor.key",
tls_version = ssl.PROTOCOL_TLS_CLIENT,
)
def _setup_callbacks(self):
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_publish = self._on_publish
def _setup_lwt(self):
"""Last Will Testament: pubblicato dal broker se la connessione cade"""
lwt_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "connection_lost",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.will_set(
topic = TOPIC_STATUS,
payload = lwt_payload,
qos = 1,
retain = True, # Retain: dashboard vede subito lo stato offline
)
def _on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
log.error(f"Connessione fallita: {reason_code}")
return
log.info(f"Connesso al broker: {BROKER_HOST}")
self.connected = True
# Pubblica stato online (retained)
online_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "online",
"timestamp": datetime.now(timezone.utc).isoformat(),
"firmware": "2.4.1",
})
client.publish(TOPIC_STATUS, online_payload, qos=1, retain=True)
# Sottoscrivi ai comandi
client.subscribe(TOPIC_COMMAND, qos=2)
log.info(f"Sottoscritto a: {TOPIC_COMMAND}")
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
self.connected = False
log.warning(f"Disconnesso: {reason_code}. Tentativo riconnessione...")
def _on_message(self, client, userdata, message):
"""Gestione comandi ricevuti dal broker (es. cambio intervallo)"""
try:
payload = json.loads(message.payload.decode())
cmd = payload.get("command")
log.info(f"Comando ricevuto: {cmd}")
if cmd == "set_interval":
global PUBLISH_INTERVAL
PUBLISH_INTERVAL = int(payload.get("value", 30))
log.info(f"Intervallo aggiornato a {PUBLISH_INTERVAL}s")
elif cmd == "reboot":
log.warning("Comando reboot ricevuto")
# In produzione: riavvia il sistema
except Exception as e:
log.error(f"Errore parsing comando: {e}")
def _on_publish(self, client, userdata, mid, reason_code, properties):
log.debug(f"Messaggio {mid} pubblicato con successo")
def connect(self):
self.client.connect(
host = BROKER_HOST,
port = BROKER_PORT,
keepalive = KEEPALIVE,
)
self.client.loop_start() # Thread background per I/O
def publish_reading(self, reading: SoilReading):
"""Pubblica lettura sensore su topic appropriati"""
# Payload principale: lettura completa
payload_full = json.dumps(asdict(reading), default=str)
result = self.client.publish(
topic = TOPIC_SOIL,
payload = payload_full,
qos = 1,
retain = True, # Ultimo valore sempre disponibile
)
# Pubblica anche metriche singole per dashboard real-time
metrics = {
"moisture": (reading.moisture_pct, 1),
"temperature": (reading.temperature_c, 0),
"ph": (reading.ph, 1),
"ec": (reading.ec_ds_m, 0),
}
for metric, (value, qos) in metrics.items():
self.client.publish(
topic = f"{TOPIC_SOIL}/{metric}",
payload = str(value),
qos = qos,
retain = True,
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
log.info(
f"Pubblicato | Moisture: {reading.moisture_pct}% | "
f"Temp: {reading.temperature_c}C | pH: {reading.ph} | "
f"EC: {reading.ec_ds_m} dS/m | Quality: {reading.quality_flag}"
)
else:
log.error(f"Errore pubblicazione: {result.rc}")
def run(self):
"""Loop principale del nodo sensore"""
self.connect()
# Attendi connessione iniziale
timeout = 10
while not self.connected and timeout > 0:
time.sleep(1)
timeout -= 1
if not self.connected:
log.error("Impossibile connettersi al broker")
return
log.info(f"Nodo sensore avviato. Intervallo: {PUBLISH_INTERVAL}s")
try:
while True:
reading = read_sensors_from_hardware()
self.publish_reading(reading)
time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
log.info("Shutdown richiesto")
finally:
# Disconnessione pulita: il LWT NON viene inviato
offline_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "graceful_shutdown",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.publish(TOPIC_STATUS, offline_payload, qos=1, retain=True)
time.sleep(0.5)
self.client.loop_stop()
self.client.disconnect()
if __name__ == "__main__":
node = AgriSensorNode()
node.run()
消費者パイプライン: 検証、強化、ストレージ
消費者はシステムのもう一方の端です。 MQTT トピックをサブスクライブし、受信したデータを検証します。 Pydantic、コンテキスト データ (農業情報、気象警報) を使用して読み取り値を強化します。 そしてそれらを時系列ストレージとして InfluxDB にルーティングし、データ レイクとして S3 にルーティングします。
# pipeline_consumer.py
# Consumer MQTT + validazione Pydantic + storage InfluxDB
# Dipendenze: pip install paho-mqtt pydantic influxdb-client boto3
import paho.mqtt.client as mqtt
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field, field_validator, ValidationError
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import boto3
import io
log = logging.getLogger(__name__)
# ── Schema di validazione Pydantic ────────────────────────────────────────────
class SoilReadingSchema(BaseModel):
"""Schema di validazione per letture sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str
latitude: float = Field(ge=-90, le=90)
longitude: float = Field(ge=-180, le=180)
depth_cm: int = Field(ge=0, le=200)
moisture_pct: float = Field(ge=0.0, le=100.0)
temperature_c: float = Field(ge=-40.0, le=80.0)
ph: float = Field(ge=0.0, le=14.0)
ec_ds_m: float = Field(ge=0.0, le=20.0)
battery_pct: int = Field(ge=0, le=100)
rssi_dbm: int = Field(ge=-150, le=0)
firmware_version: str
quality_flag: str = Field(pattern="^(OK|WARN|ERROR)$")
quality_notes: Optional[str] = None
@field_validator("timestamp")
@classmethod
def validate_timestamp(cls, v: str) -> str:
"""Verifica che il timestamp sia ISO 8601 valido e non nel futuro"""
try:
ts = datetime.fromisoformat(v)
if ts > datetime.now(timezone.utc):
raise ValueError("Timestamp nel futuro")
except ValueError as e:
raise ValueError(f"Timestamp non valido: {e}")
return v
@field_validator("ph")
@classmethod
def validate_ph_agronomico(cls, v: float) -> float:
"""pH fuori range agronomico (4.5-8.5) e anomalia"""
if v < 4.5 or v > 8.5:
log.warning(f"pH {v} fuori range agronomico tipico [4.5-8.5]")
return v
def has_critical_alert(self) -> bool:
"""Verifica se la lettura richiede un alert critico"""
return (
self.moisture_pct < 10.0 or
self.moisture_pct > 58.0 or
self.ph < 4.5 or
self.ph > 8.5 or
self.ec_ds_m > 4.0 or
self.battery_pct < 10
)
# ── Storage InfluxDB ──────────────────────────────────────────────────────────
class InfluxDBWriter:
"""Writer per time-series su InfluxDB 2.x"""
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.bucket = bucket
self.org = org
def write_soil_reading(self, reading: SoilReadingSchema):
"""Scrive una lettura suolo su InfluxDB con tags e fields ottimizzati"""
point = (
Point("soil_reading")
# Tags: usati per filtro e group-by (cardinalita limitata)
.tag("sensor_id", reading.sensor_id)
.tag("farm_id", reading.farm_id)
.tag("field_id", reading.field_id)
.tag("depth_cm", str(reading.depth_cm))
.tag("quality", reading.quality_flag)
# Fields: metriche numeriche
.field("moisture_pct", reading.moisture_pct)
.field("temperature_c", reading.temperature_c)
.field("ph", reading.ph)
.field("ec_ds_m", reading.ec_ds_m)
.field("battery_pct", float(reading.battery_pct))
.field("rssi_dbm", float(reading.rssi_dbm))
# Timestamp dalla lettura del sensore (non dall'arrivo)
.time(datetime.fromisoformat(reading.timestamp), WritePrecision.SECONDS)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
log.debug(f"Scritto su InfluxDB: {reading.sensor_id} @ {reading.timestamp}")
def close(self):
self.client.close()
# ── Bronze Layer su S3 ────────────────────────────────────────────────────────
class S3BronzeWriter:
"""Scrive dati grezzi su S3 (Bronze layer Medallion Architecture)"""
def __init__(self, bucket: str, region: str = "eu-south-1"):
self.s3 = boto3.client("s3", region_name=region)
self.bucket = bucket
def write_raw(self, raw_payload: str, topic: str, received_at: datetime):
"""
Salva il payload grezzo in formato NDJSON su S3.
Partitionamento per data: year/month/day/hour/
"""
prefix = received_at.strftime("year=%Y/month=%m/day=%d/hour=%H")
filename = f"{prefix}/{received_at.isoformat()}.json"
envelope = {
"topic": topic,
"received_at": received_at.isoformat(),
"payload": json.loads(raw_payload),
}
self.s3.put_object(
Bucket = self.bucket,
Key = filename,
Body = json.dumps(envelope).encode("utf-8"),
ContentType = "application/json",
)
log.debug(f"Bronze layer: scritto {filename}")
# ── Alert Manager ─────────────────────────────────────────────────────────────
class AlertManager:
"""Gestione alert critici con cooldown per evitare spam"""
def __init__(self):
self._last_alert: dict[str, datetime] = {}
self.cooldown_seconds = 300 # 5 minuti tra un alert e l'altro per sensor
def check_and_alert(self, reading: SoilReadingSchema):
if not reading.has_critical_alert():
return
sensor_key = reading.sensor_id
now = datetime.now(timezone.utc)
last = self._last_alert.get(sensor_key)
if last and (now - last).total_seconds() < self.cooldown_seconds:
return # In cooldown, skip
self._last_alert[sensor_key] = now
self._send_alert(reading)
def _send_alert(self, reading: SoilReadingSchema):
"""In produzione: invia SMS/email/push. Qui: log."""
alerts = []
if reading.moisture_pct < 10.0:
alerts.append(f"STRESS IDRICO: umidita {reading.moisture_pct}% sotto soglia critica 10%")
if reading.moisture_pct > 58.0:
alerts.append(f"SATURAZIONE: umidita {reading.moisture_pct}% sopra saturazione")
if reading.ph < 4.5:
alerts.append(f"pH CRITICO: {reading.ph} - suolo troppo acido")
if reading.ec_ds_m > 4.0:
alerts.append(f"SALINITA CRITICA: EC {reading.ec_ds_m} dS/m")
if reading.battery_pct < 10:
alerts.append(f"BATTERIA: {reading.battery_pct}% - sostituire")
for alert in alerts:
log.critical(f"ALERT [{reading.sensor_id}] {alert}")
# TODO: self.sms_client.send(...)
# TODO: self.email_client.send(...)
# ── Pipeline Consumer principale ──────────────────────────────────────────────
class AgriPipelineConsumer:
"""Consumer MQTT con validazione, storage e alerting integrati"""
def __init__(self):
self.influx = InfluxDBWriter(
url = "https://influxdb.azienda.it:8086",
token = "<INFLUX_TOKEN>",
org = "azienda-agricola",
bucket = "farm-sensors",
)
self.s3_bronze = S3BronzeWriter(bucket="farm-raw-data-bronze")
self.alert_mgr = AlertManager()
self.client = mqtt.Client(
client_id = "pipeline-consumer-001",
protocol = mqtt.MQTTv5,
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.stats = {"received": 0, "valid": 0, "errors": 0, "alerts": 0}
def _on_connect(self, client, userdata, flags, rc, props):
log.info("Consumer connesso al broker")
# Sottoscrive a TUTTI i sensori suolo di TUTTE le farm
client.subscribe("farm/+/+/soil", qos=1)
client.subscribe("farm/+/+/$status", qos=1)
log.info("Sottoscritto a: farm/+/+/soil e farm/+/+/$status")
def _on_message(self, client, userdata, message):
received_at = datetime.now(timezone.utc)
self.stats["received"] += 1
raw_payload = message.payload.decode("utf-8")
try:
# 1. Parse JSON
data = json.loads(raw_payload)
# 2. Salva Bronze layer (dato grezzo, prima di qualsiasi trasformazione)
self.s3_bronze.write_raw(raw_payload, message.topic, received_at)
# 3. Valida con Pydantic
reading = SoilReadingSchema(**data)
self.stats["valid"] += 1
# 4. Scrivi su InfluxDB (time-series)
self.influx.write_soil_reading(reading)
# 5. Check alert
self.alert_mgr.check_and_alert(reading)
if reading.has_critical_alert():
self.stats["alerts"] += 1
log.info(
f"Processed | {reading.sensor_id} | "
f"moisture={reading.moisture_pct}% | "
f"quality={reading.quality_flag}"
)
except json.JSONDecodeError as e:
self.stats["errors"] += 1
log.error(f"JSON non valido da {message.topic}: {e}")
except ValidationError as e:
self.stats["errors"] += 1
log.error(f"Validazione fallita per {message.topic}: {e}")
# Salva comunque su Bronze (dato anomalo ma registrato)
except Exception as e:
self.stats["errors"] += 1
log.exception(f"Errore imprevisto: {e}")
def run(self):
self.client.connect("emqx.azienda-agricola.it", 8883)
log.info("Pipeline consumer avviata. Ctrl+C per fermare.")
try:
self.client.loop_forever()
except KeyboardInterrupt:
pass
finally:
self.influx.close()
log.info(
f"Stats finali: {self.stats}"
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
consumer = AgriPipelineConsumer()
consumer.run()
農業データのメダリオン アーキテクチャ: ブロンズ、シルバー、ゴールド
メダリオン アーキテクチャ (Databricks によって導入されましたが、現在はデータ エンジニアリングの事実上の標準となっています) データを 3 つの段階的な品質レイヤーに編成します。農業IoTデータに応用して解決します 実際の問題: 異常値を送信するセンサー、RTC ドリフトの不正なタイムスタンプ、測定値 QoS 1 での MQTT 再送信用に複製されており、ダッシュボードごとに異なる集計が必要です リアルタイム ML モデルと長期 ML モデルの比較。
農業IoT向けのメダリオンアーキテクチャ
| レイヤー | 形式 | コンテンツ | 運営 | 保持 |
|---|---|---|---|---|
| ブロンズ (未加工) | JSON / 寄木細工 | 未加工の不変 MQTT ペイロード | 変換なし、保存するだけ | 5年(規定) |
| シルバー(クリーニング済み) | デルタ湖 / 氷山 | 正規化されたパターン、外れ値は削除 | 重複排除、外れ値フィルター、型キャスト | 3年 |
| ゴールド (分析) | 寄木細工 | 時間/日ごとの集計、ML 機能 | 集約、天気/衛星との結合 | 10年 |
# medallion_pipeline.py
# Pipeline Medallion per dati agricoli IoT con PySpark / pandas
# In produzione: usa Databricks, AWS Glue, o dbt su Spark
# Qui: versione pandas per sviluppo locale e testing
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import json
import hashlib
DATA_ROOT = Path("/data/farm")
# ── BRONZE LAYER ──────────────────────────────────────────────────────────────
def load_bronze(date: str) -> pd.DataFrame:
"""
Carica dati grezzi dal Bronze layer (JSON NDJSON).
Nessuna trasformazione: solo lettura e schema enforcement minimo.
"""
bronze_path = DATA_ROOT / "bronze" / date
records = []
for f in bronze_path.glob("**/*.json"):
with open(f) as fp:
envelope = json.load(fp)
records.append(envelope)
if not records:
return pd.DataFrame()
# DataFrame con schema minimo garantito
df = pd.json_normalize(records, sep="_")
df["_bronze_loaded_at"] = datetime.utcnow().isoformat()
df["_source_file"] = [str(f) for f in bronze_path.glob("**/*.json")]
return df
# ── SILVER LAYER ──────────────────────────────────────────────────────────────
def transform_bronze_to_silver(df_bronze: pd.DataFrame) -> pd.DataFrame:
"""
Trasformazioni Bronze → Silver:
1. Parse e normalizza timestamp
2. Rimuovi duplicati (QoS 1 può consegnare più volte)
3. Filtra outlier fisicamente impossibili
4. Cast tipi corretti
5. Aggiungi colonne derivate
"""
if df_bronze.empty:
return pd.DataFrame()
df = df_bronze.copy()
# 1. Parse timestamp
df["ts"] = pd.to_datetime(df["payload_timestamp"], utc=True, errors="coerce")
df = df.dropna(subset=["ts"])
# 2. Deduplica: stesso sensor_id + timestamp = stesso dato
df["_dedup_key"] = df.apply(
lambda r: hashlib.md5(
f"{r.get('payload_sensor_id', '')}{r.get('payload_timestamp', '')}".encode()
).hexdigest(),
axis=1
)
df = df.drop_duplicates(subset=["_dedup_key"], keep="first")
# 3. Filtro outlier fisici
df = df[df["payload_moisture_pct"].between(0, 100)]
df = df[df["payload_temperature_c"].between(-40, 80)]
df = df[df["payload_ph"].between(0, 14)]
df = df[df["payload_ec_ds_m"].between(0, 20)]
# 4. Flag outlier agronomici (mantieni, ma etichetta)
df["is_agronomic_outlier"] = (
~df["payload_moisture_pct"].between(5, 60) |
~df["payload_ph"].between(4.5, 8.5) |
(df["payload_ec_ds_m"] > 4.0)
)
# 5. Colonne derivate
df["date"] = df["ts"].dt.date
df["hour"] = df["ts"].dt.hour
df["month"] = df["ts"].dt.month
# 6. Rename colonne per schema pulito
df = df.rename(columns={
"payload_sensor_id": "sensor_id",
"payload_farm_id": "farm_id",
"payload_field_id": "field_id",
"payload_moisture_pct": "moisture_pct",
"payload_temperature_c": "temperature_c",
"payload_ph": "ph",
"payload_ec_ds_m": "ec_ds_m",
"payload_battery_pct": "battery_pct",
"payload_quality_flag": "quality_flag",
})
# Seleziona solo colonne rilevanti
cols = [
"sensor_id", "farm_id", "field_id", "ts", "date", "hour", "month",
"moisture_pct", "temperature_c", "ph", "ec_ds_m", "battery_pct",
"quality_flag", "is_agronomic_outlier"
]
df = df[[c for c in cols if c in df.columns]]
df["_silver_processed_at"] = datetime.utcnow().isoformat()
return df
# ── GOLD LAYER ────────────────────────────────────────────────────────────────
def transform_silver_to_gold(df_silver: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""
Trasformazioni Silver → Gold:
Produce più tabelle di aggregazione per usi diversi.
"""
if df_silver.empty:
return {}
# Filtra outlier agronomici per analytics
df = df_silver[~df_silver["is_agronomic_outlier"]].copy()
gold_tables = {}
# ─── Aggregazione oraria per dashboard ───
hourly = df.groupby(["sensor_id", "farm_id", "field_id", "date", "hour"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_min = ("moisture_pct", "min"),
moisture_max = ("moisture_pct", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
readings_n = ("moisture_pct", "count"),
).reset_index()
hourly["moisture_avg"] = hourly["moisture_avg"].round(2)
gold_tables["hourly_aggregations"] = hourly
# ─── Aggregazione giornaliera per reporting agronomico ───
daily = df.groupby(["sensor_id", "farm_id", "field_id", "date"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_std = ("moisture_pct", "std"),
temp_min = ("temperature_c", "min"),
temp_max = ("temperature_c", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
stress_hours = ("moisture_pct", lambda x: (x < 20).sum()),
readings_n = ("moisture_pct", "count"),
).reset_index()
gold_tables["daily_agronomic"] = daily
# ─── Feature engineering per ML (predizione irrigazione) ───
ml_features = df.copy()
ml_features["moisture_lag_1h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(2)
ml_features["moisture_lag_3h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(6)
ml_features["moisture_trend"] = (
ml_features["moisture_pct"] - ml_features["moisture_lag_3h"]
)
ml_features["needs_irrigation"] = (ml_features["moisture_pct"] < 25.0).astype(int)
gold_tables["ml_irrigation_features"] = ml_features.dropna(
subset=["moisture_lag_1h", "moisture_lag_3h"]
)
return gold_tables
def run_medallion_pipeline(date: str):
"""Esegue la pipeline Medallion per una data specifica"""
log_prefix = f"[Medallion {date}]"
print(f"{log_prefix} Caricamento Bronze...")
bronze = load_bronze(date)
print(f"{log_prefix} Bronze: {len(bronze)} record")
print(f"{log_prefix} Trasformazione Silver...")
silver = transform_bronze_to_silver(bronze)
print(f"{log_prefix} Silver: {len(silver)} record (dopo dedup e filtro outlier)")
# Salva Silver
silver_path = DATA_ROOT / "silver" / date / "soil_readings.parquet"
silver_path.parent.mkdir(parents=True, exist_ok=True)
silver.to_parquet(silver_path, index=False)
print(f"{log_prefix} Trasformazione Gold...")
gold_tables = transform_silver_to_gold(silver)
for table_name, df in gold_tables.items():
gold_path = DATA_ROOT / "gold" / date / f"{table_name}.parquet"
gold_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(gold_path, index=False)
print(f"{log_prefix} Gold {table_name}: {len(df)} record")
print(f"{log_prefix} Pipeline completata.")
if __name__ == "__main__":
today = datetime.utcnow().strftime("%Y-%m-%d")
run_medallion_pipeline(today)
InfluxDB および Apache Kafka との統合
農業時系列用の InfluxDB
InfluxDB は、高頻度の IoT 時系列に最適なデータベースです。 PostgreSQLとは異なります
または MySQL を使用し、時系列での大量の書き込み、自動データ圧縮用に最適化されています。
ネイティブ集計関数を使用した履歴 (ダウンサンプリング) および時間間隔でのクエリ
(mean(), max(), moving_average()).
30 秒ごとに投稿する 50 個のセンサーを備えたファームの場合、InfluxDB が処理します 快適に 1秒あたり100回の書き込み コンシューマ ハードウェア上、保持ポリシーあり 自動: 30 日間の生データ、1 年間の時間ごとの集計、日次の集計 10年間。すべてはセンサーごとに年間数 MB のストレージに収まります。
# influxdb_queries.py
# Query Flux per analytics agricole su InfluxDB 2.x
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
client = InfluxDBClient(
url = "https://influxdb.azienda.it:8086",
token = "<TOKEN>",
org = "azienda-agricola",
)
query_api = client.query_api()
# ─── Query 1: Media umidita ultime 24 ore per campo ───────────────────────────
QUERY_MOISTURE_24H = """
from(bucket: "farm-sensors")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r.field_id == "campo-nord")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> yield(name: "hourly_moisture")
"""
# ─── Query 2: Alert stress idrico (moisture < 20% nelle ultime 6h) ────────────
QUERY_DROUGHT_ALERT = """
from(bucket: "farm-sensors")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r._value < 20.0)
|> group(columns: ["sensor_id", "field_id"])
|> count()
|> filter(fn: (r) => r._value > 3)
|> yield(name: "drought_sensors")
"""
# ─── Query 3: Trend temperatura settimanale con banda statistica ──────────────
QUERY_TEMP_WEEKLY = """
from(bucket: "farm-sensors")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "temperature_c")
|> aggregateWindow(every: 6h, fn: mean)
|> movingAverage(n: 4)
|> yield(name: "temp_trend_7d")
"""
# ─── Query 4: Stato batterie sensori (per manutenzione preventiva) ────────────
QUERY_BATTERY_STATUS = """
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
|> filter(fn: (r) => r._value < 20)
|> sort(columns: ["_value"])
|> yield(name: "low_battery_sensors")
"""
def run_analytics():
print("=== Dashboard Analytics Agricole ===")
# Esegui query umidita
tables = query_api.query(QUERY_MOISTURE_24H)
for table in tables:
for record in table.records:
print(f" {record.get_time()} | "
f"sensor={record.values.get('sensor_id', 'N/A')} | "
f"moisture={record.get_value():.1f}%")
# Check alert stress idrico
drought_tables = query_api.query(QUERY_DROUGHT_ALERT)
for table in drought_tables:
for record in table.records:
print(f" ALERT STRESS IDRICO: sensor {record.values.get('sensor_id')} "
f"in field {record.values.get('field_id')}")
if __name__ == "__main__":
run_analytics()
高スケールのストリーム処理のための Apache Kafka
センサーの数が数百を超える場合、または 1 秒未満の遅延が必要な場合、 MQTT だけでは分散システムとして十分ではありません。 Apache Kafka がメッセージ バスとして登場 MQTT ブローカーと処理層の間のエンタープライズ。標準的なパターンは次のとおりです。 MQTT ブローカー → Kafka Connect (MQTT ソース コネクタ) → Kafka トピック → コンシューマ グループ.
Kafka は、このコンテキストで重要な利点をもたらします: メッセージの再生 (コンシューマーにバグがある場合、 すべてが再処理されます)、フィールド/地理的エリアによる分割、複数の独立したコンシューマー (InfluxDB ライター、アラート エンジン、ML 推論、データ レイク ライターは、同じデータを読み取ります。 干渉)、Kafka トランザクションによる正確に 1 回の保証。
# kafka_consumer.py
# Consumer Kafka per dati sensori agricoli
# Dipendenze: pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import logging
log = logging.getLogger(__name__)
KAFKA_CONFIG = {
"bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"group.id": "farm-analytics-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # Commit manuale per exactly-once
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "farm-consumer",
"sasl.password": "<KAFKA_PASSWORD>",
}
TOPICS = ["farm.soil.readings", "farm.weather.readings"]
def process_soil_message(data: dict) -> bool:
"""Elabora un messaggio sensore suolo"""
sensor_id = data.get("sensor_id", "unknown")
moisture = data.get("moisture_pct", 0)
# Routing logica: stress idrico → alert immediato
if moisture < 15.0:
log.warning(f"STRESS IDRICO critico: sensor {sensor_id} = {moisture}%")
# In produzione: pubblica su topic alert
return True
log.info(f"OK | sensor={sensor_id} | moisture={moisture}%")
return True
def run_kafka_consumer():
consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe(TOPICS)
log.info(f"Consumer Kafka avviato su topics: {TOPICS}")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
log.debug(f"Raggiunto EOF: {msg.topic()}[{msg.partition()}]")
else:
raise KafkaException(msg.error())
continue
# Processa messaggio
try:
data = json.loads(msg.value().decode("utf-8"))
success = process_soil_message(data)
if success:
consumer.commit(message=msg) # Commit solo se processato correttamente
except json.JSONDecodeError as e:
log.error(f"JSON non valido: {e}")
consumer.commit(message=msg) # Commit anche errori per non bloccare
except KeyboardInterrupt:
log.info("Consumer fermato")
finally:
consumer.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_kafka_consumer()
Grafana ダッシュボードとアラート システム
Grafana は、IoT データをリアルタイムで視覚化するための標準ツールです。ネイティブに統合されます データソース経由で InfluxDB 2.x を使用すると、地理的マップを使用したインタラクティブなダッシュボードを構築できます。 フロントエンド コードを記述せずに、時系列チャートとステータス パネルを作成できます。
農業ビジネスの場合、標準ダッシュボードには次のものが含まれます。
- フィールドマップ: センサー オーバーレイを備えた Geomap プラグイン、水ストレスの色
- 湿度の時系列: 色付きのしきい値を含む 24 時間グラフ (赤 < 15%、黄色 < 25%)
- バッテリーゲージ: メンテナンス計画のためのすべてのセンサーのバッテリー状態
- ヒートマップ pH: キックの決定のためのピッチごとの pH ヒート マップ
- 水やり予報: 次の 48 時間に ML モデルが出力されます
# grafana_alert_rules.yaml
# Regole di alerting Grafana per agricoltura di precisione
apiVersion: 1
groups:
- name: farm-critical-alerts
interval: 1m
rules:
# Alert stress idrico critico
- uid: drought-critical
title: "Stress Idrico Critico"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15.0]
type: lt
query: { params: ["A"] }
noDataState: Alerting
execErrState: Alerting
for: 10m
annotations:
summary: "Sensore {{ $labels.sensor_id }} in stress idrico"
description: "Umidita {{ $values.A }}% sotto soglia critica 15%"
labels:
severity: critical
team: agronomist
# Alert batteria scarica
- uid: battery-low
title: "Batteria Sensore in Esaurimento"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15]
type: lt
query: { params: ["A"] }
for: 5m
annotations:
summary: "Batteria bassa su {{ $labels.sensor_id }}"
labels:
severity: warning
team: maintenance
アンチパターン: ユニバーサル固定しきい値
初期の実装でよくある間違いは、すべての環境に同じ湿度しきい値を使用することです。 作物やあらゆる種類の土壌。 20% という臨界湿度しきい値は土壌にとって適切です トマトの多い粘土質の土壌には適していますが、ブドウの木が生えている砂質の土壌やトマトの場合は完全に間違っています。 園芸保育園。閾値はセンサー、培養、フェーズごとに設定可能である必要がある 季節学的。動的しきい値構成システムを実装します。ハードコード化されていません。 アラートコード。
導入、セキュリティ、スケーラビリティ
ローカル開発用の Docker Compose
クラウドに依存せずにスタック全体をローカルで開発およびテストするには、
docker-compose.yml これには、MQTT、InfluxDB、Grafana ブローカーが含まれます。
# docker-compose.yml
# Stack IoT agricolo completo per sviluppo locale
version: "3.9"
services:
# Broker MQTT - Eclipse Mosquitto
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT plain (solo sviluppo)
- "8883:8883" # MQTT TLS
- "9001:9001" # WebSocket
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./certs:/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
restart: unless-stopped
# Time-series database
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: changeme123!
DOCKER_INFLUXDB_INIT_ORG: azienda-agricola
DOCKER_INFLUXDB_INIT_BUCKET: farm-sensors
DOCKER_INFLUXDB_INIT_RETENTION: 30d
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-token
volumes:
- influxdb_data:/var/lib/influxdb2
restart: unless-stopped
# Dashboard e alerting
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
GF_INSTALL_PLUGINS: grafana-worldmap-panel,grafana-clock-panel
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
# Pipeline consumer (il nostro codice Python)
pipeline-consumer:
build:
context: .
dockerfile: Dockerfile.consumer
environment:
MQTT_HOST: mosquitto
MQTT_PORT: "1883"
INFLUX_URL: http://influxdb:8086
INFLUX_TOKEN: my-super-secret-token
INFLUX_ORG: azienda-agricola
INFLUX_BUCKET: farm-sensors
depends_on:
- mosquitto
- influxdb
restart: unless-stopped
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
grafana_data:
農業 IoT のセキュリティのベスト プラクティス
農業 IoT システムのセキュリティは過小評価されることがよくあります。センサーが侵害されると変化する可能性があります 異常な灌漑、廃棄物、または作物への損傷を引き起こします。最低限のもの:
農業IoTセキュリティチェックリスト
- 相互 TLS (mTLS): 各デバイスには一意のクライアント証明書があります。ブローカー ユーザー名/パスワードだけでなく、証明書によってデバイスを認証します。
-
トピック ACL: 各センサーは独自のトピックでのみ公開できます
(
farm/campo-nord/sensor-001/#)、他のセンサーからトピックを読み取らないでください。 - ファームウェアの署名: OTA (無線) アップデートには署名が必要です 暗号化して悪意のあるファームウェアを防止します。
- 資格情報のローテーション: 1 年間の有効期限を持つトークンと証明書、ローテーション ACME または AWS IoT 証明書ローテーション経由で自動。
- ネットワークのセグメンテーション: ネットワークから隔離された専用 VLAN 内の IoT デバイス 企業。ブローカーへのアクセスはゲートウェイからのみであり、インターネットから直接アクセスすることはありません。
- レート制限: ブローカーは、1 秒あたりのメッセージ数を制限します。 偶発的な浸水や攻撃を防ぐための装置。
イタリアの精密農業: 2025 年の機会とインセンティブ
イタリアはヨーロッパのアグリテックのパノラマの中で独特の立場にあり、食品部門を持っています。 世界最高級(DOP、IGP、DOC)、細分化された生産体制(中堅企業) EU 平均の 33 ヘクタールと比較して 11 ヘクタール) ですが、非常に強力な農業専門知識を備えており、世界で最初のです。 欧州の農業付加価値は2024年に424億ユーロ(前年比9%増)。
95%を占める農業中小企業にもIoTやAI技術を導入することが課題 生産的な生地の。 2025 年に利用可能な資金調達ツールは次のとおりです。
イタリアにおけるアグリテックとIoTの奨励金(2025年)
| 測定 | 金額・特典 | 受益者 | 有効期限 |
|---|---|---|---|
| 移行計画 5.0 | デジタル + エネルギー投資に対する 35 ~ 45% の税額控除 | すべての事業(農業を含む) | 2025/12/31 |
| PNRR ミッション 2.3 - 機械化 | 4億ユーロの返済不要の拠出金 | 4.0 機械向け農業ビジネス | 完売 (新たな電話が予想されます) |
| 税額控除 4.0 (旧称インダストリー 4.0) | 技術資本財の20% | 農業ATECOを持つ企業 | 延長2025年 |
| INAIL ISI 農業通知 | 最大 65% 返済不要 | 農業事業者および農業協同組合 | 毎年恒例(春のオープン) |
| PSP (CAP戦略計画2023-2027) | 精密農業に対するプレミアム付きエコ制度対策 | UAA を登録している企業 | 年次 (CAP アプリケーション) |
| PNRR アグリボルタコール | 農業に統合された太陽光発電システムに15億ユーロ | 農業関連事業 | 2025~2026年 |
完全な IoT システム (センサー、ゲートウェイ、ソフトウェア、 接続性)、総コストは 50,000ユーロ、移行計画 5.0 までカバーできます 22,500ユーロ (コストの 45%) 税額控除として、 水と肥料の節約により、投資は 2 ~ 3 年で回収可能 そして人力。
ケーススタディ: Masseria Pugliese 500 ヘクタール - オリーブの木と小麦
プーリアの農業会社の実際のケース (集約され匿名化されたデータ) 120 個の土壌水分センサー、8 つの気象観測所、および 自動灌漑システム:
- 初期投資: 85,000 ユーロ (ハードウェア + ソフトウェア + 設置)
- PNRR と Transition 4.0 への貢献: 38,000ユーロ (44.7%)
- 純投資: 47,000ユーロ
- 1 年目の節水: -42% (灌漑料金で年間 -18,000 ユーロ)
- 肥料の節約: -22% (-8,500 ユーロ/年)
- 穀物収量の増加: +11% (+15,000 ユーロ/年売上高)
- 回収期間: 2.3年
- 5 年間の ROI: 342%
避けるべきベストプラクティスとアンチパターン
統合されたベストプラクティス
-
バージョン管理を伴う進化図: 常にフィールドを含める
schema_versionMQTT ペイロード内。新しいフィールドを追加するときは、バージョンを増やします。消費者が管理する 重大な変更を加えずに異なるバージョンを作成できます。 - サーバーではなくデバイスからのタイムスタンプ: タイムスタンプは同じである必要があります センサーがブローカーに到着したときではなく、読み取り時のセンサーです。ネットワーク遅延 それは変数であり、時系列を改ざんする可能性があります。
- デフォルトでは QoS 1、重要なコマンドのみ QoS 2: QoS 2 4倍 ネットワークトラフィック。作動コマンド (バルブの開放、ポンプの始動) にのみ使用してください。 重複が認められない場合。
- 必須のローカルバッファ: 各ゲートウェイにはローカル バッファが必要です (SQLite、NDJSON ファイル) を使用して、ネットワークの切断中にデータの収集を継続します。 農業では、つながりは信頼できません。
- 定期的なセンサーの校正: 土壌水分センサーの由来 時間の経過とともに (通常は年間 2 ~ 5%)半年ごとの校正をスケジュールして実装する 隣接するセンサーを比較することによる自動ドリフト検出。
- InfluxDB 保持ポリシー: 自動ダウンサンプリングの構成: 生データ 30 日間、時間ごとの集計、1 年間、毎日の集計は無制限。保存 長期的な分析に必要な粒度を失うことなくストレージを利用できます。
避けるべき重大なアンチパターン
-
MQTT フラット トピック (階層なし): のようなトピック
sensor_001_moistureの代わりにfarm/campo-nord/sensor-001/soil/moisture使用できなくする 集約されたサブスクリプションと数百のセンサーに拡張するためのワイルドカード。 - イベント駆動型ではなくポーリング: N ごとに MQTT ブローカーにクエリを実行しない 新しいデータを「要求」するまでの秒数。 MQTT およびプッシュベース: パブリッシャーは、必要なときに送信します。 データを受信すると、加入者はすぐに受信します。ポーリングはプロトコルの利点をすべて無効にします。
- スキーマ検証なし: ブローカーからの JSON を何もせずに受け入れます。 スキーマの検証。数値や文字列の代わりに null、文字列を送信する欠陥のあるセンサー 範囲外の値 (温度 -999) は、検証なしでデータ レイク全体を汚染します。
- ブロンズからゴールドへの直接書き込み: シルバーレイヤーをスキップして書き込みます 生データを直接集計に取り込みます。ロジックのバグを発見したとき 変換する場合、生データがまだ変換されていないため、もう一度再処理する必要があります。 保存されています。
- 認証なしのパブリック ブローカー: パブリック MQTT ブローカーを使用する (test.mosquitto.org) または TLS/認証のないプライベート ブローカー。農業データは、 会社の機密資産。競合他社があなたのトピックを購読して読むことができます 御社の生産状況をリアルタイムに把握します。
- ブローカー上の単一障害点: 単一の MQTT ブローカー クラスタリングまたはフェイルオーバー。 EMQX と HiveMQ はネイティブ クラスタリングをサポートします。蚊が必要とするもの HA 用の外部ソリューション。ブローカーがダウンすると、すべてのデータ収集がブロックされます。
拡張性: 10 ~ 10,000 個のセンサー
説明されているアーキテクチャは、変更を加えることなく最大数千のセンサーまで直線的に拡張できます。 建築的な。 MQTT ブローカーのハードウェア構成の一般的な数値は次のとおりです。
構成用の MQTT ブローカー機能
| 構成 | ブローカ | 最大センサー数 | メッセージ/秒 | 月額費用 |
|---|---|---|---|---|
| 開発/テスト | Raspberry Pi 4 のモスキート | 100 | 500 | 0 (ハードウェア所有) |
| 中小企業 (50 ~ 500 ヘクタール) | VPS 4 vCPU/8GB 上の EMQX | 1,000 | 5,000 | 40~80ユーロ |
| 大企業 (500 ヘクタール以上) | EMQX クラスター 3 ノード | 10,000 | 50,000 | 200~400ユーロ |
| 協同組合/地区 | HiveMQ エンタープライズ / AWS IoT コア | 100,000+ | 無制限 | 使用ごとに支払い |
AWS IoT Core と Azure IoT Hub は、国内規模または複数の企業規模向けのクラウドネイティブな選択肢です。 スケーラビリティを自動的に管理し、99.99% の SLA を提供し、ネイティブに統合します。 それぞれのエコシステム (AWS の AWS Timestream、Lambda、S3、Azure Data Explorer、Stream) と連携 分析、Azure のデータ レイク)。通常、コストは 100 万メッセージあたり 1 ~ 5 ドルです。 最初のテストには十分な無料枠が付いています。
結論と次のステップ
私たちは、現場のセンサーから精密農業のための IoT パイプライン全体を構築しました。 適切な QoS、Pydantic 検証を備えた MQTT 経由で構造化データ レイクに送信 時系列用の InfluxDB ストレージとデータ レイク用の Medallion アーキテクチャ。 Python コード 示されているものは実稼働準備ができており、実際のシステムで見られる基本的なパターンをカバーしています。
持っていくべき重要なポイント:
- MQTT は農業 IoT に最適なプロトコルです。軽量、非同期で、不安定な接続をサポートします。
- オープンフィールドには LoRaWAN、温室には WiFi/RS-485: 単一のユニバーサル無線プロトコルはありません
- スキーマ検証 (Pydantic) はオプションではありません: センサーは思ったより頻繁に不正なデータを送信します
- メダリオン アーキテクチャ (ブロンズ/シルバー/ゴールド) は、再処理可能性と進歩的なデータ品質を保証します。
- InfluxDB + Grafana は、ライセンス費用なしでリアルタイム監視を行うための最小限のスタックです
- PNRR と Transition 5.0 のインセンティブは、イタリアの農業ビジネスへの投資の最大 45% をカバーします。
FoodTech シリーズの次回の記事では、テンプレートを適用する方法について説明します。 機械学習のエッジ ゲートウェイ上で作物を直接監視するため、 遅延を数分からミリ秒に短縮し、重要なイベントに対するリアルタイムの対応を可能にします。 初期段階での突然の霜や真菌の攻撃など。
リソースと洞察
- 公式の paho-mqtt ドキュメント: eclipse.dev/パホ
- EMQX ブローカー (オープンソース、クラスター対応): emqx.io
- InfluxDB 2.x ドキュメント: docs.influxdata.com
- 移行計画 5.0 MIMIT: mimit.gov.it
- ISTAT 精密農業 2024: isstat.it
- Databricks メダリオン アーキテクチャ: databricks.com
他のシリーズの関連記事
- MLOps: 収量予測 ML モデルを本番環境にデプロイする方法
- データ&AIビジネス - 製造業におけるAI: Kafka と OPC-UA を使用して業界に適用される IoT パターン
- AIエンジニアリング: 農学者向けの仮想アシスタントのための農学文書に関する RAG







