適応学習アルゴリズム: 理論から本番まで
すべての生徒の個人教師、つまり生徒のレベルを継続的に調整する教師の夢です。 今日では、困難を解決し、知識のギャップを特定し、適切なコンテンツを適切なタイミングで提供します。 ~のおかげで達成可能 適応学習アルゴリズム。それは SF ではありません: プラットフォーム Khan Academy、Duolingo、Coursera などは、カスタム生成されたパスで何百万人もの学生にサービスを提供しています アルゴリズム的にリアルタイムで。
課題は理論的なものではなく、エンジニアリング的なものです。システムの導入方法 項目応答理論 (IRT) 遅延を悪化させることなく 100 万人の生徒に拡張できるでしょうか?モデルを統合する方法 知識の追跡 継続的なモニタリングと A/B テストを備えた本番環境の ML パイプラインで? 探索と活用の両方が必要なレコメンダー システムで、どのようにバランスをとるのですか? 正確で教育学的に有効ですか?
この記事では、具体的なコード、スケーラブルなアーキテクチャ、教訓を使ってこれらの質問に答えます。 本番環境のシステムから学びました。 IRT の数学から開始し、ベイズ知識を通過します。 トレースとディープナレッジトレースにより、機能パイプライン、モデルを備えた完全なシステムに到達します。 導入された A/B テスト フレームワーク。
何を学ぶか
- 項目応答理論 (IRT) の数学的基礎とそれを Python で実装する方法
- ベイジアン ナレッジ トレーシング (BKT) とディープ ナレッジ トレーシング (DKT): いつどちらを使用するか
- 信号を学習するための特徴エンジニアリング: 時間、信頼性、連続エラー
- FastAPI と PostgreSQL を使用した適応型レコメンデーション エンジンのアーキテクチャ
- 本番環境でアルゴリズムを検証するための A/B テスト フレームワーク
- 教育モデルのモニタリングとドリフト検出
1. 項目応答理論: 基本モデル
L'項目応答理論 (IRT) そして現代の教育測定の数学的基礎。 1960 年代に Georg Rasch と Lord によって導入された IRT は、生徒が反応する確率をモデル化します。 潜在的な能力とアイテムの特性に応じて、アイテム (質問) に正しく答えます。
2PL (Two-Parameter Logistic) モデルは実稼働環境で最もよく使用されます。
P(X=1 | シータ, a, b) = 1 / (1 + exp(-a * (シータ - b)))
どこ theta そして生徒の能力、 a そして識別パラメータ
(その項目が、さまざまな能力の生徒をどの程度区別できるか) e b とパラメータ
難易度 (正解の確率が 0.5 であるシータの値)。
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit # sigmoid function
class IRTModel2PL:
"""
Two-Parameter Logistic IRT Model.
Calibra difficulty (b) e discrimination (a) per ogni item.
Stima l'ability (theta) per ogni studente.
"""
def __init__(self):
self.item_params = {} # {item_id: {'a': float, 'b': float}}
self.student_abilities = {} # {student_id: float}
def probability_correct(self, theta: float, a: float, b: float) -> float:
"""P(correct | theta, a, b) usando il modello 2PL."""
return expit(a * (theta - b))
def log_likelihood_student(self, theta: float, responses: list) -> float:
"""
Log-likelihood per uno studente data la sequenza di risposte.
responses: lista di tuple (item_id, is_correct)
"""
ll = 0.0
for item_id, is_correct in responses:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
p = self.probability_correct(theta, a, b)
# Clip per stabilità numerica
p = np.clip(p, 1e-9, 1 - 1e-9)
ll += is_correct * np.log(p) + (1 - is_correct) * np.log(1 - p)
return ll
def estimate_student_ability(
self,
student_id: str,
responses: list,
prior_mean: float = 0.0,
prior_std: float = 1.0
) -> float:
"""
MAP estimation dell'ability di uno studente.
Usa prior Gaussiano N(0,1) per regolarizzazione.
"""
def negative_map(theta_array):
theta = theta_array[0]
ll = self.log_likelihood_student(theta, responses)
# Log prior Gaussiano
log_prior = -0.5 * ((theta - prior_mean) / prior_std) ** 2
return -(ll + log_prior)
result = minimize(
negative_map,
x0=[0.0],
method='L-BFGS-B',
bounds=[(-4.0, 4.0)]
)
ability = result.x[0]
self.student_abilities[student_id] = ability
return ability
def next_item_cat(
self,
student_id: str,
available_items: list,
strategy: str = 'max_information'
) -> str:
"""
Computerized Adaptive Testing: seleziona il prossimo item ottimale.
Strategie disponibili:
- 'max_information': massimizza l'informazione di Fisher al theta corrente
- 'target_difficulty': seleziona item con b vicino a theta
"""
theta = self.student_abilities.get(student_id, 0.0)
best_item = None
best_score = -np.inf
for item_id in available_items:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
if strategy == 'max_information':
# Informazione di Fisher per il modello 2PL
p = self.probability_correct(theta, a, b)
q = 1 - p
fisher_info = (a ** 2) * p * q
score = fisher_info
elif strategy == 'target_difficulty':
# Item più vicino al livello dello studente
score = -abs(b - theta)
if score > best_score:
best_score = score
best_item = item_id
return best_item
def calibrate_items_mle(self, response_matrix: np.ndarray) -> None:
"""
Calibra i parametri degli item usando Maximum Likelihood.
response_matrix: (n_students, n_items) con valori 0/1
Implementazione semplificata - in produzione usa py-irt o mirt.
"""
# Stima iniziale delle abilita con proporzione di risposte corrette
n_students, n_items = response_matrix.shape
abilities = np.zeros(n_students)
for iteration in range(20): # EM iterations
# E-step: aggiorna abilita dato params items
for s in range(n_students):
responses = [
(f'item_{i}', int(response_matrix[s, i]))
for i in range(n_items)
if f'item_{i}' in self.item_params
]
if responses:
abilities[s] = self.estimate_student_ability(
f'student_{s}', responses
)
# M-step: aggiorna params items dato abilita
for i in range(n_items):
item_id = f'item_{i}'
# Stima semplificata: in produzione usa marginal MLE
p_mean = response_matrix[:, i].mean()
b_estimate = -np.log(p_mean / (1 - p_mean + 1e-9))
self.item_params[item_id] = {
'a': 1.0, # discrimination iniziale
'b': np.clip(b_estimate, -3.0, 3.0)
}
# Esempio di utilizzo
model = IRTModel2PL()
# Calibra con dati storici
import numpy as np
np.random.seed(42)
responses = np.random.binomial(1, 0.6, (500, 50))
model.calibrate_items_mle(responses)
# Stima ability di un nuovo studente
student_responses = [
('item_0', 1), ('item_5', 0), ('item_10', 1),
('item_15', 1), ('item_20', 0)
]
ability = model.estimate_student_ability('student_new', student_responses)
print(f"Estimated ability: {ability:.3f}") # es: 0.342
# Seleziona il prossimo item
next_item = model.next_item_cat('student_new', [f'item_{i}' for i in range(50)])
print(f"Next optimal item: {next_item}")
2. ベイズ知識の追跡: 時間の経過に伴う進行のモデル化
IRT は生徒の能力を瞬時に撮影しますが、 ベイジアン ナレッジ トレーシング (BKT) 学習を通じて時間の経過とともに知識がどのように変化するかをモデル化します。 1994 年にコーベットとアンダーソンによって導入されました。 BKT は、その解釈可能性により、適応システムの主力であり続けています。
BKT は、知識コンポーネント (KC) の 4 つのパラメーターを備えた隠れマルコフ モデルです。
- P(L0): 生徒が KC を知っている初期確率
- P(T): 遷移確率 (練習機会後の学習 KC)
- P(G):推測する確率(分からないのに正解する確率)
- P(S):スリップ確率(分かっているのに誤答する)
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class BKTParams:
"""Parametri BKT per un knowledge component."""
p_learn: float # P(L0) - prior knowledge
p_transit: float # P(T) - learning rate
p_guess: float # P(G) - guess rate
p_slip: float # P(S) - slip rate
kc_id: str # Knowledge Component ID
class BKTTracker:
"""
Bayesian Knowledge Tracing tracker per singolo studente.
Aggiorna la probabilità di padronanza dopo ogni risposta.
"""
def __init__(self, params: BKTParams):
self.params = params
self.p_mastery = params.p_learn # Stima corrente di padronanza
self.history = []
def update(self, is_correct: bool) -> float:
"""
Aggiorna la stima di padronanza dopo una risposta.
Restituisce la nuova probabilità di padronanza.
Passi:
1. Calcola P(correct | mastery) e P(correct | !mastery)
2. Applica Bayes per aggiornare P(mastery | correct/incorrect)
3. Applica la transizione di apprendimento
"""
p = self.p_mastery
# Step 1: Calcola probabilità di osservare la risposta
if is_correct:
# P(correct | mastery) = 1 - P(slip)
p_obs_given_know = 1 - self.params.p_slip
# P(correct | !mastery) = P(guess)
p_obs_given_not = self.params.p_guess
else:
# P(incorrect | mastery) = P(slip)
p_obs_given_know = self.params.p_slip
# P(incorrect | !mastery) = 1 - P(guess)
p_obs_given_not = 1 - self.params.p_guess
# Step 2: Posterior di padronanza (formula di Bayes)
p_correct_total = (
p_obs_given_know * p +
p_obs_given_not * (1 - p)
)
# Evita divisione per zero
if p_correct_total < 1e-10:
p_posterior = p
else:
p_posterior = (p_obs_given_know * p) / p_correct_total
# Step 3: Applica transizione di apprendimento
p_new = p_posterior + (1 - p_posterior) * self.params.p_transit
self.p_mastery = p_new
self.history.append({
'response': is_correct,
'p_mastery_before': p,
'p_mastery_after': p_new
})
return p_new
def is_mastered(self, threshold: float = 0.95) -> bool:
"""Ritorna True se la padronanza supera la soglia."""
return self.p_mastery >= threshold
def recommended_action(self) -> str:
"""Raccomanda l'azione successiva basata sullo stato corrente."""
p = self.p_mastery
if p >= 0.95:
return 'advance' # Procedi al KC successivo
elif p >= 0.70:
return 'practice' # Pratica ulteriore
elif p >= 0.40:
return 'hint' # Offri un hint
else:
return 'remediation' # Torna ai prerequisiti
class BKTSystem:
"""Sistema BKT multi-studente multi-KC."""
def __init__(self, kc_params: dict):
"""
kc_params: {kc_id: BKTParams}
"""
self.kc_params = kc_params
self.trackers = {} # {(student_id, kc_id): BKTTracker}
def get_tracker(self, student_id: str, kc_id: str) -> BKTTracker:
"""Recupera o crea un tracker per student/KC."""
key = (student_id, kc_id)
if key not in self.trackers:
if kc_id not in self.kc_params:
raise ValueError(f"KC {kc_id} non trovato")
self.trackers[key] = BKTTracker(self.kc_params[kc_id])
return self.trackers[key]
def process_response(
self,
student_id: str,
kc_id: str,
is_correct: bool
) -> dict:
"""Processa una risposta e restituisce lo stato aggiornato."""
tracker = self.get_tracker(student_id, kc_id)
p_mastery = tracker.update(is_correct)
return {
'student_id': student_id,
'kc_id': kc_id,
'p_mastery': round(p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'recommended_action': tracker.recommended_action()
}
def get_mastery_profile(self, student_id: str) -> dict:
"""Restituisce il profilo completo di padronanza di uno studente."""
profile = {}
for (sid, kc_id), tracker in self.trackers.items():
if sid == student_id:
profile[kc_id] = {
'p_mastery': round(tracker.p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'attempts': len(tracker.history)
}
return profile
# Esempio configurazione sistema
kc_params = {
'algebra_linear': BKTParams(
p_learn=0.20, # 20% conosce gia
p_transit=0.15, # 15% impara per ogni pratica
p_guess=0.25, # 25% probabilità di indovinare
p_slip=0.10, # 10% probabilità di sbagliare pur sapendo
kc_id='algebra_linear'
),
'calcolo_derivate': BKTParams(
p_learn=0.10,
p_transit=0.12,
p_guess=0.15,
p_slip=0.08,
kc_id='calcolo_derivate'
)
}
bkt_system = BKTSystem(kc_params)
# Simula una sessione di apprendimento
responses_sequence = [False, False, True, True, True, True, False, True]
for i, correct in enumerate(responses_sequence):
result = bkt_system.process_response('student_42', 'algebra_linear', correct)
print(f"Risposta {i+1}: {'corretta' if correct else 'errata'} -> "
f"P(mastery)={result['p_mastery']:.3f}, "
f"Azione: {result['recommended_action']}")
3. 深い知識の追跡: ニューラルアプローチ
Il ディープナレッジトレーシング (DKT)、Piechらによって導入されました。 2015年に置き換えられます 依存関係をキャプチャできるリカレント ニューラル ネットワークを使用した BKT の仮定の簡素化 知識コンポーネント間の複雑さ。 DKT はデータセットで BKT よりも 20 ~ 30% 高い精度を達成します 2025 年の調査によると実際 (最近の調査では 87.5% の予測精度)。
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
class DKTModel(nn.Module):
"""
Deep Knowledge Tracing con LSTM.
Input: sequenza di (item_id, risposta) encodata
Output: probabilità di risposta corretta per ogni item
"""
def __init__(
self,
n_items: int,
hidden_size: int = 128,
n_layers: int = 2,
dropout: float = 0.2
):
super().__init__()
self.n_items = n_items
self.hidden_size = hidden_size
# Input encoding: item_id * 2 (correct/incorrect)
# Ogni item ha 2 rappresentazioni: risposta corretta e risposta errata
self.input_size = n_items * 2
self.lstm = nn.LSTM(
input_size=self.input_size,
hidden_size=hidden_size,
num_layers=n_layers,
dropout=dropout if n_layers > 1 else 0,
batch_first=True
)
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, n_items),
nn.Sigmoid()
)
def encode_input(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Encoding one-hot per (item, response).
item_ids: (batch, seq_len) - ID degli item
responses: (batch, seq_len) - 0 o 1
Restituisce: (batch, seq_len, n_items * 2)
"""
batch_size, seq_len = item_ids.shape
x = torch.zeros(batch_size, seq_len, self.input_size)
# Item risposto correttamente: posizione item_id
# Item risposto erroneamente: posizione item_id + n_items
for b in range(batch_size):
for t in range(seq_len):
iid = item_ids[b, t].item()
r = responses[b, t].item()
if r == 1:
x[b, t, iid] = 1.0
else:
x[b, t, iid + self.n_items] = 1.0
return x
def forward(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Forward pass.
Restituisce predizioni per il prossimo item in sequenza.
Output shape: (batch, seq_len, n_items)
"""
x = self.encode_input(item_ids, responses)
lstm_out, _ = self.lstm(x)
predictions = self.output_layer(lstm_out)
return predictions
class DKTDataset(Dataset):
"""Dataset per DKT - sequenze di interazioni studente."""
def __init__(
self,
sequences: list,
max_seq_len: int = 200,
pad_value: int = -1
):
"""
sequences: lista di dizionari
{'item_ids': [...], 'responses': [...]}
"""
self.sequences = sequences
self.max_seq_len = max_seq_len
self.pad_value = pad_value
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
item_ids = seq['item_ids'][:self.max_seq_len]
responses = seq['responses'][:self.max_seq_len]
seq_len = len(item_ids)
# Padding
pad_len = self.max_seq_len - seq_len
item_ids = item_ids + [0] * pad_len
responses = responses + [0] * pad_len
mask = [1] * seq_len + [0] * pad_len
return {
'item_ids': torch.tensor(item_ids, dtype=torch.long),
'responses': torch.tensor(responses, dtype=torch.float),
'mask': torch.tensor(mask, dtype=torch.bool),
'seq_len': seq_len
}
def train_dkt(
model: DKTModel,
train_loader: DataLoader,
val_loader: DataLoader,
n_epochs: int = 50,
lr: float = 1e-3
) -> dict:
"""Training loop per DKT con early stopping."""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss(reduction='none')
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
best_val_auc = 0.0
history = {'train_loss': [], 'val_auc': []}
for epoch in range(n_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
item_ids = batch['item_ids']
responses = batch['responses']
mask = batch['mask']
# Predici su t-1, target su t
# input: sequenza[:-1], target: sequenza[1:]
pred = model(item_ids[:, :-1], responses[:, :-1])
# Raccoglie le predizioni per gli item effettivi
target_items = item_ids[:, 1:]
target_responses = responses[:, 1:]
target_mask = mask[:, 1:]
# Estrai predizione per l'item corretto
batch_size, seq_len, n_items = pred.shape
pred_flat = pred.reshape(-1, n_items)
target_items_flat = target_items.reshape(-1)
pred_selected = pred_flat.gather(
1, target_items_flat.unsqueeze(1)
).squeeze(1)
# Loss mascherata (ignora padding)
loss = criterion(
pred_selected,
target_responses.reshape(-1)
)
mask_flat = target_mask.reshape(-1).float()
loss = (loss * mask_flat).sum() / mask_flat.sum()
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_loss)
# Validation
val_auc = evaluate_dkt(model, val_loader)
history['val_auc'].append(val_auc)
scheduler.step(1 - val_auc) # Minimizza 1-AUC
if val_auc > best_val_auc:
best_val_auc = val_auc
torch.save(model.state_dict(), 'best_dkt_model.pt')
print(f"Epoch {epoch+1}/{n_epochs} - "
f"Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}")
return history
4. 信号を学習するための特徴量エンジニアリング
多くの場合、モデルのアーキテクチャよりも機能の品質の方が重要です。適応型システム 運用環境では、単純なペア (項目、正しい/間違っている) を超えて、コンテキストをキャプチャする必要があります。 学習インタラクションが豊富です。
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import math
@dataclass
class LearningInteraction:
"""Una singola interazione di apprendimento."""
student_id: str
item_id: str
kc_ids: list # Knowledge components coinvolti
is_correct: bool
response_time_ms: int # Tempo di risposta in millisecondi
timestamp: datetime
hint_used: bool = False
attempt_number: int = 1
confidence_level: Optional[int] = None # 1-5 se richiesto
@dataclass
class StudentFeatures:
"""Feature aggregate per uno studente su un KC."""
student_id: str
kc_id: str
# Accuracy features
total_attempts: int = 0
correct_count: int = 0
recent_accuracy: float = 0.0 # Ultimi 5 tentativi
streak_correct: int = 0 # Sequenza corrente di corrette
streak_incorrect: int = 0
# Time features
avg_response_time: float = 0.0
response_time_trend: float = 0.0 # Positivo = rallentamento
time_since_last_practice: float = 0.0 # In ore
# Engagement features
hint_rate: float = 0.0 # Proporzione con hint
first_attempt_accuracy: float = 0.0 # Solo primi tentativi
# Memory features (Ebbinghaus forgetting curve)
estimated_retention: float = 1.0
class FeatureExtractor:
"""
Estrae feature per il sistema adattivo da sequenze di interazioni.
"""
def __init__(self, decay_factor: float = 0.3):
"""
decay_factor: controlla la decadenza della curva di dimenticanza
"""
self.decay_factor = decay_factor
def ebbinghaus_retention(
self,
days_since_last: float,
memory_strength: float = 1.0
) -> float:
"""
Modello di dimenticanza di Ebbinghaus.
R = exp(-t / (S * stability_factor))
"""
if days_since_last <= 0:
return 1.0
stability = max(memory_strength, 0.1)
return math.exp(-days_since_last / (stability * 30))
def compute_features(
self,
student_id: str,
kc_id: str,
interactions: list
) -> StudentFeatures:
"""
Calcola le feature per uno studente su un KC.
interactions: lista di LearningInteraction ordinate per timestamp
"""
kc_interactions = [
i for i in interactions
if kc_id in i.kc_ids and i.student_id == student_id
]
if not kc_interactions:
return StudentFeatures(student_id=student_id, kc_id=kc_id)
features = StudentFeatures(student_id=student_id, kc_id=kc_id)
# Accuracy
features.total_attempts = len(kc_interactions)
features.correct_count = sum(1 for i in kc_interactions if i.is_correct)
# Recent accuracy (finestra ultimi 5)
recent = kc_interactions[-5:]
if recent:
features.recent_accuracy = sum(1 for i in recent if i.is_correct) / len(recent)
# Streak
streak_c = 0
streak_i = 0
for interaction in reversed(kc_interactions):
if interaction.is_correct:
if streak_i == 0:
streak_c += 1
else:
break
else:
if streak_c == 0:
streak_i += 1
else:
break
features.streak_correct = streak_c
features.streak_incorrect = streak_i
# Response time
times = [i.response_time_ms for i in kc_interactions]
features.avg_response_time = sum(times) / len(times)
# Trend risposta (positivo = rallentamento)
if len(times) >= 3:
recent_avg = sum(times[-3:]) / 3
older_avg = sum(times[:-3]) / max(len(times) - 3, 1)
features.response_time_trend = (recent_avg - older_avg) / max(older_avg, 1)
# Tempo dall'ultima pratica
last_practice = kc_interactions[-1].timestamp
now = datetime.now()
hours_since = (now - last_practice).total_seconds() / 3600
features.time_since_last_practice = hours_since
# Hint rate
features.hint_rate = sum(1 for i in kc_interactions if i.hint_used) / len(kc_interactions)
# First attempt accuracy
first_attempts = [i for i in kc_interactions if i.attempt_number == 1]
if first_attempts:
features.first_attempt_accuracy = (
sum(1 for i in first_attempts if i.is_correct) / len(first_attempts)
)
# Retention stimata
days_since = hours_since / 24
memory_strength = features.first_attempt_accuracy * 2 # 0-2
features.estimated_retention = self.ebbinghaus_retention(days_since, memory_strength)
return features
def compute_next_review_time(self, features: StudentFeatures) -> datetime:
"""
Calcola il momento ottimale per la prossima revisione (spaced repetition).
Ispirato all'algoritmo SM-2 di SuperMemo.
"""
accuracy = features.first_attempt_accuracy
attempts = features.total_attempts
# Intervallo di base in giorni
if attempts <= 1:
interval_days = 1
elif attempts == 2:
interval_days = 3
else:
# Ease factor basato sull'accuracy
ease = 1.3 + (2.5 - 1.3) * accuracy
interval_days = max(1, round(
features.time_since_last_practice / 24 * ease
))
return datetime.now() + timedelta(days=interval_days)
5. 生産システムのアーキテクチャ
運用環境の適応型システムは、以下のレイテンシで 1 秒あたり数百のリクエストを処理する必要があります。 100ミリ秒。次のアーキテクチャは、 オンラインパス (リアルタイムで提供) からオフラインパス (バッチトレーニングとキャリブレーション)。
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import aioredis
import asyncpg
app = FastAPI(title="Adaptive Learning API")
# ---- Modelli Pydantic ----
class ResponseRequest(BaseModel):
student_id: str
item_id: str
is_correct: bool
response_time_ms: int
hint_used: bool = False
class NextItemResponse(BaseModel):
item_id: str
estimated_difficulty: float
reason: str
mastery_state: dict
# ---- Servizi ----
class AdaptiveEngine:
"""
Engine principale per raccomandazioni adattive.
Usa Redis per feature cache e PostgreSQL per persistenza.
"""
def __init__(self, redis_client, db_pool):
self.redis = redis_client
self.db = db_pool
self.bkt_system = None # Inizializzato al startup
self.irt_model = None
async def process_response(
self,
request: ResponseRequest,
background_tasks: BackgroundTasks
) -> dict:
"""
Pipeline di processing per una risposta.
1. Persiste la risposta nel DB (async)
2. Aggiorna lo stato BKT dalla cache Redis
3. Seleziona il prossimo item
4. Schedula aggiornamento features in background
"""
student_id = request.student_id
item_id = request.item_id
# 1. Get KCs per questo item
kc_ids = await self.get_item_kcs(item_id)
# 2. Aggiorna BKT per ogni KC (via Redis)
mastery_updates = {}
for kc_id in kc_ids:
cache_key = f"bkt:{student_id}:{kc_id}"
# Recupera stato dalla cache
cached = await self.redis.get(cache_key)
if cached:
p_mastery = float(cached)
else:
# Fallback al DB
p_mastery = await self.get_mastery_from_db(student_id, kc_id)
# Aggiorna con nuova risposta
tracker = BKTTracker(await self.get_bkt_params(kc_id))
tracker.p_mastery = p_mastery
new_mastery = tracker.update(request.is_correct)
# Salva in Redis con TTL 24h
await self.redis.setex(cache_key, 86400, str(new_mastery))
mastery_updates[kc_id] = new_mastery
# 3. Seleziona prossimo item
next_item = await self.select_next_item(student_id, mastery_updates)
# 4. Background: persisti nel DB
background_tasks.add_task(
self.persist_interaction,
request,
mastery_updates
)
return {
'mastery_updates': mastery_updates,
'next_item': next_item
}
async def select_next_item(
self,
student_id: str,
mastery_updates: dict
) -> dict:
"""
Seleziona il prossimo item usando una strategia multi-obiettivo:
- Bilancia apprendimento di nuovi KC vs rinforzo di quelli deboli
- Considera la curva di dimenticanza
- Rispetta il curriculum graph
"""
# Ottieni profilo completo del curriculum
curriculum = await self.get_curriculum_graph(student_id)
# Calcola priorità per ogni KC disponibile
priorities = {}
for kc_id, kc_data in curriculum.items():
mastery = mastery_updates.get(kc_id, kc_data.get('mastery', 0.0))
# Priorità alta per KC quasi-mastered (0.7-0.9) - spingili oltre la soglia
if 0.70 <= mastery < 0.95:
priority = 2.0 * (mastery - 0.70) / 0.25
# Priorità media per KC molto deboli - ma non sopraffatti
elif mastery < 0.30:
priority = 0.5
# KC mastered - bassa priorità, solo spaced repetition
elif mastery >= 0.95:
retention = kc_data.get('estimated_retention', 1.0)
priority = (1 - retention) * 0.3
else:
priority = 1.0
priorities[kc_id] = priority
# Seleziona KC con più alta priorità
if not priorities:
raise HTTPException(status_code=404, detail="No available KCs")
target_kc = max(priorities, key=priorities.get)
# Seleziona item per questo KC usando IRT
available_items = await self.get_items_for_kc(target_kc)
student_ability = await self.get_student_ability(student_id)
# CAT selection
optimal_item = self.irt_model.next_item_cat(
student_id, available_items, strategy='max_information'
)
item_params = self.irt_model.item_params.get(optimal_item, {'a': 1.0, 'b': 0.0})
return {
'item_id': optimal_item,
'kc_id': target_kc,
'estimated_difficulty': item_params.get('b', 0.0),
'student_ability': student_ability,
'mastery_before': mastery_updates.get(target_kc, 0.0),
'priority_reason': 'near_mastery' if priorities[target_kc] > 1.5 else 'standard'
}
async def get_item_kcs(self, item_id: str) -> list:
"""Recupera i KC associati a un item dal DB."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"SELECT kc_id FROM item_kc_mapping WHERE item_id = $1",
item_id
)
return [row['kc_id'] for row in rows]
# ---- Route FastAPI ----
@app.post("/api/v1/adaptive/response")
async def process_response(
request: ResponseRequest,
background_tasks: BackgroundTasks,
engine: AdaptiveEngine = None # Dipendenza iniettata
) -> NextItemResponse:
"""
Processa una risposta e restituisce il prossimo item adattivo.
Target latency: <100ms (p95)
"""
result = await engine.process_response(request, background_tasks)
next_item = result['next_item']
return NextItemResponse(
item_id=next_item['item_id'],
estimated_difficulty=next_item['estimated_difficulty'],
reason=next_item['priority_reason'],
mastery_state=result['mastery_updates']
)
6. 適応アルゴリズムの A/B テスト
運用環境で新しいアルゴリズムを検証するには、 教育の特殊性: 生徒は交換可能ではない、学習の効果 これらは累積的であり、関連する指標はクリックスルー率を超えています。
import hashlib
from enum import Enum
from typing import Callable
import scipy.stats as stats
class ExperimentVariant(Enum):
CONTROL = "control"
TREATMENT = "treatment"
class ABTestManager:
"""
Framework A/B testing per algoritmi adattivi.
Usa student_id hashing per assegnazione deterministica.
"""
def __init__(self, experiment_id: str, treatment_fraction: float = 0.5):
self.experiment_id = experiment_id
self.treatment_fraction = treatment_fraction
self.metrics = {
ExperimentVariant.CONTROL: [],
ExperimentVariant.TREATMENT: []
}
def assign_variant(self, student_id: str) -> ExperimentVariant:
"""
Assegna uno studente a control o treatment in modo deterministico.
Lo stesso studente riceve sempre la stessa variante (no leakage).
"""
hash_input = f"{self.experiment_id}:{student_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
bucket = (hash_value % 100) / 100.0
return (
ExperimentVariant.TREATMENT
if bucket < self.treatment_fraction
else ExperimentVariant.CONTROL
)
def record_metric(
self,
student_id: str,
metric_value: float,
metric_name: str = 'mastery_gain'
):
"""Registra una metrica per uno studente."""
variant = self.assign_variant(student_id)
self.metrics[variant].append(metric_value)
def analyze_results(self) -> dict:
"""
Analisi statistica con Welch's t-test.
Adatto a campioni di dimensioni diverse.
"""
control = self.metrics[ExperimentVariant.CONTROL]
treatment = self.metrics[ExperimentVariant.TREATMENT]
if len(control) < 30 or len(treatment) < 30:
return {'error': 'Campione insufficiente (min 30 per variante)'}
t_stat, p_value = stats.ttest_ind(
treatment, control, equal_var=False
)
control_mean = sum(control) / len(control)
treatment_mean = sum(treatment) / len(treatment)
relative_lift = (treatment_mean - control_mean) / max(abs(control_mean), 1e-9)
# Cohen's d per effect size
pooled_std = (
(sum((x - control_mean)**2 for x in control) +
sum((x - treatment_mean)**2 for x in treatment)) /
(len(control) + len(treatment) - 2)
) ** 0.5
cohens_d = (treatment_mean - control_mean) / max(pooled_std, 1e-9)
return {
'experiment_id': self.experiment_id,
'control_n': len(control),
'treatment_n': len(treatment),
'control_mean': round(control_mean, 4),
'treatment_mean': round(treatment_mean, 4),
'relative_lift': round(relative_lift * 100, 2), # Percentuale
'p_value': round(p_value, 4),
'statistically_significant': p_value < 0.05,
'cohens_d': round(cohens_d, 3),
'practical_significance': abs(cohens_d) > 0.2,
'recommendation': (
'Deploy treatment' if p_value < 0.05 and cohens_d > 0.2
else 'Keep control' if p_value < 0.05 and cohens_d < -0.2
else 'No significant difference'
)
}
# Metriche chiave per A/B testing in EdTech
EDTECH_METRICS = [
'mastery_gain_per_session', # Incremento P(mastery) per sessione
'time_to_mastery', # Minuti necessari per raggiungere P(mastery)>=0.95
'session_completion_rate', # % sessioni completate
'return_rate_7d', # % studenti che tornano entro 7 giorni
'assessment_accuracy_post', # Accuratezza nei test formali post-training
]
7. モニタリングとドリフト検出
教育モデルの対象となるのは、 コンセプトドリフト さまざまな理由で: 変更 カリキュラム、学術的な季節性、教材の更新。監視システム 堅牢であり、長期にわたって予測の品質を維持するために不可欠です。
適応システムにおけるドリフト信号
- 精度ドリフト:今期のDKTモデルのAUCは0.70を下回る
- 校正ドリフト: 確率バケットが観測された精度と一致しません
- 特徴ドリフト: 応答時間の分布が大きく変化する
- ラベルシフト: 安定したコホートでは正答率が変化します
from collections import deque
import numpy as np
class ModelMonitor:
"""
Monitor per rilevamento drift in modelli adattivi.
Implementa ADWIN (Adaptive Windowing) per concept drift detection.
"""
def __init__(
self,
window_size: int = 1000,
drift_threshold: float = 0.02,
alert_callback: Callable = None
):
self.window = deque(maxlen=window_size)
self.drift_threshold = drift_threshold
self.alert_callback = alert_callback
self.baseline_auc = None
def add_prediction(self, predicted_prob: float, actual_label: int):
"""Aggiunge una predizione al buffer di monitoraggio."""
self.window.append((predicted_prob, actual_label))
if len(self.window) >= 100:
self._check_drift()
def _check_drift(self):
"""Verifica il drift confrontando finestre recenti vs storiche."""
window_list = list(self.window)
n = len(window_list)
half = n // 2
# Calcola AUC su prima e seconda meta
first_half = window_list[:half]
second_half = window_list[half:]
auc_first = self._compute_auc(first_half)
auc_second = self._compute_auc(second_half)
drift_magnitude = abs(auc_first - auc_second)
if drift_magnitude > self.drift_threshold:
alert = {
'type': 'concept_drift',
'auc_old': round(auc_first, 4),
'auc_new': round(auc_second, 4),
'magnitude': round(drift_magnitude, 4),
'recommended_action': (
'retrain' if drift_magnitude > 0.10
else 'monitor_closely'
)
}
if self.alert_callback:
self.alert_callback(alert)
def _compute_auc(self, predictions: list) -> float:
"""Calcola AUC-ROC per una lista di (predicted_prob, actual_label)."""
if not predictions:
return 0.5
sorted_preds = sorted(predictions, key=lambda x: x[0], reverse=True)
n_pos = sum(1 for _, y in predictions if y == 1)
n_neg = len(predictions) - n_pos
if n_pos == 0 or n_neg == 0:
return 0.5
tpr_points = []
fpr_points = []
tp = fp = 0
for prob, label in sorted_preds:
if label == 1:
tp += 1
else:
fp += 1
tpr_points.append(tp / n_pos)
fpr_points.append(fp / n_neg)
# Trapezoidal integration
auc = 0.0
for i in range(1, len(fpr_points)):
auc += (fpr_points[i] - fpr_points[i-1]) * (tpr_points[i] + tpr_points[i-1]) / 2
return auc
アンチパターン: 間違った指標の最適化
適応型システムにおける最も潜行的な危険は、エンゲージメント指標の最適化です。 実際の学習成果ではなく、(プラットフォームの使用時間、クリック数、セッション)を重視します。 プラットフォーム上の時間を最大化するシステムは、簡単なループを作成する可能性があります 学生をコンフォートゾーンに追い込むのではなく、コンフォートゾーンに留まる満足感 熟達。構築する前に、教育用語で成功指標を定義します。
実稼働環境における適応型システムのベスト プラクティス
- コールドスタート: 経歴のない新入生の場合は、簡単な診断評価を使用します 適応アルゴリズムをアクティブにする前に、5 ~ 10 個の項目を選択します。フラットな事前分布がシーケンスを生み出す 教育学的に無効です。
- 解釈可能性: それぞれについて読みやすい説明を必ず実装してください 推奨事項。 「導出が難しいため、この演習をお勧めします 複合関数の学習」であり、生徒の自信に不可欠です。
- 多様性: 常に同じ種類のアイテムを推奨することは避けてください。統合する 「フィルターバブル」効果を防ぐための選択関数の多様性パラメーター。
- カリキュラムの制約: カリキュラムグラフは推奨事項を制約する必要があります。 微分をまだマスターしていない学生に独立して積分を提案しないでください。 アルゴリズムが示唆するものから。
- 人間の監視: 教師は推奨事項を上書きできる必要があります 授業の進捗状況に関するレポートを受け取ります。
結論
適応学習アルゴリズムは最も魅力的な分野の 1 つです 機械学習と教育科学の交差点にあります。の数学理論から IRT から Redis と FastAPI を使用したスケーラブルなアーキテクチャへ、研究から本番への移行 計算効率だけでなく教育学的妥当性にも注意を払う必要があります。
結果は具体的です。適切に設計されたシステムでは、20 ~ 40% の改善が見られます。 静的線形パスと比較して習得までの時間が短縮されます。ニューラルモデルの進歩により DKT や LLM との統合のように、この分野は個別指導に向けて急速に進化しています ますます洗練され、パーソナライズされます。
EdTech シリーズの関連記事
- 記事 00: スケーラブルな LMS アーキテクチャ: マルチテナント パターン
- 記事 04: LLM と RAG を使用した個別の家庭教師
- 記事 06: xAPI と Kafka を使用した分析の学習







