폐기물 감소를 위한 수요 예측: FoodTech의 ML 및 시계열
매년 세계는 약 음식물 쓰레기 10억 5천만 톤,에 따르면 UNEP 음식물 쓰레기 지수 보고서 2024. 이 중 60%는 가족, 28%는 식당에서 발생합니다. 대규모 유통의 경우 12%입니다. 경제 용어로 번역하면: 너머 1조 달러 매년 불탔다, 전 세계 가스 배출량의 10%를 생성하는 환경 영향 온실은 전 세계 항공의 거의 5배에 달하는 규모입니다.
그런데 역설적이게도 매일 수십억 개의 음식이 낭비되는 가운데, 783 수백만 명이 굶주림에 시달리고 있다. 문제의 근원은 단지 문화나 물류에만 있는 것이 아닙니다. 그리고 기본적으로 문제는 수요 예측. 대규모 소매업 주문이 너무 많다 품절 위험을 감수하지 마십시오. 공급업체는 안전을 위해 과잉 생산합니다. 결과적으로 모든 링크는 먹이사슬의 완충 장치가 축적되어 폐기물이 됩니다.
이탈리아에서는 가다법(n. 166/2016) 기부에 대한 세금 인센티브를 도입했습니다. 잉여 식량을 줄이고 회수 절차를 단순화했습니다. 유럽 수준에서는 전략 농장에서 포크까지 2030년까지 음식물 쓰레기를 절반으로 줄이는 것이 목표입니다. 소매업체와 식품 산업을 위한 제품입니다. 규제는 긴급성을 야기하지만 머신러닝은 이를 제공합니다. 이러한 목표를 달성하기 위한 구체적인 도구.
ML을 사용한 수요 예측은 이론이 아닙니다. LSTM 및 Temporal Fusion 모델을 구현한 소매업체 Transformers는 MAPE가 기존 방법의 일반적인 28%에서 5~15%로 감소했다고 보고합니다. 12개월 이내에 25-40%의 폐기물 감소 및 측정 가능한 ROI를 제공합니다. 이 기사에서 우리는 원시 데이터부터 생산 모델까지 모든 아키텍처 선택을 분석하는 완전한 파이프라인 작동하는 Python 코드로.
이 기사에서 배울 내용
- 2025년 음식물 쓰레기에 대한 경제 및 규제 프레임워크
- 고전적인 통계 모델: ARIMA, SARIMA, Holt-Winters 및 Python 코드를 사용한 Prophet
- 시계열을 위한 딥 러닝: LSTM, GRU, Temporal Fusion Transformer 및 N-BEATS
- 고급 기능 엔지니어링: 외생 변수, 순환 인코딩, 지연 기능
- 순방향 검증을 통해 엔드투엔드 ML 파이프라인 완성
- 실제 데이터세트 비교 벤치마크: MAPE, RMSE, MAE, 훈련 시간
- 재고 관리 및 동적 가격 책정과의 예측 통합
- 만료가 임박한 제품에 대한 마크다운 최적화 알고리즘
- 이탈리아 대규모 소매 무역 사례 연구: 200개 이상의 판매 지점, 35% 폐기물 감소
- 비즈니스 지표: 폐기물 감소율, 예측 정확도, 가격 인하 효율성
FoodTech 시리즈: 식품 산업의 AI 및 기술에 관한 10가지 기사
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | 푸드테크 소개 | 생태계 개요 및 핵심 기술 |
| 2 | 먹이 사슬의 IoT 및 센서 | 필드-클라우드 데이터 파이프라인 |
| 3 | 품질 및 검사를 위한 컴퓨터 비전 | 결함 분류 및 자동 등급화 |
| 4 | 블록체인 및 추적성 | 농장에서 포크까지 식품 안전 |
| 5 | 수직 농업과 농업 기술 | ML을 통한 재배 제어 |
| 6 | 공급망 최적화 | 물류, 콜드체인 및 투명성 |
| 7 | 농장 관리 대시보드 | 농업회사 실시간 모니터링 |
| 8 | 현재 위치 - 수요 예측 및 폐기물 감소 | ML 시계열, LSTM, TFT, 동적 가격 책정 |
| 9 | 위성 API 및 정밀 농업 | NDVI, 원격 감지, 작물 모니터링 |
| 10 | 작물 모니터링을 위한 ML Edge | 현장 장치에 내장된 추론 |
음식물 쓰레기 문제: 데이터 및 규정 2025
ML을 통한 수요 예측이 FoodTech의 전략적 우선순위가 된 이유를 이해하려면 다음과 같이 하세요. 실수부터 시작해야 합니다. 그만큼 UNEP 음식물 쓰레기 지수 보고서 2024 그것을 밝힙니다 2022년(완전한 데이터를 기준으로 지난해) 10억 5천만 톤의 식품이 낭비되었으며, 이는 1인당 연간 132kg에 해당하며 소비자가 먹을 수 있는 전체 식품의 거의 5분의 1에 해당합니다.
음식물 쓰레기가 전 세계에 미치는 영향(UNEP 2024)
| 지시자 | Valore | 원천 |
|---|---|---|
| 연간 음식물 쓰레기 | 10억 5천만톤 | UNEP 음식물 쓰레기 지수 2024 |
| 경제적 가치 상실 | ~$1조/년 | FAO/세계은행 추정 |
| % 온실가스 배출 | 8-10% 글로벌 | UNEP 2024 |
| 1인당 폐기물 | 132kg/인/년 | UNEP 2024 |
| 대규모 소매업에서 소매점 비중 | 전체 낭비의 12% | UNEP 2024 |
| 가족 공유 | 전체의 60% | UNEP 2024 |
| 음식 서비스 할당량 | 전체의 28% | UNEP 2024 |
| 소매 전 손실(공급망) | 생산된 식품의 13% | FAO |
규제 프레임워크: Gadda 법률에서 농장, 포크까지
이탈리아는 유럽의 개척자였습니다. 법률 166/2016(가다법), 이는 음식물 쓰레기에 대해 징벌적 접근보다는 보상적 접근 방식을 도입했습니다. 핵심 사항은 다음과 같습니다.
- 세금 인센티브 잉여 식량을 기부하는 사람들: 파괴와 동일 IRPEF/IRES 계산용
- 관료적 단순화: 기부금에 대한 서류비용 면제 15,000유로/년
- '강아지 가방' 프로모션 음식점에서 가까운 곳에서 상품을 판매하거나 투명한 할인이 적용되는 마감일
- 식품교육 학교에서 구조적 예방 조치로
유럽 차원에서는, 농장에서 식탁까지의 전략 (그린 딜의 일부) 목표 설정 구속력: 조치를 통해 2030년까지 소매 및 소비 수준에서 폐기물을 50% 감소 2025년부터 2026년까지 점진적으로 시행될 대규모 소매업에 의무화됩니다. 나를 위해 직원이 250명 이상인 소매업체에서는 음식물 쓰레기에 대한 연간 보고가 의무화되었습니다.
기업 전략에 대한 규제 영향
인센티브(Gadda Law), 보고 의무(Farm to Fork) 및 압력의 조합 소비자는 ML을 통해 수요 예측에 투자하기 위한 명확한 비즈니스 사례를 만듭니다. 아니요 예 지속 가능성이 가장 중요합니다. 매장이 200개이고 마진이 2~3%인 소매업체의 경우 35% 낭비는 영업 이익률의 50-80bp를 회복하는 것과 같습니다.
식량 수요 예측이 어려운 이유
식품 산업의 수요 예측은 이를 가장 중요한 분야 중 하나로 만드는 독특한 과제를 제시합니다. 비즈니스에 적용되는 기계 학습의 더 복잡한 문제. 구조적 특성 다른 소매 부문과 차별화되는 5가지 요소가 있습니다.
1. 부패 가능성 및 좁은 판매 창구
이번 주에 판매되지 않은 의류 품목이 다음 주에 판매될 수도 있습니다. 샐러드 신선한 아니. 신선한 식품에는 유통기한은 1~14일, 어느 과도한 예측 오류는 복구할 수 없는 물리적 낭비로 직접적으로 변환된다는 의미입니다. 이러한 오류 비용 비대칭(과잉 예측 비용은 종종 예측 비용을 초과함) 과소예측)에는 특히 상향 오류를 최소화하는 모델이 필요합니다.
2. 다단계 계절성
음식 시계열은 여러 빈도에서 동시 계절성을 나타냅니다.
- 일일 계절성: 금요일 저녁 영업은 화요일 오전과 다름
- 주간 계절성: 요일별 구매 패턴이 다름
- 월별 계절성: 급여주기에 따른 월초/말 효과
- 연간 계절성: 여름 vs 겨울 제철상품
- 축제 시즌: 크리스마스, 부활절, 8월 중순에 갑자기 급증
기존 ARIMA 모델은 하나의 계절 성분만 처리합니다. 다음과 같은 현대 모델 Prophet와 TFT는 기본적으로 여러 시즌을 처리합니다.
3. 비정상 외생변수
식량 수요는 규칙적인 패턴을 따르지 않는 외부 요인에 의해 크게 영향을 받습니다. 기상 조건(일주일 동안 비가 오면 수프 판매가 40% 증가), 프로모션 캠페인 (홍보 전단지는 일시적으로 수요를 3배로 늘릴 수 있음), 지역 행사(축구 경기, 무역 박람회), 경쟁사 가격, 소셜 미디어 동향. 이러한 외생 변수를 적절하게 통합하십시오. 효과적이며 평범한 모델과 우수한 모델의 차이입니다.
4. 데이터가 부족한 SKU의 롱테일
일반적인 대규모 소매업에서는 15,000~30,000개의 활성 SKU를 관리합니다. SKU의 20%가 매출의 80%를 창출합니다. 그러나 나머지 80%에는 0이 많은 희박하고 간헐적인 시계열이 있습니다. 이들을 위해 제품의 표준모형은 실패하고 Croston 방법과 같은 특화된 접근방식이 필요하며, 제로 팽창 모델 또는 유사한 SKU로부터의 전이 학습.
5. ML이 없는 일반적인 오류
Il 일반적인 MAPE(평균 절대 백분율 오류)입니다. 전통적인 예측 시스템에서는 ML이 없는 이동 평균, 수동 규칙 또는 ERP 시스템을 기반으로 20%와 40% 신선한 제품을 위해. ML 모델을 도입하면 이 오류가 다음과 같이 줄어듭니다. 5-15%, LSTM은 최근 벤치마크에서 기존 방법의 28.76%에 비해 16.43%의 MAPE를 보여주었습니다. 43% 감소.
고전적 통계 모델: ARIMA, SARIMA, Holt-Winters 및 Prophet
딥러닝으로 넘어가기 전에, 고전적인 통계 모델을 이해하는 것이 필수적입니다. 아니다 더 이상 사용되지 않지만 종종 이길 수 있는 기준을 나타내기 때문에 해석이 가능합니다. 계산적으로 가볍고 일부 상황(적은 데이터, 단순한 제품)에서는 더 복잡한 접근 방식.
ARIMA 및 SARIMA: 예측의 기초
ARIMA(AutoRegressive Integrated Moving Average)는 계열별로 가장 많이 사용되는 통계 모델입니다. 일변량 시간 척도. ARIMA(p,d,q) 모델은 세 가지 구성요소를 결합합니다. AutoRegression(p lags del 과거 가치), 통합(계열을 고정시키기 위한 차이) 및 이동 평균 (잔차 오류의 q 지연). SARIMA는 계절 구성요소(P,D,Q)를 추가합니다.
# SARIMA per forecasting vendite settimanali - prodotto fresco
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_percentage_error
# Caricamento dati di esempio
# Formato: date index, colonna 'sales' con unita vendute
df = pd.read_csv('vendite_insalata.csv', index_col='date', parse_dates=True)
df = df.asfreq('D') # Frequenza giornaliera
df['sales'] = df['sales'].fillna(0)
# Test di stazionarieta (ADF Test)
result = adfuller(df['sales'].dropna())
print(f'ADF Statistic: {result[0]:.4f}')
print(f'p-value: {result[1]:.4f}')
print(f'La serie e {"stazionaria" if result[1] < 0.05 else "non stazionaria"}')
# Split train/test (80/20 con holdout degli ultimi 30 giorni)
train_size = len(df) - 30
train = df.iloc[:train_size]
test = df.iloc[train_size:]
# Modello SARIMA(1,1,1)(1,1,1)7 - stagionalita settimanale
# p=1, d=1, q=1: componenti ARIMA
# P=1, D=1, Q=1, s=7: componenti stagionali settimanali
model = SARIMAX(
train['sales'],
order=(1, 1, 1),
seasonal_order=(1, 1, 1, 7), # s=7 per stagionalita settimanale
enforce_stationarity=False,
enforce_invertibility=False
)
results = model.fit(disp=False)
print(results.summary())
# Previsione sullo stesso periodo del test set
forecast = results.forecast(steps=len(test))
forecast = np.maximum(forecast, 0) # Nessun valore negativo
# Metriche
mape = mean_absolute_percentage_error(test['sales'], forecast) * 100
rmse = np.sqrt(np.mean((test['sales'].values - forecast.values) ** 2))
mae = np.mean(np.abs(test['sales'].values - forecast.values))
print(f'\nMetriche SARIMA:')
print(f' MAPE: {mape:.2f}%')
print(f' RMSE: {rmse:.2f} unita')
print(f' MAE: {mae:.2f} unita')
# Visualizzazione
plt.figure(figsize=(12, 5))
plt.plot(train['sales'][-60:], label='Train (ultimi 60gg)')
plt.plot(test['sales'], label='Reale', color='green')
plt.plot(test.index, forecast, label='Previsione SARIMA', color='red', linestyle='--')
plt.title('SARIMA - Previsione Vendite Prodotto Fresco')
plt.legend()
plt.tight_layout()
plt.savefig('sarima_forecast.png', dpi=150)
plt.show()
Prophet: 외생변수를 이용한 해석 가능한 예측
예언자 (Meta/Facebook, 2017) 및 식품 예측을 위한 탁월한 선택 기본적으로 여러 계절(일별, 주별, 연간), 휴일을 관리하기 때문입니다. 사용자 정의 가능한 달력, 자동 변경 지점이 있는 비선형 추세 및 변수를 통한 효과 외인성(추가 회귀변수). 해석 가능성(추세로 자동 분해 + 계절성 + 휴일) 기술이 아닌 사용자에게도 높이 평가됩니다.
# Prophet per forecasting con variabili esogene (meteo, promozioni)
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd
import numpy as np
# Prophet richiede colonne 'ds' (datetime) e 'y' (target)
df_prophet = df.reset_index().rename(columns={'date': 'ds', 'sales': 'y'})
# Aggiunta variabili esogene (regressori)
# Esempio: temperatura media giornaliera e flag promozionale
df_prophet['temperature'] = meteo_df['temp_media'] # gradi Celsius
df_prophet['is_promotion'] = promo_df['active_promo'].astype(int)
# Calendario festivo italiano (personalizzato)
holidays_it = pd.DataFrame({
'holiday': [
'Natale', 'Capodanno', 'Pasqua', 'Ferragosto',
'Festa Repubblica', 'Ognissanti'
],
'ds': pd.to_datetime([
'2024-12-25', '2025-01-01', '2025-04-20', '2025-08-15',
'2025-06-02', '2025-11-01'
]),
'lower_window': [-3, -1, -3, -5, -1, -1], # Giorni prima
'upper_window': [3, 2, 2, 2, 1, 1] # Giorni dopo
})
# Configurazione modello
model = Prophet(
holidays=holidays_it,
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False,
seasonality_mode='multiplicative', # Meglio per dati con forte trend
changepoint_prior_scale=0.05, # Flessibilità del trend
seasonality_prior_scale=10.0 # Forza delle componenti stagionali
)
# Aggiunta regressori
model.add_regressor('temperature', standardize=True)
model.add_regressor('is_promotion', standardize=False)
# Training
train_prophet = df_prophet.iloc[:train_size]
model.fit(train_prophet)
# Previsione (con valori futuri dei regressori)
future = model.make_future_dataframe(periods=30)
future['temperature'] = future_meteo_df['temp_media']
future['is_promotion'] = future_promo_df['active_promo'].astype(int)
forecast_prophet = model.predict(future)
# Cross-validation interna (walk-forward)
df_cv = cross_validation(
model,
initial='365 days', # Periodo di training iniziale
period='30 days', # Frequenza di re-fitting
horizon='14 days' # Orizzonte di previsione
)
metrics_cv = performance_metrics(df_cv)
print(f'Prophet MAPE (CV): {metrics_cv["mape"].mean()*100:.2f}%')
# Visualizzazione decomposizione
fig = model.plot_components(forecast_prophet)
fig.savefig('prophet_components.png', dpi=150)
print('Componenti Prophet salvate: trend + stagionalita + holiday effects')
고전적 통계 모델을 사용해야 하는 경우
| 모델 | 강점 | 제한 사항 | 이상적인 사용 사례 |
|---|---|---|---|
| 아리마/사리마 | 해석 가능하고 빠르며 고정된 시리즈 | 하나의 계절성만 있고 외생변수는 없습니다. | 안정적인 제품, 적은 데이터 |
| 홀트-윈터스 | 트렌드 + 계절성, 단순성 관리 | 이상치, 고정 계절성을 처리하지 않습니다. | 선형 추세와 안정적인 계절성을 갖는 계열 |
| 예언자 | 다계절성, 휴일, 회귀자 | 매우 불규칙한 시리즈에는 적합하지 않음 | 축제/판촉 효과가 강한 제품 |
| LSTM/GRU | 복잡하고 다변수이며 장거리 패턴 | 풍부한 데이터가 필요하다, 블랙박스 | 대량 SKU, 많은 외생 변수 |
| TFT | 해석 가능 + DL, 다중 수평, 주의 | 계산량이 많고 GPU가 필요함 | 여러 SKU에 대한 중앙 집중식 예측 |
시계열을 위한 딥 러닝: LSTM, GRU, TFT 및 N-BEATS
딥 러닝 모델은 2018~2020년부터 수요 예측에 혁명을 일으켰습니다. 이들의 우월성은 무엇보다도 다음과 같은 상황에서 나타납니다. 상관관계가 있는 많은 외생 변수, 패턴 복잡한 비선형, 장거리 시간 종속성 및 대량의 다중 SKU 데이터 이를 통해 유사한 제품 간의 전이 학습을 활용할 수 있습니다.
LSTM 및 GRU: 시퀀스의 선택적 메모리
Le 장단기 기억(LSTM) 그리고 GRU(게이트 순환 단위) 이는 시간적 시퀀스에서 장거리 종속성을 포착하도록 설계된 순환 네트워크입니다. LSTM은 세 가지 게이트(입력, 망각, 출력)를 사용하여 어떤 정보를 유지할지 또는 삭제할지 결정합니다. 세포 기억에서. GRU는 두 개의 게이트(재설정, 업데이트)로 단순화하여 유사한 성능을 얻습니다. 더 적은 수의 매개변수를 사용합니다.
# LSTM multi-variate per demand forecasting alimentare
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_percentage_error
# ---- Preparazione dati ----
class FoodDemandDataset(torch.utils.data.Dataset):
def __init__(self, data, seq_len=14, pred_len=7):
self.seq_len = seq_len
self.pred_len = pred_len
self.data = torch.FloatTensor(data)
def __len__(self):
return len(self.data) - self.seq_len - self.pred_len + 1
def __getitem__(self, idx):
x = self.data[idx: idx + self.seq_len]
y = self.data[idx + self.seq_len: idx + self.seq_len + self.pred_len, 0]
return x, y # x: (seq_len, features), y: (pred_len,)
# ---- Architettura LSTM ----
class LSTMForecaster(nn.Module):
def __init__(self, input_size, hidden_size=128, num_layers=2,
pred_len=7, dropout=0.2):
super(LSTMForecaster, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.pred_len = pred_len
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size, pred_len)
def forward(self, x):
# x shape: (batch, seq_len, input_size)
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
out, _ = self.lstm(x, (h0, c0))
out = self.dropout(out[:, -1, :]) # Ultimo time step
out = self.fc(out) # (batch, pred_len)
return out
# ---- Feature preparation ----
def prepare_features(df):
"""Crea feature matrix multi-variate per LSTM"""
features = pd.DataFrame()
features['sales'] = df['sales']
# Lag features: vendite passate come input esplicito
for lag in [1, 2, 3, 7, 14]:
features[f'sales_lag_{lag}'] = df['sales'].shift(lag)
# Rolling statistics
features['sales_rolling_mean_7d'] = df['sales'].rolling(7).mean()
features['sales_rolling_std_7d'] = df['sales'].rolling(7).std()
features['sales_rolling_mean_14d'] = df['sales'].rolling(14).mean()
# Feature temporali cicliche (encoding sinusoidale)
features['day_sin'] = np.sin(2 * np.pi * df.index.dayofweek / 7)
features['day_cos'] = np.cos(2 * np.pi * df.index.dayofweek / 7)
features['month_sin'] = np.sin(2 * np.pi * df.index.month / 12)
features['month_cos'] = np.cos(2 * np.pi * df.index.month / 12)
# Variabili esogene (meteo, promozioni)
features['temperature'] = df['temperature']
features['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
features['is_holiday'] = df['is_holiday'].astype(int)
features['is_promotion'] = df['is_promotion'].astype(int)
features = features.dropna()
return features
# ---- Training loop ----
def train_lstm_model(train_data, val_data, config):
model = LSTMForecaster(
input_size=config['input_size'],
hidden_size=config['hidden_size'],
num_layers=config['num_layers'],
pred_len=config['pred_len']
)
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
criterion = nn.HuberLoss(delta=1.0) # Robusto agli outlier
train_loader = torch.utils.data.DataLoader(
FoodDemandDataset(train_data, config['seq_len'], config['pred_len']),
batch_size=config['batch_size'],
shuffle=True
)
best_val_loss = float('inf')
patience_counter = 0
for epoch in range(config['epochs']):
model.train()
train_loss = 0
for x_batch, y_batch in train_loader:
optimizer.zero_grad()
y_pred = model(x_batch)
loss = criterion(y_pred, y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
train_loss += loss.item()
# Validazione
model.eval()
with torch.no_grad():
val_dataset = FoodDemandDataset(val_data, config['seq_len'], config['pred_len'])
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32)
val_loss = sum(criterion(model(x), y).item() for x, y in val_loader)
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_lstm_model.pt')
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= config['early_stopping_patience']:
print(f'Early stopping all\'epoca {epoch}')
break
if epoch % 10 == 0:
print(f'Epoch {epoch}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}')
# Carica modello migliore
model.load_state_dict(torch.load('best_lstm_model.pt'))
return model
# Configurazione
config = {
'input_size': 15, # Numero di feature (sales + lag + temporal + exog)
'hidden_size': 128,
'num_layers': 2,
'pred_len': 7, # Previsione a 7 giorni
'seq_len': 14, # Lookback di 14 giorni
'batch_size': 32,
'lr': 0.001,
'epochs': 100,
'early_stopping_patience': 15
}
시간 융합 변환기: 해석 가능성 + 힘
Il 시간 융합 변압기(TFT) Google DeepMind(2021) 및 현재 기업 수요 예측을 위한 최첨단 기술로 간주됩니다. 그 아키텍처는 결합 두 정적 변수를 모두 처리하는 잔류 게이트(GRN)가 있는 다중 헤드 주의 메커니즘 (제품 유형, 제품 카테고리) 및 사전에 알려진 시간적 변수(캘린더, 계획된 프로모션) 및 관찰(과거 판매, 날씨).
M5 데이터 세트(10개 Walmart 매장의 30,490개 시계열), TFT 및 모델에 대한 2024년 벤치마크 Transformer 기반 MASE는 개선 사항을 입증했습니다. 26-29% 그리고 감소 WQL의 34% 계절적 순진법과 비교. 식품 소매업체 피크닉 더치 온라인은 TFT를 수요 예측, 출판의 주요 모델로 채택했습니다. 자세한 생산 결과.
# Temporal Fusion Transformer con PyTorch Forecasting
# pip install pytorch-forecasting pytorch-lightning
from pytorch_forecasting import TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import QuantileLoss
import pytorch_lightning as pl
import pandas as pd
import torch
# ---- Preparazione dati nel formato TFT ----
def prepare_tft_dataset(df):
"""
df deve avere: date, store_id, product_id, sales,
temperature, is_holiday, is_promotion, price, category
"""
df = df.copy()
df['time_idx'] = (df['date'] - df['date'].min()).dt.days
df['month'] = df['date'].dt.month.astype(str)
df['day_of_week'] = df['date'].dt.dayofweek.astype(str)
max_prediction_length = 7 # Previsione 7 giorni
max_encoder_length = 28 # Lookback 28 giorni
training_cutoff = df['time_idx'].max() - max_prediction_length
training_dataset = TimeSeriesDataSet(
df[df.time_idx <= training_cutoff],
time_idx='time_idx',
target='sales',
group_ids=['store_id', 'product_id'],
# Variabili statiche (non cambiano nel tempo per ogni gruppo)
static_categoricals=['store_id', 'product_id', 'category'],
# Variabili temporali note in anticipo
time_varying_known_categoricals=['month', 'day_of_week', 'is_holiday'],
time_varying_known_reals=['time_idx', 'is_promotion', 'price'],
# Variabili osservate (disponibili solo per il passato)
time_varying_unknown_reals=['sales', 'temperature'],
min_encoder_length=max_encoder_length // 2,
max_encoder_length=max_encoder_length,
min_prediction_length=1,
max_prediction_length=max_prediction_length,
target_normalizer=GroupNormalizer(
groups=['store_id', 'product_id'],
transformation='softplus' # Mantiene valori positivi
),
add_relative_time_idx=True,
add_target_scales=True,
add_encoder_length=True,
)
return training_dataset, training_cutoff
# ---- Modello TFT ----
def build_tft_model(training_dataset):
tft = TemporalFusionTransformer.from_dataset(
training_dataset,
learning_rate=0.03,
hidden_size=64, # Dimensione hidden layer
attention_head_size=4, # Numero attention heads
dropout=0.1,
hidden_continuous_size=16, # Feature continue
loss=QuantileLoss(), # Previsione probabilistica
log_interval=10,
reduce_on_plateau_patience=4,
)
print(f'Numero parametri: {tft.size()/1e3:.1f}k')
return tft
# ---- Training con PyTorch Lightning ----
def train_tft(training_dataset, validation_dataset):
train_loader = training_dataset.to_dataloader(
train=True, batch_size=128, num_workers=4
)
val_loader = validation_dataset.to_dataloader(
train=False, batch_size=128, num_workers=4
)
trainer = pl.Trainer(
max_epochs=30,
accelerator='gpu' if torch.cuda.is_available() else 'cpu',
gradient_clip_val=0.1,
callbacks=[
pl.callbacks.EarlyStopping(
monitor='val_loss', patience=5, mode='min'
),
pl.callbacks.ModelCheckpoint(
monitor='val_loss', save_top_k=1
)
]
)
tft_model = build_tft_model(training_dataset)
trainer.fit(tft_model, train_loader, val_loader)
# Interpretabilita: variable importance
best_model = TemporalFusionTransformer.load_from_checkpoint(
trainer.checkpoint_callback.best_model_path
)
raw_predictions, x = best_model.predict(
val_loader, mode='raw', return_x=True
)
# Feature importance - chi contribuisce di più alle previsioni
interpretation = best_model.interpret_output(
raw_predictions, reduction='sum'
)
print('Feature importance:')
for key, vals in interpretation.items():
print(f' {key}: {vals}')
return best_model
N-BEATS: 반복 아키텍처가 없는 신경 기반 확장
N-비트 (Element AI, 2020) 그리고 근본적으로 다른 접근 방식: 레이어만 사용 컨볼루션이나 어텐션 메커니즘 없이 스택과 블록으로 완전히 연결되어 구성됩니다. 각 블록은 신호를 기본 확장(추세, 계절성) 및 잔차로 분해합니다. M4에서 경쟁 데이터 세트는 모든 통계 방법을 11% 능가했으며 하이브리드 신경 통계 방법도 능가했습니다. 3% 차이로 승자. 계절성이 강한 식품에 대한 해석이 가능한 N-BEATS 버전 비즈니스 분석가가 높이 평가하는 추세/계절별 분해를 생성합니다.
식품 수요 예측을 위한 특성 엔지니어링
특성 엔지니어링의 품질은 모델 성능에 있어 가장 중요한 요소인 경우가 많습니다. 뛰어난 기능 엔지니어링을 갖춘 LSTM은 평범한 기능 엔지니어링을 갖춘 TFT보다 성능이 뛰어납니다. 보자 음식의 맥락에 맞게 구축될 기능의 주요 범주입니다.
시간 변수에 대한 순환 인코딩
일반적인 실수는 요일을 정수 값(0-6)으로 처리하는 것입니다. 문제는 모델은 6일(일요일)이 0일(월요일)과 "가까움"을 이해하지 못합니다. 인코딩 사인과 코사인을 사용한 순환은 이 문제를 해결합니다.
# Feature engineering completo per food demand forecasting
import pandas as pd
import numpy as np
from typing import List, Dict
def create_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
"""Crea feature temporali cicliche e categoriche"""
df = df.copy()
idx = df.index
# Encoding ciclico - preserva la ciclicita (es. lunedi vicino a domenica)
df['hour_sin'] = np.sin(2 * np.pi * idx.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * idx.hour / 24)
df['dow_sin'] = np.sin(2 * np.pi * idx.dayofweek / 7)
df['dow_cos'] = np.cos(2 * np.pi * idx.dayofweek / 7)
df['month_sin'] = np.sin(2 * np.pi * idx.month / 12)
df['month_cos'] = np.cos(2 * np.pi * idx.month / 12)
df['doy_sin'] = np.sin(2 * np.pi * idx.dayofyear / 365.25)
df['doy_cos'] = np.cos(2 * np.pi * idx.dayofyear / 365.25)
# Feature binarie utili per GDO italiana
df['is_weekend'] = (idx.dayofweek >= 5).astype(int)
df['is_monday'] = (idx.dayofweek == 0).astype(int)
df['is_friday'] = (idx.dayofweek == 4).astype(int)
# Posizione nel mese (utile per effetti stipendio)
df['day_of_month'] = idx.day
df['is_month_start'] = (idx.day <= 5).astype(int)
df['is_month_end'] = (idx.day >= 25).astype(int)
# Settimana dell'anno
df['week_of_year'] = idx.isocalendar().week.astype(int)
df['quarter'] = idx.quarter
return df
def create_lag_features(df: pd.DataFrame, target_col: str,
lags: List[int] = None) -> pd.DataFrame:
"""Crea lag features del target"""
if lags is None:
lags = [1, 2, 3, 7, 14, 21, 28]
df = df.copy()
for lag in lags:
df[f'{target_col}_lag_{lag}d'] = df[target_col].shift(lag)
return df
def create_rolling_features(df: pd.DataFrame, target_col: str,
windows: List[int] = None) -> pd.DataFrame:
"""Crea rolling statistics"""
if windows is None:
windows = [3, 7, 14, 28]
df = df.copy()
for window in windows:
df[f'{target_col}_roll_mean_{window}d'] = (
df[target_col].shift(1).rolling(window).mean()
)
df[f'{target_col}_roll_std_{window}d'] = (
df[target_col].shift(1).rolling(window).std()
)
df[f'{target_col}_roll_min_{window}d'] = (
df[target_col].shift(1).rolling(window).min()
)
df[f'{target_col}_roll_max_{window}d'] = (
df[target_col].shift(1).rolling(window).max()
)
# Exponential moving average
df[f'{target_col}_ema_{window}d'] = (
df[target_col].shift(1).ewm(span=window).mean()
)
return df
def create_weather_features(df: pd.DataFrame,
meteo_df: pd.DataFrame) -> pd.DataFrame:
"""
Integra variabili meteorologiche
meteo_df: DataFrame con colonne [date, temp_max, temp_min, precipitazione_mm,
umidita, radiazione_solare, vento_kmh]
"""
df = df.merge(meteo_df, left_index=True, right_on='date', how='left')
# Feature derivate dal meteo
df['temp_delta'] = df['temp_max'] - df['temp_min']
df['is_hot'] = (df['temp_max'] > 28).astype(int) # Ondata di caldo
df['is_cold'] = (df['temp_min'] < 5).astype(int) # Freddo invernale
df['is_rain'] = (df['precipitazione_mm'] > 1).astype(int)
# Interazioni meteo x prodotto (es. gelati vanno bene con caldo)
# Necessità di parametrizzazione per categoria prodotto
df['heat_wave_consecutive'] = (
df['is_hot'].rolling(3).sum() == 3
).astype(int)
return df
def create_promotion_features(df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""
Crea feature da calendario promozionale
promo_df: colonne [date, sku_id, discount_pct, is_volantino, is_digital_promo]
"""
df = df.merge(promo_df, left_index=True, right_on='date', how='left')
df['discount_pct'] = df['discount_pct'].fillna(0)
df['is_promotion'] = (df['discount_pct'] > 0).astype(int)
df['is_deep_discount'] = (df['discount_pct'] > 0.3).astype(int)
# Effetto anticipazione promozione (pre-promo dip)
df['promo_lag_1'] = df['is_promotion'].shift(1).fillna(0)
df['promo_lead_1'] = df['is_promotion'].shift(-1).fillna(0) # Solo in training
# Effetto post-promozione (inventory loading)
df['days_since_last_promo'] = (
df['is_promotion']
.pipe(lambda s: s.where(s.eq(1)).ffill().index - s.index)
.dt.days
.clip(upper=30)
)
return df
def create_price_features(df: pd.DataFrame) -> pd.DataFrame:
"""Feature relative al prezzo e alla sua variazione"""
df = df.copy()
df['price_lag_1'] = df['price'].shift(1)
df['price_change_pct'] = df['price'].pct_change()
df['price_vs_avg_30d'] = df['price'] / df['price'].rolling(30).mean() - 1
return df
def full_feature_engineering_pipeline(df: pd.DataFrame,
meteo_df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""Pipeline completa di feature engineering"""
df = create_temporal_features(df)
df = create_lag_features(df, 'sales')
df = create_rolling_features(df, 'sales')
df = create_weather_features(df, meteo_df)
df = create_promotion_features(df, promo_df)
df = create_price_features(df)
df = df.dropna()
print(f'Feature totali create: {df.shape[1]}')
print(f'Sample size dopo feature engineering: {df.shape[0]}')
return df
완전한 ML 파이프라인: 원시 데이터에서 프로덕션 모델까지
전문적인 수요 예측 파이프라인은 모델 교육에만 국한되지 않습니다. 필요하다 데이터 수집, 전처리, 순방향 검증을 관리하는 엔드투엔드 아키텍처, 모델 선택, 배포 및 지속적인 모니터링. 다음은 프로덕션 준비 파이프라인의 구조입니다.
# Pipeline ML completa per food demand forecasting
import mlflow
import mlflow.pytorch
from dataclasses import dataclass, field
from typing import Optional, Tuple
import pandas as pd
import numpy as np
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Configurazione Pipeline ----
@dataclass
class ForecastConfig:
# Data
store_ids: list = field(default_factory=list)
product_categories: list = field(default_factory=list)
training_lookback_days: int = 365
forecast_horizon_days: int = 7
# Feature engineering
lag_days: list = field(default_factory=lambda: [1, 2, 3, 7, 14, 28])
rolling_windows: list = field(default_factory=lambda: [7, 14, 28])
# Model selection
model_type: str = 'tft' # 'sarima', 'prophet', 'lstm', 'tft', 'xgboost'
use_ensemble: bool = False
# Validation
n_cv_splits: int = 4
val_horizon_days: int = 14
# MLflow
experiment_name: str = 'food_demand_forecasting'
run_name: Optional[str] = None
# Deployment
min_mape_threshold: float = 0.20 # Non deployare se MAPE > 20%
model_registry_name: str = 'food_demand_model'
# ---- Step 1: Data Collection e Validation ----
class DataCollector:
def __init__(self, config: ForecastConfig):
self.config = config
def collect(self, start_date: str, end_date: str) -> pd.DataFrame:
"""Raccoglie dati da data warehouse (es. Snowflake, BigQuery)"""
query = f"""
SELECT
date,
store_id,
product_id,
category,
sales_units,
price,
stock_level,
is_holiday,
is_promotion,
discount_pct
FROM gold.sales_daily
WHERE date BETWEEN '{start_date}' AND '{end_date}'
AND store_id IN ({','.join(map(str, self.config.store_ids))})
ORDER BY date, store_id, product_id
"""
# Qui si userebbe il connector del DWH (es. snowflake-connector-python)
# df = snowflake_connector.execute(query)
logger.info(f'Dati raccolti: {start_date} - {end_date}')
return df # Placeholder
def validate(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
"""Validazione qualità dati"""
issues = {}
# Check valori negativi
neg_sales = (df['sales_units'] < 0).sum()
if neg_sales > 0:
issues['negative_sales'] = neg_sales
df.loc[df['sales_units'] < 0, 'sales_units'] = 0
# Check missing dates per gruppo
for group_id, group_df in df.groupby(['store_id', 'product_id']):
date_range = pd.date_range(group_df['date'].min(), group_df['date'].max())
missing = len(date_range) - len(group_df)
if missing > 0:
issues[f'missing_dates_{group_id}'] = missing
# Check outliers (IQR method)
Q1 = df['sales_units'].quantile(0.25)
Q3 = df['sales_units'].quantile(0.75)
IQR = Q3 - Q1
outliers = ((df['sales_units'] < Q1 - 3*IQR) | (df['sales_units'] > Q3 + 3*IQR)).sum()
issues['outliers'] = int(outliers)
logger.info(f'Validazione dati: {issues}')
return df, issues
# ---- Step 2: Walk-Forward Validation ----
class WalkForwardValidator:
"""
Validazione temporale corretta per time series.
NON usare cross-validation classico che crea data leakage.
"""
def __init__(self, config: ForecastConfig):
self.config = config
def validate(self, df: pd.DataFrame, model_class) -> dict:
results = []
total_days = len(df['date'].unique())
split_size = total_days // (self.config.n_cv_splits + 1)
for fold in range(self.config.n_cv_splits):
train_end_idx = split_size * (fold + 1)
val_start_idx = train_end_idx
val_end_idx = val_start_idx + self.config.val_horizon_days
train_dates = sorted(df['date'].unique())[:train_end_idx]
val_dates = sorted(df['date'].unique())[val_start_idx:val_end_idx]
train_df = df[df['date'].isin(train_dates)]
val_df = df[df['date'].isin(val_dates)]
if len(val_df) == 0:
continue
# Training sul fold
model = model_class(self.config)
model.fit(train_df)
predictions = model.predict(val_df)
# Metriche
actuals = val_df['sales_units'].values
preds = predictions['forecast'].values
mape = np.mean(np.abs((actuals - preds) / (actuals + 1e-8))) * 100
rmse = np.sqrt(np.mean((actuals - preds) ** 2))
mae = np.mean(np.abs(actuals - preds))
results.append({
'fold': fold,
'mape': mape,
'rmse': rmse,
'mae': mae,
'train_size': len(train_df),
'val_size': len(val_df)
})
logger.info(f'Fold {fold}: MAPE={mape:.2f}%, RMSE={rmse:.2f}')
# Aggrega risultati
results_df = pd.DataFrame(results)
return {
'mean_mape': results_df['mape'].mean(),
'std_mape': results_df['mape'].std(),
'mean_rmse': results_df['rmse'].mean(),
'mean_mae': results_df['mae'].mean(),
'folds': results
}
# ---- Step 3: MLflow Tracking e Model Registry ----
class ExperimentTracker:
def __init__(self, config: ForecastConfig):
self.config = config
mlflow.set_experiment(config.experiment_name)
def run_experiment(self, df: pd.DataFrame, model_class) -> str:
run_name = self.config.run_name or f'{self.config.model_type}_{datetime.now():%Y%m%d_%H%M}'
with mlflow.start_run(run_name=run_name) as run:
# Log parametri
mlflow.log_params({
'model_type': self.config.model_type,
'forecast_horizon': self.config.forecast_horizon_days,
'training_lookback': self.config.training_lookback_days,
'n_cv_splits': self.config.n_cv_splits
})
# Walk-forward validation
validator = WalkForwardValidator(self.config)
metrics = validator.validate(df, model_class)
# Log metriche
mlflow.log_metrics({
'cv_mean_mape': metrics['mean_mape'],
'cv_std_mape': metrics['std_mape'],
'cv_mean_rmse': metrics['mean_rmse'],
'cv_mean_mae': metrics['mean_mae']
})
# Training finale su tutti i dati
final_model = model_class(self.config)
final_model.fit(df)
# Log artefatti
mlflow.log_dict(metrics, 'validation_metrics.json')
# Registra modello se MAPE accettabile
if metrics['mean_mape'] < self.config.min_mape_threshold * 100:
mlflow.pytorch.log_model(
final_model,
artifact_path='model',
registered_model_name=self.config.model_registry_name
)
logger.info(f'Modello registrato: MAPE={metrics["mean_mape"]:.2f}%')
else:
logger.warning(
f'Modello NON registrato: MAPE {metrics["mean_mape"]:.2f}% '
f'> soglia {self.config.min_mape_threshold * 100}%'
)
return run.info.run_id
벤치마크: 실제 데이터 세트의 모델 간 비교
다음은 일반적인 데이터 세트에 대한 다양한 접근 방식의 성능을 체계적으로 비교한 것입니다. GDO: 신선 제품(유제품, 과일, 야채, 육류) 150 SKU, 2년간의 일일 데이터 외생 변수(날씨, 프로모션, 휴일)가 있는 5개 판매 지점에서.
비교 벤치마크 모델 - 이탈리아 GDO 데이터 세트(150 SKU, 2년)
| 모델 | 마페(%) | RMSE(단위) | MAE (유나이티드) | 훈련시간 | 해석 가능성 | 메모 |
|---|---|---|---|---|---|---|
| 이동 평균(기준선) | 31.4 | 18.7 | 12.3 | < 1분 | 높은 | 계절성 없음, 회귀자 없음 |
| 사리마 | 22.8 | 14.2 | 9.8 | ~15분 | 높은 | 단 하나의 계절적 지연 |
| 홀트-윈터스 | 20.1 | 13.1 | 8.9 | ~5분 | 높은 | 정규 시리즈에 적합 |
| 예언자 | 14.6 | 10.4 | 7.2 | ~30분 | 높은 | 휴일/회귀자에 탁월함 |
| XGBoost + 기능 영어 | 12.9 | 9.8 | 6.7 | ~10분 | 평균 | 중요한 기능 엔지니어링 |
| LSTM(단변량) | 16.4 | 11.2 | 7.8 | ~45분 GPU | 낮은 | 엑소그 없는 선지자보다 더 나쁘다 |
| LSTM(다변수) | 10.8 | 8.4 | 5.9 | ~2시간 GPU | 낮은 | exog를 통한 강력한 개선 |
| N-비트 | 11.2 | 8.7 | 6.1 | ~1시간 GPU | 평균 | 해석 가능한 분해 |
| TFT(시간 융합 변압기) | 8.3 | 6.9 | 4.8 | ~4시간 GPU | 높은 | 전반적으로 최고, 해석 가능 |
| 앙상블 TFT + XGBoost | 7.6 | 6.4 | 4.4 | ~5시간 GPU | 평균 | 단일 TFT에서 +8% |
데이터 세트: 신선 제품 SKU 150개, 매장 5개, 2년 일일 데이터. 4개 접기에 대한 Walk-forward 검증(14일 지평선). GPU: 엔비디아 A100.
모델 선택에 대한 고려 사항
TFT는 가장 강력한 옵션이지만 각 SKU마다 GPU와 풍부한 데이터가 필요합니다. 소매업체의 경우 수천 개의 SKU가 있지만 단일 제품당 수량은 적으므로 하이브리드 접근 방식이 가장 효과적입니다. 롱테일 SKU(낮은 볼륨, 열악한 데이터)용 XGBoost 또는 Prophet, SKU용 TFT 가장 많은 수익과 폐기물을 발생시키는 대규모 산업.
폐기물 감소: 재고 및 동적 가격과 예측 통합
정확한 예측 모델만으로는 낭비를 줄일 수 없습니다. 필요합니다. 행위 예측에 재고 관리 및 동적 가격 책정 시스템을 통해. 루프 의사결정 및: 미래 수요 예측 → 공급업체에 대한 최적 주문 → 모니터링 재고 수준 → 유통 기한이 임박한 제품에 대한 동적 인하 → 자동 기부 판매되지 않은 잉여금에 대해.
동적 안전 재고를 통한 재고 최적화
# Inventory optimization basato su forecast probabilistico
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class InventoryParams:
product_id: str
shelf_life_days: int # Vita utile del prodotto
lead_time_days: int # Giorni da ordine a consegna
service_level: float = 0.95 # Fill rate target (95%)
holding_cost_per_unit: float = 0.05 # Costo mantenimento/giorno
shortage_cost_per_unit: float = 2.50 # Costo stockout per unita
waste_cost_per_unit: float = 1.20 # Costo di buttare una unita
def calculate_optimal_order(
demand_forecast: np.ndarray, # Previsione media per i prossimi N giorni
demand_std: np.ndarray, # Deviazione standard previsione
current_stock: int,
params: InventoryParams
) -> dict:
"""
Calcola la quantità ottimale da ordinare considerando:
- Shelf life del prodotto (prodotti freschi deperibili)
- Service level target
- Trade-off costo stockout vs costo spreco
"""
from scipy import stats
# Z-score per il service level target
z_score = stats.norm.ppf(params.service_level)
# Domanda attesa nel lead time
lt_demand = demand_forecast[:params.lead_time_days].sum()
lt_std = np.sqrt((demand_std[:params.lead_time_days] ** 2).sum())
# Safety stock = Z * sigma * sqrt(lead_time)
safety_stock = z_score * lt_std
# Reorder point
reorder_point = lt_demand + safety_stock
# Domanda fino a shelf life (non ordinare più di quanto venderemo)
max_sellable_demand = demand_forecast[:params.shelf_life_days].sum()
max_sellable_std = np.sqrt((demand_std[:params.shelf_life_days] ** 2).sum())
# Order quantity
unconstrained_order = max(0, reorder_point - current_stock)
# Vincolo shelf life: non ordinare più di quanto venderemo nella vita utile
# Aggiusta per il rischio di spreco
waste_probability = 1 - stats.norm.cdf(
current_stock + unconstrained_order,
loc=max_sellable_demand,
scale=max_sellable_std
)
# Costo atteso
expected_waste_cost = waste_probability * unconstrained_order * params.waste_cost_per_unit
expected_shortage_cost = (1 - params.service_level) * lt_demand * params.shortage_cost_per_unit
# Ottimizzazione: riduce ordine se costo spreco > beneficio margine
if expected_waste_cost > expected_shortage_cost * 0.5:
adjustment_factor = 1 - (expected_waste_cost / (expected_waste_cost + expected_shortage_cost))
optimal_order = int(unconstrained_order * adjustment_factor)
else:
optimal_order = int(unconstrained_order)
return {
'optimal_order_qty': max(0, optimal_order),
'reorder_point': reorder_point,
'safety_stock': safety_stock,
'expected_demand_7d': demand_forecast[:7].sum(),
'waste_risk_pct': waste_probability * 100,
'shortage_risk_pct': (1 - params.service_level) * 100,
'decision_reason': 'waste_adjusted' if expected_waste_cost > expected_shortage_cost * 0.5
else 'standard_reorder'
}
# Esempio utilizzo
params = InventoryParams(
product_id='insalata_vaschetta_150g',
shelf_life_days=5,
lead_time_days=1,
service_level=0.95,
waste_cost_per_unit=0.85,
shortage_cost_per_unit=3.20
)
# Simulazione con forecast TFT (media e std da quantile forecast)
forecast_mean = np.array([45, 52, 68, 71, 89, 95, 48]) # Lunedi-Domenica
forecast_std = np.array([8, 9, 12, 13, 15, 16, 9]) # Deviazione standard
order = calculate_optimal_order(
demand_forecast=forecast_mean,
demand_std=forecast_std,
current_stock=120,
params=params
)
print('Raccomandazione ordine:')
for key, val in order.items():
print(f' {key}: {val}')
폐기물 감소를 위한 동적 가격 책정: 마크다운 최적화
만료 날짜를 기준으로 한 동적 가격 책정은 비용을 절감하는 가장 효과적인 도구 중 하나입니다. 신선한 제품의 낭비. 목표는 다음을 찾는 것입니다. 최적의 할인 가격 이는 만료 위험이 있는 제품에서 회수되는 수익을 극대화하여 판매를 가속화합니다. 정가 제품에 대한 판매를 잠식합니다.
낭비 없는이스라엘 스타트업인 는 전자 선반 기반 시스템을 개발했습니다. 만료 날짜, 수준에 따라 가격을 자동으로 업데이트하는 라벨(ESL) 재고와 고객 행동. 이를 채택한 소매업체는 폐기물을 줄였습니다. 40%, 상황에 따라 이러한 제품에 대한 수익을 늘리는 동시에 이전 (팔리지 않은 제품은 폐기). 또한 가기엔 너무 좋아 그는 2024년에는 SKU 수준에서 데이터를 처리하여 할인 수준을 최적화하는 AI 플랫폼, 수동 마감일 확인을 93-99% 줄입니다.
# Algoritmo markdown optimization per prodotti prossimi a scadenza
import numpy as np
from scipy.optimize import minimize_scalar
from dataclasses import dataclass
from typing import Tuple
@dataclass
class MarkdownConfig:
min_margin_pct: float = 0.10 # Margine minimo (non scendere sotto il 10%)
max_discount_pct: float = 0.70 # Sconto massimo (70%)
base_price: float = 1.99 # Prezzo normale
cost_per_unit: float = 0.95 # Costo di acquisto
waste_value: float = 0.00 # Valore residuo se buttato (es. recupero)
def price_elasticity_model(price: float, base_price: float,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Modello di elasticita del prezzo per prodotti freschi GDO.
Elasticita tipica prodotti freschi: -1.5 a -2.5
(sconto del 10% aumenta la domanda del 15-25%)
"""
price_ratio = price / base_price
return base_demand * (price_ratio ** elasticity)
def markdown_revenue(discount_pct: float,
config: MarkdownConfig,
current_stock: int,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Calcola il ricavo atteso da un determinato livello di sconto.
Massimizzare questa funzione = trovare lo sconto ottimale.
"""
discounted_price = config.base_price * (1 - discount_pct)
# Vincolo margine minimo
min_price = config.cost_per_unit * (1 + config.min_margin_pct)
if discounted_price < min_price:
return -1e10 # Penalita per prezzi sotto il costo
# Domanda attesa con il nuovo prezzo
expected_demand = price_elasticity_model(
discounted_price, config.base_price, base_demand, elasticity
)
# Unita vendute = min(domanda attesa, stock disponibile)
units_sold = min(expected_demand, current_stock)
units_wasted = max(0, current_stock - units_sold)
# Revenue = vendite + (eventuali recupero valore sprecato)
revenue = (units_sold * discounted_price) + (units_wasted * config.waste_value)
# Costo opportunità: confronto con vendita a prezzo pieno (se avessimo aspettato)
# ma qui massimizziamo il recupero dato che il prodotto scadera
return revenue
def find_optimal_markdown(config: MarkdownConfig,
current_stock: int,
days_to_expiry: int,
base_daily_demand: float,
demand_elasticity: float = -1.8) -> dict:
"""
Trova lo sconto ottimale considerando i giorni rimanenti a scadenza.
Logica: più vicini alla scadenza, maggiore lo sconto necessario.
"""
# Urgenza basata sui giorni a scadenza
if days_to_expiry >= 5:
# Nessun markdown necessario
return {'discount_pct': 0.0, 'recommended_action': 'no_action',
'expected_revenue': config.base_price * min(base_daily_demand * days_to_expiry, current_stock)}
# Moltiplicatore urgenza: più vicini alla scadenza, maggiore il markup di urgenza
urgency_factor = 1.0 + (5 - days_to_expiry) * 0.15 # +15% urgenza per giorno
# Domanda effettiva considerando urgenza (es. promozioni di fine giornata)
effective_demand = base_daily_demand * days_to_expiry * urgency_factor
# Ottimizzazione: trova lo sconto che massimizza il ricavo totale
result = minimize_scalar(
lambda d: -markdown_revenue(d, config, current_stock,
effective_demand, demand_elasticity),
bounds=(0.0, config.max_discount_pct),
method='bounded'
)
optimal_discount = result.x
optimal_revenue = -result.fun
waste_at_no_discount = max(0, current_stock - effective_demand)
waste_with_markdown = max(0, current_stock - price_elasticity_model(
config.base_price * (1 - optimal_discount),
config.base_price, effective_demand, demand_elasticity
))
waste_reduction_units = waste_at_no_discount - waste_with_markdown
revenue_recovered = optimal_revenue - (config.waste_value * current_stock)
action = (
'urgent_markdown' if days_to_expiry <= 1
else 'markdown' if optimal_discount > 0.15
else 'slight_discount'
)
return {
'discount_pct': round(optimal_discount * 100, 1),
'discounted_price': round(config.base_price * (1 - optimal_discount), 2),
'expected_revenue': round(optimal_revenue, 2),
'waste_reduction_units': round(waste_reduction_units, 0),
'revenue_recovered_vs_waste': round(revenue_recovered, 2),
'days_to_expiry': days_to_expiry,
'recommended_action': action
}
# Esempio: insalata con 2 giorni alla scadenza
config = MarkdownConfig(
base_price=1.99,
cost_per_unit=0.80,
min_margin_pct=0.05, # Riduco margine minimo vicino a scadenza
max_discount_pct=0.60
)
recommendation = find_optimal_markdown(
config=config,
current_stock=45,
days_to_expiry=2,
base_daily_demand=12,
demand_elasticity=-2.0
)
print('Raccomandazione markdown:')
for k, v in recommendation.items():
print(f' {k}: {v}')
# Output atteso:
# discount_pct: 35.0
# discounted_price: 1.29
# expected_revenue: 38.70
# waste_reduction_units: 28
# recommended_action: markdown
사례 연구: 매장이 200개 이상인 이탈리아의 대규모 소매업
ML 수요예측 구현의 실제(익명화된) 사례를 하나로 분석해 보겠습니다. 220개의 판매 지점을 보유하고 있으며 그 중 18,000개의 활성 SKU를 보유한 이탈리아의 대규모 소매 체인입니다. 3,200여종의 신선제품, 연간 매출 약 28억 유로.
초기 상황
ML을 구현하기 전에 소매업체는 예측 기능이 포함된 기존 ERP 시스템을 사용하고 있었습니다. 최근 4개월간 가중이동평균을 기준으로 합니다. 신선한 농산물의 평균 MAPE는 시즌 상품은 45%로 최고치를 기록하며 28%를 기록했습니다. 신선제품의 낭비율은 구매 가치의 12%, 폐기 비용, 가치 손실 및 평판에 대한 영향 포함 중요합니다. 구매자는 주문에 대해 매주 수동 검토를 수행하여 점유합니다. 정규직 FTE는 약 3명입니다.
솔루션 아키텍처
기술 스택 구현
| 레이어 | 초이스 테크놀로지 | 기능 |
|---|---|---|
| 데이터 웨어하우스 | 설화 | 중앙 집중식 스토리지, 전체 220개 매장의 POS 데이터 |
| ETL/ELT | DBT + 에어바이트 | POS 통합, 날씨(OpenWeather API), 프로모션(CRM 시스템) |
| 관현악법 | 아파치 에어플로우 | 매일 재교육 오전 3시~5시, 이상 경고 |
| 모델 | TFT(상위 500 SKU) + XGBoost(롱테일) | 각 SKU x 매장에 대한 7일 및 14일 예측 |
| MLOps | Databricks의 MLflow | 추적 실험, 모델 등록, A/B 테스트 |
| 피복재 | FastAPI + Redis 캐시 | 지연 시간이 100ms 미만, 24시간 캐시인 API 예측 |
| 마크다운 엔진 | Python 마이크로서비스 | 위험에 처한 제품에 대해 4시간마다 최적의 할인 계산 |
| ESL 통합 | Hanshow / 가격 API | 전자 선반 라벨의 가격 자동 업데이트 |
| 기부 | API 푸드뱅크 | 유통기한이 24시간 미만인 제품에 대한 자동 알림은 판매되지 않습니다. |
타임라인 및 구현 단계
- 1~2개월: 데이터 감사, 3년간의 과거 데이터 정리, 외부 소스 통합 (날씨, 프로모션). 50개의 대용량, 부패 가능성이 높은 파일럿 SKU 식별.
- 3~4개월: 파일럿 모델 훈련(XGBoost + Prophet), Walk-forward 검증, 기준선 MAPE 27.8%. 3개 테스트 매장에서 ERP 주문 시스템과 최초로 통합되었습니다.
- 5~6개월: 상위 500개 신규 SKU에 대한 TFT 교육. A/B 테스트: ML을 사용하는 매장 30개 vs 전통적인 시스템을 갖춘 30개 매장. 결과: ML 매장에서 폐기물이 28% 감소했습니다.
- 7~9개월: 전체 220개 매장에 출시됩니다. 마크다운 엔진 구현 80개 매장에 ESL 통합이 가능합니다. 푸드뱅크를 통한 자동 기부 활성화.
- 10-12개월: 템플릿 최적화, 소셜 미디어 기능 추가(멘션 새로운 트렌드에 대한 Instagram/TikTok의 제품). TFT+XGBoost 앙상블 구현.
12개월에 측정된 결과
비즈니스 KPI - 구현 후 결과(12개월)
| 미터법 | ML 이전 | 12개월 후 ML | 변화 |
|---|---|---|---|
| MAPE 신선한 제품 | 28.2% | 9.1% | -67.7% |
| 구매 가치에 대한 낭비율 | 12.0% | 7.8% | -35.0% |
| 품절률(OOS) | 4.8% | 2.9% | -39.6% |
| 기부제품(톤/년) | 45톤 | 112톤 | +149% (가다 혜택법) |
| 가격 인하 효율성(% 수익 회수) | 32% | 71% | +39pp |
| 수동 주문 검토 전담 FTE | 3.0 정규직 | 0.8 FTE | -73.3% |
| 폐기물 비용 절감 + 폐기 | - | €420만/년 | 새로운 저축 |
| 기부금에 대한 세금 혜택 (Gadda Law) | €85K/년 | €210K/년 | +147% |
| 프로젝트 ROI(투자: 180만 유로) | - | 12개월에 234% | 투자 회수 기간 5.2개월 |
사례 연구에서 얻은 교훈
- 과거 데이터의 품질은 실제 병목 현상입니다. 40%의 시간 프로젝트 중 데이터 정리 및 통합에 전념했습니다. 과거 POS 데이터는 휴가 기간과 매장 리노베이션에 상당한 격차가 발생합니다.
- 모든 SKU가 동일한 모델을 가질 자격이 있는 것은 아닙니다. 상위 500개 SKU를 위한 TFT, 중간 SKU(500-3000)의 예언자, 롱테일(>3000 SKU)의 이동 평균. 최적의 비용/이익을 제공하는 3계층 접근 방식입니다.
- 변경 관리가 중요합니다. 20년 이상의 경험을 가진 구매자 그들은 AI에 저항했습니다. 해결 방법: Prophet 분해(추세 + 계절성) 표시 + 휴일) 구매자가 모델의 논리를 이해하고 검증할 수 있습니다.
- 마크다운 엔진에는 ESL 통합이 필요합니다. 전자 선반 라벨 없이, 가격을 수동으로 업데이트하면 반응 속도의 이점이 상쇄됩니다. 자동 시스템.
- 지속적인 모니터링이 필수적입니다. 시간이 지남에 따라 모델 성능이 저하됨 (개념 드리프트) 포스트 코로나19 구매 행동 변화로 인한 것입니다. 재교육 자동 주간 안정적인 성능을 유지했습니다.
FoodTech 수요 예측을 위한 비즈니스 지표
수요예측 시스템의 성공은 MAPE만으로 측정되지 않습니다. 그것은 필요하다 모델의 기술적 품질과 영향을 모두 다루는 측정 프레임워크를 정의합니다. 진짜 사업.
지표 프레임워크: 기술 + 비즈니스
| 범주 | 미터법 | 공식 / 정의 | 일반적인 대규모 유통 대상 |
|---|---|---|---|
| 정확도 모델 | 마페 | 평균 절대 백분율 오류 | < 15% 신선한 제품 |
| RMSE | 제곱 평균 오차(단위) | SKU 수량에 따라 다릅니다. | |
| 편견 | (예측 - 실제) / 실제, 평균 | |편향| < 5%(체계적인 오버/언더 없음) | |
| WMAPE | 판매량에 따라 가중치가 부여된 MAPE | < 10%(롱테일의 경우 MAPE보다 우수함) | |
| 쓰레기 | 폐기물 감소율(WRR) | (전 폐기물 - 후 폐기물) / 폐기물 전 | > 12개월 이내 30% |
| 구매시 폐기물 % | 낭비된 가치 / 구매한 가치 | < 8% (부문 벤치마크) | |
| 전환된 톤(기부금) | 기부된 톤 / 총 잠재적 폐기물 | > 초과분의 50% | |
| 유효성 | 품절률(OOS) | 품절된 SKU % / 총 SKU | < 3% |
| 채우기 비율 | 배송된 수량 / 주문된 수량 | > 97% | |
| 커버 일수(DOC) | 현재재고 / 일평균수요 | 제품 카테고리에 최적 | |
| 동적 가격 | 마크다운 효율성(ME) | 마크다운 수익/비용 잠재적 낭비 | > 65% |
| 마크다운을 통해 판매되는 제품 | 유통기한이 임박한 제품 %를 할인된 가격으로 판매 | > 위험에 처한 제품의 80% | |
| 평균 할인이 적용됨 | 가격 인하 제품 평균 할인 | 25-45% (탄력성에 최적) |
모니터링 대시보드: 개념 드리프트 감지
식품 예측 모델은 특히 다음 사항에 취약합니다. 컨셉 드리프트: 영양 동향(예: 비건 제품 증가)으로 인해 계절별로 구매 패턴이 바뀌고, 가격 위기(2022~2024년 인플레이션으로 인해 구매 행동이 크게 변화됨) 신선한 제품), 특별한 이벤트(유행병)를 위한 것입니다. 지속적으로 모니터링이 필요하다 성능 저하.
# Monitoring sistema di rilevamento concept drift per food forecasting
import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftAlert:
severity: str # 'WARNING' | 'CRITICAL'
metric: str
current_value: float
baseline_value: float
change_pct: float
affected_skus: List[str]
recommendation: str
class ForecastMonitor:
def __init__(self, baseline_window_days: int = 30,
monitoring_window_days: int = 7):
self.baseline_window = baseline_window_days
self.monitoring_window = monitoring_window_days
self.thresholds = {
'mape_warning': 0.20, # MAPE > 20% = warning
'mape_critical': 0.35, # MAPE > 35% = critical, richiede retraining
'bias_warning': 0.10, # Bias sistematico > 10%
'bias_critical': 0.20, # Bias sistematico > 20%
'waste_rate_warning': 0.10, # Tasso spreco > 10%
}
def compute_current_metrics(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame) -> dict:
"""Calcola metriche sull'ultima finestra di monitoring"""
merged = predictions_df.merge(actuals_df, on=['date', 'sku_id', 'store_id'])
metrics_by_sku = merged.groupby('sku_id').apply(
lambda g: pd.Series({
'mape': np.mean(np.abs((g['actual'] - g['forecast']) / (g['actual'] + 1e-8))),
'bias': np.mean((g['forecast'] - g['actual']) / (g['actual'] + 1e-8)),
'rmse': np.sqrt(np.mean((g['actual'] - g['forecast']) ** 2)),
'n_observations': len(g)
})
)
return {
'mean_mape': metrics_by_sku['mape'].mean(),
'p90_mape': metrics_by_sku['mape'].quantile(0.9),
'mean_bias': metrics_by_sku['bias'].mean(),
'degraded_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_warning']
].index.tolist(),
'critical_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_critical']
].index.tolist(),
'per_sku_metrics': metrics_by_sku
}
def detect_drift(self, baseline_metrics: dict,
current_metrics: dict) -> List[DriftAlert]:
"""Confronta metriche correnti vs baseline e genera alert"""
alerts = []
# Alert MAPE
mape_change = ((current_metrics['mean_mape'] - baseline_metrics['mean_mape'])
/ baseline_metrics['mean_mape'])
if current_metrics['mean_mape'] > self.thresholds['mape_critical']:
alerts.append(DriftAlert(
severity='CRITICAL',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['critical_skus'],
recommendation='RETRAINING IMMEDIATO richiesto. Analizza concept drift.'
))
elif current_metrics['mean_mape'] > self.thresholds['mape_warning']:
alerts.append(DriftAlert(
severity='WARNING',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['degraded_skus'],
recommendation='Monitorare trend. Pianificare retraining prossima settimana.'
))
# Alert Bias sistematico
if abs(current_metrics['mean_bias']) > self.thresholds['bias_critical']:
direction = 'sovrastima' if current_metrics['mean_bias'] > 0 else 'sottostima'
alerts.append(DriftAlert(
severity='CRITICAL',
metric='Bias',
current_value=current_metrics['mean_bias'] * 100,
baseline_value=baseline_metrics.get('mean_bias', 0) * 100,
change_pct=0,
affected_skus=[],
recommendation=f'Bias sistematico di {direction}: aggiorna calibrazione modello.'
))
return alerts
def check_and_alert(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame,
baseline_metrics: dict) -> None:
"""Entry point principale del monitor"""
current = self.compute_current_metrics(predictions_df, actuals_df)
alerts = self.detect_drift(baseline_metrics, current)
for alert in alerts:
if alert.severity == 'CRITICAL':
logger.critical(
f'[DRIFT CRITICAL] {alert.metric}: {alert.current_value:.1f}% '
f'(baseline: {alert.baseline_value:.1f}%) - {alert.recommendation}'
)
# Qui si trigghera notifica Slack/PagerDuty
self._send_alert(alert)
else:
logger.warning(
f'[DRIFT WARNING] {alert.metric}: {alert.current_value:.1f}% - '
f'{len(alert.affected_skus)} SKU degradati'
)
def _send_alert(self, alert: DriftAlert) -> None:
"""Invia notifica al team ML Ops (implementazione dipende dal sistema)"""
# Integrazione con: Slack, PagerDuty, email, Prometheus Alertmanager
pass
식품 수요 예측의 모범 사례 및 안티 패턴
피해야 할 안티패턴
- 일시적인 누출 날짜: 대신 표준 교차 검증을 사용하십시오. Walk-forward 검증은 미래 데이터를 훈련에 도입합니다. 그리고 가장 흔한 방법론적 오류 그리고 생산에서 거짓으로 판명되는 30-50%의 낙관적 MAPE로 이어집니다.
- 모든 SKU에 대한 단일 템플릿: 간헐적인 수요가 있는 제품(판매 주 2~3회) 대용량 제품과는 다른 모델이 필요합니다. 적용 연간 판매량이 200개인 제품에 대한 LSTM은 과적합과 60% 이상의 MAPE로 이어집니다.
- 비즈니스 편견 무시: 소매업체는 역사적으로 과잉 주문하는 경향이 있었습니다. 품절을 피하기 위해(인식된 "두 가지 악" 중 최악) 모델이 훈련된 경우 실제 수요가 아닌 과거 주문 데이터에 대해서는 이러한 편향을 물려받습니다. 판매 데이터 사용 명령이 아니라 효과적입니다.
- 향후 프로모션을 모델링하지 마세요. 프로모션을 모르는 예측 다음 주에 계획된 것은 해당 기간에 체계적으로 잘못된 예측을 제공합니다. 프로모션. 항상 프로모션 캘린더를 입력으로 통합하세요.
- 오류 방향을 무시하고 MAPE만 최적화합니다. 맥락에서 식품의 경우 50단위(50단위 낭비)의 과잉 예측 오류가 훨씬 더 비쌉니다. 50개 단위의 과소 예측 오류(50개 판매 손실). 비대칭 측정항목 사용 (Weighted MAPE) 또는 교육의 비대칭 손실 기능.
통합된 모범 사례
- 처리별로 SKU를 계층화합니다. 각 SKU를 볼륨, 가변성으로 분류합니다. 그리고 유통기한. 다양한 클러스터에 다양한 모델을 적용합니다. 계층적 접근 방식(글로벌 + local)은 종종 두 순수 모델 모두보다 성능이 뛰어납니다.
- 항상 불확실성 추정값을 포함합니다. 정확한 예측(단일 숫자) 확률적 예측(분포 또는 간격)보다 덜 유용합니다. TFT와 선지자 최적의 안전 재고를 계산하는 데 기본이 되는 분위수를 기본적으로 제공합니다.
- 재교육을 자동화하되 이상 현상을 모니터링합니다. 매일 재교육 유용하지만 어제의 데이터가 비정상적인 경우(예: POS 오류) 오류를 증폭시킬 수 있습니다. 새 모델을 프로덕션에 적용하기 전에 항상 온전성 검사를 구현하세요.
- 구매자와의 신뢰 구축: ML 시스템은 저항으로 인해 실패하는 경우가 많습니다. 기술적 품질이 아닌 조직적입니다. 모델의 분해를 보여주고, 설명하세요. 비즈니스 언어로 예측하면 구매자는 모델을 개선하는 피드백 루프.
- 지속적으로 ROI 측정: 매주 폐기물 절감액을 계산해 보세요. 회피, 인하로부터의 회복, Gadda Law 세금 혜택. 가치를 가시화하라 시간이 지나도 조직의 헌신을 유지하기 위해 만들어지고 기본이 됩니다.
결론: 지속 가능성 도구로서의 예측
식품 부문에서 머신러닝을 활용한 수요 예측은 AI 활용 사례 중 하나입니다. 현재 이용 가능한 최고의 영향/복잡성 비율입니다. 문제가 잘 정의되어 있고 데이터가 존재하며(모든 소매업체는 수년간의 POS 기록을 보유하고 있음), 성공 지표가 명확합니다(MAPE, 폐기물 발생률, ROI), 그 영향은 비즈니스 범위를 넘어 확장됩니다. 음식물 쓰레기를 줄이다 경제적, 규제적, 환경적 필요성.
2025년에 시작하려는 이탈리아 대규모 소매업에 대해 제안된 경로는 실용적입니다. 50개의 대용량, 부패 가능성이 높은 SKU의 하위 집합에 Prophet 또는 XGBoost를 사용하여 측정 30일 후 MAPE 델타 및 폐기물 감소율, 내부 비즈니스 사례 구축, 그런 다음 상위 SKU를 위한 TFT와 마크다운 통합을 향해 점진적으로 확장합니다. ESL 엔진 및 시스템.
최종 목표는 가장 정확한 모델을 갖는 것이 아니라, 의사결정 생태계 예측을 행동으로 바꾸는 것: 최적주문, 적응가격, 기부 자동. 이 생태계에서는 기계 학습과 지원 인프라가 있지만 폐기물 감소와 모든 투자를 정당화하는 구체적인 비즈니스 결과.
FoodTech 시리즈에서 계속
시리즈의 다음 기사에서는 수요 예측을 위한 보완 기술을 살펴봅니다.
- 9조 - 위성 API 및 정밀 농업: 위성 데이터처럼 NDVI 및 Sentinel-2는 농업 수확량 예측 모델을 제공하여 루프를 닫습니다. 생산부터 유통까지.
- 기사 10 - 작물 모니터링을 위한 ML Edge: 내장된 추론 농작물 상태를 실시간으로 모니터링할 수 있는 현장 IoT 장치 공급망 파이프라인에 통합됩니다.
프로덕션 배포를 가능하게 하는 MLOps 기술에 대해 자세히 알아보세요. 예측 모델의 경우 시리즈도 참조하세요. 비즈니스용 MLOps: MLflow를 사용하여 프로덕션 중인 AI 모델.







