적응형 학습 알고리즘: 이론에서 생산까지
모든 학생을 위한 개인 교사, 지속적으로 수준을 조정하는 사람의 꿈 오늘날, 어려움을 겪고 있는 지식의 격차를 파악하고 적시에 적절한 콘텐츠를 제공합니다. 덕분에 달성 가능 적응형 학습 알고리즘. 공상과학 소설이 아니다: 플랫폼 Khan Academy, Duolingo 및 Coursera와 같이 맞춤 생성된 경로를 통해 수백만 명의 학생들에게 서비스를 제공합니다. 알고리즘적으로 실시간으로.
문제는 이론적인 것이 아니라 엔지니어링입니다. 시스템 구현 방법 항목 반응 이론(IRT) 대기 시간 저하 없이 백만 명의 학생으로 확장할 수 있나요? 모델을 통합하는 방법 지식 추적 지속적인 모니터링과 A/B 테스트를 갖춘 프로덕션 ML 파이프라인에 있나요? 둘 다 필요한 추천 시스템에서 탐색과 활용의 균형을 어떻게 유지합니까? 정확하고 교육학적으로 유효한가?
이 기사에서는 구체적인 코드, 확장 가능한 아키텍처 및 교훈을 통해 이러한 질문을 해결합니다. 생산 시스템에서 배웠습니다. IRT의 수학부터 시작하여 베이지안 지식을 통과하겠습니다. 기능 파이프라인, 모델을 갖춘 완전한 시스템에 도달하기 위한 추적 및 심층 지식 추적 배포 및 A/B 테스트 프레임워크.
무엇을 배울 것인가
- IRT(항목 응답 이론)의 수학적 기초와 이를 Python에서 구현하는 방법
- 베이지안 지식 추적(BKT)과 심층 지식 추적(DKT): 언제 무엇을 사용해야 할까요?
- 학습 신호를 위한 특성 추출: 시간, 신뢰도, 순차적 오류
- FastAPI 및 PostgreSQL을 사용한 적응형 추천 엔진 아키텍처
- 프로덕션에서 알고리즘을 검증하기 위한 A/B 테스트 프레임워크
- 교육용 모델의 모니터링 및 드리프트 감지
1. 항목 반응 이론: 기본 모델
L'항목 반응 이론(IRT) 현대 교육 측정의 수학적 기초. 1960년대 Georg Rasch와 Lord가 도입한 IRT는 학생이 응답할 확률을 모델링합니다. 해당 항목의 잠재 능력과 특성에 따라 항목(질문)에 올바르게 적용됩니다.
2PL(2매개변수 로지스틱) 모델은 생산에 가장 많이 사용됩니다.
P(X=1 | 세타, a, b) = 1 / (1 + exp(-a * (세타 - b)))
어디 theta 그리고 학생의 능력, a 그리고 차별 매개변수
(항목이 서로 다른 능력을 가진 학생들을 얼마나 잘 구별하는지) e b 그리고 매개변수는
난이도(정답 확률이 0.5인 세타 값).
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit # sigmoid function
class IRTModel2PL:
"""
Two-Parameter Logistic IRT Model.
Calibra difficulty (b) e discrimination (a) per ogni item.
Stima l'ability (theta) per ogni studente.
"""
def __init__(self):
self.item_params = {} # {item_id: {'a': float, 'b': float}}
self.student_abilities = {} # {student_id: float}
def probability_correct(self, theta: float, a: float, b: float) -> float:
"""P(correct | theta, a, b) usando il modello 2PL."""
return expit(a * (theta - b))
def log_likelihood_student(self, theta: float, responses: list) -> float:
"""
Log-likelihood per uno studente data la sequenza di risposte.
responses: lista di tuple (item_id, is_correct)
"""
ll = 0.0
for item_id, is_correct in responses:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
p = self.probability_correct(theta, a, b)
# Clip per stabilità numerica
p = np.clip(p, 1e-9, 1 - 1e-9)
ll += is_correct * np.log(p) + (1 - is_correct) * np.log(1 - p)
return ll
def estimate_student_ability(
self,
student_id: str,
responses: list,
prior_mean: float = 0.0,
prior_std: float = 1.0
) -> float:
"""
MAP estimation dell'ability di uno studente.
Usa prior Gaussiano N(0,1) per regolarizzazione.
"""
def negative_map(theta_array):
theta = theta_array[0]
ll = self.log_likelihood_student(theta, responses)
# Log prior Gaussiano
log_prior = -0.5 * ((theta - prior_mean) / prior_std) ** 2
return -(ll + log_prior)
result = minimize(
negative_map,
x0=[0.0],
method='L-BFGS-B',
bounds=[(-4.0, 4.0)]
)
ability = result.x[0]
self.student_abilities[student_id] = ability
return ability
def next_item_cat(
self,
student_id: str,
available_items: list,
strategy: str = 'max_information'
) -> str:
"""
Computerized Adaptive Testing: seleziona il prossimo item ottimale.
Strategie disponibili:
- 'max_information': massimizza l'informazione di Fisher al theta corrente
- 'target_difficulty': seleziona item con b vicino a theta
"""
theta = self.student_abilities.get(student_id, 0.0)
best_item = None
best_score = -np.inf
for item_id in available_items:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
if strategy == 'max_information':
# Informazione di Fisher per il modello 2PL
p = self.probability_correct(theta, a, b)
q = 1 - p
fisher_info = (a ** 2) * p * q
score = fisher_info
elif strategy == 'target_difficulty':
# Item più vicino al livello dello studente
score = -abs(b - theta)
if score > best_score:
best_score = score
best_item = item_id
return best_item
def calibrate_items_mle(self, response_matrix: np.ndarray) -> None:
"""
Calibra i parametri degli item usando Maximum Likelihood.
response_matrix: (n_students, n_items) con valori 0/1
Implementazione semplificata - in produzione usa py-irt o mirt.
"""
# Stima iniziale delle abilita con proporzione di risposte corrette
n_students, n_items = response_matrix.shape
abilities = np.zeros(n_students)
for iteration in range(20): # EM iterations
# E-step: aggiorna abilita dato params items
for s in range(n_students):
responses = [
(f'item_{i}', int(response_matrix[s, i]))
for i in range(n_items)
if f'item_{i}' in self.item_params
]
if responses:
abilities[s] = self.estimate_student_ability(
f'student_{s}', responses
)
# M-step: aggiorna params items dato abilita
for i in range(n_items):
item_id = f'item_{i}'
# Stima semplificata: in produzione usa marginal MLE
p_mean = response_matrix[:, i].mean()
b_estimate = -np.log(p_mean / (1 - p_mean + 1e-9))
self.item_params[item_id] = {
'a': 1.0, # discrimination iniziale
'b': np.clip(b_estimate, -3.0, 3.0)
}
# Esempio di utilizzo
model = IRTModel2PL()
# Calibra con dati storici
import numpy as np
np.random.seed(42)
responses = np.random.binomial(1, 0.6, (500, 50))
model.calibrate_items_mle(responses)
# Stima ability di un nuovo studente
student_responses = [
('item_0', 1), ('item_5', 0), ('item_10', 1),
('item_15', 1), ('item_20', 0)
]
ability = model.estimate_student_ability('student_new', student_responses)
print(f"Estimated ability: {ability:.3f}") # es: 0.342
# Seleziona il prossimo item
next_item = model.next_item_cat('student_new', [f'item_{i}' for i in range(50)])
print(f"Next optimal item: {next_item}")
2. 베이지안 지식 추적: 시간에 따른 진행 모델링
IRT가 학생의 능력을 순간적으로 촬영하는 동안, 베이지안 지식 추적(BKT) 학습을 통해 시간이 지남에 따라 지식이 어떻게 변하는지 모델링합니다. 1994년 Corbett과 Anderson에 의해 소개되었으며, BKT는 해석 가능성으로 인해 적응 시스템의 주류로 남아 있습니다.
BKT는 지식 구성요소(KC)에 대한 4개의 매개변수가 있는 은닉 마르코프 모델입니다.
- 피(L0): 학생이 KC를 알고 있을 초기 확률
- 피(티): 전환 확률(연습 기회 이후 KC 학습)
- 피(G): 추측할 확률(모르면서도 정답을 맞추는 경우)
- 추신): 미끄러질 확률(알고 있음에도 불구하고 오답)
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class BKTParams:
"""Parametri BKT per un knowledge component."""
p_learn: float # P(L0) - prior knowledge
p_transit: float # P(T) - learning rate
p_guess: float # P(G) - guess rate
p_slip: float # P(S) - slip rate
kc_id: str # Knowledge Component ID
class BKTTracker:
"""
Bayesian Knowledge Tracing tracker per singolo studente.
Aggiorna la probabilità di padronanza dopo ogni risposta.
"""
def __init__(self, params: BKTParams):
self.params = params
self.p_mastery = params.p_learn # Stima corrente di padronanza
self.history = []
def update(self, is_correct: bool) -> float:
"""
Aggiorna la stima di padronanza dopo una risposta.
Restituisce la nuova probabilità di padronanza.
Passi:
1. Calcola P(correct | mastery) e P(correct | !mastery)
2. Applica Bayes per aggiornare P(mastery | correct/incorrect)
3. Applica la transizione di apprendimento
"""
p = self.p_mastery
# Step 1: Calcola probabilità di osservare la risposta
if is_correct:
# P(correct | mastery) = 1 - P(slip)
p_obs_given_know = 1 - self.params.p_slip
# P(correct | !mastery) = P(guess)
p_obs_given_not = self.params.p_guess
else:
# P(incorrect | mastery) = P(slip)
p_obs_given_know = self.params.p_slip
# P(incorrect | !mastery) = 1 - P(guess)
p_obs_given_not = 1 - self.params.p_guess
# Step 2: Posterior di padronanza (formula di Bayes)
p_correct_total = (
p_obs_given_know * p +
p_obs_given_not * (1 - p)
)
# Evita divisione per zero
if p_correct_total < 1e-10:
p_posterior = p
else:
p_posterior = (p_obs_given_know * p) / p_correct_total
# Step 3: Applica transizione di apprendimento
p_new = p_posterior + (1 - p_posterior) * self.params.p_transit
self.p_mastery = p_new
self.history.append({
'response': is_correct,
'p_mastery_before': p,
'p_mastery_after': p_new
})
return p_new
def is_mastered(self, threshold: float = 0.95) -> bool:
"""Ritorna True se la padronanza supera la soglia."""
return self.p_mastery >= threshold
def recommended_action(self) -> str:
"""Raccomanda l'azione successiva basata sullo stato corrente."""
p = self.p_mastery
if p >= 0.95:
return 'advance' # Procedi al KC successivo
elif p >= 0.70:
return 'practice' # Pratica ulteriore
elif p >= 0.40:
return 'hint' # Offri un hint
else:
return 'remediation' # Torna ai prerequisiti
class BKTSystem:
"""Sistema BKT multi-studente multi-KC."""
def __init__(self, kc_params: dict):
"""
kc_params: {kc_id: BKTParams}
"""
self.kc_params = kc_params
self.trackers = {} # {(student_id, kc_id): BKTTracker}
def get_tracker(self, student_id: str, kc_id: str) -> BKTTracker:
"""Recupera o crea un tracker per student/KC."""
key = (student_id, kc_id)
if key not in self.trackers:
if kc_id not in self.kc_params:
raise ValueError(f"KC {kc_id} non trovato")
self.trackers[key] = BKTTracker(self.kc_params[kc_id])
return self.trackers[key]
def process_response(
self,
student_id: str,
kc_id: str,
is_correct: bool
) -> dict:
"""Processa una risposta e restituisce lo stato aggiornato."""
tracker = self.get_tracker(student_id, kc_id)
p_mastery = tracker.update(is_correct)
return {
'student_id': student_id,
'kc_id': kc_id,
'p_mastery': round(p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'recommended_action': tracker.recommended_action()
}
def get_mastery_profile(self, student_id: str) -> dict:
"""Restituisce il profilo completo di padronanza di uno studente."""
profile = {}
for (sid, kc_id), tracker in self.trackers.items():
if sid == student_id:
profile[kc_id] = {
'p_mastery': round(tracker.p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'attempts': len(tracker.history)
}
return profile
# Esempio configurazione sistema
kc_params = {
'algebra_linear': BKTParams(
p_learn=0.20, # 20% conosce gia
p_transit=0.15, # 15% impara per ogni pratica
p_guess=0.25, # 25% probabilità di indovinare
p_slip=0.10, # 10% probabilità di sbagliare pur sapendo
kc_id='algebra_linear'
),
'calcolo_derivate': BKTParams(
p_learn=0.10,
p_transit=0.12,
p_guess=0.15,
p_slip=0.08,
kc_id='calcolo_derivate'
)
}
bkt_system = BKTSystem(kc_params)
# Simula una sessione di apprendimento
responses_sequence = [False, False, True, True, True, True, False, True]
for i, correct in enumerate(responses_sequence):
result = bkt_system.process_response('student_42', 'algebra_linear', correct)
print(f"Risposta {i+1}: {'corretta' if correct else 'errata'} -> "
f"P(mastery)={result['p_mastery']:.3f}, "
f"Azione: {result['recommended_action']}")
3. 심층 지식 추적: 신경 접근법
Il 심층 지식 추적(DKT), Piech et al.에 의해 소개되었습니다. 2015년에는 대체 종속성을 포착할 수 있는 순환 신경망을 사용한 BKT의 단순화된 가정 지식 구성요소 사이의 복합체. DKT는 데이터 세트에서 BKT보다 20-30% 더 높은 정확도를 달성합니다. 2025년 연구에 따르면 실제입니다(최근 연구의 예측 정확도 87.5%).
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
class DKTModel(nn.Module):
"""
Deep Knowledge Tracing con LSTM.
Input: sequenza di (item_id, risposta) encodata
Output: probabilità di risposta corretta per ogni item
"""
def __init__(
self,
n_items: int,
hidden_size: int = 128,
n_layers: int = 2,
dropout: float = 0.2
):
super().__init__()
self.n_items = n_items
self.hidden_size = hidden_size
# Input encoding: item_id * 2 (correct/incorrect)
# Ogni item ha 2 rappresentazioni: risposta corretta e risposta errata
self.input_size = n_items * 2
self.lstm = nn.LSTM(
input_size=self.input_size,
hidden_size=hidden_size,
num_layers=n_layers,
dropout=dropout if n_layers > 1 else 0,
batch_first=True
)
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, n_items),
nn.Sigmoid()
)
def encode_input(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Encoding one-hot per (item, response).
item_ids: (batch, seq_len) - ID degli item
responses: (batch, seq_len) - 0 o 1
Restituisce: (batch, seq_len, n_items * 2)
"""
batch_size, seq_len = item_ids.shape
x = torch.zeros(batch_size, seq_len, self.input_size)
# Item risposto correttamente: posizione item_id
# Item risposto erroneamente: posizione item_id + n_items
for b in range(batch_size):
for t in range(seq_len):
iid = item_ids[b, t].item()
r = responses[b, t].item()
if r == 1:
x[b, t, iid] = 1.0
else:
x[b, t, iid + self.n_items] = 1.0
return x
def forward(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Forward pass.
Restituisce predizioni per il prossimo item in sequenza.
Output shape: (batch, seq_len, n_items)
"""
x = self.encode_input(item_ids, responses)
lstm_out, _ = self.lstm(x)
predictions = self.output_layer(lstm_out)
return predictions
class DKTDataset(Dataset):
"""Dataset per DKT - sequenze di interazioni studente."""
def __init__(
self,
sequences: list,
max_seq_len: int = 200,
pad_value: int = -1
):
"""
sequences: lista di dizionari
{'item_ids': [...], 'responses': [...]}
"""
self.sequences = sequences
self.max_seq_len = max_seq_len
self.pad_value = pad_value
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
item_ids = seq['item_ids'][:self.max_seq_len]
responses = seq['responses'][:self.max_seq_len]
seq_len = len(item_ids)
# Padding
pad_len = self.max_seq_len - seq_len
item_ids = item_ids + [0] * pad_len
responses = responses + [0] * pad_len
mask = [1] * seq_len + [0] * pad_len
return {
'item_ids': torch.tensor(item_ids, dtype=torch.long),
'responses': torch.tensor(responses, dtype=torch.float),
'mask': torch.tensor(mask, dtype=torch.bool),
'seq_len': seq_len
}
def train_dkt(
model: DKTModel,
train_loader: DataLoader,
val_loader: DataLoader,
n_epochs: int = 50,
lr: float = 1e-3
) -> dict:
"""Training loop per DKT con early stopping."""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss(reduction='none')
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
best_val_auc = 0.0
history = {'train_loss': [], 'val_auc': []}
for epoch in range(n_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
item_ids = batch['item_ids']
responses = batch['responses']
mask = batch['mask']
# Predici su t-1, target su t
# input: sequenza[:-1], target: sequenza[1:]
pred = model(item_ids[:, :-1], responses[:, :-1])
# Raccoglie le predizioni per gli item effettivi
target_items = item_ids[:, 1:]
target_responses = responses[:, 1:]
target_mask = mask[:, 1:]
# Estrai predizione per l'item corretto
batch_size, seq_len, n_items = pred.shape
pred_flat = pred.reshape(-1, n_items)
target_items_flat = target_items.reshape(-1)
pred_selected = pred_flat.gather(
1, target_items_flat.unsqueeze(1)
).squeeze(1)
# Loss mascherata (ignora padding)
loss = criterion(
pred_selected,
target_responses.reshape(-1)
)
mask_flat = target_mask.reshape(-1).float()
loss = (loss * mask_flat).sum() / mask_flat.sum()
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_loss)
# Validation
val_auc = evaluate_dkt(model, val_loader)
history['val_auc'].append(val_auc)
scheduler.step(1 - val_auc) # Minimizza 1-AUC
if val_auc > best_val_auc:
best_val_auc = val_auc
torch.save(model.state_dict(), 'best_dkt_model.pt')
print(f"Epoch {epoch+1}/{n_epochs} - "
f"Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}")
return history
4. 학습 신호를 위한 특성 엔지니어링
기능의 품질은 종종 모델의 아키텍처보다 더 중요합니다. 적응형 시스템 프로덕션에서는 단순한 쌍(항목, 올바른/잘못된 항목)을 넘어 컨텍스트를 캡처해야 합니다. 학습 상호 작용이 풍부합니다.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import math
@dataclass
class LearningInteraction:
"""Una singola interazione di apprendimento."""
student_id: str
item_id: str
kc_ids: list # Knowledge components coinvolti
is_correct: bool
response_time_ms: int # Tempo di risposta in millisecondi
timestamp: datetime
hint_used: bool = False
attempt_number: int = 1
confidence_level: Optional[int] = None # 1-5 se richiesto
@dataclass
class StudentFeatures:
"""Feature aggregate per uno studente su un KC."""
student_id: str
kc_id: str
# Accuracy features
total_attempts: int = 0
correct_count: int = 0
recent_accuracy: float = 0.0 # Ultimi 5 tentativi
streak_correct: int = 0 # Sequenza corrente di corrette
streak_incorrect: int = 0
# Time features
avg_response_time: float = 0.0
response_time_trend: float = 0.0 # Positivo = rallentamento
time_since_last_practice: float = 0.0 # In ore
# Engagement features
hint_rate: float = 0.0 # Proporzione con hint
first_attempt_accuracy: float = 0.0 # Solo primi tentativi
# Memory features (Ebbinghaus forgetting curve)
estimated_retention: float = 1.0
class FeatureExtractor:
"""
Estrae feature per il sistema adattivo da sequenze di interazioni.
"""
def __init__(self, decay_factor: float = 0.3):
"""
decay_factor: controlla la decadenza della curva di dimenticanza
"""
self.decay_factor = decay_factor
def ebbinghaus_retention(
self,
days_since_last: float,
memory_strength: float = 1.0
) -> float:
"""
Modello di dimenticanza di Ebbinghaus.
R = exp(-t / (S * stability_factor))
"""
if days_since_last <= 0:
return 1.0
stability = max(memory_strength, 0.1)
return math.exp(-days_since_last / (stability * 30))
def compute_features(
self,
student_id: str,
kc_id: str,
interactions: list
) -> StudentFeatures:
"""
Calcola le feature per uno studente su un KC.
interactions: lista di LearningInteraction ordinate per timestamp
"""
kc_interactions = [
i for i in interactions
if kc_id in i.kc_ids and i.student_id == student_id
]
if not kc_interactions:
return StudentFeatures(student_id=student_id, kc_id=kc_id)
features = StudentFeatures(student_id=student_id, kc_id=kc_id)
# Accuracy
features.total_attempts = len(kc_interactions)
features.correct_count = sum(1 for i in kc_interactions if i.is_correct)
# Recent accuracy (finestra ultimi 5)
recent = kc_interactions[-5:]
if recent:
features.recent_accuracy = sum(1 for i in recent if i.is_correct) / len(recent)
# Streak
streak_c = 0
streak_i = 0
for interaction in reversed(kc_interactions):
if interaction.is_correct:
if streak_i == 0:
streak_c += 1
else:
break
else:
if streak_c == 0:
streak_i += 1
else:
break
features.streak_correct = streak_c
features.streak_incorrect = streak_i
# Response time
times = [i.response_time_ms for i in kc_interactions]
features.avg_response_time = sum(times) / len(times)
# Trend risposta (positivo = rallentamento)
if len(times) >= 3:
recent_avg = sum(times[-3:]) / 3
older_avg = sum(times[:-3]) / max(len(times) - 3, 1)
features.response_time_trend = (recent_avg - older_avg) / max(older_avg, 1)
# Tempo dall'ultima pratica
last_practice = kc_interactions[-1].timestamp
now = datetime.now()
hours_since = (now - last_practice).total_seconds() / 3600
features.time_since_last_practice = hours_since
# Hint rate
features.hint_rate = sum(1 for i in kc_interactions if i.hint_used) / len(kc_interactions)
# First attempt accuracy
first_attempts = [i for i in kc_interactions if i.attempt_number == 1]
if first_attempts:
features.first_attempt_accuracy = (
sum(1 for i in first_attempts if i.is_correct) / len(first_attempts)
)
# Retention stimata
days_since = hours_since / 24
memory_strength = features.first_attempt_accuracy * 2 # 0-2
features.estimated_retention = self.ebbinghaus_retention(days_since, memory_strength)
return features
def compute_next_review_time(self, features: StudentFeatures) -> datetime:
"""
Calcola il momento ottimale per la prossima revisione (spaced repetition).
Ispirato all'algoritmo SM-2 di SuperMemo.
"""
accuracy = features.first_attempt_accuracy
attempts = features.total_attempts
# Intervallo di base in giorni
if attempts <= 1:
interval_days = 1
elif attempts == 2:
interval_days = 3
else:
# Ease factor basato sull'accuracy
ease = 1.3 + (2.5 - 1.3) * accuracy
interval_days = max(1, round(
features.time_since_last_practice / 24 * ease
))
return datetime.now() + timedelta(days=interval_days)
5. 생산 시스템의 아키텍처
프로덕션 환경의 적응형 시스템은 아래의 대기 시간으로 초당 수백 개의 요청을 처리해야 합니다. 100ms. 다음 아키텍처는 온라인 경로 (실시간으로 제공) 에서오프라인 경로 (일괄 교육 및 교정)
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import aioredis
import asyncpg
app = FastAPI(title="Adaptive Learning API")
# ---- Modelli Pydantic ----
class ResponseRequest(BaseModel):
student_id: str
item_id: str
is_correct: bool
response_time_ms: int
hint_used: bool = False
class NextItemResponse(BaseModel):
item_id: str
estimated_difficulty: float
reason: str
mastery_state: dict
# ---- Servizi ----
class AdaptiveEngine:
"""
Engine principale per raccomandazioni adattive.
Usa Redis per feature cache e PostgreSQL per persistenza.
"""
def __init__(self, redis_client, db_pool):
self.redis = redis_client
self.db = db_pool
self.bkt_system = None # Inizializzato al startup
self.irt_model = None
async def process_response(
self,
request: ResponseRequest,
background_tasks: BackgroundTasks
) -> dict:
"""
Pipeline di processing per una risposta.
1. Persiste la risposta nel DB (async)
2. Aggiorna lo stato BKT dalla cache Redis
3. Seleziona il prossimo item
4. Schedula aggiornamento features in background
"""
student_id = request.student_id
item_id = request.item_id
# 1. Get KCs per questo item
kc_ids = await self.get_item_kcs(item_id)
# 2. Aggiorna BKT per ogni KC (via Redis)
mastery_updates = {}
for kc_id in kc_ids:
cache_key = f"bkt:{student_id}:{kc_id}"
# Recupera stato dalla cache
cached = await self.redis.get(cache_key)
if cached:
p_mastery = float(cached)
else:
# Fallback al DB
p_mastery = await self.get_mastery_from_db(student_id, kc_id)
# Aggiorna con nuova risposta
tracker = BKTTracker(await self.get_bkt_params(kc_id))
tracker.p_mastery = p_mastery
new_mastery = tracker.update(request.is_correct)
# Salva in Redis con TTL 24h
await self.redis.setex(cache_key, 86400, str(new_mastery))
mastery_updates[kc_id] = new_mastery
# 3. Seleziona prossimo item
next_item = await self.select_next_item(student_id, mastery_updates)
# 4. Background: persisti nel DB
background_tasks.add_task(
self.persist_interaction,
request,
mastery_updates
)
return {
'mastery_updates': mastery_updates,
'next_item': next_item
}
async def select_next_item(
self,
student_id: str,
mastery_updates: dict
) -> dict:
"""
Seleziona il prossimo item usando una strategia multi-obiettivo:
- Bilancia apprendimento di nuovi KC vs rinforzo di quelli deboli
- Considera la curva di dimenticanza
- Rispetta il curriculum graph
"""
# Ottieni profilo completo del curriculum
curriculum = await self.get_curriculum_graph(student_id)
# Calcola priorità per ogni KC disponibile
priorities = {}
for kc_id, kc_data in curriculum.items():
mastery = mastery_updates.get(kc_id, kc_data.get('mastery', 0.0))
# Priorità alta per KC quasi-mastered (0.7-0.9) - spingili oltre la soglia
if 0.70 <= mastery < 0.95:
priority = 2.0 * (mastery - 0.70) / 0.25
# Priorità media per KC molto deboli - ma non sopraffatti
elif mastery < 0.30:
priority = 0.5
# KC mastered - bassa priorità, solo spaced repetition
elif mastery >= 0.95:
retention = kc_data.get('estimated_retention', 1.0)
priority = (1 - retention) * 0.3
else:
priority = 1.0
priorities[kc_id] = priority
# Seleziona KC con più alta priorità
if not priorities:
raise HTTPException(status_code=404, detail="No available KCs")
target_kc = max(priorities, key=priorities.get)
# Seleziona item per questo KC usando IRT
available_items = await self.get_items_for_kc(target_kc)
student_ability = await self.get_student_ability(student_id)
# CAT selection
optimal_item = self.irt_model.next_item_cat(
student_id, available_items, strategy='max_information'
)
item_params = self.irt_model.item_params.get(optimal_item, {'a': 1.0, 'b': 0.0})
return {
'item_id': optimal_item,
'kc_id': target_kc,
'estimated_difficulty': item_params.get('b', 0.0),
'student_ability': student_ability,
'mastery_before': mastery_updates.get(target_kc, 0.0),
'priority_reason': 'near_mastery' if priorities[target_kc] > 1.5 else 'standard'
}
async def get_item_kcs(self, item_id: str) -> list:
"""Recupera i KC associati a un item dal DB."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"SELECT kc_id FROM item_kc_mapping WHERE item_id = $1",
item_id
)
return [row['kc_id'] for row in rows]
# ---- Route FastAPI ----
@app.post("/api/v1/adaptive/response")
async def process_response(
request: ResponseRequest,
background_tasks: BackgroundTasks,
engine: AdaptiveEngine = None # Dipendenza iniettata
) -> NextItemResponse:
"""
Processa una risposta e restituisce il prossimo item adattivo.
Target latency: <100ms (p95)
"""
result = await engine.process_response(request, background_tasks)
next_item = result['next_item']
return NextItemResponse(
item_id=next_item['item_id'],
estimated_difficulty=next_item['estimated_difficulty'],
reason=next_item['priority_reason'],
mastery_state=result['mastery_updates']
)
6. 적응형 알고리즘을 위한 A/B 테스트
프로덕션에서 새로운 알고리즘을 검증하려면 다음을 존중하는 A/B 테스트 프레임워크가 필요합니다. 교육의 특징: 학생들은 상호 교환이 불가능하며, 학습의 효과 이는 누적되며 관련 측정항목은 클릭률을 뛰어넘습니다.
import hashlib
from enum import Enum
from typing import Callable
import scipy.stats as stats
class ExperimentVariant(Enum):
CONTROL = "control"
TREATMENT = "treatment"
class ABTestManager:
"""
Framework A/B testing per algoritmi adattivi.
Usa student_id hashing per assegnazione deterministica.
"""
def __init__(self, experiment_id: str, treatment_fraction: float = 0.5):
self.experiment_id = experiment_id
self.treatment_fraction = treatment_fraction
self.metrics = {
ExperimentVariant.CONTROL: [],
ExperimentVariant.TREATMENT: []
}
def assign_variant(self, student_id: str) -> ExperimentVariant:
"""
Assegna uno studente a control o treatment in modo deterministico.
Lo stesso studente riceve sempre la stessa variante (no leakage).
"""
hash_input = f"{self.experiment_id}:{student_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
bucket = (hash_value % 100) / 100.0
return (
ExperimentVariant.TREATMENT
if bucket < self.treatment_fraction
else ExperimentVariant.CONTROL
)
def record_metric(
self,
student_id: str,
metric_value: float,
metric_name: str = 'mastery_gain'
):
"""Registra una metrica per uno studente."""
variant = self.assign_variant(student_id)
self.metrics[variant].append(metric_value)
def analyze_results(self) -> dict:
"""
Analisi statistica con Welch's t-test.
Adatto a campioni di dimensioni diverse.
"""
control = self.metrics[ExperimentVariant.CONTROL]
treatment = self.metrics[ExperimentVariant.TREATMENT]
if len(control) < 30 or len(treatment) < 30:
return {'error': 'Campione insufficiente (min 30 per variante)'}
t_stat, p_value = stats.ttest_ind(
treatment, control, equal_var=False
)
control_mean = sum(control) / len(control)
treatment_mean = sum(treatment) / len(treatment)
relative_lift = (treatment_mean - control_mean) / max(abs(control_mean), 1e-9)
# Cohen's d per effect size
pooled_std = (
(sum((x - control_mean)**2 for x in control) +
sum((x - treatment_mean)**2 for x in treatment)) /
(len(control) + len(treatment) - 2)
) ** 0.5
cohens_d = (treatment_mean - control_mean) / max(pooled_std, 1e-9)
return {
'experiment_id': self.experiment_id,
'control_n': len(control),
'treatment_n': len(treatment),
'control_mean': round(control_mean, 4),
'treatment_mean': round(treatment_mean, 4),
'relative_lift': round(relative_lift * 100, 2), # Percentuale
'p_value': round(p_value, 4),
'statistically_significant': p_value < 0.05,
'cohens_d': round(cohens_d, 3),
'practical_significance': abs(cohens_d) > 0.2,
'recommendation': (
'Deploy treatment' if p_value < 0.05 and cohens_d > 0.2
else 'Keep control' if p_value < 0.05 and cohens_d < -0.2
else 'No significant difference'
)
}
# Metriche chiave per A/B testing in EdTech
EDTECH_METRICS = [
'mastery_gain_per_session', # Incremento P(mastery) per sessione
'time_to_mastery', # Minuti necessari per raggiungere P(mastery)>=0.95
'session_completion_rate', # % sessioni completate
'return_rate_7d', # % studenti che tornano entro 7 giorni
'assessment_accuracy_post', # Accuratezza nei test formali post-training
]
7. 모니터링 및 드리프트 감지
교육용 모델에는 다음이 적용됩니다. 컨셉 드리프트 다양한 이유로: 변경 커리큘럼, 학업 계절성, 교재 업데이트. 모니터링 시스템 시간이 지남에 따라 예측 품질을 유지하는 데 강력하고 필수적입니다.
적응 시스템의 드리프트 신호
- 정확도 드리프트: 현 기간 동안 DKT 모델의 AUC가 0.70 이하로 하락
- 교정 드리프트: 확률 버킷이 관찰된 정확도와 일치하지 않습니다.
- 특징 드리프트: 응답시간의 분포가 크게 변화함
- 라벨 이동: 안정적인 코호트에 대한 정답 비율 변경
from collections import deque
import numpy as np
class ModelMonitor:
"""
Monitor per rilevamento drift in modelli adattivi.
Implementa ADWIN (Adaptive Windowing) per concept drift detection.
"""
def __init__(
self,
window_size: int = 1000,
drift_threshold: float = 0.02,
alert_callback: Callable = None
):
self.window = deque(maxlen=window_size)
self.drift_threshold = drift_threshold
self.alert_callback = alert_callback
self.baseline_auc = None
def add_prediction(self, predicted_prob: float, actual_label: int):
"""Aggiunge una predizione al buffer di monitoraggio."""
self.window.append((predicted_prob, actual_label))
if len(self.window) >= 100:
self._check_drift()
def _check_drift(self):
"""Verifica il drift confrontando finestre recenti vs storiche."""
window_list = list(self.window)
n = len(window_list)
half = n // 2
# Calcola AUC su prima e seconda meta
first_half = window_list[:half]
second_half = window_list[half:]
auc_first = self._compute_auc(first_half)
auc_second = self._compute_auc(second_half)
drift_magnitude = abs(auc_first - auc_second)
if drift_magnitude > self.drift_threshold:
alert = {
'type': 'concept_drift',
'auc_old': round(auc_first, 4),
'auc_new': round(auc_second, 4),
'magnitude': round(drift_magnitude, 4),
'recommended_action': (
'retrain' if drift_magnitude > 0.10
else 'monitor_closely'
)
}
if self.alert_callback:
self.alert_callback(alert)
def _compute_auc(self, predictions: list) -> float:
"""Calcola AUC-ROC per una lista di (predicted_prob, actual_label)."""
if not predictions:
return 0.5
sorted_preds = sorted(predictions, key=lambda x: x[0], reverse=True)
n_pos = sum(1 for _, y in predictions if y == 1)
n_neg = len(predictions) - n_pos
if n_pos == 0 or n_neg == 0:
return 0.5
tpr_points = []
fpr_points = []
tp = fp = 0
for prob, label in sorted_preds:
if label == 1:
tp += 1
else:
fp += 1
tpr_points.append(tp / n_pos)
fpr_points.append(fp / n_neg)
# Trapezoidal integration
auc = 0.0
for i in range(1, len(fpr_points)):
auc += (fpr_points[i] - fpr_points[i-1]) * (tpr_points[i] + tpr_points[i-1]) / 2
return auc
안티 패턴: 잘못된 측정항목 최적화
적응형 시스템에서 가장 교활한 위험은 참여 지표를 최적화하는 것입니다. 실제 학습 결과보다는 (플랫폼에서의 시간, 클릭, 세션). 플랫폼에서 시간을 최대화하는 시스템은 쉬운 루프를 생성할 수 있습니다. 학생들을 편안한 곳으로 밀어붙이는 대신 편안한 곳으로 유지하는 만족감 숙달. 구축하기 전에 교육학적인 용어로 성공 지표를 정의하십시오.
프로덕션 환경의 적응형 시스템 모범 사례
- 콜드 스타트: 이력이 없는 신입생의 경우 간단한 진단 평가를 사용하세요. 적응형 알고리즘을 활성화하기 전에 5~10개의 항목을 선택하세요. 평평한 사전은 시퀀스로 이어집니다. 교육학적으로 유효하지 않습니다.
- 해석 가능성: 항상 각각에 대해 읽기 쉬운 설명을 구현하십시오. 추천. "당신이 파생에 어려움을 겪고 있기 때문에 우리는 이 연습을 제안합니다. 복합 함수"이며 학생의 자신감에 필수적입니다.
- 다양성: 항상 동일한 유형의 항목을 추천하지 마십시오. 통합 "필터 버블" 효과를 방지하기 위한 선택 기능의 다양성 매개변수.
- 커리큘럼 제약: 커리큘럼 그래프는 권장 사항을 제한해야 합니다. 아직 도함수를 마스터하지 않은 학생에게 독립적으로 적분을 제안하지 마세요. 알고리즘이 제안하는 것에서.
- 인간의 감독: 교사는 권장 사항을 무시할 수 있어야 합니다. 그리고 수업 진행 상황에 대한 보고를 받습니다.
결론
적응형 학습 알고리즘은 가장 매력적인 분야 중 하나입니다. 머신러닝과 교육과학의 교차점에서 수학적 이론으로부터 IRT에서 Redis 및 FastAPI를 사용하여 확장 가능한 아키텍처로, 연구에서 생산까지의 여정 계산 효율성뿐만 아니라 교육학적 타당성에도 주의가 필요합니다.
결과는 구체적입니다. 잘 설계된 시스템은 20~40%의 개선을 보여줍니다. 정적 선형 경로와 비교하여 숙달하는 데 걸리는 시간. 신경모델의 발전으로 DKT 및 LLM과의 통합과 같이 이 분야는 개인교습 방향으로 빠르게 발전하고 있습니다. 점점 더 정교해지는 개인화.
EdTech 시리즈 관련 기사
- 기사 00: 확장 가능한 LMS 아키텍처: 다중 테넌트 패턴
- 기사 04: LLM 및 RAG를 통한 맞춤형 교사
- 기사 06: xAPI 및 Kafka를 사용한 분석 학습







