廃棄物削減のための需要予測: フードテックにおける ML と時系列
毎年、世界では約 10億5,000万トンの食品が廃棄されるによると、 UNEP 食品廃棄物指標報告書 2024。このうち 60% は家庭から、28% はレストランからのものです。 大規模配布からは 12%。経済用語に翻訳すると、「その先」 1兆ドル 毎年焼かれる、世界のガス排出量の 10% を生み出す環境への影響を及ぼします。 これは全世界の航空会社のほぼ 5 倍に相当します。
しかし、逆説的ですが、毎日10億食分の食料が無駄にされている一方で、 783 何百万人もの人々が飢餓に苦しんでいる。問題の根本は文化や物流だけではありません。 そして基本的には 需要予測。大規模な小売業の注文が多すぎる 在庫切れの危険を冒さないでください。サプライヤーは安全を確保するために過剰生産します。その結果、すべてのリンクが 食物連鎖の中で、廃棄物となる緩衝液が蓄積されます。
イタリアでは、 ガッダ法 (n. 166/2016) 寄付に対する税制上の優遇措置を導入しました 余剰食料の削減と回収手順の簡素化。ヨーロッパレベルでの戦略は、 ファーム・トゥ・フォーク 拘束力のある目標を掲げ、2030年までに食品廃棄物を半減することを目指す 小売業者や食品業界向け。規制は緊急性を生み出しますが、機械学習はそれを実現します これらの目的を達成するための具体的なツール。
ML による需要予測は理論ではありません: LSTM と Temporal Fusion モデルを実装した小売業者 トランスフォーマーは、MAPE が従来の方法の典型的な 28% から 5 ~ 15% に削減されたと報告しています。 12 か月以内に 25 ~ 40% の無駄が削減され、ROI が測定可能になります。この記事では、 生データから実稼働モデルまで、あらゆるアーキテクチャ上の選択を分析する完全なパイプライン 動作するPythonコードを使用します。
この記事で学べること
- 2025 年の食品廃棄物に関する経済的および規制の枠組み
- 古典的な統計モデル: ARIMA、SARIMA、Holt-Winters、および Python コードを使用した Prophet
- 時系列の深層学習: LSTM、GRU、Temporal Fusion Transformer、および N-BEATS
- 高度な特徴エンジニアリング: 外生変数、巡回エンコーディング、ラグ特徴
- ウォークフォワード検証を備えた完全なエンドツーエンド ML パイプライン
- 実際のデータセットの比較ベンチマーク: MAPE、RMSE、MAE、トレーニング時間
- 在庫管理および動的価格設定との予測の統合
- 有効期限が近い商品の値下げ最適化アルゴリズム
- イタリアの大規模小売業のケーススタディ: 200 以上の販売拠点、35% の廃棄物削減
- ビジネス指標: 廃棄物削減率、予測精度、値下げ効率
FoodTech シリーズ: 食品業界における AI とテクノロジーに関する 10 件の記事
| # | アイテム | 集中 |
|---|---|---|
| 1 | フードテック入門 | エコシステムの概要と主要技術 |
| 2 | 食物連鎖におけるIoTとセンサー | フィールドからクラウドへのデータ パイプライン |
| 3 | 品質と検査のためのコンピュータビジョン | 欠陥分類と自動グレーディング |
| 4 | ブロックチェーンとトレーサビリティ | 農場から食卓までの食品の安全 |
| 5 | 垂直農法とアグリテック | MLによる管理栽培 |
| 6 | サプライチェーンの最適化 | 物流、コールドチェーン、透明性 |
| 7 | 農場管理ダッシュボード | 農業企業のリアルタイム監視 |
| 8 | 現在位置 - 需要予測と無駄の削減 | ML 時系列、LSTM、TFT、動的価格設定 |
| 9 | 衛星APIと精密農業 | NDVI、リモートセンシング、作物監視 |
| 10 | 作物監視のための ML Edge | フィールドデバイス上の組み込み推論 |
食品廃棄物の問題: データと規制 2025
ML による需要予測がフードテックにおける戦略的優先事項となっている理由を理解するには、 実数から始めなければなりません。の UNEP 食品廃棄物指標レポート 2024 それを明らかにします 2022 年(完全なデータがある昨年)、10 億 5,000 万トンの食料が廃棄されました。 これは一人当たり年間 132 kg に相当し、消費者が入手できるすべての食料のほぼ 5 分の 1 に相当します。
食品廃棄物の世界的な影響 (UNEP 2024)
| インジケータ | 価値 | ソース |
|---|---|---|
| 年間の食品ロス | 10.5億トン | UNEP 食品廃棄物インデックス 2024 |
| 失われた経済的価値 | ~1兆ドル/年 | FAO/世界銀行の推計 |
| GHG 排出量 (%) | 世界全体の 8 ~ 10% | UNEP 2024 |
| 一人当たりの廃棄物 | 132kg/人/年 | UNEP 2024 |
| 大規模小売取引における小売シェア | 全体の 12% が無駄になります | UNEP 2024 |
| ファミリーシェア | 全体の60% | UNEP 2024 |
| 給食サービスの割り当て | 全体の28% | UNEP 2024 |
| 小売前の損失(サプライチェーン) | 生産される食料の13% | FAO |
規制の枠組み: ガッダ法から農場からフォークまで
イタリアはヨーロッパの先駆者であり、 法律 166/2016 (ガッダ法)、それは 食品廃棄物に対して懲罰的ではなく報酬を与えるアプローチを導入しました。重要なポイントは次のとおりです。
- 税制上の優遇措置 余剰食料を寄付する人にとっては破壊に等しい IRPEF/IRES計算用
- 官僚的な単純化: までの寄付に対する書類手数料の廃止 15,000ユーロ/年
- 「ドギーバッグ」のプロモーション レストランやその近くの商品の販売 明確な割引による期限
- 食育 構造的な予防策として学校で
ヨーロッパレベルでは、 ファーム・トゥ・フォーク戦略 (グリーンディールの一部)目標を設定する 拘束力: 措置を講じて、2030 年までに小売および消費レベルでの廃棄物を 50% 削減する 大規模小売取引に義務付けられ、2025年から2026年にかけて段階的に施行される予定。私にとって 従業員 250 人を超える小売業者では、食品廃棄物に関する年次報告が義務付けられます。
企業戦略に対する規制の影響
インセンティブ(ガッダ法)、報告義務(農場から食卓まで)、および企業からの圧力の組み合わせ 消費者は、ML を使用して需要予測に投資するための明確なビジネス ケースを作成します。そうではありません 重要なのは持続可能性です。200 店舗を構え、利益率が 2 ~ 3% の小売業者の場合、 35% の無駄は、営業利益率の 50 ~ 80 ベーシス ポイントの回復に相当します。
食料需要予測が難しい理由
食品業界の需要予測には独特の課題があり、それが業界の需要予測の 1 つとなっています。 ビジネスに適用される機械学習のより複雑な問題。構造上の特徴 他の小売業界との違いは 5 つあります。
1. 傷みやすく販売期間が狭い
今週売れなかった衣料品は来週売れるかもしれません。サラダ フレッシュNo.生鮮食品には、 賞味期限は1日から14日まで、どれ つまり、過剰な予測誤差は、回復不可能な物理的無駄に直結します。 この誤差のコストは非対称です (過剰予測のコストは、多くの場合、予測のコストを上回ります)。 アンダー予測) には、特に上方誤差を最小限に抑えるモデルが必要です。
2. 複数レベルの季節性
食品の時系列は、複数の周波数で同時に季節性を示します。
- 日々の季節性: 火曜日の朝とは異なる金曜日の夜のセール
- 週ごとの季節性: 曜日ごとに異なる購入パターン
- 毎月の季節性: 給与サイクルに関連する月初/月末の影響
- 年間の季節性: 季節商品の夏と冬
- ホリデーシーズン: クリスマス、イースター、8月中旬に急上昇
従来の ARIMA モデルは 1 つの季節成分のみを処理します。現代のモデルのような Prophet と TFT は複数の季節をネイティブに処理します。
3. 非定常外生変数
食料需要は、規則的なパターンに従わない外部要因に強く影響されます。 気象条件 (一週間雨が降るとスープの売り上げが 40% 増加する)、プロモーション キャンペーン (宣伝チラシは一時的に需要を 3 倍にする可能性があります)、地元のイベント (サッカーの試合、 見本市)、競合他社の価格、ソーシャルメディアのトレンド。これらの外生変数を適切に組み込む 効果と平凡なモデルと優れたモデルの違い。
4. データが不十分な SKU のロングテール
一般的な大規模小売取引では、15,000 ~ 30,000 のアクティブな SKU を管理します。 SKU の 20% が売上の 80% を生み出します。 しかし、残りの 80% には、ゼロが多く含まれる、まばらで断続的な時系列があります。これらのために 製品の標準モデルは失敗し、Croston メソッドなどの特殊なアプローチが必要になります。 ゼロインフレモデルまたは同様の SKU からの転移学習。
5. ML を使用しない場合の典型的なエラー
Il 典型的な MAPE (平均絶対パーセント誤差)。 従来の予測システムでは 移動平均、手動ルール、または ML を使用しない ERP システムに基づいて、 20%と40% 新鮮な製品のために。 ML モデルの導入により、このエラーは次のように減少します。 5~15%、 LSTM では、最近のベンチマークで従来の手法の 28.76% に対して 16.43% の MAPE を実証しました。 43%の削減。
古典的な統計モデル: ARIMA、SARIMA、Holt-Winters、Prophet
深層学習に進む前に、古典的な統計モデルを理解することが不可欠です。そうではない これらは時代遅れですが、多くの場合、ビートのベースラインを表すため、解釈可能です。 計算量が軽く、一部の状況 (データが少なく、単純な製品) では、 より複雑なアプローチ。
ARIMA と SARIMA: 予測の基礎
ARIMA (AutoRegressive Integrated Moving Average) は、シリーズごとに最も使用されている統計モデルです。 単変量タイムスケール。 ARIMA(p,d,q) モデルは、次の 3 つのコンポーネントを組み合わせています。 AutoRegression (p lags del) 過去の値)、積分(系列を固定化するための差)、および移動平均 (残留誤差の q ラグ)。 SARIMA は季節成分 (P、D、Q) を追加します。
# SARIMA per forecasting vendite settimanali - prodotto fresco
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_percentage_error
# Caricamento dati di esempio
# Formato: date index, colonna 'sales' con unita vendute
df = pd.read_csv('vendite_insalata.csv', index_col='date', parse_dates=True)
df = df.asfreq('D') # Frequenza giornaliera
df['sales'] = df['sales'].fillna(0)
# Test di stazionarieta (ADF Test)
result = adfuller(df['sales'].dropna())
print(f'ADF Statistic: {result[0]:.4f}')
print(f'p-value: {result[1]:.4f}')
print(f'La serie e {"stazionaria" if result[1] < 0.05 else "non stazionaria"}')
# Split train/test (80/20 con holdout degli ultimi 30 giorni)
train_size = len(df) - 30
train = df.iloc[:train_size]
test = df.iloc[train_size:]
# Modello SARIMA(1,1,1)(1,1,1)7 - stagionalita settimanale
# p=1, d=1, q=1: componenti ARIMA
# P=1, D=1, Q=1, s=7: componenti stagionali settimanali
model = SARIMAX(
train['sales'],
order=(1, 1, 1),
seasonal_order=(1, 1, 1, 7), # s=7 per stagionalita settimanale
enforce_stationarity=False,
enforce_invertibility=False
)
results = model.fit(disp=False)
print(results.summary())
# Previsione sullo stesso periodo del test set
forecast = results.forecast(steps=len(test))
forecast = np.maximum(forecast, 0) # Nessun valore negativo
# Metriche
mape = mean_absolute_percentage_error(test['sales'], forecast) * 100
rmse = np.sqrt(np.mean((test['sales'].values - forecast.values) ** 2))
mae = np.mean(np.abs(test['sales'].values - forecast.values))
print(f'\nMetriche SARIMA:')
print(f' MAPE: {mape:.2f}%')
print(f' RMSE: {rmse:.2f} unita')
print(f' MAE: {mae:.2f} unita')
# Visualizzazione
plt.figure(figsize=(12, 5))
plt.plot(train['sales'][-60:], label='Train (ultimi 60gg)')
plt.plot(test['sales'], label='Reale', color='green')
plt.plot(test.index, forecast, label='Previsione SARIMA', color='red', linestyle='--')
plt.title('SARIMA - Previsione Vendite Prodotto Fresco')
plt.legend()
plt.tight_layout()
plt.savefig('sarima_forecast.png', dpi=150)
plt.show()
預言者: 外生変数を使用した解釈可能な予測
預言者 (Meta/Facebook、2017) 食品予測に最適 複数の季節(毎日、毎週、毎年)、休日をネイティブに管理しているため、 カスタマイズ可能なカレンダーによる効果、自動変化点による非線形トレンド、および変数 外因性 (追加のリグレッサー)。解釈可能性(トレンドへの自動分解 + 季節性 + 休日)により、技術者以外のユーザーでも高く評価されます。
# Prophet per forecasting con variabili esogene (meteo, promozioni)
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd
import numpy as np
# Prophet richiede colonne 'ds' (datetime) e 'y' (target)
df_prophet = df.reset_index().rename(columns={'date': 'ds', 'sales': 'y'})
# Aggiunta variabili esogene (regressori)
# Esempio: temperatura media giornaliera e flag promozionale
df_prophet['temperature'] = meteo_df['temp_media'] # gradi Celsius
df_prophet['is_promotion'] = promo_df['active_promo'].astype(int)
# Calendario festivo italiano (personalizzato)
holidays_it = pd.DataFrame({
'holiday': [
'Natale', 'Capodanno', 'Pasqua', 'Ferragosto',
'Festa Repubblica', 'Ognissanti'
],
'ds': pd.to_datetime([
'2024-12-25', '2025-01-01', '2025-04-20', '2025-08-15',
'2025-06-02', '2025-11-01'
]),
'lower_window': [-3, -1, -3, -5, -1, -1], # Giorni prima
'upper_window': [3, 2, 2, 2, 1, 1] # Giorni dopo
})
# Configurazione modello
model = Prophet(
holidays=holidays_it,
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False,
seasonality_mode='multiplicative', # Meglio per dati con forte trend
changepoint_prior_scale=0.05, # Flessibilità del trend
seasonality_prior_scale=10.0 # Forza delle componenti stagionali
)
# Aggiunta regressori
model.add_regressor('temperature', standardize=True)
model.add_regressor('is_promotion', standardize=False)
# Training
train_prophet = df_prophet.iloc[:train_size]
model.fit(train_prophet)
# Previsione (con valori futuri dei regressori)
future = model.make_future_dataframe(periods=30)
future['temperature'] = future_meteo_df['temp_media']
future['is_promotion'] = future_promo_df['active_promo'].astype(int)
forecast_prophet = model.predict(future)
# Cross-validation interna (walk-forward)
df_cv = cross_validation(
model,
initial='365 days', # Periodo di training iniziale
period='30 days', # Frequenza di re-fitting
horizon='14 days' # Orizzonte di previsione
)
metrics_cv = performance_metrics(df_cv)
print(f'Prophet MAPE (CV): {metrics_cv["mape"].mean()*100:.2f}%')
# Visualizzazione decomposizione
fig = model.plot_components(forecast_prophet)
fig.savefig('prophet_components.png', dpi=150)
print('Componenti Prophet salvate: trend + stagionalita + holiday effects')
古典的な統計モデルを使用する場合
| モデル | 強み | 制限事項 | 理想的な使用例 |
|---|---|---|---|
| アリマ/サリマ | 解釈可能、高速、固定シリーズ | 季節性は 1 つだけで、外生変数はありません | 安定した製品、少ないデータ |
| ホルト・ウィンターズ | トレンド + 季節性、シンプルさを管理 | 異常値、固定季節性を処理しません | 線形傾向と安定した季節性を備えたシリーズ |
| 預言者 | 複数の季節性、休日、リグレッサー | 非常に不規則なシリーズには理想的ではありません | お祭り・販促効果の高い商品 |
| LSTM/GRU | 複雑で多変量の長距離パターン | 豊富なデータが必要、ブラックボックス | 大量の SKU、多くの外生変数 |
| TFT | 解釈可能 + DL、マルチホライズン、注意 | 計算量が多く、GPU が必要 | 多くの SKU にわたる一元的な予測 |
時系列の深層学習: LSTM、GRU、TFT、N-BEATS
ディープラーニング モデルは、2018 年から 2020 年にかけて需要予測に革命をもたらしました。 彼らの優位性は、とりわけ以下のものが存在する場合に現れます: 相関する多くの外生変数、パターン 複雑な非線形、長距離の時間依存関係、および大量のマルチ SKU データ これにより、同様の製品間で転移学習を利用できるようになります。
LSTM と GRU: シーケンス内の選択的メモリ
Le 長短期記憶 (LSTM) そして ゲート付きリカレント ユニット (GRU) これらは、時間的シーケンスにおける長距離の依存関係を捕捉するように設計されたリカレント ネットワークです。 LSTM は 3 つのゲート (入力、忘れ、出力) を使用して、どの情報を保持するか破棄するかを決定します。 細胞の記憶の中で。 GRU は 2 つのゲート (リセット、更新) で簡素化され、同様のパフォーマンスを実現します パラメーターが少なくなります。
# LSTM multi-variate per demand forecasting alimentare
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_percentage_error
# ---- Preparazione dati ----
class FoodDemandDataset(torch.utils.data.Dataset):
def __init__(self, data, seq_len=14, pred_len=7):
self.seq_len = seq_len
self.pred_len = pred_len
self.data = torch.FloatTensor(data)
def __len__(self):
return len(self.data) - self.seq_len - self.pred_len + 1
def __getitem__(self, idx):
x = self.data[idx: idx + self.seq_len]
y = self.data[idx + self.seq_len: idx + self.seq_len + self.pred_len, 0]
return x, y # x: (seq_len, features), y: (pred_len,)
# ---- Architettura LSTM ----
class LSTMForecaster(nn.Module):
def __init__(self, input_size, hidden_size=128, num_layers=2,
pred_len=7, dropout=0.2):
super(LSTMForecaster, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.pred_len = pred_len
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size, pred_len)
def forward(self, x):
# x shape: (batch, seq_len, input_size)
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
out, _ = self.lstm(x, (h0, c0))
out = self.dropout(out[:, -1, :]) # Ultimo time step
out = self.fc(out) # (batch, pred_len)
return out
# ---- Feature preparation ----
def prepare_features(df):
"""Crea feature matrix multi-variate per LSTM"""
features = pd.DataFrame()
features['sales'] = df['sales']
# Lag features: vendite passate come input esplicito
for lag in [1, 2, 3, 7, 14]:
features[f'sales_lag_{lag}'] = df['sales'].shift(lag)
# Rolling statistics
features['sales_rolling_mean_7d'] = df['sales'].rolling(7).mean()
features['sales_rolling_std_7d'] = df['sales'].rolling(7).std()
features['sales_rolling_mean_14d'] = df['sales'].rolling(14).mean()
# Feature temporali cicliche (encoding sinusoidale)
features['day_sin'] = np.sin(2 * np.pi * df.index.dayofweek / 7)
features['day_cos'] = np.cos(2 * np.pi * df.index.dayofweek / 7)
features['month_sin'] = np.sin(2 * np.pi * df.index.month / 12)
features['month_cos'] = np.cos(2 * np.pi * df.index.month / 12)
# Variabili esogene (meteo, promozioni)
features['temperature'] = df['temperature']
features['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
features['is_holiday'] = df['is_holiday'].astype(int)
features['is_promotion'] = df['is_promotion'].astype(int)
features = features.dropna()
return features
# ---- Training loop ----
def train_lstm_model(train_data, val_data, config):
model = LSTMForecaster(
input_size=config['input_size'],
hidden_size=config['hidden_size'],
num_layers=config['num_layers'],
pred_len=config['pred_len']
)
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
criterion = nn.HuberLoss(delta=1.0) # Robusto agli outlier
train_loader = torch.utils.data.DataLoader(
FoodDemandDataset(train_data, config['seq_len'], config['pred_len']),
batch_size=config['batch_size'],
shuffle=True
)
best_val_loss = float('inf')
patience_counter = 0
for epoch in range(config['epochs']):
model.train()
train_loss = 0
for x_batch, y_batch in train_loader:
optimizer.zero_grad()
y_pred = model(x_batch)
loss = criterion(y_pred, y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
train_loss += loss.item()
# Validazione
model.eval()
with torch.no_grad():
val_dataset = FoodDemandDataset(val_data, config['seq_len'], config['pred_len'])
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32)
val_loss = sum(criterion(model(x), y).item() for x, y in val_loader)
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_lstm_model.pt')
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= config['early_stopping_patience']:
print(f'Early stopping all\'epoca {epoch}')
break
if epoch % 10 == 0:
print(f'Epoch {epoch}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}')
# Carica modello migliore
model.load_state_dict(torch.load('best_lstm_model.pt'))
return model
# Configurazione
config = {
'input_size': 15, # Numero di feature (sales + lag + temporal + exog)
'hidden_size': 128,
'num_layers': 2,
'pred_len': 7, # Previsione a 7 giorni
'seq_len': 14, # Lookback di 14 giorni
'batch_size': 32,
'lr': 0.001,
'epochs': 100,
'early_stopping_patience': 15
}
時間融合トランスフォーマー: 解釈性 + パワー
Il 時間融合トランスフォーマー (TFT) Google DeepMind による (2021) および現在 企業の需要予測の最先端技術とみなされます。そのアーキテクチャは次のことを組み合わせています。 両方の静的変数を処理する残留ゲート (GRN) を備えたマルチヘッド アテンション メカニズム (製品タイプ、製品カテゴリー) および事前に知られている時間変数 (カレンダー、 計画されたプロモーション)および観察された(過去の販売、天候)。
2024 年の M5 データセット (ウォルマート 10 店舗からの 30,490 時系列)、TFT、およびモデルのベンチマーク トランスベースの MASE は改善を実証 26~29% そして削減 WQLの 34% 季節的ナイーブ法と比較して。食品小売店のピクニック オランダのオンラインは、需要予測、出版の主要モデルとして TFT を採用しています。 生産における詳細な結果。
# Temporal Fusion Transformer con PyTorch Forecasting
# pip install pytorch-forecasting pytorch-lightning
from pytorch_forecasting import TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import QuantileLoss
import pytorch_lightning as pl
import pandas as pd
import torch
# ---- Preparazione dati nel formato TFT ----
def prepare_tft_dataset(df):
"""
df deve avere: date, store_id, product_id, sales,
temperature, is_holiday, is_promotion, price, category
"""
df = df.copy()
df['time_idx'] = (df['date'] - df['date'].min()).dt.days
df['month'] = df['date'].dt.month.astype(str)
df['day_of_week'] = df['date'].dt.dayofweek.astype(str)
max_prediction_length = 7 # Previsione 7 giorni
max_encoder_length = 28 # Lookback 28 giorni
training_cutoff = df['time_idx'].max() - max_prediction_length
training_dataset = TimeSeriesDataSet(
df[df.time_idx <= training_cutoff],
time_idx='time_idx',
target='sales',
group_ids=['store_id', 'product_id'],
# Variabili statiche (non cambiano nel tempo per ogni gruppo)
static_categoricals=['store_id', 'product_id', 'category'],
# Variabili temporali note in anticipo
time_varying_known_categoricals=['month', 'day_of_week', 'is_holiday'],
time_varying_known_reals=['time_idx', 'is_promotion', 'price'],
# Variabili osservate (disponibili solo per il passato)
time_varying_unknown_reals=['sales', 'temperature'],
min_encoder_length=max_encoder_length // 2,
max_encoder_length=max_encoder_length,
min_prediction_length=1,
max_prediction_length=max_prediction_length,
target_normalizer=GroupNormalizer(
groups=['store_id', 'product_id'],
transformation='softplus' # Mantiene valori positivi
),
add_relative_time_idx=True,
add_target_scales=True,
add_encoder_length=True,
)
return training_dataset, training_cutoff
# ---- Modello TFT ----
def build_tft_model(training_dataset):
tft = TemporalFusionTransformer.from_dataset(
training_dataset,
learning_rate=0.03,
hidden_size=64, # Dimensione hidden layer
attention_head_size=4, # Numero attention heads
dropout=0.1,
hidden_continuous_size=16, # Feature continue
loss=QuantileLoss(), # Previsione probabilistica
log_interval=10,
reduce_on_plateau_patience=4,
)
print(f'Numero parametri: {tft.size()/1e3:.1f}k')
return tft
# ---- Training con PyTorch Lightning ----
def train_tft(training_dataset, validation_dataset):
train_loader = training_dataset.to_dataloader(
train=True, batch_size=128, num_workers=4
)
val_loader = validation_dataset.to_dataloader(
train=False, batch_size=128, num_workers=4
)
trainer = pl.Trainer(
max_epochs=30,
accelerator='gpu' if torch.cuda.is_available() else 'cpu',
gradient_clip_val=0.1,
callbacks=[
pl.callbacks.EarlyStopping(
monitor='val_loss', patience=5, mode='min'
),
pl.callbacks.ModelCheckpoint(
monitor='val_loss', save_top_k=1
)
]
)
tft_model = build_tft_model(training_dataset)
trainer.fit(tft_model, train_loader, val_loader)
# Interpretabilita: variable importance
best_model = TemporalFusionTransformer.load_from_checkpoint(
trainer.checkpoint_callback.best_model_path
)
raw_predictions, x = best_model.predict(
val_loader, mode='raw', return_x=True
)
# Feature importance - chi contribuisce di più alle previsioni
interpretation = best_model.interpret_output(
raw_predictions, reduction='sum'
)
print('Feature importance:')
for key, vals in interpretation.items():
print(f' {key}: {vals}')
return best_model
N-BEATS: リカレントアーキテクチャを使用しないニューラルベース拡張
N-BEATS (Element AI、2020) と根本的に異なるアプローチ: レイヤーのみを使用する 畳み込みやアテンション メカニズムを使用せず、完全に接続され、スタックとブロックに編成されます。 各ブロックは信号を基底拡張 (トレンド、季節性) と残差に分解します。 M4について 競合データセットは、すべての統計手法を 11% 上回っており、ハイブリッド ニューラル統計手法も上回りました。 3%の差で勝者。季節性の強い食品にはN-BEATSの解釈版 ビジネス アナリストが高く評価するトレンド/季節分解を生成します。
食料需要予測のための特徴エンジニアリング
特徴量エンジニアリングの品質は、多くの場合、モデルのパフォーマンスにおいて最も重要な要素となります。 優れた特徴エンジニアリングを備えた LSTM は、平凡な特徴エンジニアリングを備えた TFT よりも優れたパフォーマンスを発揮します。見てみましょう 食品のコンテキスト向けに構築される機能の主なカテゴリ。
時間変数の循環エンコーディング
よくある間違いは、曜日を整数値 (0 ~ 6) として扱うことです。問題はそれです モデルは、6 日目 (日曜日) が 0 日目 (月曜日) に「近い」ことを理解していません。エンコーディング サインとコサインを伴う巡回は、この問題を解決します。
# Feature engineering completo per food demand forecasting
import pandas as pd
import numpy as np
from typing import List, Dict
def create_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
"""Crea feature temporali cicliche e categoriche"""
df = df.copy()
idx = df.index
# Encoding ciclico - preserva la ciclicita (es. lunedi vicino a domenica)
df['hour_sin'] = np.sin(2 * np.pi * idx.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * idx.hour / 24)
df['dow_sin'] = np.sin(2 * np.pi * idx.dayofweek / 7)
df['dow_cos'] = np.cos(2 * np.pi * idx.dayofweek / 7)
df['month_sin'] = np.sin(2 * np.pi * idx.month / 12)
df['month_cos'] = np.cos(2 * np.pi * idx.month / 12)
df['doy_sin'] = np.sin(2 * np.pi * idx.dayofyear / 365.25)
df['doy_cos'] = np.cos(2 * np.pi * idx.dayofyear / 365.25)
# Feature binarie utili per GDO italiana
df['is_weekend'] = (idx.dayofweek >= 5).astype(int)
df['is_monday'] = (idx.dayofweek == 0).astype(int)
df['is_friday'] = (idx.dayofweek == 4).astype(int)
# Posizione nel mese (utile per effetti stipendio)
df['day_of_month'] = idx.day
df['is_month_start'] = (idx.day <= 5).astype(int)
df['is_month_end'] = (idx.day >= 25).astype(int)
# Settimana dell'anno
df['week_of_year'] = idx.isocalendar().week.astype(int)
df['quarter'] = idx.quarter
return df
def create_lag_features(df: pd.DataFrame, target_col: str,
lags: List[int] = None) -> pd.DataFrame:
"""Crea lag features del target"""
if lags is None:
lags = [1, 2, 3, 7, 14, 21, 28]
df = df.copy()
for lag in lags:
df[f'{target_col}_lag_{lag}d'] = df[target_col].shift(lag)
return df
def create_rolling_features(df: pd.DataFrame, target_col: str,
windows: List[int] = None) -> pd.DataFrame:
"""Crea rolling statistics"""
if windows is None:
windows = [3, 7, 14, 28]
df = df.copy()
for window in windows:
df[f'{target_col}_roll_mean_{window}d'] = (
df[target_col].shift(1).rolling(window).mean()
)
df[f'{target_col}_roll_std_{window}d'] = (
df[target_col].shift(1).rolling(window).std()
)
df[f'{target_col}_roll_min_{window}d'] = (
df[target_col].shift(1).rolling(window).min()
)
df[f'{target_col}_roll_max_{window}d'] = (
df[target_col].shift(1).rolling(window).max()
)
# Exponential moving average
df[f'{target_col}_ema_{window}d'] = (
df[target_col].shift(1).ewm(span=window).mean()
)
return df
def create_weather_features(df: pd.DataFrame,
meteo_df: pd.DataFrame) -> pd.DataFrame:
"""
Integra variabili meteorologiche
meteo_df: DataFrame con colonne [date, temp_max, temp_min, precipitazione_mm,
umidita, radiazione_solare, vento_kmh]
"""
df = df.merge(meteo_df, left_index=True, right_on='date', how='left')
# Feature derivate dal meteo
df['temp_delta'] = df['temp_max'] - df['temp_min']
df['is_hot'] = (df['temp_max'] > 28).astype(int) # Ondata di caldo
df['is_cold'] = (df['temp_min'] < 5).astype(int) # Freddo invernale
df['is_rain'] = (df['precipitazione_mm'] > 1).astype(int)
# Interazioni meteo x prodotto (es. gelati vanno bene con caldo)
# Necessità di parametrizzazione per categoria prodotto
df['heat_wave_consecutive'] = (
df['is_hot'].rolling(3).sum() == 3
).astype(int)
return df
def create_promotion_features(df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""
Crea feature da calendario promozionale
promo_df: colonne [date, sku_id, discount_pct, is_volantino, is_digital_promo]
"""
df = df.merge(promo_df, left_index=True, right_on='date', how='left')
df['discount_pct'] = df['discount_pct'].fillna(0)
df['is_promotion'] = (df['discount_pct'] > 0).astype(int)
df['is_deep_discount'] = (df['discount_pct'] > 0.3).astype(int)
# Effetto anticipazione promozione (pre-promo dip)
df['promo_lag_1'] = df['is_promotion'].shift(1).fillna(0)
df['promo_lead_1'] = df['is_promotion'].shift(-1).fillna(0) # Solo in training
# Effetto post-promozione (inventory loading)
df['days_since_last_promo'] = (
df['is_promotion']
.pipe(lambda s: s.where(s.eq(1)).ffill().index - s.index)
.dt.days
.clip(upper=30)
)
return df
def create_price_features(df: pd.DataFrame) -> pd.DataFrame:
"""Feature relative al prezzo e alla sua variazione"""
df = df.copy()
df['price_lag_1'] = df['price'].shift(1)
df['price_change_pct'] = df['price'].pct_change()
df['price_vs_avg_30d'] = df['price'] / df['price'].rolling(30).mean() - 1
return df
def full_feature_engineering_pipeline(df: pd.DataFrame,
meteo_df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""Pipeline completa di feature engineering"""
df = create_temporal_features(df)
df = create_lag_features(df, 'sales')
df = create_rolling_features(df, 'sales')
df = create_weather_features(df, meteo_df)
df = create_promotion_features(df, promo_df)
df = create_price_features(df)
df = df.dropna()
print(f'Feature totali create: {df.shape[1]}')
print(f'Sample size dopo feature engineering: {df.shape[0]}')
return df
完全な ML パイプライン: 生データから本番環境のモデルまで
プロフェッショナルな需要予測パイプラインはモデルのトレーニングに限定されません。必要なもの データ収集、前処理、ウォークフォワード検証を管理するエンドツーエンドのアーキテクチャ モデルの選択、導入、継続的な監視。以下は、本番環境に対応したパイプラインの構造です。
# Pipeline ML completa per food demand forecasting
import mlflow
import mlflow.pytorch
from dataclasses import dataclass, field
from typing import Optional, Tuple
import pandas as pd
import numpy as np
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Configurazione Pipeline ----
@dataclass
class ForecastConfig:
# Data
store_ids: list = field(default_factory=list)
product_categories: list = field(default_factory=list)
training_lookback_days: int = 365
forecast_horizon_days: int = 7
# Feature engineering
lag_days: list = field(default_factory=lambda: [1, 2, 3, 7, 14, 28])
rolling_windows: list = field(default_factory=lambda: [7, 14, 28])
# Model selection
model_type: str = 'tft' # 'sarima', 'prophet', 'lstm', 'tft', 'xgboost'
use_ensemble: bool = False
# Validation
n_cv_splits: int = 4
val_horizon_days: int = 14
# MLflow
experiment_name: str = 'food_demand_forecasting'
run_name: Optional[str] = None
# Deployment
min_mape_threshold: float = 0.20 # Non deployare se MAPE > 20%
model_registry_name: str = 'food_demand_model'
# ---- Step 1: Data Collection e Validation ----
class DataCollector:
def __init__(self, config: ForecastConfig):
self.config = config
def collect(self, start_date: str, end_date: str) -> pd.DataFrame:
"""Raccoglie dati da data warehouse (es. Snowflake, BigQuery)"""
query = f"""
SELECT
date,
store_id,
product_id,
category,
sales_units,
price,
stock_level,
is_holiday,
is_promotion,
discount_pct
FROM gold.sales_daily
WHERE date BETWEEN '{start_date}' AND '{end_date}'
AND store_id IN ({','.join(map(str, self.config.store_ids))})
ORDER BY date, store_id, product_id
"""
# Qui si userebbe il connector del DWH (es. snowflake-connector-python)
# df = snowflake_connector.execute(query)
logger.info(f'Dati raccolti: {start_date} - {end_date}')
return df # Placeholder
def validate(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
"""Validazione qualità dati"""
issues = {}
# Check valori negativi
neg_sales = (df['sales_units'] < 0).sum()
if neg_sales > 0:
issues['negative_sales'] = neg_sales
df.loc[df['sales_units'] < 0, 'sales_units'] = 0
# Check missing dates per gruppo
for group_id, group_df in df.groupby(['store_id', 'product_id']):
date_range = pd.date_range(group_df['date'].min(), group_df['date'].max())
missing = len(date_range) - len(group_df)
if missing > 0:
issues[f'missing_dates_{group_id}'] = missing
# Check outliers (IQR method)
Q1 = df['sales_units'].quantile(0.25)
Q3 = df['sales_units'].quantile(0.75)
IQR = Q3 - Q1
outliers = ((df['sales_units'] < Q1 - 3*IQR) | (df['sales_units'] > Q3 + 3*IQR)).sum()
issues['outliers'] = int(outliers)
logger.info(f'Validazione dati: {issues}')
return df, issues
# ---- Step 2: Walk-Forward Validation ----
class WalkForwardValidator:
"""
Validazione temporale corretta per time series.
NON usare cross-validation classico che crea data leakage.
"""
def __init__(self, config: ForecastConfig):
self.config = config
def validate(self, df: pd.DataFrame, model_class) -> dict:
results = []
total_days = len(df['date'].unique())
split_size = total_days // (self.config.n_cv_splits + 1)
for fold in range(self.config.n_cv_splits):
train_end_idx = split_size * (fold + 1)
val_start_idx = train_end_idx
val_end_idx = val_start_idx + self.config.val_horizon_days
train_dates = sorted(df['date'].unique())[:train_end_idx]
val_dates = sorted(df['date'].unique())[val_start_idx:val_end_idx]
train_df = df[df['date'].isin(train_dates)]
val_df = df[df['date'].isin(val_dates)]
if len(val_df) == 0:
continue
# Training sul fold
model = model_class(self.config)
model.fit(train_df)
predictions = model.predict(val_df)
# Metriche
actuals = val_df['sales_units'].values
preds = predictions['forecast'].values
mape = np.mean(np.abs((actuals - preds) / (actuals + 1e-8))) * 100
rmse = np.sqrt(np.mean((actuals - preds) ** 2))
mae = np.mean(np.abs(actuals - preds))
results.append({
'fold': fold,
'mape': mape,
'rmse': rmse,
'mae': mae,
'train_size': len(train_df),
'val_size': len(val_df)
})
logger.info(f'Fold {fold}: MAPE={mape:.2f}%, RMSE={rmse:.2f}')
# Aggrega risultati
results_df = pd.DataFrame(results)
return {
'mean_mape': results_df['mape'].mean(),
'std_mape': results_df['mape'].std(),
'mean_rmse': results_df['rmse'].mean(),
'mean_mae': results_df['mae'].mean(),
'folds': results
}
# ---- Step 3: MLflow Tracking e Model Registry ----
class ExperimentTracker:
def __init__(self, config: ForecastConfig):
self.config = config
mlflow.set_experiment(config.experiment_name)
def run_experiment(self, df: pd.DataFrame, model_class) -> str:
run_name = self.config.run_name or f'{self.config.model_type}_{datetime.now():%Y%m%d_%H%M}'
with mlflow.start_run(run_name=run_name) as run:
# Log parametri
mlflow.log_params({
'model_type': self.config.model_type,
'forecast_horizon': self.config.forecast_horizon_days,
'training_lookback': self.config.training_lookback_days,
'n_cv_splits': self.config.n_cv_splits
})
# Walk-forward validation
validator = WalkForwardValidator(self.config)
metrics = validator.validate(df, model_class)
# Log metriche
mlflow.log_metrics({
'cv_mean_mape': metrics['mean_mape'],
'cv_std_mape': metrics['std_mape'],
'cv_mean_rmse': metrics['mean_rmse'],
'cv_mean_mae': metrics['mean_mae']
})
# Training finale su tutti i dati
final_model = model_class(self.config)
final_model.fit(df)
# Log artefatti
mlflow.log_dict(metrics, 'validation_metrics.json')
# Registra modello se MAPE accettabile
if metrics['mean_mape'] < self.config.min_mape_threshold * 100:
mlflow.pytorch.log_model(
final_model,
artifact_path='model',
registered_model_name=self.config.model_registry_name
)
logger.info(f'Modello registrato: MAPE={metrics["mean_mape"]:.2f}%')
else:
logger.warning(
f'Modello NON registrato: MAPE {metrics["mean_mape"]:.2f}% '
f'> soglia {self.config.min_mape_threshold * 100}%'
)
return run.info.run_id
ベンチマーク: 実際のデータセット上のモデル間の比較
以下は、典型的なデータセットに対するさまざまなアプローチのパフォーマンスの体系的な比較です。 GDO: 150 SKU の生鮮食品 (乳製品、果物、野菜、肉)、2 年間の毎日のデータ 外生変数 (天候、プロモーション、休日) を含む 5 つの販売ポイントから。
比較ベンチマーク モデル - イタリアの GDO データセット (150 SKU、2 年)
| モデル | マップ (%) | RMSE(単位) | MAE(ユナイテッド) | トレーニング時間 | 解釈可能性 | 注意事項 |
|---|---|---|---|---|---|---|
| 移動平均(ベースライン) | 31.4 | 18.7 | 12.3 | 1分未満 | 高い | 季節性なし、リグレッサーなし |
| サリマ | 22.8 | 14.2 | 9.8 | ~15分 | 高い | 季節のずれは 1 つだけ |
| ホルト・ウィンターズ | 20.1 | 13.1 | 8.9 | ~5分 | 高い | 通常シリーズに最適 |
| 預言者 | 14.6 | 10.4 | 7.2 | ~30分 | 高い | 休日/リグレッサーに優れています |
| XGBoost + 機能エンジニアリング | 12.9 | 9.8 | 6.7 | ~10分 | 平均 | クリティカルな特徴量エンジニアリング |
| LSTM (単変量) | 16.4 | 11.2 | 7.8 | 約 45 分の GPU | 低い | exogなしのProphetよりも悪い |
| LSTM (多変量) | 10.8 | 8.4 | 5.9 | ~2時間のGPU | 低い | exogによる強力な改善 |
| N-BEATS | 11.2 | 8.7 | 6.1 | ~1時間のGPU | 平均 | 解釈可能な分解 |
| TFT (時間融合トランスフォーマー) | 8.3 | 6.9 | 4.8 | ~4時間のGPU | 高い | 全体的に最高、解釈可能 |
| アンサンブル TFT + XGBoost | 7.6 | 6.4 | 4.4 | ~5時間のGPU | 平均 | シングル TFT で +8% |
データセット: 150 の生鮮食品 SKU、5 店舗、2 年間の日次データ。 4 フォールドでのウォークフォワード検証 (14 日間の期間)。 GPU:NVIDIA A100。
モデル選択に関する考慮事項
TFT は最も強力なオプションですが、GPU と各 SKU の豊富なデータが必要です。小売業者向け SKU が数千あるものの、単一製品あたりの生産量が少ない場合は、ハイブリッド アプローチが最も効果的です。 ロングテール SKU (少量、低データ) の場合は XGBoost または Prophet、SKU の場合は TFT 最も多くの収益と廃棄物を生み出す大量生産産業。
無駄の削減: 在庫と動的価格設定との予測の統合
正確な予測モデルだけでは無駄は削減できません。それは必要です。 行動する 予測について 在庫管理と動的価格設定システムを通じて。ループ 意思決定と: 将来の需要の予測 → サプライヤーへの最適な発注 → モニタリング 在庫レベル → 有効期限が近い商品の動的な値下げ → 自動寄付 売れ残りの為。
動的安全在庫による在庫の最適化
# Inventory optimization basato su forecast probabilistico
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class InventoryParams:
product_id: str
shelf_life_days: int # Vita utile del prodotto
lead_time_days: int # Giorni da ordine a consegna
service_level: float = 0.95 # Fill rate target (95%)
holding_cost_per_unit: float = 0.05 # Costo mantenimento/giorno
shortage_cost_per_unit: float = 2.50 # Costo stockout per unita
waste_cost_per_unit: float = 1.20 # Costo di buttare una unita
def calculate_optimal_order(
demand_forecast: np.ndarray, # Previsione media per i prossimi N giorni
demand_std: np.ndarray, # Deviazione standard previsione
current_stock: int,
params: InventoryParams
) -> dict:
"""
Calcola la quantità ottimale da ordinare considerando:
- Shelf life del prodotto (prodotti freschi deperibili)
- Service level target
- Trade-off costo stockout vs costo spreco
"""
from scipy import stats
# Z-score per il service level target
z_score = stats.norm.ppf(params.service_level)
# Domanda attesa nel lead time
lt_demand = demand_forecast[:params.lead_time_days].sum()
lt_std = np.sqrt((demand_std[:params.lead_time_days] ** 2).sum())
# Safety stock = Z * sigma * sqrt(lead_time)
safety_stock = z_score * lt_std
# Reorder point
reorder_point = lt_demand + safety_stock
# Domanda fino a shelf life (non ordinare più di quanto venderemo)
max_sellable_demand = demand_forecast[:params.shelf_life_days].sum()
max_sellable_std = np.sqrt((demand_std[:params.shelf_life_days] ** 2).sum())
# Order quantity
unconstrained_order = max(0, reorder_point - current_stock)
# Vincolo shelf life: non ordinare più di quanto venderemo nella vita utile
# Aggiusta per il rischio di spreco
waste_probability = 1 - stats.norm.cdf(
current_stock + unconstrained_order,
loc=max_sellable_demand,
scale=max_sellable_std
)
# Costo atteso
expected_waste_cost = waste_probability * unconstrained_order * params.waste_cost_per_unit
expected_shortage_cost = (1 - params.service_level) * lt_demand * params.shortage_cost_per_unit
# Ottimizzazione: riduce ordine se costo spreco > beneficio margine
if expected_waste_cost > expected_shortage_cost * 0.5:
adjustment_factor = 1 - (expected_waste_cost / (expected_waste_cost + expected_shortage_cost))
optimal_order = int(unconstrained_order * adjustment_factor)
else:
optimal_order = int(unconstrained_order)
return {
'optimal_order_qty': max(0, optimal_order),
'reorder_point': reorder_point,
'safety_stock': safety_stock,
'expected_demand_7d': demand_forecast[:7].sum(),
'waste_risk_pct': waste_probability * 100,
'shortage_risk_pct': (1 - params.service_level) * 100,
'decision_reason': 'waste_adjusted' if expected_waste_cost > expected_shortage_cost * 0.5
else 'standard_reorder'
}
# Esempio utilizzo
params = InventoryParams(
product_id='insalata_vaschetta_150g',
shelf_life_days=5,
lead_time_days=1,
service_level=0.95,
waste_cost_per_unit=0.85,
shortage_cost_per_unit=3.20
)
# Simulazione con forecast TFT (media e std da quantile forecast)
forecast_mean = np.array([45, 52, 68, 71, 89, 95, 48]) # Lunedi-Domenica
forecast_std = np.array([8, 9, 12, 13, 15, 16, 9]) # Deviazione standard
order = calculate_optimal_order(
demand_forecast=forecast_mean,
demand_std=forecast_std,
current_stock=120,
params=params
)
print('Raccomandazione ordine:')
for key, val in order.items():
print(f' {key}: {val}')
無駄を削減するためのダイナミックプライシング: 値下げの最適化
有効期限に基づいた動的な価格設定は、コストを削減するための最も効果的なツールの 1 つです。 生鮮食品に含まれる廃棄物。目標は、 最適な割引価格 これにより、期限切れのリスクのある製品から回収される収益が最大化され、期限切れになることなく販売が促進されます。 定価商品の売り上げを共食いする。
無駄のないイスラエルのスタートアップ企業は、電子棚をベースにしたシステムを開発しました 有効期限、レベルに基づいて価格を自動的に更新するラベル (ESL) 在庫と顧客の行動。これを導入した小売業者は廃棄物を削減しました 40%、状況に応じてこれらの製品の収益を増加させながら 前(売れ残った商品は捨てる)。また トゥー・グッド・トゥ・ゴー 彼はで立ち上げた 2024 SKU レベルでデータを処理して割引レベルを最適化する AI プラットフォーム、 手動による期限チェックを 93 ~ 99% 削減します。
# Algoritmo markdown optimization per prodotti prossimi a scadenza
import numpy as np
from scipy.optimize import minimize_scalar
from dataclasses import dataclass
from typing import Tuple
@dataclass
class MarkdownConfig:
min_margin_pct: float = 0.10 # Margine minimo (non scendere sotto il 10%)
max_discount_pct: float = 0.70 # Sconto massimo (70%)
base_price: float = 1.99 # Prezzo normale
cost_per_unit: float = 0.95 # Costo di acquisto
waste_value: float = 0.00 # Valore residuo se buttato (es. recupero)
def price_elasticity_model(price: float, base_price: float,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Modello di elasticita del prezzo per prodotti freschi GDO.
Elasticita tipica prodotti freschi: -1.5 a -2.5
(sconto del 10% aumenta la domanda del 15-25%)
"""
price_ratio = price / base_price
return base_demand * (price_ratio ** elasticity)
def markdown_revenue(discount_pct: float,
config: MarkdownConfig,
current_stock: int,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Calcola il ricavo atteso da un determinato livello di sconto.
Massimizzare questa funzione = trovare lo sconto ottimale.
"""
discounted_price = config.base_price * (1 - discount_pct)
# Vincolo margine minimo
min_price = config.cost_per_unit * (1 + config.min_margin_pct)
if discounted_price < min_price:
return -1e10 # Penalita per prezzi sotto il costo
# Domanda attesa con il nuovo prezzo
expected_demand = price_elasticity_model(
discounted_price, config.base_price, base_demand, elasticity
)
# Unita vendute = min(domanda attesa, stock disponibile)
units_sold = min(expected_demand, current_stock)
units_wasted = max(0, current_stock - units_sold)
# Revenue = vendite + (eventuali recupero valore sprecato)
revenue = (units_sold * discounted_price) + (units_wasted * config.waste_value)
# Costo opportunità: confronto con vendita a prezzo pieno (se avessimo aspettato)
# ma qui massimizziamo il recupero dato che il prodotto scadera
return revenue
def find_optimal_markdown(config: MarkdownConfig,
current_stock: int,
days_to_expiry: int,
base_daily_demand: float,
demand_elasticity: float = -1.8) -> dict:
"""
Trova lo sconto ottimale considerando i giorni rimanenti a scadenza.
Logica: più vicini alla scadenza, maggiore lo sconto necessario.
"""
# Urgenza basata sui giorni a scadenza
if days_to_expiry >= 5:
# Nessun markdown necessario
return {'discount_pct': 0.0, 'recommended_action': 'no_action',
'expected_revenue': config.base_price * min(base_daily_demand * days_to_expiry, current_stock)}
# Moltiplicatore urgenza: più vicini alla scadenza, maggiore il markup di urgenza
urgency_factor = 1.0 + (5 - days_to_expiry) * 0.15 # +15% urgenza per giorno
# Domanda effettiva considerando urgenza (es. promozioni di fine giornata)
effective_demand = base_daily_demand * days_to_expiry * urgency_factor
# Ottimizzazione: trova lo sconto che massimizza il ricavo totale
result = minimize_scalar(
lambda d: -markdown_revenue(d, config, current_stock,
effective_demand, demand_elasticity),
bounds=(0.0, config.max_discount_pct),
method='bounded'
)
optimal_discount = result.x
optimal_revenue = -result.fun
waste_at_no_discount = max(0, current_stock - effective_demand)
waste_with_markdown = max(0, current_stock - price_elasticity_model(
config.base_price * (1 - optimal_discount),
config.base_price, effective_demand, demand_elasticity
))
waste_reduction_units = waste_at_no_discount - waste_with_markdown
revenue_recovered = optimal_revenue - (config.waste_value * current_stock)
action = (
'urgent_markdown' if days_to_expiry <= 1
else 'markdown' if optimal_discount > 0.15
else 'slight_discount'
)
return {
'discount_pct': round(optimal_discount * 100, 1),
'discounted_price': round(config.base_price * (1 - optimal_discount), 2),
'expected_revenue': round(optimal_revenue, 2),
'waste_reduction_units': round(waste_reduction_units, 0),
'revenue_recovered_vs_waste': round(revenue_recovered, 2),
'days_to_expiry': days_to_expiry,
'recommended_action': action
}
# Esempio: insalata con 2 giorni alla scadenza
config = MarkdownConfig(
base_price=1.99,
cost_per_unit=0.80,
min_margin_pct=0.05, # Riduco margine minimo vicino a scadenza
max_discount_pct=0.60
)
recommendation = find_optimal_markdown(
config=config,
current_stock=45,
days_to_expiry=2,
base_daily_demand=12,
demand_elasticity=-2.0
)
print('Raccomandazione markdown:')
for k, v in recommendation.items():
print(f' {k}: {v}')
# Output atteso:
# discount_pct: 35.0
# discounted_price: 1.29
# expected_revenue: 38.70
# waste_reduction_units: 28
# recommended_action: markdown
ケーススタディ: 200 以上の販売拠点を持つイタリアの大規模小売業
ML 需要予測実装の実際の (匿名化された) ケースを 1 つに分析してみましょう 220 の販売拠点を持ち、そのうち 18,000 のアクティブ SKU を持つイタリアの大規模小売チェーン 生鮮食品は 3,200 品目、年間売上高は約 28 億ユーロです。
初期状況
ML を導入する前、小売業者は予測機能を備えた従来の ERP システムを使用していました。 過去 4 か月の加重移動平均に基づいています。生鮮食品の平均MAPEは、 28%、季節商品では最高で 45% に達します。生鮮食品の廃棄率は 購入価格の 12% (廃棄コスト、価値の損失、評判への影響を含む) 重要な。バイヤーは注文を毎週手作業でレビューし、 フルタイムの FTE は約 3 名。
ソリューションアーキテクチャ
実装されたテクノロジースタック
| レイヤー | チョイステクノロジー | 関数 |
|---|---|---|
| データウェアハウス | 雪の結晶 | 全220店舗のPOSデータを一元管理 |
| ETL/ELT | DBT + エアバイト | POS統合、天気予報(OpenWeather API)、プロモーション(CRMシステム) |
| オーケストレーション | Apache エアフロー | 毎日の再トレーニング、午前 3:00 ~ 5:00、異常アラート |
| モデル | TFT (上位 500 SKU) + XGBoost (ロングテール) | 各 SKU x ストアの 7 日間および 14 日間の予測 |
| MLOps | Databricks 上の MLflow | 追跡実験、モデルレジストリ、A/Bテスト |
| 給仕 | FastAPI + Redis キャッシュ | レイテンシ < 100ms、24 時間キャッシュでの API 予測 |
| マークダウンエンジン | Python マイクロサービス | リスクのある製品の 4 時間ごとの最適な割引計算 |
| ESLの統合 | Hanshow / プライサー API | 電子棚札の価格を自動更新 |
| 寄付 | APIフードバンク | 有効期限が 24 時間未満で販売されていない商品の自動通知 |
タイムラインと実装フェーズ
- 1~2ヶ月目: データ監査、3 年間の履歴データのクリーニング、外部ソースの統合 (天気、プロモーション)。 50 個の大量生産性、保存性の高いパイロット SKU の特定。
- 3~4ヶ月目: パイロット モデル トレーニング (XGBoost + Prophet)、ウォークフォワード検証、 ベースライン MAPE 27.8%。 3 つのテスト ストアで ERP 注文システムと最初に統合されました。
- 5~6ヶ月目: 上位 500 の新鮮な SKU に関する TFT トレーニング。 A/B テスト: 30 店舗で ML を実施 従来のシステムを使用する 30 店舗と比較。結果: ML 店舗での廃棄物が 28% 削減されました。
- 7~9ヶ月目: 全220店舗に展開。マークダウンエンジンの実装 80 の設備を備えた店舗で ESL を統合。フードバンクを介した自動寄付の有効化。
- 10~12月: テンプレートの最適化、ソーシャルメディア機能の追加(言及 Instagram/TikTok で新たなトレンドを探る製品)。 TFT+XGBoost アンサンブルの実装。
12か月後に測定された結果
ビジネス KPI - 導入後の結果 (12 か月)
| メトリック | MLの前に | ML 12 か月後 | 変化 |
|---|---|---|---|
| MAPEの生鮮品 | 28.2% | 9.1% | -67.7% |
| 購入金額に対する廃棄率 | 12.0% | 7.8% | -35.0% |
| 在庫切れ率 (OOS) | 4.8% | 2.9% | -39.6% |
| 寄付製品(トン/年) | 45トン | 112トン | +149% (ガッダ給付金法) |
| 値下げ効率 (回収された収益の割合) | 32% | 71% | +39pp |
| 手作業による注文レビューに専念する FTE | 3.0 FTE | 0.8FTE | -73.3% |
| 廃棄コスト+処分費用の節約 | - | 420万ユーロ/年 | 新規保存 |
| 寄付に対する税制上の優遇措置(ガッダ法) | 85,000ユーロ/年 | 21万ユーロ/年 | +147% |
| プロジェクト ROI (投資: 180 万ユーロ) | - | 12 か月で 234% | 投資回収期間 5.2 か月 |
ケーススタディから学んだ教訓
- 履歴データの品質が実際のボトルネックです。 40%の確率で プロジェクトのうちの 1 つはデータのクリーニングと統合に特化していました。過去の POS データには、 休暇期間と店舗改装の大幅なギャップ。
- すべての SKU が同じモデルに値するわけではありません。 上位 500 SKU の TFT、 中間 SKU (500 ~ 3000) の Prophet、ロングテール (>3000 SKU) の移動平均。 最適なコスト/利益を実現する 3 段階のアプローチ。
- 変更管理は非常に重要です。 20年以上の経験を持つバイヤー 彼らはAIに抵抗しました。解決策: Prophet の分解 (トレンド + 季節性) を表示します。 + 休日)、購入者がモデルのロジックを理解し、検証できるようにします。
- マークダウン エンジンには ESL 統合が必要です。 電子棚ラベルがなければ、 価格を手動で更新すると、 自動システム。
- 継続的な監視が不可欠です。 モデルは時間の経過とともに劣化します (コンセプトドリフト) 新型コロナウイルス感染症後の購買行動の変化によるもの。再訓練 毎週自動で安定したパフォーマンスを維持します。
フードテックにおける需要予測のためのビジネス指標
需要予測システムの成功は MAPE だけで測られるわけではありません。それは必要です モデルの技術的品質と影響の両方をカバーするメトリクスフレームワークを定義する 本当のビジネス。
メトリクス フレームワーク: テクニカル + ビジネス
| カテゴリ | メトリック | 式/定義 | 典型的な大規模配布ターゲット |
|---|---|---|---|
| 精度モデル | メイプ | 平均絶対パーセント誤差 | < 15% 生鮮食品 |
| RMSE | 二乗平均平方根誤差 (単位) | SKUのボリュームによって異なります | |
| バイアス | (予測 - 実績) / 実績、平均 | |バイアス| < 5% (系統的な過不足なし) | |
| ウィメイプ | MAPE を販売数量で加重 | < 10% (ロングテールでは MAPE よりも優れています) | |
| 無駄 | 廃棄物削減率(WRR) | (前の廃棄物 - 後の廃棄物) / 前の廃棄物 | 12 か月以内に > 30% |
| 購入時の無駄率 | 廃棄価値 / 購入価値 | < 8% (セクターベンチマーク) | |
| 転用されたトン数(寄付) | 寄付されたトン数 / 潜在的な廃棄物の総量 | 超過分の50%以上 | |
| 可用性 | 在庫切れ率 (OOS) | 在庫切れのある SKU の割合 / 合計 SKU | < 3% |
| 充填率 | 納入台数 / 発注台数 | > 97% | |
| カバー日数 (DOC) | 現在の在庫 / 1 日の平均需要 | 製品カテゴリーに最適 | |
| ダイナミックプライシング | マークダウン効率 (ME) | 値下げによる収益/コストの潜在的な無駄 | > 65% |
| マークダウンで販売される製品 | 期限切れ間近の製品が割引価格で販売されている割合 | > 80% の製品が危険にさらされている | |
| 適用される平均割引 | 値下げ商品の平均割引率 | 25~45%(弾力性に最適) |
モニタリング ダッシュボード: コンセプト ドリフトの検出
食料予測モデルは特に影響を受けやすい コンセプトドリフト: 栄養トレンド(ビーガン製品の増加など)により、購入パターンは季節ごとに変化します。 価格危機 (2022 年から 2024 年のインフレにより、世界の購買行動が大きく変化した) により、 生鮮食品)、異常事態(パンデミック)の場合。継続的に監視する必要がある パフォーマンスの低下。
# Monitoring sistema di rilevamento concept drift per food forecasting
import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftAlert:
severity: str # 'WARNING' | 'CRITICAL'
metric: str
current_value: float
baseline_value: float
change_pct: float
affected_skus: List[str]
recommendation: str
class ForecastMonitor:
def __init__(self, baseline_window_days: int = 30,
monitoring_window_days: int = 7):
self.baseline_window = baseline_window_days
self.monitoring_window = monitoring_window_days
self.thresholds = {
'mape_warning': 0.20, # MAPE > 20% = warning
'mape_critical': 0.35, # MAPE > 35% = critical, richiede retraining
'bias_warning': 0.10, # Bias sistematico > 10%
'bias_critical': 0.20, # Bias sistematico > 20%
'waste_rate_warning': 0.10, # Tasso spreco > 10%
}
def compute_current_metrics(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame) -> dict:
"""Calcola metriche sull'ultima finestra di monitoring"""
merged = predictions_df.merge(actuals_df, on=['date', 'sku_id', 'store_id'])
metrics_by_sku = merged.groupby('sku_id').apply(
lambda g: pd.Series({
'mape': np.mean(np.abs((g['actual'] - g['forecast']) / (g['actual'] + 1e-8))),
'bias': np.mean((g['forecast'] - g['actual']) / (g['actual'] + 1e-8)),
'rmse': np.sqrt(np.mean((g['actual'] - g['forecast']) ** 2)),
'n_observations': len(g)
})
)
return {
'mean_mape': metrics_by_sku['mape'].mean(),
'p90_mape': metrics_by_sku['mape'].quantile(0.9),
'mean_bias': metrics_by_sku['bias'].mean(),
'degraded_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_warning']
].index.tolist(),
'critical_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_critical']
].index.tolist(),
'per_sku_metrics': metrics_by_sku
}
def detect_drift(self, baseline_metrics: dict,
current_metrics: dict) -> List[DriftAlert]:
"""Confronta metriche correnti vs baseline e genera alert"""
alerts = []
# Alert MAPE
mape_change = ((current_metrics['mean_mape'] - baseline_metrics['mean_mape'])
/ baseline_metrics['mean_mape'])
if current_metrics['mean_mape'] > self.thresholds['mape_critical']:
alerts.append(DriftAlert(
severity='CRITICAL',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['critical_skus'],
recommendation='RETRAINING IMMEDIATO richiesto. Analizza concept drift.'
))
elif current_metrics['mean_mape'] > self.thresholds['mape_warning']:
alerts.append(DriftAlert(
severity='WARNING',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['degraded_skus'],
recommendation='Monitorare trend. Pianificare retraining prossima settimana.'
))
# Alert Bias sistematico
if abs(current_metrics['mean_bias']) > self.thresholds['bias_critical']:
direction = 'sovrastima' if current_metrics['mean_bias'] > 0 else 'sottostima'
alerts.append(DriftAlert(
severity='CRITICAL',
metric='Bias',
current_value=current_metrics['mean_bias'] * 100,
baseline_value=baseline_metrics.get('mean_bias', 0) * 100,
change_pct=0,
affected_skus=[],
recommendation=f'Bias sistematico di {direction}: aggiorna calibrazione modello.'
))
return alerts
def check_and_alert(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame,
baseline_metrics: dict) -> None:
"""Entry point principale del monitor"""
current = self.compute_current_metrics(predictions_df, actuals_df)
alerts = self.detect_drift(baseline_metrics, current)
for alert in alerts:
if alert.severity == 'CRITICAL':
logger.critical(
f'[DRIFT CRITICAL] {alert.metric}: {alert.current_value:.1f}% '
f'(baseline: {alert.baseline_value:.1f}%) - {alert.recommendation}'
)
# Qui si trigghera notifica Slack/PagerDuty
self._send_alert(alert)
else:
logger.warning(
f'[DRIFT WARNING] {alert.metric}: {alert.current_value:.1f}% - '
f'{len(alert.affected_skus)} SKU degradati'
)
def _send_alert(self, alert: DriftAlert) -> None:
"""Invia notifica al team ML Ops (implementazione dipende dal sistema)"""
# Integrazione con: Slack, PagerDuty, email, Prometheus Alertmanager
pass
食品需要予測のベストプラクティスとアンチパターン
避けるべきアンチパターン
- 一時的漏洩日: 代わりに標準の相互検証を使用する ウォークフォワード検証では、将来のデータをトレーニングに導入します。そして最も一般的な方法論上のエラー そして、実稼働環境では誤りであることが判明する 30 ~ 50% の楽観的な MAPE につながります。
- すべての SKU に単一のテンプレート: 断続的な需要がある製品(販売 週に 2 ~ 3 回)、大量生産製品とは異なるモデルが必要です。申し込む 年間販売数 200 の製品に LSTM を適用すると、過剰適合と 60% 以上の MAPE が発生します。
- ビジネス上の偏見を無視します。 小売業者は歴史的に過剰注文する傾向がありました 在庫切れ(認識されている「2 つの悪」のうち最悪)を避けるため。モデルがトレーニングされている場合 過去の注文データ (実際の需要ではない) では、このバイアスが引き継がれます。販売データを活用する 命令ではなく効果的です。
- 将来のプロモーションをモデル化しないでください。 プロモーションを知らない予測 翌週に予定されている期間では体系的に不正確な予測が提供されます プロモーション。常にプロモーション カレンダーを入力として統合します。
- エラーの方向を無視して MAPE のみを最適化します。 文脈の中で 食品の場合、50単位の過剰予測誤差(50単位が無駄になる)ははるかに高価です 50 個の予測未満誤差 (50 個の販売損失)。非対称メトリクスを使用する (重み付け MAPE) またはトレーニングにおける非対称損失関数。
統合されたベストプラクティス
- 治療ごとに SKU を階層化します。 各 SKU を量、変動ごとに分類します。 そして賞味期限。異なるクラスターに異なるモデルを適用します。階層的アプローチ (グローバル + local) は、多くの場合、両方の純粋なモデルよりも優れたパフォーマンスを発揮します。
- 常に不確実性の推定値が含まれます。 正確な予測 (単一の数値) また、確率的予測 (分布または間隔) ほど有用ではありません。 TFTと預言者 これらは、最適な安全在庫を計算するための基礎となる分位数をネイティブに提供します。
- 再トレーニングを自動化しますが、異常を監視します。 毎日の再トレーニング これは便利ですが、昨日のデータが異常だった場合 (POS エラーなど)、エラーが増幅される可能性があります。 新しいモデルを運用環境にプッシュする前に、必ず健全性チェックを実装してください。
- 購入者との信頼関係を築く: ML システムは抵抗により失敗することがよくあります 技術的な品質ではなく、組織的なものです。モデルの分解を示し、説明します。 ビジネス言語での予測は、買い手に上書きできる可能性を残します。 モデルを改善するフィードバック ループ。
- ROI を継続的に測定します。 無駄の削減量を週単位で計算する 回避、値下げからの回復、ガッダ法の税制上の優遇措置。値を見えるようにする 組織のコミットメントを長期間にわたって維持するために作成され、基礎となるものです。
結論: 持続可能性ツールとしての予測
食品分野における機械学習による需要予測は、AI のユースケースの 1 つです。 現在利用可能な最高の影響力と複雑さの比率。問題は明確に定義されており、データは 存在し (どの小売業者も何年もの POS の歴史を持っています)、成功の指標は明確です (MAPE、 廃棄率、ROI)、その影響はビジネスを超えて広がります。 食品廃棄物を減らす 経済的、規制的、環境的要請.
2025 年に開始したいと考えているイタリアの大規模小売業に提案されている道筋は現実的です。 50 個の大量生産性の高い保存性の高い SKU のサブセットで Prophet または XGBoost を使用する場合、測定 30日後のMAPEデルタと廃棄物削減率を分析し、社内のビジネスケースを構築します。 その後、上位 SKU の TFT に向けて、そしてマークダウン統合に向けて段階的に拡張します。 ESL エンジンとシステム。
最終的な目標は、最も正確なモデルを作成することではありませんが、 意思決定のエコシステム 予測を行動に変える: 最適な注文、適応的な価格設定、寄付 自動。このエコシステムでは、機械学習とそれを可能にするインフラストラクチャが、 無駄の削減とあらゆる投資に見合った具体的なビジネス成果。
フードテックシリーズの続き
シリーズの次の記事では、需要予測のための補完的なテクノロジーについて説明します。
- 第 9 条 - 衛星 API と精密農業: 衛星データのような NDVI と Sentinel-2 は、ループを閉じた農業収量予測モデルをフィードします。 生産から流通まで。
- 第 10 条 - 作物監視のための ML Edge: 埋め込まれた推論 作物の状態をリアルタイムで監視するための現場の IoT デバイス。 サプライチェーンパイプラインへの統合。
本番展開を可能にする MLOps テクノロジーの詳細をご覧ください。 予測モデルについては、シリーズも参照してください ビジネス向け MLOps: MLflow を使用した本番環境での AI モデル.







