GitHub

Server Implementation

vuer-rtc uses explicit operations where each message specifies its merge behavior via otype. This simplifies server implementation significantly -- the server just applies operations and broadcasts.

Architecture Overview

┌────────────────────────────────────────────────────────────────────────────┐
│                              Server                                        │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                         WebSocket Handler                            │  │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────────┐   │  │
│  │  │  Receive    │───▶│  Validate   │───▶│  Apply Operations       │   │  │
│  │  │  Message    │    │  & Dedup    │      (dispatcher.ts)        │   │  │
│  │  └─────────────┘    └─────────────┘    └───────────┬─────────────┘   │  │
│  │                                                    │                 │  │
│  │                     ┌──────────────────────────────┼─────────────┐   │  │
│  │                     │                              ▼             │   │  │
│  │                     │  ┌─────────────────────────────────────┐   │   │  │
│  │                     │  │           SceneGraph                │   │   │  │
│  │                     │  │  ┌───────────────────────────────┐  │   │   │  │
│  │                     │  │  │ nodes: { [key]: SceneNode }   │  │   │   │  │
│  │                     │  │  │   - position, rotation, etc.  │  │   │   │  │
│  │                     │  │  │   - schema-free properties    │  │   │   │  │
│  │                     │  │  └───────────────────────────────┘  │   │   │  │
│  │                     │  └─────────────────────────────────────┘   │   │  │
│  │                     │                    ▲                       │   │  │
│  │                     │                    │ rebuild               │   │  │
│  │                     │                    │                       │   │  │
│  │                     │  ┌─────────────────┴───────────────────┐   │   │  │
│  │                     │  │        Journal (Event Log)          │   │   │  │
│  │                     │  │  ┌───────────────────────────────┐  │   │   │  │
│  │                     │  │  │ Entry { msg, deletedAt? }     │  │   │   │  │
│  │                     │  │  │ Entry { msg, deletedAt? }     │  │   │   │  │
│  │                     │  │  │ ...                           │  │   │   │  │
│  │                     │  │  └───────────────────────────────┘  │   │   │  │
│  │                     │  └─────────────────────────────────────┘   │   │  │
│  │                     │                In-Memory                   │   │  │
│  │                     └────────────────────────────────────────────┘   │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                        │                                   │
│            ┌───────────────────────────┼───────────────────────────┐       │
│            ▼                           ▼                           ▼       │
│  ┌─────────────────┐         ┌─────────────────┐         ┌──────────────┐  │
│  │   Broadcast     │         │   Persist to    │         │    Send      │  │
│  │   to Clients    │         │    MongoDB      │         │    Ack       │  │
│  └─────────────────┘         └─────────────────┘         └──────────────┘  │
└────────────────────────────────────────────────────────────────────────────┘
        ┌───────────────────────────────┼───────────────────────────┐
        ▼                               ▼                           ▼
┌───────────────┐             ┌───────────────┐             ┌───────────────┐
│   Client A    │             │   Client B    │             │   Client C  (WebSocket)  (WebSocket)  (WebSocket)└───────────────┘             └───────────────┘             └───────────────┘

Database Schema

The server persists to 4 MongoDB collections:

CollectionPurpose
DocumentScene metadata + currentState (schema-free JSON snapshot)
OperationIndividual CRDT operations with vector clocks
JournalBatchBatched write-ahead log (33ms batching)
SessionClient connections, presence, and clock state
┌─────────────────────────────────────────────────────────────────┐
│                         MongoDB                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌────────────┐  ┌─────────┐  │
│  │  Document   │  │  Operation  │  │ JournalBatch│ │ Session │  │
│  │             │  │             │  │             │  │         │  │
│  │ currentState│  │ vectorClock │  │ operations[]│  │presence │  │
 (JSON)      │  │ lamportTime │  │ startTime   │  │clockValue│ │
│  │ version     │  │ data        │  │ endTime     │  │connected│  │
│  └─────────────┘  └─────────────┘  └────────────┘  └─────────┘  │
└─────────────────────────────────────────────────────────────────┘

Data Flow

Client                          Server                         MongoDB
  │                               │                               │
  │  CRDTMessage                  │                               │
{id, ops[], timestamp}       │                               │
  │──────────────────────────────▶│                               │
  │                               │                               │
  │                               │  1. Deduplicate (processedIds)  │                               │  2. Apply ops to SceneGraph   │
  │                               │  3. Add to Journal            │
  │                               │                               │
  │                               │  Persist                      │
  │                               │──────────────────────────────▶│
  │                               │                               │
  │              Ack {msgId}      │                               │
  │◀──────────────────────────────│                               │
  │                               │                               │
  │                               │  Broadcast to other clients   │
│──────────────────────────────▶│ (Client B, C)
  │                               │                               │

Minimal Server

The server only needs to:

  1. Receive messages from clients
  2. Apply operations using the dispatcher
  3. Broadcast to other clients
  4. Send acknowledgements back
import { applyMessage } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';

let state: SceneGraph = { nodes: {}, rootKey: '' };
const processedIds = new Set<string>();

function handleMessage(ws: WebSocket, msg: CRDTMessage) {
  // Idempotent - skip duplicates
  if (processedIds.has(msg.id)) {
    ws.send(JSON.stringify({ type: 'ack', msgId: msg.id }));
    return;
  }
  processedIds.add(msg.id);

  // Apply operations to state
  state = applyMessage(state, msg);

  // Send ack to sender
  ws.send(JSON.stringify({ type: 'ack', msgId: msg.id }));

  // Broadcast to other clients
  broadcast(msg, ws);
}

Why No Schema Needed?

With explicit operations like number.add, vector3.set, the operation itself specifies merge behavior:

// Client sends:
{
  otype: 'number.add',  // <-- This tells the server HOW to apply
  key: 'player',
  path: 'score',
  value: 10
}

The server doesn't need a schema to know score is additive - the otype: 'number.add' already says so!

Vector Clocks (Optional)

Vector clocks help detect concurrent operations for advanced conflict handling. For most use cases, Lamport timestamps (already in each message) are sufficient.

import { VectorClockManager } from '@vuer-ai/vuer-rtc/state';

const clockManager = new VectorClockManager();

// Compare two clocks
const comparison = clockManager.compare(clock1, clock2);
// 1 = clock1 > clock2 (happened after)
// -1 = clock1 < clock2 (happened before)
// 0 = concurrent (conflict!)

if (comparison === 0) {
  // Concurrent operations - both valid, apply in lamportTime order
}

Handling Undo/Redo

Meta operations (meta.undo, meta.redo) work by referencing a target message:

function handleMetaOps(msg: CRDTMessage, journal: JournalEntry[]) {
  for (const op of msg.ops) {
    if (op.otype === 'meta.undo') {
      const target = journal.find(e => e.msg.id === op.targetMsgId);
      if (target) target.deletedAt = msg.timestamp;
    } else if (op.otype === 'meta.redo') {
      const target = journal.find(e => e.msg.id === op.targetMsgId);
      if (target) delete target.deletedAt;
    }
  }
}

Complete Example

import WebSocket from 'ws';
import { applyMessage, createEmptyGraph } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';

interface JournalEntry {
  msg: CRDTMessage;
  deletedAt?: number;
}

class RTCServer {
  private state: SceneGraph = createEmptyGraph();
  private journal: JournalEntry[] = [];
  private processedIds = new Set<string>();
  private clients = new Set<WebSocket>();

  handleConnection(ws: WebSocket) {
    this.clients.add(ws);

    // Send current state to new client
    ws.send(JSON.stringify({
      type: 'init',
      state: this.state,
      journal: this.journal.map(e => e.msg),
    }));

    ws.on('message', (data) => {
      const msg = JSON.parse(data.toString()) as CRDTMessage;
      this.handleMessage(ws, msg);
    });

    ws.on('close', () => this.clients.delete(ws));
  }

  private handleMessage(sender: WebSocket, msg: CRDTMessage) {
    // Idempotent
    if (this.processedIds.has(msg.id)) {
      sender.send(JSON.stringify({ type: 'ack', msgId: msg.id }));
      return;
    }
    this.processedIds.add(msg.id);

    // Handle meta ops (undo/redo)
    for (const op of msg.ops) {
      if (op.otype === 'meta.undo') {
        const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
        if (target) target.deletedAt = msg.timestamp;
      } else if (op.otype === 'meta.redo') {
        const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
        if (target) delete target.deletedAt;
      }
    }

    // Add to journal
    this.journal.push({ msg });

    // Rebuild state from journal (skip deleted entries)
    this.rebuildState();

    // Ack sender
    sender.send(JSON.stringify({ type: 'ack', msgId: msg.id }));

    // Broadcast to others
    for (const client of this.clients) {
      if (client !== sender && client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({ type: 'message', msg }));
      }
    }
  }

  private rebuildState() {
    let state = createEmptyGraph();
    for (const entry of this.journal) {
      if (entry.deletedAt) continue;
      const realOps = entry.msg.ops.filter(op => !op.otype.startsWith('meta.'));
      if (realOps.length > 0) {
        state = applyMessage(state, { ...entry.msg, ops: realOps });
      }
    }
    this.state = state;
  }
}

Source Files

dispatcher.ts (Operation Application)

/**
 * Operation Dispatcher
 *
 * Applies CRDTMessage operations to a SceneGraph.
 * Each operation is dispatched to its corresponding "apply" function.
 *
 * Uses shallow cloning for immutability without immer overhead.
 */

import type { SceneGraph, CRDTMessage, Operation } from './OperationTypes.js';
import type { OpMeta } from './apply/types.js';
import * as registry from './apply/index.js';

/**
 * Handler map: otype -> apply function
 */
const handlers: Record<string, (graph: SceneGraph, op: Operation, meta: OpMeta) => void> = {
  // Number operations
  'number.set': registry.NumberSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.add': registry.NumberAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.multiply': registry.NumberMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.min': registry.NumberMin as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.max': registry.NumberMax as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // String operations
  'string.set': registry.StringSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'string.concat': registry.StringConcat as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Text CRDT operations (character-level collaborative editing)
  'text.init': registry.TextInit as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'text.insert': registry.TextInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'text.delete': registry.TextDelete as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Boolean operations
  'boolean.set': registry.BooleanSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'boolean.or': registry.BooleanOr as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'boolean.and': registry.BooleanAnd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Vector3 operations
  'vector3.set': registry.Vector3Set as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.add': registry.Vector3Add as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.multiply': registry.Vector3Multiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.applyEuler': registry.Vector3ApplyEuler as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.applyQuaternion': registry.Vector3ApplyQuaternion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Euler operations
  'euler.set': registry.EulerSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'euler.add': registry.EulerAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Quaternion operations
  'quaternion.set': registry.QuaternionSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'quaternion.multiply': registry.QuaternionMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Color operations
  'color.set': registry.ColorSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'color.blend': registry.ColorBlend as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Array operations
  'array.set': registry.ArraySet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'array.push': registry.ArrayPush as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'array.remove': registry.ArrayRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'array.union': registry.ArrayUnion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Object operations
  'object.set': registry.ObjectSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'object.merge': registry.ObjectMerge as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Node operations
  'node.insert': registry.NodeInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'node.remove': registry.NodeRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
};

/**
 * Apply a single operation to a SceneGraph (mutates in place)
 */
export function applyOperation(
  graph: SceneGraph,
  op: Operation,
  meta: OpMeta
): void {
  const handler = handlers[op.otype];
  if (handler) {
    handler(graph, op, meta);
  } else {
    console.warn(`Unknown otype: ${op.otype}`);
  }
}

/**
 * Shallow clone the graph and modified nodes
 */
function shallowCloneGraph(graph: SceneGraph): SceneGraph {
  return {
    ...graph,
    nodes: { ...graph.nodes },
  };
}

/**
 * Apply a CRDTMessage to a SceneGraph (immutable)
 *
 * Creates a shallow clone of the graph before applying operations.
 *
 * @param graph - Current scene graph state
 * @param msg - CRDT message containing operations
 * @returns New scene graph state with operations applied
 */
export function applyMessage(graph: SceneGraph, msg: CRDTMessage): SceneGraph {
  // Shallow clone for immutability
  const newGraph = shallowCloneGraph(graph);

  const meta: OpMeta = {
    sessionId: msg.sessionId,
    clock: msg.clock,
    lamportTime: msg.lamportTime,
    timestamp: msg.timestamp,
  };

  for (const op of msg.ops) {
    // Clone the specific node being modified (if it exists)
    if (op.key && newGraph.nodes[op.key]) {
      newGraph.nodes[op.key] = { ...newGraph.nodes[op.key] };
    }
    applyOperation(newGraph, op, meta);
  }

  return newGraph;
}

/**
 * Apply a CRDTMessage to a SceneGraph (mutable)
 *
 * Mutates the graph directly for better performance.
 *
 * @param graph - Scene graph state to mutate
 * @param msg - CRDT message containing operations
 */
export function applyMessageMut(graph: SceneGraph, msg: CRDTMessage): void {
  const meta: OpMeta = {
    sessionId: msg.sessionId,
    clock: msg.clock,
    lamportTime: msg.lamportTime,
    timestamp: msg.timestamp,
  };

  for (const op of msg.ops) {
    applyOperation(graph, op, meta);
  }
}

/**
 * Apply multiple CRDTMessages to a SceneGraph (immutable)
 *
 * @param graph - Current scene graph state
 * @param messages - Array of CRDT messages to apply
 * @returns New scene graph state with all operations applied
 */
export function applyMessages(graph: SceneGraph, messages: CRDTMessage[]): SceneGraph {
  let current = graph;
  for (const msg of messages) {
    current = applyMessage(current, msg);
  }
  return current;
}

/**
 * Apply multiple CRDTMessages to a SceneGraph (mutable)
 *
 * @param graph - Scene graph state to mutate
 * @param messages - Array of CRDT messages to apply
 */
export function applyMessagesMut(graph: SceneGraph, messages: CRDTMessage[]): void {
  for (const msg of messages) {
    applyMessageMut(graph, msg);
  }
}

/**
 * Create an empty scene graph
 */
export function createEmptyGraph(): SceneGraph {
  return {
    nodes: {},
    rootKey: '',
  };
}

VectorClock.ts

/**
 * VectorClock - CRDT-inspired vector clock implementation
 *
 * Vector clocks provide causal ordering of operations in a distributed system.
 * Each session maintains a counter, and clocks are compared to detect:
 * - Causal ordering (A happened before B)
 * - Concurrent operations (A and B are independent)
 */

export type VectorClock = Record<string, number>;

export class VectorClockManager {
  /**
   * Create a new vector clock for a session
   * Initializes the session's counter to 0
   */
  create(sessionId: string): VectorClock {
    return { [sessionId]: 0 };
  }

  /**
   * Increment the counter for a session
   * Returns a new clock (immutable)
   */
  increment(clock: VectorClock, sessionId: string): VectorClock {
    const currentValue = clock[sessionId] || 0;
    return {
      ...clock,
      [sessionId]: currentValue + 1,
    };
  }

  /**
   * Merge two vector clocks
   * Takes the maximum value for each session
   * Used when receiving remote operations
   */
  merge(clock1: VectorClock, clock2: VectorClock): VectorClock {
    const merged: VectorClock = { ...clock1 };

    Object.entries(clock2).forEach(([sessionId, count]) => {
      merged[sessionId] = Math.max(merged[sessionId] || 0, count);
    });

    return merged;
  }

  /**
   * Compare two vector clocks
   *
   * Returns:
   *  1 if clock1 > clock2 (clock1 causally after clock2)
   * -1 if clock1 < clock2 (clock1 causally before clock2)
   *  0 if concurrent (neither causally precedes the other)
   */
  compare(clock1: VectorClock, clock2: VectorClock): number {
    const allSessionIds = new Set([
      ...Object.keys(clock1),
      ...Object.keys(clock2),
    ]);

    let clock1Greater = false;
    let clock2Greater = false;

    allSessionIds.forEach((sessionId) => {
      const val1 = clock1[sessionId] || 0;
      const val2 = clock2[sessionId] || 0;

      if (val1 > val2) {
        clock1Greater = true;
      }
      if (val2 > val1) {
        clock2Greater = true;
      }
    });

    // If clock1 is greater in all dimensions, it causally follows clock2
    if (clock1Greater && !clock2Greater) {
      return 1;
    }

    // If clock2 is greater in all dimensions, it causally follows clock1
    if (clock2Greater && !clock1Greater) {
      return -1;
    }

    // Otherwise, they are concurrent (or identical)
    return 0;
  }

  /**
   * Check if two operations are concurrent
   * (neither causally precedes the other)
   */
  areConcurrent(clock1: VectorClock, clock2: VectorClock): boolean {
    return this.compare(clock1, clock2) === 0;
  }
}