ラーニング アナリティクス: xAPI と Kafka を使用したデータ パイプライン
すべてのクリック、すべての間違った答え、すべてのビデオが一時停止され、すべての演習が放棄されました 生徒の学習プロセスについて重要なことを伝えます。プラットフォーム 現代の EdTech はこれらのイベントを毎日何十億件も収集していますが、そのほとんどは それらを破棄するか、些細な集計統計のためにのみ使用します。「コース X には、 完成」。これはデータの無駄であり、機会の損失です。
学習分析 そしてこの行動データを変換する規律 実用的な洞察: どの学生がコースからドロップアウトするかを予測し、特定する 最も困難を引き起こすコンテンツをリアルタイムで学習パスを最適化します。 しかし、これを行うには、2 つの基本的な技術要素が必要です。 データ標準 相互運用性 (xAPI) を可能にし、インフラストラクチャー 管理できる 大容量ストリーミング イベント (Apache Kafka)。
この記事では、完全な学習分析パイプラインを世代から構築します。 クライアントでの xAPI ステートメントから、Kafka によるリアルタイムの取り込み、処理まで Flink を使用して行動パターンを検出し、教師用のダッシュボードまで使用できます。
この記事で学べること
- xAPI (Experience API / Tin Can): ステートメントの構造とベスト プラクティス
- Learning Record Store (LRS): アーキテクチャとソリューションの選択
- 大規模な教育イベント向けの Kafka パイプライン
- 並列処理およびフォールトトレラント処理のためのコンシューマ グループ
- 中退の危険性がある学生の早期発見
- Kafka Streams によるリアルタイム集計
- 主要な指標を含む教師分析ダッシュボード
- 学習データ収集における GDPR とプライバシー
1. xAPI: データ学習の標準
xAPI (Experience API、「Tin Can API」とも呼ばれる) は、 学習経験の表現。 SCORM の限界を克服する (クローズド LMS で「完了/未完了」のみを追跡) オンラインでもオフラインでも、屋内でも屋外でも、あらゆる学習活動を追跡するため 標準化された相互運用可能な語彙を備えた LMS から。
xAPI ステートメントの基本構造は単純です。 俳優 (誰が)、 動詞 (彼は何をしたの)、 物体 (何について)。 オプションで: 結果 (どのような結果になるか)、 コンテクスト (どのような文脈で) e タイムスタンプ。この主語-動詞-目的語構造 直感的かつ柔軟で、あらゆる教育シナリオに対応できます。
# xapi/statement_builder.py
from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import uuid
@dataclass
class XAPIActor:
"""Chi compie l'azione."""
mbox: str # mailto:user@example.com
name: str
objectType: str = "Agent"
@dataclass
class XAPIVerb:
"""Cosa ha fatto l'attore. Usa vocabolari standardizzati (ADL, TinCan)."""
id: str # URI del verbo, es: http://adlnet.gov/expapi/verbs/completed
display: Dict[str, str] # {"en-US": "completed", "it-IT": "completato"}
@dataclass
class XAPIActivity:
"""Su cosa e stata compiuta l'azione."""
id: str # URI univoca dell'oggetto
objectType: str = "Activity"
definition: Optional[Dict] = None
@dataclass
class XAPIResult:
"""Risultato dell'azione (opzionale)."""
score: Optional[Dict] = None # {"scaled": 0.85, "raw": 85, "min": 0, "max": 100}
success: Optional[bool] = None
completion: Optional[bool] = None
duration: Optional[str] = None # ISO 8601: "PT5M30S" = 5 minuti 30 secondi
response: Optional[str] = None # Risposta testuale dello studente
@dataclass
class XAPIContext:
"""Contesto aggiuntivo."""
platform: str = "EdTech Platform"
language: str = "it-IT"
contextActivities: Optional[Dict] = None # Gerarchia: course -> module -> lesson
extensions: Optional[Dict] = None # Dati custom (device, browser, etc.)
@dataclass
class XAPIStatement:
actor: XAPIActor
verb: XAPIVerb
object: XAPIActivity
id: str = field(default_factory=lambda: str(uuid.uuid4()))
result: Optional[XAPIResult] = None
context: Optional[XAPIContext] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
stored: Optional[str] = None
version: str = "1.0.3"
def to_dict(self) -> Dict:
return asdict(self)
# Verbi xAPI standard (ADL vocabulary)
XAPI_VERBS = {
"completed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/completed",
display={"en-US": "completed", "it-IT": "completato"},
),
"attempted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/attempted",
display={"en-US": "attempted", "it-IT": "tentato"},
),
"passed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/passed",
display={"en-US": "passed", "it-IT": "superato"},
),
"failed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/failed",
display={"en-US": "failed", "it-IT": "fallito"},
),
"experienced": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/experienced",
display={"en-US": "experienced", "it-IT": "esperienzato"},
),
"asked": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/asked",
display={"en-US": "asked", "it-IT": "chiesto"},
),
"interacted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/interacted",
display={"en-US": "interacted", "it-IT": "interagito"},
),
}
class StatementBuilder:
"""Builder per statement xAPI con validazione."""
def __init__(self, platform_url: str, tenant_id: str):
self.platform_url = platform_url
self.tenant_id = tenant_id
def lesson_completed(
self,
student_id: str,
student_email: str,
lesson_id: str,
lesson_title: str,
course_id: str,
duration_seconds: int,
score_percent: Optional[float] = None,
) -> XAPIStatement:
return XAPIStatement(
actor=XAPIActor(
mbox=f"mailto:{student_email}",
name=student_id,
),
verb=XAPI_VERBS["completed"],
object=XAPIActivity(
id=f"{self.platform_url}/activities/{lesson_id}",
definition={
"name": {"it-IT": lesson_title},
"type": "http://adlnet.gov/expapi/activities/lesson",
},
),
result=XAPIResult(
completion=True,
duration=f"PT{duration_seconds}S",
score={"scaled": score_percent / 100, "raw": score_percent, "min": 0, "max": 100} if score_percent else None,
),
context=XAPIContext(
contextActivities={
"parent": [{"id": f"{self.platform_url}/activities/course/{course_id}"}],
"grouping": [{"id": f"{self.platform_url}/activities/tenant/{self.tenant_id}"}],
},
extensions={
"https://schema.example.com/extensions/student_id": student_id,
"https://schema.example.com/extensions/tenant_id": self.tenant_id,
},
),
)
def quiz_answered(
self,
student_id: str,
student_email: str,
question_id: str,
question_text: str,
student_response: str,
correct: bool,
time_spent_seconds: int,
) -> XAPIStatement:
verb = XAPI_VERBS["passed"] if correct else XAPI_VERBS["failed"]
return XAPIStatement(
actor=XAPIActor(mbox=f"mailto:{student_email}", name=student_id),
verb=verb,
object=XAPIActivity(
id=f"{self.platform_url}/activities/question/{question_id}",
definition={
"name": {"it-IT": question_text[:100]},
"type": "http://adlnet.gov/expapi/activities/cmi.interaction",
},
),
result=XAPIResult(
success=correct,
response=student_response[:500], # Tronca per privacy
duration=f"PT{time_spent_seconds}S",
),
)
2. 教育イベント用の Kafka パイプライン
何百万もの学生が同時にイベントを生成するため、 以下を保証するメッセージング システム: 高可用性, 学生ごとにソートを保証 (同じイベントのすべて 学生は順番に処理する必要があります)、e リプレイ (できる 分析ロジックを変更する場合は、歴史的な出来事を作り直す必要があります)。 Apache Kafka はこれらすべての要件を満たしており、運用環境における標準的な選択肢です。 大規模なイベント パイプライン向け。
# kafka/producer.py
import json
import logging
from typing import Optional
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
class LearningEventProducer:
"""
Producer Kafka per eventi xAPI.
Usa la chiave di partizione student_id per garantire ordine per studente.
"""
TOPIC = "edtech.xapi.statements"
def __init__(self, bootstrap_servers: str):
self.bootstrap_servers = bootstrap_servers
self._producer: Optional[AIOKafkaProducer] = None
async def start(self):
self._producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
# Configurazione per affidabilità
acks="all", # Conferma da tutti i replica
enable_idempotence=True, # Exactly-once delivery
compression_type="gzip", # Compressione per ridurre banda
max_batch_size=64 * 1024, # 64KB batch
linger_ms=10, # Aspetta 10ms per batch più grandi
)
await self._producer.start()
logger.info(f"Kafka producer started, topic: {self.TOPIC}")
async def stop(self):
if self._producer:
await self._producer.stop()
async def send_statement(self, statement: "XAPIStatement") -> bool:
"""
Invia uno statement xAPI a Kafka.
La partition key e lo student_id per garantire ordine per studente.
"""
if not self._producer:
raise RuntimeError("Producer non avviato. Chiama start() prima.")
statement_dict = statement.to_dict()
student_id = statement_dict["actor"]["name"]
try:
await self._producer.send_and_wait(
topic=self.TOPIC,
key=student_id, # Partition key: stesso studente -> stessa partizione
value=statement_dict,
headers=[
("content-type", b"application/json"),
("schema-version", b"1.0.3"),
],
)
return True
except KafkaError as e:
logger.error(f"Errore invio statement Kafka: {e}", exc_info=True)
return False
async def send_batch(self, statements: list) -> int:
"""Invia un batch di statement. Ritorna il numero di statement inviati con successo."""
success_count = 0
async with self._producer.transaction():
for stmt in statements:
if await self.send_statement(stmt):
success_count += 1
return success_count
# kafka/consumer.py
import asyncio
import json
import logging
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer
logger = logging.getLogger(__name__)
class LearningEventConsumer:
"""
Consumer Kafka per elaborazione statement xAPI.
Consumer groups per elaborazione parallela e fault-tolerant.
"""
def __init__(
self,
bootstrap_servers: str,
group_id: str,
handler: Callable[[dict], Awaitable[None]],
):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.handler = handler
self._consumer: Optional[AIOKafkaConsumer] = None
self._running = False
async def start(self, topics: list = None):
topics = topics or ["edtech.xapi.statements"]
self._consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="earliest",
enable_auto_commit=False, # Commit manuale per at-least-once
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
max_poll_records=100, # Batch di 100 messaggi per poll
)
await self._consumer.start()
self._running = True
logger.info(f"Consumer {self.group_id} avviato su {topics}")
async def run(self):
"""Loop principale di consumo messaggi."""
if not self._consumer:
raise RuntimeError("Consumer non avviato.")
try:
async for message in self._consumer:
try:
await self.handler(message.value)
# Commit dopo elaborazione con successo
await self._consumer.commit()
except Exception as e:
logger.error(
f"Errore elaborazione messaggio offset {message.offset}: {e}",
exc_info=True,
)
# Non committare: il messaggio verrà rielaborato
# In produzione: invia a Dead Letter Queue dopo N tentativi
finally:
await self._consumer.stop()
3. 早期警告検出: 中退の危険にさらされている学生
EdTech プラットフォームにとって最も重要な指標はサインアップ数ではありません。 しかし、 完了率。学生を早期に特定する 放棄のリスクがある (ドロップアウト予測) と、パーソナライズされた通知で介入できるようになります。 家庭教師のサポートまたは学習パスの変更。 Kafkaストリームを使用します リスクシグナルをリアルタイムで計算します。
# analytics/dropout_detector.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DropoutRiskSignal:
student_id: str
course_id: str
risk_level: RiskLevel
risk_score: float # 0.0 - 1.0
contributing_factors: List[str]
last_activity: Optional[datetime]
recommended_action: str
calculated_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class StudentEngagementMetrics:
student_id: str
course_id: str
days_since_last_activity: int
completion_rate: float # 0.0 - 1.0
avg_quiz_score: float # 0.0 - 100.0
quiz_failure_rate: float # 0.0 - 1.0
avg_session_duration_minutes: float
total_sessions_last_30d: int
video_rewatch_rate: float # Quante volte rivede video (difficolta?)
help_requests_last_7d: int
class DropoutRiskDetector:
"""
Calcola il rischio di abbandono basato su un modello a regole.
In produzione: sostituisci con un modello ML addestrato su dati storici.
"""
RISK_THRESHOLDS = {
RiskLevel.LOW: (0.0, 0.3),
RiskLevel.MEDIUM: (0.3, 0.6),
RiskLevel.HIGH: (0.6, 0.8),
RiskLevel.CRITICAL: (0.8, 1.0),
}
RECOMMENDED_ACTIONS = {
RiskLevel.LOW: "Monitoraggio routinario. Nessuna azione immediata.",
RiskLevel.MEDIUM: "Invia notifica push motivazionale. Suggerisci contenuto più facile.",
RiskLevel.HIGH: "Contatta il tutor del corso. Proponi sessione di supporto 1:1.",
RiskLevel.CRITICAL: "Escalation immediata: email al responsabile del corso e allo studente.",
}
def calculate_risk(self, metrics: StudentEngagementMetrics) -> DropoutRiskSignal:
score = 0.0
factors = []
# Fattore 1: Inattivita recente (peso: 35%)
inactivity_score = self._score_inactivity(metrics.days_since_last_activity)
score += inactivity_score * 0.35
if inactivity_score > 0.5:
factors.append(f"Inattivo da {metrics.days_since_last_activity} giorni")
# Fattore 2: Progresso nel corso (peso: 25%)
progress_score = 1.0 - metrics.completion_rate
score += progress_score * 0.25
if progress_score > 0.7:
factors.append(f"Progresso corso: solo {metrics.completion_rate:.0%}")
# Fattore 3: Performance ai quiz (peso: 20%)
quiz_score = self._score_quiz_performance(metrics.avg_quiz_score, metrics.quiz_failure_rate)
score += quiz_score * 0.20
if quiz_score > 0.6:
factors.append(f"Media quiz bassa: {metrics.avg_quiz_score:.1f}/100")
# Fattore 4: Riduzione sessioni (peso: 15%)
session_score = 1.0 - min(metrics.total_sessions_last_30d / 10, 1.0)
score += session_score * 0.15
if metrics.total_sessions_last_30d < 3:
factors.append(f"Solo {metrics.total_sessions_last_30d} sessioni in 30 giorni")
# Fattore 5: Segnali di frustrazione (peso: 5%)
frustration_score = min(metrics.video_rewatch_rate + metrics.help_requests_last_7d * 0.1, 1.0)
score += frustration_score * 0.05
if metrics.video_rewatch_rate > 2.0:
factors.append("Alto numero di revisioni video (possibile difficolta)")
score = min(max(score, 0.0), 1.0)
risk_level = self._score_to_level(score)
return DropoutRiskSignal(
student_id=metrics.student_id,
course_id=metrics.course_id,
risk_level=risk_level,
risk_score=score,
contributing_factors=factors,
last_activity=None, # Iniettare datetime reale
recommended_action=self.RECOMMENDED_ACTIONS[risk_level],
)
def _score_inactivity(self, days: int) -> float:
if days <= 1: return 0.0
if days <= 3: return 0.2
if days <= 7: return 0.5
if days <= 14: return 0.8
return 1.0
def _score_quiz_performance(self, avg_score: float, failure_rate: float) -> float:
score_component = max(0, (60 - avg_score) / 60) # Baseline 60%
failure_component = min(failure_rate * 1.5, 1.0)
return (score_component + failure_component) / 2
def _score_to_level(self, score: float) -> RiskLevel:
for level, (low, high) in self.RISK_THRESHOLDS.items():
if low <= score < high:
return level
return RiskLevel.CRITICAL
4. Kafka ストリームを使用したリアルタイム集計
Kafka Streams を使用すると、リアルタイム イベントを Kafka クラスター内で直接処理できます。
追加のインフラストラクチャなしで。 Pythonライブラリを使用します faust
1 時間の時間枠にわたる集計メトリクスを計算します。
# analytics/streaming_aggregator.py
import faust
from datetime import datetime, timedelta
from typing import Optional
app = faust.App(
"edtech-analytics",
broker="kafka://localhost:9092",
value_serializer="json",
)
# Topic di input
xapi_topic = app.topic("edtech.xapi.statements")
# Topic di output per dashboard
course_metrics_topic = app.topic("edtech.analytics.course-metrics")
dropout_alerts_topic = app.topic("edtech.analytics.dropout-alerts")
class CourseMetrics(faust.Record):
course_id: str
window_start: str
window_end: str
total_events: int = 0
unique_students: int = 0
lessons_completed: int = 0
quizzes_passed: int = 0
quizzes_failed: int = 0
avg_quiz_score: float = 0.0
# Tabella aggregata per finestra di 1 ora
course_hourly_table = app.Table(
"course-hourly-metrics",
default=dict,
partitions=8,
)
student_activity_table = app.Table(
"student-activity",
default=dict,
partitions=8,
)
@app.agent(xapi_topic)
async def process_xapi_statement(statements):
"""Processa ogni statement xAPI e aggiorna le metriche aggregate."""
async for statement in statements:
actor = statement.get("actor", {})
verb = statement.get("verb", {}).get("id", "")
result = statement.get("result", {}) or {}
context = statement.get("context", {}) or {}
student_id = actor.get("name", "unknown")
course_id = _extract_course_id(context)
timestamp = statement.get("timestamp", "")
if not course_id:
continue
# Chiave finestra oraria
hour_key = f"{course_id}:{timestamp[:13]}" # ISO troncato all'ora
# Aggiorna metriche corso
metrics = course_hourly_table[hour_key]
metrics["total_events"] = metrics.get("total_events", 0) + 1
metrics.setdefault("students", set())
metrics["students"].add(student_id)
verb_local = verb.split("/")[-1] # Prendi solo il nome del verbo
if verb_local == "completed":
metrics["lessons_completed"] = metrics.get("lessons_completed", 0) + 1
elif verb_local == "passed":
metrics["quizzes_passed"] = metrics.get("quizzes_passed", 0) + 1
score = result.get("score", {}).get("raw", 0)
prev_avg = metrics.get("avg_quiz_score", 0.0)
prev_count = metrics.get("quiz_count", 0)
metrics["avg_quiz_score"] = (prev_avg * prev_count + score) / (prev_count + 1)
metrics["quiz_count"] = prev_count + 1
elif verb_local == "failed":
metrics["quizzes_failed"] = metrics.get("quizzes_failed", 0) + 1
course_hourly_table[hour_key] = metrics
# Aggiorna attivita studente per dropout detection
student_key = f"{student_id}:{course_id}"
student_data = student_activity_table[student_key]
student_data["last_activity"] = timestamp
student_data["event_count"] = student_data.get("event_count", 0) + 1
student_activity_table[student_key] = student_data
# Emetti metriche aggregate ogni 100 eventi
if metrics["total_events"] % 100 == 0:
await course_metrics_topic.send(
key=course_id,
value={
"course_id": course_id,
"hour_key": hour_key,
"total_events": metrics["total_events"],
"unique_students": len(metrics.get("students", set())),
"lessons_completed": metrics.get("lessons_completed", 0),
"quizzes_passed": metrics.get("quizzes_passed", 0),
"quizzes_failed": metrics.get("quizzes_failed", 0),
"avg_quiz_score": metrics.get("avg_quiz_score", 0.0),
},
)
def _extract_course_id(context: dict) -> Optional[str]:
"""Estrae il course_id dal contesto xAPI."""
parent_activities = context.get("contextActivities", {}).get("parent", [])
for activity in parent_activities:
activity_id = activity.get("id", "")
if "/activities/course/" in activity_id:
return activity_id.split("/activities/course/")[-1]
return None
5. 教師用ダッシュボード: 主要な指標
収集されたデータは教師にとって有益な洞察となる必要があります。優れたダッシュボード 学習分析では、何が起こっているかだけを示す必要はありません (説明的な)、 でもその理由も(診断) そして生徒は何をするか(予測的な).
# api/analytics_dashboard.py
from fastapi import FastAPI, Depends, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime, date, timedelta
class CourseOverview(BaseModel):
course_id: str
course_name: str
total_enrolled: int
active_last_7d: int
completion_rate: float
avg_quiz_score: float
at_risk_count: int
dropout_rate_30d: float
class StudentProgressDetail(BaseModel):
student_id: str
student_name: str
enrollment_date: date
completion_percent: float
avg_quiz_score: float
streak_days: int
days_since_last_activity: int
risk_level: str
risk_score: float
class ContentDifficultyReport(BaseModel):
lesson_id: str
lesson_title: str
avg_time_spent_minutes: float
rewatch_rate: float
quit_rate: float # % studenti che hanno abbandonato durante questo contenuto
avg_quiz_score_after: float
difficulty_index: float # Calcolato: alto = contenuto difficile
app = FastAPI(title="Learning Analytics Dashboard API")
@app.get("/api/analytics/courses/{course_id}/overview", response_model=CourseOverview)
async def get_course_overview(course_id: str, db=Depends(get_db)):
"""Panoramica corso con metriche chiave per il dashboard insegnante."""
row = await db.execute("""
SELECT
c.name,
COUNT(DISTINCT e.student_id) as enrolled,
COUNT(DISTINCT CASE WHEN a.last_activity > NOW() - INTERVAL '7 days' THEN a.student_id END) as active_7d,
AVG(CASE WHEN e.completed THEN 1.0 ELSE 0.0 END) as completion_rate,
AVG(q.avg_score) as avg_quiz,
COUNT(DISTINCT CASE WHEN r.risk_level IN ('high','critical') THEN r.student_id END) as at_risk,
COUNT(DISTINCT CASE WHEN e.dropped_out AND e.dropout_date > NOW() - INTERVAL '30 days' THEN e.student_id END)::float /
NULLIF(COUNT(DISTINCT CASE WHEN e.enrollment_date < NOW() - INTERVAL '30 days' THEN e.student_id END), 0) as dropout_rate
FROM courses c
LEFT JOIN enrollments e ON c.id = e.course_id
LEFT JOIN student_activity a ON e.student_id = a.student_id AND a.course_id = c.id
LEFT JOIN student_quiz_stats q ON e.student_id = q.student_id AND q.course_id = c.id
LEFT JOIN dropout_risk r ON e.student_id = r.student_id AND r.course_id = c.id
WHERE c.id = :cid
GROUP BY c.id, c.name
""", {"cid": course_id})
data = row.fetchone()
return CourseOverview(
course_id=course_id,
course_name=data[0],
total_enrolled=data[1] or 0,
active_last_7d=data[2] or 0,
completion_rate=float(data[3] or 0),
avg_quiz_score=float(data[4] or 0),
at_risk_count=data[5] or 0,
dropout_rate_30d=float(data[6] or 0),
)
@app.get("/api/analytics/courses/{course_id}/at-risk", response_model=List[StudentProgressDetail])
async def get_at_risk_students(
course_id: str,
risk_level: Optional[str] = Query(None, description="Filter by risk: low, medium, high, critical"),
db=Depends(get_db),
):
"""Lista studenti a rischio abbandono con dettagli progressione."""
query = """
SELECT
s.id, s.name,
e.enrollment_date,
COALESCE(prog.completion_percent, 0) as completion_percent,
COALESCE(qs.avg_score, 0) as avg_quiz_score,
COALESCE(str.current_streak, 0) as streak_days,
EXTRACT(DAY FROM NOW() - a.last_activity)::int as days_inactive,
r.risk_level,
r.risk_score
FROM students s
JOIN enrollments e ON s.id = e.student_id AND e.course_id = :cid
JOIN dropout_risk r ON s.id = r.student_id AND r.course_id = :cid
LEFT JOIN course_progress prog ON s.id = prog.student_id AND prog.course_id = :cid
LEFT JOIN student_quiz_stats qs ON s.id = qs.student_id AND qs.course_id = :cid
LEFT JOIN student_streaks str ON s.id = str.student_id
LEFT JOIN student_activity a ON s.id = a.student_id AND a.course_id = :cid
WHERE (:risk IS NULL OR r.risk_level = :risk)
ORDER BY r.risk_score DESC
LIMIT 100
"""
rows = (await db.execute(query, {"cid": course_id, "risk": risk_level})).fetchall()
return [
StudentProgressDetail(
student_id=r[0], student_name=r[1], enrollment_date=r[2],
completion_percent=float(r[3]), avg_quiz_score=float(r[4]),
streak_days=r[5], days_since_last_activity=r[6],
risk_level=r[7], risk_score=float(r[8]),
)
for r in rows
]
6. 学習分析における GDPR とプライバシー
学習データは機密の個人データです。収集して分析する 学生との交流には GDPR への準拠が必要であり、未成年者の場合は、 特定の法律(米国の COPPA、未成年者の保護に関する欧州指令)に準拠します。 基本原則: データの最小化 (収集のみ 必要に応じて)、 匿名化 集計レポートの場合、 忘れられる権利 (すべての生徒データを削除する可能性があります) e 透明性 (収集される内容とその理由に関する明確な情報)。
学習分析のための GDPR チェックリスト
- 詳細な行動データを収集する前の明示的な同意
- 集計レポートでstudent_idを匿名化します(個人データは含まれません)
- 忘れられる権利のために DELETE /students/{id}/analytics-data エンドポイントを実装する
- 未成年の生徒からのデータには保護者の同意が必要です
- データ保持ポリシー: 未処理のイベントを保持する期間を定義します (例: 2 年)
- 学生の分析にアクセスしたユーザーの監査ログ
- 保存中および転送中のデータの暗号化
- 明示的な同意なしにデータを第三者と共有しないでください
結論と次のステップ
私たちは完全な学習分析パイプラインを構築しました: xAPI ステートメント データの標準化、保証付きの大量取り込みのための Kafka 配信、Faust/Kafka Streams によるリアルタイム集計、ドロップアウト検出 行動シグナルと教師向けのダッシュボード API に基づいています。
次のステップは、履歴データでトレーニングされた ML モデルを統合することです。 ルールベースのドロップアウト検出器を置き換えます。十分なデータがあれば、モデルは ML は、早期の特定において 85% を超える精度を達成します。 実際に退学する 2 ~ 3 週間前に、危険にさらされている学生。
次の記事では、 リアルタイムのコラボレーション EdTech プラットフォーム: 共同編集および WebSocket のための Yjs を使用した CRDT 共有ドキュメントのリアルタイム同期用。
EdTechエンジニアリングシリーズ
- スケーラブルな LMS アーキテクチャ: マルチテナント パターン
- 適応学習アルゴリズム: 理論から本番まで
- 教育向けビデオ ストリーミング: WebRTC vs HLS vs DASH
- AI 監督システム: コンピューター ビジョンによるプライバシー最優先
- LLM を使用した個別の家庭教師: 知識の基礎を築くための RAG
- ゲーミフィケーション エンジン: アーキテクチャとステート マシン
- ラーニング アナリティクス: xAPI と Kafka を使用したデータ パイプライン (この記事)
- EdTech におけるリアルタイム コラボレーション: CRDT と WebSocket
- モバイルファーストの EdTech: オフラインファーストのアーキテクチャ
- マルチテナントコンテンツ管理: バージョン管理と SCORM







