ML을 사용한 재생 에너지 예측: 태양광 및 풍력을 위한 Python LSTM
2024년에는 이탈리아가 따라잡았다 74.3GW의 재생 가능 용량 설치, 태양광으로 이는 37GW(연간 추가 6.8GW)를 초과하고, 풍력은 13GW 안팎이다. 전 세계적으로, 전력 믹스에서 재생에너지가 차지하는 비중은 전례 없는 속도로 계속 증가하고 있지만 그와 함께 나타나고 있습니다. 중요한 기술적, 경제적 과제: 생산의 간헐성과 가변성.
밤에는 태양이 빛나지 않으며, 바람도 항상 같은 강도로 불지 않습니다. 사이의 편차 예상 생산량과 실제 생산량은 전력 시장에 직접적인 비용을 발생시킵니다. 유럽에서는 실수 재생 에너지에 대한 예측은 다음과 같은 불균형 비용을 발생시킵니다. 5 및 50 EUR/MWh, i 위에 봉우리가 있음 1,000 EUR/MWh 이벤트 중 Dunkelflaute(바람이 적고 일사량이 적은 날). 10MW 태양광 발전소 운영 1년 240일, 예상 RMSE가 10% 감소하면 연간 절감 효과를 얻을 수 있습니다. 수십만 유로.
이 문서에서는 예측을 위한 전문적인 ML/딥 러닝 모델을 구축하는 방법을 안내합니다. 태양광 및 풍력 에너지, 기능 엔지니어링부터 LSTM 아키텍처, 확률 모델까지 프로덕션에 배포합니다. Python 코드는 완전하고 테스트되었으며 사용할 준비가 되었습니다.
무엇을 배울 것인가
- 태양광(GHI, DNI, DHI, 태양각) 및 풍력(Weibull, 윈드 시어, 전력 곡선)에 대한 고급 기능 엔지니어링
- PyTorch를 사용한 다단계 예측을 위한 LSTM 인코더-디코더 아키텍처
- 바람 예측을 위한 TCN(시간적 합성곱 네트워크)
- 앙상블 NWP + LSTM + 최적화된 가중치를 사용한 그래디언트 부스팅
- 분위수 회귀 및 핀볼 손실을 사용한 확률적 예측
- 프로덕션 배포를 위한 MLflow + FastAPI 파이프라인
- 실제 데이터 세트에 대한 벤치마크 모델: Persistence, ARIMA, XGBoost, LSTM, TCN, TFT
- 이탈리아 전력 시장 상황: MGP, MI, PUN 지수 GME, 시장 구역
EnergyTech 시리즈 개요
이 시리즈는 IoT 데이터 수집부터 완전한 스마트 에너지 기술 스택을 다룹니다. 머신러닝, 디지털 트윈, 에너지 블록체인을 통해 클라우드 아키텍처로 전환합니다.
| # | Articolo | 수준 | 상태 |
|---|---|---|---|
| 1 | 스마트 그리드 및 에너지 IoT: 아키텍처 및 프로토콜 | 중급 | 사용 가능 |
| 2 | 에너지 시계열용 MQTT 및 InfluxDB | 중급 | 사용 가능 |
| 3 | ML을 이용한 재생 에너지 예측: 태양광 및 풍력을 위한 LSTM | 고급의 | 당신은 여기에 있습니다 |
| 4 | Azure 및 Python을 사용한 에너지 시스템용 디지털 트윈 | 고급의 | 사용 가능 |
| 5 | DERMS: 분산 에너지 자원 관리 | 고급의 | 사용 가능 |
| 6 | 배터리 관리 시스템: 알고리즘 및 소프트웨어 아키텍처 | 고급의 | 사용 가능 |
| 7 | IEC 61850: 전기 변전소에 대한 통신 표준 | 고급의 | 사용 가능 |
| 8 | EV 로드 밸런싱: 전기자동차 충전 최적화 | 중급 | 사용 가능 |
| 9 | 탄소 회계 소프트웨어: CO2 측정 및 감소 | 중급 | 사용 가능 |
| 10 | 블록체인 P2P 에너지 거래: 에너지에 대한 스마트 계약 | 고급의 | 사용 가능 |
에너지 예측의 기초
시간적 지평선
에너지 예측은 단일 문제가 아닙니다. 다양한 시간 범위가 있으며 각각 다양한 특성, 데이터 소스 및 최적의 모델.
| 수평선 | 창문 | 해결 | 주요 용도 | 추천 모델 |
|---|---|---|---|---|
| 초단기 | 0~4시간 | 1~15분 | 규제, 주파수 제어 | LSTM, GRU, 온라인 학습 |
| 일중 | 4~24시간 | 15~60분 | 장중 시장(MI) | LSTM, TCN, NWP-하이브리드 |
| 하루 앞으로 | 24~48시간 | 60분 | MGP 마켓(제공) | LSTM, TFT 인코더-디코더 |
| 일주일 앞으로 | 2~7일 | 4~24시간 | 유지보수 계획 | 트랜스포머, 프로핏+ML |
| 계절별 | 1~12개월 | 일일 | 용량 계획 | SARIMA, LSTM 시즌 |
평가 지표
에너지 측면에서는 특정 지표가 사용됩니다. MAPE는 값이 있는 계열에는 권장되지 않습니다. 0에 가까움(밤, 잔잔한 바람): 기준선과 비교하여 nMAE 또는 스킬 점수를 사용하는 것이 더 좋습니다. 지속성.
| 미터법 | 공식 | 해석 | 메모 |
|---|---|---|---|
| RMSE | sqrt(평균((y-y_hat)^2)) | 큰 실수를 처벌 | kW 또는 MW 단위는 규모에 따라 다름 |
| MAE | 평균(|y-y_hat|) | 평균 절대 오차 | 특이치에 더 강력함 |
| nRMSE | RMSE / P_등급 | 정규화된 오류 % | 다른 시스템 간의 비교 |
| 스킬 점수 | 1 - RMSE_모델 / RMSE_지속성 | 개선 대 기준선 | 0=지속성과 동일, 1=완벽함 |
| 핀볼 손실 | 최대(q*(y-y_hat), (q-1)*(y-y_hat)) | 품질 분위수 | 확률적 예측의 경우 |
기준선으로서의 NWP
ECMWF, GFS 등 NWP(Numerical Weather Prediction) 모델이나 DWD의 ICON 모델은 특히 하루의 전망에 대해 이미 경쟁력 있는 기준이 되는 일기 예보입니다. ML의 부가가치는 일반적으로 체계적 편향 수정 모델 중 NWP(Model Output Statistics, MOS) 및 식물별 지역 패턴 캡처.
에너지에 대한 공개 데이터세트
- PVGIS(EU) - 유럽 전역의 과거 태양 데이터, 무료 REST API, 2005년까지 시간별 해상도
- NSRDB(미국) - 국립 태양 복사 데이터베이스, SAM 데이터, 4km x 4km
- 개방형 날씨 - 오픈 소스 날씨 API, NWP 앙상블, 6시간마다 업데이트
- 코페르니쿠스 ERA5 - ECMWF 글로벌 재분석, 1979-현재, 31km 해상도
- GSE 이탈리아 - 인센티브 식물의 생산 데이터, 국가 통계
- 테르나 오픈 데이터 - 네트워크 데이터, 생성, 소비, 이탈리아 시장 영역
태양광을 위한 기능 엔지니어링
기능의 품질은 좋은 태양광 모델을 위한 가장 중요한 요소입니다. 훈련된 LSTM 잘못되었거나 잘못 정규화된 기능에서는 항상 단순한 기능보다 성능이 낮습니다. 잘 구성된 특징을 가진 선형 회귀 분석기입니다.
태양 복사 조도: GHI, DNI, DHI
태양 복사 조도는 모델이 학습해야 하는 세 가지 물리적으로 구별되는 구성 요소로 분해됩니다. 임플란트의 기하학적 구조와 결합하려면:
- GHI(전역 수평 방사조도): 수평면의 총 방사조도 [W/m2]. 이는 가장 일반적인 측정값이며 NWP 데이터 세트에서 발견되는 측정값입니다.
- DNI(직접 일반 조사): 태양에 수직인 직접 구성요소. CSP 및 추적 패널에 대한 비판.
- DHI(확산 수평 조사): 확산 구성요소(하늘, 구름). GHI = DNI * cos(천정) + DHI.
태양광 패널의 온도는 매우 중요합니다. 섭씨 25°C를 초과할 때마다 효율성이 감소합니다. 대략 0.4-0.5% 결정질 모듈용(표준 온도 계수). 셀 온도는 NOCT(공칭 작동 셀 온도) 모델을 사용하여 추정됩니다. T_cell = T_amb + (NOCT - 20) * GHI / 800.
코드: 태양광 기능 공학
import numpy as np
import pandas as pd
from pvlib import solarposition, irradiance, atmosphere
from sklearn.preprocessing import MinMaxScaler
def create_solar_features(df: pd.DataFrame,
lat: float,
lon: float,
altitude: float = 0,
panel_tilt: float = 30,
panel_azimuth: float = 180) -> pd.DataFrame:
"""
Costruisce feature complete per forecasting solare.
Args:
df: DataFrame con colonne [ghi, dni, dhi, temp_air, wind_speed, cloud_cover]
e indice DatetimeTzIndex
lat, lon: coordinate impianto
altitude: quota s.l.m. [m]
panel_tilt: inclinazione pannelli [gradi]
panel_azimuth: azimut pannelli [gradi, 180=sud]
Returns:
DataFrame con tutte le feature engineered
"""
feat = df.copy()
times = feat.index
# --- Geometria solare ---
solpos = solarposition.get_solarposition(times, lat, lon, altitude)
feat['solar_zenith'] = solpos['apparent_zenith']
feat['solar_azimuth'] = solpos['azimuth']
feat['solar_elevation'] = solpos['apparent_elevation']
feat['solar_elevation_rad'] = np.radians(feat['solar_elevation'])
# Angolo di incidenza pannello (AOI)
aoi = irradiance.aoi(panel_tilt, panel_azimuth,
solpos['apparent_zenith'],
solpos['azimuth'])
feat['aoi'] = aoi
feat['cos_aoi'] = np.cos(np.radians(aoi)).clip(0, 1)
# --- Irradianza sul piano inclinato (POA) ---
airmass = atmosphere.get_relative_airmass(solpos['apparent_zenith'])
poa = irradiance.get_total_irradiance(
panel_tilt, panel_azimuth,
solpos['apparent_zenith'], solpos['azimuth'],
feat['dni'], feat['ghi'], feat['dhi'],
airmass=airmass
)
feat['poa_global'] = poa['poa_global']
feat['poa_direct'] = poa['poa_direct']
feat['poa_diffuse'] = poa['poa_diffuse']
# --- Temperatura cella (modello NOCT) ---
NOCT = 45 # grado Celsius tipico modulo standard
feat['temp_cell'] = (feat['temp_air'] +
(NOCT - 20) * feat['poa_global'] / 800)
# Derating per temperatura (coefficiente -0.0045 /degC)
feat['temp_derating'] = 1 + (-0.0045) * (feat['temp_cell'] - 25)
# --- Clearness Index (kt) ---
# Extra-terrestrial radiation per calcolo clearness
et_rad = irradiance.get_extra_radiation(times)
feat['clearness_index'] = (feat['ghi'] /
(et_rad * np.cos(np.radians(
solpos['apparent_zenith']))
).clip(lower=1e-3)).clip(0, 1.2)
# --- Cloud cover encoding ---
feat['cloud_cover_norm'] = feat['cloud_cover'] / 100.0
feat['clear_sky_fraction'] = 1.0 - feat['cloud_cover_norm']
# --- Encoding ciclico temporale ---
feat['hour_sin'] = np.sin(2 * np.pi * times.hour / 24)
feat['hour_cos'] = np.cos(2 * np.pi * times.hour / 24)
feat['doy_sin'] = np.sin(2 * np.pi * times.dayofyear / 365.25)
feat['doy_cos'] = np.cos(2 * np.pi * times.dayofyear / 365.25)
feat['month_sin'] = np.sin(2 * np.pi * times.month / 12)
feat['month_cos'] = np.cos(2 * np.pi * times.month / 12)
feat['dow_sin'] = np.sin(2 * np.pi * times.dayofweek / 7)
feat['dow_cos'] = np.cos(2 * np.pi * times.dayofweek / 7)
# --- Feature lag (produzione storica) ---
for lag in [1, 2, 3, 6, 12, 24, 48]:
feat[f'power_lag_{lag}h'] = feat['power'].shift(lag)
# Rolling stats (finestra 6h, 24h)
feat['power_roll6h_mean'] = feat['power'].rolling(6, min_periods=1).mean()
feat['power_roll24h_mean'] = feat['power'].rolling(24, min_periods=1).mean()
feat['ghi_roll3h_mean'] = feat['ghi'].rolling(3, min_periods=1).mean()
# --- Ramping feature ---
feat['power_delta_1h'] = feat['power'].diff(1)
feat['ghi_delta_1h'] = feat['ghi'].diff(1)
# Maschera ore notturne (sun elevation < 5 gradi)
feat['is_daytime'] = (feat['solar_elevation'] > 5).astype(int)
return feat.dropna()
def prepare_sequences(features: np.ndarray,
targets: np.ndarray,
lookback: int = 48,
horizon: int = 24) -> tuple:
"""
Crea sequenze input/output per LSTM.
Returns:
X: (N, lookback, n_features)
y: (N, horizon)
"""
X, y = [], []
for i in range(lookback, len(features) - horizon + 1):
X.append(features[i - lookback:i])
y.append(targets[i:i + horizon])
return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)
경고: 데이터 유출
스케일러 피팅은 훈련 세트에서만 수행되어야 하며 그런 다음 다음에 적용(변환)되어야 합니다.
검증 및 테스트. 전체 데이터세트와 소스 중 하나에 MinMaxScaler().fit_transform()을 사용하세요.
시계열 예측에서 가장 일반적인 데이터 유출 문제.
항상 사용 scaler.fit(X_train) 그 다음에 scaler.transform(X_val).
풍력 발전을 위한 기능 엔지니어링
바람 예측은 태양광 예측과 다른 물리적 문제를 제시합니다. 풍속과의 관계 그리고 생산된 전력 큐빅 터빈의 작동 영역(P ∝ v³)에서, 공칭 전력에서는 포화 상태이고 극단적인 경우에는 차단됩니다(컷인 및 컷아웃 속도). 풍속 분포는 다음과 같습니다. 와이블 분포.
전력 곡선 및 바람 전단
10m(기상기준)에서 측정된 풍속은 고도에 외삽되어야 함 대수 프로파일을 사용하는 터빈 허브(일반적으로 유틸리티 규모 터빈의 경우 80-150m) 바람이나 거듭제곱의 법칙:
v_hub = v_ref * (h_hub / h_ref)^알파여기서 알파는 전단 지수입니다. (0.1-0.3은 땅의 거칠기에 따라 다름)
import numpy as np
import pandas as pd
from scipy.stats import weibull_min
def create_wind_features(df: pd.DataFrame,
hub_height: float = 100,
ref_height: float = 10,
rated_power: float = 2000, # kW
cut_in: float = 3.0, # m/s
rated_speed: float = 12.0, # m/s
cut_out: float = 25.0 # m/s
) -> pd.DataFrame:
"""
Feature engineering per forecasting eolico.
df deve contenere:
- wind_speed_10m: velocità vento a 10m [m/s]
- wind_dir_10m: direzione vento [gradi 0-360]
- temp_air: temperatura aria [degC]
- pressure: pressione atmosferica [Pa]
- roughness_length: lunghezza rugosita terreno [m]
"""
feat = df.copy()
# --- Wind shear: extrapolazione all'altezza hub ---
# Profilo logaritmico (più accurato del power law per terreni complessi)
z0 = feat.get('roughness_length', 0.03) # default terreno agricolo
feat['wind_speed_hub'] = (feat['wind_speed_10m'] *
np.log(hub_height / z0) /
np.log(ref_height / z0))
# Power law come alternativa
shear_exp = 0.14 # terreno aperto standard
feat['wind_speed_hub_powerlaw'] = (feat['wind_speed_10m'] *
(hub_height / ref_height) ** shear_exp)
# --- Densita aria (funzione di T e P) ---
R = 287.05 # costante gas reale aria secca [J/kg/K]
feat['air_density'] = feat['pressure'] / (R * (feat['temp_air'] + 273.15))
# Velocita normalizzata per densita (v_equiv = v * (rho/rho_ref)^(1/3))
rho_ref = 1.225 # kg/m3 a 15degC, 1013.25 hPa
feat['wind_speed_density_corrected'] = (feat['wind_speed_hub'] *
(feat['air_density'] / rho_ref) ** (1/3))
# --- Curva di potenza teorica ---
v = feat['wind_speed_density_corrected'].values
def power_curve(v: np.ndarray) -> np.ndarray:
"""Curva di potenza cubica con saturazione."""
p = np.zeros_like(v)
# Zona cubica: cut_in <= v < rated_speed
mask_cubic = (v >= cut_in) & (v < rated_speed)
p[mask_cubic] = rated_power * ((v[mask_cubic] - cut_in) /
(rated_speed - cut_in)) ** 3
# Zona nominale: rated_speed <= v < cut_out
mask_rated = (v >= rated_speed) & (v < cut_out)
p[mask_rated] = rated_power
# Zona spenta: v < cut_in o v >= cut_out
return p
feat['theoretical_power'] = power_curve(v)
feat['power_ratio'] = feat['theoretical_power'] / rated_power
# --- Componenti vettoriali del vento ---
wind_dir_rad = np.radians(feat['wind_dir_10m'])
feat['wind_u'] = -feat['wind_speed_hub'] * np.sin(wind_dir_rad)
feat['wind_v'] = -feat['wind_speed_hub'] * np.cos(wind_dir_rad)
# --- Turbolenza (Turbulence Intensity) ---
feat['turbulence_intensity'] = (feat['wind_speed_hub'].rolling(10).std() /
feat['wind_speed_hub'].rolling(10).mean().clip(lower=0.1))
# --- Weibull: parametri stimati su finestra mobile ---
def estimate_weibull_shape(series: pd.Series, min_count: int = 20) -> float:
"""Stima parametro di forma k della distribuzione Weibull."""
vals = series.dropna().values
if len(vals) < min_count:
return np.nan
try:
params = weibull_min.fit(vals[vals > 0], floc=0)
return params[0] # k (shape parameter)
except Exception:
return np.nan
feat['weibull_k_72h'] = (feat['wind_speed_hub']
.rolling(72, min_periods=20)
.apply(estimate_weibull_shape, raw=False))
# --- Encoding temporale ciclico ---
times = feat.index
feat['hour_sin'] = np.sin(2 * np.pi * times.hour / 24)
feat['hour_cos'] = np.cos(2 * np.pi * times.hour / 24)
feat['doy_sin'] = np.sin(2 * np.pi * times.dayofyear / 365.25)
feat['doy_cos'] = np.cos(2 * np.pi * times.dayofyear / 365.25)
feat['month_sin'] = np.sin(2 * np.pi * times.month / 12)
feat['month_cos'] = np.cos(2 * np.pi * times.month / 12)
# --- Lag e rolling per potenza storica ---
for lag in [1, 2, 3, 6, 12, 24, 48, 168]: # 168h = 1 settimana
feat[f'power_lag_{lag}h'] = feat['power'].shift(lag)
feat['power_roll6h_std'] = feat['power'].rolling(6).std()
feat['power_roll24h_mean'] = feat['power'].rolling(24).mean()
feat['wind_roll3h_mean'] = feat['wind_speed_hub'].rolling(3).mean()
# --- Wake effect proxy (vento in direzione impianto vicino) ---
# Semplificato: penalizzazione direzione principale
feat['wake_factor'] = np.where(
(feat['wind_dir_10m'] > 170) & (feat['wind_dir_10m'] < 220),
0.85, # direzione dominante = wake effect stimato 15%
1.0
)
return feat.dropna()
태양광용 LSTM 인코더-디코더 아키텍처
시간별 해상도를 갖춘 일일 예측(24~48시간 전)의 경우 아키텍처 인코더-디코더 seq2seq 단순한 일대다 LSTM보다 우수합니다. 인코더는 전체 스토리를 고정된 컨텍스트로 압축하고 디코더는 각 단계를 생성합니다. 그 맥락에 따라 미래가 결정됩니다. 우리는 또한 메커니즘을 추가합니다 주목 가장 관련성이 높은 과거 시간을 측정합니다.
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from typing import Optional
class SolarEncoder(nn.Module):
"""Encoder LSTM per sequenze storiche di input."""
def __init__(self, input_size: int, hidden_size: int, num_layers: int,
dropout: float = 0.2):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0,
bidirectional=False
)
self.layer_norm = nn.LayerNorm(hidden_size)
def forward(self, x: torch.Tensor):
# x: (batch, seq_len, input_size)
outputs, (hidden, cell) = self.lstm(x)
# outputs: (batch, seq_len, hidden_size) - tutti gli hidden state
outputs = self.layer_norm(outputs)
return outputs, hidden, cell
class BahdanauAttention(nn.Module):
"""Meccanismo di attention Bahdanau (additive attention)."""
def __init__(self, hidden_size: int):
super().__init__()
self.W_q = nn.Linear(hidden_size, hidden_size, bias=False)
self.W_k = nn.Linear(hidden_size, hidden_size, bias=False)
self.v = nn.Linear(hidden_size, 1, bias=False)
def forward(self, query: torch.Tensor,
keys: torch.Tensor) -> tuple:
# query: (batch, 1, hidden_size) - decoder hidden state
# keys: (batch, seq_len, hidden_size) - encoder outputs
q = self.W_q(query) # (batch, 1, hidden_size)
k = self.W_k(keys) # (batch, seq_len, hidden_size)
scores = self.v(torch.tanh(q + k)) # (batch, seq_len, 1)
weights = torch.softmax(scores, dim=1) # (batch, seq_len, 1)
context = (weights * keys).sum(dim=1, keepdim=True) # (batch, 1, hidden)
return context, weights.squeeze(-1)
class SolarDecoder(nn.Module):
"""Decoder LSTM con attention per forecast multi-step."""
def __init__(self, input_size: int, hidden_size: int, num_layers: int,
dropout: float = 0.2):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.attention = BahdanauAttention(hidden_size)
self.lstm = nn.LSTM(
input_size=input_size + hidden_size, # input + context
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0
)
self.output_proj = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 1)
)
self.layer_norm = nn.LayerNorm(hidden_size)
def forward(self, x: torch.Tensor,
encoder_outputs: torch.Tensor,
hidden: torch.Tensor,
cell: torch.Tensor) -> tuple:
# x: (batch, 1, input_size) - input corrente del decoder
context, attn_weights = self.attention(
hidden[-1:].permute(1, 0, 2), # (batch, 1, hidden)
encoder_outputs
)
lstm_input = torch.cat([x, context], dim=-1)
output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
output = self.layer_norm(output)
prediction = self.output_proj(output) # (batch, 1, 1)
return prediction.squeeze(-1), hidden, cell, attn_weights
class SolarForecastModel(nn.Module):
"""Modello encoder-decoder completo per previsione solare."""
def __init__(self,
encoder_features: int, # numero feature input storiche
decoder_features: int, # numero feature future note (NWP)
hidden_size: int = 256,
num_layers: int = 3,
horizon: int = 24,
dropout: float = 0.2):
super().__init__()
self.horizon = horizon
self.hidden_size = hidden_size
self.encoder = SolarEncoder(encoder_features, hidden_size,
num_layers, dropout)
self.decoder = SolarDecoder(decoder_features, hidden_size,
num_layers, dropout)
# Proiezione per adattare hidden encoder a decoder (se layers != 1)
self.hidden_proj = nn.Linear(hidden_size, hidden_size)
def forward(self, encoder_input: torch.Tensor,
decoder_input: torch.Tensor,
teacher_forcing_ratio: float = 0.5) -> torch.Tensor:
"""
Args:
encoder_input: (batch, lookback, encoder_features) - storia
decoder_input: (batch, horizon, decoder_features) - NWP futuro
teacher_forcing_ratio: prob. di usare ground truth nel training
Returns:
predictions: (batch, horizon)
"""
batch_size = encoder_input.size(0)
# Encoding della storia
encoder_outputs, hidden, cell = self.encoder(encoder_input)
# Proiezione hidden state
hidden = self.hidden_proj(hidden)
predictions = torch.zeros(batch_size, self.horizon,
device=encoder_input.device)
# Decoding step-by-step
decoder_in = decoder_input[:, 0:1, :] # primo passo
for t in range(self.horizon):
pred, hidden, cell, _ = self.decoder(
decoder_in, encoder_outputs, hidden, cell
)
predictions[:, t] = pred.squeeze(-1)
# Teacher forcing: usa ground truth o previsione precedente
if t < self.horizon - 1:
decoder_in = decoder_input[:, t+1:t+2, :]
return predictions
def train_solar_model(
model: SolarForecastModel,
train_loader: DataLoader,
val_loader: DataLoader,
epochs: int = 100,
lr: float = 1e-3,
patience: int = 15,
device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
) -> dict:
"""
Training loop completo con early stopping e LR scheduler.
Returns:
dict con history di loss e best_epoch
"""
model = model.to(device)
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
criterion = nn.HuberLoss(delta=1.0) # più robusto a outlier vs MSE
# Cosine Annealing con Warm Restarts
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
optimizer, T_0=20, T_mult=2, eta_min=1e-6
)
history = {'train_loss': [], 'val_loss': [], 'lr': []}
best_val_loss = float('inf')
patience_counter = 0
best_state = None
for epoch in range(epochs):
# Training
model.train()
train_loss = 0.0
for batch in train_loader:
enc_in, dec_in, targets = [b.to(device) for b in batch]
optimizer.zero_grad()
# Teacher forcing decay: da 0.8 a 0.1 nel corso del training
tf_ratio = max(0.1, 0.8 - epoch * 0.007)
preds = model(enc_in, dec_in, tf_ratio)
loss = criterion(preds, targets)
loss.backward()
# Gradient clipping per stabilità
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
train_loss += loss.item()
# Validation
model.eval()
val_loss = 0.0
with torch.no_grad():
for batch in val_loader:
enc_in, dec_in, targets = [b.to(device) for b in batch]
preds = model(enc_in, dec_in, teacher_forcing_ratio=0.0)
val_loss += criterion(preds, targets).item()
train_loss /= len(train_loader)
val_loss /= len(val_loader)
scheduler.step()
current_lr = optimizer.param_groups[0]['lr']
history['train_loss'].append(train_loss)
history['val_loss'].append(val_loss)
history['lr'].append(current_lr)
print(f"Epoch {epoch+1:3d} | Train: {train_loss:.4f} | "
f"Val: {val_loss:.4f} | LR: {current_lr:.2e}")
# Early stopping
if val_loss < best_val_loss - 1e-4:
best_val_loss = val_loss
patience_counter = 0
best_state = {k: v.clone() for k, v in model.state_dict().items()}
else:
patience_counter += 1
if patience_counter >= patience:
print(f"Early stopping at epoch {epoch+1}")
break
# Ripristina il miglior modello
if best_state:
model.load_state_dict(best_state)
history['best_epoch'] = epoch + 1 - patience_counter
return history
바람 예측을 위한 TCN 모델
바람예보의 경우, 시간적 컨벌루션 네트워크(TCN) 그들은 제안한다 LSTM과 비교한 장점: 병렬화 가능한 훈련, 다음을 통해 수용 필드 제어 가능 팽창, 더 안정적인 기울기. 2024~2025년 연구에 따르면 TCN-ATT-LSTM은 MAE를 감소시키는 것으로 나타났습니다. 의 32.81% 바람 예측을 위해 LSTM만 사용한 것과 비교됩니다.
확장된 인과 컨볼루션을 사용하여 TCN을 구현합니다. 지수 팽창(1, 2, 4, 8, ...) 이를 통해 단 몇 개의 레이어만으로 매우 긴 컨텍스트를 처리할 수 있습니다.
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List
class CausalConv1d(nn.Module):
"""Convoluzione causale: usa solo input passati, mai futuri."""
def __init__(self, in_channels: int, out_channels: int,
kernel_size: int, dilation: int = 1):
super().__init__()
self.dilation = dilation
self.kernel_size = kernel_size
# Padding causale: solo a sinistra
self.padding = (kernel_size - 1) * dilation
self.conv = nn.Conv1d(in_channels, out_channels,
kernel_size=kernel_size,
padding=self.padding,
dilation=dilation)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (batch, channels, seq_len)
out = self.conv(x)
# Rimuovi padding aggiunto a destra per causality
if self.padding > 0:
out = out[:, :, :-self.padding]
return out
class TCNResidualBlock(nn.Module):
"""Blocco residuale TCN con dilated causal convolutions."""
def __init__(self, in_channels: int, out_channels: int,
kernel_size: int, dilation: int, dropout: float = 0.1):
super().__init__()
self.conv1 = CausalConv1d(in_channels, out_channels,
kernel_size, dilation)
self.conv2 = CausalConv1d(out_channels, out_channels,
kernel_size, dilation)
self.bn1 = nn.BatchNorm1d(out_channels)
self.bn2 = nn.BatchNorm1d(out_channels)
self.dropout = nn.Dropout(dropout)
self.activation = nn.GELU()
# Skip connection con 1x1 conv se canali diversi
self.skip = (nn.Conv1d(in_channels, out_channels, 1)
if in_channels != out_channels else nn.Identity())
def forward(self, x: torch.Tensor) -> torch.Tensor:
residual = self.skip(x)
out = self.activation(self.bn1(self.conv1(x)))
out = self.dropout(out)
out = self.activation(self.bn2(self.conv2(out)))
out = self.dropout(out)
return self.activation(out + residual)
class TCNWindForecast(nn.Module):
"""
Temporal Convolutional Network per previsione eolica multi-step.
Architettura: Input -> TCN blocks (dilatazione esponenziale)
-> Global Average Pool -> FC -> Output
"""
def __init__(self,
input_size: int,
num_channels: List[int],
kernel_size: int = 3,
horizon: int = 24,
dropout: float = 0.1):
"""
Args:
input_size: numero di feature in input
num_channels: canali per ogni livello TCN, es. [64, 128, 256, 256]
kernel_size: dimensione kernel (3 o 5 tipici)
horizon: passi di previsione futura
dropout: tasso di dropout
"""
super().__init__()
self.horizon = horizon
# Input projection
self.input_proj = nn.Conv1d(input_size, num_channels[0], kernel_size=1)
# TCN blocks con dilatazione esponenziale
layers = []
for i, (in_ch, out_ch) in enumerate(
zip(num_channels[:-1], num_channels[1:])):
dilation = 2 ** i # 1, 2, 4, 8, 16, ...
layers.append(TCNResidualBlock(in_ch, out_ch, kernel_size,
dilation, dropout))
self.tcn = nn.Sequential(*layers)
# Output head
final_channels = num_channels[-1]
self.output_head = nn.Sequential(
nn.AdaptiveAvgPool1d(1), # global average pooling temporale
nn.Flatten(),
nn.Linear(final_channels, final_channels // 2),
nn.GELU(),
nn.Dropout(dropout),
nn.Linear(final_channels // 2, horizon)
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (batch, seq_len, input_size) -> porta in (batch, input_size, seq_len)
x = x.permute(0, 2, 1)
x = self.input_proj(x) # (batch, ch[0], seq_len)
x = self.tcn(x) # (batch, ch[-1], seq_len)
out = self.output_head(x) # (batch, horizon)
return out
# Esempio utilizzo
def build_wind_model(n_features: int = 35, horizon: int = 24) -> TCNWindForecast:
return TCNWindForecast(
input_size=n_features,
num_channels=[64, 128, 256, 256, 128],
kernel_size=3,
horizon=horizon,
dropout=0.15
)
# Calcolo del campo ricettivo TCN
def receptive_field(num_levels: int, kernel_size: int) -> int:
"""Campo ricettivo del TCN (quante ore passate 'vede')."""
return 1 + 2 * (kernel_size - 1) * (2 ** num_levels - 1)
# Con 5 livelli e kernel 3: RF = 1 + 2*2*(32-1) = 125 ore ~5 giorni
print(f"Receptive field: {receptive_field(5, 3)} timesteps")
앙상블 및 모델 스태킹
생산 과정에서 단일 모델이 모든 시야와 모든 기상 조건을 지배할 수는 없습니다. 승리하는 접근방식과계층적 앙상블: 다양한 모델의 기본 레벨 (통계적 NWP, LSTM, TCN, 그래디언트 부스팅) 및 예측 결합 방법을 학습하는 메타 학습기 상황(계절, 시간, 구름량)을 기반으로 합니다.
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
from typing import Dict, List
import lightgbm as lgb
class WeightedEnsembleForecaster:
"""
Ensemble adattivo che combina previsioni da più modelli base.
Strategia: weighted average con pesi ottimizzati via Ridge regression
sul validation set, condizionati sul contesto meteorologico.
"""
def __init__(self, model_names: List[str], n_quantiles: int = 4):
self.model_names = model_names
self.n_quantiles = n_quantiles
# Meta-learner per ogni slot temporale (es. 24 ore)
self.meta_learners: Dict = {}
self.scalers: Dict = {}
def fit(self, base_predictions: Dict[str, np.ndarray],
targets: np.ndarray,
context_features: np.ndarray) -> 'WeightedEnsembleForecaster':
"""
Allena i meta-learner sull'insieme di validazione.
Args:
base_predictions: {model_name: array (N, horizon)}
targets: array (N, horizon)
context_features: array (N, n_context) - GHI, ora, stagione, ecc.
"""
horizon = targets.shape[1]
n_models = len(self.model_names)
for h in range(horizon):
# Stack previsioni base per passo h
X = np.column_stack([
base_predictions[m][:, h] for m in self.model_names
]) # (N, n_models)
# Aggiungi feature di contesto
X_ctx = np.hstack([X, context_features]) # (N, n_models + n_ctx)
y = targets[:, h]
# Scaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_ctx)
self.scalers[h] = scaler
# Ridge regression con coefficienti non negativi (weight positivi)
# Usiamo Ridge standard + clip post-hoc
meta = Ridge(alpha=1.0, fit_intercept=True)
meta.fit(X_scaled, y)
self.meta_learners[h] = meta
return self
def predict(self, base_predictions: Dict[str, np.ndarray],
context_features: np.ndarray) -> np.ndarray:
"""
Genera previsione ensemble.
Returns:
array (N, horizon)
"""
horizon = len(self.meta_learners)
N = list(base_predictions.values())[0].shape[0]
ensemble_pred = np.zeros((N, horizon))
for h in range(horizon):
X = np.column_stack([
base_predictions[m][:, h] for m in self.model_names
])
X_ctx = np.hstack([X, context_features])
X_scaled = self.scalers[h].transform(X_ctx)
ensemble_pred[:, h] = self.meta_learners[h].predict(X_scaled)
return ensemble_pred
def get_model_weights(self, hour_idx: int) -> Dict[str, float]:
"""Restituisce i pesi normalizzati dei modelli base per un'ora."""
meta = self.meta_learners[hour_idx]
# Solo i pesi dei modelli base (esclude context features)
raw_weights = meta.coef_[:len(self.model_names)]
raw_weights = np.clip(raw_weights, 0, None) # non negativi
total = raw_weights.sum()
if total > 0:
norm_weights = raw_weights / total
else:
norm_weights = np.ones(len(self.model_names)) / len(self.model_names)
return dict(zip(self.model_names, norm_weights))
# LightGBM come modello base alternativo a LSTM per contesti tabulari
def train_lgbm_solar(X_train: np.ndarray, y_train: np.ndarray,
X_val: np.ndarray, y_val: np.ndarray,
horizon: int = 24) -> List[lgb.Booster]:
"""Allena un modello LightGBM per ogni passo del forecast."""
models = []
for h in range(horizon):
params = {
'objective': 'huber',
'metric': 'huber',
'num_leaves': 63,
'learning_rate': 0.05,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'verbose': -1,
'n_jobs': -1
}
train_data = lgb.Dataset(X_train, label=y_train[:, h])
val_data = lgb.Dataset(X_val, label=y_val[:, h], reference=train_data)
model = lgb.train(
params, train_data,
num_boost_round=500,
valid_sets=[val_data],
callbacks=[
lgb.early_stopping(stopping_rounds=30),
lgb.log_evaluation(period=100)
]
)
models.append(model)
return models
분위수 회귀를 이용한 확률적 예측
하나의 예측 포인트만으로는 전력 시장에 대한 최적의 결정을 내리기에 충분하지 않습니다. 상인 그들에게 필요한 에너지 신뢰구간: 그럴 가능성은 얼마나 되나? 생산량이 X kW를 초과합니까? 거기 분위수 회귀 이 문제를 해결합니다 단일 아키텍처에서 여러 분위수(예: q10, q25, q50, q75, q90)를 생성합니다.
손실함수는 핀볼 손실 (분위수 손실이라고도 함): 분위수 q의 경우 오류는 비대칭입니다. 즉, q를 과소평가하고 (1-q)를 과대평가하면 처벌을 받습니다.
import torch
import torch.nn as nn
import numpy as np
from typing import List
class PinballLoss(nn.Module):
"""
Pinball loss (quantile loss) per quantile regression.
Supporta multipli quantili simultaneamente.
"""
def __init__(self, quantiles: List[float]):
super().__init__()
self.quantiles = torch.FloatTensor(quantiles)
def forward(self, preds: torch.Tensor,
targets: torch.Tensor) -> torch.Tensor:
"""
Args:
preds: (batch, horizon, n_quantiles)
targets: (batch, horizon)
Returns:
loss scalare
"""
q = self.quantiles.to(preds.device) # (n_quantiles,)
# Espandi targets: (batch, horizon, 1)
y = targets.unsqueeze(-1)
# errors: (batch, horizon, n_quantiles)
errors = y - preds
# Pinball: max(q*e, (q-1)*e)
loss = torch.max(q * errors, (q - 1) * errors)
return loss.mean()
class QuantileLSTM(nn.Module):
"""
LSTM che produce simultaneamente previsioni per N quantili.
Più efficiente che allenare N modelli separati.
"""
def __init__(self,
input_size: int,
hidden_size: int,
num_layers: int,
quantiles: List[float],
horizon: int,
dropout: float = 0.2):
super().__init__()
self.quantiles = quantiles
self.n_q = len(quantiles)
self.horizon = horizon
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0
)
self.dropout = nn.Dropout(dropout)
# Head per ogni quantile con shared trunk + separate heads
self.shared_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.LayerNorm(hidden_size // 2),
nn.GELU()
)
# Output separato per ogni quantile (garantisce ordering)
self.quantile_heads = nn.ModuleList([
nn.Linear(hidden_size // 2, horizon)
for _ in quantiles
])
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Returns:
(batch, horizon, n_quantiles) - quantili ORDINATI
"""
lstm_out, (hidden, _) = self.lstm(x)
# Usa l'ultimo hidden state
last_hidden = self.dropout(hidden[-1]) # (batch, hidden_size)
shared = self.shared_head(last_hidden) # (batch, hidden//2)
# Produce raw quantili
raw_quantiles = torch.stack([
head(shared) for head in self.quantile_heads
], dim=-1) # (batch, horizon, n_quantiles)
# Quantile crossing fix: ordina i quantili per monotonia
# Tecnica: softplus cumsum per garantire q1 <= q2 <= ... <= qN
ordered = torch.cumsum(
F.softplus(raw_quantiles), dim=-1
) - F.softplus(raw_quantiles[..., :1])
# Ancora al quantile mediano della prima head
median_base = self.quantile_heads[len(self.quantiles)//2](shared)
ordered = ordered + median_base.unsqueeze(-1)
return ordered
def compute_coverage_and_sharpness(
y_true: np.ndarray,
q_low: np.ndarray,
q_high: np.ndarray,
nominal_level: float = 0.8) -> dict:
"""
Valuta calibrazione (coverage) e sharpness degli intervalli predittivi.
Args:
y_true: valori reali (N,)
q_low: quantile inferiore (N,) - es. q10
q_high: quantile superiore (N,) - es. q90
nominal_level: livello nominale (es. 0.80 per 10%-90%)
Returns:
{'coverage': float, 'sharpness': float, 'winkler_score': float}
"""
coverage = np.mean((y_true >= q_low) & (y_true <= q_high))
sharpness = np.mean(q_high - q_low) # larghezza media intervallo
# Winkler Score: penalizza sia scarsa coverage che larghezza eccessiva
alpha = 1 - nominal_level # 0.20 per 80% PI
winkler = sharpness.copy() if hasattr(sharpness, 'copy') else sharpness
undercover = y_true < q_low
overcover = y_true > q_high
winkler_arr = (q_high - q_low).copy()
winkler_arr[undercover] += (2 / alpha) * (q_low[undercover] - y_true[undercover])
winkler_arr[overcover] += (2 / alpha) * (y_true[overcover] - q_high[overcover])
return {
'coverage': float(coverage),
'nominal_level': nominal_level,
'sharpness_mean_kw': float(sharpness),
'winkler_score': float(np.mean(winkler_arr)),
'calibration_error': float(abs(coverage - nominal_level))
}
신뢰성 다이어그램: 해석 방법
분위수에 대한 신뢰도 다이어그램(보정 도표)은 각 명목상 분위수 q에 대해 다음을 보여줍니다. 실제 값이 예측보다 떨어지는 관찰 빈도입니다. 완벽한 모델 보정하면 대각선이 생성됩니다. 체계적인 과대평가(대각선 위 곡선)는 다음을 나타냅니다. 간격이 너무 좁습니다. 과소평가(아래 곡선)는 간격이 너무 넓다는 것을 나타냅니다.
벤치마크: 실제 데이터 세트의 모델 비교
풀리아(Puglia)의 5MW 태양광 발전소와 캄파니아(Campania)의 12MW 풍력 발전소에서 실시된 벤치마크, 데이터 세트 2022-2024, 70/15/15 분할(학습/평가/테스트), 24시간 전 지평선, 시간별 해상도. 지표는 일광 시간(태양광) 또는 바람 > 컷인(바람)이 있는 시간에만 계산됩니다.
| 모델 | 태양광 nRMSE | 태양광 MAE(kW) | 스킬 스코어 솔. | nRMSE 바람 | MAE 풍력(kW) | 스킬 스코어 Eol. | 훈련시간 |
|---|---|---|---|---|---|---|---|
| 고집 | 18.4% | 312kW | 0.000 | 22.1% | 487kW | 0.000 | - |
| 아리마 | 14.2% | 241kW | 0.228 | 18.3% | 403kW | 0.172 | ~30분 |
| XGBoost | 9.8% | 166kW | 0.467 | 13.7% | 302kW | 0.380 | ~5분 |
| LSTM(기본) | 8.1% | 137kW | 0.560 | 11.8% | 260kW | 0.466 | ~45분 |
| LSTM Enc-Dec | 6.9% | 117kW | 0.625 | 10.4% | 229kW | 0.530 | ~90분 |
| TCN | 7.3% | 124kW | 0.603 | 9.6% | 211kW | 0.566 | ~60분 |
| TFT(시간 융합 변압기) | 5.8% | 98kW | 0.685 | 8.4% | 185kW | 0.620 | ~3시간 |
| 앙상블(LSTM+TCN+XGB) | 5.3% | 90kW | 0.712 | 7.9% | 174kW | 0.643 | ~2시간 |
앙상블(LSTM 인코더-디코더 + TCN + Ridge 메타 학습기가 포함된 LightGBM)은 모든 모델보다 성능이 뛰어납니다. 싱글, 개선됨 71% 지속성 기준과 비교한 nRMSE 태양광용. TFT는 비슷하지만 훈련 시간이 3배 더 필요합니다.
프로덕션에 배포: MLflow + FastAPI
예측 모델은 운영 시스템에 통합될 때까지 유용하지 않습니다. 파이프라인 생산에서 관리해야 하는 사항: NWP 데이터 수집, 기능 계산, 추론, 예측 로깅, 드리프트 모니터링 및 자동 재교육.
"""
Pipeline di produzione per forecasting rinnovabili.
Stack: MLflow (tracking + registry) + FastAPI (serving) + APScheduler (scheduling)
"""
import mlflow
import mlflow.pytorch
import torch
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import logging
from typing import Optional, List
import httpx
logger = logging.getLogger(__name__)
# ---- MLflow Experiment Setup ----
MLFLOW_TRACKING_URI = "http://mlflow-server:5000"
EXPERIMENT_NAME = "solar_forecast_puglia_5mw"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)
def log_training_run(model, metrics: dict, params: dict,
tags: dict) -> str:
"""Logga training run su MLflow e registra il modello."""
with mlflow.start_run() as run:
# Log parametri
mlflow.log_params(params)
# Log metriche
mlflow.log_metrics(metrics)
# Log tags
mlflow.set_tags(tags)
# Salva modello PyTorch
mlflow.pytorch.log_model(
pytorch_model=model,
artifact_path="model",
registered_model_name="solar_forecast_encoder_decoder"
)
run_id = run.info.run_id
logger.info(f"MLflow run {run_id} logged successfully")
return run_id
def load_production_model(model_name: str,
stage: str = "Production") -> tuple:
"""Carica modello dalla MLflow Model Registry."""
client = mlflow.tracking.MlflowClient()
versions = client.get_latest_versions(model_name, stages=[stage])
if not versions:
raise ValueError(f"No model in stage {stage} for {model_name}")
model_version = versions[0]
model_uri = f"models:/{model_name}/{stage}"
model = mlflow.pytorch.load_model(model_uri)
model.eval()
return model, model_version
# ---- FastAPI Serving ----
app = FastAPI(
title="Renewable Energy Forecast API",
description="Day-ahead and intraday power forecast for solar/wind plants",
version="1.0.0"
)
class ForecastRequest(BaseModel):
plant_id: str = Field(..., description="ID impianto (es. 'solar_puglia_5mw')")
forecast_date: str = Field(..., description="Data previsione YYYY-MM-DD")
horizon_hours: int = Field(default=24, ge=1, le=72,
description="Orizzonte forecast in ore")
include_quantiles: bool = Field(default=True,
description="Includi intervalli probabilistici")
class ForecastResponse(BaseModel):
plant_id: str
forecast_date: str
generated_at: str
model_version: str
forecasts: List[dict] # {timestamp, power_kw, q10, q25, q75, q90}
metrics: Optional[dict] = None
# Cache modello in memoria
_model_cache: dict = {}
def get_model(plant_id: str):
"""Lazy loading del modello con cache."""
if plant_id not in _model_cache:
model_name = f"forecast_{plant_id}"
model, version = load_production_model(model_name)
_model_cache[plant_id] = {'model': model, 'version': version}
return _model_cache[plant_id]
async def fetch_nwp_data(lat: float, lon: float,
start_date: str, hours: int) -> pd.DataFrame:
"""Scarica previsioni NWP da Open-Meteo API."""
params = {
"latitude": lat,
"longitude": lon,
"hourly": "shortwave_radiation,direct_radiation,diffuse_radiation,"
"temperature_2m,wind_speed_10m,wind_direction_10m,"
"cloud_cover,precipitation",
"start_date": start_date,
"end_date": start_date,
"forecast_days": max(1, hours // 24 + 1),
"models": "best_match"
}
async with httpx.AsyncClient() as client:
resp = await client.get("https://api.open-meteo.com/v1/forecast",
params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
# Converti in DataFrame
hourly = data['hourly']
df = pd.DataFrame(hourly)
df['timestamp'] = pd.to_datetime(df['time'])
df = df.set_index('timestamp').drop(columns=['time'])
return df
@app.post("/forecast", response_model=ForecastResponse)
async def generate_forecast(request: ForecastRequest):
"""Endpoint principale per generare previsioni."""
try:
# Configurazione impianto (in produzione: da database)
plant_config = {
'solar_puglia_5mw': {'lat': 40.5, 'lon': 17.2,
'rated_power': 5000, 'type': 'solar'},
'wind_campania_12mw': {'lat': 40.8, 'lon': 15.1,
'rated_power': 12000, 'type': 'wind'}
}
if request.plant_id not in plant_config:
raise HTTPException(404, f"Plant {request.plant_id} not found")
config = plant_config[request.plant_id]
cached = get_model(request.plant_id)
model = cached['model']
model_version = str(cached['version'].version)
# Fetch dati NWP
nwp_data = await fetch_nwp_data(
config['lat'], config['lon'],
request.forecast_date, request.horizon_hours
)
# Preprocessing e feature engineering (semplificato)
# In produzione: pipeline standardizzata e validata
features = preprocess_for_inference(nwp_data, config)
# Inference
with torch.no_grad():
enc_input = torch.FloatTensor(features['encoder']).unsqueeze(0)
dec_input = torch.FloatTensor(features['decoder']).unsqueeze(0)
if request.include_quantiles:
predictions = model.predict_quantiles(enc_input, dec_input)
# predictions: (1, horizon, 5) per q10,q25,q50,q75,q90
preds_np = predictions.squeeze(0).numpy()
else:
predictions = model(enc_input, dec_input)
preds_np = predictions.squeeze(0).numpy()
# Costruisci risposta
start_ts = pd.Timestamp(request.forecast_date)
forecasts = []
for h in range(min(request.horizon_hours, len(preds_np))):
ts = start_ts + pd.Timedelta(hours=h)
entry = {
'timestamp': ts.isoformat(),
'power_kw': max(0, float(preds_np[h, 2] if request.include_quantiles
else preds_np[h]))
}
if request.include_quantiles:
entry.update({
'q10': max(0, float(preds_np[h, 0])),
'q25': max(0, float(preds_np[h, 1])),
'q75': max(0, float(preds_np[h, 3])),
'q90': max(0, float(preds_np[h, 4]))
})
forecasts.append(entry)
return ForecastResponse(
plant_id=request.plant_id,
forecast_date=request.forecast_date,
generated_at=datetime.utcnow().isoformat(),
model_version=model_version,
forecasts=forecasts
)
except Exception as e:
logger.error(f"Forecast error for {request.plant_id}: {e}",
exc_info=True)
raise HTTPException(500, f"Forecast generation failed: {str(e)}")
def preprocess_for_inference(nwp_data: pd.DataFrame,
config: dict) -> dict:
"""Preprocessing NWP per inference (placeholder)."""
# In produzione: replicare esattamente la pipeline di training
n_enc = 48 # lookback 48 ore
n_dec = 24 # horizon 24 ore
n_enc_feat = 25
n_dec_feat = 10
return {
'encoder': np.random.randn(n_enc, n_enc_feat).astype(np.float32),
'decoder': np.random.randn(n_dec, n_dec_feat).astype(np.float32)
}
# ---- Scheduler per forecast automatici ----
scheduler = BackgroundScheduler()
@app.on_event("startup")
async def startup_event():
"""Avvia scheduler al boot dell'applicazione."""
# Genera forecast day-ahead ogni giorno alle 11:00 (dopo asta MGP)
scheduler.add_job(
trigger_daily_forecast,
'cron', hour=11, minute=0,
id='day_ahead_forecast',
replace_existing=True
)
# Aggiorna forecast intraday ogni 4 ore
scheduler.add_job(
trigger_intraday_update,
'cron', hour='*/4',
id='intraday_update',
replace_existing=True
)
scheduler.start()
logger.info("Forecast scheduler started")
def trigger_daily_forecast():
"""Genera forecast day-ahead per tutti gli impianti."""
tomorrow = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
plants = ['solar_puglia_5mw', 'wind_campania_12mw']
for plant_id in plants:
try:
logger.info(f"Generating day-ahead forecast for {plant_id} on {tomorrow}")
# In produzione: chiamata asincrona all'endpoint /forecast
except Exception as e:
logger.error(f"Failed forecast for {plant_id}: {e}")
모니터링 및 드리프트 감지
에너지 예측 모델은 다음과 같은 어려움을 겪고 있습니다. 컨셉 드리프트 계절의 (여름과 겨울 사이에 패턴이 바뀜) 데이터 드리프트 에 링크됨 공장 설비의 변화(신규 모듈, 패널 성능 저하). 모니터링 시스템 성능이 저하되는 시기를 자동으로 감지해야 합니다.
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import RegressionPreset, DataDriftPreset
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class ForecastMonitor:
"""
Monitoring continuo delle performance di forecasting.
Usa Evidently per drift detection e report automatici.
"""
def __init__(self, baseline_window_days: int = 30,
alert_rmse_threshold: float = 0.15, # nRMSE > 15% = alert
alert_drift_threshold: float = 0.3):
self.baseline_window_days = baseline_window_days
self.alert_rmse_threshold = alert_rmse_threshold
self.alert_drift_threshold = alert_drift_threshold
self.performance_history: list = []
def evaluate_forecast(self,
y_true: np.ndarray,
y_pred: np.ndarray,
rated_power: float,
timestamp: datetime) -> dict:
"""Calcola metriche e aggiunge alla storia."""
rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))
mae = np.mean(np.abs(y_true - y_pred))
nrmse = rmse / rated_power
# Skill score vs persistence
y_persist = np.roll(y_true, 24) # previsione persistence 24h
rmse_persist = np.sqrt(np.mean((y_true[24:] - y_persist[24:]) ** 2))
skill = 1 - rmse / rmse_persist if rmse_persist > 0 else 0
metrics = {
'timestamp': timestamp.isoformat(),
'rmse': float(rmse),
'mae': float(mae),
'nrmse': float(nrmse),
'skill_score': float(skill),
'alert': nrmse > self.alert_rmse_threshold
}
self.performance_history.append(metrics)
if metrics['alert']:
logger.warning(
f"PERFORMANCE ALERT at {timestamp}: "
f"nRMSE={nrmse:.3f} > threshold {self.alert_rmse_threshold}"
)
return metrics
def generate_drift_report(self,
reference_features: pd.DataFrame,
current_features: pd.DataFrame,
output_path: str = "drift_report.html") -> Report:
"""Genera report Evidently per data drift."""
column_mapping = ColumnMapping(
target="power",
prediction="forecast",
numerical_features=reference_features.select_dtypes(
include=[np.number]).columns.tolist()
)
report = Report(metrics=[
DataDriftPreset(),
RegressionPreset()
])
report.run(
reference_data=reference_features,
current_data=current_features,
column_mapping=column_mapping
)
report.save_html(output_path)
logger.info(f"Drift report saved to {output_path}")
return report
def should_retrain(self) -> bool:
"""Determina se e necessario un retraining."""
if len(self.performance_history) < 7:
return False
recent = self.performance_history[-7:] # ultima settimana
avg_nrmse = np.mean([m['nrmse'] for m in recent])
n_alerts = sum(1 for m in recent if m['alert'])
return avg_nrmse > self.alert_rmse_threshold * 1.2 or n_alerts >= 3
이탈리아 전력 시장: 예측을 위한 맥락
시장 상황을 이해하는 것은 시장 상황을 정량화하는 데 필수적입니다. 경제적 가치 예측의. 이탈리아 전력 시장은 다음과 같이 관리됩니다. GME(시장 관리자 활력이 넘친다) 순차적 시장으로 구성되어 있습니다.
| 시장 | 두문자어 | 폐쇄 | 설명 | 관련성 예측 |
|---|---|---|---|---|
| 시장 전날 | MGP | 12시 D-1 | 다음날 시간별 경매 | 중요: D+1 24시간 예측 |
| 장중 시장 | E1-E7 | 여러 세션 | 장중 위치 조정 | 높음: 4~12시간 앞으로 예측 |
| 파견 서비스 시장 | MSD | 마디 없는 | 테르나 그리드 밸런싱 | 높음: 불균형 예측 |
| 선도시장 | MTE | 앞으로 | 월간/연간 선도 계약 | 평균: 계절별 예측 |
Il PUN 지수 GME (2025년 1월 1일부터) 기존 PUN을 대체했습니다. 지역별 가격의 가중 평균을 통해 지역 상황을 더욱 충실하게 반영 수요와 공급의. 이는 다음과 같은 영역에 대한 예측을 더욱 중요하게 만듭니다. 남부 이탈리아와 같이 높은 재생 가능 보급률.
이탈리아의 시장 지역
이탈리아는 다음과 같이 나누어진다. 6개 시장 지역 가상 상호 연결 영역도 포함됩니다. 남부 지역(SOUTH, CSUD)의 가격은 태양 피크 시간대에 하락하는 경향이 있습니다. 풍부한 태양광 발전 시스템이 설치되어 있기 때문입니다. 정확하게 예측하는 트레이더 지역 생산은 판매 제안을 최적화할 수 있습니다.
예측의 경제적 가치(추정)
- 5MW 태양광 시스템, 240일/년, 평균 생산량 4등가 시간/일
- 연간 예상 생산량: ~4,800MWh
- 평균 불균형 비용: 보상되지 않는 오류당 10-30 EUR/MWh
- nRMSE가 18%에서 6%로 개선됨: ~65% 노출 감소
- 예상 연간 절감액: 31,000 - 94,000 EUR 5MW 시스템용
- ML 플랫폼 투자 회수 기간은 일반적으로 6~18개월입니다.
이탈리아 에너지 믹스 (2024-2025)
이탈리아는 2024~2025년 2년 동안 재생에너지 부문에서 역사적인 성과를 달성했습니다.
- 태양광 발전: 설치용량 40GW 초과(2025년 7월), 2024년에만 6.8GW, 2025년 6.4GW 추가
- 바람: 약 13GW 설치, 2030년까지 28.1GW 목표
- 총 재생 가능 용량: 2024년 말 74.3GW(Legambiente), 1년 내 +7.5GW
- PNIEC 2030 목표: 총 재생에너지 131GW, 여전히 채워져야 할 상당한 격차
이러한 폭발적인 성장으로 인해 예측은 전략적 자산 모두를 위해 공급망의 참가자: 생산자, 수집자, ESO/DSO 및 에너지 거래자.
안티 패턴 및 모범 사례
피해야 할 중요한 안티 패턴
- 일시적인 누출 날짜: 미래의 기능을 입력으로 사용하지 마십시오. 분할하기 전에 전체 데이터세트로 확장하지 마세요.
- 계절성 무시: 여름에만 훈련된 모델은 겨울에는 제대로 작동하지 않습니다. 항상 다년간의 데이터를 사용하세요
- 밤/조용한 시간에 MAPE: 0 또는 0에 가까운 값으로 나눕니다. 항상 nRMSE 또는 정규화된 MAE를 사용하세요.
- 단일 임플란트에 대한 과적합: 식물별 모델이 필요합니다. 전이 학습 없이 직접 가중치를 전달하지 마십시오.
- 풍력 곡선을 무시하십시오. v-P 관계는 비선형적이며 극단에서 잘립니다. ML 모델은 이 물리학에 대해 학습되어야 합니다.
- 간격 없는 예측: 시장 맥락에서 추정된 불확실성이 없는 예측 시점은 위험합니다. 항상 분위수 예측을 사용하세요
생산 모범 사례
- 엄격한 버전 관리: 각 모델과 각 데이터 세트는 데이터 해시를 사용하여 MLflow에서 버전이 지정되어야 합니다.
- 섀도우 모드: 배포하기 전에 새 모델을 최소 14일 동안 병렬로 실행하여 성능을 비교합니다.
- 자동 재교육: nRMSE가 3일 이상 연속으로 임계값을 초과하면 재훈련을 트리거합니다.
- 기능 드리프트 경고: NWP 기능 분포를 모니터링하여 날씨 공급자의 변화를 감지합니다.
- 되메우기 및 간격 채우기: 누락된 SCADA 데이터에 대한 강력한 전략 구현(고장 센서, 유지 관리)
- 앙상블 다양성: 항상 앙상블에 기본 지속성과 순수 NWP를 포함합니다. 다양성은 차이를 줄인다
결론 및 다음 단계
이 기사에서는 전문적인 에너지 예측 시스템을 구축하는 방법을 배웠습니다. 태양광과 풍력, 기능 엔지니어링부터 LSTM 인코더-디코더 아키텍처까지 주의 깊게 살펴보세요. 풍력 발전을 위한 TCN부터 분위수 회귀를 사용한 확률적 예측까지. 데이터 벤치마크 실제 데이터에 따르면 앙상블(LSTM + TCN + LightGBM)이 nRMSE에서 71% 개선을 달성한 것으로 나타났습니다. 지속성 기준과 비교했을 때 경제적 가치는 수만 달러로 추산됩니다. 단일 중간 규모 공장의 경우에도 연간 유로입니다.
이탈리아 상황은 특히 유리합니다. 40GW 이상의 태양광 발전 시설이 설치되어 있으며 재생 가능 보급률이 높은 지역을 강화하는 PUN 지수 GME로 전환 정확한 예측은 생산자와 거래자에게 점점 더 결정적인 경쟁 우위를 제공합니다.
EnergyTech 시리즈의 다음 기사에서는 시스템용 디지털 트윈 에너지: 태양광 발전 시스템과 풍력 발전 단지의 디지털 복제물을 만드는 방법 Azure Digital Twins 및 Python을 사용하여 운영, 예측 유지 관리 및 시나리오 시뮬레이션.
EnergyTech 시리즈 계속하기
- 다음: Azure 및 Python을 사용한 에너지 시스템용 디지털 트윈
- 관련 항목: DERMS - 분산 에너지 자원 관리
- 크로스 시리즈: 비즈니스용 MLOps - MLflow를 사용하여 프로덕션 중인 AI 모델
- 크로스 시리즈: 제조 분야의 AI: 예측 유지 관리 및 디지털 트윈
리소스 및 참고 자료
- PVGIS API(EU): re.jrc.ec.europa.eu/pvg_tools
- 개방형 날씨(무료 NWP): open-meteo.com
- 코페르니쿠스 ERA5: cds.climate.copernicus.eu
- GME 전력 시장: mercatoelettrico.org
- GSE 통계 데이터: gse.it/statistiche
- pvlib Python: 태양광 모델링을 위한 오픈 소스 라이브러리
- PyTorch 예측: TFT 및 고급 모델을 사용한 시계열 예측용 라이브러리







