Server Implementation
vuer-rtc uses explicit operations where each message specifies its merge behavior via otype.
This simplifies server implementation significantly -- the server just applies operations and
broadcasts.
Architecture Overview
┌────────────────────────────────────────────────────────────────────────────┐
│ Server │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ WebSocket Handler │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ Receive │───▶│ Validate │───▶│ Apply Operations │ │ │
│ │ │ Message │ │ & Dedup │ │ (dispatcher.ts) │ │ │
│ │ └─────────────┘ └─────────────┘ └───────────┬─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────────────────────────┼─────────────┐ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌─────────────────────────────────────┐ │ │ │
│ │ │ │ SceneGraph │ │ │ │
│ │ │ │ ┌───────────────────────────────┐ │ │ │ │
│ │ │ │ │ nodes: { [key]: SceneNode } │ │ │ │ │
│ │ │ │ │ - position, rotation, etc. │ │ │ │ │
│ │ │ │ │ - schema-free properties │ │ │ │ │
│ │ │ │ └───────────────────────────────┘ │ │ │ │
│ │ │ └─────────────────────────────────────┘ │ │ │
│ │ │ ▲ │ │ │
│ │ │ │ rebuild │ │ │
│ │ │ │ │ │ │
│ │ │ ┌─────────────────┴───────────────────┐ │ │ │
│ │ │ │ Journal (Event Log) │ │ │ │
│ │ │ │ ┌───────────────────────────────┐ │ │ │ │
│ │ │ │ │ Entry { msg, deletedAt? } │ │ │ │ │
│ │ │ │ │ Entry { msg, deletedAt? } │ │ │ │ │
│ │ │ │ │ ... │ │ │ │ │
│ │ │ │ └───────────────────────────────┘ │ │ │ │
│ │ │ └─────────────────────────────────────┘ │ │ │
│ │ │ In-Memory │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ Broadcast │ │ Persist to │ │ Send │ │
│ │ to Clients │ │ MongoDB │ │ Ack │ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
│
┌───────────────────────────────┼───────────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Client A │ │ Client B │ │ Client C │
│ (WebSocket) │ │ (WebSocket) │ │ (WebSocket) │
└───────────────┘ └───────────────┘ └───────────────┘Database Schema
The server persists to 4 MongoDB collections:
| Collection | Purpose |
|---|---|
| Document | Scene metadata + currentState (schema-free JSON snapshot) |
| Operation | Individual CRDT operations with vector clocks |
| JournalBatch | Batched write-ahead log (33ms batching) |
| Session | Client connections, presence, and clock state |
┌─────────────────────────────────────────────────────────────────┐
│ MongoDB │
│ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ ┌─────────┐ │
│ │ Document │ │ Operation │ │ JournalBatch│ │ Session │ │
│ │ │ │ │ │ │ │ │ │
│ │ currentState│ │ vectorClock │ │ operations[]│ │presence │ │
│ │ (JSON) │ │ lamportTime │ │ startTime │ │clockValue│ │
│ │ version │ │ data │ │ endTime │ │connected│ │
│ └─────────────┘ └─────────────┘ └────────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────┘Data Flow
Client Server MongoDB
│ │ │
│ CRDTMessage │ │
│ {id, ops[], timestamp} │ │
│──────────────────────────────▶│ │
│ │ │
│ │ 1. Deduplicate (processedIds)│
│ │ 2. Apply ops to SceneGraph │
│ │ 3. Add to Journal │
│ │ │
│ │ Persist │
│ │──────────────────────────────▶│
│ │ │
│ Ack {msgId} │ │
│◀──────────────────────────────│ │
│ │ │
│ │ Broadcast to other clients │
│ │──────────────────────────────▶│ (Client B, C)
│ │ │Minimal Server
The server only needs to:
- Receive messages from clients
- Apply operations using the dispatcher
- Broadcast to other clients
- Send acknowledgements back
import { applyMessage } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';
let state: SceneGraph = { nodes: {}, rootKey: '' };
const processedIds = new Set<string>();
function handleMessage(ws: WebSocket, msg: CRDTMessage) {
// Idempotent - skip duplicates
if (processedIds.has(msg.id)) {
ws.send(JSON.stringify({ type: 'ack', msgId: msg.id }));
return;
}
processedIds.add(msg.id);
// Apply operations to state
state = applyMessage(state, msg);
// Send ack to sender
ws.send(JSON.stringify({ type: 'ack', msgId: msg.id }));
// Broadcast to other clients
broadcast(msg, ws);
}Why No Schema Needed?
With explicit operations like number.add, vector3.set, the operation itself specifies merge behavior:
// Client sends:
{
otype: 'number.add', // <-- This tells the server HOW to apply
key: 'player',
path: 'score',
value: 10
}The server doesn't need a schema to know score is additive - the otype: 'number.add' already says so!
Vector Clocks (Optional)
Vector clocks help detect concurrent operations for advanced conflict handling. For most use cases, Lamport timestamps (already in each message) are sufficient.
import { VectorClockManager } from '@vuer-ai/vuer-rtc/state';
const clockManager = new VectorClockManager();
// Compare two clocks
const comparison = clockManager.compare(clock1, clock2);
// 1 = clock1 > clock2 (happened after)
// -1 = clock1 < clock2 (happened before)
// 0 = concurrent (conflict!)
if (comparison === 0) {
// Concurrent operations - both valid, apply in lamportTime order
}Handling Undo/Redo
Meta operations (meta.undo, meta.redo) work by referencing a target message:
function handleMetaOps(msg: CRDTMessage, journal: JournalEntry[]) {
for (const op of msg.ops) {
if (op.otype === 'meta.undo') {
const target = journal.find(e => e.msg.id === op.targetMsgId);
if (target) target.deletedAt = msg.timestamp;
} else if (op.otype === 'meta.redo') {
const target = journal.find(e => e.msg.id === op.targetMsgId);
if (target) delete target.deletedAt;
}
}
}Complete Example
import WebSocket from 'ws';
import { applyMessage, createEmptyGraph } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';
interface JournalEntry {
msg: CRDTMessage;
deletedAt?: number;
}
class RTCServer {
private state: SceneGraph = createEmptyGraph();
private journal: JournalEntry[] = [];
private processedIds = new Set<string>();
private clients = new Set<WebSocket>();
handleConnection(ws: WebSocket) {
this.clients.add(ws);
// Send current state to new client
ws.send(JSON.stringify({
type: 'init',
state: this.state,
journal: this.journal.map(e => e.msg),
}));
ws.on('message', (data) => {
const msg = JSON.parse(data.toString()) as CRDTMessage;
this.handleMessage(ws, msg);
});
ws.on('close', () => this.clients.delete(ws));
}
private handleMessage(sender: WebSocket, msg: CRDTMessage) {
// Idempotent
if (this.processedIds.has(msg.id)) {
sender.send(JSON.stringify({ type: 'ack', msgId: msg.id }));
return;
}
this.processedIds.add(msg.id);
// Handle meta ops (undo/redo)
for (const op of msg.ops) {
if (op.otype === 'meta.undo') {
const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
if (target) target.deletedAt = msg.timestamp;
} else if (op.otype === 'meta.redo') {
const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
if (target) delete target.deletedAt;
}
}
// Add to journal
this.journal.push({ msg });
// Rebuild state from journal (skip deleted entries)
this.rebuildState();
// Ack sender
sender.send(JSON.stringify({ type: 'ack', msgId: msg.id }));
// Broadcast to others
for (const client of this.clients) {
if (client !== sender && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ type: 'message', msg }));
}
}
}
private rebuildState() {
let state = createEmptyGraph();
for (const entry of this.journal) {
if (entry.deletedAt) continue;
const realOps = entry.msg.ops.filter(op => !op.otype.startsWith('meta.'));
if (realOps.length > 0) {
state = applyMessage(state, { ...entry.msg, ops: realOps });
}
}
this.state = state;
}
}Source Files
dispatcher.ts (Operation Application)
/**
* Operation Dispatcher
*
* Applies CRDTMessage operations to a SceneGraph.
* Each operation is dispatched to its corresponding "apply" function.
*
* Uses shallow cloning for immutability without immer overhead.
*/
import type { SceneGraph, CRDTMessage, Operation } from './OperationTypes.js';
import type { OpMeta } from './apply/types.js';
import * as registry from './apply/index.js';
/**
* Handler map: otype -> apply function
*/
const handlers: Record<string, (graph: SceneGraph, op: Operation, meta: OpMeta) => void> = {
// Number operations
'number.set': registry.NumberSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.add': registry.NumberAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.multiply': registry.NumberMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.min': registry.NumberMin as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'number.max': registry.NumberMax as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// String operations
'string.set': registry.StringSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'string.concat': registry.StringConcat as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Text CRDT operations (character-level collaborative editing)
'text.init': registry.TextInit as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'text.insert': registry.TextInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'text.delete': registry.TextDelete as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Boolean operations
'boolean.set': registry.BooleanSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'boolean.or': registry.BooleanOr as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'boolean.and': registry.BooleanAnd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Vector3 operations
'vector3.set': registry.Vector3Set as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.add': registry.Vector3Add as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.multiply': registry.Vector3Multiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.applyEuler': registry.Vector3ApplyEuler as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'vector3.applyQuaternion': registry.Vector3ApplyQuaternion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Euler operations
'euler.set': registry.EulerSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'euler.add': registry.EulerAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Quaternion operations
'quaternion.set': registry.QuaternionSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'quaternion.multiply': registry.QuaternionMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Color operations
'color.set': registry.ColorSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'color.blend': registry.ColorBlend as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Array operations
'array.set': registry.ArraySet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'array.push': registry.ArrayPush as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'array.remove': registry.ArrayRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'array.union': registry.ArrayUnion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Object operations
'object.set': registry.ObjectSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'object.merge': registry.ObjectMerge as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
// Node operations
'node.insert': registry.NodeInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
'node.remove': registry.NodeRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
};
/**
* Apply a single operation to a SceneGraph (mutates in place)
*/
export function applyOperation(
graph: SceneGraph,
op: Operation,
meta: OpMeta
): void {
const handler = handlers[op.otype];
if (handler) {
handler(graph, op, meta);
} else {
console.warn(`Unknown otype: ${op.otype}`);
}
}
/**
* Shallow clone the graph and modified nodes
*/
function shallowCloneGraph(graph: SceneGraph): SceneGraph {
return {
...graph,
nodes: { ...graph.nodes },
};
}
/**
* Apply a CRDTMessage to a SceneGraph (immutable)
*
* Creates a shallow clone of the graph before applying operations.
*
* @param graph - Current scene graph state
* @param msg - CRDT message containing operations
* @returns New scene graph state with operations applied
*/
export function applyMessage(graph: SceneGraph, msg: CRDTMessage): SceneGraph {
// Shallow clone for immutability
const newGraph = shallowCloneGraph(graph);
const meta: OpMeta = {
sessionId: msg.sessionId,
clock: msg.clock,
lamportTime: msg.lamportTime,
timestamp: msg.timestamp,
};
for (const op of msg.ops) {
// Clone the specific node being modified (if it exists)
if (op.key && newGraph.nodes[op.key]) {
newGraph.nodes[op.key] = { ...newGraph.nodes[op.key] };
}
applyOperation(newGraph, op, meta);
}
return newGraph;
}
/**
* Apply a CRDTMessage to a SceneGraph (mutable)
*
* Mutates the graph directly for better performance.
*
* @param graph - Scene graph state to mutate
* @param msg - CRDT message containing operations
*/
export function applyMessageMut(graph: SceneGraph, msg: CRDTMessage): void {
const meta: OpMeta = {
sessionId: msg.sessionId,
clock: msg.clock,
lamportTime: msg.lamportTime,
timestamp: msg.timestamp,
};
for (const op of msg.ops) {
applyOperation(graph, op, meta);
}
}
/**
* Apply multiple CRDTMessages to a SceneGraph (immutable)
*
* @param graph - Current scene graph state
* @param messages - Array of CRDT messages to apply
* @returns New scene graph state with all operations applied
*/
export function applyMessages(graph: SceneGraph, messages: CRDTMessage[]): SceneGraph {
let current = graph;
for (const msg of messages) {
current = applyMessage(current, msg);
}
return current;
}
/**
* Apply multiple CRDTMessages to a SceneGraph (mutable)
*
* @param graph - Scene graph state to mutate
* @param messages - Array of CRDT messages to apply
*/
export function applyMessagesMut(graph: SceneGraph, messages: CRDTMessage[]): void {
for (const msg of messages) {
applyMessageMut(graph, msg);
}
}
/**
* Create an empty scene graph
*/
export function createEmptyGraph(): SceneGraph {
return {
nodes: {},
rootKey: '',
};
}VectorClock.ts
/**
* VectorClock - CRDT-inspired vector clock implementation
*
* Vector clocks provide causal ordering of operations in a distributed system.
* Each session maintains a counter, and clocks are compared to detect:
* - Causal ordering (A happened before B)
* - Concurrent operations (A and B are independent)
*/
export type VectorClock = Record<string, number>;
export class VectorClockManager {
/**
* Create a new vector clock for a session
* Initializes the session's counter to 0
*/
create(sessionId: string): VectorClock {
return { [sessionId]: 0 };
}
/**
* Increment the counter for a session
* Returns a new clock (immutable)
*/
increment(clock: VectorClock, sessionId: string): VectorClock {
const currentValue = clock[sessionId] || 0;
return {
...clock,
[sessionId]: currentValue + 1,
};
}
/**
* Merge two vector clocks
* Takes the maximum value for each session
* Used when receiving remote operations
*/
merge(clock1: VectorClock, clock2: VectorClock): VectorClock {
const merged: VectorClock = { ...clock1 };
Object.entries(clock2).forEach(([sessionId, count]) => {
merged[sessionId] = Math.max(merged[sessionId] || 0, count);
});
return merged;
}
/**
* Compare two vector clocks
*
* Returns:
* 1 if clock1 > clock2 (clock1 causally after clock2)
* -1 if clock1 < clock2 (clock1 causally before clock2)
* 0 if concurrent (neither causally precedes the other)
*/
compare(clock1: VectorClock, clock2: VectorClock): number {
const allSessionIds = new Set([
...Object.keys(clock1),
...Object.keys(clock2),
]);
let clock1Greater = false;
let clock2Greater = false;
allSessionIds.forEach((sessionId) => {
const val1 = clock1[sessionId] || 0;
const val2 = clock2[sessionId] || 0;
if (val1 > val2) {
clock1Greater = true;
}
if (val2 > val1) {
clock2Greater = true;
}
});
// If clock1 is greater in all dimensions, it causally follows clock2
if (clock1Greater && !clock2Greater) {
return 1;
}
// If clock2 is greater in all dimensions, it causally follows clock1
if (clock2Greater && !clock1Greater) {
return -1;
}
// Otherwise, they are concurrent (or identical)
return 0;
}
/**
* Check if two operations are concurrent
* (neither causally precedes the other)
*/
areConcurrent(clock1: VectorClock, clock2: VectorClock): boolean {
return this.compare(clock1, clock2) === 0;
}
}