Server Setup and Implementation
The @vuer-ai/vuer-rtc-server package provides production-ready server infrastructure for real-time
collaborative applications using CRDT operations. It includes MongoDB persistence, WebSocket transport,
and comprehensive state management.
Installation
Install the server package in your Node.js project:
pnpm add @vuer-ai/vuer-rtc-server
The package requires Node.js 18+ and the following dependencies (installed automatically):
- @vuer-ai/vuer-rtc - Core CRDT operations
- @prisma/client - Database ORM
- ws - WebSocket server
Environment Configuration
Create a .env file in your project root:
# Required: MongoDB connection string with replica set
DATABASE_URL="mongodb://localhost:27017/vuer-rtc?replicaSet=rs0"
# Optional: Server port (default: 8080)
PORT=8080
# Optional: Redis URL (for multi-instance scaling)
REDIS_URL="redis://localhost:6379"
Required Environment Variables
| Variable | Description | Example |
|---|---|---|
| DATABASE_URL | MongoDB connection string with replica set | mongodb://localhost:27017/vuer-rtc?replicaSet=rs0 |
Optional Environment Variables
| Variable | Default | Description |
|---|---|---|
| PORT | 8080 | HTTP/WebSocket server port |
| REDIS_URL | - | Redis connection URL (required for multi-instance deployments) |
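In TypeScript, these variables can be read and validated once at startup. A minimal sketch (the `loadConfig` helper is illustrative, not part of the package):

```typescript
// Illustrative startup validation for the variables in the tables above.
// `loadConfig` is a hypothetical helper, not exported by @vuer-ai/vuer-rtc-server.
interface ServerConfig {
  databaseUrl: string;
  port: number;
  redisUrl?: string;
}

function loadConfig(env: Record<string, string | undefined>): ServerConfig {
  const databaseUrl = env.DATABASE_URL;
  if (!databaseUrl) {
    throw new Error('DATABASE_URL is required (MongoDB with replica set)');
  }
  return {
    databaseUrl,
    port: Number(env.PORT ?? 8080), // default: 8080
    redisUrl: env.REDIS_URL,        // optional: multi-instance only
  };
}
```

Failing fast here gives a clear error instead of a Prisma connection failure deep in startup.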
MongoDB Replica Set Setup
The server requires MongoDB with replica set support for transactions and change streams.
Development (Local)
Use Docker Compose for quick local setup:
# Clone or download docker-compose.yml from the repository
curl -O https://raw.githubusercontent.com/vuer-ai/vuer-rtc/main/docker/docker-compose.yml
# Start MongoDB and Redis
docker compose up -d
# Wait for MongoDB replica set initialization (about 30 seconds)
docker compose logs -f mongo
The docker-compose.yml configuration:
- MongoDB 7 with replica set rs0 on port 27017
- Redis 7 on port 6379
- Automatic replica set initialization via healthcheck
Production Setup
For production deployments, use a managed MongoDB service:
MongoDB Atlas (Recommended):
DATABASE_URL="mongodb+srv://username:password@cluster.mongodb.net/vuer-rtc?retryWrites=true&w=majority"
Self-Hosted Replica Set:
# Initialize replica set on primary server
mongosh
> rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "mongo1.example.com:27017" },
{ _id: 1, host: "mongo2.example.com:27017" },
{ _id: 2, host: "mongo3.example.com:27017" }
]
})
# Connection string
DATABASE_URL="mongodb://mongo1.example.com:27017,mongo2.example.com:27017,mongo3.example.com:27017/vuer-rtc?replicaSet=rs0"
Database Schema Initialization
After configuring DATABASE_URL, initialize the Prisma schema:
# Generate Prisma client
pnpm exec prisma generate
# Push schema to database (creates collections)
pnpm exec prisma db push
This creates 4 MongoDB collections:
| Collection | Purpose |
|---|---|
| Document | Scene metadata and current state snapshots |
| Operation | Individual CRDT operations with vector clocks |
| JournalBatch | Batched write-ahead log (33ms batching) |
| Session | Client connections, presence, and clock state |
Redis Configuration (Optional)
Redis is required for horizontal scaling across multiple server instances. The broker uses Redis pub/sub to synchronize room state and member presence.
Development (Local)
Redis is included in the Docker Compose setup:
docker compose up -d redis
Production Setup
Use a managed Redis service:
# Redis Cloud, AWS ElastiCache, etc.
REDIS_URL="redis://username:password@redis.example.com:6379"
# Redis Cluster
REDIS_URL="redis://redis-cluster.example.com:6379?cluster=true"
Scaling Considerations
- Single Instance: No Redis required, uses InMemoryBroker
- Multi-Instance: Redis required for RedisBroker (not yet implemented)
- Load Balancing: Use sticky sessions or implement a Redis-backed broker
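To make the single- vs multi-instance distinction concrete, here is a toy in-memory broker tracking room membership. This is a hypothetical shape for illustration only; the real InMemoryBroker API may differ:

```typescript
// Toy room-membership broker. In a multi-instance deployment the same
// bookkeeping would live in Redis so every instance sees one member list.
class MiniBroker {
  private rooms = new Map<string, Set<string>>();

  join(roomId: string, sessionId: string): void {
    let members = this.rooms.get(roomId);
    if (!members) {
      members = new Set();
      this.rooms.set(roomId, members);
    }
    members.add(sessionId);
  }

  leave(roomId: string, sessionId: string): void {
    const members = this.rooms.get(roomId);
    members?.delete(sessionId);
    if (members && members.size === 0) this.rooms.delete(roomId); // drop empty rooms
  }

  getMembers(roomId: string): ReadonlySet<string> {
    return this.rooms.get(roomId) ?? new Set();
  }
}
```

Because this state lives in one process's memory, a second server instance would see an empty member list, which is exactly why multi-instance deployments need a shared (Redis-backed) broker.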
Starting the Server
Basic Usage
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { InMemoryBroker, RTCServer } from '@vuer-ai/vuer-rtc-server';
import { createPrismaClient } from '@vuer-ai/vuer-rtc-server/persistence';
import { JournalService } from '@vuer-ai/vuer-rtc-server/journal';
const PORT = Number(process.env.PORT) || 8080;
const prisma = createPrismaClient();
const broker = new InMemoryBroker();
const journalService = new JournalService(prisma);
// Wire broker clocks into journal for safe compaction
journalService.setMemberClockProvider(async (docId: string) => {
const doc = await prisma.document.findUnique({ where: { id: docId } });
if (!doc) return [];
const members = await broker.getMembers(doc.name);
return Array.from(members.values())
.filter(m => m.connected)
.map(m => m.vectorClock);
});
journalService.startCompactionLoop();
const rtcServer = new RTCServer(broker, {
async processMessage(roomId: string, msg: any) {
const docId = await ensureDocument(roomId);
return journalService.processMessage(docId, msg);
},
async getStateForClient(roomId: string) {
const docId = await ensureDocument(roomId);
return journalService.getStateForClient(docId);
},
});
const server = createServer((req, res) => {
res.writeHead(200).end('OK');
});
const wss = new WebSocketServer({ server });
wss.on('connection', (ws, req) => {
const url = new URL(req.url ?? '/', `http://${req.headers.host}`);
const match = url.pathname.match(/^\/ws\/([^/]+)$/);
if (!match) {
ws.close(4000, 'Invalid path');
return;
}
const roomId = decodeURIComponent(match[1]);
const sessionId = url.searchParams.get('sessionId');
if (!sessionId) {
ws.close(4001, 'Missing sessionId');
return;
}
rtcServer.handleConnection(ws, roomId, sessionId);
});
server.listen(PORT, () => {
console.log(`Server listening on http://localhost:${PORT}`);
});
Using the Built-in Server
The package includes a complete production-ready server:
# Build and run
pnpm build
pnpm serve
WebSocket URL format:
ws://localhost:8080/ws/{roomId}?sessionId={sessionId}
REST API endpoints:
- GET /api/stats - Database statistics
- GET /api/documents - List all documents
- GET /api/documents/:id - Get document details
- GET /api/documents/:id/journal - Get journal batches
- GET /api/rooms/:roomId/state - Get room state
- DELETE /api/rooms/:roomId - Clear room (keeps connections)
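A client-side helper for building the WebSocket URL might look like this (`buildRoomUrl` is illustrative, not exported by the package):

```typescript
// Build ws://host/ws/{roomId}?sessionId={sessionId}, escaping the room name
// so names with spaces or slashes stay within a single path segment.
function buildRoomUrl(base: string, roomId: string, sessionId: string): string {
  const url = new URL(`/ws/${encodeURIComponent(roomId)}`, base);
  url.searchParams.set('sessionId', sessionId);
  return url.toString();
}
```

This mirrors the server-side parsing shown above, which matches `/^\/ws\/([^/]+)$/` and rejects connections without a sessionId.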
Production Deployment
Docker Deployment
Create a Dockerfile:
FROM node:18-alpine AS base
RUN corepack enable
FROM base AS deps
WORKDIR /app
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile
FROM base AS builder
WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY . .
RUN pnpm exec prisma generate
RUN pnpm build
FROM base AS runner
WORKDIR /app
ENV NODE_ENV=production
COPY --from=deps /app/node_modules ./node_modules
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/prisma ./prisma
COPY --from=builder /app/package.json ./
EXPOSE 8080
CMD ["node", "dist/serve.js"]
Build and run:
docker build -t vuer-rtc-server .
docker run -p 8080:8080 \
-e DATABASE_URL="mongodb://mongo:27017/vuer-rtc?replicaSet=rs0" \
vuer-rtc-server
Docker Compose Example
Complete production setup with MongoDB and Redis:
version: '3.8'
services:
server:
build: .
ports:
- "8080:8080"
environment:
DATABASE_URL: mongodb://mongo:27017/vuer-rtc?replicaSet=rs0
REDIS_URL: redis://redis:6379
PORT: 8080
depends_on:
mongo:
condition: service_healthy
redis:
condition: service_started
restart: unless-stopped
mongo:
image: mongo:7
command: ["--replSet", "rs0", "--bind_ip_all"]
volumes:
- mongo-data:/data/db
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --quiet
interval: 5s
timeout: 30s
retries: 30
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis-data:/data
volumes:
mongo-data:
redis-data:
Cloud Deployment
Recommended Stack:
- Application: AWS ECS, Google Cloud Run, or Railway
- Database: MongoDB Atlas (M10+ with replica set)
- Cache: Redis Cloud or AWS ElastiCache
- Load Balancer: AWS ALB with WebSocket support
Environment Variables:
DATABASE_URL="mongodb+srv://cluster.mongodb.net/vuer-rtc"
REDIS_URL="redis://redis.cloud:6379"
PORT=8080
NODE_ENV=production
Horizontal Scaling
For multi-instance deployments:
- Use Redis-backed broker (when available)
- Configure sticky sessions on load balancer (WebSocket affinity)
- Share MongoDB cluster across all instances
- Monitor connection limits on MongoDB and Redis
Current limitation: InMemoryBroker only supports single-instance deployment.
RedisBroker implementation is planned for horizontal scaling.
Health Checks and Monitoring
Health Check Endpoint
Add a health check route:
server.on('request', (req, res) => {
if (req.url === '/health') {
prisma.$runCommandRaw({ ping: 1 }) // MongoDB has no SQL interface; ping via raw command
.then(() => res.writeHead(200).end('OK'))
.catch(() => res.writeHead(503).end('Database unavailable'));
}
});
Monitoring Metrics
Track these key metrics:
| Metric | Endpoint/Method | Description |
|---|---|---|
| Active connections | wss.clients.size | Current WebSocket connections |
| Room count | broker.getRooms() | Number of active rooms |
| Database size | GET /api/stats/sizes | Storage usage per collection |
| Operation throughput | Custom counter | Messages/second processed |
| Journal batch size | Monitor JournalBatch.operations.length | Average operations per batch |
Example Prometheus Metrics
import { register, Counter, Gauge } from 'prom-client';
const messageCounter = new Counter({
name: 'vuer_rtc_messages_total',
help: 'Total CRDT messages processed',
});
const connectionGauge = new Gauge({
name: 'vuer_rtc_connections',
help: 'Current WebSocket connections',
});
rtcServer.on('message', () => messageCounter.inc());
wss.on('connection', () => connectionGauge.inc());
wss.on('close', () => connectionGauge.dec());
server.on('request', (req, res) => {
if (req.url === '/metrics') {
res.setHeader('Content-Type', register.contentType);
register.metrics().then(metrics => res.end(metrics));
}
});
Logging
Configure structured logging for production:
import pino from 'pino';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
transport: process.env.NODE_ENV === 'development'
? { target: 'pino-pretty' }
: undefined,
});
rtcServer.on('message', (msg) => {
logger.debug({ msgId: msg.id, roomId: msg.roomId }, 'Message processed');
});
Performance Tuning
Journal Compaction
Configure compaction frequency in JournalService:
// Default: compact every 5 minutes
journalService.startCompactionLoop({
intervalMs: 5 * 60 * 1000,
minOperations: 1000, // Only compact if > 1000 operations
});
Connection Limits
MongoDB and WebSocket limits:
// MongoDB connection pool (in DATABASE_URL)
DATABASE_URL="mongodb://localhost:27017/vuer-rtc?replicaSet=rs0&maxPoolSize=50"
// Raise the EventEmitter listener warning threshold. Note: this does not
// cap WebSocket connections; OS file-descriptor limits still apply.
wss.setMaxListeners(10000);
Memory Management
Monitor and limit room history:
// Clear old rooms periodically
setInterval(async () => {
const rooms = await broker.getRooms();
for (const roomId of rooms) {
const members = await broker.getMembers(roomId);
if (members.size === 0) {
await rtcServer.clearRoom(roomId);
}
}
}, 60 * 60 * 1000); // Every hour
Architecture Overview
vuer-rtc uses explicit operations: each message specifies its merge behavior via its ot field.
This simplifies the server considerably, since it only needs to apply operations and
broadcast them.
┌────────────────────────────────────────────────────────────────────────────┐
│ Server │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ WebSocket Handler │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ Receive │───▶│ Validate │───▶│ Apply Operations │ │ │
│ │ │ Message │ │ & Dedup │ │ (dispatcher.ts) │ │ │
│ │ └─────────────┘ └─────────────┘ └───────────┬─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────────────────────────┼─────────────┐ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌─────────────────────────────────────┐ │ │ │
│ │ │ │ SceneGraph │ │ │ │
│ │ │ │ ┌───────────────────────────────┐ │ │ │ │
│ │ │ │ │ nodes: { [key]: SceneNode } │ │ │ │ │
│ │ │ │ │ - position, rotation, etc. │ │ │ │ │
│ │ │ │ │ - schema-free properties │ │ │ │ │
│ │ │ │ └───────────────────────────────┘ │ │ │ │
│ │ │ └─────────────────────────────────────┘ │ │ │
│ │ │ ▲ │ │ │
│ │ │ │ rebuild │ │ │
│ │ │ │ │ │ │
│ │ │ ┌─────────────────┴───────────────────┐ │ │ │
│ │ │ │ Journal (Event Log) │ │ │ │
│ │ │ │ ┌───────────────────────────────┐ │ │ │ │
│ │ │ │ │ Entry { msg, deletedAt? } │ │ │ │ │
│ │ │ │ │ Entry { msg, deletedAt? } │ │ │ │ │
│ │ │ │ │ ... │ │ │ │ │
│ │ │ │ └───────────────────────────────┘ │ │ │ │
│ │ │ └─────────────────────────────────────┘ │ │ │
│ │ │ In-Memory │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ Broadcast │ │ Persist to │ │ Send │ │
│ │ to Clients │ │ MongoDB │ │ Ack │ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
│
┌───────────────────────────────┼───────────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Client A │ │ Client B │ │ Client C │
│ (WebSocket) │ │ (WebSocket) │ │ (WebSocket) │
└───────────────┘              └───────────────┘            └───────────────┘
Database Schema
The server persists to 4 MongoDB collections:
| Collection | Purpose |
|---|---|
| Document | Scene metadata + currentState (schema-free JSON snapshot) |
| Operation | Individual CRDT operations with vector clocks |
| JournalBatch | Batched write-ahead log (33ms batching) |
| Session | Client connections, presence, and clock state |
┌─────────────────────────────────────────────────────────────────┐
│ MongoDB │
│ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ ┌─────────┐ │
│ │ Document │ │ Operation │ │ JournalBatch│ │ Session │ │
│ │ │ │ │ │ │ │ │ │
│ │ currentState│ │ vectorClock │ │ operations[]│ │presence │ │
│ │ (JSON) │ │ lamportTime │ │ startTime │ │clockValue│ │
│ │ version │ │ data │ │ endTime │ │connected│ │
│ └─────────────┘ └─────────────┘ └────────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────┘
Data Flow
Client Server MongoDB
│ │ │
│ CRDTMessage │ │
│ {id, ops[], timestamp} │ │
│──────────────────────────────▶│ │
│ │ │
│ │ 1. Deduplicate (processedIds)│
│ │ 2. Apply ops to SceneGraph │
│ │ 3. Add to Journal │
│ │ │
│ │ Persist │
│ │──────────────────────────────▶│
│ │ │
│ Ack {msgId} │ │
│◀──────────────────────────────│ │
│ │ │
│ │ Broadcast to other clients │
│ │──────────────────────────────▶│ (Client B, C)
│                               │                               │
Minimal Server
The server only needs to:
- Receive messages from clients
- Apply operations using the dispatcher
- Broadcast to other clients
- Send acknowledgements back
import { applyMessage } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';
let state: SceneGraph = { nodes: {}, rootKey: '' };
const processedIds = new Set<string>();
function handleMessage(ws: WebSocket, msg: CRDTMessage) {
// Idempotent - skip duplicates
if (processedIds.has(msg.id)) {
ws.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));
return;
}
processedIds.add(msg.id);
// Apply operations to state
state = applyMessage(state, msg);
// Send ack to sender
ws.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));
// Broadcast to other clients
broadcast(msg, ws);
}
Why No Schema Needed?
With explicit operations like number.add, vector3.set, the operation itself specifies merge behavior:
// Client sends:
{
ot: 'number.add', // <-- This tells the server HOW to apply
key: 'player',
path: 'score',
value: 10
}
The server doesn't need a schema to know score is additive - the ot: 'number.add' already says so!
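The dispatch idea can be shown with a toy reducer for the number operations. This is illustrative only; the real dispatcher is listed under Source Files below:

```typescript
// The ot string itself selects the merge function; no per-field schema needed.
type NumericNode = Record<string, number>;

const mergeFns: Record<string, (current: number, value: number) => number> = {
  'number.set': (_current, value) => value,
  'number.add': (current, value) => current + value,
  'number.max': (current, value) => Math.max(current, value),
};

function applyNumberOp(
  node: NumericNode,
  op: { ot: string; path: string; value: number },
): NumericNode {
  const merge = mergeFns[op.ot];
  if (!merge) return node; // unknown ot: ignore
  return { ...node, [op.path]: merge(node[op.path] ?? 0, op.value) };
}
```

Note that number.add commutes: two clients concurrently adding to score converge to the same total regardless of arrival order, which is what makes the operation safe without coordination.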
Vector Clocks (Optional)
Vector clocks help detect concurrent operations for advanced conflict handling. For most use cases, Lamport timestamps (already in each message) are sufficient.
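For intuition, the comparison rule can be sketched standalone (an illustrative re-implementation; the library's VectorClockManager, reproduced under Source Files, encapsulates the same logic):

```typescript
type VClock = Record<string, number>;

// Returns 1 if c1 happened after c2, -1 if before, 0 if concurrent (or identical).
function compareClocks(c1: VClock, c2: VClock): number {
  let firstGreater = false;
  let secondGreater = false;
  for (const key of new Set([...Object.keys(c1), ...Object.keys(c2)])) {
    const v1 = c1[key] ?? 0;
    const v2 = c2[key] ?? 0;
    if (v1 > v2) firstGreater = true;
    if (v2 > v1) secondGreater = true;
  }
  if (firstGreater && !secondGreater) return 1;  // c1 dominates: happened after
  if (secondGreater && !firstGreater) return -1; // c2 dominates: happened before
  return 0;                                      // neither dominates: concurrent
}
```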
import { VectorClockManager } from '@vuer-ai/vuer-rtc/state';
const clockManager = new VectorClockManager();
// Compare two clocks
const comparison = clockManager.compare(clock1, clock2);
// 1 = clock1 > clock2 (happened after)
// -1 = clock1 < clock2 (happened before)
// 0 = concurrent (conflict!)
if (comparison === 0) {
// Concurrent operations - both valid, apply in lamportTime order
}
Handling Undo/Redo
Meta operations (meta.undo, meta.redo) work by referencing a target message:
function handleMetaOps(msg: CRDTMessage, journal: JournalEntry[]) {
for (const op of msg.ops) {
if (op.ot === 'meta.undo') {
const target = journal.find(e => e.msg.id === op.targetMsgId);
if (target) target.deletedAt = msg.timestamp;
} else if (op.ot === 'meta.redo') {
const target = journal.find(e => e.msg.id === op.targetMsgId);
if (target) delete target.deletedAt;
}
}
}
Complete Example
import WebSocket from 'ws';
import { applyMessage, createEmptyGraph } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';
interface JournalEntry {
msg: CRDTMessage;
deletedAt?: number;
}
class RTCServer {
private state: SceneGraph = createEmptyGraph();
private journal: JournalEntry[] = [];
private processedIds = new Set<string>();
private clients = new Set<WebSocket>();
handleConnection(ws: WebSocket) {
this.clients.add(ws);
// Send current state to new client
ws.send(JSON.stringify({
mtype: 'init',
state: this.state,
journal: this.journal.map(e => e.msg),
}));
ws.on('message', (data) => {
const msg = JSON.parse(data.toString()) as CRDTMessage;
this.handleMessage(ws, msg);
});
ws.on('close', () => this.clients.delete(ws));
}
private handleMessage(sender: WebSocket, msg: CRDTMessage) {
// Idempotent
if (this.processedIds.has(msg.id)) {
sender.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));
return;
}
this.processedIds.add(msg.id);
// Handle meta ops (undo/redo)
for (const op of msg.ops) {
if (op.ot === 'meta.undo') {
const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
if (target) target.deletedAt = msg.timestamp;
} else if (op.ot === 'meta.redo') {
const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
if (target) delete target.deletedAt;
}
}
// Add to journal
this.journal.push({ msg });
// Rebuild state from journal (skip deleted entries)
this.rebuildState();
// Ack sender
sender.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));
// Broadcast to others
for (const client of this.clients) {
if (client !== sender && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ mtype: 'message', msg }));
}
}
}
private rebuildState() {
let state = createEmptyGraph();
for (const entry of this.journal) {
if (entry.deletedAt) continue;
const realOps = entry.msg.ops.filter(op => !op.ot.startsWith('meta.'));
if (realOps.length > 0) {
state = applyMessage(state, { ...entry.msg, ops: realOps });
}
}
this.state = state;
}
}
Source Files
dispatcher.ts (Operation Application)
/**
* Operation Dispatcher
*
* Applies CRDTMessage operations to a SceneGraph.
* Each operation is dispatched to its corresponding "apply" function.
*
* Uses shallow cloning for immutability without immer overhead.
*/
import type { SceneGraph, CRDTMessage, Operation } from './OperationTypes.js';
import type { OpMeta } from './apply/types.js';
import * as registry from './apply/index.js';
/**
* Handler map: ot -> apply function
*/
const handlers: Record<string, (graph: SceneGraph, op: Operation, meta: OpMeta) => void> = {
// Number operations
'number.set': registry.NumberSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.add': registry.NumberAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.multiply': registry.NumberMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.min': registry.NumberMin as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.max': registry.NumberMax as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// String operations
'string.set': registry.StringSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'string.concat': registry.StringConcat as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Text CRDT operations (character-level collaborative editing)
'text.init': registry.TextInit as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'text.insert': registry.TextInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'text.delete': registry.TextDelete as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'text.replace': registry.TextReplace as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Boolean operations
'boolean.set': registry.BooleanSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'boolean.or': registry.BooleanOr as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'boolean.and': registry.BooleanAnd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Vector3 operations
'vector3.set': registry.Vector3Set as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.add': registry.Vector3Add as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.multiply': registry.Vector3Multiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.applyEuler': registry.Vector3ApplyEuler as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.applyQuaternion': registry.Vector3ApplyQuaternion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Euler operations
'euler.set': registry.EulerSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'euler.add': registry.EulerAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Quaternion operations
'quaternion.set': registry.QuaternionSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'quaternion.multiply': registry.QuaternionMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Color operations
'color.set': registry.ColorSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'color.blend': registry.ColorBlend as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Array operations
'array.set': registry.ArraySet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'array.push': registry.ArrayPush as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'array.remove': registry.ArrayRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'array.union': registry.ArrayUnion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Object operations
'object.set': registry.ObjectSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'object.merge': registry.ObjectMerge as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Node operations
'node.insert': registry.NodeInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'node.remove': registry.NodeRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'node.move': registry.NodeMove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
};
/**
* Apply a single operation to a SceneGraph (mutates in place)
*/
export function applyOperation(
graph: SceneGraph,
op: Operation,
meta: OpMeta
): void {
const handler = handlers[op.ot];
if (handler) {
handler(graph, op, meta);
} else {
console.warn(`Unknown ot: ${op.ot}`);
}
}
/**
* Compare operations for sorting by Lamport timestamp.
* CRDT invariant: operations must be applied in causal order (seq → ts → id).
*
* This ensures that:
* 1. Operations with lower Lamport clocks are applied first
* 2. If Lamport clocks are equal, wall-clock time breaks the tie
* 3. If both are equal, lexicographic ID order ensures determinism
*
* @param a - First operation
* @param b - Second operation
* @returns Negative if a < b, positive if a > b, 0 if equal
*/
function compareOperations(a: Operation, b: Operation): number {
const aOp = a as any;
const bOp = b as any;
// Compare by Lamport clock (seq)
if (aOp.seq !== undefined && bOp.seq !== undefined) {
if (aOp.seq !== bOp.seq) return aOp.seq - bOp.seq;
}
// If equal or missing, compare by wall-clock time (ts)
if (aOp.ts !== undefined && bOp.ts !== undefined) {
if (aOp.ts !== bOp.ts) return aOp.ts - bOp.ts;
}
// Final fallback: compare by ID (lexicographic)
if (aOp.id && bOp.id) {
return String(aOp.id).localeCompare(String(bOp.id));
}
// If no metadata, preserve original order
return 0;
}
/**
* Shallow clone the graph and modified nodes
*/
function shallowCloneGraph(graph: SceneGraph): SceneGraph {
return {
...graph,
nodes: { ...graph.nodes },
};
}
/**
* Apply a CRDTMessage to a SceneGraph (immutable)
*
* Creates a shallow clone of the graph before applying operations.
*
* @param graph - Current scene graph state
* @param msg - CRDT message containing operations
* @returns New scene graph state with operations applied
*/
export function applyMessage(graph: SceneGraph, msg: CRDTMessage): SceneGraph {
// Shallow clone for immutability
const newGraph = shallowCloneGraph(graph);
const meta: OpMeta = {
client: msg.client,
clock: msg.clock,
lt: msg.lt,
ts: msg.ts,
};
// Sort operations by Lamport timestamp to ensure causal order
// This is critical for CRDT correctness, especially for text operations
// where replace ops may reference IDs from insert ops
const sortedOps = [...msg.ops].sort(compareOperations);
for (const op of sortedOps) {
// Determine all node keys that this operation will mutate
const keysToClone: string[] = [];
if (op.key && newGraph.nodes[op.key]) {
keysToClone.push(op.key);
}
if (op.ot === 'node.move') {
const { nodeKey, newParent } = (op as any).value;
if (nodeKey && newGraph.nodes[nodeKey]) keysToClone.push(nodeKey);
if (newParent && newGraph.nodes[newParent]) keysToClone.push(newParent);
}
if (op.ot === 'node.remove') {
const nodeKey = (op as any).value;
if (typeof nodeKey === 'string' && newGraph.nodes[nodeKey]) keysToClone.push(nodeKey);
}
for (const k of new Set(keysToClone)) {
const orig = newGraph.nodes[k];
newGraph.nodes[k] = {
...orig,
children: orig.children ? [...orig.children] : [],
};
}
applyOperation(newGraph, op, meta);
}
return newGraph;
}
/**
* Apply a CRDTMessage to a SceneGraph (mutable)
*
* Mutates the graph directly for better performance.
*
* @param graph - Scene graph state to mutate
* @param msg - CRDT message containing operations
*/
export function applyMessageMut(graph: SceneGraph, msg: CRDTMessage): void {
const meta: OpMeta = {
client: msg.client,
clock: msg.clock,
lt: msg.lt,
ts: msg.ts,
};
// Sort operations by Lamport timestamp to ensure causal order
const sortedOps = [...msg.ops].sort(compareOperations);
for (const op of sortedOps) {
applyOperation(graph, op, meta);
}
}
/**
* Apply multiple CRDTMessages to a SceneGraph (immutable)
*
* @param graph - Current scene graph state
* @param messages - Array of CRDT messages to apply
* @returns New scene graph state with all operations applied
*/
export function applyMessages(graph: SceneGraph, messages: CRDTMessage[]): SceneGraph {
let current = graph;
for (const msg of messages) {
current = applyMessage(current, msg);
}
return current;
}
/**
* Apply multiple CRDTMessages to a SceneGraph (mutable)
*
* @param graph - Scene graph state to mutate
* @param messages - Array of CRDT messages to apply
*/
export function applyMessagesMut(graph: SceneGraph, messages: CRDTMessage[]): void {
for (const msg of messages) {
applyMessageMut(graph, msg);
}
}
/**
* Create an empty scene graph
*/
export function createEmptyGraph(): SceneGraph {
return {
nodes: {},
rootKey: '',
lww: {},
tombstones: {},
};
}
VectorClock.ts
/**
* VectorClock - CRDT-inspired vector clock implementation
*
* Vector clocks provide causal ordering of operations in a distributed system.
* Each session maintains a counter, and clocks are compared to detect:
* - Causal ordering (A happened before B)
* - Concurrent operations (A and B are independent)
*/
export type VectorClock = Record<string, number>;
export class VectorClockManager {
/**
* Create a new vector clock for a client
* Initializes the client's counter to 0
*/
create(client: string): VectorClock {
return { [client]: 0 };
}
/**
* Increment the counter for a client
* Returns a new clock (immutable)
*/
increment(clock: VectorClock, client: string): VectorClock {
const currentValue = clock[client] || 0;
return {
...clock,
[client]: currentValue + 1,
};
}
/**
* Merge two vector clocks
* Takes the maximum value for each client
* Used when receiving remote operations
*/
merge(clock1: VectorClock, clock2: VectorClock): VectorClock {
const merged: VectorClock = { ...clock1 };
Object.entries(clock2).forEach(([client, count]) => {
merged[client] = Math.max(merged[client] || 0, count);
});
return merged;
}
/**
* Compare two vector clocks
*
* Returns:
* 1 if clock1 > clock2 (clock1 causally after clock2)
* -1 if clock1 < clock2 (clock1 causally before clock2)
* 0 if concurrent (neither causally precedes the other)
*/
compare(clock1: VectorClock, clock2: VectorClock): number {
const allSessionIds = new Set([
...Object.keys(clock1),
...Object.keys(clock2),
]);
let clock1Greater = false;
let clock2Greater = false;
allSessionIds.forEach((client) => {
const val1 = clock1[client] || 0;
const val2 = clock2[client] || 0;
if (val1 > val2) {
clock1Greater = true;
}
if (val2 > val1) {
clock2Greater = true;
}
});
// If clock1 is greater in all dimensions, it causally follows clock2
if (clock1Greater && !clock2Greater) {
return 1;
}
// If clock2 is greater in all dimensions, it causally follows clock1
if (clock2Greater && !clock1Greater) {
return -1;
}
// Otherwise, they are concurrent (or identical)
return 0;
}
/**
* Check if two operations are concurrent
* (neither causally precedes the other)
*/
areConcurrent(clock1: VectorClock, clock2: VectorClock): boolean {
return this.compare(clock1, clock2) === 0;
}
}