EdTech におけるリアルタイム コラボレーション: CRDT と WebSocket
Google ドキュメントは、ドキュメントの同期コラボレーションが可能であることを証明しました。 Notion、Figma、Miro は、このパラダイムを無限のメモ、デザイン、ホワイトボードに拡張しました。 現在、EdTech プラットフォームは、同じエクスペリエンスを共同学習にもたらしたいと考えています。 学生は同じ文書、コード、または演習にリアルタイムで取り組んでいます。 他の人のカーソルを見て、競合することなく瞬時に変更します。
根本的な問題は、 分散型競争:学生2人の場合 彼らは異なるマシンから同時に同じテキストを編集します。どちらの編集が勝つでしょうか? 単純に「last-write-wins」では受け入れられない結果が生じます(損失) の変更)。標準的なソリューションは 2 つあります。 オペレーション変革 (OT) (Google ドキュメントで使用) e 競合のないレプリケートされたデータ型 (CRDT)。 2024 年から 2025 年にかけて、CRDT は生産の成熟期に達し、 そのシンプルさのおかげで、新しい実装の事実上の標準 概念的かつ数学的な堅牢性。
この記事では、EdTech 用のリアルタイム コラボレーション システムを構築します。 Yjs (最も人気のある CRDT)、WebSocket を使用した共同ドキュメント編集 同期用、および学習用の特定の機能など 共有注釈、名前付きカーソル、スレッド化されたコメント。
この記事で学べること
- CRDT と OT: 違い、利点、いつ何を選択するか
- Yjs: 内部構造、データ型、Y.Doc
- y-websocket を使用した WebSocket 同期サーバー
- 存在: 名前付きカーソル、選択範囲、オンライン ユーザー
- 永続性: CRDT 状態の保存とロード
- 教育的な注釈: コメント、ハイライト、ディスカッション スレッド
- CRDT によるオフラインファースト: 再接続時の自動同期
- スケーリング: 単一サーバーから Redis pub/sub を使用したクラスターへ
1. CRDT と運用の変革
オペレーション変革 (OT) 演算を適用する前に変換する 同時の変更を考慮するため。うまく機能しますが、中央サーバーが必要です 変換を調整します: Google ドキュメントはこのアーキテクチャをサーバーで使用します 集中化された変革。
CRDT データ構造という根本的に異なる方法で問題を解決します。 それらは数学的に次のように設計されています マージ-競合なしで実行可能。 操作は次のとおりです。 可換 (関係なく同じ結果 注文から) e 冪等 (同じ操作を 2 回適用します) 結果は変わりません)。これにより、中央コーディネーターが不要になります。 自動同期によるオフライン操作が可能です。
OT と CRDT の比較
| 待ってます | OT (Google ドキュメント) | CRDT (Yjs、自動マージ) |
|---|---|---|
| 中央コーディネーター | 必須 | 必要ありません |
| オフラインサポート | 限定 | ネイティブ |
| 実装の複雑さ | 高い | メディア(ライブラリ付き) |
| 数学的保証 | それは実装によって異なります | 正式にテスト済み |
| メモリのオーバーヘッド | ベース | 中 (削除されたアイテムの廃棄) |
| 成熟したライブラリ | ShareDB、OT.js | YJS、オートマージ、コーラ |
| スケーラビリティ | 垂直型 (中央サーバー) | 水平型 (P2P またはマルチサーバー) |
| 2025 年の採用 | レガシー (既存のシステム) | 新しいシステムの基準 |
2. Yjs: EdTech 用の CRDT
Yjs 2025 年に最も成熟した CRDT ライブラリが採用されました。
すべての主要なエディタ (ProseMirror、TipTap、CodeMirror、Monaco) のバインディングを提供します。
WebSocket、WebRTC、IndexedDB、および以下に基づくシンプルな API のサポート
Y.Doc (共同作業ドキュメント) とその共有データ型。
Yjs eの基本単位 Y.Doc: を含む文書
共有データ構造の階層。このデータに変更が加えられる
自動的にシリアル化されます バイナリアップデート できるだけコンパクトに
すべてのピアに送信され、競合することなく任意の順序で適用されます。
// Frontend: src/collaboration/collaborative-editor.ts
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';
import { QuillBinding } from 'y-quill';
import { IndexeddbPersistence } from 'y-indexeddb';
interface CollabUser {
name: string;
color: string;
avatar: string;
studentId: string;
}
export class CollaborativeEditorSession {
private doc: Y.Doc;
private wsProvider: WebsocketProvider;
private dbProvider: IndexeddbPersistence;
private text: Y.Text;
private annotations: Y.Array<any>;
private comments: Y.Map<any>;
constructor(
private readonly documentId: string,
private readonly user: CollabUser,
private readonly wsUrl: string,
) {
// Crea il documento collaborativo
this.doc = new Y.Doc();
// Strutture dati condivise
this.text = this.doc.getText('content');
this.annotations = this.doc.getArray('annotations');
this.comments = this.doc.getMap('comments');
}
async initialize(): Promise<void> {
// Persistenza locale con IndexedDB (supporto offline)
this.dbProvider = new IndexeddbPersistence(
`edtech-doc-${this.documentId}`,
this.doc,
);
await new Promise<void>((resolve) => {
this.dbProvider.on('synced', () => {
console.log('Documento caricato da IndexedDB');
resolve();
});
});
// Connessione WebSocket per sync real-time
this.wsProvider = new WebsocketProvider(
this.wsUrl,
`doc-${this.documentId}`,
this.doc,
{
connect: true,
awareness: this.createAwareness(),
},
);
this.wsProvider.on('status', (event: { status: string }) => {
console.log('WebSocket status:', event.status);
});
// Gestione presenza: cursori degli altri utenti
this.setupPresence();
}
private createAwareness() {
// Awareness e il meccanismo Yjs per stato temporaneo (presenza, cursori)
// NON fa parte del documento persistente, solo stato effimero
return {
getLocalState: () => ({
user: this.user,
cursor: null, // Aggiornato durante l'editing
selection: null,
lastSeen: Date.now(),
}),
};
}
private setupPresence(): void {
const awareness = this.wsProvider.awareness;
// Imposta stato locale dell'utente
awareness.setLocalStateField('user', this.user);
// Ascolta cambiamenti nella presenza degli altri utenti
awareness.on('change', () => {
const states = Array.from(awareness.getStates().entries());
const onlineUsers = states
.filter(([clientId]) => clientId !== this.doc.clientID)
.map(([, state]) => state.user)
.filter(Boolean);
this.onUsersChanged(onlineUsers);
});
}
updateCursor(position: number, selection: { from: number; to: number } | null): void {
this.wsProvider.awareness.setLocalStateField('cursor', position);
this.wsProvider.awareness.setLocalStateField('selection', selection);
}
addAnnotation(annotation: {
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment';
comment?: string;
authorId: string;
}): void {
this.doc.transact(() => {
this.annotations.push([{
id: crypto.randomUUID(),
...annotation,
createdAt: Date.now(),
}]);
});
}
addComment(commentId: string, comment: {
text: string;
authorId: string;
authorName: string;
replyTo?: string;
}): void {
this.doc.transact(() => {
const thread = this.comments.get(commentId) || [];
this.comments.set(commentId, [...thread, {
id: crypto.randomUUID(),
...comment,
timestamp: Date.now(),
}]);
});
}
getAnnotations(): any[] {
return this.annotations.toArray();
}
getComments(): Map<string, any[]> {
return new Map(Array.from(this.comments.entries()));
}
protected onUsersChanged(users: CollabUser[]): void {
// Override in componente per aggiornare UI
console.log('Utenti online:', users.map(u => u.name));
}
destroy(): void {
this.wsProvider?.destroy();
this.dbProvider?.destroy();
this.doc?.destroy();
}
}
3. WebSocket同期サーバー
WebSocket Yjs サーバーは、クライアントからの更新の収集、
同じドキュメントに接続されているすべてのピアにそれらを配布し、状態を永続化します。
新しいクライアントが接続するためのドキュメント。使ってみましょう y-websocket
これをベースとして、テナントごとの認証と認可で拡張します。
Redis/PostgreSQL での永続性。
# server/collab_server.py
import asyncio
import json
import logging
from typing import Dict, Set, Optional
from dataclasses import dataclass, field
import websockets
from websockets.server import WebSocketServerProtocol
logger = logging.getLogger(__name__)
@dataclass
class DocumentRoom:
document_id: str
clients: Set[WebSocketServerProtocol] = field(default_factory=set)
awareness_states: Dict[int, dict] = field(default_factory=dict)
# Ultimo stato persistito del documento (aggiornamenti Yjs binari)
persisted_state: Optional[bytes] = None
class YjsWebSocketServer:
"""
Server WebSocket per sincronizzazione Yjs multi-room.
Ogni 'room' corrisponde a un documento collaborativo.
"""
MESSAGE_SYNC = 0
MESSAGE_AWARENESS = 1
MESSAGE_AUTH = 2
def __init__(
self,
redis_client,
db,
auth_service,
port: int = 8080,
):
self.redis = redis_client
self.db = db
self.auth = auth_service
self.port = port
# rooms: document_id -> DocumentRoom
self.rooms: Dict[str, DocumentRoom] = {}
async def handle_client(self, websocket: WebSocketServerProtocol, path: str):
"""
Handler principale per ogni connessione WebSocket.
path: /collab/{document_id}
"""
# Estrai document_id dal path
try:
document_id = path.split("/collab/")[1]
except (IndexError, AttributeError):
await websocket.close(4001, "Invalid path")
return
# Autenticazione: primo messaggio deve essere il token
try:
auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
auth_data = json.loads(auth_msg)
user = await self.auth.verify_token(auth_data.get("token", ""))
if not user:
await websocket.close(4003, "Unauthorized")
return
# Verifica accesso al documento
if not await self.auth.can_access_document(user.id, document_id):
await websocket.close(4003, "Forbidden")
return
except asyncio.TimeoutError:
await websocket.close(4002, "Auth timeout")
return
# Unisci alla room del documento
room = self._get_or_create_room(document_id)
room.clients.add(websocket)
try:
# Invia stato corrente al nuovo client (sync iniziale)
await self._send_initial_state(websocket, document_id, room)
# Loop messaggi
async for message in websocket:
await self._handle_message(websocket, room, message, user)
except websockets.ConnectionClosed:
pass
finally:
room.clients.discard(websocket)
room.awareness_states.pop(id(websocket), None)
# Notifica agli altri client che questo utente e andato offline
await self._broadcast_awareness(room, exclude=websocket)
if not room.clients:
# Ultima persone nella room: persisti e rimuovi
await self._persist_document(document_id, room)
del self.rooms[document_id]
def _get_or_create_room(self, document_id: str) -> DocumentRoom:
if document_id not in self.rooms:
self.rooms[document_id] = DocumentRoom(document_id=document_id)
return self.rooms[document_id]
async def _send_initial_state(
self,
websocket: WebSocketServerProtocol,
document_id: str,
room: DocumentRoom,
) -> None:
"""Invia lo stato corrente del documento al nuovo client."""
# Carica da Redis (cache) o PostgreSQL
state = room.persisted_state or await self._load_document(document_id)
if state:
# Messaggio di sync: tipo 0 = SYNC_STEP1 in protocollo Yjs
await websocket.send(
bytes([self.MESSAGE_SYNC, 0]) + state
)
# Invia stati di awareness degli altri utenti
if room.awareness_states:
awareness_payload = json.dumps({
"clients": room.awareness_states
}).encode()
await websocket.send(bytes([self.MESSAGE_AWARENESS]) + awareness_payload)
async def _handle_message(
self,
websocket: WebSocketServerProtocol,
room: DocumentRoom,
message: bytes,
user,
) -> None:
"""Processa un messaggio dal client e lo propaga."""
if not isinstance(message, bytes) or not message:
return
msg_type = message[0]
payload = message[1:]
if msg_type == self.MESSAGE_SYNC:
# Aggiornamento documento: broadcast a tutti gli altri client
await self._broadcast(room, message, exclude=websocket)
# Aggiorna stato in memoria e schedula persistenza
await self._update_document_state(room, payload)
elif msg_type == self.MESSAGE_AWARENESS:
# Aggiornamento presenza: cursori, selezioni, utenti online
client_id = id(websocket)
room.awareness_states[client_id] = {
"user": user.to_dict(),
"payload": payload.decode("utf-8", errors="replace"),
}
await self._broadcast_awareness(room, exclude=None)
async def _broadcast(
self,
room: DocumentRoom,
message: bytes,
exclude: Optional[WebSocketServerProtocol] = None,
) -> None:
"""Invia un messaggio a tutti i client nella room eccetto il mittente."""
dead_clients = set()
for client in room.clients:
if client == exclude:
continue
try:
await client.send(message)
except websockets.ConnectionClosed:
dead_clients.add(client)
room.clients -= dead_clients
async def _broadcast_awareness(self, room: DocumentRoom, exclude=None) -> None:
awareness_payload = json.dumps({
"clients": {
str(cid): state
for cid, state in room.awareness_states.items()
}
}).encode()
message = bytes([self.MESSAGE_AWARENESS]) + awareness_payload
await self._broadcast(room, message, exclude=exclude)
async def _load_document(self, document_id: str) -> Optional[bytes]:
"""Carica documento da Redis o PostgreSQL."""
# Prima prova Redis
cached = await self.redis.get(f"yjsdoc:{document_id}")
if cached:
return cached
# Fallback su PostgreSQL
result = await self.db.execute(
"SELECT yjs_state FROM collaborative_documents WHERE id = :did",
{"did": document_id},
)
row = result.fetchone()
if row and row[0]:
state = bytes(row[0])
await self.redis.setex(f"yjsdoc:{document_id}", 3600, state)
return state
return None
async def _persist_document(self, document_id: str, room: DocumentRoom) -> None:
if not room.persisted_state:
return
await self.db.execute(
"""INSERT INTO collaborative_documents (id, yjs_state, updated_at)
VALUES (:did, :state, NOW())
ON CONFLICT (id) DO UPDATE
SET yjs_state = :state, updated_at = NOW()""",
{"did": document_id, "state": room.persisted_state},
)
await self.db.commit()
await self.redis.setex(f"yjsdoc:{document_id}", 3600, room.persisted_state)
async def _update_document_state(self, room: DocumentRoom, update: bytes) -> None:
"""Applica un update al documento in memoria."""
# In produzione usa una libreria Yjs lato server (y-py) per merge corretto
# Qui semplificato: conserva l'ultimo update ricevuto
room.persisted_state = update
async def start(self):
async with websockets.serve(self.handle_client, "0.0.0.0", self.port):
logger.info(f"Yjs WebSocket server in ascolto su porta {self.port}")
await asyncio.Future() # Loop infinito
4. スケーリング: 単一サーバーからクラスターへ
単一の WebSocket サーバーでは、数千のドキュメントを同時に処理できません。 水平方向にスケーリングするには、次を使用します。 Redis パブリッシュ/サブスクライブ メッセージバスとして サーバー間: サーバー A 上のクライアントが更新を送信すると、サーバー A はそれを公開します Redis 上にあり、サーバー B (同じドキュメントに接続されている他のクライアントがある) がそれを受信します。 そしてそれをクライアントに転送します。
# server/collab_cluster.py
import asyncio
import json
from typing import Dict, Optional
import redis.asyncio as aioredis
class ClusteredYjsServer:
"""
Estensione del server Yjs per deployment in cluster.
Usa Redis Pub/Sub per sincronizzare i nodi del cluster.
"""
def __init__(self, base_server: "YjsWebSocketServer", redis_url: str):
self.base = base_server
self.redis_url = redis_url
self.pub_redis: Optional[aioredis.Redis] = None
self.sub_redis: Optional[aioredis.Redis] = None
async def start_cluster_sync(self):
"""Avvia la sincronizzazione tra nodi del cluster via Redis."""
self.pub_redis = await aioredis.from_url(self.redis_url)
self.sub_redis = await aioredis.from_url(self.redis_url)
pubsub = self.sub_redis.pubsub()
await pubsub.psubscribe("yjs:*") # Subscribe a tutti i canali documento
asyncio.create_task(self._listen_cluster_messages(pubsub))
logger.info("Cluster sync avviato via Redis Pub/Sub")
async def _listen_cluster_messages(self, pubsub):
"""Ricevi e distribuisci messaggi dagli altri nodi del cluster."""
async for message in pubsub.listen():
if message["type"] not in ("message", "pmessage"):
continue
channel = message["channel"].decode()
document_id = channel.split("yjs:")[-1]
if document_id not in self.base.rooms:
continue # Questo nodo non ha client per questo documento
data = message["data"]
room = self.base.rooms[document_id]
# Propaga ai client locali (escludi il mittente tramite source_node)
source_node = json.loads(data[:36]) if len(data) > 36 else {}
payload = data[36:]
await self.base._broadcast(room, payload, exclude=None)
async def publish_update(self, document_id: str, update: bytes, node_id: str) -> None:
"""Pubblica un update a tutti i nodi del cluster."""
channel = f"yjs:{document_id}"
# Prefissa con il node_id per evitare loop
message = json.dumps({"node": node_id}).encode()[:36].ljust(36) + update
await self.pub_redis.publish(channel, message)
5. 教育機能: 注釈とスレッド
EdTech プラットフォームには共同編集に加えて機能も必要です 学習の詳細: 注釈 (ハイライトとメモ テキストの一部)、 ディスカッションスレッド (質疑応答 特定のステップに固定されている)、e レビューモード (先生 生徒の作品を直接変更せずにコメントすることもできます)。
// Frontend: src/collaboration/annotations.service.ts
import * as Y from 'yjs';
import { Injectable } from '@angular/core';
export interface Annotation {
id: string;
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment' | 'correction';
color?: string;
authorId: string;
authorName: string;
text?: string;
timestamp: number;
resolved?: boolean;
}
export interface CommentThread {
id: string;
annotationId: string;
replies: CommentReply[];
resolved: boolean;
}
export interface CommentReply {
id: string;
text: string;
authorId: string;
authorName: string;
timestamp: number;
isTeacher: boolean;
}
@Injectable({ providedIn: 'root' })
export class AnnotationsService {
private doc: Y.Doc | null = null;
private annotations: Y.Array<Annotation> | null = null;
private threads: Y.Map<CommentThread> | null = null;
initialize(doc: Y.Doc): void {
this.doc = doc;
this.annotations = doc.getArray<Annotation>('annotations');
this.threads = doc.getMap<CommentThread>('threads');
}
addAnnotation(annotation: Omit<Annotation, 'id' | 'timestamp'>): string {
if (!this.annotations || !this.doc) throw new Error('Non inizializzato');
const id = crypto.randomUUID();
const fullAnnotation: Annotation = {
...annotation,
id,
timestamp: Date.now(),
};
this.doc.transact(() => {
this.annotations!.push([fullAnnotation]);
// Crea thread vuoto se e un commento
if (annotation.type === 'comment') {
this.threads!.set(id, {
id,
annotationId: id,
replies: [],
resolved: false,
});
}
});
return id;
}
addReply(threadId: string, reply: Omit<CommentReply, 'id' | 'timestamp'>): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
const updatedThread: CommentThread = {
...thread,
replies: [...thread.replies, {
...reply,
id: crypto.randomUUID(),
timestamp: Date.now(),
}],
};
this.doc.transact(() => {
this.threads!.set(threadId, updatedThread);
});
}
resolveThread(threadId: string): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
this.doc.transact(() => {
this.threads!.set(threadId, { ...thread, resolved: true });
// Segna l'annotazione come risolta
const annotations = this.annotations!.toArray();
const idx = annotations.findIndex(a => a.id === thread.annotationId);
if (idx !== -1) {
this.annotations!.delete(idx, 1);
this.annotations!.insert(idx, [{ ...annotations[idx], resolved: true }]);
}
});
}
getAnnotations(): Annotation[] {
return this.annotations?.toArray() ?? [];
}
getThread(threadId: string): CommentThread | undefined {
return this.threads?.get(threadId);
}
onAnnotationsChange(callback: (annotations: Annotation[]) => void): () => void {
if (!this.annotations) return () => {};
const handler = () => callback(this.getAnnotations());
this.annotations.observe(handler);
return () => this.annotations?.unobserve(handler);
}
}
避けるべきアンチパターン
- CRDT を使用しない共有可変状態: ロック付きのシェア変数と、デッドロックのリスクが高いアンチパターンを使用します。常に CRDT または OT を使用してください。
- フィルターなしでブロードキャスト: すべてのキーストロークをすべてのクライアントに送信するとコストがかかります。更新を送信する前に、クライアント側のデバウンス (50 ~ 100 ミリ秒) を適用します。
- 永続的な意識: プレゼンス状態 (カーソル、オンライン ユーザー) は一時的なものであり、永続的なものではありません。メインドキュメントには保存しないでください。
- ガベージコレクションなし: Yjsでは、削除されたアイテムの「墓石」を蓄積します。定期的なガベージ コレクションを有効にしてメモリ使用量を抑えます。
- ハートビートのない WebSocket: アイドル状態の接続はプロキシ/ロード バランサーによって閉じられます。 30 秒ごとにピンポンを実装します。
- WebSocket で認証がありません: WebSocket は標準の HTTP 認証をバイパスします。接続するときは必ずトークンを確認してください。
結論と次のステップ
私たちは EdTech のための完全なリアルタイム コラボレーション システムを構築しました。 Yjs を使用した CRDT による共有ドキュメントの競合のない管理、WebSocket リアルタイム同期、カーソルとオンライン ユーザーの存在、 特定の教育機能に関する注釈とディスカッション スレッド、 スケーラビリティのための Redis Pub/Sub を備えたクラスター アーキテクチャ。
CRDT + オフラインファーストの組み合わせにより、このシステムは特に適しています 接続が常に保証されていない教育環境の場合: 学生 オフラインでも作業を続けることができ、変更内容は同期されます 接続が再び利用可能になると自動的に接続されます。
次の記事では、 オフラインファーストのアーキテクチャ EdTech モバイル アプリ向け: IndexedDB、Service Workers、同期戦略、 接続がなくても学習を保証するための段階的な強化。
EdTechエンジニアリングシリーズ
- スケーラブルな LMS アーキテクチャ: マルチテナント パターン
- 適応学習アルゴリズム: 理論から本番まで
- 教育向けビデオ ストリーミング: WebRTC vs HLS vs DASH
- AI 監督システム: コンピューター ビジョンによるプライバシー最優先
- LLM を使用した個別の家庭教師: 知識の基礎を築くための RAG
- ゲーミフィケーション エンジン: アーキテクチャとステート マシン
- ラーニング アナリティクス: xAPI と Kafka を使用したデータ パイプライン
- EdTech におけるリアルタイム コラボレーション: CRDT と WebSocket (この記事)
- モバイルファーストの EdTech: オフラインファーストのアーキテクチャ
- マルチテナントコンテンツ管理: バージョン管理と SCORM







