학습 분석: xAPI 및 Kafka를 사용한 데이터 파이프라인
모든 클릭, 모든 오답, 모든 비디오 일시 중지 및 모든 운동 중단 학생의 학습 과정에 관해 중요한 내용을 알려줍니다. 플랫폼 현대 교육 기술에서는 매일 수십억 개의 이벤트를 수집하지만 대부분의 경우 그것들을 버리거나 사소한 집계 통계에만 사용합니다. “코스 X는 70%의 완료". 이는 데이터 낭비이며 기회를 놓친 것입니다.
학습 분석 그리고 이 행동 데이터를 변환하는 분야 실행 가능한 통찰력: 어떤 학생이 과정을 그만둘지 예측하고, 가장 난이도가 높은 콘텐츠를 실시간으로 학습 경로를 최적화합니다. 하지만 이를 위해서는 두 가지 기본 기술 요소가 필요합니다. 데이터 표준 상호 운용성(xAPI)을 허용하고하부 구조 관리할 수 있는 대용량 스트리밍 이벤트(Apache Kafka).
이 기사에서 우리는 완전한 학습 분석 파이프라인을 구축할 것입니다. 클라이언트의 xAPI 문부터 Kafka를 통한 실시간 수집, 처리까지 교사를 위한 대시보드까지 행동 패턴을 탐지하기 위해 Flink를 사용합니다.
이 기사에서 배울 내용
- xAPI(Experience API / Tin Can): 명령문 구조 및 모범 사례
- 학습 기록 저장소(LRS): 아키텍처 및 솔루션 선택
- 대규모 교육 이벤트를 위한 Kafka 파이프라인
- 병렬 및 내결함성 처리를 위한 소비자 그룹
- 자퇴위험 학생 조기발견
- Kafka Streams를 사용한 실시간 집계
- 주요 지표가 포함된 교사 분석 대시보드
- 학습 데이터 수집 시 GDPR과 개인정보 보호
1. xAPI: 학습 데이터의 표준
xAPI(Experience API, "Tin Can API"라고도 함)는 학습 경험의 표현. SCORM의 한계를 극복하다 (폐쇄된 LMS에서는 "완료/완료되지 않음"만 추적함) 허용 온라인, 오프라인, 내부 또는 외부의 학습 활동을 추적합니다. 표준화되고 상호 운용 가능한 어휘를 갖춘 LMS에서.
xAPI 문의 기본 구조는 간단합니다. 배우 (WHO), 동사 (그는 무엇을 했는지), 물체 (무엇에 대해). 선택적으로: 결과 (어떤 결과로), 문맥 (어떤 맥락에서) 이자형 타임스탬프. 이 주어-동사-목적어 구조 모든 교육 시나리오를 다룰 수 있을 만큼 직관적이고 유연합니다.
# xapi/statement_builder.py
from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import uuid
@dataclass
class XAPIActor:
"""Chi compie l'azione."""
mbox: str # mailto:user@example.com
name: str
objectType: str = "Agent"
@dataclass
class XAPIVerb:
"""Cosa ha fatto l'attore. Usa vocabolari standardizzati (ADL, TinCan)."""
id: str # URI del verbo, es: http://adlnet.gov/expapi/verbs/completed
display: Dict[str, str] # {"en-US": "completed", "it-IT": "completato"}
@dataclass
class XAPIActivity:
"""Su cosa e stata compiuta l'azione."""
id: str # URI univoca dell'oggetto
objectType: str = "Activity"
definition: Optional[Dict] = None
@dataclass
class XAPIResult:
"""Risultato dell'azione (opzionale)."""
score: Optional[Dict] = None # {"scaled": 0.85, "raw": 85, "min": 0, "max": 100}
success: Optional[bool] = None
completion: Optional[bool] = None
duration: Optional[str] = None # ISO 8601: "PT5M30S" = 5 minuti 30 secondi
response: Optional[str] = None # Risposta testuale dello studente
@dataclass
class XAPIContext:
"""Contesto aggiuntivo."""
platform: str = "EdTech Platform"
language: str = "it-IT"
contextActivities: Optional[Dict] = None # Gerarchia: course -> module -> lesson
extensions: Optional[Dict] = None # Dati custom (device, browser, etc.)
@dataclass
class XAPIStatement:
actor: XAPIActor
verb: XAPIVerb
object: XAPIActivity
id: str = field(default_factory=lambda: str(uuid.uuid4()))
result: Optional[XAPIResult] = None
context: Optional[XAPIContext] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
stored: Optional[str] = None
version: str = "1.0.3"
def to_dict(self) -> Dict:
return asdict(self)
# Verbi xAPI standard (ADL vocabulary)
XAPI_VERBS = {
"completed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/completed",
display={"en-US": "completed", "it-IT": "completato"},
),
"attempted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/attempted",
display={"en-US": "attempted", "it-IT": "tentato"},
),
"passed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/passed",
display={"en-US": "passed", "it-IT": "superato"},
),
"failed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/failed",
display={"en-US": "failed", "it-IT": "fallito"},
),
"experienced": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/experienced",
display={"en-US": "experienced", "it-IT": "esperienzato"},
),
"asked": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/asked",
display={"en-US": "asked", "it-IT": "chiesto"},
),
"interacted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/interacted",
display={"en-US": "interacted", "it-IT": "interagito"},
),
}
class StatementBuilder:
"""Builder per statement xAPI con validazione."""
def __init__(self, platform_url: str, tenant_id: str):
self.platform_url = platform_url
self.tenant_id = tenant_id
def lesson_completed(
self,
student_id: str,
student_email: str,
lesson_id: str,
lesson_title: str,
course_id: str,
duration_seconds: int,
score_percent: Optional[float] = None,
) -> XAPIStatement:
return XAPIStatement(
actor=XAPIActor(
mbox=f"mailto:{student_email}",
name=student_id,
),
verb=XAPI_VERBS["completed"],
object=XAPIActivity(
id=f"{self.platform_url}/activities/{lesson_id}",
definition={
"name": {"it-IT": lesson_title},
"type": "http://adlnet.gov/expapi/activities/lesson",
},
),
result=XAPIResult(
completion=True,
duration=f"PT{duration_seconds}S",
score={"scaled": score_percent / 100, "raw": score_percent, "min": 0, "max": 100} if score_percent else None,
),
context=XAPIContext(
contextActivities={
"parent": [{"id": f"{self.platform_url}/activities/course/{course_id}"}],
"grouping": [{"id": f"{self.platform_url}/activities/tenant/{self.tenant_id}"}],
},
extensions={
"https://schema.example.com/extensions/student_id": student_id,
"https://schema.example.com/extensions/tenant_id": self.tenant_id,
},
),
)
def quiz_answered(
self,
student_id: str,
student_email: str,
question_id: str,
question_text: str,
student_response: str,
correct: bool,
time_spent_seconds: int,
) -> XAPIStatement:
verb = XAPI_VERBS["passed"] if correct else XAPI_VERBS["failed"]
return XAPIStatement(
actor=XAPIActor(mbox=f"mailto:{student_email}", name=student_id),
verb=verb,
object=XAPIActivity(
id=f"{self.platform_url}/activities/question/{question_id}",
definition={
"name": {"it-IT": question_text[:100]},
"type": "http://adlnet.gov/expapi/activities/cmi.interaction",
},
),
result=XAPIResult(
success=correct,
response=student_response[:500], # Tronca per privacy
duration=f"PT{time_spent_seconds}S",
),
)
2. 교육 이벤트를 위한 Kafka 파이프라인
수백만 명의 학생들이 동시에 이벤트를 생성하므로 다음을 보장하는 메시징 시스템: 고가용성, 학생당 정렬 보장 (같은 사건의 모든 사건 학생은 순서대로 처리되어야 함), e 다시 하다 (수 분석 논리를 변경할 때 과거 사건을 재작업합니다. Apache Kafka는 이러한 모든 요구 사항을 충족하며 프로덕션 환경에서 표준 선택입니다. 대용량 이벤트 파이프라인용.
# kafka/producer.py
import json
import logging
from typing import Optional
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
class LearningEventProducer:
"""
Producer Kafka per eventi xAPI.
Usa la chiave di partizione student_id per garantire ordine per studente.
"""
TOPIC = "edtech.xapi.statements"
def __init__(self, bootstrap_servers: str):
self.bootstrap_servers = bootstrap_servers
self._producer: Optional[AIOKafkaProducer] = None
async def start(self):
self._producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
# Configurazione per affidabilità
acks="all", # Conferma da tutti i replica
enable_idempotence=True, # Exactly-once delivery
compression_type="gzip", # Compressione per ridurre banda
max_batch_size=64 * 1024, # 64KB batch
linger_ms=10, # Aspetta 10ms per batch più grandi
)
await self._producer.start()
logger.info(f"Kafka producer started, topic: {self.TOPIC}")
async def stop(self):
if self._producer:
await self._producer.stop()
async def send_statement(self, statement: "XAPIStatement") -> bool:
"""
Invia uno statement xAPI a Kafka.
La partition key e lo student_id per garantire ordine per studente.
"""
if not self._producer:
raise RuntimeError("Producer non avviato. Chiama start() prima.")
statement_dict = statement.to_dict()
student_id = statement_dict["actor"]["name"]
try:
await self._producer.send_and_wait(
topic=self.TOPIC,
key=student_id, # Partition key: stesso studente -> stessa partizione
value=statement_dict,
headers=[
("content-type", b"application/json"),
("schema-version", b"1.0.3"),
],
)
return True
except KafkaError as e:
logger.error(f"Errore invio statement Kafka: {e}", exc_info=True)
return False
async def send_batch(self, statements: list) -> int:
"""Invia un batch di statement. Ritorna il numero di statement inviati con successo."""
success_count = 0
async with self._producer.transaction():
for stmt in statements:
if await self.send_statement(stmt):
success_count += 1
return success_count
# kafka/consumer.py
import asyncio
import json
import logging
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer
logger = logging.getLogger(__name__)
class LearningEventConsumer:
"""
Consumer Kafka per elaborazione statement xAPI.
Consumer groups per elaborazione parallela e fault-tolerant.
"""
def __init__(
self,
bootstrap_servers: str,
group_id: str,
handler: Callable[[dict], Awaitable[None]],
):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.handler = handler
self._consumer: Optional[AIOKafkaConsumer] = None
self._running = False
async def start(self, topics: list = None):
topics = topics or ["edtech.xapi.statements"]
self._consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="earliest",
enable_auto_commit=False, # Commit manuale per at-least-once
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
max_poll_records=100, # Batch di 100 messaggi per poll
)
await self._consumer.start()
self._running = True
logger.info(f"Consumer {self.group_id} avviato su {topics}")
async def run(self):
"""Loop principale di consumo messaggi."""
if not self._consumer:
raise RuntimeError("Consumer non avviato.")
try:
async for message in self._consumer:
try:
await self.handler(message.value)
# Commit dopo elaborazione con successo
await self._consumer.commit()
except Exception as e:
logger.error(
f"Errore elaborazione messaggio offset {message.offset}: {e}",
exc_info=True,
)
# Non committare: il messaggio verrà rielaborato
# In produzione: invia a Dead Letter Queue dopo N tentativi
finally:
await self._consumer.stop()
3. 조기 경고 감지: 자퇴 위험이 있는 학생
EdTech 플랫폼의 가장 중요한 지표는 가입 수가 아니라, 하지만 완료율. 학생을 조기에 식별 이탈 위험(탈락 예측)을 통해 개인화된 알림으로 개입할 수 있으며, 학습 경로의 교사 지원 또는 수정. 우리는 Kafka Streams를 사용합니다 실시간으로 위험 신호를 계산합니다.
# analytics/dropout_detector.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DropoutRiskSignal:
student_id: str
course_id: str
risk_level: RiskLevel
risk_score: float # 0.0 - 1.0
contributing_factors: List[str]
last_activity: Optional[datetime]
recommended_action: str
calculated_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class StudentEngagementMetrics:
student_id: str
course_id: str
days_since_last_activity: int
completion_rate: float # 0.0 - 1.0
avg_quiz_score: float # 0.0 - 100.0
quiz_failure_rate: float # 0.0 - 1.0
avg_session_duration_minutes: float
total_sessions_last_30d: int
video_rewatch_rate: float # Quante volte rivede video (difficolta?)
help_requests_last_7d: int
class DropoutRiskDetector:
"""
Calcola il rischio di abbandono basato su un modello a regole.
In produzione: sostituisci con un modello ML addestrato su dati storici.
"""
RISK_THRESHOLDS = {
RiskLevel.LOW: (0.0, 0.3),
RiskLevel.MEDIUM: (0.3, 0.6),
RiskLevel.HIGH: (0.6, 0.8),
RiskLevel.CRITICAL: (0.8, 1.0),
}
RECOMMENDED_ACTIONS = {
RiskLevel.LOW: "Monitoraggio routinario. Nessuna azione immediata.",
RiskLevel.MEDIUM: "Invia notifica push motivazionale. Suggerisci contenuto più facile.",
RiskLevel.HIGH: "Contatta il tutor del corso. Proponi sessione di supporto 1:1.",
RiskLevel.CRITICAL: "Escalation immediata: email al responsabile del corso e allo studente.",
}
def calculate_risk(self, metrics: StudentEngagementMetrics) -> DropoutRiskSignal:
score = 0.0
factors = []
# Fattore 1: Inattivita recente (peso: 35%)
inactivity_score = self._score_inactivity(metrics.days_since_last_activity)
score += inactivity_score * 0.35
if inactivity_score > 0.5:
factors.append(f"Inattivo da {metrics.days_since_last_activity} giorni")
# Fattore 2: Progresso nel corso (peso: 25%)
progress_score = 1.0 - metrics.completion_rate
score += progress_score * 0.25
if progress_score > 0.7:
factors.append(f"Progresso corso: solo {metrics.completion_rate:.0%}")
# Fattore 3: Performance ai quiz (peso: 20%)
quiz_score = self._score_quiz_performance(metrics.avg_quiz_score, metrics.quiz_failure_rate)
score += quiz_score * 0.20
if quiz_score > 0.6:
factors.append(f"Media quiz bassa: {metrics.avg_quiz_score:.1f}/100")
# Fattore 4: Riduzione sessioni (peso: 15%)
session_score = 1.0 - min(metrics.total_sessions_last_30d / 10, 1.0)
score += session_score * 0.15
if metrics.total_sessions_last_30d < 3:
factors.append(f"Solo {metrics.total_sessions_last_30d} sessioni in 30 giorni")
# Fattore 5: Segnali di frustrazione (peso: 5%)
frustration_score = min(metrics.video_rewatch_rate + metrics.help_requests_last_7d * 0.1, 1.0)
score += frustration_score * 0.05
if metrics.video_rewatch_rate > 2.0:
factors.append("Alto numero di revisioni video (possibile difficolta)")
score = min(max(score, 0.0), 1.0)
risk_level = self._score_to_level(score)
return DropoutRiskSignal(
student_id=metrics.student_id,
course_id=metrics.course_id,
risk_level=risk_level,
risk_score=score,
contributing_factors=factors,
last_activity=None, # Iniettare datetime reale
recommended_action=self.RECOMMENDED_ACTIONS[risk_level],
)
def _score_inactivity(self, days: int) -> float:
if days <= 1: return 0.0
if days <= 3: return 0.2
if days <= 7: return 0.5
if days <= 14: return 0.8
return 1.0
def _score_quiz_performance(self, avg_score: float, failure_rate: float) -> float:
score_component = max(0, (60 - avg_score) / 60) # Baseline 60%
failure_component = min(failure_rate * 1.5, 1.0)
return (score_component + failure_component) / 2
def _score_to_level(self, score: float) -> RiskLevel:
for level, (low, high) in self.RISK_THRESHOLDS.items():
if low <= score < high:
return level
return RiskLevel.CRITICAL
4. Kafka Streams를 사용한 실시간 집계
Kafka Streams를 사용하면 Kafka 클러스터에서 직접 실시간 이벤트를 처리할 수 있습니다.
추가 인프라 없이. 우리는 Python 라이브러리를 사용합니다 faust
1시간 동안의 집계 측정항목을 계산합니다.
# analytics/streaming_aggregator.py
import faust
from datetime import datetime, timedelta
from typing import Optional
app = faust.App(
"edtech-analytics",
broker="kafka://localhost:9092",
value_serializer="json",
)
# Topic di input
xapi_topic = app.topic("edtech.xapi.statements")
# Topic di output per dashboard
course_metrics_topic = app.topic("edtech.analytics.course-metrics")
dropout_alerts_topic = app.topic("edtech.analytics.dropout-alerts")
class CourseMetrics(faust.Record):
course_id: str
window_start: str
window_end: str
total_events: int = 0
unique_students: int = 0
lessons_completed: int = 0
quizzes_passed: int = 0
quizzes_failed: int = 0
avg_quiz_score: float = 0.0
# Tabella aggregata per finestra di 1 ora
course_hourly_table = app.Table(
"course-hourly-metrics",
default=dict,
partitions=8,
)
student_activity_table = app.Table(
"student-activity",
default=dict,
partitions=8,
)
@app.agent(xapi_topic)
async def process_xapi_statement(statements):
"""Processa ogni statement xAPI e aggiorna le metriche aggregate."""
async for statement in statements:
actor = statement.get("actor", {})
verb = statement.get("verb", {}).get("id", "")
result = statement.get("result", {}) or {}
context = statement.get("context", {}) or {}
student_id = actor.get("name", "unknown")
course_id = _extract_course_id(context)
timestamp = statement.get("timestamp", "")
if not course_id:
continue
# Chiave finestra oraria
hour_key = f"{course_id}:{timestamp[:13]}" # ISO troncato all'ora
# Aggiorna metriche corso
metrics = course_hourly_table[hour_key]
metrics["total_events"] = metrics.get("total_events", 0) + 1
metrics.setdefault("students", set())
metrics["students"].add(student_id)
verb_local = verb.split("/")[-1] # Prendi solo il nome del verbo
if verb_local == "completed":
metrics["lessons_completed"] = metrics.get("lessons_completed", 0) + 1
elif verb_local == "passed":
metrics["quizzes_passed"] = metrics.get("quizzes_passed", 0) + 1
score = result.get("score", {}).get("raw", 0)
prev_avg = metrics.get("avg_quiz_score", 0.0)
prev_count = metrics.get("quiz_count", 0)
metrics["avg_quiz_score"] = (prev_avg * prev_count + score) / (prev_count + 1)
metrics["quiz_count"] = prev_count + 1
elif verb_local == "failed":
metrics["quizzes_failed"] = metrics.get("quizzes_failed", 0) + 1
course_hourly_table[hour_key] = metrics
# Aggiorna attivita studente per dropout detection
student_key = f"{student_id}:{course_id}"
student_data = student_activity_table[student_key]
student_data["last_activity"] = timestamp
student_data["event_count"] = student_data.get("event_count", 0) + 1
student_activity_table[student_key] = student_data
# Emetti metriche aggregate ogni 100 eventi
if metrics["total_events"] % 100 == 0:
await course_metrics_topic.send(
key=course_id,
value={
"course_id": course_id,
"hour_key": hour_key,
"total_events": metrics["total_events"],
"unique_students": len(metrics.get("students", set())),
"lessons_completed": metrics.get("lessons_completed", 0),
"quizzes_passed": metrics.get("quizzes_passed", 0),
"quizzes_failed": metrics.get("quizzes_failed", 0),
"avg_quiz_score": metrics.get("avg_quiz_score", 0.0),
},
)
def _extract_course_id(context: dict) -> Optional[str]:
"""Estrae il course_id dal contesto xAPI."""
parent_activities = context.get("contextActivities", {}).get("parent", [])
for activity in parent_activities:
activity_id = activity.get("id", "")
if "/activities/course/" in activity_id:
return activity_id.split("/activities/course/")[-1]
return None
5. 교사 대시보드: 주요 지표
수집된 데이터는 교사에게 유용한 통찰력이 되어야 합니다. 좋은 대시보드 학습 분석은 현재 일어나고 있는 일뿐만 아니라(서술적인), 하지만 왜(특수 증상) 그리고 학생이 무엇을 할 것인지 (예측).
# api/analytics_dashboard.py
from fastapi import FastAPI, Depends, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime, date, timedelta
class CourseOverview(BaseModel):
course_id: str
course_name: str
total_enrolled: int
active_last_7d: int
completion_rate: float
avg_quiz_score: float
at_risk_count: int
dropout_rate_30d: float
class StudentProgressDetail(BaseModel):
student_id: str
student_name: str
enrollment_date: date
completion_percent: float
avg_quiz_score: float
streak_days: int
days_since_last_activity: int
risk_level: str
risk_score: float
class ContentDifficultyReport(BaseModel):
lesson_id: str
lesson_title: str
avg_time_spent_minutes: float
rewatch_rate: float
quit_rate: float # % studenti che hanno abbandonato durante questo contenuto
avg_quiz_score_after: float
difficulty_index: float # Calcolato: alto = contenuto difficile
app = FastAPI(title="Learning Analytics Dashboard API")
@app.get("/api/analytics/courses/{course_id}/overview", response_model=CourseOverview)
async def get_course_overview(course_id: str, db=Depends(get_db)):
"""Panoramica corso con metriche chiave per il dashboard insegnante."""
row = await db.execute("""
SELECT
c.name,
COUNT(DISTINCT e.student_id) as enrolled,
COUNT(DISTINCT CASE WHEN a.last_activity > NOW() - INTERVAL '7 days' THEN a.student_id END) as active_7d,
AVG(CASE WHEN e.completed THEN 1.0 ELSE 0.0 END) as completion_rate,
AVG(q.avg_score) as avg_quiz,
COUNT(DISTINCT CASE WHEN r.risk_level IN ('high','critical') THEN r.student_id END) as at_risk,
COUNT(DISTINCT CASE WHEN e.dropped_out AND e.dropout_date > NOW() - INTERVAL '30 days' THEN e.student_id END)::float /
NULLIF(COUNT(DISTINCT CASE WHEN e.enrollment_date < NOW() - INTERVAL '30 days' THEN e.student_id END), 0) as dropout_rate
FROM courses c
LEFT JOIN enrollments e ON c.id = e.course_id
LEFT JOIN student_activity a ON e.student_id = a.student_id AND a.course_id = c.id
LEFT JOIN student_quiz_stats q ON e.student_id = q.student_id AND q.course_id = c.id
LEFT JOIN dropout_risk r ON e.student_id = r.student_id AND r.course_id = c.id
WHERE c.id = :cid
GROUP BY c.id, c.name
""", {"cid": course_id})
data = row.fetchone()
return CourseOverview(
course_id=course_id,
course_name=data[0],
total_enrolled=data[1] or 0,
active_last_7d=data[2] or 0,
completion_rate=float(data[3] or 0),
avg_quiz_score=float(data[4] or 0),
at_risk_count=data[5] or 0,
dropout_rate_30d=float(data[6] or 0),
)
@app.get("/api/analytics/courses/{course_id}/at-risk", response_model=List[StudentProgressDetail])
async def get_at_risk_students(
course_id: str,
risk_level: Optional[str] = Query(None, description="Filter by risk: low, medium, high, critical"),
db=Depends(get_db),
):
"""Lista studenti a rischio abbandono con dettagli progressione."""
query = """
SELECT
s.id, s.name,
e.enrollment_date,
COALESCE(prog.completion_percent, 0) as completion_percent,
COALESCE(qs.avg_score, 0) as avg_quiz_score,
COALESCE(str.current_streak, 0) as streak_days,
EXTRACT(DAY FROM NOW() - a.last_activity)::int as days_inactive,
r.risk_level,
r.risk_score
FROM students s
JOIN enrollments e ON s.id = e.student_id AND e.course_id = :cid
JOIN dropout_risk r ON s.id = r.student_id AND r.course_id = :cid
LEFT JOIN course_progress prog ON s.id = prog.student_id AND prog.course_id = :cid
LEFT JOIN student_quiz_stats qs ON s.id = qs.student_id AND qs.course_id = :cid
LEFT JOIN student_streaks str ON s.id = str.student_id
LEFT JOIN student_activity a ON s.id = a.student_id AND a.course_id = :cid
WHERE (:risk IS NULL OR r.risk_level = :risk)
ORDER BY r.risk_score DESC
LIMIT 100
"""
rows = (await db.execute(query, {"cid": course_id, "risk": risk_level})).fetchall()
return [
StudentProgressDetail(
student_id=r[0], student_name=r[1], enrollment_date=r[2],
completion_percent=float(r[3]), avg_quiz_score=float(r[4]),
streak_days=r[5], days_since_last_activity=r[6],
risk_level=r[7], risk_score=float(r[8]),
)
for r in rows
]
6. 학습 분석의 GDPR 및 개인정보 보호
학습 데이터는 민감한 개인 데이터입니다. 수집 및 분석 학생 상호 작용에는 GDPR 준수가 필요하며, 미성년자의 경우 특정 법률(미국의 COPPA, 미성년자 보호에 관한 유럽 지침)에 따릅니다. 기본 원칙: 데이터 최소화 (만 수집 필요에 따라), 익명화 집계 보고서의 경우 잊혀질 권리 (모든 학생 데이터 삭제 가능) 전자 투명도 (수집 대상과 이유에 대한 명확한 정보)
학습 분석을 위한 GDPR 체크리스트
- 상세한 행동 데이터 수집 전 명시적 동의
- 집계 보고서에서 학생 ID를 익명화합니다(개인 데이터는 포함하지 않음).
- 잊혀질 권리를 위해 DELETE /students/{id}/analytics-data 엔드포인트 구현
- 미성년 학생의 데이터에는 부모의 동의가 필요합니다.
- 데이터 보존 정책: 원시 이벤트를 보관하는 기간을 정의합니다(예: 2년).
- 학생 분석에 접근하는 사람에 대한 감사 로그
- 저장 및 전송 중인 데이터 암호화
- 명시적인 동의 없이 제3자와 데이터를 공유하지 마십시오.
결론 및 다음 단계
우리는 완전한 학습 분석 파이프라인을 구축했습니다. 데이터 표준화, 대용량 수집을 보장하는 Kafka 전달, Faust/Kafka Streams를 통한 실시간 집계, 드롭아웃 감지 행동 신호를 기반으로 하며 교사를 위한 대시보드 API를 제공합니다.
다음 단계는 과거 데이터에 대해 훈련된 ML 모델을 통합하여 규칙 기반 드롭아웃 감지기를 대체합니다. 충분한 데이터로 모델을 ML은 조기 식별에서 85% 이상의 정확도를 달성합니다. 실제 자퇴 2~3주 전 위험에 처한 학생들.
다음 기사에서는 실시간 협업 EdTech 플랫폼: 공동 편집 및 WebSocket을 위한 Yjs가 포함된 CRDT 공유 문서의 실시간 동기화를 위해.
EdTech 엔지니어링 시리즈
- 확장 가능한 LMS 아키텍처: 다중 테넌트 패턴
- 적응형 학습 알고리즘: 이론에서 생산까지
- 교육용 비디오 스트리밍: WebRTC, HLS, DASH
- AI 감독 시스템: 컴퓨터 비전을 통한 개인정보 보호 우선
- LLM을 통한 맞춤형 교사: 지식 기반 구축을 위한 RAG
- 게임화 엔진: 아키텍처 및 상태 머신
- 학습 분석: xAPI 및 Kafka를 사용한 데이터 파이프라인(이 문서)
- EdTech의 실시간 협업: CRDT 및 WebSocket
- 모바일 우선 교육 기술: 오프라인 우선 아키텍처
- 다중 테넌트 콘텐츠 관리: 버전 관리 및 SCORM







