EdTech의 실시간 협업: CRDT 및 WebSocket
Google Docs는 문서에 대한 동기식 협업이 가능하다는 것을 입증했습니다. Notion, Figma, Miro는 이 패러다임을 무한한 노트, 디자인 및 화이트보드로 확장했습니다. 이제 EdTech 플랫폼은 협업 학습에 동일한 경험을 제공하려고 합니다. 동일한 문서, 코드 또는 연습 문제를 실시간으로 작업하는 학생들, 충돌 없이 다른 사람의 커서, 즉각적인 변경 사항을 볼 수 있습니다.
근본적인 문제는 분산 경쟁: 학생이 2명일 때 그들은 다른 컴퓨터에서 동시에 동일한 텍스트를 편집합니다. 어느 편집이 이기나요? 단순히 "마지막 쓰기 승리"는 허용할 수 없는 결과를 생성합니다(손실 변경 사항). 두 가지 표준 솔루션이 있습니다. 운영 혁신(OT) (Google Docs에서 사용) e 충돌 없는 복제 데이터 유형(CRDT). 2024~2025년에 CRDT는 생산 성숙기에 도달했으며 점점 더 발전하고 있습니다. 단순성 덕분에 새로운 구현을 위한 사실상의 표준 개념적, 수학적 견고성.
이 기사에서는 EdTech를 위한 실시간 협업 시스템을 구축하겠습니다. Yjs(가장 널리 사용되는 CRDT), WebSocket을 사용한 공동 문서 편집 동기화를 위한 학습을 위한 특정 기능 공유 주석, 명명된 커서 및 스레드 주석.
이 기사에서 배울 내용
- CRDT와 OT: 차이점, 장점 및 선택 시기
- Yjs: 내부 구조, 데이터 유형 및 Y.Doc
- y-websocket을 사용한 WebSocket 동기화 서버
- 현재 상태: 명명된 커서, 선택 항목 및 온라인 사용자
- 지속성: CRDT 상태 저장 및 로드
- 교육용 주석: 댓글, 하이라이트 및 토론 스레드
- CRDT를 사용한 오프라인 우선: 재연결 시 자동 동기화
- 확장: 단일 서버에서 Redis pub/sub를 사용하는 클러스터로
1. CRDT와 운영 혁신
운영 혁신(OT) 작업을 적용하기 전에 변환 동시 변경 사항을 고려합니다. 잘 작동하지만 중앙 서버가 필요합니다 변환을 조정합니다. Google Docs는 서버와 함께 이 아키텍처를 사용합니다. 중앙 집중식 변환.
CRDT 근본적으로 다른 방식으로 문제를 해결합니다: 데이터 구조 그들은 수학적으로 다음과 같이 설계되었습니다. 병합-충돌 없이 가능합니다. 작업은 다음과 같습니다 교환적 (관계없이 같은 결과 주문에서) e 멱등성 (동일한 작업을 두 번 적용 결과는 변경되지 않습니다). 이를 통해 중앙 조정자가 필요하지 않습니다. 자동 동기화를 통해 오프라인 작업이 가능합니다.
OT와 CRDT 비교
| 나는 기다린다 | OT(구글 문서도구) | CRDT(Yjs, 자동 병합) |
|---|---|---|
| 중앙 코디네이터 | 필수의 | 필요하지 않음 |
| 오프라인 지원 | 제한된 | 토종의 |
| 구현 복잡성 | 높은 | 미디어(라이브러리 포함) |
| 수학적 보장 | 구현에 따라 다릅니다. | 공식적으로 테스트됨 |
| 메모리 오버헤드 | 베이스 | 중간(삭제된 항목의 묘비) |
| 성숙한 도서관 | ShareDB, OT.js | Yjs, 오토머지, 콜라 |
| 확장성 | 수직(중앙 서버) | 수평형(P2P 또는 다중 서버) |
| 채택 2025 | 레거시(기존 시스템) | 새로운 시스템에 대한 표준 |
2. Yjs: 교육 기술을 위한 CRDT
Yjs 2025년에 가장 성숙하고 채택된 CRDT 라이브러리입니다.
모든 주요 편집자(ProseMirror, TipTap, CodeMirror, Monaco)에 대한 바인딩을 제공합니다.
WebSocket, WebRTC 및 IndexedDB에 대한 지원과 이를 기반으로 하는 간단한 API
Y.Doc (공동 작업 문서) 및 공유 데이터 유형입니다.
Yjs e의 기본 단위 Y.Doc: 다음을 포함하는 문서
공유 데이터 구조의 계층 구조. 이 데이터에 대한 변경 사항이 발생합니다.
자동으로 직렬화됨 바이너리 업데이트 가능한 한 컴팩트하다
모든 피어에게 전송되고 충돌 없이 순서에 관계없이 적용됩니다.
// Frontend: src/collaboration/collaborative-editor.ts
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';
import { QuillBinding } from 'y-quill';
import { IndexeddbPersistence } from 'y-indexeddb';
interface CollabUser {
name: string;
color: string;
avatar: string;
studentId: string;
}
export class CollaborativeEditorSession {
private doc: Y.Doc;
private wsProvider: WebsocketProvider;
private dbProvider: IndexeddbPersistence;
private text: Y.Text;
private annotations: Y.Array<any>;
private comments: Y.Map<any>;
constructor(
private readonly documentId: string,
private readonly user: CollabUser,
private readonly wsUrl: string,
) {
// Crea il documento collaborativo
this.doc = new Y.Doc();
// Strutture dati condivise
this.text = this.doc.getText('content');
this.annotations = this.doc.getArray('annotations');
this.comments = this.doc.getMap('comments');
}
async initialize(): Promise<void> {
// Persistenza locale con IndexedDB (supporto offline)
this.dbProvider = new IndexeddbPersistence(
`edtech-doc-${this.documentId}`,
this.doc,
);
await new Promise<void>((resolve) => {
this.dbProvider.on('synced', () => {
console.log('Documento caricato da IndexedDB');
resolve();
});
});
// Connessione WebSocket per sync real-time
this.wsProvider = new WebsocketProvider(
this.wsUrl,
`doc-${this.documentId}`,
this.doc,
{
connect: true,
awareness: this.createAwareness(),
},
);
this.wsProvider.on('status', (event: { status: string }) => {
console.log('WebSocket status:', event.status);
});
// Gestione presenza: cursori degli altri utenti
this.setupPresence();
}
private createAwareness() {
// Awareness e il meccanismo Yjs per stato temporaneo (presenza, cursori)
// NON fa parte del documento persistente, solo stato effimero
return {
getLocalState: () => ({
user: this.user,
cursor: null, // Aggiornato durante l'editing
selection: null,
lastSeen: Date.now(),
}),
};
}
private setupPresence(): void {
const awareness = this.wsProvider.awareness;
// Imposta stato locale dell'utente
awareness.setLocalStateField('user', this.user);
// Ascolta cambiamenti nella presenza degli altri utenti
awareness.on('change', () => {
const states = Array.from(awareness.getStates().entries());
const onlineUsers = states
.filter(([clientId]) => clientId !== this.doc.clientID)
.map(([, state]) => state.user)
.filter(Boolean);
this.onUsersChanged(onlineUsers);
});
}
updateCursor(position: number, selection: { from: number; to: number } | null): void {
this.wsProvider.awareness.setLocalStateField('cursor', position);
this.wsProvider.awareness.setLocalStateField('selection', selection);
}
addAnnotation(annotation: {
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment';
comment?: string;
authorId: string;
}): void {
this.doc.transact(() => {
this.annotations.push([{
id: crypto.randomUUID(),
...annotation,
createdAt: Date.now(),
}]);
});
}
addComment(commentId: string, comment: {
text: string;
authorId: string;
authorName: string;
replyTo?: string;
}): void {
this.doc.transact(() => {
const thread = this.comments.get(commentId) || [];
this.comments.set(commentId, [...thread, {
id: crypto.randomUUID(),
...comment,
timestamp: Date.now(),
}]);
});
}
getAnnotations(): any[] {
return this.annotations.toArray();
}
getComments(): Map<string, any[]> {
return new Map(Array.from(this.comments.entries()));
}
protected onUsersChanged(users: CollabUser[]): void {
// Override in componente per aggiornare UI
console.log('Utenti online:', users.map(u => u.name));
}
destroy(): void {
this.wsProvider?.destroy();
this.dbProvider?.destroy();
this.doc?.destroy();
}
}
3. 웹소켓 동기화 서버
WebSocket Yjs 서버는 클라이언트로부터 업데이트 수집,
동일한 문서에 연결된 모든 피어에게 배포하고 상태를 유지합니다.
신규 고객 연결을 위한 문서입니다. 사용하자 y-websocket
이를 기반으로 인증, 테넌트별 권한 부여로 확장합니다.
Redis/PostgreSQL의 지속성.
# server/collab_server.py
import asyncio
import json
import logging
from typing import Dict, Set, Optional
from dataclasses import dataclass, field
import websockets
from websockets.server import WebSocketServerProtocol
logger = logging.getLogger(__name__)
@dataclass
class DocumentRoom:
document_id: str
clients: Set[WebSocketServerProtocol] = field(default_factory=set)
awareness_states: Dict[int, dict] = field(default_factory=dict)
# Ultimo stato persistito del documento (aggiornamenti Yjs binari)
persisted_state: Optional[bytes] = None
class YjsWebSocketServer:
"""
Server WebSocket per sincronizzazione Yjs multi-room.
Ogni 'room' corrisponde a un documento collaborativo.
"""
MESSAGE_SYNC = 0
MESSAGE_AWARENESS = 1
MESSAGE_AUTH = 2
def __init__(
self,
redis_client,
db,
auth_service,
port: int = 8080,
):
self.redis = redis_client
self.db = db
self.auth = auth_service
self.port = port
# rooms: document_id -> DocumentRoom
self.rooms: Dict[str, DocumentRoom] = {}
async def handle_client(self, websocket: WebSocketServerProtocol, path: str):
"""
Handler principale per ogni connessione WebSocket.
path: /collab/{document_id}
"""
# Estrai document_id dal path
try:
document_id = path.split("/collab/")[1]
except (IndexError, AttributeError):
await websocket.close(4001, "Invalid path")
return
# Autenticazione: primo messaggio deve essere il token
try:
auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
auth_data = json.loads(auth_msg)
user = await self.auth.verify_token(auth_data.get("token", ""))
if not user:
await websocket.close(4003, "Unauthorized")
return
# Verifica accesso al documento
if not await self.auth.can_access_document(user.id, document_id):
await websocket.close(4003, "Forbidden")
return
except asyncio.TimeoutError:
await websocket.close(4002, "Auth timeout")
return
# Unisci alla room del documento
room = self._get_or_create_room(document_id)
room.clients.add(websocket)
try:
# Invia stato corrente al nuovo client (sync iniziale)
await self._send_initial_state(websocket, document_id, room)
# Loop messaggi
async for message in websocket:
await self._handle_message(websocket, room, message, user)
except websockets.ConnectionClosed:
pass
finally:
room.clients.discard(websocket)
room.awareness_states.pop(id(websocket), None)
# Notifica agli altri client che questo utente e andato offline
await self._broadcast_awareness(room, exclude=websocket)
if not room.clients:
# Ultima persone nella room: persisti e rimuovi
await self._persist_document(document_id, room)
del self.rooms[document_id]
def _get_or_create_room(self, document_id: str) -> DocumentRoom:
if document_id not in self.rooms:
self.rooms[document_id] = DocumentRoom(document_id=document_id)
return self.rooms[document_id]
async def _send_initial_state(
self,
websocket: WebSocketServerProtocol,
document_id: str,
room: DocumentRoom,
) -> None:
"""Invia lo stato corrente del documento al nuovo client."""
# Carica da Redis (cache) o PostgreSQL
state = room.persisted_state or await self._load_document(document_id)
if state:
# Messaggio di sync: tipo 0 = SYNC_STEP1 in protocollo Yjs
await websocket.send(
bytes([self.MESSAGE_SYNC, 0]) + state
)
# Invia stati di awareness degli altri utenti
if room.awareness_states:
awareness_payload = json.dumps({
"clients": room.awareness_states
}).encode()
await websocket.send(bytes([self.MESSAGE_AWARENESS]) + awareness_payload)
async def _handle_message(
self,
websocket: WebSocketServerProtocol,
room: DocumentRoom,
message: bytes,
user,
) -> None:
"""Processa un messaggio dal client e lo propaga."""
if not isinstance(message, bytes) or not message:
return
msg_type = message[0]
payload = message[1:]
if msg_type == self.MESSAGE_SYNC:
# Aggiornamento documento: broadcast a tutti gli altri client
await self._broadcast(room, message, exclude=websocket)
# Aggiorna stato in memoria e schedula persistenza
await self._update_document_state(room, payload)
elif msg_type == self.MESSAGE_AWARENESS:
# Aggiornamento presenza: cursori, selezioni, utenti online
client_id = id(websocket)
room.awareness_states[client_id] = {
"user": user.to_dict(),
"payload": payload.decode("utf-8", errors="replace"),
}
await self._broadcast_awareness(room, exclude=None)
async def _broadcast(
self,
room: DocumentRoom,
message: bytes,
exclude: Optional[WebSocketServerProtocol] = None,
) -> None:
"""Invia un messaggio a tutti i client nella room eccetto il mittente."""
dead_clients = set()
for client in room.clients:
if client == exclude:
continue
try:
await client.send(message)
except websockets.ConnectionClosed:
dead_clients.add(client)
room.clients -= dead_clients
async def _broadcast_awareness(self, room: DocumentRoom, exclude=None) -> None:
awareness_payload = json.dumps({
"clients": {
str(cid): state
for cid, state in room.awareness_states.items()
}
}).encode()
message = bytes([self.MESSAGE_AWARENESS]) + awareness_payload
await self._broadcast(room, message, exclude=exclude)
async def _load_document(self, document_id: str) -> Optional[bytes]:
"""Carica documento da Redis o PostgreSQL."""
# Prima prova Redis
cached = await self.redis.get(f"yjsdoc:{document_id}")
if cached:
return cached
# Fallback su PostgreSQL
result = await self.db.execute(
"SELECT yjs_state FROM collaborative_documents WHERE id = :did",
{"did": document_id},
)
row = result.fetchone()
if row and row[0]:
state = bytes(row[0])
await self.redis.setex(f"yjsdoc:{document_id}", 3600, state)
return state
return None
async def _persist_document(self, document_id: str, room: DocumentRoom) -> None:
if not room.persisted_state:
return
await self.db.execute(
"""INSERT INTO collaborative_documents (id, yjs_state, updated_at)
VALUES (:did, :state, NOW())
ON CONFLICT (id) DO UPDATE
SET yjs_state = :state, updated_at = NOW()""",
{"did": document_id, "state": room.persisted_state},
)
await self.db.commit()
await self.redis.setex(f"yjsdoc:{document_id}", 3600, room.persisted_state)
async def _update_document_state(self, room: DocumentRoom, update: bytes) -> None:
"""Applica un update al documento in memoria."""
# In produzione usa una libreria Yjs lato server (y-py) per merge corretto
# Qui semplificato: conserva l'ultimo update ricevuto
room.persisted_state = update
async def start(self):
async with websockets.serve(self.handle_client, "0.0.0.0", self.port):
logger.info(f"Yjs WebSocket server in ascolto su porta {self.port}")
await asyncio.Future() # Loop infinito
4. 확장: 단일 서버에서 클러스터로
단일 WebSocket 서버는 수천 개의 동시 문서를 처리할 수 없습니다. 수평으로 확장하려면 다음을 사용합니다. Redis 게시/구독 메시지 버스로 서버 간: 서버 A의 클라이언트가 업데이트를 보내면 서버 A가 이를 게시합니다. Redis에서는 서버 B(동일한 문서에 연결된 다른 클라이언트가 있음)가 이를 수신합니다. 그리고 그것을 고객에게 전달합니다.
# server/collab_cluster.py
import asyncio
import json
from typing import Dict, Optional
import redis.asyncio as aioredis
class ClusteredYjsServer:
"""
Estensione del server Yjs per deployment in cluster.
Usa Redis Pub/Sub per sincronizzare i nodi del cluster.
"""
def __init__(self, base_server: "YjsWebSocketServer", redis_url: str):
self.base = base_server
self.redis_url = redis_url
self.pub_redis: Optional[aioredis.Redis] = None
self.sub_redis: Optional[aioredis.Redis] = None
async def start_cluster_sync(self):
"""Avvia la sincronizzazione tra nodi del cluster via Redis."""
self.pub_redis = await aioredis.from_url(self.redis_url)
self.sub_redis = await aioredis.from_url(self.redis_url)
pubsub = self.sub_redis.pubsub()
await pubsub.psubscribe("yjs:*") # Subscribe a tutti i canali documento
asyncio.create_task(self._listen_cluster_messages(pubsub))
logger.info("Cluster sync avviato via Redis Pub/Sub")
async def _listen_cluster_messages(self, pubsub):
"""Ricevi e distribuisci messaggi dagli altri nodi del cluster."""
async for message in pubsub.listen():
if message["type"] not in ("message", "pmessage"):
continue
channel = message["channel"].decode()
document_id = channel.split("yjs:")[-1]
if document_id not in self.base.rooms:
continue # Questo nodo non ha client per questo documento
data = message["data"]
room = self.base.rooms[document_id]
# Propaga ai client locali (escludi il mittente tramite source_node)
source_node = json.loads(data[:36]) if len(data) > 36 else {}
payload = data[36:]
await self.base._broadcast(room, payload, exclude=None)
async def publish_update(self, document_id: str, update: bytes, node_id: str) -> None:
"""Pubblica un update a tutti i nodi del cluster."""
channel = f"yjs:{document_id}"
# Prefissa con il node_id per evitare loop
message = json.dumps({"node": node_id}).encode()[:36].ljust(36) + update
await self.pub_redis.publish(channel, message)
5. 교육 기능: 주석 및 스레드
공동 편집 외에도 EdTech 플랫폼에는 기능이 필요합니다. 학습 세부사항: 주석 (하이라이트 및 메모 텍스트의 일부), 토론 스레드 (질문과 답변 특정 단계에 고정됨), e 검토 모드 (선생님 직접 수정하지 않고 학생의 작업에 대한 의견).
// Frontend: src/collaboration/annotations.service.ts
import * as Y from 'yjs';
import { Injectable } from '@angular/core';
export interface Annotation {
id: string;
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment' | 'correction';
color?: string;
authorId: string;
authorName: string;
text?: string;
timestamp: number;
resolved?: boolean;
}
export interface CommentThread {
id: string;
annotationId: string;
replies: CommentReply[];
resolved: boolean;
}
export interface CommentReply {
id: string;
text: string;
authorId: string;
authorName: string;
timestamp: number;
isTeacher: boolean;
}
@Injectable({ providedIn: 'root' })
export class AnnotationsService {
private doc: Y.Doc | null = null;
private annotations: Y.Array<Annotation> | null = null;
private threads: Y.Map<CommentThread> | null = null;
initialize(doc: Y.Doc): void {
this.doc = doc;
this.annotations = doc.getArray<Annotation>('annotations');
this.threads = doc.getMap<CommentThread>('threads');
}
addAnnotation(annotation: Omit<Annotation, 'id' | 'timestamp'>): string {
if (!this.annotations || !this.doc) throw new Error('Non inizializzato');
const id = crypto.randomUUID();
const fullAnnotation: Annotation = {
...annotation,
id,
timestamp: Date.now(),
};
this.doc.transact(() => {
this.annotations!.push([fullAnnotation]);
// Crea thread vuoto se e un commento
if (annotation.type === 'comment') {
this.threads!.set(id, {
id,
annotationId: id,
replies: [],
resolved: false,
});
}
});
return id;
}
addReply(threadId: string, reply: Omit<CommentReply, 'id' | 'timestamp'>): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
const updatedThread: CommentThread = {
...thread,
replies: [...thread.replies, {
...reply,
id: crypto.randomUUID(),
timestamp: Date.now(),
}],
};
this.doc.transact(() => {
this.threads!.set(threadId, updatedThread);
});
}
resolveThread(threadId: string): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
this.doc.transact(() => {
this.threads!.set(threadId, { ...thread, resolved: true });
// Segna l'annotazione come risolta
const annotations = this.annotations!.toArray();
const idx = annotations.findIndex(a => a.id === thread.annotationId);
if (idx !== -1) {
this.annotations!.delete(idx, 1);
this.annotations!.insert(idx, [{ ...annotations[idx], resolved: true }]);
}
});
}
getAnnotations(): Annotation[] {
return this.annotations?.toArray() ?? [];
}
getThread(threadId: string): CommentThread | undefined {
return this.threads?.get(threadId);
}
onAnnotationsChange(callback: (annotations: Annotation[]) => void): () => void {
if (!this.annotations) return () => {};
const handler = () => callback(this.getAnnotations());
this.annotations.observe(handler);
return () => this.annotations?.unobserve(handler);
}
}
피해야 할 안티패턴
- CRDT가 없는 공유 변경 가능 상태: 교착 상태 위험이 높은 잠금 및 안티 패턴이 있는 공유 변수를 사용하십시오. 항상 CRDT 또는 OT를 사용하십시오.
- 필터 없이 방송: 모든 키 입력을 모든 클라이언트에 보내는 것은 비용이 많이 듭니다. 업데이트를 보내기 전에 클라이언트 측 디바운스(50~100ms)를 적용하세요.
- 지속적인 인식: 현재 상태(커서, 온라인 사용자)는 영구적이지 않고 일시적입니다. 기본 문서에 저장하지 마세요.
- 가비지 수집 없음: Yjs는 삭제된 항목에 대한 "삭제 표시"를 축적합니다. 메모리 사용량을 낮추려면 주기적인 가비지 수집을 활성화하세요.
- 하트비트가 없는 WebSocket: 유휴 연결은 프록시/로드 밸런서에 의해 닫힙니다. 30초마다 핑/퐁을 구현합니다.
- WebSocket에 대한 인증 없음: WebSocket은 표준 HTTP 인증을 우회합니다. 연결할 때 항상 토큰을 확인하세요.
결론 및 다음 단계
우리는 EdTech를 위한 완벽한 실시간 협업 시스템을 구축했습니다. 공유 문서의 충돌 없는 관리를 위한 Yjs를 사용한 CRDT, WebSocket 실시간 동기화, 커서 및 온라인 사용자의 존재, 특정 교육 기능에 대한 주석 및 토론 스레드, 확장성을 위해 Redis Pub/Sub를 사용한 클러스터 아키텍처.
CRDT + 오프라인 우선 조합으로 인해 이 시스템이 특히 적합해졌습니다. 연결이 항상 보장되지 않는 교육적 맥락: 학생 오프라인으로 계속 작업할 수 있으며 변경 내용이 동기화됩니다. 다시 연결이 가능해지면 자동으로.
다음 기사에서는 이에 대해 다루겠습니다. 오프라인 우선 아키텍처 EdTech 모바일 앱용: IndexedDB, Service Workers, 동기화 전략 및 연결 없이도 학습을 보장하는 점진적인 향상.
EdTech 엔지니어링 시리즈
- 확장 가능한 LMS 아키텍처: 다중 테넌트 패턴
- 적응형 학습 알고리즘: 이론에서 생산까지
- 교육용 비디오 스트리밍: WebRTC, HLS, DASH
- AI 감독 시스템: 컴퓨터 비전을 통한 개인정보 보호 우선
- LLM을 통한 맞춤형 교사: 지식 기반 구축을 위한 RAG
- 게임화 엔진: 아키텍처 및 상태 머신
- 학습 분석: xAPI 및 Kafka를 사용한 데이터 파이프라인
- EdTech의 실시간 협업: CRDT 및 WebSocket(이 문서)
- 모바일 우선 교육 기술: 오프라인 우선 아키텍처
- 다중 테넌트 콘텐츠 관리: 버전 관리 및 SCORM







