Vertical Farming Automation: Robotic Control via API
A decommissioned industrial warehouse in Cavenago di Brianza, just outside Milan. Inside, 9,000 square meters of vertical shelving where lettuce, basil and arugula grow under blue-red LED lights without ever seeing the sun. Temperature held at a constant 21 degrees Celsius, humidity at 70%, CO2 at 1,200 ppm. Every plant has a personalized light recipe, every growing row communicates with a central system via API, every robot knows exactly where to go in real time. This is Planet Farms, the Italian vertical farm that in 2025 became one of the global benchmarks for automated indoor agriculture.
Vertical farming is no longer a laboratory experiment. In 2025, the global market is worth $9.62 billion with projections reaching $39.2 billion by 2033 (CAGR 19.3%). Next-generation vertical farms use 95% less water than traditional agriculture, produce year-round regardless of climate, eliminate pesticides, and achieve yields per square meter up to 100 times higher than open-field farming. But these results are only possible with industrial-grade software and robotics infrastructure.
In this article we build the complete software architecture for a vertical farm: from environmental sensors to a Python PID controller, from REST API design with FastAPI to ROS2 integration for robots, from digital twin to reinforcement learning-based optimization pipelines. Working code, production-tested architectures, and real numbers from the Italian and global ecosystem.
What You Will Learn in This Article
- Complete software architecture for a vertical farm: sensors, controllers, SCADA, cloud, AI
- LED spectrum management: PAR, DLI, light recipes for lettuce, basil and strawberries
- Python PID controller implementation for temperature, CO2, humidity and irrigation
- REST API design with FastAPI for managing crop recipes and actuator control
- ROS2 integration for automated seeding, transplanting and harvesting robots
- Farm digital twin: plant growth simulation and layout optimization
- Reinforcement learning for light recipe optimization
- IoT infrastructure: Modbus RTU, MQTT, OPC-UA, industrial gateways
- Economic analysis: CAPEX, OPEX, break-even and comparison with outdoor farming
- Case studies: Planet Farms and Agricola Moderna, Italy's leading vertical farms
FoodTech Series - All Articles
| # | Article | Level | Status |
|---|---|---|---|
| 1 | IoT Pipeline for Precision Agriculture with Python and MQTT | Advanced | Published |
| 2 | Edge ML for Crop Monitoring: Computer Vision in the Field | Advanced | Published |
| 3 | Satellite APIs and Vegetation Indices: NDVI with Python and Sentinel-2 | Intermediate | Published |
| 4 | Blockchain Traceability in Food: from Field to Supermarket | Intermediate | Published |
| 5 | Computer Vision for Quality Control in the Food Industry | Advanced | Published |
| 6 | FSMA and Digital Compliance: Automating Regulatory Processes | Intermediate | Published |
| 7 | Vertical Farming Automation: Robotic Control via API (you are here) | Advanced | Current |
| 8 | Demand Forecasting for Food Retail with Prophet and LightGBM | Intermediate | Coming soon |
| 9 | Farm Intelligence Dashboard: Real-Time Analytics with Grafana | Intermediate | Coming soon |
| 10 | Food Supply Chain Optimization: ML for Waste Reduction | Intermediate | Coming soon |
The Vertical Farming Market in 2025: Growth and Technology Drivers
Vertical farming went through a consolidation phase in 2023-2024, with several large North American players (AeroFarms, AppHarvest, Bowery Farming) filing for bankruptcy or ceasing operations under the weight of extremely high CAPEX and exploding energy costs. But the market did not stop: it restructured around more efficient models, with a Darwinian selection that rewarded those who built solid unit economics before scaling.
In 2025, the numbers tell a story of growing maturity: the global market is worth $9.62 billion and will grow to $39.2 billion by 2033. Another estimate (Maximize Market Research) places it at $8 billion in 2025 with projections of $39.7 billion by 2032 at a CAGR of 25.7%. The variance in estimates reflects the difficulty of classifying exactly what constitutes "vertical farming" (only indoor stacked trays? also advanced greenhouses?), but the trend is unequivocal. The related agricultural robotics market is worth $10.23 billion in 2025, growing to $28.2 billion by 2030.
Vertical Farming vs Traditional Agriculture: Performance Comparison
| Parameter | Outdoor Farming | Greenhouse | Vertical Farm |
|---|---|---|---|
| Water consumption (relative) | 100% | 30-40% | 5-10% |
| Yield per m² (lettuce) | ~2 kg/m²/year | ~15 kg/m²/year | ~150-200 kg/m²/year |
| Production cycles/year | 1-3 | 4-8 | 12-18 |
| Pesticides | High need | Reduced | Zero |
| Climate dependency | Total | Partial | None |
| Land use | 1x | 1x | 0.01-0.05x |
| Energy required | Low | Medium | High (LED + HVAC) |
| Production cost (lettuce) | $0.5-1/kg | $1.5-3/kg | $4-8/kg |
Four drivers are behind growth in 2025. First, the cost of high-efficiency LEDs has dropped 70% over the past decade, making lighting CAPEX much more accessible. Second, AI-based control systems enable energy optimizations that were unthinkable just 3-4 years ago. Third, demand for local, fresh, pesticide-free produce is steadily growing, especially in urban areas. Fourth, robotic modules for seeding, transplanting and harvesting are now available at industrial prices and are no longer limited to laboratory settings.
Software Architecture for Vertical Farms: The Complete Stack
A modern vertical farm is essentially a cyber-physical system: every physical decision (turning on a LED, opening a valve, moving a robot) is the result of a software computation. The architecture must be real-time for control, reliable for crop safety, and scalable to manage hundreds of growing zones.
End-to-End Architectural Stack
+-----------------------------------------------------------------------+
| LAYER 1: FIELD DEVICES |
| [Temp/RH Sensors] [CO2 Sensor] [PAR Meter] [Nutrient EC/pH] |
| [Flow Sensor] [RGB-D Camera] [Weight Sensor] [RFID Tray] |
| | | | | |
| +------------------+-------------+--------------+ |
| Modbus RTU / RS-485 / I2C / SPI |
+-----------------------------------------------------------------------+
|
+-----------------------------------------------------------------------+
| LAYER 2: EDGE CONTROLLER |
| [Siemens S7-1500 PLC / Beckhoff CX / Raspberry] |
| - PID loops for temperature, CO2, humidity |
| - Light recipe scheduling (LED driver DMX/PWM) |
| - NFT/DWC irrigation cycle management |
| - Local offline-tolerant buffer |
| - OPC-UA Server / MQTT Publisher |
+-----------------------------------------------------------------------+
|
+-----------------------------------------------------------------------+
| LAYER 3: SCADA / MES |
| [Ignition SCADA / custom Python SCADA] |
| - Multi-zone real-time supervision |
| - Time-series historization (InfluxDB/TimescaleDB) |
| - Alarms and notifications (temp, EC, pH out-of-range) |
| - Crop recipes and batch scheduling |
+-----------------------------------------------------------------------+
|
+-----------------------------------------------------------------------+
| LAYER 4: CLOUD PLATFORM |
| [FastAPI Backend] [Message Broker MQTT/Kafka] [PostgreSQL] |
| - REST API for ERP/WMS/retail integration |
| - Recipe, batch, inventory, order management |
| - OAuth2 authentication, RBAC, audit log |
+-----------------------------------------------------------------------+
|
+-----------------------------------------------------------------------+
| LAYER 5: AI / ANALYTICS |
| [ML Pipeline] [Digital Twin] [Computer Vision] [RL Optimizer] |
| - Light recipe optimization (RL) |
| - Yield prediction and time-to-harvest |
| - Anomaly detection (sensors + vision) |
| - Growth simulation (digital twin) |
+-----------------------------------------------------------------------+
The choice between an industrial PLC (Siemens S7, Beckhoff) and a single-board computer (Raspberry Pi 4, BeagleBone) depends on the required reliability level. For commercial farms with high-value crops, a PLC programmed to IEC 61131-3 with hardware redundancy is the right choice. For prototypes and experimental farms, a Python solution on embedded hardware is more flexible and faster to develop.
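The "local offline-tolerant buffer" listed in Layer 2 can be pictured as a bounded store-and-forward queue: readings accumulate while the uplink is down and are flushed in order once it returns. The sketch below is only an illustration of that idea, not the implementation used by any farm mentioned here; `OfflineBuffer`, `max_items` and the `send` callable are hypothetical names, and a production edge controller would likely persist the queue to disk as well.

```python
from collections import deque
from typing import Callable, Deque, Dict


class OfflineBuffer:
    """Bounded FIFO buffer: keeps the newest readings while the uplink is down."""

    def __init__(self, max_items: int = 1000) -> None:
        # deque(maxlen=...) silently drops the oldest item when full
        self._queue: Deque[Dict] = deque(maxlen=max_items)

    def push(self, reading: Dict) -> None:
        self._queue.append(reading)

    def flush(self, send: Callable[[Dict], bool]) -> int:
        """Send buffered readings oldest-first; stop at the first failure.

        `send` is a hypothetical publish function (for example an MQTT
        wrapper) that returns True when the broker accepted the message.
        """
        sent = 0
        while self._queue:
            if not send(self._queue[0]):
                break  # uplink still down: keep the reading for the next attempt
            self._queue.popleft()
            sent += 1
        return sent

    def __len__(self) -> int:
        return len(self._queue)
```

Peeking at the head of the queue before removing it means a failed publish never loses a reading; the trade-off of the bounded size is that the oldest data is discarded first during a long outage.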
Environmental Control Systems: LED, HVAC, CO2 and Irrigation
Environmental control is the operational heart of a vertical farm. Four parameters dominate: light spectrum and intensity, air temperature, CO2 concentration, and nutrient solution composition. Each requires a dedicated control loop.
LED Spectrum Management: PAR, DLI and Light Recipes
Plants do not use all visible light in the same way. The photosynthetically active range runs from 400 to 700 nm (PAR - Photosynthetically Active Radiation). Within this range, blue (400-500 nm) regulates leaf morphology and aromatic compound synthesis; red (600-700 nm) is the primary photosynthesis driver; far-red (700-800 nm) influences flowering and plant geometry through the phytochrome system.
DLI (Daily Light Integral) measures the total quantity of PAR photons a plant receives in 24 hours, expressed in mol/m²/day. It is the most important metric for sizing light recipes. Research published in Scientific Reports (Nature Portfolio) in 2025 shows that LEDs optimized for vertical farming produce up to 32% higher yields compared to standard spectra, with higher fresh weight in lettuce and basil.
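Because DLI is simply PPFD integrated over the photoperiod, the two quantities can be converted into each other when the light level is constant. The helper names below are illustrative, and the sketch assumes a constant PPFD for the whole photoperiod (no ramp-up or dimming).

```python
def dli_from_ppfd(ppfd_umol_m2_s: float, photoperiod_hours: float) -> float:
    """Daily Light Integral (mol/m²/day) delivered by a constant PPFD."""
    seconds_of_light = photoperiod_hours * 3600
    return ppfd_umol_m2_s * seconds_of_light / 1_000_000  # µmol -> mol


def ppfd_for_dli(dli_mol_m2_day: float, photoperiod_hours: float) -> float:
    """Constant PPFD (µmol/m²/s) needed to reach a DLI within the photoperiod."""
    return dli_mol_m2_day * 1_000_000 / (photoperiod_hours * 3600)


if __name__ == "__main__":
    # 200 µmol/m²/s for 16 h delivers 11.52 mol/m²/day
    print(f"{dli_from_ppfd(200, 16):.2f} mol/m²/day")
    # Reaching 17 mol/m²/day in 16 h needs about 295 µmol/m²/s
    print(f"{ppfd_for_dli(17, 16):.0f} µmol/m²/s")
```

A PPFD target and a DLI target agree only for one specific photoperiod, so it is worth running both through this conversion whenever a recipe specifies all three values.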
Light Parameters for Main Crops
| Crop | PPFD (µmol/m²/s) | DLI Target (mol/m²/day) | Optimal Spectrum | Photoperiod |
|---|---|---|---|---|
| Lettuce (leaf) | 150-250 | 15-18 | R:B = 4:1, +far-red 5% | 16-18h light |
| Lettuce (head) | 200-300 | 17-22 | R:B = 3:1, UV 380nm 2% | 16h light |
| Basil | 200-300 | 14-17 | R:B = 3:1, blue 20-25% | 16h light |
| Spinach | 150-200 | 12-17 | R:B = 4:1 | 14-16h light |
| Strawberry (vegetative) | 200-300 | 15-20 | R:B = 3:1 | 16h light |
| Strawberry (flowering) | 300-400 | 20-25 | R:B:FR = 3:1:0.5 | 12h light |
| Microgreens | 100-200 | 8-12 | White full-spectrum | 16h light |
| Herbs | 200-250 | 14-16 | Blue 15%, R dominant | 16h light |
The LED management controller must translate these recipes into PWM signals toward LED drivers. Each growing zone can have a different recipe, and the recipe can change during the crop cycle (e.g., increased blue in the last 48 hours to intensify basil aromas). Here is a complete Python PID controller implementation:
"""
Vertical Farm Environmental Controller
PID controller for LED management, CO2, temperature and irrigation
"""
import time
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

logger = logging.getLogger(__name__)


class CropStage(Enum):
    GERMINATION = "germination"
    SEEDLING = "seedling"
    VEGETATIVE = "vegetative"
    MATURATION = "maturation"
    HARVEST_READY = "harvest_ready"


@dataclass
class LightRecipe:
    """Light recipe for a crop at a given growth stage"""
    crop_name: str
    stage: CropStage
    ppfd_target: float          # µmol/m²/s
    dli_target: float           # mol/m²/day
    photoperiod_hours: float    # hours of light per day
    spectrum_red_pct: float     # % red channel (620-700nm)
    spectrum_blue_pct: float    # % blue channel (400-500nm)
    spectrum_white_pct: float   # % full-spectrum white LED
    spectrum_farred_pct: float  # % far-red (700-800nm)
    spectrum_uv_pct: float = 0.0

    def validate(self) -> bool:
        total = (self.spectrum_red_pct + self.spectrum_blue_pct +
                 self.spectrum_white_pct + self.spectrum_farred_pct +
                 self.spectrum_uv_pct)
        return abs(total - 100.0) < 0.1

    def ppfd_from_dli(self) -> float:
        """Calculate target PPFD from photoperiod hours and DLI"""
        photoperiod_seconds = self.photoperiod_hours * 3600
        return (self.dli_target * 1_000_000) / photoperiod_seconds


@dataclass
class PIDController:
    """
    Generic PID controller for environmental parameters.
    Uses anti-windup to prevent integrator saturation.
    """
    kp: float
    ki: float
    kd: float
    setpoint: float
    output_min: float = 0.0
    output_max: float = 100.0
    _integral: float = field(default=0.0, init=False)
    _last_error: float = field(default=0.0, init=False)
    _last_time: float = field(default_factory=time.time, init=False)
    _last_output: float = field(default=0.0, init=False)

    def compute(self, measured_value: float) -> float:
        current_time = time.time()
        dt = current_time - self._last_time
        if dt <= 0:
            # No time has elapsed since the last call: reuse the previous output
            return self._last_output

        error = self.setpoint - measured_value

        # Proportional
        p_term = self.kp * error

        # Integral with anti-windup: clamp the accumulated integral so the
        # integral term alone can never exceed the output span
        self._integral += error * dt
        i_max = (self.output_max - self.output_min) / abs(self.ki) if self.ki != 0 else 1000
        self._integral = max(-i_max, min(i_max, self._integral))
        i_term = self.ki * self._integral

        # Derivative (dt > 0 is guaranteed by the early return above)
        d_term = self.kd * (error - self._last_error) / dt

        output = p_term + i_term + d_term
        output = max(self.output_min, min(self.output_max, output))

        self._last_error = error
        self._last_time = current_time
        self._last_output = output
        return output


class EnvironmentalController:
    """
    Main controller for a growing zone.
    Manages temperature, CO2, humidity and irrigation via PID.
    """

    def __init__(self, zone_id: str, recipe: LightRecipe):
        self.zone_id = zone_id
        self.recipe = recipe

        # Temperature PID: setpoint 21 C, band +/- 1 C
        self.temp_pid = PIDController(
            kp=2.0, ki=0.5, kd=0.1,
            setpoint=21.0,
            output_min=-100.0,  # cooling
            output_max=100.0    # heating
        )

        # CO2 PID: setpoint 1200 ppm for accelerated growth
        self.co2_pid = PIDController(
            kp=0.5, ki=0.1, kd=0.05,
            setpoint=1200.0,
            output_min=0.0,
            output_max=100.0    # % CO2 valve opening
        )

        # Relative humidity PID: setpoint 70%
        self.humidity_pid = PIDController(
            kp=1.5, ki=0.3, kd=0.05,
            setpoint=70.0,
            output_min=0.0,
            output_max=100.0
        )

    def compute_led_pwm(self, current_hour: float) -> dict:
        """
        Calculate PWM duty cycle for each LED channel
        based on time of day and recipe.
        """
        # Determine if we are in the active photoperiod
        # Photoperiod: 06:00 - (6 + photoperiod_hours)
        start_hour = 6.0
        end_hour = start_hour + self.recipe.photoperiod_hours
        if not (start_hour <= current_hour < end_hour):
            return {
                'red': 0.0, 'blue': 0.0,
                'white': 0.0, 'farred': 0.0, 'uv': 0.0
            }

        # Calculate normalized intensity (0-1) from target PPFD
        # Assuming 100% PWM = 600 µmol/m2/s
        max_ppfd = 600.0
        intensity = min(self.recipe.ppfd_target / max_ppfd, 1.0)
        return {
            'red': round(intensity * self.recipe.spectrum_red_pct / 100, 4),
            'blue': round(intensity * self.recipe.spectrum_blue_pct / 100, 4),
            'white': round(intensity * self.recipe.spectrum_white_pct / 100, 4),
            'farred': round(intensity * self.recipe.spectrum_farred_pct / 100, 4),
            'uv': round(intensity * self.recipe.spectrum_uv_pct / 100, 4),
        }

    async def control_loop(self, sensor_reader, actuator_writer, interval: float = 30.0):
        """
        Async control loop: reads sensors, computes PID, writes actuators.
        Default frequency: every 30 seconds.
        """
        # Imported once, outside the loop; a module-level import works equally well
        from datetime import datetime

        logger.info(f"Starting control loop for zone {self.zone_id}")
        while True:
            try:
                # Read sensors
                sensors = await sensor_reader.read_zone(self.zone_id)

                # Compute PID outputs
                temp_output = self.temp_pid.compute(sensors['temperature'])
                co2_output = self.co2_pid.compute(sensors['co2_ppm'])
                humidity_output = self.humidity_pid.compute(sensors['humidity_rh'])

                # Compute LED PWM from a single clock reading
                now = datetime.now()
                current_hour = now.hour + now.minute / 60.0
                led_pwm = self.compute_led_pwm(current_hour)

                # Write actuators
                await actuator_writer.set_hvac(self.zone_id, temp_output, humidity_output)
                await actuator_writer.set_co2_valve(self.zone_id, co2_output)
                await actuator_writer.set_led_channels(self.zone_id, led_pwm)

                logger.debug(
                    f"Zone {self.zone_id} | "
                    f"T={sensors['temperature']:.1f}C (PID:{temp_output:.1f}%) | "
                    f"CO2={sensors['co2_ppm']:.0f}ppm (valve:{co2_output:.1f}%) | "
                    f"RH={sensors['humidity_rh']:.1f}% | "
                    f"LED R:{led_pwm['red']:.2f} B:{led_pwm['blue']:.2f}"
                )
            except Exception as e:
                # Log and keep looping: one failed cycle must not stop zone control
                logger.error(f"Control loop error zone {self.zone_id}: {e}")
            await asyncio.sleep(interval)


# --- Standard recipes for common crops ---
LETTUCE_VEGETATIVE = LightRecipe(
    crop_name="Lollo Lettuce",
    stage=CropStage.VEGETATIVE,
    ppfd_target=200.0,
    dli_target=17.0,
    photoperiod_hours=16.0,
    spectrum_red_pct=65.0,
    spectrum_blue_pct=20.0,
    spectrum_white_pct=10.0,
    spectrum_farred_pct=5.0,
    spectrum_uv_pct=0.0,
)

BASIL_VEGETATIVE = LightRecipe(
    crop_name="Genovese Basil",
    stage=CropStage.VEGETATIVE,
    ppfd_target=250.0,
    dli_target=15.0,
    photoperiod_hours=16.0,
    spectrum_red_pct=60.0,
    spectrum_blue_pct=25.0,  # elevated blue for aroma compounds
    spectrum_white_pct=10.0,
    spectrum_farred_pct=5.0,
    spectrum_uv_pct=0.0,
)

STRAWBERRY_FLOWERING = LightRecipe(
    crop_name="Elsanta Strawberry",
    stage=CropStage.MATURATION,
    ppfd_target=350.0,
    dli_target=22.0,
    photoperiod_hours=12.0,  # short photoperiod triggers flowering
    spectrum_red_pct=55.0,
    spectrum_blue_pct=20.0,
    spectrum_white_pct=20.0,
    spectrum_farred_pct=5.0,
    spectrum_uv_pct=0.0,
)
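The PID controller in the listing reads the wall clock, which makes it awkward to exercise without real hardware. One way to sanity-check gains offline is to step a controller with an explicit `dt` against a toy plant model. The sketch below is an assumption-laden illustration: `SimplePID`, `simulate_zone`, the thermal coefficients and the gains are all hypothetical, and the anti-windup shown is conditional integration (the integral is frozen while the output saturates), a different variant from the integral clamping in the listing.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SimplePID:
    """PID with an explicit dt, so it can be stepped deterministically in tests."""
    kp: float
    ki: float
    kd: float
    setpoint: float
    output_min: float = -100.0
    output_max: float = 100.0
    integral: float = 0.0
    last_error: float = 0.0

    def step(self, measured: float, dt: float) -> float:
        error = self.setpoint - measured
        derivative = (error - self.last_error) / dt
        candidate = self.integral + error * dt
        raw = self.kp * error + self.ki * candidate + self.kd * derivative
        output = max(self.output_min, min(self.output_max, raw))
        # Conditional integration: freeze the integral while the output saturates
        if raw == output:
            self.integral = candidate
        self.last_error = error
        return output


def simulate_zone(pid: SimplePID, start_temp: float, ambient: float,
                  steps: int, dt: float = 30.0) -> List[float]:
    """Toy first-order thermal model with made-up coefficients."""
    temp = start_temp
    history = []
    for _ in range(steps):
        effort = pid.step(temp, dt)  # -100..100 % HVAC effort
        # HVAC effort heats or cools; the zone also leaks toward ambient
        temp += dt * (0.0005 * effort + (ambient - temp) / 600.0)
        history.append(temp)
    return history


if __name__ == "__main__":
    pid = SimplePID(kp=10.0, ki=0.02, kd=0.0, setpoint=21.0)
    temps = simulate_zone(pid, start_temp=35.0, ambient=28.0, steps=300)
    print(f"final temperature: {temps[-1]:.2f} C")
```

Gains that behave well in such a model are only a starting point; the real zone has sensor lag, actuator dead time and coupling between temperature and humidity that a first-order model ignores.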
Irrigation Systems: NFT, DWC and Aeroponics
The three dominant hydroponic technologies in vertical farms have different control architectures, each with their own critical parameters to monitor and regulate.
Hydroponic Systems Comparison
| System | Ideal Crops | Critical Parameters | Control Complexity | Water Consumption |
|---|---|---|---|---|
| NFT (Nutrient Film Technique) | Lettuce, herbs | Flow rate, channel slope, EC, pH | Medium | Minimal (recirculation) |
| DWC (Deep Water Culture) | Lettuce, spinach | Oxygenation (DO), EC, pH, temperature | Low | Low |
| Aeroponics | Strawberries, roots, herbs | Mist cycle, pressure, EC, pH | High | Minimal (about 90% less than soil) |
| Substrate (coconut/rockwool) | Tomatoes, peppers | Irrigation cycles, EC, pH, drainage | Medium | Moderate |
"""
Irrigation Controller for NFT system
Pump management, EC/pH monitoring, nutrient dosing
"""
import asyncio
from dataclasses import dataclass
from typing import Optional
import logging

logger = logging.getLogger(__name__)

NUTRIENT_TARGETS = {
    "lettuce": {"ec_ms_cm": 1.6, "ph": 6.0, "temp_c": 20.0},
    "basil": {"ec_ms_cm": 1.8, "ph": 6.0, "temp_c": 21.0},
    "spinach": {"ec_ms_cm": 2.0, "ph": 6.2, "temp_c": 20.0},
    "strawberry": {"ec_ms_cm": 2.2, "ph": 5.8, "temp_c": 18.0},
    "herbs": {"ec_ms_cm": 1.4, "ph": 6.0, "temp_c": 21.0},
}


@dataclass
class NFTController:
    zone_id: str
    crop_type: str
    flow_rate_lpm: float = 1.5      # liters/minute per channel
    channel_slope_pct: float = 2.0  # channel slope in %

    def get_targets(self) -> dict:
        # Unknown crops fall back to the lettuce targets
        return NUTRIENT_TARGETS.get(self.crop_type, NUTRIENT_TARGETS["lettuce"])

    async def check_and_adjust(self, ec_sensor: float, ph_sensor: float,
                               ec_doser, ph_doser) -> dict:
        targets = self.get_targets()
        actions = {}

        # EC control (deadband: +/- 0.2 mS/cm)
        ec_delta = targets["ec_ms_cm"] - ec_sensor
        if abs(ec_delta) > 0.2:
            if ec_delta > 0:
                # EC too low: add nutrient concentrate
                dose_ml = ec_delta * 50  # ml of A+B concentrate
                await ec_doser.dose(zone=self.zone_id, ml=dose_ml, solution="AB")
                actions["ec_dosing"] = f"+{dose_ml:.1f}ml AB"
            else:
                # EC too high: dilute with RO water
                await ec_doser.dose_water(zone=self.zone_id, ml=abs(ec_delta) * 100)
                actions["ec_dilution"] = f"+{abs(ec_delta)*100:.0f}ml H2O"

        # pH control (deadband: +/- 0.3)
        ph_delta = targets["ph"] - ph_sensor
        if abs(ph_delta) > 0.3:
            if ph_delta > 0:
                # pH too low: add pH-up (KOH)
                dose_ml = abs(ph_delta) * 10
                await ph_doser.dose(zone=self.zone_id, ml=dose_ml, solution="ph_up")
                actions["ph_adjust"] = f"pH-up +{dose_ml:.1f}ml"
            else:
                # pH too high: add pH-down (H3PO4)
                dose_ml = abs(ph_delta) * 10
                await ph_doser.dose(zone=self.zone_id, ml=dose_ml, solution="ph_down")
                actions["ph_adjust"] = f"pH-down +{dose_ml:.1f}ml"

        logger.info(
            f"NFT zone {self.zone_id} | "
            f"EC: {ec_sensor:.2f} (target {targets['ec_ms_cm']:.2f}) | "
            f"pH: {ph_sensor:.2f} (target {targets['ph']:.2f}) | "
            f"Actions: {actions}"
        )
        return actions
Robotics and Automation: ROS2 in a Vertical Farm
Robotics is the most transformative factor in vertical farming in 2025. The most labor-intensive manual operations are seeding (planting seeds in trays), transplanting (moving seedlings to final growing racks), harvesting, and packaging. A worker can transplant about 500-700 plants per hour; a transplanting robot operates at 2,000-3,000 plants/hour with an error rate below 1%. With 30,000 lettuce trays per day (as in the case of Agricola Moderna in Agnadello), robotics is not a choice, it is a necessity.
ROS2 (Robot Operating System 2) has become the de facto standard for robotic programming in indoor environments. Compared to ROS1, it offers real-time-capable communication over DDS middleware with configurable quality of service, an improved security architecture, native multi-robot support and managed lifecycle nodes. The node and topic structure makes it possible to cleanly separate motion planning logic, motor control, computer vision, and the interface with the farm management system.
"""
ROS2 Node for Harvesting Robot in Vertical Farm
Manages path planning, tray pick-up and delivery
"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from geometry_msgs.msg import Pose, PoseStamped
from std_msgs.msg import String, Bool
from sensor_msgs.msg import Image
import json
import asyncio

# Custom messages for the farm (defined in farm_interfaces package)
# from farm_interfaces.msg import TrayInfo, HarvestStatus
# from farm_interfaces.action import HarvestTray
# from farm_interfaces.srv import GetZoneLayout


class HarvestingRobotNode(Node):
    """
    ROS2 node for harvesting robot in vertical farm.
    Interfaces with:
    - SCADA system to receive harvest jobs
    - Robotic arm controller (MoveIt2)
    - Conveyor system for tray delivery
    - Computer vision for maturity verification
    """

    def __init__(self):
        super().__init__('harvesting_robot_node')

        # Robot status publisher
        self.status_pub = self.create_publisher(
            String, '/farm/robot/harvest/status', 10
        )

        # Subscriber for harvest jobs from SCADA
        self.job_sub = self.create_subscription(
            String, '/farm/scada/harvest_jobs',
            self.on_harvest_job, 10
        )

        # Subscriber for end-effector camera image
        self.camera_sub = self.create_subscription(
            Image, '/robot/camera/raw',
            self.on_camera_frame, 10
        )

        # Client for zone layout service
        # self.layout_client = self.create_client(GetZoneLayout, '/farm/zone/layout')

        self.current_job: dict = {}
        self.is_busy = False
        self.get_logger().info('HarvestingRobotNode started')

    def on_harvest_job(self, msg: String):
        """Receives harvest job from SCADA"""
        if self.is_busy:
            self.get_logger().warn('Robot busy, job ignored')
            return
        try:
            job = json.loads(msg.data)
            self.get_logger().info(
                f"Job received: zone={job['zone_id']}, "
                f"tray={job['tray_id']}, "
                f"crop={job['crop_type']}"
            )
            self.current_job = job
            self.is_busy = True
            # Schedule the harvest coroutine as a task on the node's executor
            # (this is not a separate thread). Note: rclpy executors are not
            # asyncio event loops, so the asyncio.sleep() delays in the stubs
            # are placeholders to be replaced by ROS2 action/service calls.
            self.executor.create_task(self.execute_harvest(job))
        except (json.JSONDecodeError, KeyError) as e:
            self.get_logger().error(f"Malformed job: {e}")

    async def execute_harvest(self, job: dict) -> bool:
        """
        Complete harvest sequence:
        1. Navigate to target zone
        2. Verify maturity with computer vision
        3. Pick tray with robotic arm
        4. Transport to output conveyor
        5. Update SCADA
        """
        try:
            # Step 1: Navigation
            self.publish_status("NAVIGATING", job)
            success = await self.navigate_to_zone(job['zone_id'], job['shelf_row'])
            if not success:
                self.publish_status("NAV_FAILED", job)
                return False

            # Step 2: Maturity check (computer vision)
            maturity_score = await self.check_crop_maturity(job['tray_id'])
            if maturity_score < 0.85:
                self.get_logger().warn(
                    f"Tray {job['tray_id']}: maturity {maturity_score:.2f} "
                    f"below threshold 0.85, harvest postponed"
                )
                self.publish_status("MATURITY_INSUFFICIENT", job)
                return False

            # Step 3: Harvest
            self.publish_status("HARVESTING", job)
            await self.pick_tray(job['tray_id'], job['shelf_position'])

            # Step 4: Deliver to conveyor
            self.publish_status("DELIVERING", job)
            await self.deliver_to_conveyor(job['destination_line'])

            # Step 5: Completion
            self.publish_status("COMPLETED", job)
            return True
        except Exception as e:
            self.get_logger().error(f"Harvest job error {job.get('tray_id')}: {e}")
            self.publish_status("ERROR", job)
            return False
        finally:
            # Release the robot on every exit path (including NAV_FAILED)
            # so it can accept the next job
            self.is_busy = False

    def publish_status(self, status: str, job: dict):
        msg = String()
        msg.data = json.dumps({
            "robot_id": self.get_name(),
            "status": status,
            "tray_id": job.get("tray_id"),
            "zone_id": job.get("zone_id"),
            "timestamp": self.get_clock().now().to_msg().sec
        })
        self.status_pub.publish(msg)

    async def navigate_to_zone(self, zone_id: str, shelf_row: int) -> bool:
        """Navigate AGV to target zone (stub - uses Nav2 in production)"""
        self.get_logger().info(f"Navigating to zone {zone_id} row {shelf_row}")
        await asyncio.sleep(2.0)  # simulated movement
        return True

    async def check_crop_maturity(self, tray_id: str) -> float:
        """Computer vision analysis for maturity assessment (stub)"""
        # In production: YOLO/custom model inference on camera image
        await asyncio.sleep(0.5)
        return 0.92  # maturity score 0-1

    async def pick_tray(self, tray_id: str, position: dict) -> bool:
        """Robotic arm control for tray pick-up via MoveIt2 (stub)"""
        await asyncio.sleep(1.5)
        return True

    async def deliver_to_conveyor(self, destination_line: str) -> bool:
        """Deposit tray on output conveyor (stub)"""
        await asyncio.sleep(1.0)
        return True

    def on_camera_frame(self, msg: Image):
        """Camera frame callback (processed async on demand)"""
        pass


def main(args=None):
    rclpy.init(args=args)
    node = HarvestingRobotNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
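The node above checks only three keys when a job arrives, while the harvest sequence later reads `shelf_row`, `shelf_position` and `destination_line` as well, so a partial payload fails only once the robot is already moving. A possible guard is to validate the whole payload up front. The sketch below is a hypothetical helper that needs no ROS2 installation; the field types are assumptions inferred from the type hints of the stub methods, not a published message schema.

```python
import json
from typing import Any, Dict

# Assumed field types, inferred from the stub method signatures
REQUIRED_JOB_FIELDS = {
    "zone_id": str,
    "tray_id": str,
    "crop_type": str,
    "shelf_row": int,
    "shelf_position": dict,
    "destination_line": str,
}


def parse_harvest_job(raw: str) -> Dict[str, Any]:
    """Decode a harvest job and fail fast if a field is missing or mistyped."""
    try:
        job = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"job is not valid JSON: {exc}") from exc
    if not isinstance(job, dict):
        raise ValueError("job must be a JSON object")
    for name, expected_type in REQUIRED_JOB_FIELDS.items():
        if name not in job:
            raise ValueError(f"missing field: {name}")
        if not isinstance(job[name], expected_type):
            raise ValueError(f"field {name} must be {expected_type.__name__}")
    return job


if __name__ == "__main__":
    payload = json.dumps({
        "zone_id": "Z1", "tray_id": "T-42", "crop_type": "lettuce",
        "shelf_row": 3, "shelf_position": {"level": 2, "slot": 7},
        "destination_line": "pack-A",
    })
    print(parse_harvest_job(payload)["tray_id"])
```

In the long run, a typed ROS2 message or action definition (such as the commented-out `farm_interfaces` imports suggest) would make this check unnecessary, since the schema would be enforced by the middleware.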
API Design for Vertical Farms: FastAPI REST Backend
The API layer is the integration point between the farm's physical control system and the outside world: corporate ERP, customer portal, operators' mobile app, distribution warehouse WMS. A poorly designed API in this context leads to inconsistencies in crop recipes, scheduling errors, and potentially crop loss. A well-designed API, on the contrary, is the nervous system that coordinates all subsystems.
"""
FastAPI Backend for Vertical Farm Management
Endpoints: crop recipes, zones, production batches, actuators, sensors
"""
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime, date
from enum import Enum
import uuid

app = FastAPI(
    title="Vertical Farm Control API",
    description="API for vertical farm management: recipes, zones, robots, sensors",
    version="2.1.0"
)
security = HTTPBearer()

# ============================================================
# PYDANTIC MODELS
# ============================================================


class CropTypeEnum(str, Enum):
    LETTUCE = "lettuce"
    BASIL = "basil"
    SPINACH = "spinach"
    STRAWBERRY = "strawberry"
    MICROGREENS = "microgreens"
    HERBS = "herbs"


class GrowingSystemEnum(str, Enum):
    NFT = "nft"
    DWC = "dwc"
    AEROPONICS = "aeroponics"
    SUBSTRATE = "substrate"


class LightRecipeCreate(BaseModel):
    name: str = Field(..., min_length=3, max_length=100)
    crop_type: CropTypeEnum
    growth_stage: str
    ppfd_target: float = Field(..., ge=50, le=800)
    dli_target: float = Field(..., ge=5, le=40)
    photoperiod_hours: float = Field(..., ge=8, le=24)
    spectrum_red_pct: float = Field(..., ge=0, le=100)
    spectrum_blue_pct: float = Field(..., ge=0, le=100)
    spectrum_white_pct: float = Field(..., ge=0, le=100)
    spectrum_farred_pct: float = Field(default=0.0, ge=0, le=20)
    spectrum_uv_pct: float = Field(default=0.0, ge=0, le=10)
    notes: Optional[str] = None

    @validator('spectrum_blue_pct')
    def validate_spectrum_sum(cls, v, values):
        # Only red, blue and white are checked here: far-red and UV are
        # declared later, so they are not yet available in `values`
        total = (values.get('spectrum_red_pct', 0) + v +
                 values.get('spectrum_white_pct', 0))
        # +/- 5% tolerance for rounding
        if total > 105:
            raise ValueError(f"Spectrum channel sum {total}% exceeds 100%")
        return v


class BatchCreate(BaseModel):
    zone_id: str
    recipe_id: str
    crop_type: CropTypeEnum
    growing_system: GrowingSystemEnum
    seeding_date: date
    expected_harvest_date: date
    tray_count: int = Field(..., ge=1, le=10000)
    seeds_per_tray: int = Field(default=50, ge=1, le=500)
    client_order_id: Optional[str] = None

    @validator('expected_harvest_date')
    def harvest_after_seeding(cls, v, values):
        seeding = values.get('seeding_date')
        if seeding and v <= seeding:
            raise ValueError("Harvest date must be after seeding date")
        return v


class SensorReading(BaseModel):
    zone_id: str
    timestamp: datetime
    temperature_c: float
    humidity_rh: float
    co2_ppm: float
    ppfd_umol: Optional[float] = None
    ec_ms_cm: Optional[float] = None
    ph: Optional[float] = None
    water_temp_c: Optional[float] = None


class ActuatorCommand(BaseModel):
    zone_id: str
    command_type: str  # "led_update", "co2_valve", "pump_speed", "hvac"
    parameters: dict
    priority: int = Field(default=5, ge=1, le=10)  # 10 = emergency


# ============================================================
# ROUTES: CROP RECIPES
# ============================================================

@app.post("/api/v1/recipes", status_code=status.HTTP_201_CREATED)
async def create_recipe(
    recipe: LightRecipeCreate,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """
    Creates a new crop recipe in the system.
    Recipes define light, environmental and irrigation parameters.
    """
    recipe_id = str(uuid.uuid4())
    # In production: save to PostgreSQL
    return {
        "recipe_id": recipe_id,
        "name": recipe.name,
        "crop_type": recipe.crop_type,
        "ppfd_target": recipe.ppfd_target,
        "dli_target": recipe.dli_target,
        "created_at": datetime.utcnow().isoformat(),
        "status": "active"
    }


@app.get("/api/v1/recipes/{recipe_id}")
async def get_recipe(recipe_id: str):
    """Retrieve crop recipe by ID"""
    # Stub - in production: query PostgreSQL
    return {
        "recipe_id": recipe_id,
        "name": "Lollo Rossa Lettuce - Vegetative",
        "crop_type": "lettuce",
        "ppfd_target": 200.0,
        "dli_target": 17.0,
        "photoperiod_hours": 16.0,
        "spectrum": {"red": 65, "blue": 20, "white": 10, "farred": 5},
        "env_targets": {"temp_c": 21.0, "humidity_rh": 70.0, "co2_ppm": 1200},
        "nutrient_targets": {"ec_ms_cm": 1.6, "ph": 6.0}
    }


@app.get("/api/v1/recipes")
async def list_recipes(
    crop_type: Optional[CropTypeEnum] = None,
    active_only: bool = True,
    # Query (not pydantic Field) is the FastAPI way to validate query parameters
    limit: int = Query(default=50, le=200)
):
    """List recipes with filter by crop type"""
    return {
        "recipes": [],
        "total": 0,
        "filters": {"crop_type": crop_type, "active_only": active_only}
    }


# ============================================================
# ROUTES: PRODUCTION BATCHES
# ============================================================

@app.post("/api/v1/batches", status_code=status.HTTP_201_CREATED)
async def create_batch(
    batch: BatchCreate,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """
    Starts a new production batch.
    Associates zone, recipe, seeding data and harvest target.
    Background: schedules LED and irrigation on SCADA.
    """
    batch_id = str(uuid.uuid4())
    background_tasks.add_task(schedule_batch_on_scada, batch_id, batch)
    return {
        "batch_id": batch_id,
        "zone_id": batch.zone_id,
        "crop_type": batch.crop_type,
        "seeding_date": batch.seeding_date.isoformat(),
        "expected_harvest_date": batch.expected_harvest_date.isoformat(),
        "tray_count": batch.tray_count,
        "status": "scheduled"
    }


async def schedule_batch_on_scada(batch_id: str, batch: BatchCreate):
    """Background task: sends configuration to SCADA for scheduling"""
    # In production: API call to SCADA (Ignition, custom Python SCADA)
    pass


# ============================================================
# ROUTES: SENSORS AND TELEMETRY
# ============================================================

@app.post("/api/v1/telemetry")
async def ingest_sensor_data(reading: SensorReading):
    """
    Endpoint for sensor data ingestion from edge controller.
    Validation, alerts and historization on InfluxDB/TimescaleDB.
    """
    alerts = []

    # Temperature alerts
    if reading.temperature_c > 28.0:
        alerts.append({"type": "HIGH_TEMP", "value": reading.temperature_c, "threshold": 28.0})
    elif reading.temperature_c < 16.0:
        alerts.append({"type": "LOW_TEMP", "value": reading.temperature_c, "threshold": 16.0})

    # CO2 alerts
    if reading.co2_ppm > 2000:
        alerts.append({"type": "HIGH_CO2", "value": reading.co2_ppm, "threshold": 2000})

    # pH alerts
    if reading.ph is not None and (reading.ph < 5.0 or reading.ph > 7.5):
        alerts.append({"type": "PH_OUT_OF_RANGE", "value": reading.ph})

    # In production: batch write to InfluxDB and publish alert to Kafka/MQTT
    return {
        "status": "accepted",
        "zone_id": reading.zone_id,
        "timestamp": reading.timestamp.isoformat(),
        "alerts": alerts,
        "alert_count": len(alerts)
    }


@app.get("/api/v1/zones/{zone_id}/current")
async def get_zone_current_state(zone_id: str):
    """Current environmental state of a zone (last value from InfluxDB)"""
    # Stub
    return {
        "zone_id": zone_id,
        "timestamp": datetime.utcnow().isoformat(),
        "sensors": {
            "temperature_c": 21.3,
            "humidity_rh": 69.8,
            "co2_ppm": 1185,
            "ppfd_umol": 198.5,
            "ec_ms_cm": 1.62,
            "ph": 6.05
        },
        "actuators": {
            "led_pwm": {"red": 0.617, "blue": 0.192, "white": 0.098, "farred": 0.049},
            "co2_valve_pct": 12.5,
            "hvac_cooling_pct": 35.0,
            "pump_active": True
        },
        "active_batch_id": "b-2025-001-lollo",
        "days_since_seeding": 18
    }
# ============================================================
# ROUTES: ACTUATOR CONTROL
# ============================================================
@app.post("/api/v1/actuators/command")
async def send_actuator_command(
command: ActuatorCommand,
credentials: HTTPAuthorizationCredentials = Depends(security)
):
"""
Sends a manual command to a zone actuator.
Used for manual overrides, maintenance and testing.
Requires authentication and is logged for audit purposes.
"""
allowed_commands = {"led_update", "co2_valve", "pump_speed", "hvac", "emergency_stop"}
if command.command_type not in allowed_commands:
raise HTTPException(
status_code=400,
detail=f"Invalid command type: {command.command_type}"
)
command_id = str(uuid.uuid4())
# In production: publish to MQTT/OPC-UA toward edge controller
return {
"command_id": command_id,
"zone_id": command.zone_id,
"command_type": command.command_type,
"parameters": command.parameters,
"status": "sent",
"sent_at": datetime.utcnow().isoformat()
}
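The threshold checks in the telemetry endpoint can also be expressed as data rather than branches, which makes per-crop limits easier to maintain. The following is a minimal, framework-free sketch of that idea. `AlertRule`, `RULES` and `evaluate_alerts` are hypothetical names introduced here, and the limits simply mirror the values hard-coded in the endpoint.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AlertRule:
    """One threshold rule (illustrative structure, not part of the API above)."""
    field: str                    # key in the reading dict
    alert_type: str               # label emitted when the rule fires
    low: Optional[float] = None   # fire when value < low
    high: Optional[float] = None  # fire when value > high


# Limits mirror the values hard-coded in the telemetry endpoint
RULES = [
    AlertRule("temperature_c", "HIGH_TEMP", high=28.0),
    AlertRule("temperature_c", "LOW_TEMP", low=16.0),
    AlertRule("co2_ppm", "HIGH_CO2", high=2000.0),
    AlertRule("ph", "PH_OUT_OF_RANGE", low=5.0, high=7.5),
]


def evaluate_alerts(reading: dict, rules=RULES) -> list:
    """Return one alert dict per violated rule; missing or None values are skipped."""
    alerts = []
    for rule in rules:
        value = reading.get(rule.field)
        if value is None:
            continue
        too_low = rule.low is not None and value < rule.low
        too_high = rule.high is not None and value > rule.high
        if too_low or too_high:
            alerts.append({"type": rule.alert_type, "value": value})
    return alerts


print(evaluate_alerts({"temperature_c": 29.5, "co2_ppm": 2100, "ph": 6.1}))
```

With rules kept as data, a basil zone and a lettuce zone could load different rule lists from the recipe store without touching the endpoint code.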
Farm Digital Twin: Simulation and Optimization
A digital twin of a vertical farm is a computational model that replicates the physical behavior of the farm accurately enough to allow predictive simulations. This is not a visual 3D replica (that is a "visualization"), but a mathematical model that, given the current state of environmental parameters, predicts plant growth and time to harvest.
The most widely used plant growth models in controlled environments are based on the radiation use efficiency (RUE) approach: accumulated biomass is proportional to intercepted light (PAR) and conversion efficiency, which depends on temperature, CO2, water and nutritional availability. These models, originally developed for yield prediction systems in open fields (e.g., DSSAT, APSIM), have been adapted for indoor environments with experimentally calibrated parameters.
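In compact form, the RUE approach used by the code that follows can be written as below. The symbols are labels chosen here for illustration: $I_{\mathrm{PAR}}$ is the PAR intercepted by the plant per day (MJ), $f_T$ and $f_{\mathrm{CO_2}}$ are dimensionless temperature and CO2 modifiers, and $w$ is the water fraction of fresh weight.

```latex
\Delta W_{\mathrm{dry}} = \mathrm{RUE} \cdot I_{\mathrm{PAR}} \cdot f_T(T) \cdot f_{\mathrm{CO_2}}(C),
\qquad
\Delta W_{\mathrm{fresh}} = \frac{\Delta W_{\mathrm{dry}}}{1 - w}
```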
"""
Digital Twin - Plant Growth Model for Vertical Farm
Based on Radiation Use Efficiency (RUE) + temperature/CO2 effects
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime, timedelta

@dataclass
class PlantGrowthModel:
    """
    Simplified growth model for lettuce in a hydroponic system.
    Parameters calibrated on experimental data for Lactuca sativa.
    """
    # Crop biological parameters
    rue_base: float = 1.8           # g biomass / MJ intercepted PAR
    temp_base: float = 5.0          # base temperature (C) - no growth below
    temp_opt: float = 22.0          # optimal temperature
    temp_max: float = 32.0          # maximum survival temperature
    co2_base_ppm: float = 400.0     # ambient CO2 reference
    co2_enhancement: float = 0.002  # RUE increase per extra ppm CO2
    # Current plant state
    fresh_weight_g: float = 0.5     # initial seedling fresh weight (g)
    dry_weight_g: float = 0.05      # initial dry weight (g)
    leaf_area_cm2: float = 5.0      # initial leaf area
    days_since_seeding: int = 0
    # Harvest target
    target_fresh_weight_g: float = 150.0  # 150g lettuce
    water_content: float = 0.95           # water fraction of fresh weight (0-1)

    def temperature_factor(self, temp: float) -> float:
        """
        Temperature factor (0-1) using a piecewise-linear (triangular) response:
        rises from 0 at temp_base to 1.0 at temp_opt, then falls to 0 at temp_max.
        """
        if temp <= self.temp_base or temp >= self.temp_max:
            return 0.0
        if temp <= self.temp_opt:
            return (temp - self.temp_base) / (self.temp_opt - self.temp_base)
        else:
            return (self.temp_max - temp) / (self.temp_max - self.temp_opt)

    def co2_factor(self, co2_ppm: float) -> float:
        """CO2 enrichment factor (1.0 at ambient, >1 with enrichment)"""
        extra_co2 = max(0, co2_ppm - self.co2_base_ppm)
        return 1.0 + (self.co2_enhancement * extra_co2)

    def par_intercepted_mj(self, ppfd: float, leaf_area_cm2: float,
                           photoperiod_h: float) -> float:
        """
        Calculate PAR intercepted by the plant in MJ/day.
        ppfd: µmol/m2/s -> conversion to W/m2 (1 W/m2 approx 4.6 µmol/m2/s for LEDs)
        """
        ppfd_wm2 = ppfd / 4.6
        par_w = ppfd_wm2 * (leaf_area_cm2 / 10000)  # leaf area converted to m2
        par_mj_day = par_w * photoperiod_h * 3600 / 1e6
        return par_mj_day

    def simulate_day(self, temp: float, co2_ppm: float,
                     ppfd: float, photoperiod_h: float) -> dict:
        """
        Simulates one day of growth and updates plant state.
        Returns daily delta and updated state.
        """
        # Environmental factors
        tf = self.temperature_factor(temp)
        cf = self.co2_factor(co2_ppm)
        par_intercepted = self.par_intercepted_mj(ppfd, self.leaf_area_cm2, photoperiod_h)
        # Dry biomass growth (RUE model)
        delta_dw = self.rue_base * par_intercepted * tf * cf
        delta_fw = delta_dw / (1 - self.water_content)
        self.dry_weight_g += delta_dw
        self.fresh_weight_g += delta_fw
        # Leaf area update (SLA - specific leaf area)
        sla_cm2_per_g = 350  # cm2/g DW for lettuce
        self.leaf_area_cm2 = self.dry_weight_g * sla_cm2_per_g
        self.days_since_seeding += 1
        # Check harvest readiness
        harvest_ready = self.fresh_weight_g >= self.target_fresh_weight_g
        return {
            "day": self.days_since_seeding,
            "fresh_weight_g": round(self.fresh_weight_g, 2),
            "dry_weight_g": round(self.dry_weight_g, 3),
            "leaf_area_cm2": round(self.leaf_area_cm2, 1),
            "delta_fw_g": round(delta_fw, 3),
            "temp_factor": round(tf, 3),
            "co2_factor": round(cf, 3),
            "par_intercepted_mj": round(par_intercepted, 6),
            "harvest_ready": harvest_ready,
        }

    def simulate_full_cycle(self, daily_conditions: List[dict]) -> dict:
        """
        Simulates the full crop cycle with variable daily conditions.
        Returns complete projection and estimated harvest day
        (None if the target weight is never reached).
        """
        days_log = []
        harvest_day = None
        for day_idx, cond in enumerate(daily_conditions):
            day_state = self.simulate_day(
                temp=cond.get('temp', 21.0),
                co2_ppm=cond.get('co2_ppm', 1200),
                ppfd=cond.get('ppfd', 200),
                photoperiod_h=cond.get('photoperiod_h', 16)
            )
            days_log.append(day_state)
            if day_state['harvest_ready'] and harvest_day is None:
                harvest_day = day_idx + 1
        return {
            "days_simulated": len(days_log),
            "final_fresh_weight_g": self.fresh_weight_g,
            "estimated_harvest_day": harvest_day,
            "daily_log": days_log,
            "achieved_target": self.fresh_weight_g >= self.target_fresh_weight_g
        }

# Digital twin usage example
def predict_harvest_date(recipe: dict, seeding_date: datetime) -> datetime:
    """
    Uses the digital twin to predict harvest date
    given a constant environmental recipe.
    """
    model = PlantGrowthModel()
    # Daily conditions from recipe (constant for simplicity)
    daily_conditions = [{
        'temp': recipe.get('temp_c', 21.0),
        'co2_ppm': recipe.get('co2_ppm', 1200),
        'ppfd': recipe.get('ppfd_target', 200),
        'photoperiod_h': recipe.get('photoperiod_hours', 16)
    }] * 40  # maximum 40 days of simulation
    result = model.simulate_full_cycle(daily_conditions)
    # The key is always present but is None when the target weight is not
    # reached within the window, so dict.get() would never apply its default
    harvest_day = result['estimated_harvest_day']
    if harvest_day is None:
        harvest_day = 35  # fallback cycle length
    return seeding_date + timedelta(days=harvest_day)
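Before trusting harvest dates from a model like this, it helps to check what growth rate its parameters imply. Because leaf area is set to SLA times dry weight, the daily dry-weight gain is proportional to the current dry weight, so the model grows exponentially with no canopy-closure limit. The sketch below computes that daily relative growth rate on its own; `daily_relative_growth` is a hypothetical helper, and its defaults simply restate the parameter values used in the class above.

```python
def daily_relative_growth(ppfd: float, photoperiod_h: float, temp: float, co2_ppm: float,
                          rue: float = 1.8, sla_cm2_per_g: float = 350.0,
                          temp_base: float = 5.0, temp_opt: float = 22.0,
                          temp_max: float = 32.0, co2_base: float = 400.0,
                          co2_gain: float = 0.002) -> float:
    """Fraction by which dry weight grows per day under constant conditions."""
    # Triangular temperature response, same shape as temperature_factor()
    if temp <= temp_base or temp >= temp_max:
        tf = 0.0
    elif temp <= temp_opt:
        tf = (temp - temp_base) / (temp_opt - temp_base)
    else:
        tf = (temp_max - temp) / (temp_max - temp_opt)
    # Linear CO2 enrichment, same form as co2_factor()
    cf = 1.0 + co2_gain * max(0.0, co2_ppm - co2_base)
    # PAR intercepted per gram of dry weight (MJ/day): leaf area = SLA * dry weight
    par_mj_per_g = (ppfd / 4.6) * (sla_cm2_per_g / 10000) * photoperiod_h * 3600 / 1e6
    return rue * par_mj_per_g * tf * cf


k = daily_relative_growth(ppfd=200, photoperiod_h=16, temp=21.0, co2_ppm=1200)
print(f"dry weight grows by about {k:.0%} per day")
```

With the default recipe this comes out at roughly 0.39 per day. If measured growth on the farm is slower, `rue_base`, `co2_enhancement` and the missing light-interception limit are the first places to look when recalibrating.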
AI for Optimization: Reinforcement Learning for Light Recipes
The digital twin enables something more powerful than simple prediction: recipe optimization through reinforcement learning (RL). The RL agent interacts with the digital twin (not with the real farm), explores thousands of combinations of light parameters, and finds configurations that maximize yield while minimizing energy consumption. Once an optimal recipe is found in simulation, it is validated in a pilot zone of the real farm before full-scale deployment.
This approach, called sim-to-real transfer, is the frontier of research in vertical farming AI. The gap between simulation and reality (sim-to-real gap) requires continuous calibration of the growth model on real data collected from the farm.
"""
Reinforcement Learning for Light Recipe Optimization
Uses Gymnasium + custom environment based on PlantGrowthModel
"""
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from typing import Tuple, Optional

class VerticalFarmEnv(gym.Env):
    """
    Gymnasium environment for light recipe optimization.
    Observation space: current environmental state + plant state
    Action space: LED parameter adjustments (continuous)
    Reward: daily growth / energy consumption
    """
    metadata = {'render_modes': ['human']}

    def __init__(self, crop_type: str = "lettuce", episode_days: int = 30):
        super().__init__()
        self.crop_type = crop_type
        self.episode_days = episode_days
        self.current_day = 0
        # Action space: [delta_ppfd, delta_red_ratio, delta_blue_ratio, delta_photoperiod]
        # Normalized values in [-1, 1], scaled internally
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0, -1.0, -1.0], dtype=np.float32),
            high=np.array([1.0, 1.0, 1.0, 1.0], dtype=np.float32)
        )
        # Observation space: [ppfd, red_ratio, blue_ratio, photoperiod,
        #                     fresh_weight, leaf_area, days, temp, co2]
        self.observation_space = spaces.Box(
            low=np.array([50, 0, 0, 8, 0, 0, 0, 15, 400], dtype=np.float32),
            high=np.array([800, 1, 1, 24, 500, 5000, 45, 30, 2000], dtype=np.float32)
        )
        # Current state
        self.ppfd = 200.0
        self.red_ratio = 0.65
        self.blue_ratio = 0.20
        self.photoperiod = 16.0
        self.plant_model = None

    def reset(self, seed: Optional[int] = None, **kwargs) -> Tuple[np.ndarray, dict]:
        super().reset(seed=seed)
        self.current_day = 0
        self.ppfd = 200.0
        self.red_ratio = 0.65
        self.blue_ratio = 0.20
        self.photoperiod = 16.0
        from digital_twin import PlantGrowthModel  # local import
        self.plant_model = PlantGrowthModel()
        return self._get_obs(), {}

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, dict]:
        # Apply action with scaling
        self.ppfd = np.clip(self.ppfd + action[0] * 50, 50, 800)
        self.red_ratio = np.clip(self.red_ratio + action[1] * 0.1, 0.3, 0.8)
        self.blue_ratio = np.clip(self.blue_ratio + action[2] * 0.05, 0.1, 0.35)
        self.photoperiod = np.clip(self.photoperiod + action[3] * 1.0, 10, 22)
        # Simulate day with new conditions
        # Note: red/blue ratios are tracked in the observation, but the simplified
        # PlantGrowthModel does not use them, so they do not affect the reward yet
        day_result = self.plant_model.simulate_day(
            temp=21.0, co2_ppm=1200,
            ppfd=self.ppfd, photoperiod_h=self.photoperiod
        )
        # Calculate energy consumption (kWh/day per m2 growing area)
        energy_kwh = (self.ppfd / 4.6) * (self.photoperiod / 1000)  # simplified
        # Reward: growth / energy (maximize efficiency)
        growth = day_result['delta_fw_g']
        reward = growth / max(energy_kwh, 0.001) * 0.01
        # Penalty for harvest_ready reached too late
        # (only reachable when episode_days > 36; the default of 30 ends the episode first)
        if self.current_day > 35 and not day_result['harvest_ready']:
            reward -= 5.0
        # Bonus for harvest_ready reached on time
        if day_result['harvest_ready'] and self.current_day <= 28:
            reward += 20.0
        self.current_day += 1
        # Gymnasium distinguishes a natural end (terminated) from a time limit (truncated)
        terminated = bool(day_result['harvest_ready'])
        truncated = self.current_day >= self.episode_days and not terminated
        return self._get_obs(), float(reward), terminated, truncated, day_result

    def _get_obs(self) -> np.ndarray:
        pm = self.plant_model
        return np.array([
            self.ppfd, self.red_ratio, self.blue_ratio, self.photoperiod,
            pm.fresh_weight_g if pm else 0.5,
            pm.leaf_area_cm2 if pm else 5.0,
            self.current_day, 21.0, 1200.0
        ], dtype=np.float32)

# Training with Stable-Baselines3
# from stable_baselines3 import PPO
# env = VerticalFarmEnv(crop_type="lettuce")
# model = PPO("MlpPolicy", env, verbose=1, learning_rate=3e-4)
# model.learn(total_timesteps=500_000)
# model.save("optimized_lettuce_recipe_v1")
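Closing the sim-to-real gap mentioned above starts with recalibrating the growth model against farm data. One simple option, sketched here under stated assumptions, is to estimate an effective RUE by least squares through the origin from logged daily dry-weight gains. `calibrate_rue` is a hypothetical helper, and the sample data is synthetic (generated in the snippet), not a farm measurement.

```python
import numpy as np


def calibrate_rue(par_effective_mj, observed_delta_dw_g) -> float:
    """
    Least-squares estimate of RUE (g/MJ) for the model delta_dw = RUE * x,
    where x is the daily intercepted PAR already multiplied by the
    temperature and CO2 factors.
    """
    x = np.asarray(par_effective_mj, dtype=float)
    y = np.asarray(observed_delta_dw_g, dtype=float)
    denom = float(np.dot(x, x))
    if denom == 0.0:
        raise ValueError("no light intercepted in the calibration window")
    # Closed-form solution of min_RUE sum((y - RUE * x)^2)
    return float(np.dot(x, y) / denom)


# Synthetic example: data generated with RUE = 1.5 plus small measurement noise
rng = np.random.default_rng(seed=42)
x = np.linspace(0.01, 0.5, 20)                     # MJ/day, already scaled by tf * cf
y = 1.5 * x + rng.normal(0.0, 0.005, size=x.size)  # g/day
rue_hat = calibrate_rue(x, y)
print(f"estimated RUE: {rue_hat:.2f} g/MJ")
```

The fitted value would replace `rue_base` in the digital twin before the next round of RL training, so that policies are learned against a simulator that tracks the real crop.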
Industrial IoT Infrastructure: Modbus, MQTT, OPC-UA
Vertical farms use a layered mix of industrial protocols: Modbus RTU/TCP for communication with legacy sensors and actuators (thermo-hygrometers, CO2 meters, LED controllers), OPC-UA for communication with Siemens/Beckhoff PLCs and SCADA systems, and MQTT for sending data to the cloud. The choice depends on the vendor's hardware, the required latency, and the security level.
IoT Protocols in Vertical Farming: Comparison
| Protocol | Layer | Latency | Security | Typical Use Case |
|---|---|---|---|---|
| Modbus RTU | Field-PLC | 10-100ms | None (legacy) | EC/pH sensors, LED drivers |
| Modbus TCP | PLC-SCADA | 5-50ms | Optional TLS | PLC data acquisition |
| OPC-UA | PLC-SCADA-Cloud | 1-50ms | X.509, signing, encryption | Industry 4.0 standard |
| MQTT | Edge-Cloud | 10-500ms | TLS + authentication | Cloud telemetry |
| REST/HTTP | Cloud-Cloud | 50-500ms | HTTPS, OAuth2 | ERP integration API |
"""
Modbus -> MQTT Bridge for vertical farm
Reads sensors via Modbus RTU and publishes to MQTT broker
"""
import asyncio
import json
import time
import logging
from typing import Optional
from pymodbus.client import AsyncModbusSerialClient
import paho.mqtt.client as mqtt

logger = logging.getLogger(__name__)

# Illustrative Modbus register map for a zone sensor node (Temp/RH/CO2 + EC/pH).
# Addresses and scale factors are device-specific: take them from the vendor datasheet.
MODBUS_REGISTER_MAP = {
    "temperature": {"address": 0x0000, "count": 1, "scale": 0.1, "unit": "C"},
    "humidity": {"address": 0x0001, "count": 1, "scale": 0.1, "unit": "%RH"},
    "co2_ppm": {"address": 0x0002, "count": 1, "scale": 1.0, "unit": "ppm"},
    "ec_ms_cm": {"address": 0x0010, "count": 1, "scale": 0.01, "unit": "mS/cm"},
    "ph": {"address": 0x0011, "count": 1, "scale": 0.01, "unit": "pH"},
}

class ModbusMQTTBridge:
    def __init__(self, zone_id: str, modbus_port: str,
                 modbus_address: int, mqtt_broker: str, mqtt_port: int = 1883):
        self.zone_id = zone_id
        self.modbus_address = modbus_address  # Modbus unit ID of the sensor node
        self.modbus_client = AsyncModbusSerialClient(
            port=modbus_port, baudrate=9600, timeout=3
        )
        # paho-mqtt 2.x expects the callback API version first; on 1.x drop that argument
        self.mqtt_client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2, client_id=f"bridge-{zone_id}"
        )
        self.mqtt_broker = mqtt_broker
        self.mqtt_port = mqtt_port
        self.mqtt_topic = f"farm/zones/{zone_id}/telemetry"

    async def connect(self):
        await self.modbus_client.connect()
        self.mqtt_client.connect(self.mqtt_broker, self.mqtt_port, keepalive=60)
        self.mqtt_client.loop_start()
        logger.info(f"Bridge started for zone {self.zone_id}")

    async def read_all_sensors(self, device_id: Optional[int] = None) -> dict:
        # Default to the unit ID passed to the constructor
        unit_id = self.modbus_address if device_id is None else device_id
        readings = {"zone_id": self.zone_id, "timestamp": time.time()}
        for sensor_name, reg in MODBUS_REGISTER_MAP.items():
            try:
                # The unit ID keyword is 'slave' in many pymodbus 3.x releases and
                # 'device_id' in newer ones: check the installed version
                result = await self.modbus_client.read_holding_registers(
                    address=reg["address"],
                    count=reg["count"],
                    slave=unit_id
                )
                if not result.isError():
                    raw_value = result.registers[0]
                    readings[sensor_name] = round(raw_value * reg["scale"], 3)
                else:
                    logger.warning(f"Read error {sensor_name} zone {self.zone_id}")
                    readings[sensor_name] = None
            except Exception as e:
                logger.error(f"Modbus exception {sensor_name}: {e}")
                readings[sensor_name] = None
        return readings

    async def publish_loop(self, interval_sec: float = 30.0):
        while True:
            readings = await self.read_all_sensors()
            payload = json.dumps(readings)
            result = self.mqtt_client.publish(
                topic=self.mqtt_topic,
                payload=payload,
                qos=1
            )
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.debug(f"Published to {self.mqtt_topic}: {payload[:80]}...")
            else:
                logger.error(f"MQTT publish error: rc={result.rc}")
            await asyncio.sleep(interval_sec)
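One detail the bridge glosses over is sign handling. A Modbus holding register is an unsigned 16-bit word, and many sensors encode negative values (for example a cold-room temperature) in two's complement. Whether a given device does so is an assumption to verify in its datasheet. `decode_register` below is a hypothetical helper showing the conversion.

```python
def decode_register(raw: int, scale: float, signed: bool = True) -> float:
    """
    Convert one 16-bit Modbus register to an engineering value.
    With signed=True the word is interpreted as two's complement,
    so 0xFFCE (65486) becomes -50 before scaling.
    """
    if not 0 <= raw <= 0xFFFF:
        raise ValueError(f"register value out of 16-bit range: {raw}")
    if signed and raw >= 0x8000:
        raw -= 0x10000
    return round(raw * scale, 3)


print(decode_register(215, 0.1))     # 21.5
print(decode_register(0xFFCE, 0.1))  # -5.0
```

A per-register `signed` flag could be added to the register map so that unsigned quantities such as CO2 ppm are left untouched.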
Vertical Farming Economics: CAPEX, OPEX and Break-Even
Is vertical farming economically sustainable? The answer in 2025 is: it depends. It depends on the crop (herbs and microgreens are far more profitable than lettuce), scale (economies of scale emerge above 5,000 m²), location (local energy cost and labor cost are decisive factors), and the sales channel (direct B2C at premium prices vs. commodity grocery supply).
Economic Analysis: 1,000 m² Net Growing Farm
| Item | Value | Notes |
|---|---|---|
| CAPEX (initial investment) | | |
| Structure and systems | € 800,000 | Warehouse refurbishment |
| Shelving and NFT channels | € 300,000 | Hydroponic systems |
| LED lighting | € 600,000 | 600W/m² at 2.8 µmol/J efficiency |
| HVAC and climate control | € 250,000 | Cooling + dehumidification |
| CO2 system | € 50,000 | Storage + distribution |
| Automation and robotics | € 400,000 | Seeder, transplanter, harvester |
| Software and integration | € 150,000 | SCADA, API, digital twin |
| Total CAPEX | € 2,550,000 | ~€2,550/m² |
| ANNUAL OPEX | | |
| Electricity | € 420,000 | 35% OPEX - primary cost driver |
| Labor (10 operators) | € 350,000 | 29% OPEX |
| Seeds and substrate | € 80,000 | 7% OPEX |
| Nutrients and CO2 | € 60,000 | 5% OPEX |
| Maintenance | € 90,000 | 7.5% OPEX |
| Packaging and logistics | € 120,000 | 10% OPEX |
| Other (insurance, etc.) | € 80,000 | 7% OPEX |
| Total OPEX | € 1,200,000 | €1,200/m²/year |
| REVENUE | | |
| Lettuce production (18 cycles x 25kg/m²) | 450 kg/m²/year | 450,000 kg total |
| Premium retail selling price | € 3.5/kg | vs €0.8-1.2 outdoor |
| Gross revenue | € 1,575,000 | |
| EBITDA | € 375,000 | 23.8% margin |
| CAPEX depreciation (10 years) | € 255,000 | |
| Net profit before tax | € 120,000 | 7.6% margin |
| Break-even (years) | ~8-10 years | With herbs: 4-5 years |
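The table's arithmetic is easy to reproduce and to stress-test with different prices or yields. The sketch below uses only the figures from the table; the variable names are illustrative, and the payback shown is a simple CAPEX/EBITDA ratio that ignores financing costs, taxes and production ramp-up.

```python
# Figures copied from the table above (EUR); names are illustrative
capex = {
    "structure_and_systems": 800_000,
    "shelving_nft": 300_000,
    "led_lighting": 600_000,
    "hvac": 250_000,
    "co2_system": 50_000,
    "automation_robotics": 400_000,
    "software_integration": 150_000,
}
opex = {
    "electricity": 420_000,
    "labor": 350_000,
    "seeds_substrate": 80_000,
    "nutrients_co2": 60_000,
    "maintenance": 90_000,
    "packaging_logistics": 120_000,
    "other": 80_000,
}
growing_area_m2 = 1_000
yield_kg_per_m2_year = 18 * 25  # 18 cycles x 25 kg/m2
price_eur_per_kg = 3.5

total_capex = sum(capex.values())
total_opex = sum(opex.values())
production_kg = growing_area_m2 * yield_kg_per_m2_year
revenue = production_kg * price_eur_per_kg
ebitda = revenue - total_opex
net_before_tax = ebitda - total_capex / 10    # straight-line depreciation, 10 years
simple_payback_years = total_capex / ebitda   # ignores financing, taxes, ramp-up
break_even_price = total_opex / production_kg # EUR/kg needed just to cover OPEX

for name, value in opex.items():
    print(f"{name:22s} {value / total_opex:6.1%}")
print(f"EBITDA margin          {ebitda / revenue:6.1%}")
print(f"Simple payback         {simple_payback_years:.1f} years")
print(f"OPEX break-even price  {break_even_price:.2f} EUR/kg")
```

The simple payback comes out shorter than the 8-10 years quoted in the table, which presumably also allows for financing, taxes and a ramp-up period that this sketch does not model. Lowering `price_eur_per_kg` toward the OPEX break-even price shows how thin the margin is for commodity lettuce.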
The Energy Problem in Vertical Farming
Energy is the existential challenge of vertical farming. Typical production LED fixtures in 2025 deliver around 3.0-3.5 µmol/J of photon efficacy. Delivering a DLI of 17 mol/m²/day (about 295 µmol/m²/s over a 16-hour photoperiod) at 3.0 µmol/J takes 17,000,000 µmol ÷ 3.0 µmol/J ≈ 5.7 MJ, that is roughly 1.6 kWh/m²/day or about 575 kWh/m²/year for lighting alone. With 1,000 m² of illuminated canopy and an energy cost of €0.15/kWh (industrial rate in Italy 2025), the LED electricity bill alone comes to about €86,000/year. Add HVAC (typically 60-70% of LED energy), CO2, pumps and automation and the total climbs quickly; the economic analysis above budgets €420,000/year for electricity, which leaves headroom above this idealized estimate. Those with access to low-cost renewable energy (e.g., Agricola Moderna running on 100% renewables) or on-site solar panels cover a significant portion of demand, but not all.
Vertical farming is not viable at every energy price. With energy above €0.25/kWh (a real scenario during crisis periods like 2022-2023), many economic models collapse. The bet is that LED efficiency continues to improve and renewable energy keeps getting cheaper.
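The lighting energy figure can be recomputed from first principles for any DLI, fixture efficacy and tariff. The sketch below assumes that the efficacy value already includes driver losses and that the area is illuminated canopy area; both function names are hypothetical.

```python
def lighting_energy_kwh_per_m2_day(dli_mol_m2_day: float, efficacy_umol_per_j: float) -> float:
    """Electrical energy needed to deliver a given DLI, in kWh per m2 of canopy per day."""
    joules = dli_mol_m2_day * 1e6 / efficacy_umol_per_j  # mol -> umol, then divide by umol/J
    return joules / 3.6e6                                 # J -> kWh


def annual_lighting_cost_eur(dli_mol_m2_day: float, efficacy_umol_per_j: float,
                             area_m2: float, price_eur_kwh: float, days: int = 365) -> float:
    """Yearly lighting bill for a canopy area at a flat electricity price."""
    daily_kwh = lighting_energy_kwh_per_m2_day(dli_mol_m2_day, efficacy_umol_per_j)
    return daily_kwh * days * area_m2 * price_eur_kwh


for price in (0.15, 0.25):
    cost = annual_lighting_cost_eur(17, 3.0, area_m2=1_000, price_eur_kwh=price)
    print(f"{price:.2f} EUR/kWh -> {cost:,.0f} EUR/year for lighting")
```

Running the same function with 3.5 or 4.0 µmol/J shows directly how much of the bill a fixture upgrade removes, which is the bet described above.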
Case Studies: Planet Farms and Agricola Moderna - The Italian Model
Italy is home to two of the most advanced vertical farming projects in Europe, both based in the Milan hinterland. Their paths are different but complementary, and they represent two approaches to the problem of scalability and profitability in European vertical farming.
Planet Farms - Cavenago di Brianza
Founded in 2018 by Luca Travaglini and Daniele Benatoff, Planet Farms built its first facility in a former industrial area of Cavenago (MB). By 2024 the original 9,000 m² plant had grown into one of the largest in Europe, with 20,000 m² of growing surface. In November 2023, Planet Farms secured a $40 million round at a $500 million valuation, one of the largest in the European sector. In 2025, a partnership with Swiss Life Asset Managers created a €200 million JV to develop vertical farms at scale across Europe.
The technology partnership with Siemens is at the heart of the automation: Siemens S7-1500 PLCs manage the environmental control loops, while the MindSphere platform (since renamed Insights Hub) collects and analyzes data. The flagship product is "Living Herbs": aromatic herbs sold in trays with the root still attached, a premium segment with higher margins than lettuce.
Agricola Moderna - Agnadello (CR)
Agricola Moderna, founded in Milan in 2018 by Pierluigi Giuliani and Benjamin Franchetti, opened its new 11,000 m² facility in Agnadello (Cremona) in September 2024, financed with a €10 million loan from Intesa Sanpaolo. The plant produces 30,000 bags of salad per day, is powered 100% by renewable energy, and uses AI developed in-house by the R&D team to optimize recipes.
Agricola Moderna's technological differentiator is its hyperspectral imaging system (partnership with Specim) that allows assessment of plant nutritional status before symptoms are visible to the human eye. RGB-D cameras for 3D canopy reconstruction, environmental sensors with thousands of measurement points, and internally developed computer vision algorithms constitute the proprietary technology stack.
Planet Farms vs Agricola Moderna: Comparison
| Parameter | Planet Farms | Agricola Moderna |
|---|---|---|
| Growing area | 20,000 m² | 11,000 m² |
| Production/day | N.A. (herbs focus) | 30,000 salad bags |
| Funding | $40M round + €200M JV | €10M Intesa Sanpaolo |
| PLC/Automation | Siemens S7-1500 | Beckhoff + custom |
| AI Platform | Siemens Insights Hub (formerly MindSphere) | In-house development |
| Computer Vision | Standard RGB | Hyperspectral (Specim) |
| Energy | Partial renewable | 100% renewable |
| Target market | Premium retail + UK expansion | Italian grocery (IV gamma) |
Challenges, Limits and Industry Reality
An honest article about vertical farming must include the real challenges, not just the benefits. The sector experienced a significant disillusionment phase in 2022-2024, with high-profile failures that burned billions of dollars of investment. Understanding why is fundamental to building sustainable systems.
The Real Challenges of Vertical Farming
- Energy cost: This is the number one killer. 60% of vertical farm operators are not yet profitable, largely due to electricity costs. Without low-cost renewable energy, the economic model only works for premium crops.
- Limited crop variety: Vertical farming excels at leafy greens (lettuce, spinach), aromatic herbs and microgreens. For tomatoes, peppers, cucumbers the yield per unit energy is not yet competitive. Cereals, legumes and root vegetables are outside technical and economic reach.
- High CAPEX: A professional installation costs €2,000-4,000 per m² of growing area. Break-even for lettuce is 8-10 years: too long for many investors. Only aromatic herbs and microgreens reach break-even in 4-5 years.
- LED supply chain dependency: High-quality LED chips are produced by few suppliers (Epistar, Lumileds, Osram). Supply chain disruptions impact both initial CAPEX and ongoing maintenance.
- Real vs perceived sustainability: Water savings (95%) are real. But the carbon footprint depends heavily on the energy mix. A vertical farm powered by coal has a worse carbon footprint than outdoor farming. Only with renewables does the balance become clearly positive.
- Technical scalability: Scaling from a 500 m² pilot farm to a commercial 20,000 m² operation is not a simple multiplication. Control systems, crop management, internal logistics and robotic systems require deep redesign at every scale jump.
Production Deployment: Containerization and Monitoring
The software components of a vertical farm range from the real-time PID controller (which runs directly on the PLC or a Raspberry Pi, not containerized) to the FastAPI backend and AI pipelines (which live comfortably in Docker containers). The typical cloud infrastructure combines an on-premise edge layer with a cloud control plane.
# docker-compose.yml for vertical farm stack (dev/staging)
# Credentials below are placeholders for local use: move them to secrets or an .env file
version: '3.9'
services:
  # Main API
  farm-api:
    build: ./farm-api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://farm:farm@postgres:5432/farmdb
      - MQTT_BROKER=emqx
      - INFLUXDB_URL=http://influxdb:8086
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - influxdb
      - emqx
      - redis
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  # MQTT Broker
  emqx:
    image: emqx/emqx:5.8
    ports:
      - "1883:1883"   # MQTT
      - "8883:8883"   # MQTT TLS
      - "18083:18083" # Dashboard
    environment:
      - EMQX_NODE__COOKIE=farm-secret-cookie
    volumes:
      - emqx_data:/opt/emqx/data
  # Time-series database for sensor data
  influxdb:
    image: influxdb:2.7
    ports:
      - "8086:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=farmpass123
      - DOCKER_INFLUXDB_INIT_ORG=verticalfarm
      - DOCKER_INFLUXDB_INIT_BUCKET=sensors
    volumes:
      - influxdb_data:/var/lib/influxdb2
  # PostgreSQL for application data (recipes, batches, inventory)
  postgres:
    image: postgres:16-alpine
    environment:
      - POSTGRES_DB=farmdb
      - POSTGRES_USER=farm
      - POSTGRES_PASSWORD=farm
    volumes:
      - postgres_data:/var/lib/postgresql/data
  # Redis for cache and job queue
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
  # Grafana for real-time dashboards
  grafana:
    image: grafana/grafana:11.0.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
    depends_on:
      - influxdb
  # ML worker for digital twin and optimization
  ml-worker:
    build: ./ml-worker
    environment:
      - INFLUXDB_URL=http://influxdb:8086
      - REDIS_URL=redis://redis:6379
      - MODEL_PATH=/models
    volumes:
      - ml_models:/models
volumes:
  emqx_data:
  influxdb_data:
  postgres_data:
  grafana_data:
  ml_models:
Trends and Innovations in Vertical Farming 2025-2026
The sector is evolving rapidly on three fronts: energy efficiency, crop expansion, and increasingly deeper AI integration. Here are the most significant trends.
Key Innovations in Vertical Farming 2025
| Innovation | Impact | Commercial Status |
|---|---|---|
| LED efficiency 4.0+ µmol/J | -20-25% lighting energy costs | Available (LG, Signify) |
| Hyperspectral imaging | Early plant stress diagnosis, optimization | Early adoption (Specim + Agricola M.) |
| RL for dynamic recipes | +15-25% yield at equal energy | Advanced research / early prod |
| High-speed seeding robots | 3,000+ seedings/hour vs 700 manual | Commercial (Plenty, 80 Acres) |
| Mushroom/fungi cultivation | Crop mix diversification, high profitability | Commercial growth |
| Vertical farm + data center waste heat | Heat recovery to reduce HVAC costs | Pilot (Finland, Germany) |
| Solar PV with BESS integration | -40-60% energy cost in optimal scenarios | Growing adoption |
A particularly interesting emerging trend is the integration of vertical farming with data centers: servers generate waste heat that can be recovered to reduce heating costs in growing facilities during cold months. In Finland, where winters are long and heating demand is high, several pilot projects are testing this symbiosis. Waste heat at 40-60°C is well suited to maintaining growing temperatures in winter with little additional heating energy.
Conclusions: Vertical Farming as a Software Engineering Project
A modern vertical farm is first and foremost a software system. The hardware (LEDs, sensors, robots, shelving) is necessary but not sufficient: it is the software layer that transforms a lit warehouse into an optimized food factory. PID controllers manage the environment according to scientifically calibrated recipes; REST APIs connect the farm to the commercial ecosystem; ROS2 coordinates the robots; the digital twin enables predictive simulations; reinforcement learning searches for recipes that can outperform those designed manually by agronomists.
Growth of the vertical farming market from $9.6 billion in 2025 to a projected $39 billion in 2033 is not guaranteed: it depends on the sector's ability to solve the energy problem and expand the range of crops that can be produced in an economically sustainable way. Italy, with Planet Farms and Agricola Moderna, is well placed to play a leading role in this transition, especially given the PNRR and Transizione 5.0 incentives that reduce the cost of technology CAPEX.
For developers looking to enter this sector, the most in-demand skills are: Python for automation and ML, FastAPI for industrial backend, ROS2 for robotics, MQTT/Modbus/OPC-UA for industrial protocols, InfluxDB for time-series data, and the basics of plant physiology to understand the parameters being optimized. You do not need to know how to garden, but understanding why plants grow faster with certain wavelengths makes the difference between a good control system and an excellent one.
Tools and Resources for Further Study
- Pymodbus - Python library for Modbus RTU/TCP, ideal for communicating with PLCs and sensors
- Paho-MQTT - Python MQTT client for integration with brokers (EMQX, HiveMQ, Mosquitto)
- ROS2 Humble / Jazzy - Robotics framework, official documentation at docs.ros.org
- FastAPI - Async Python backend, ideal for industrial control APIs
- InfluxDB 2.x - Time-series database with Flux query language for sensor analytics
- Stable-Baselines3 - RL implementations (PPO, SAC, TD3) for recipe optimization
- Gymnasium - Standard for RL environments, used for digital twin environments
- Ignition SCADA - Industrial SCADA platform with accessible OEM licensing
- Vertical Farm Daily - Primary news source for the vertical farming sector
- USDA CEA Guidelines - Reference parameters for controlled environment agriculture
Next in the Series: Demand Forecasting for Food Retail
In article 8 we tackle the opposite problem: how to forecast demand for food products to optimize production orders (including from the vertical farm) and reduce waste. We will use Meta's Prophet for seasonality and trends, LightGBM for advanced tabular models, and build a complete forecasting pipeline with feature engineering specific to the food retail sector (weather effects, holidays, promotions).
Continue with: Demand Forecasting for Food Retail with Prophet and LightGBM
Related Articles in Other Series
- IoT Pipeline Series: IoT Pipeline for Precision Agriculture - MQTT, InfluxDB, Grafana for outdoor sensor monitoring
- MLOps Series: MLOps for AI Models in Production - How to deploy the RL recipe optimization model to production
- Computer Vision Series: CV for Quality Control - Computer vision techniques for assessing crop maturity and quality
- Data & AI Business Series: AI in Manufacturing - Predictive Maintenance and Digital Twin, twin architectures applicable to vertical farming







