Digital Twin for Energy Infrastructure: Real-Time Simulation
The global digital twin market for the energy sector reached $34.1 billion in 2025 and is set to grow at a CAGR of 34.7% through 2034, according to Fortune Business Insights. This is not a future promise: European utilities are already operating digital twins of HV/MV substations, offshore wind farms, and distribution networks that synchronize thousands of measurement points per second. In Italy, Terna and Enel have launched pilot digital twin programs for the national transmission grid as part of PNRR initiatives for the energy transition.
An energy digital twin is not simply a static simulation model. It is a bidirectional system that mirrors the physical state of an installation in real time, enables predictive simulations, evaluates what-if scenarios, and can command physical assets through a feedback loop. The difference from a simple SCADA is profound: where SCADA monitors and controls, the digital twin understands, simulates, and anticipates.
In this article we explore the practical implementation of digital twins for energy infrastructure: from physical modeling of electrical networks with pandapower and PyPSA, to SCADA/IoT integration, to ML-based predictive maintenance for transformers, through to real-world case studies of HV/MV substations and offshore wind farms.
What You Will Learn
- Digital twin taxonomy: descriptive, informative, predictive, prescriptive
- Layered architecture: Physical Entity, Data Layer, Model Layer, Visualization
- Physical models: Newton-Raphson power flow, transformer thermal model, degradation curves
- Data ingestion from SCADA, IoT sensors, weather APIs, and smart meters
- Network simulation with pandapower and PyPSA in Python
- Real-time synchronization: digital shadow vs bidirectional digital twin
- Predictive maintenance: Random Forest and XGBoost for transformer RUL
- What-if scenarios: N-1 contingencies and network expansion planning
- CIM IEC 61970/61968 and CGMES for model interoperability
- Azure Digital Twins vs AWS IoT TwinMaker: choosing the right cloud platform
- Case study: HV/MV substation and offshore wind farm
- Edge digital twin for minimum latency on industrial gateways
The EnergyTech Series: 10 Articles on Digital Energy
This is the ninth article in the EnergyTech series, dedicated to the protocols, software architectures, and algorithms transforming electrical energy management in the era of the energy transition.
| # | Article | Technologies | Level |
|---|---|---|---|
| 1 | OCPP 2.x Protocol: Building EV Charging Systems | OCPP, WebSocket, Python, ISO 15118 | Advanced |
| 2 | DERMS Architecture: Aggregating Millions of Distributed Resources | DERMS, REST, MQTT, PostgreSQL | Advanced |
| 3 | Renewable Energy Forecasting with ML: LSTM for Solar and Wind | Python, LSTM, Prophet, FastAPI | Advanced |
| 4 | Battery Management System for Grid-Scale Storage | BMS, SoC/SoH, Python, CAN bus | Advanced |
| 5 | IEC 61850 for Software Engineers: Smart Grid Communication | IEC 61850, GOOSE, MMS, SCADA | Advanced |
| 6 | EV Charging Load Balancing: Real-Time Algorithms | Algorithms, Python, OCPP, WebSocket | Advanced |
| 7 | From MQTT to InfluxDB: A Real-Time Energy IoT Platform | MQTT, InfluxDB, Telegraf, Grafana | Intermediate |
| 8 | Carbon Accounting Software Architecture: ESG Platforms | GHG Protocol, Python, API, reporting | Intermediate |
| 9 | Digital Twin for Energy Infrastructure (you are here) | pandapower, PyPSA, ML, Azure DT, Python | Advanced |
| 10 | Blockchain for P2P Energy Trading: Smart Contracts and Constraints | Solidity, Ethereum, Smart Contract | Advanced |
Digital Twin Taxonomy for Energy
Not all "digital twins" are equal. The Grieten-Kritzinger framework, adopted by the IEC and many industrial vendors, defines four maturity levels based on the direction of data flow and processing capabilities:
| Level | Type | Data Flow | Capability | Energy Example |
|---|---|---|---|---|
| L1 | Digital Shadow (Descriptive) | Physical → Digital | Current state monitoring | Real-time SCADA dashboard |
| L2 | Informative Digital Twin | Physical ↔ Digital (historical) | Historical analysis, KPIs, trends | Transformer asset health monitoring |
| L3 | Predictive Digital Twin | Physical ↔ Digital + ML | Fault prediction, RUL, simulation | HV cable remaining useful life prediction |
| L4 | Prescriptive Digital Twin | Physical ↔ Digital (active bidirectional) | Recommendation/execution of optimal actions | Automated maintenance rescheduling |
Digital Shadow vs Digital Twin
A digital shadow collects data from the physical asset but does not influence it (unidirectional flow). A true digital twin has a feedback channel: it can send setpoints, commands, or parameters to the physical asset, closing the control loop. In practice, most industrial systems today operate at L1-L2, with the goal of reaching L3-L4 by 2027.
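The decision logic behind these maturity levels can be sketched as a small helper. This is an illustrative heuristic, not part of any standard; the function and enum names are this article's invention:

```python
from enum import IntEnum

class TwinLevel(IntEnum):
    """Maturity levels from the taxonomy above (L1-L4)."""
    DIGITAL_SHADOW = 1
    INFORMATIVE = 2
    PREDICTIVE = 3
    PRESCRIPTIVE = 4

def classify_twin(streams_to_digital: bool, has_history: bool,
                  has_predictive_models: bool, can_actuate: bool) -> TwinLevel:
    """Map a system's capabilities to a maturity level (coarse heuristic)."""
    if not streams_to_digital:
        raise ValueError("no physical-to-digital data flow: not a twin at all")
    if can_actuate and has_predictive_models:
        return TwinLevel.PRESCRIPTIVE   # closed loop back to the asset
    if has_predictive_models:
        return TwinLevel.PREDICTIVE     # ML/simulation, but human in the loop
    if has_history:
        return TwinLevel.INFORMATIVE    # KPIs and trends over stored data
    return TwinLevel.DIGITAL_SHADOW     # live mirroring only
```

A typical SCADA dashboard classifies as `DIGITAL_SHADOW`; adding historical asset-health KPIs moves it to `INFORMATIVE`, and so on up the ladder.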
Layered Architecture of the Energy Digital Twin
An enterprise-grade digital twin for energy infrastructure consists of four distinct layers, each with well-defined responsibilities:
Layer 1: Physical Entity
Instrumented physical assets: HV/MV transformers with DGA (Dissolved Gas Analysis) sensors, busbars with current transformers, overhead lines with temperature and wind sensors, generators with vibration monitoring, batteries with SoC/SoH sensors. The physical layer produces raw data at varying frequencies: from 1 sample/hour for ambient temperature to 1 sample/ms for IEC 61850 protection measurements.
Layer 2: Data Layer
The ingestion, normalization, and storage layer. It includes:
- SCADA/EMS: real-time operational data via DNP3, Modbus, IEC 61850
- IoT Platform: wireless sensors via MQTT/AMQP to cloud brokers
- Time-Series DB: InfluxDB or TimescaleDB for high-frequency telemetry
- Data Lake: historical storage for ML training (S3, ADLS)
- Weather API: meteorological data for thermal models and solar/wind forecasting
- Smart Meter: aggregated load profiles via DLMS/COSEM
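Whatever the source protocol, the Data Layer benefits from normalizing everything into a single telemetry record before storage. A minimal sketch follows; the field names and the `from_mqtt` helper are illustrative, not a standard schema:

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class TelemetryPoint:
    """Source-agnostic measurement record for the Data Layer."""
    source: str          # "scada" | "iot" | "weather" | "smart_meter"
    asset_id: str        # e.g. "TR_HV_MV_Main"
    measurement: str     # e.g. "oil_temperature"
    value: float
    unit: str            # e.g. "degC", "MW", "ppm"
    timestamp: datetime
    quality: str = "good"  # quality flag, as in SCADA point records

def from_mqtt(asset_id: str, payload: dict) -> list[TelemetryPoint]:
    """Flatten an IoT JSON payload into normalized points."""
    ts = datetime.fromisoformat(payload["timestamp"])
    return [
        TelemetryPoint("iot", asset_id, name, float(v),
                       payload.get("unit", ""), ts)
        for name, v in payload["measurements"].items()
        if isinstance(v, (int, float))  # drop non-numeric fields
    ]
```

The same record type can then feed both the time-series DB (hot path) and the data lake (ML training path) without per-protocol special cases downstream.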
Layer 3: Model Layer
The computational core: physical and ML models that replicate the behavior of assets. Includes the power flow solver, thermal models, degradation models, and ML models for predictive maintenance.
Layer 4: Visualization and Integration
Operational dashboards (Grafana, Power BI), 3D rendering of substations (Three.js, Unity, Siemens NX), REST/WebSocket APIs for third-party systems (ERP, CMMS, EMS), and interfaces for prescriptive control.
Physical Models: Power Flow with Newton-Raphson
The heart of any electrical network digital twin is the power flow solver. The Newton-Raphson algorithm solves the nonlinear equations describing active and reactive power balance at every node (bus) of the network. With pandapower we can build a complete substation model and simulate its behavior in a few milliseconds.
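To make the algorithm concrete before reaching for a library, here is a hand-rolled Newton-Raphson for the smallest possible case: one slack bus feeding one PQ load bus over a single line (impedance and load values are illustrative). pandapower does the same thing on the full admittance matrix with an analytical Jacobian; this sketch uses a numerical one for brevity:

```python
import numpy as np

# Two-bus system: slack (V = 1.0 pu) -- line (r + jx) -- PQ load bus
r, x = 0.02, 0.08          # line impedance, pu
y = 1.0 / complex(r, x)    # series admittance
P_load, Q_load = 0.8, 0.3  # load, pu

def mismatch(u):
    """Active/reactive power mismatch at the PQ bus, state u = [theta2, V2]."""
    th, v = u
    v2 = v * np.exp(1j * th)
    v1 = 1.0 + 0j                   # slack voltage
    i2 = y * (v2 - v1)              # current injected at bus 2 (Ybus row)
    s2 = v2 * np.conj(i2)           # complex power injection at bus 2
    # Injection must equal -(P + jQ) at a load bus:
    return np.array([s2.real + P_load, s2.imag + Q_load])

def newton_raphson(u0, tol=1e-10, max_iter=20):
    u = np.array(u0, dtype=float)
    for it in range(max_iter):
        f = mismatch(u)
        if np.max(np.abs(f)) < tol:
            return u, it
        # 2x2 Jacobian by central differences (pandapower uses analytic form)
        J = np.zeros((2, 2))
        h = 1e-7
        for j in range(2):
            d = np.zeros(2); d[j] = h
            J[:, j] = (mismatch(u + d) - mismatch(u - d)) / (2 * h)
        u = u - np.linalg.solve(J, f)   # Newton step
    raise RuntimeError("power flow did not converge")

(theta2, v2), iters = newton_raphson([0.0, 1.0])  # flat start
```

Convergence is quadratic: a handful of iterations suffices, which is why full substation models solve in milliseconds.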
Installation and Setup
# Install dependencies
pip install pandapower pypsa numpy pandas scikit-learn xgboost
pip install influxdb-client paho-mqtt aiohttp  # asyncio is stdlib - do not pip install it
pip install plotly dash # for visualization
# Recommended versions (2025)
# pandapower==2.14.x
# pypsa==0.31.x
# scikit-learn==1.5.x
# xgboost==2.1.x
Network Modeling with pandapower
We build a complete model of an HV/MV substation with distribution lines, transformers, loads, and distributed generators:
import pandapower as pp
import pandapower.networks as pn
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
import json
@dataclass
class SubstationConfig:
"""HV/MV substation configuration"""
name: str
voltage_hv_kv: float = 132.0 # High Voltage kV
voltage_mv_kv: float = 20.0 # Medium Voltage kV
transformer_mva: float = 40.0 # Transformer rated power MVA
n_feeders: int = 6 # Number of MV feeders
peak_load_mw: float = 28.0 # Peak load MW
pv_capacity_mw: float = 4.5 # Connected PV capacity MW
class EnergyDigitalTwin:
"""
Digital Twin for HV/MV substation with real-time power flow simulation.
Implements continuous update from SCADA state.
"""
def __init__(self, config: SubstationConfig):
self.config = config
self.net = pp.create_empty_network(name=config.name)
self._build_network_model()
self._state_cache: dict = {}
def _build_network_model(self) -> None:
"""Builds the network model from configuration."""
cfg = self.config
# HV bus (slack bus = voltage reference)
self.bus_ht = pp.create_bus(
self.net,
vn_kv=cfg.voltage_hv_kv,
name="Bus_HV_Main",
type="b"
)
# Primary MV bus (transformer secondary side)
self.bus_mt_primary = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name="Bus_MV_Busbar",
type="b"
)
# Main HV/MV transformer
pp.create_transformer_from_parameters(
self.net,
hv_bus=self.bus_ht,
lv_bus=self.bus_mt_primary,
sn_mva=cfg.transformer_mva,
vn_hv_kv=cfg.voltage_hv_kv,
vn_lv_kv=cfg.voltage_mv_kv,
vkr_percent=0.3, # copper losses %
vk_percent=8.5, # short-circuit voltage %
pfe_kw=35.0, # iron losses kW
i0_percent=0.08, # no-load current %
name="TR_HV_MV_Main",
tap_pos=0, # tap changer position
tap_neutral=0,
tap_min=-8,
tap_max=8,
tap_step_percent=1.25
)
# External HV grid (Thevenin equivalent)
pp.create_ext_grid(
self.net,
bus=self.bus_ht,
vm_pu=1.0,
va_degree=0.0,
name="Transmission_Grid",
s_sc_max_mva=3500.0, # grid short-circuit power
rx_max=0.1
)
# Create MV feeders with loads and generation
self.feeder_buses = []
self.load_ids = []
self.pv_ids = []
for i in range(cfg.n_feeders):
# Feeder bus
bus_feeder = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name=f"Bus_Feeder_{i+1:02d}"
)
self.feeder_buses.append(bus_feeder)
# MV line from main bus to feeder (XLPE 150mm2 cable)
pp.create_line_from_parameters(
self.net,
from_bus=self.bus_mt_primary,
to_bus=bus_feeder,
length_km=2.5 + i * 0.8,
r_ohm_per_km=0.206, # XLPE 150mm2 resistance
x_ohm_per_km=0.083,
c_nf_per_km=320.0,
max_i_ka=0.261,
name=f"Line_Feeder_{i+1:02d}"
)
# Load per feeder (uniform distribution + variance)
load_mw = cfg.peak_load_mw / cfg.n_feeders * (0.8 + 0.4 * np.random.rand())
load_id = pp.create_load(
self.net,
bus=bus_feeder,
p_mw=load_mw,
q_mvar=load_mw * 0.3, # power factor ~ 0.958
name=f"Load_Feeder_{i+1:02d}",
controllable=False
)
self.load_ids.append(load_id)
# Photovoltaic generator (primary MV bus)
pv_id = pp.create_sgen(
self.net,
bus=self.bus_mt_primary,
p_mw=cfg.pv_capacity_mw,
q_mvar=0.0,
name="PV_Rooftop",
type="PV",
controllable=True
)
self.pv_ids.append(pv_id)
def update_from_scada(self, scada_data: dict) -> None:
"""
Updates the model with real-time data from SCADA/IoT.
Args:
scada_data: dictionary with current measurements
- load_mw_per_feeder: list of active powers
- pv_mw: current photovoltaic generation
- tap_position: tap changer position
- ambient_temp_c: ambient temperature
"""
# Update feeder loads
loads = scada_data.get("load_mw_per_feeder", [])
for load_id, p_mw in zip(self.load_ids, loads):
q_mvar = p_mw * 0.30 # fixed power factor (from model)
self.net.load.at[load_id, "p_mw"] = p_mw
self.net.load.at[load_id, "q_mvar"] = q_mvar
# Update PV generation
pv_mw = scada_data.get("pv_mw", 0.0)
if self.pv_ids:
self.net.sgen.at[self.pv_ids[0], "p_mw"] = pv_mw
# Update tap changer position
tap = scada_data.get("tap_position", 0)
self.net.trafo.at[0, "tap_pos"] = np.clip(tap, -8, 8)
# Cache state for comparison
self._state_cache = scada_data.copy()
def run_power_flow(self) -> dict:
"""
Runs Newton-Raphson simulation and returns results.
Returns:
Dictionary with voltages, currents, losses, and component status
"""
try:
pp.runpp(
self.net,
algorithm="nr", # Newton-Raphson
calculate_voltage_angles=True,
max_iteration=50,
tolerance_mva=1e-8,
enforce_q_lims=True
)
results = {
"converged": True,
"buses": self._extract_bus_results(),
"lines": self._extract_line_results(),
"transformer": self._extract_trafo_results(),
"losses_mw": float(self.net.res_ext_grid["p_mw"].sum() -
self.net.res_load["p_mw"].sum() +
self.net.res_sgen["p_mw"].sum()),
"timestamp": pd.Timestamp.now().isoformat()
}
return results
except pp.LoadflowNotConverged:
return {
"converged": False,
"error": "Power flow did not converge - possible limit violation",
"timestamp": pd.Timestamp.now().isoformat()
}
def _extract_bus_results(self) -> list:
results = []
for idx, row in self.net.res_bus.iterrows():
bus_name = self.net.bus.at[idx, "name"]
vn_kv = self.net.bus.at[idx, "vn_kv"]
results.append({
"bus_id": int(idx),
"name": bus_name,
"vm_pu": float(row["vm_pu"]),
"vm_kv": float(row["vm_pu"] * vn_kv),
"va_degree": float(row["va_degree"]),
"within_limits": 0.95 <= float(row["vm_pu"]) <= 1.05
})
return results
def _extract_line_results(self) -> list:
results = []
for idx, row in self.net.res_line.iterrows():
results.append({
"line_id": int(idx),
"name": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"]),
"i_ka": float(row["i_ka"]),
"pl_mw": float(row["pl_mw"]),
"overloaded": float(row["loading_percent"]) > 100.0
})
return results
def _extract_trafo_results(self) -> dict:
row = self.net.res_trafo.iloc[0]
return {
"loading_percent": float(row["loading_percent"]),
"pl_mw": float(row["pl_mw"]),
"i_hv_ka": float(row["i_hv_ka"]),
"i_lv_ka": float(row["i_lv_ka"]),
"overloaded": float(row["loading_percent"]) > 100.0
}
def check_n1_contingency(self, line_to_remove: int) -> dict:
"""
N-1 contingency simulation: removes a line and verifies network stability.
Args:
line_to_remove: ID of the line to take out of service
Returns:
Post-contingency power flow results with violations
"""
# Save original state
original_in_service = self.net.line.at[line_to_remove, "in_service"]
# Apply contingency
self.net.line.at[line_to_remove, "in_service"] = False
try:
pp.runpp(self.net, algorithm="nr", max_iteration=50)
violations = []
for idx, row in self.net.res_bus.iterrows():
if not (0.90 <= float(row["vm_pu"]) <= 1.10):
violations.append({
"type": "voltage",
"bus": self.net.bus.at[idx, "name"],
"vm_pu": float(row["vm_pu"])
})
for idx, row in self.net.res_line.iterrows():
if float(row["loading_percent"]) > 120.0:
violations.append({
"type": "overload",
"line": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"])
})
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": True,
"n_violations": len(violations),
"violations": violations,
"secure": len(violations) == 0
}
except pp.LoadflowNotConverged:
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": False,
"secure": False,
"error": "Network insecure: power flow did not converge post-contingency"
}
finally:
# Restore original state
self.net.line.at[line_to_remove, "in_service"] = original_in_service
return result
Transformer Thermal Model
The transformer is the most critical asset in a substation. Its health depends directly on the temperature of the oil and windings. The IEC 60076-7 standard defines the analytical thermal model that underpins most industrial transformer digital twins.
import numpy as np
from dataclasses import dataclass
@dataclass
class TransformerThermalModel:
"""
Transformer thermal model per IEC 60076-7.
Implements the exponential model for calculating oil and winding temperatures.
"""
# Design parameters (from nameplate / FAT measurements)
rated_power_mva: float = 40.0
theta_amb_ref: float = 20.0 # reference ambient temperature
delta_theta_or: float = 55.0 # oil temperature rise at full load
delta_theta_hr: float = 23.0 # winding-to-oil gradient at full load
tau_o: float = 3.0 * 3600 # oil thermal time constant [s]
tau_w: float = 10 * 60 # winding thermal time constant [s]
n_exp: float = 0.9 # oil load exponent (ONAN)
m_exp: float = 0.8 # winding load exponent
r_load: float = 6.0 # load/no-load loss ratio
# Initial state
theta_oil: float = 20.0 # current oil temperature
theta_winding: float = 20.0 # current winding temperature
def step(
self,
load_fraction: float,
ambient_temp: float,
dt_seconds: float = 60.0
) -> dict:
"""
Update model with time step dt.
Args:
load_fraction: normalized load (0=no load, 1=full load)
ambient_temp: ambient temperature in degrees C
dt_seconds: time step in seconds
Returns:
Updated thermal state with oil temp, winding temp, and HST
"""
# Steady-state oil temperature rise at this load
k = load_fraction # load factor
delta_theta_o_steady = self.delta_theta_or * (
(1 + self.r_load * k**2) / (1 + self.r_load)
) ** self.n_exp
# Oil dynamics (first differential equation)
d_theta_oil = (
(ambient_temp + delta_theta_o_steady - self.theta_oil) / self.tau_o
) * dt_seconds
self.theta_oil = self.theta_oil + d_theta_oil
# Steady-state winding gradient
delta_theta_h_steady = self.delta_theta_hr * (k ** (2 * self.m_exp))
# Hot Spot Temperature (HST) - hottest winding point
# Faster winding dynamics
d_theta_winding = (
(self.theta_oil + delta_theta_h_steady - self.theta_winding) / self.tau_w
) * dt_seconds
self.theta_winding = self.theta_winding + d_theta_winding
# Aging Acceleration Factor (AAF) for insulation life consumption.
# Note: IEC 60076-7 uses 2^((HST-98)/6) for non-upgraded paper and the
# Arrhenius form with a 110 deg C reference for thermally upgraded
# paper; here the Arrhenius form is anchored at 98 deg C so that
# AAF = 1 at the rated hot spot.
theta_ref = 98.0
aaf = np.exp(
15000.0 / (theta_ref + 273.15) - 15000.0 / (self.theta_winding + 273.15)
)
return {
"theta_oil_c": round(self.theta_oil, 2),
"theta_winding_hst_c": round(self.theta_winding, 2),
"theta_ambient_c": ambient_temp,
"load_fraction": load_fraction,
"aaf": round(float(aaf), 4),
"thermal_limit_exceeded": self.theta_winding > 120.0, # continuous limit
"emergency_limit_exceeded": self.theta_winding > 140.0 # emergency limit
}
def calculate_rul_hours(self, avg_hst: float) -> float:
"""
Estimate Remaining Useful Life (RUL) of cellulosic insulation.
Based on the Arrhenius model (IEC 60076-7 Section 8).
Args:
avg_hst: average Hot Spot temperature in degrees C
Returns:
Estimated remaining life hours
"""
# Assumed nominal insulation life: 65,000 hours (a figure from the
# IEEE C57.91 loading-guide tradition; IEC 60076-7 itself defines
# only relative aging rates)
nominal_life_hours = 65_000.0
theta_ref = 98.0
# Normalized aging rate
per_unit_life_remaining = 1.0 # assume 100% remaining life
aaf_avg = np.exp(
15000.0 / (theta_ref + 273.15) - 15000.0 / (avg_hst + 273.15)
)
# Remaining hours
rul_hours = nominal_life_hours * per_unit_life_remaining / aaf_avg
return round(rul_hours, 0)
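A quick sanity check on the aging model above: with the Arrhenius form anchored at 98 deg C, the aging rate roughly doubles for every 6-7 K of additional hot-spot temperature, consistent with the classic "6 K doubling rule" for cellulosic insulation:

```python
import numpy as np

def aging_acceleration(theta_hst_c: float, theta_ref_c: float = 98.0,
                       b: float = 15000.0) -> float:
    """Relative aging rate vs. the reference hot-spot temperature."""
    return float(np.exp(b / (theta_ref_c + 273.15) - b / (theta_hst_c + 273.15)))

# At the reference temperature the rate is exactly 1.0
rate_98 = aging_acceleration(98.0)
# ~6.5 K hotter: aging roughly doubles
rate_104 = aging_acceleration(104.5)
# 12 K hotter: insulation ages more than 3x faster, RUL shrinks accordingly
rate_110 = aging_acceleration(110.0)
```

This is why a digital twin that keeps the average HST even a few degrees lower (via tap optimization or load shifting) can extend transformer life by years.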
Data Ingestion: From SCADA and IoT to the Digital Twin
The quality of a digital twin depends directly on the quality and latency of its input data. In a real energy installation we must handle heterogeneous sources: RTU/IED via IEC 61850/DNP3, IoT sensors via MQTT, weather forecasts via REST APIs, and smart meters via DLMS/COSEM.
import asyncio
import aiohttp
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
from datetime import datetime, timezone
from typing import Callable
class EnergyDataIngestion:
"""
Multi-source data ingestion for energy digital twin.
Supports MQTT (IoT sensors), REST API (weather, market), SCADA (simulated).
"""
def __init__(
self,
influx_url: str,
influx_token: str,
influx_org: str,
influx_bucket: str,
mqtt_broker: str,
mqtt_port: int = 1883
):
# InfluxDB client for time-series storage
self._influx = InfluxDBClient(
url=influx_url,
token=influx_token,
org=influx_org
)
self._write_api = self._influx.write_api(write_options=SYNCHRONOUS)
self._bucket = influx_bucket
self._org = influx_org
# MQTT client for IoT sensors (paho-mqtt 1.x callback API; with
# paho-mqtt >= 2.0, pass mqtt.CallbackAPIVersion.VERSION1 as the
# first argument to keep these callback signatures working)
self._mqtt_client = mqtt.Client(client_id="dt-ingestion-01")
self._mqtt_client.on_connect = self._on_mqtt_connect
self._mqtt_client.on_message = self._on_mqtt_message
self._mqtt_broker = mqtt_broker
self._mqtt_port = mqtt_port
# Callback to update the model in real time
self._model_update_callbacks: list[Callable] = []
def register_model_callback(self, cb: Callable) -> None:
"""Register callback invoked on every data update."""
self._model_update_callbacks.append(cb)
def _on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
# Subscribe to energy topics
topics = [
"substation/+/transformer/+/measurements",
"substation/+/feeder/+/measurements",
"substation/+/weather/measurements",
"substation/+/pv/measurements"
]
for topic in topics:
client.subscribe(topic, qos=1)
print(f"MQTT connected to broker - subscribed to {len(topics)} topics")
def _on_mqtt_message(self, client, userdata, msg):
"""Process MQTT message with parsing and storage."""
try:
payload = json.loads(msg.payload.decode())
topic_parts = msg.topic.split("/")
# Extract metadata from topic
substation_id = topic_parts[1]
asset_type = topic_parts[2]
asset_id = topic_parts[3]
# Create InfluxDB point
point = (
Point(asset_type)
.tag("substation", substation_id)
.tag("asset_id", asset_id)
.time(
datetime.fromisoformat(
payload.get("timestamp", datetime.now(timezone.utc).isoformat())
)
)
)
# Add measurements as fields
for key, value in payload.get("measurements", {}).items():
if isinstance(value, (int, float)):
point = point.field(key, float(value))
# Write to InfluxDB
self._write_api.write(
bucket=self._bucket,
org=self._org,
record=point
)
# Notify callbacks (model update)
for cb in self._model_update_callbacks:
cb(asset_type, asset_id, payload)
except (json.JSONDecodeError, KeyError, ValueError) as e:
print(f"Error parsing MQTT message: {e} - topic: {msg.topic}")
async def fetch_weather_forecast(
self,
lat: float,
lon: float,
session: aiohttp.ClientSession
) -> dict:
"""
Fetch weather forecasts from Open-Meteo API (free, no API key required).
Used for ambient temperature correction in the transformer thermal model.
"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"hourly": "temperature_2m,windspeed_10m,direct_radiation",
"forecast_days": 3,
"timezone": "Europe/Rome"
}
async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
data = await resp.json()
return {
"hourly_temp_c": data["hourly"]["temperature_2m"],
"hourly_wind_ms": data["hourly"]["windspeed_10m"],
"hourly_radiation_wm2": data["hourly"]["direct_radiation"],
"times": data["hourly"]["time"]
}
else:
raise ValueError(f"Weather API error: HTTP {resp.status}")
def start_mqtt(self) -> None:
"""Start MQTT connection in a separate thread."""
self._mqtt_client.connect(self._mqtt_broker, self._mqtt_port, keepalive=60)
self._mqtt_client.loop_start()
def stop(self) -> None:
"""Clean up resources."""
self._mqtt_client.loop_stop()
self._mqtt_client.disconnect()
self._influx.close()
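The ingestion class above assumes a specific topic and payload contract. A hypothetical sample message makes that contract explicit; the topic hierarchy and field names are this article's convention, not a standard:

```python
import json

# Topic convention: substation/<substation_id>/<asset_type>/<asset_id>/measurements
topic = "substation/sub-bari-01/transformer/tr-main/measurements"
message = json.dumps({
    "timestamp": "2025-06-01T12:00:00+00:00",
    "measurements": {
        "oil_temperature_c": 61.5,
        "winding_hst_c": 74.2,
        "load_factor": 0.82,
        "tap_position": 2
    }
})

# What _on_mqtt_message extracts from topic and payload:
parts = topic.split("/")
substation_id, asset_type, asset_id = parts[1], parts[2], parts[3]
payload = json.loads(message)
# Only numeric values become InfluxDB fields
numeric_fields = {k: float(v) for k, v in payload["measurements"].items()
                  if isinstance(v, (int, float))}
```

Keeping the contract this strict (ISO 8601 timestamps, numeric-only measurements, fixed topic depth) is what lets the same handler serve transformers, feeders, weather stations, and PV plants without per-asset parsing code.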
Real-Time Synchronization and the Digital Twin Loop
The "digital twin loop" is the continuous cycle that keeps the digital model synchronized with the physical asset. Synchronization frequency depends on process dynamics: for SCADA state measurements, 30-60 seconds is typically sufficient; for IEC 61850 protection, it drops to milliseconds.
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger(__name__)
class DigitalTwinOrchestrator:
"""
Main orchestrator for the energy digital twin.
Manages the synchronization loop and coordinates the different models.
"""
def __init__(
self,
twin: EnergyDigitalTwin,
ingestion: EnergyDataIngestion,
thermal_model: TransformerThermalModel,
sync_interval_s: float = 30.0
):
self.twin = twin
self.ingestion = ingestion
self.thermal_model = thermal_model
self.sync_interval_s = sync_interval_s
self._running = False
self._latest_scada: dict = {}
self._latest_weather: dict = {}
# Register callback for IoT updates
self.ingestion.register_model_callback(self._handle_sensor_update)
def _handle_sensor_update(
self,
asset_type: str,
asset_id: str,
payload: dict
) -> None:
"""IoT callback: updates SCADA data cache."""
measurements = payload.get("measurements", {})
if asset_type == "transformer":
self._latest_scada["transformer"] = measurements
elif asset_type == "feeder":
feeder_data = self._latest_scada.get("feeders", {})
feeder_data[asset_id] = measurements
self._latest_scada["feeders"] = feeder_data
async def run(self) -> None:
"""Main digital twin loop - runs continuously."""
self._running = True
self.ingestion.start_mqtt()
logger.info(f"Digital Twin '{self.twin.config.name}' started")
async with aiohttp.ClientSession() as session:
while self._running:
cycle_start = asyncio.get_event_loop().time()
try:
await self._sync_cycle(session)
except Exception as e:
logger.error(f"Error in sync cycle: {e}", exc_info=True)
# Maintain sync frequency
elapsed = asyncio.get_event_loop().time() - cycle_start
sleep_time = max(0, self.sync_interval_s - elapsed)
await asyncio.sleep(sleep_time)
async def _sync_cycle(self, session: aiohttp.ClientSession) -> None:
"""Single synchronization cycle."""
# 1. Refresh weather roughly every 30 minutes
# (a minute-modulo check can fire more than once within the same
# minute at a 30 s cycle; harmless here, the forecast is idempotent)
now = datetime.now(timezone.utc)
if now.minute % 30 == 0:
try:
self._latest_weather = await self.ingestion.fetch_weather_forecast(
lat=40.85,
lon=16.55,
session=session
)
except Exception as e:
logger.warning(f"Weather fetch failed: {e}")
# 2. Assemble SCADA data for model update
scada_snapshot = self._build_scada_snapshot()
# 3. Update electrical network in the digital twin
self.twin.update_from_scada(scada_snapshot)
# 4. Run power flow
pf_result = self.twin.run_power_flow()
# 5. Update transformer thermal model
ambient_temp = self._get_ambient_temperature()
trafo_loading = pf_result.get("transformer", {}).get("loading_percent", 0) / 100.0
thermal_state = self.thermal_model.step(
load_fraction=trafo_loading,
ambient_temp=ambient_temp,
dt_seconds=self.sync_interval_s
)
# 6. Check alarms and generate alerts
self._check_alerts(pf_result, thermal_state)
# 7. Publish state to InfluxDB (for Grafana dashboard)
self._publish_twin_state(pf_result, thermal_state)
logger.debug(
f"Sync OK | Conv={pf_result['converged']} | "
f"HST={thermal_state['theta_winding_hst_c']} deg C | "
f"Trafo={pf_result.get('transformer', {}).get('loading_percent', 0):.1f}%"
)
def _build_scada_snapshot(self) -> dict:
"""Builds a normalized SCADA snapshot for the model."""
feeders = self._latest_scada.get("feeders", {})
n_feeders = self.twin.config.n_feeders
# Extract per-feeder powers (with fallback to last known value)
load_per_feeder = []
for i in range(n_feeders):
feeder_key = f"feeder_{i+1:02d}"
feeder_data = feeders.get(feeder_key, {})
p_mw = feeder_data.get("active_power_mw",
self.twin.config.peak_load_mw / n_feeders * 0.7)
load_per_feeder.append(float(p_mw))
trafo_data = self._latest_scada.get("transformer", {})
return {
"load_mw_per_feeder": load_per_feeder,
"pv_mw": trafo_data.get("pv_injection_mw", 0.0),
"tap_position": int(trafo_data.get("tap_position", 0)),
"ambient_temp_c": self._get_ambient_temperature()
}
def _get_ambient_temperature(self) -> float:
"""Returns current ambient temperature (from weather API or default)."""
weather = self._latest_weather
if weather and "hourly_temp_c" in weather:
hour_idx = datetime.now().hour
temps = weather["hourly_temp_c"]
if 0 <= hour_idx < len(temps):
return float(temps[hour_idx])
return 20.0 # default
def _check_alerts(self, pf_result: dict, thermal_state: dict) -> None:
"""Generate alerts for anomalous conditions."""
alerts = []
if not pf_result.get("converged", True):
alerts.append({"severity": "CRITICAL", "msg": "Power flow did not converge"})
if thermal_state.get("thermal_limit_exceeded"):
hst = thermal_state["theta_winding_hst_c"]
alerts.append({
"severity": "WARNING",
"msg": f"Transformer HST {hst} deg C exceeds continuous limit of 120 deg C"
})
trafo = pf_result.get("transformer", {})
if trafo.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Transformer overloaded: {trafo['loading_percent']:.1f}%"
})
for line in pf_result.get("lines", []):
if line.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Line {line['name']} overloaded: {line['loading_percent']:.1f}%"
})
for alert in alerts:
logger.warning(f"[ALERT {alert['severity']}] {alert['msg']}")
def _publish_twin_state(self, pf_result: dict, thermal_state: dict) -> None:
"""Publish current state to InfluxDB for Grafana dashboard."""
from influxdb_client import Point
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
points = []
# Power flow point
trafo = pf_result.get("transformer", {})
pf_point = (
Point("digital_twin_powerflow")
.tag("substation", self.twin.config.name)
.field("converged", int(pf_result.get("converged", 0)))
.field("trafo_loading_pct", trafo.get("loading_percent", 0.0))
.field("trafo_losses_mw", trafo.get("pl_mw", 0.0))
.field("total_losses_mw", pf_result.get("losses_mw", 0.0))
.time(now)
)
points.append(pf_point)
# Thermal point
th_point = (
Point("digital_twin_thermal")
.tag("substation", self.twin.config.name)
.field("theta_oil_c", thermal_state["theta_oil_c"])
.field("theta_hst_c", thermal_state["theta_winding_hst_c"])
.field("aaf", thermal_state["aaf"])
.field("ambient_c", thermal_state["theta_ambient_c"])
.time(now)
)
points.append(th_point)
self.ingestion._write_api.write(
bucket=self.ingestion._bucket,
org=self.ingestion._org,
record=points
)
def stop(self) -> None:
self._running = False
self.ingestion.stop()
Predictive Maintenance with ML: Transformer RUL
Predictive maintenance is one of the most mature use cases for energy digital twins. An HV/MV transformer costs between 500,000 and 5,000,000 euros; an unexpected failure means blackouts, emergency replacement costs, and regulatory penalties. With a well-trained ML model, incipient faults can be flagged 3-6 weeks in advance, and recent industry surveys report reductions in unplanned failures of around 35%.
Feature Engineering for Transformers
The most predictive features for transformer health combine electrical, thermal, and oil chemistry measurements (DGA - Dissolved Gas Analysis):
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, mean_absolute_error
import xgboost as xgb
import joblib
from pathlib import Path
class TransformerHealthPredictor:
"""
ML model for transformer health state and RUL prediction.
Uses Random Forest for anomaly classification + XGBoost for RUL regression.
"""
# Feature set per IEC 60599 (DGA interpretation) + electrical/thermal measurements
FEATURE_COLUMNS = [
# DGA - Dissolved Gas Analysis (ppm)
"h2_ppm", # hydrogen - partial discharge indicator
"ch4_ppm", # methane - oil overheating
"c2h2_ppm", # acetylene - electrical arcs (critical)
"c2h4_ppm", # ethylene - severe overheating
"c2h6_ppm", # ethane - mild overheating
"co_ppm", # carbon monoxide - cellulose degradation
"co2_ppm", # carbon dioxide - cellulose degradation
# Electrical measurements
"load_factor", # average load factor (0-1)
"load_factor_peak", # peak load factor
"tap_operations_30d", # tap changer operations in last 30 days
"voltage_thd_pct", # voltage total harmonic distortion %
# Thermal measurements
"avg_hst_30d", # average hot spot temperature over 30 days
"max_hst_30d", # peak hot spot temperature over 30 days
"ambient_temp_avg", # average ambient temperature
# Oil measurements
"moisture_ppm", # oil moisture (insulation degradation)
"acidity_mg_koh_g", # oil acidity
"breakdown_voltage_kv", # oil dielectric strength
# Engineered features
"total_dissolved_gas_ppm", # sum of combustible gases
"methane_ratio", # CH4/(CH4+C2H4) Duval ratio
"age_years", # transformer age
"cumulative_aaf", # cumulative AAF (insulation life consumption)
]
def __init__(self, model_dir: str = "models/transformer_health"):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(parents=True, exist_ok=True)
self.scaler = StandardScaler()
# Anomaly classifier: normal / warning / critical
self.anomaly_classifier = RandomForestClassifier(
n_estimators=200,
max_depth=12,
min_samples_leaf=5,
class_weight="balanced", # handles class imbalance
n_jobs=-1,
random_state=42
)
# RUL regressor: remaining insulation life hours
self.rul_regressor = xgb.XGBRegressor(
n_estimators=300,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
objective="reg:squarederror",
eval_metric="mae",
random_state=42
)
self._is_fitted = False
def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Compute derived features from raw measurements."""
result = df.copy()
# Total combustible gases (TDCG per IEEE C57.104)
gas_cols = ["h2_ppm", "ch4_ppm", "c2h2_ppm", "c2h4_ppm", "c2h6_ppm"]
available_gas = [c for c in gas_cols if c in result.columns]
result["total_dissolved_gas_ppm"] = result[available_gas].sum(axis=1)
# Duval ratio for fault type diagnosis
if "ch4_ppm" in result.columns and "c2h4_ppm" in result.columns:
denom = result["ch4_ppm"] + result["c2h4_ppm"]
result["methane_ratio"] = np.where(
denom > 0,
result["ch4_ppm"] / denom,
0.5
)
# Normalized cumulative load (degradation proxy)
if "avg_hst_30d" in result.columns and "age_years" in result.columns:
theta_ref = 98.0
result["cumulative_aaf"] = np.exp(
15000.0 / (theta_ref + 273.15) -
15000.0 / (result["avg_hst_30d"] + 273.15)
) * result["age_years"]
# Fill missing columns
for col in self.FEATURE_COLUMNS:
if col not in result.columns:
result[col] = 0.0
return result[self.FEATURE_COLUMNS].fillna(0.0)
def fit(
self,
df_train: pd.DataFrame,
y_anomaly: pd.Series,
y_rul_hours: pd.Series
) -> dict:
"""
Train both models.
Args:
df_train: DataFrame with historical measurements
y_anomaly: labels {0=normal, 1=warning, 2=critical}
y_rul_hours: actual remaining life hours (from fault history)
Returns:
Training metrics
"""
X = self.engineer_features(df_train)
X_scaled = self.scaler.fit_transform(X)
# Train/validation split
X_tr, X_val, y_a_tr, y_a_val, y_r_tr, y_r_val = train_test_split(
X_scaled, y_anomaly, y_rul_hours,
test_size=0.2, random_state=42, stratify=y_anomaly
)
# Train classifier
self.anomaly_classifier.fit(X_tr, y_a_tr)
# Note: the scaler was fit on the full set; wrap in a Pipeline to avoid leakage in CV
cv_scores = cross_val_score(
self.anomaly_classifier, X_scaled, y_anomaly,
cv=5, scoring="f1_weighted"
)
# Train RUL regressor
self.rul_regressor.fit(
X_tr, y_r_tr,
eval_set=[(X_val, y_r_val)],
verbose=False
)
y_pred_rul = self.rul_regressor.predict(X_val)
mae_rul = mean_absolute_error(y_r_val, y_pred_rul)
self._is_fitted = True
# Save models
joblib.dump(self.scaler, self.model_dir / "scaler.pkl")
joblib.dump(self.anomaly_classifier, self.model_dir / "anomaly_rf.pkl")
self.rul_regressor.save_model(str(self.model_dir / "rul_xgb.json"))
return {
"anomaly_f1_cv": float(cv_scores.mean()),
"anomaly_f1_std": float(cv_scores.std()),
"rul_mae_hours": float(mae_rul),
"n_train_samples": len(X_tr),
"feature_importances": dict(zip(
self.FEATURE_COLUMNS,
self.anomaly_classifier.feature_importances_.tolist()
))
}
def predict(self, measurements: dict) -> dict:
"""
Predict health state for a transformer.
Args:
measurements: dictionary with current measurements
Returns:
Health state, probabilities, and estimated RUL
"""
if not self._is_fitted:
raise RuntimeError("Model not trained. Call fit() first.")
df = pd.DataFrame([measurements])
X = self.engineer_features(df)
X_scaled = self.scaler.transform(X)
anomaly_label = int(self.anomaly_classifier.predict(X_scaled)[0])
anomaly_proba = self.anomaly_classifier.predict_proba(X_scaled)[0].tolist()
rul_hours = max(0.0, float(self.rul_regressor.predict(X_scaled)[0]))
status_map = {0: "NORMAL", 1: "WARNING", 2: "CRITICAL"}
return {
"health_status": status_map[anomaly_label],
"probability_normal": round(anomaly_proba[0], 3),
"probability_warning": round(anomaly_proba[1], 3),
"probability_critical": round(anomaly_proba[2], 3),
"rul_hours": round(rul_hours, 0),
"rul_days": round(rul_hours / 24, 1),
"maintenance_urgency": "IMMEDIATE" if anomaly_label == 2 else
"PLANNED" if anomaly_label == 1 else "ROUTINE",
"top_risk_features": self._get_top_risk_features(X)
}
def _get_top_risk_features(self, X: pd.DataFrame) -> list:
"""Return the top-3 features contributing to risk."""
importances = self.anomaly_classifier.feature_importances_
feature_values = X.iloc[0].values
risk_scores = importances * np.abs(feature_values) # crude heuristic; use SHAP values for rigorous attribution
top_indices = np.argsort(risk_scores)[::-1][:3]
return [
{
"feature": self.FEATURE_COLUMNS[i],
"importance": round(float(importances[i]), 4),
"value": round(float(feature_values[i]), 3)
}
for i in top_indices
]
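To make the engineered DGA features concrete, here is a minimal, dependency-free sketch of the TDCG and methane-ratio computations used above. Note that the formal IEEE C57.104 TDCG also includes CO; the five-gas sum mirrors the feature engineering in the class, and the classic condition thresholds (720 / 1920 / 4630 ppm) come from the 1991 edition of the standard.

```python
# Minimal sketch of the DGA features engineered above (no pandas needed).
# TDCG condition thresholds follow the classic IEEE C57.104-1991 table.
TDCG_CONDITIONS = [(720, "Condition 1"), (1920, "Condition 2"),
                   (4630, "Condition 3"), (float("inf"), "Condition 4")]

def dga_features(sample: dict) -> dict:
    """Compute TDCG and the CH4/(CH4+C2H4) Duval ratio from a raw DGA sample."""
    combustible = ("h2_ppm", "ch4_ppm", "c2h2_ppm", "c2h4_ppm", "c2h6_ppm")
    tdcg = sum(sample.get(g, 0.0) for g in combustible)
    denom = sample.get("ch4_ppm", 0.0) + sample.get("c2h4_ppm", 0.0)
    methane_ratio = sample["ch4_ppm"] / denom if denom > 0 else 0.5
    condition = next(label for limit, label in TDCG_CONDITIONS if tdcg <= limit)
    return {"total_dissolved_gas_ppm": tdcg,
            "methane_ratio": round(methane_ratio, 3),
            "tdcg_condition": condition}

sample = {"h2_ppm": 120, "ch4_ppm": 80, "c2h2_ppm": 2,
          "c2h4_ppm": 40, "c2h6_ppm": 30}
print(dga_features(sample))
```

For this sample the TDCG is 272 ppm, well inside Condition 1; a jump of more than ~30 ppm/day in TDCG is what typically triggers a resampling interval change.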
Network Optimization with PyPSA: What-If Scenarios
PyPSA (Python for Power System Analysis) is a reference open-source toolbox for optimizing power systems up to national scale. It enables complex what-if scenarios: adding renewable generation, network expansion, storage integration, and economic dispatch optimization.
import pypsa
import pandas as pd
import numpy as np
class GridExpansionTwin:
"""
Digital twin for network expansion planning with PyPSA.
Uses linear optimization to find the optimal generation + storage mix.
"""
def __init__(self, network_name: str = "Southern_Grid"):
self.network = pypsa.Network()
self.network.name = network_name
# Temporal resolution: 8760 hours (one year)
self.network.set_snapshots(
pd.date_range("2025-01-01", periods=8760, freq="h")
)
def build_regional_network(
self,
buses_df: pd.DataFrame,
lines_df: pd.DataFrame,
generators_df: pd.DataFrame,
loads_df: pd.DataFrame
) -> None:
"""
Build network model from CIM/CSV data.
Args:
buses_df: nodes with nominal voltage
lines_df: lines with impedance and capacity
generators_df: generators with cost curves
loads_df: annual hourly load profiles
"""
# Add buses
for _, bus in buses_df.iterrows():
self.network.add(
"Bus",
name=bus["name"],
v_nom=bus["v_nom_kv"],
x=bus.get("longitude", 0.0),
y=bus.get("latitude", 0.0)
)
# Add lines
for _, line in lines_df.iterrows():
self.network.add(
"Line",
name=line["name"],
bus0=line["bus0"],
bus1=line["bus1"],
r=line["r_ohm"],
x=line["x_ohm"],
s_nom=line["s_nom_mva"],
s_nom_extendable=bool(line.get("extendable", False)),
s_nom_max=line.get("s_nom_max_mva", float("inf"))
)
# Add generators with marginal costs
for _, gen in generators_df.iterrows():
self.network.add(
"Generator",
name=gen["name"],
bus=gen["bus"],
p_nom=gen["p_nom_mw"],
p_nom_extendable=bool(gen.get("extendable", False)),
p_nom_max=gen.get("p_nom_max_mw", float("inf")),
marginal_cost=gen.get("marginal_cost_eur_mwh", 0.0),
capital_cost=gen.get("capital_cost_eur_mw", 0.0),
carrier=gen.get("carrier", "gas"),
efficiency=gen.get("efficiency", 0.45),
committable=False
)
# Add loads with hourly profiles
for _, load in loads_df.iterrows():
p_set = load.get("hourly_profile") # 8760-value time series
self.network.add(
"Load",
name=load["name"],
bus=load["bus"],
p_set=p_set if p_set is not None else load["p_mw"]
)
def add_storage(
self,
bus: str,
p_nom_mw: float,
energy_capacity_mwh: float,
capital_cost_eur_mw: float = 800_000
) -> None:
"""Add a battery energy storage system (BESS) to the network."""
self.network.add(
"StorageUnit",
name=f"BESS_{bus}",
bus=bus,
p_nom=p_nom_mw,
p_nom_extendable=True,
p_nom_max=p_nom_mw * 3,
max_hours=energy_capacity_mwh / p_nom_mw,
capital_cost=capital_cost_eur_mw,
marginal_cost=0.0,
efficiency_store=0.92,
efficiency_dispatch=0.92,
cyclic_state_of_charge=True
)
def run_capacity_expansion(self) -> dict:
"""
Network expansion optimization: find optimal investments
minimizing total cost (CAPEX + OPEX) over an annual horizon.
"""
self.network.optimize(
solver_name="highs", # HiGHS open-source solver
solver_options={
"time_limit": 300, # max 5 minutes
"mip_gap": 0.01 # 1% optimality gap
}
)
total_cost = float(self.network.objective)
gen_by_carrier = (
self.network.generators_t.p
.multiply(self.network.snapshot_weightings.generators, axis=0)
.sum() # MWh per generator over the year
.groupby(self.network.generators.carrier)
.sum()
/ 1e6 # MWh -> TWh
)
line_loading = (
self.network.lines_t.p0.abs() /
self.network.lines.s_nom
).max()
new_capacity = self.network.generators[
self.network.generators.p_nom_extendable
]["p_nom_opt"]
return {
"total_system_cost_meur": round(total_cost / 1e6, 2),
"generation_twh": gen_by_carrier.to_dict(),
"congested_lines": line_loading[line_loading > 0.9].to_dict(),
"optimal_new_capacity_mw": new_capacity.to_dict(),
"co2_emissions_kt": self._calculate_emissions()
}
def simulate_n1_contingencies(self) -> pd.DataFrame:
"""
Simulate all N-1 contingencies and return a violations report.
Useful for security assessment of the planned network.
"""
results = []
lines = self.network.lines.index.tolist()
for line_name in lines:
n_contingency = self.network.copy()
n_contingency.remove("Line", line_name)
try:
n_contingency.lpf() # Linearized Power Flow (faster)
max_loading = (
n_contingency.lines_t.p0.abs() /
n_contingency.lines.s_nom
).max().max()
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float(max_loading),
"secure": bool(max_loading <= 1.0),
"severity": "OK" if max_loading <= 1.0 else
"WARNING" if max_loading <= 1.2 else "CRITICAL"
})
except Exception as e:
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float("inf"),
"secure": False,
"severity": "CRITICAL",
"error": str(e)
})
return pd.DataFrame(results).sort_values("max_line_loading", ascending=False)
def _calculate_emissions(self) -> float:
"""Calculate total CO2 emissions in kt."""
emission_factors = {
"gas": 0.202, # tCO2/MWh gas CCGT
"coal": 0.820, # tCO2/MWh coal
"nuclear": 0.012, # tCO2/MWh nuclear (lifecycle)
"wind": 0.007, # tCO2/MWh wind
"solar": 0.040, # tCO2/MWh photovoltaic
"hydro": 0.004 # tCO2/MWh hydropower
}
total_kt = 0.0
for carrier, factor in emission_factors.items():
gen_col = [
c for c in self.network.generators_t.p.columns
if self.network.generators.at[c, "carrier"] == carrier
]
if gen_col:
gen_mwh = self.network.generators_t.p[gen_col].sum().sum()
total_kt += gen_mwh * factor / 1000.0
return round(total_kt, 2)
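Stripped of the PyPSA plumbing, the emission accounting in _calculate_emissions reduces to a weighted sum of per-carrier generation; a minimal standalone sketch (factors and generation figures hypothetical, for illustration only):

```python
# Carrier -> tCO2 per MWh of electricity (illustrative values)
EMISSION_FACTORS = {"gas": 0.37, "coal": 0.820, "wind": 0.007, "solar": 0.040}

def emissions_kt(generation_mwh: dict) -> float:
    """Total CO2 in kilotonnes from per-carrier annual generation (MWh)."""
    total_t = sum(generation_mwh.get(c, 0.0) * f
                  for c, f in EMISSION_FACTORS.items())
    return round(total_t / 1000.0, 2)  # tonnes -> kilotonnes

# Hypothetical annual mix: 2 TWh gas, 1.5 TWh wind
print(emissions_kt({"gas": 2_000_000, "wind": 1_500_000}))
```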
Common Information Model (CIM): IEC 61970/61968
The CIM (Common Information Model) is the international standard for the semantic representation of electrical network assets. Developed by the IEC and maintained by ENTSO-E through the CGMES (Common Grid Model Exchange Specification), the CIM defines a shared ontology that enables systems from different vendors to exchange network models without information loss.
| Standard | Scope | Key Packages | Digital Twin Use |
|---|---|---|---|
| IEC 61970-301 | Core Network Model | TopologyPackage, WiresPackage | Network topology, impedances, transformers |
| IEC 61970-456 | State Variables | StateVariablesPackage | Real-time state: voltages, flows, setpoints |
| IEC 61968-4 | Asset Management | AssetPackage, WorkPackage | Asset data, work orders, maintenance records |
| CGMES 3.0 | Exchange Format | EQ, TP, SV, SSH, DY | Model exchange between pan-European TSOs/DSOs |
from dataclasses import dataclass, field
import uuid
from enum import Enum
from typing import Optional
class WindingConnection(Enum):
"""IEC CIM - Transformer winding connection type."""
D = "D" # Delta
Y = "Y" # Wye (star)
Z = "Z" # Zigzag
Yn = "Yn" # Wye with neutral
@dataclass
class CIMIdentifiedObject:
"""CIM base class for identified objects."""
mrid: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
alias_name: str = ""
@dataclass
class CIMSubstation(CIMIdentifiedObject):
"""CIM Substation - represents an electrical substation."""
region_name: str = ""
voltage_levels: list = field(default_factory=list)
equipment: list = field(default_factory=list)
@dataclass
class CIMPowerTransformer(CIMIdentifiedObject):
"""CIM PowerTransformer - power transformer."""
vector_group: str = "Dyn11"
is_phase_shifting: bool = False
winding_ends: list = field(default_factory=list)
@dataclass
class CIMPowerTransformerEnd:
"""CIM PowerTransformerEnd - transformer winding."""
end_number: int = 1
rated_s_mva: float = 0.0
rated_u_kv: float = 0.0
r_ohm: float = 0.0
x_ohm: float = 0.0
connection_kind: WindingConnection = WindingConnection.Y
grounded: bool = False
class CIMModelBuilder:
"""
Builds a CIM model from structured data for CGMES export.
Enables interoperability with EMS/SCADA from different vendors.
"""
def __init__(self):
self._substations: dict[str, CIMSubstation] = {}
self._transformers: dict[str, CIMPowerTransformer] = {}
def add_substation(self, name: str, region: str = "") -> CIMSubstation:
"""Create and register a substation in the CIM model."""
sub = CIMSubstation(name=name, region_name=region)
self._substations[sub.mrid] = sub
return sub
def add_transformer(
self,
name: str,
substation: CIMSubstation,
hv_kv: float,
lv_kv: float,
rated_mva: float,
uk_percent: float = 8.5,
vector_group: str = "Dyn11"
) -> CIMPowerTransformer:
"""Create a transformer with both windings."""
trafo = CIMPowerTransformer(name=name, vector_group=vector_group)
# Impedance calculation in ohms (referred to HV side)
z_base = (hv_kv ** 2) / rated_mva # ohm base HV side
z_total = (uk_percent / 100.0) * z_base
r_hv = z_total * 0.1 # approximation: R/Z ~ 0.1 for HV/MV transformer
x_hv = z_total * 0.995
# Primary winding (HV)
end_hv = CIMPowerTransformerEnd(
end_number=1,
rated_s_mva=rated_mva,
rated_u_kv=hv_kv,
r_ohm=r_hv,
x_ohm=x_hv,
connection_kind=WindingConnection.D,
grounded=False
)
# Secondary winding (LV)
end_lv = CIMPowerTransformerEnd(
end_number=2,
rated_s_mva=rated_mva,
rated_u_kv=lv_kv,
r_ohm=0.0,
x_ohm=0.0,
connection_kind=WindingConnection.Yn,
grounded=True
)
trafo.winding_ends = [end_hv, end_lv]
self._transformers[trafo.mrid] = trafo
substation.equipment.append(trafo)
return trafo
def to_cgmes_dict(self) -> dict:
"""
Export model to CGMES-compatible format (simplified JSON-LD).
In production, use an RDF/XML serializer compliant with CGMES 3.0.
"""
return {
"@context": "https://iec.ch/TC57/CIM100",
"Substation": [
{
"@id": f"_{sub.mrid}",
"@type": "cim:Substation",
"cim:IdentifiedObject.name": sub.name,
"cim:Substation.Region": sub.region_name
}
for sub in self._substations.values()
],
"PowerTransformer": [
{
"@id": f"_{trafo.mrid}",
"@type": "cim:PowerTransformer",
"cim:IdentifiedObject.name": trafo.name,
"cim:PowerTransformer.vectorGroup": trafo.vector_group,
"cim:PowerTransformer.PowerTransformerEnd": [
{
"cim:PowerTransformerEnd.endNumber": end.end_number,
"cim:PowerTransformerEnd.ratedS":
{"value": end.rated_s_mva, "unit": "MVA"},
"cim:PowerTransformerEnd.ratedU":
{"value": end.rated_u_kv, "unit": "kV"}
}
for end in trafo.winding_ends
]
}
for trafo in self._transformers.values()
]
}
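The mRID convention used by to_cgmes_dict — every object carries a UUID as its stable identifier — is what makes model exchange between vendors possible. A toy round-trip (names hypothetical) shows that identity survives serialization:

```python
import json
import uuid

# Toy CGMES-style record: every object carries a UUID mRID as its @id
mrid = str(uuid.uuid4())
record = {
    "@id": f"_{mrid}",
    "@type": "cim:Substation",
    "cim:IdentifiedObject.name": "SS_Milano_Nord",  # hypothetical name
}

# Round-tripping through JSON preserves identity, a prerequisite for
# incremental model exchange between EMS/SCADA systems
decoded = json.loads(json.dumps(record))
assert decoded["@id"] == f"_{mrid}"
print(decoded["@type"], decoded["cim:IdentifiedObject.name"])
```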
Cloud Platforms: Azure Digital Twins vs AWS IoT TwinMaker
Choosing the right cloud platform is a critical step in scaling a digital twin from pilot to production. The two leading platforms take different approaches:
| Criterion | Azure Digital Twins | AWS IoT TwinMaker |
|---|---|---|
| Data model | DTDL (Digital Twin Definition Language), customizable ontologies | Scene component model, workspace-based |
| SCADA/OT integration | Excellent via Azure IoT Hub + Industrial IoT OPC-UA | Good via AWS IoT Core + Greengrass edge |
| 3D visualization | HoloLens 2, Power BI, partner ecosystem | TwinMaker Scene Composer (Babylon.js), native Grafana plugin |
| Analytics | Azure Data Explorer, Synapse, Power BI | Amazon Timestream, Grafana, Athena |
| ML integration | Azure ML, Cognitive Services | Amazon SageMaker |
| Pricing (base) | $0.10/1000 queries + $0.50/GB storage/month | $0.05/1000 property reads + $0.50/GB/month |
| CIM/CGMES support | Azure Data Manager for Energy (OSDU) | Partial support via custom connectors |
| Energy sector | Excellent: Azure Energy Data Manager, OSDU | Strong for generic OT, less energy-specific |
| Ideal use case | Utilities, TSO/DSO, large SCADA installations | Wind farms, photovoltaic, smart buildings |
Recommendation for the Energy Sector
For utilities and grid operators (TSO/DSO) with existing SCADA systems and NERC/ENTSO-E compliance requirements, Azure Digital Twins with Azure Data Manager for Energy (based on OSDU) offers the best integration with industry standards (CIM/CGMES). For renewable generation operators (wind, solar) with 3D visualization and Grafana integration needs, AWS IoT TwinMaker provides a smoother developer experience.
Edge Digital Twin: Lightweight Models on Gateways
Not all computations can wait for a cloud round-trip. For scenarios where latency is critical (protection, demand response, real-time EV charging), an edge digital twin is implemented on an industrial gateway (Raspberry Pi 4, Siemens IPC, Beckhoff CX) that runs a reduced version of the model with latency below 100ms.
import numpy as np
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class EdgeTwinConfig:
"""Edge digital twin configuration - optimized for limited resources."""
n_buses: int = 6 # max buses simulable in real time
max_iterations: int = 10 # Newton-Raphson: fewer iterations
tolerance_pu: float = 1e-4 # looser convergence tolerance
update_interval_ms: float = 100.0 # update frequency 10 Hz
class LightweightPowerFlowSolver:
"""
Simplified power flow solver for edge computing.
Uses DC (linear) method for maximum speed.
The DC approximation is accurate for networks with high X/R ratio.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self._B_matrix: Optional[np.ndarray] = None # reduced susceptance matrix
self._initialized = False
def build_susceptance_matrix(
self,
from_buses: list[int],
to_buses: list[int],
x_ohm: list[float],
v_base_kv: float = 110.0,
base_mva: float = 100.0
) -> None:
"""
Build the B (susceptance) matrix for DC power flow.
Complexity O(n_lines), much faster than the NR method.
"""
n = self.config.n_buses
B = np.zeros((n, n))
z_base = (v_base_kv ** 2) / base_mva # ohm base for per-unit conversion
for fb, tb, x in zip(from_buses, to_buses, x_ohm):
if x > 1e-8:
b = z_base / x # susceptance in pu: 1/x_pu, with x_pu = x_ohm / z_base
B[fb][fb] += b
B[tb][tb] += b
B[fb][tb] -= b
B[tb][fb] -= b
# Remove slack bus row/column (bus 0)
self._B_matrix = B[1:, 1:]
self._initialized = True
def solve_dc(self, p_injections_mw: list[float], base_mva: float = 100.0) -> dict:
"""
Solve DC power flow: O(n^2) vs O(n^3) of NR.
Suitable for edge hardware with less than 512 MB RAM.
Args:
p_injections_mw: list of power injections per bus (pos=generation, neg=load)
base_mva: base power for normalization
Returns:
Phase angles and estimated power flows
"""
if not self._initialized:
raise RuntimeError("B matrix not initialized")
t_start = time.monotonic_ns()
# Convert to per-unit (exclude slack bus)
p_pu = np.array(p_injections_mw[1:]) / base_mva
# Solve linear system: B * theta = P
try:
theta = np.linalg.solve(self._B_matrix, p_pu)
except np.linalg.LinAlgError:
return {"converged": False, "error": "Singular matrix - islanded network"}
theta_full = np.concatenate([[0.0], theta]) # add slack bus
elapsed_us = (time.monotonic_ns() - t_start) // 1000
return {
"converged": True,
"theta_rad": theta_full.tolist(),
"theta_deg": np.degrees(theta_full).tolist(),
"solve_time_us": int(elapsed_us),
"within_latency_budget": elapsed_us < (self.config.update_interval_ms * 1000 * 0.1)
}
class EdgeDigitalTwinRuntime:
"""
Edge runtime for lightweight digital twin on industrial gateways.
Runs in a 10 Hz loop with model updates from local sensors.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self.solver = LightweightPowerFlowSolver(config)
self._thermal_model = TransformerThermalModel()
self._last_sync_ts: float = 0.0
self._cloud_buffer: list[dict] = []
def process_sensor_sample(self, sensors: dict) -> dict:
"""
Process a sensor sample and update the edge model.
Target latency: less than 10ms total.
Args:
sensors: current measurements (voltages, currents, temperatures)
Returns:
Computed state + immediate alerts
"""
t_start = time.monotonic_ns()
# Update thermal model (very fast, algebra only)
load_fraction = sensors.get("trafo_loading_pu", 0.5)
ambient_temp = sensors.get("ambient_temp_c", 20.0)
thermal_state = self._thermal_model.step(
load_fraction=load_fraction,
ambient_temp=ambient_temp,
dt_seconds=self.config.update_interval_ms / 1000.0
)
# Fast network state estimate (DC power flow)
p_injections = sensors.get("p_injections_mw", [0.0] * self.config.n_buses)
pf_result = self.solver.solve_dc(p_injections)
# Immediate alerts (highest priority - less than 1ms)
immediate_alerts = self._evaluate_fast_alerts(thermal_state, sensors)
elapsed_ms = (time.monotonic_ns() - t_start) / 1e6
result = {
"thermal": thermal_state,
"power_flow": pf_result,
"alerts": immediate_alerts,
"edge_processing_ms": round(elapsed_ms, 3),
"within_budget": elapsed_ms < 10.0
}
# Accumulate for asynchronous cloud sync
self._cloud_buffer.append(result)
return result
def _evaluate_fast_alerts(
self,
thermal_state: dict,
sensors: dict
) -> list[dict]:
"""Critical alert evaluated in less than 1ms for immediate response."""
alerts = []
hst = thermal_state.get("theta_winding_hst_c", 0.0)
if hst > 140.0:
alerts.append({
"code": "TRAFO_EMERGENCY_TEMP",
"severity": "CRITICAL",
"value": hst,
"action": "REDUCE_LOAD_IMMEDIATELY"
})
elif hst > 120.0:
alerts.append({
"code": "TRAFO_HIGH_TEMP",
"severity": "WARNING",
"value": hst,
"action": "MONITOR_CLOSELY"
})
# Check overvoltage (from raw sensor)
v_pu = sensors.get("voltage_pu", 1.0)
if v_pu > 1.10 or v_pu < 0.90:
alerts.append({
"code": "VOLTAGE_OUT_OF_RANGE",
"severity": "WARNING",
"value": v_pu,
"action": "ADJUST_TAP_CHANGER"
})
return alerts
def flush_to_cloud(self) -> list[dict]:
"""Flush local buffer for cloud transmission (asynchronous call)."""
buffer = self._cloud_buffer.copy()
self._cloud_buffer.clear()
return buffer
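Before moving on, the reduced-B solve at the heart of the edge twin can be sanity-checked standalone on a toy 3-bus ring (per-unit reactances and injections invented for illustration): build B, drop the slack row and column, solve the linear system, and verify the flows balance.

```python
import numpy as np

# Toy 3-bus ring: lines 0-1, 1-2, 0-2, all with x = 0.1 pu (illustrative)
n = 3
x_pu = {(0, 1): 0.1, (1, 2): 0.1, (0, 2): 0.1}
B = np.zeros((n, n))
for (i, j), x in x_pu.items():
    b = 1.0 / x
    B[i, i] += b; B[j, j] += b
    B[i, j] -= b; B[j, i] -= b

# Injections in pu: bus 0 is slack, bus 1 generates 1.0, bus 2 consumes 1.0
p = np.array([1.0, -1.0])               # buses 1 and 2 only
theta = np.linalg.solve(B[1:, 1:], p)   # slack angle fixed at 0
theta_full = np.concatenate([[0.0], theta])

# Flow on line 0-1: (theta_0 - theta_1) / x
flow_01 = (theta_full[0] - theta_full[1]) / 0.1
print(np.round(theta_full, 4), round(flow_01, 3))
```

By symmetry one third of the injected power takes the direct 1-2 path through bus 0, which is exactly what the solved angles give back.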
Case Study: Offshore Wind Farm Digital Twin
An offshore wind farm with 40 turbines rated at 8 MW each (320 MW total) generates an enormous volume of data: each turbine has more than 200 sensors (accelerometers, strain gauges, anemometers, pitch/yaw encoders, bearing temperatures, generator currents). The digital twin integrates physics, ML, and real-time optimization to maximize energy production while minimizing maintenance costs.
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class WindTurbineState:
"""Current state of a wind turbine."""
turbine_id: str
timestamp: str
wind_speed_ms: float
wind_direction_deg: float
power_kw: float
rotor_rpm: float
pitch_angle_deg: float
yaw_error_deg: float
nacelle_temp_c: float
gearbox_temp_c: float
generator_temp_c: float
main_bearing_vibration_g: float
tower_acceleration_x_g: float
tower_acceleration_y_g: float
operational_status: str # PRODUCING, CURTAILED, FAULT, MAINTENANCE
class OffshoreWindFarmTwin:
"""
Offshore wind farm digital twin.
Manages 40 turbines with wake effect simulation and predictive maintenance.
"""
# Generic 8 MW offshore turbine power curve (approximation)
POWER_CURVE_MS = [0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 25]
POWER_CURVE_KW = [0, 0, 90, 310, 680, 1250, 2100, 3200, 4600, 6200, 7800, 8000, 8000]
def __init__(self, farm_name: str, n_turbines: int = 40):
self.farm_name = farm_name
self.n_turbines = n_turbines
self.turbine_states: dict[str, WindTurbineState] = {}
self._power_interpolator = self._build_power_curve_interp()
def _build_power_curve_interp(self):
"""Build the power curve interpolator."""
from scipy.interpolate import interp1d
return interp1d(
self.POWER_CURVE_MS,
self.POWER_CURVE_KW,
kind="cubic",
bounds_error=False,
fill_value=(0.0, 8000.0)
)
def calculate_wake_effect(
self,
turbine_positions: list[tuple[float, float]],
free_stream_wind_ms: float,
wind_direction_deg: float
) -> list[float]:
"""
Calculate wind speed for each turbine accounting for wake effect.
Uses simplified Jensen (Park) model.
Args:
turbine_positions: list of (x, y) in meters
free_stream_wind_ms: free-stream wind speed
wind_direction_deg: wind direction (0=North)
Returns:
List of effective wind speed for each turbine
"""
n = len(turbine_positions)
effective_wind = [free_stream_wind_ms] * n
k_wake = 0.04 # offshore wake expansion constant
rotor_d = 164.0 # rotor diameter in meters (8 MW class, e.g. V164-8.0)
ct = 0.79 # thrust coefficient at rated wind speed
# Convert wind direction to vector
wind_rad = np.radians(wind_direction_deg)
wind_vec = np.array([np.sin(wind_rad), np.cos(wind_rad)])
for i, (xi, yi) in enumerate(turbine_positions):
for j, (xj, yj) in enumerate(turbine_positions):
if i == j:
continue
# Vector from turbine j to turbine i
delta = np.array([xi - xj, yi - yj])
dist = np.linalg.norm(delta)
if dist < 1.0:
continue
# Component in wind direction
downstream = np.dot(delta, wind_vec)
if downstream <= 0:
continue # i is not downstream of j
# Lateral distance from wake centerline (2D cross product magnitude)
lateral = abs(delta[0] * wind_vec[1] - delta[1] * wind_vec[0])
# Wake radius at downstream distance
wake_radius = rotor_d / 2.0 + k_wake * downstream
# Overlap fraction (simplified: cylindrical)
if lateral < wake_radius:
overlap = max(0.0, 1.0 - lateral / wake_radius)
# Jensen model velocity deficit
deficit = (1.0 - np.sqrt(1.0 - ct)) * (rotor_d / (rotor_d + 2 * k_wake * downstream)) ** 2
effective_wind[i] = min(effective_wind[i], free_stream_wind_ms * (1.0 - overlap * deficit))
return effective_wind
def estimate_farm_production(
self,
wind_speed_ms: float,
wind_direction_deg: float,
turbine_positions: list[tuple[float, float]],
availability: Optional[list[float]] = None
) -> dict:
"""
Estimate total farm production with wake model.
Args:
wind_speed_ms: wind speed at hub height
wind_direction_deg: wind direction
turbine_positions: turbine coordinates
availability: per-turbine availability (0-1)
Returns:
Total production, per-turbine production, and wake losses
"""
if availability is None:
availability = [1.0] * self.n_turbines
# Calculate effective wind speed for each turbine
effective_winds = self.calculate_wake_effect(
turbine_positions, wind_speed_ms, wind_direction_deg
)
# Calculate per-turbine power from curve
powers_kw = []
for i, (v_eff, avail) in enumerate(zip(effective_winds, availability)):
p = float(self._power_interpolator(v_eff)) * avail
powers_kw.append(p)
total_power_mw = sum(powers_kw) / 1000.0
ideal_power_mw = float(self._power_interpolator(wind_speed_ms)) * self.n_turbines / 1000.0
wake_loss_pct = max(0.0, (ideal_power_mw - total_power_mw) / ideal_power_mw * 100) if ideal_power_mw > 0 else 0.0
return {
"total_power_mw": round(total_power_mw, 2),
"ideal_power_mw": round(ideal_power_mw, 2),
"wake_loss_pct": round(wake_loss_pct, 2),
"capacity_factor": round(total_power_mw / (self.n_turbines * 8.0) * 100, 1),
"per_turbine_kw": [round(p, 1) for p in powers_kw],
"effective_wind_ms": [round(v, 2) for v in effective_winds]
}
def detect_yaw_misalignment(
self,
turbine_id: str,
lidar_wind_dir: float,
nacelle_dir: float,
threshold_deg: float = 5.0
) -> dict:
"""
Detect yaw misalignment from LIDAR data + nacelle encoder.
A 10-degree yaw error reduces production by approximately 4.5% (cosine-cubed law).
"""
yaw_error = abs(lidar_wind_dir - nacelle_dir)
# Normalize to -180/+180 range
if yaw_error > 180:
yaw_error = 360 - yaw_error
# Estimate production loss (cosine law)
production_loss_pct = (1 - np.cos(np.radians(yaw_error)) ** 3) * 100
return {
"turbine_id": turbine_id,
"yaw_error_deg": round(yaw_error, 2),
"production_loss_pct": round(production_loss_pct, 2),
"action_required": yaw_error > threshold_deg,
"priority": "HIGH" if yaw_error > 15.0 else
"MEDIUM" if yaw_error > threshold_deg else "NONE"
}
Performance, Scalability, and Security
Performance Requirements
| Component | Target Latency | Update Frequency | Storage/day |
|---|---|---|---|
| Edge twin (DC power flow) | < 10ms | 10 Hz | 50 MB/turbine |
| NR power flow (cloud) | < 500ms | 0.03 Hz (30s) | 5 MB/substation |
| ML inference | < 100ms | 1 Hz | 2 MB/asset |
| 3D visualization sync | < 1s | 0.1 Hz (10s) | negligible |
| CGMES model export | < 5s | 1/hour | 100 MB/network |
Security: Protecting SCADA Data
Warning: Digital Twins as an Attack Vector
An energy digital twin is a high-value target for cyberattacks: it contains the complete network topology, asset vulnerabilities, and protection schemes. The NIS2 Directive (EU 2022/2555, with a national transposition deadline of October 2024) imposes strict requirements on operators of essential services in the energy sector:
- Network segmentation: the DT data layer must be in a DMZ separate from the OT network
- Zero-trust access: every component authenticates itself; no implicit trust
- Data sanitization: SCADA data flowing to the cloud DT must pass through a data diode (physical unidirectional device)
- Audit trail: every model query must be logged for compliance
- Encryption at rest: CIM/CGMES models contain sensitive data on network topology
# Recommended security architecture for energy Digital Twin
#
# OT Network (IEC 62443) DMZ Cloud
# +---------------------+ +--------------+ +---------------+
# | IED / RTU / SCADA |--->| Data Diode |--->| Data Broker |
# | (Modbus, IEC 61850) | | (unidirect.) | | (Kafka/MQTT) |
# +---------------------+ +--------------+ +-------+-------+
# | TLS 1.3
# +---------------------+ +--------------+ +-------v-------+
# | Historian (local) | | Firewall | | DT Engine |
# | OSIsoft PI / AVEVA | | IPS/IDS | | (Azure DT / |
# +---------------------+ +--------------+ | AWS TwinMkr) |
# +---------------+
#
# Key rules:
# - No INBOUND connections to the OT network
# - mTLS authentication for all DT services
# - RBAC: operators see only their own assets
# - Historical data: 7-year retention (regulatory requirement)
# - CIM model backup: immutable on S3/ADLS with versioning
# Example: validate SCADA input before updating the twin
from typing import Any
SCADA_FIELD_LIMITS = {
"load_mw": (0.0, 1000.0),
"voltage_kv": (0.0, 500.0),
"tap_position": (-16, 16),
"ambient_temp_c": (-40.0, 60.0),
"current_ka": (0.0, 10.0)
}
def validate_scada_measurement(field: str, value: Any) -> float:
"""
Validate a SCADA measurement before ingestion into the digital twin.
Prevents injection of anomalous values (tampering / sensor errors).
"""
if not isinstance(value, (int, float)):
raise ValueError(f"Field {field}: invalid type {type(value)}")
value = float(value)
if not (-1e9 < value < 1e9): # numeric overflow sanity check
raise ValueError(f"Field {field}: numeric overflow {value}")
if field in SCADA_FIELD_LIMITS:
lo, hi = SCADA_FIELD_LIMITS[field]
if not (lo <= value <= hi):
raise ValueError(
f"Field {field}: value {value} outside physical range [{lo}, {hi}]"
)
return value
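A quick, self-contained check of the range-validation logic above (limits reproduced in miniature so the snippet runs on its own; the explicit bool exclusion is an extra guard added here, since bool is a subclass of int in Python):

```python
# Standalone sketch of the SCADA range check, for quick testing
LIMITS = {"load_mw": (0.0, 1000.0), "voltage_kv": (0.0, 500.0)}

def validate(field: str, value) -> float:
    """Reject non-numeric or physically impossible values before ingestion."""
    if not isinstance(value, (int, float)) or isinstance(value, bool):
        raise ValueError(f"{field}: invalid type {type(value).__name__}")
    value = float(value)
    lo, hi = LIMITS.get(field, (float("-inf"), float("inf")))
    if not (lo <= value <= hi):
        raise ValueError(f"{field}: {value} outside physical range [{lo}, {hi}]")
    return value

print(validate("load_mw", 420.0))     # plausible measurement: accepted
try:
    validate("voltage_kv", 9999.0)    # tampered / faulty sensor: rejected
except ValueError as exc:
    print("rejected:", exc)
```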
The Future: Federated Digital Twins and AI-Augmented Grids
Emerging trends for 2026-2028 in the field of energy digital twins:
2026-2028 Trends: Next-Generation Digital Twins
- Federated Digital Twins: digital twins from different TSOs and DSOs collaborating while preserving data privacy, using federated learning techniques to update shared models without sharing sensitive network topology data.
- AI-Augmented Physics Models: Physics-Informed Neural Networks (PINN) that combine Maxwell/Kirchhoff equations with observed data, reducing simulation error by 60% compared to purely analytical models.
- Autonomous Grid Twin: digital twins with Reinforcement Learning (RL) agents that execute autonomous control actions (tap changers, load shedding, storage dispatch), optimizing in real time under human supervision.
- Digital Twin as a Service (DTaaS) for SMEs: SaaS platforms offering pre-configured digital twins for medium industrial installations (1-50 MW), lowering the entry barrier from 500K to 50K EUR.
- 5G Ultra-Reliable Low-Latency: 5G URLLC connectivity (latency <1ms, 99.999% reliability) will enable field-deployed edge twins without intermediate gateways, with direct sensor-to-cloud synchronization.
Resources and References
| Resource | Type | Use |
|---|---|---|
| pandapower.readthedocs.io | Documentation | Power flow, optimal power flow, short circuit |
| pypsa.readthedocs.io | Documentation | Capacity expansion, unit commitment, multi-period |
| IEC 60076-7:2018 | Standard | Power transformer thermal model |
| IEC 60599:2015 | Standard | DGA interpretation for transformer diagnostics |
| ENTSO-E CIM / CGMES 3.0 | Standard | Network data model, TSO/DSO exchange |
| Azure Digital Twins docs | Cloud Platform | DTDL, CIM ontologies, Azure IoT Hub |
| AWS IoT TwinMaker docs | Cloud Platform | Scene Composer, Grafana plugin, Timestream |
| OpenModelica | Open-source tool | Complex physics simulation (Modelica language) |
| NIS2 Directive (EU 2022/2555) | Regulation | Cybersecurity for essential service operators |
Conclusions
The digital twin for energy infrastructure is today one of the most transformative technologies in the sector: the $34 billion market in 2025 is growing at 34.7% annually, driven by the energy transition, increasing grid complexity with DER, and regulatory pressure on reliability and security.
We have seen how to implement an enterprise-grade digital twin with: pandapower for real-time Newton-Raphson power flow, the IEC 60076-7 thermal model for transformers, PyPSA for network expansion planning optimization, ML models with Random Forest and XGBoost for predictive maintenance, and the CIM/CGMES model for interoperability between systems.
The key to success is a well-separated layered architecture: physical, data, model, and visualization layers must have clear interfaces, enabling each component to evolve independently. The digital shadow (L1) is the accessible first step; the prescriptive twin (L4) is the ultimate goal where the infrastructure self-optimizes.
Next Article in the Series
The next and final article in the EnergyTech series explores Blockchain for P2P Energy Trading: Smart Contracts and Constraints: Solidity smart contract architecture for peer-to-peer energy trading, automatic on-chain settlement, and navigating European regulatory constraints.
Related Series
- MLOps: to bring predictive maintenance ML models to production with MLflow, DVC, and Kubernetes
- AI Engineering: to implement RAG on technical plant documentation and LLMs for fault report analysis
- Data & AI Business: for energy data governance and building ETL/ELT pipelines for analytics