Server Setup and Implementation

The @vuer-ai/vuer-rtc-server package provides production-ready server infrastructure for real-time collaborative applications using CRDT operations. It includes MongoDB persistence, WebSocket transport, and comprehensive state management.

Installation

Install the server package in your Node.js project:

pnpm add @vuer-ai/vuer-rtc-server

The package requires Node.js 18+ and the following dependencies (installed automatically):

  • @vuer-ai/vuer-rtc - Core CRDT operations
  • @prisma/client - Database ORM
  • ws - WebSocket server

Environment Configuration

Create a .env file in your project root:

# Required: MongoDB connection string with replica set
DATABASE_URL="mongodb://localhost:27017/vuer-rtc?replicaSet=rs0"

# Optional: Server port (default: 8080)
PORT=8080

# Optional: Redis URL (for multi-instance scaling)
REDIS_URL="redis://localhost:6379"

Required Environment Variables

| Variable | Description | Example |
| --- | --- | --- |
| DATABASE_URL | MongoDB connection string with replica set | mongodb://localhost:27017/vuer-rtc?replicaSet=rs0 |

Optional Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| PORT | 8080 | HTTP/WebSocket server port |
| REDIS_URL | - | Redis connection URL (required for multi-instance deployments) |
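
A startup check can catch a missing or malformed variable before the server opens any connections. The following is a minimal sketch, not part of the package: `loadConfig` and `ServerConfig` are hypothetical names, and the replica set check is a simple heuristic on the URL text.

```typescript
// Hypothetical helper for illustration; not exported by @vuer-ai/vuer-rtc-server.
interface ServerConfig {
  databaseUrl: string;
  port: number;
  redisUrl?: string;
}

type Env = Record<string, string | undefined>;

function loadConfig(env: Env): ServerConfig {
  const databaseUrl = env.DATABASE_URL;
  if (!databaseUrl) {
    throw new Error('DATABASE_URL is required');
  }

  // mongodb+srv:// URLs (for example Atlas) discover their hosts through DNS,
  // so only plain mongodb:// URLs are checked for an explicit replicaSet option.
  const isSrv = databaseUrl.startsWith('mongodb+srv://');
  if (!isSrv && !/[?&]replicaSet=[^&]+/.test(databaseUrl)) {
    throw new Error('DATABASE_URL must name a replica set (?replicaSet=...)');
  }

  // PORT is optional and defaults to 8080.
  const rawPort = env.PORT;
  const port = rawPort === undefined || rawPort === '' ? 8080 : Number(rawPort);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`PORT must be an integer between 1 and 65535, got "${rawPort}"`);
  }

  // REDIS_URL is optional; an empty string is treated as unset.
  return { databaseUrl, port, redisUrl: env.REDIS_URL || undefined };
}
```

In a Node.js entry point this would typically be called once as `loadConfig(process.env)`.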

MongoDB Replica Set Setup

The server requires MongoDB with replica set support for transactions and change streams.

Development (Local)

Use Docker Compose for quick local setup:

# Clone or download docker-compose.yml from the repository
curl -O https://raw.githubusercontent.com/vuer-ai/vuer-rtc/main/docker/docker-compose.yml

# Start MongoDB and Redis
docker compose up -d

# Wait for MongoDB replica set initialization (about 30 seconds)
docker compose logs -f mongo

The docker-compose.yml configuration:

  • MongoDB 7 with replica set rs0 on port 27017
  • Redis 7 on port 6379
  • Automatic replica set initialization via healthcheck

Production Setup

For production deployments, use a managed MongoDB service:

MongoDB Atlas (Recommended):

DATABASE_URL="mongodb+srv://username:password@cluster.mongodb.net/vuer-rtc?retryWrites=true&w=majority"

Self-Hosted Replica Set:

# Initialize replica set on primary server
mongosh
> rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "mongo1.example.com:27017" },
    { _id: 1, host: "mongo2.example.com:27017" },
    { _id: 2, host: "mongo3.example.com:27017" }
  ]
})

# Connection string
DATABASE_URL="mongodb://mongo1.example.com:27017,mongo2.example.com:27017,mongo3.example.com:27017/vuer-rtc?replicaSet=rs0"

Database Schema Initialization

After configuring DATABASE_URL, initialize the Prisma schema:

# Generate Prisma client
pnpm exec prisma generate

# Push schema to database (creates collections)
pnpm exec prisma db push

This creates 4 MongoDB collections:

| Collection | Purpose |
| --- | --- |
| Document | Scene metadata and current state snapshots |
| Operation | Individual CRDT operations with vector clocks |
| JournalBatch | Batched write-ahead log (33ms batching) |
| Session | Client connections, presence, and clock state |
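
The 33ms batching window for JournalBatch is roughly one frame at 30 Hz. How the package forms its batches is not shown here, so the following is only a sketch of the idea: operations whose timestamps fall within 33 ms of a batch's first operation are written together. `TimedOp`, `Batch`, and `groupIntoBatches` are hypothetical names, and the field names are assumptions.

```typescript
// Hypothetical shapes for illustration; the real JournalBatch model may differ.
interface TimedOp {
  id: string;
  ts: number; // wall-clock time in milliseconds
}

interface Batch {
  startTime: number;
  endTime: number;
  operations: TimedOp[];
}

const BATCH_WINDOW_MS = 33;

// Group operations into fixed windows measured from each batch's first operation.
function groupIntoBatches(ops: TimedOp[], windowMs: number = BATCH_WINDOW_MS): Batch[] {
  const sorted = [...ops].sort((a, b) => a.ts - b.ts);
  const batches: Batch[] = [];
  for (const op of sorted) {
    const last = batches[batches.length - 1];
    if (last && op.ts - last.startTime < windowMs) {
      // Still inside the current window: extend the open batch.
      last.operations.push(op);
      last.endTime = op.ts;
    } else {
      // Window elapsed (or first operation): open a new batch.
      batches.push({ startTime: op.ts, endTime: op.ts, operations: [op] });
    }
  }
  return batches;
}
```

Batching trades a small write delay for far fewer database round trips than one write per operation.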

Redis Configuration (Optional)

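Redis is needed only for horizontal scaling across multiple server instances. The planned RedisBroker is intended to use Redis pub/sub to synchronize room state and member presence; until it is available, a single instance runs on InMemoryBroker and does not need Redis.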

Development (Local)

Redis is included in the Docker Compose setup:

docker compose up -d redis

Production Setup

Use a managed Redis service:

# Redis Cloud, AWS ElastiCache, etc.
REDIS_URL="redis://username:password@redis.example.com:6379"

# Redis Cluster
REDIS_URL="redis://redis-cluster.example.com:6379?cluster=true"

Scaling Considerations

  • Single Instance: No Redis required, uses InMemoryBroker
  • Multi-Instance: Redis required for RedisBroker (not yet implemented)
  • Load Balancing: Use sticky sessions or implement Redis-backed broker

Starting the Server

Basic Usage

import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { InMemoryBroker, RTCServer } from '@vuer-ai/vuer-rtc-server';
import { createPrismaClient } from '@vuer-ai/vuer-rtc-server/persistence';
import { JournalService } from '@vuer-ai/vuer-rtc-server/journal';

const PORT = Number(process.env.PORT) || 8080;
const prisma = createPrismaClient();

const broker = new InMemoryBroker();
const journalService = new JournalService(prisma);

// Wire broker clocks into journal for safe compaction
journalService.setMemberClockProvider(async (docId: string) => {
  const doc = await prisma.document.findUnique({ where: { id: docId } });
  if (!doc) return [];
  const members = await broker.getMembers(doc.name);
  return Array.from(members.values())
    .filter(m => m.connected)
    .map(m => m.vectorClock);
});

journalService.startCompactionLoop();

const rtcServer = new RTCServer(broker, {
  async processMessage(roomId: string, msg: any) {
    const docId = await ensureDocument(roomId);
    return journalService.processMessage(docId, msg);
  },
  async getStateForClient(roomId: string) {
    const docId = await ensureDocument(roomId);
    return journalService.getStateForClient(docId);
  },
});

const server = createServer((req, res) => {
  res.writeHead(200).end('OK');
});

const wss = new WebSocketServer({ server });

wss.on('connection', (ws, req) => {
  const url = new URL(req.url ?? '/', `http://${req.headers.host}`);
  const match = url.pathname.match(/^\/ws\/([^/]+)$/);
  if (!match) {
    ws.close(4000, 'Invalid path');
    return;
  }
  const roomId = decodeURIComponent(match[1]);
  const sessionId = url.searchParams.get('sessionId');
  if (!sessionId) {
    ws.close(4001, 'Missing sessionId');
    return;
  }
  rtcServer.handleConnection(ws, roomId, sessionId);
});

server.listen(PORT, () => {
  console.log(`Server listening on http://localhost:${PORT}`);
});
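
The example above calls `ensureDocument(roomId)` without defining it. A minimal sketch of such a helper follows, assuming the Document model has a string `id` and stores the room ID in `name` (as the clock provider above suggests). `makeEnsureDocument` and `DocumentDelegate` are hypothetical names, and the real schema may require more fields on create.

```typescript
// Structural type for the part of the Prisma client this sketch relies on.
// Assumption: Document has a string `id` and a `name` holding the room ID.
interface DocumentDelegate {
  findFirst(args: { where: { name: string } }): Promise<{ id: string } | null>;
  create(args: { data: { name: string } }): Promise<{ id: string }>;
}

function makeEnsureDocument(documents: DocumentDelegate) {
  // Cache the in-flight promise so concurrent calls for one room share a lookup.
  const cache = new Map<string, Promise<string>>();

  return function ensureDocument(roomId: string): Promise<string> {
    let pending = cache.get(roomId);
    if (!pending) {
      pending = (async () => {
        const existing = await documents.findFirst({ where: { name: roomId } });
        if (existing) return existing.id;
        const created = await documents.create({ data: { name: roomId } });
        return created.id;
      })();
      // Forget failed lookups so a later call can retry.
      pending.catch(() => cache.delete(roomId));
      cache.set(roomId, pending);
    }
    return pending;
  };
}
```

It would be wired up once, for example `const ensureDocument = makeEnsureDocument(prisma.document)`, possibly with a cast depending on the generated Prisma types.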

Using the Built-in Server

The package includes a complete production-ready server:

# Build and run
pnpm build
pnpm serve

WebSocket URL format:

ws://localhost:8080/ws/{roomId}?sessionId={sessionId}
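
Room IDs can contain characters that are not safe in a URL path, so the client should encode them and the server should decode them. The sketch below shows one way to build and parse URLs in this format. `buildRoomUrl` and `parseRoomUrl` are hypothetical helper names; the parsing mirrors the connection handler in the basic usage example.

```typescript
// Hypothetical helpers for illustration; not part of the package.

// Client side: build the WebSocket URL for a room and session.
function buildRoomUrl(base: string, roomId: string, sessionId: string): string {
  const trimmed = base.replace(/\/+$/, ''); // drop trailing slashes
  return `${trimmed}/ws/${encodeURIComponent(roomId)}?sessionId=${encodeURIComponent(sessionId)}`;
}

// Server side: extract roomId and sessionId from the request URL (path + query).
function parseRoomUrl(requestUrl: string): { roomId: string; sessionId: string } | null {
  const url = new URL(requestUrl, 'http://localhost'); // base is only needed for parsing
  const match = url.pathname.match(/^\/ws\/([^/]+)$/);
  const sessionId = url.searchParams.get('sessionId');
  if (!match || !sessionId) return null;
  return { roomId: decodeURIComponent(match[1]), sessionId };
}

const exampleUrl = buildRoomUrl('ws://localhost:8080', 'my room/1', 'abc');
// exampleUrl is 'ws://localhost:8080/ws/my%20room%2F1?sessionId=abc'
```

Because the slash in the room ID is percent-encoded, it stays inside a single path segment and still matches the server's path pattern.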

REST API endpoints:

  • GET /api/stats - Database statistics
  • GET /api/documents - List all documents
  • GET /api/documents/:id - Get document details
  • GET /api/documents/:id/journal - Get journal batches
  • GET /api/rooms/:roomId/state - Get room state
  • DELETE /api/rooms/:roomId - Clear room (keeps connections)

Production Deployment

Docker Deployment

Create a Dockerfile:

FROM node:18-alpine AS base
RUN corepack enable

FROM base AS deps
WORKDIR /app
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile

FROM base AS builder
WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY . .
RUN pnpm exec prisma generate
RUN pnpm build

FROM base AS runner
WORKDIR /app
ENV NODE_ENV=production

COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/prisma ./prisma
COPY --from=builder /app/package.json ./

EXPOSE 8080
CMD ["node", "dist/serve.js"]

Build and run:

docker build -t vuer-rtc-server .
docker run -p 8080:8080 \
  -e DATABASE_URL="mongodb://mongo:27017/vuer-rtc?replicaSet=rs0" \
  vuer-rtc-server

Docker Compose Example

Complete production setup with MongoDB and Redis:

version: '3.8'

services:
  server:
    build: .
    ports:
      - "8080:8080"
    environment:
      DATABASE_URL: mongodb://mongo:27017/vuer-rtc?replicaSet=rs0
      REDIS_URL: redis://redis:6379
      PORT: 8080
    depends_on:
      mongo:
        condition: service_healthy
      redis:
        condition: service_started
    restart: unless-stopped

  mongo:
    image: mongo:7
    command: ["--replSet", "rs0", "--bind_ip_all"]
    volumes:
      - mongo-data:/data/db
    healthcheck:
      test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --quiet
      interval: 5s
      timeout: 30s
      retries: 30

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

volumes:
  mongo-data:
  redis-data:

Cloud Deployment

Recommended Stack:

  • Application: AWS ECS, Google Cloud Run, or Railway
  • Database: MongoDB Atlas (M10+ with replica set)
  • Cache: Redis Cloud or AWS ElastiCache
  • Load Balancer: AWS ALB with WebSocket support

Environment Variables:

DATABASE_URL="mongodb+srv://cluster.mongodb.net/vuer-rtc"
REDIS_URL="redis://redis.cloud:6379"
PORT=8080
NODE_ENV=production

Horizontal Scaling

For multi-instance deployments:

  1. Use Redis-backed broker (when available)
  2. Configure sticky sessions on load balancer (WebSocket affinity)
  3. Share MongoDB cluster across all instances
  4. Monitor connection limits on MongoDB and Redis

Current limitation: InMemoryBroker only supports single-instance deployment. RedisBroker implementation is planned for horizontal scaling.
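
While each room's state lives in one process, every client of a room has to reach the same instance. One way a routing layer could achieve that is to pick the instance from a hash of the room ID. The sketch below assumes a fixed list of instance addresses; `pickInstance` and the addresses are hypothetical and not something the package provides.

```typescript
// FNV-1a style 32-bit hash over the string's UTF-16 code units.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Map a room to one instance so all of its clients land on the same process.
function pickInstance(roomId: string, instances: string[]): string {
  if (instances.length === 0) {
    throw new Error('No instances configured');
  }
  return instances[fnv1a(roomId) % instances.length];
}

// Hypothetical instance addresses.
const instances = ['10.0.0.1:8080', '10.0.0.2:8080', '10.0.0.3:8080'];
const target = pickInstance('lobby', instances);
```

Changing the number of instances remaps most rooms under this simple modulo scheme, so it suits a fixed fleet better than an autoscaled one.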

Health Checks and Monitoring

Health Check Endpoint

Add a health check route:

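// Register this in place of the catch-all handler passed to createServer;
// otherwise both handlers will try to answer the same request.
server.on('request', (req, res) => {
  if (req.url === '/health') {
    // Prisma's MongoDB connector does not support $queryRaw, so ping with a raw command.
    prisma.$runCommandRaw({ ping: 1 })
      .then(() => res.writeHead(200).end('OK'))
      .catch(() => res.writeHead(503).end('Database unavailable'));
  }
});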

Monitoring Metrics

Track these key metrics:

| Metric | Endpoint/Method | Description |
| --- | --- | --- |
| Active connections | wss.clients.size | Current WebSocket connections |
| Room count | broker.getRooms() | Number of active rooms |
| Database size | GET /api/stats/sizes | Storage usage per collection |
| Operation throughput | Custom counter | Messages/second processed |
| Journal batch size | Monitor JournalBatch.operations.length | Average operations per batch |
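
The "Custom counter" for operation throughput is left to the application. One simple option is a sliding-window counter, sketched below. `ThroughputCounter` is a hypothetical class, and timestamps are passed in explicitly so the behavior is easy to test.

```typescript
// Sliding-window rate counter: reports events per second over the last windowMs.
class ThroughputCounter {
  private timestamps: number[] = [];
  private readonly windowMs: number;

  constructor(windowMs: number = 1000) {
    this.windowMs = windowMs;
  }

  // Call once per processed message.
  record(now: number = Date.now()): void {
    this.timestamps.push(now);
    this.prune(now);
  }

  // Events per second, averaged over the window ending at `now`.
  ratePerSecond(now: number = Date.now()): number {
    this.prune(now);
    return (this.timestamps.length * 1000) / this.windowMs;
  }

  // Drop timestamps that have fallen out of the window.
  private prune(now: number): void {
    const cutoff = now - this.windowMs;
    while (this.timestamps.length > 0 && this.timestamps[0] <= cutoff) {
      this.timestamps.shift();
    }
  }
}
```

Calling `record()` wherever a message is processed and reading `ratePerSecond()` on a timer or from a stats endpoint gives a rough messages-per-second figure.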

Example Prometheus Metrics

import { register, Counter, Gauge } from 'prom-client';

const messageCounter = new Counter({
  name: 'vuer_rtc_messages_total',
  help: 'Total CRDT messages processed',
});

const connectionGauge = new Gauge({
  name: 'vuer_rtc_connections',
  help: 'Current WebSocket connections',
});

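rtcServer.on('message', () => messageCounter.inc());

// Track each socket individually: the server-level 'close' event fires only
// when the whole WebSocketServer shuts down, not when a client disconnects.
wss.on('connection', (ws) => {
  connectionGauge.inc();
  ws.on('close', () => connectionGauge.dec());
});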

server.on('request', (req, res) => {
  if (req.url === '/metrics') {
    res.setHeader('Content-Type', register.contentType);
    register.metrics().then(metrics => res.end(metrics));
  }
});

Logging

Configure structured logging for production:

import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  transport: process.env.NODE_ENV === 'development'
    ? { target: 'pino-pretty' }
    : undefined,
});

rtcServer.on('message', (msg) => {
  logger.debug({ msgId: msg.id, roomId: msg.roomId }, 'Message processed');
});

Performance Tuning

Journal Compaction

Configure compaction frequency in JournalService:

// Default: compact every 5 minutes
journalService.startCompactionLoop({
  intervalMs: 5 * 60 * 1000,
  minOperations: 1000, // Only compact if > 1000 operations
});
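
Compaction is only safe for operations that every connected member has already seen, which is why the journal is given the members' clocks. The package's actual rule is not shown here; the sketch below illustrates one common approach, taking the component-wise minimum of the member clocks as a floor. `compactionFloor`, `isSafeToCompact`, and the per-client counter on each operation are assumptions.

```typescript
type VectorClock = Record<string, number>;

// Component-wise minimum across all connected members' clocks.
// A client missing from a member's clock counts as 0 for that member.
function compactionFloor(memberClocks: VectorClock[]): VectorClock {
  const floor: VectorClock = {};
  if (memberClocks.length === 0) return floor; // conservative: nothing is compactable

  const clients = new Set<string>();
  for (const clock of memberClocks) {
    for (const client of Object.keys(clock)) clients.add(client);
  }
  clients.forEach((client) => {
    floor[client] = Math.min(...memberClocks.map((clock) => clock[client] ?? 0));
  });
  return floor;
}

// An operation is safe to fold into a snapshot once every member has seen it.
// Assumption: each operation carries its author and that author's counter.
function isSafeToCompact(op: { client: string; clock: number }, floor: VectorClock): boolean {
  return op.clock <= (floor[op.client] ?? 0);
}
```

A member that is far behind holds the floor down, so compaction waits for the slowest connected client instead of discarding operations it still needs.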

Connection Limits

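MongoDB and WebSocket limits:

# MongoDB connection pool size (set via DATABASE_URL in .env)
DATABASE_URL="mongodb://localhost:27017/vuer-rtc?replicaSet=rs0&maxPoolSize=50"

// Raises the EventEmitter listener-warning threshold on the server object.
// It does not cap WebSocket connections; those are bounded by OS limits
// such as the open file descriptor limit.
wss.setMaxListeners(10000);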

Memory Management

Monitor and limit room history:

// Clear old rooms periodically
setInterval(async () => {
  const rooms = await broker.getRooms();
  for (const roomId of rooms) {
    const members = await broker.getMembers(roomId);
    if (members.size === 0) {
      await rtcServer.clearRoom(roomId);
    }
  }
}, 60 * 60 * 1000); // Every hour

Architecture Overview

vuer-rtc uses explicit operations: each operation names its own merge behavior in the ot field. The server therefore needs no per-property merge logic; it applies the operations and broadcasts them.

┌────────────────────────────────────────────────────────────────────────────┐
│                              Server                                        │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                         WebSocket Handler                            │  │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────────┐   │  │
│  │  │  Receive    │───▶│  Validate   │───▶│  Apply Operations       │   │  │
│  │  │  Message    │    │  & Dedup    │    │  (dispatcher.ts)        │   │  │
│  │  └─────────────┘    └─────────────┘    └───────────┬─────────────┘   │  │
│  │                                                    │                 │  │
│  │                     ┌──────────────────────────────┼─────────────┐   │  │
│  │                     │                              ▼             │   │  │
│  │                     │  ┌─────────────────────────────────────┐   │   │  │
│  │                     │  │           SceneGraph                │   │   │  │
│  │                     │  │  ┌───────────────────────────────┐  │   │   │  │
│  │                     │  │  │ nodes: { [key]: SceneNode }   │  │   │   │  │
│  │                     │  │  │   - position, rotation, etc.  │  │   │   │  │
│  │                     │  │  │   - schema-free properties    │  │   │   │  │
│  │                     │  │  └───────────────────────────────┘  │   │   │  │
│  │                     │  └─────────────────────────────────────┘   │   │  │
│  │                     │                    ▲                       │   │  │
│  │                     │                    │ rebuild               │   │  │
│  │                     │                    │                       │   │  │
│  │                     │  ┌─────────────────┴───────────────────┐   │   │  │
│  │                     │  │        Journal (Event Log)          │   │   │  │
│  │                     │  │  ┌───────────────────────────────┐  │   │   │  │
│  │                     │  │  │ Entry { msg, deletedAt? }     │  │   │   │  │
│  │                     │  │  │ Entry { msg, deletedAt? }     │  │   │   │  │
│  │                     │  │  │ ...                           │  │   │   │  │
│  │                     │  │  └───────────────────────────────┘  │   │   │  │
│  │                     │  └─────────────────────────────────────┘   │   │  │
│  │                     │                In-Memory                   │   │  │
│  │                     └────────────────────────────────────────────┘   │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                        │                                   │
│            ┌───────────────────────────┼───────────────────────────┐       │
│            ▼                           ▼                           ▼       │
│  ┌─────────────────┐         ┌─────────────────┐         ┌──────────────┐  │
│  │   Broadcast     │         │   Persist to    │         │    Send      │  │
│  │   to Clients    │         │    MongoDB      │         │    Ack       │  │
│  └─────────────────┘         └─────────────────┘         └──────────────┘  │
└────────────────────────────────────────────────────────────────────────────┘
        ┌───────────────────────────────┼───────────────────────────┐
        ▼                               ▼                           ▼
┌───────────────┐             ┌───────────────┐             ┌───────────────┐
│   Client A    │             │   Client B    │             │   Client C    │
│  (WebSocket)  │             │  (WebSocket)  │             │  (WebSocket)  │
└───────────────┘             └───────────────┘             └───────────────┘

Database Schema

The server persists to 4 MongoDB collections:

| Collection | Purpose |
| --- | --- |
| Document | Scene metadata + currentState (schema-free JSON snapshot) |
| Operation | Individual CRDT operations with vector clocks |
| JournalBatch | Batched write-ahead log (33ms batching) |
| Session | Client connections, presence, and clock state |

┌──────────────────────────────────────────────────────────────────────┐
│                               MongoDB                                │
│  ┌─────────────┐  ┌─────────────┐  ┌──────────────┐  ┌────────────┐  │
│  │ Document    │  │ Operation   │  │ JournalBatch │  │ Session    │  │
│  │             │  │             │  │              │  │            │  │
│  │ currentState│  │ vectorClock │  │ operations[] │  │ presence   │  │
│  │ (JSON)      │  │ lamportTime │  │ startTime    │  │ clockValue │  │
│  │ version     │  │ data        │  │ endTime      │  │ connected  │  │
│  └─────────────┘  └─────────────┘  └──────────────┘  └────────────┘  │
└──────────────────────────────────────────────────────────────────────┘

Data Flow

Client                          Server                         MongoDB
  │                               │                               │
  │  CRDTMessage                  │                               │
  │  {id, ops[], timestamp}       │                               │
  │──────────────────────────────▶│                               │
  │                               │                               │
  │                               │  1. Deduplicate (processedIds)│
  │                               │  2. Apply ops to SceneGraph   │
  │                               │  3. Add to Journal            │
  │                               │                               │
  │                               │  Persist                      │
  │                               │──────────────────────────────▶│
  │                               │                               │
  │              Ack {msgId}      │                               │
  │◀──────────────────────────────│                               │
  │                               │                               │
  │                               │  Broadcast to other clients   │
  │                               │──────────────────────────────▶│ (Client B, C)
  │                               │                               │

Minimal Server

The server only needs to:

  1. Receive messages from clients
  2. Apply operations using the dispatcher
  3. Broadcast to other clients
  4. Send acknowledgements back
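
import WebSocket from 'ws';
import { applyMessage, createEmptyGraph } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';

let state: SceneGraph = createEmptyGraph();
const processedIds = new Set<string>();
const clients = new Set<WebSocket>(); // add on connect, delete on close

// Relay a message to every open client except the sender
function broadcast(msg: CRDTMessage, sender: WebSocket) {
  for (const client of clients) {
    if (client !== sender && client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify({ mtype: 'message', msg }));
    }
  }
}

function handleMessage(ws: WebSocket, msg: CRDTMessage) {
  // Idempotent - skip duplicates, but still ack so the client stops retrying
  if (processedIds.has(msg.id)) {
    ws.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));
    return;
  }
  processedIds.add(msg.id);

  // Apply operations to state
  state = applyMessage(state, msg);

  // Send ack to sender
  ws.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));

  // Broadcast to other clients
  broadcast(msg, ws);
}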
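
A plain Set of processed IDs grows for as long as the server runs. One way to bound it is to keep only the most recent IDs, as in the sketch below. `BoundedIdSet` is a hypothetical class, and the right capacity depends on how late a client may retry a message.

```typescript
// Remembers at most `capacity` IDs, evicting the oldest first (FIFO).
class BoundedIdSet {
  private readonly ids = new Set<string>();
  private readonly capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) throw new Error('capacity must be at least 1');
    this.capacity = capacity;
  }

  has(id: string): boolean {
    return this.ids.has(id);
  }

  add(id: string): void {
    if (this.ids.has(id)) return;
    if (this.ids.size >= this.capacity) {
      // A Set iterates in insertion order, so the first value is the oldest.
      const oldest = this.ids.values().next().value;
      if (oldest !== undefined) this.ids.delete(oldest);
    }
    this.ids.add(id);
  }

  get size(): number {
    return this.ids.size;
  }
}
```

The trade-off is that a duplicate arriving after its ID has been evicted would be applied again, so the capacity should comfortably exceed the number of messages that can arrive within the client's retry window.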

Why No Schema Needed?

With explicit operations like number.add, vector3.set, the operation itself specifies merge behavior:

// Client sends:
{
  ot: 'number.add',  // <-- This tells the server HOW to apply
  key: 'player',
  path: 'score',
  value: 10
}

The server doesn't need a schema to know score is additive - the ot: 'number.add' already says so!
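
To see why the operation type matters, compare how two concurrent operations combine. The sketch below is a toy model with a single numeric property and two operation types, not the library's dispatcher; `ToyOp`, `applyToy`, and `applyAll` are hypothetical names.

```typescript
// Toy model for illustration only.
type ToyOp =
  | { ot: 'number.add'; value: number }
  | { ot: 'number.set'; value: number };

function applyToy(current: number, op: ToyOp): number {
  switch (op.ot) {
    case 'number.add':
      return current + op.value; // additive: order of arrival does not matter
    case 'number.set':
      return op.value; // overwrite: the last one applied wins
  }
}

function applyAll(initial: number, ops: ToyOp[]): number {
  return ops.reduce(applyToy, initial);
}

const addA: ToyOp = { ot: 'number.add', value: 10 };
const addB: ToyOp = { ot: 'number.add', value: 5 };
const addsAB = applyAll(0, [addA, addB]); // 15
const addsBA = applyAll(0, [addB, addA]); // 15

const setA: ToyOp = { ot: 'number.set', value: 10 };
const setB: ToyOp = { ot: 'number.set', value: 5 };
const setsAB = applyAll(0, [setA, setB]); // 5
const setsBA = applyAll(0, [setB, setA]); // 10
```

Additions give the same result in either order, while sets depend on order, which is why operations carry ordering metadata in addition to their type.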

Vector Clocks (Optional)

Vector clocks help detect concurrent operations for advanced conflict handling. For most use cases, Lamport timestamps (already in each message) are sufficient.

import { VectorClockManager } from '@vuer-ai/vuer-rtc/state';

const clockManager = new VectorClockManager();

// Compare two clocks
const comparison = clockManager.compare(clock1, clock2);
// 1 = clock1 > clock2 (happened after)
// -1 = clock1 < clock2 (happened before)
// 0 = concurrent (conflict!), or the two clocks are identical

if (comparison === 0) {
  // Concurrent operations - both valid, apply in lamportTime order
}
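
Because a result of 0 covers both concurrent and identical clocks, callers that care about the difference need one more check. The sketch below inlines the same comparison rule as `VectorClockManager.compare` so that it runs standalone; `sameClock` and `relation` are hypothetical helpers.

```typescript
type VectorClock = Record<string, number>;

function allClients(a: VectorClock, b: VectorClock): string[] {
  return Array.from(new Set([...Object.keys(a), ...Object.keys(b)]));
}

// Same rule as VectorClockManager.compare: 1 = after, -1 = before, 0 = neither.
function compare(a: VectorClock, b: VectorClock): number {
  let aGreater = false;
  let bGreater = false;
  for (const client of allClients(a, b)) {
    const va = a[client] ?? 0;
    const vb = b[client] ?? 0;
    if (va > vb) aGreater = true;
    if (vb > va) bGreater = true;
  }
  if (aGreater && !bGreater) return 1;
  if (bGreater && !aGreater) return -1;
  return 0;
}

// True when every counter matches, treating missing entries as 0.
function sameClock(a: VectorClock, b: VectorClock): boolean {
  return allClients(a, b).every((client) => (a[client] ?? 0) === (b[client] ?? 0));
}

type Relation = 'before' | 'after' | 'equal' | 'concurrent';

function relation(a: VectorClock, b: VectorClock): Relation {
  const result = compare(a, b);
  if (result === 1) return 'after';
  if (result === -1) return 'before';
  return sameClock(a, b) ? 'equal' : 'concurrent';
}

relation({ alice: 1 }, { alice: 2 });         // 'before'
relation({ alice: 2, bob: 1 }, { alice: 1 }); // 'after'
relation({ alice: 1 }, { bob: 1 });           // 'concurrent'
relation({ alice: 1 }, { alice: 1, bob: 0 }); // 'equal'
```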

Handling Undo/Redo

Meta operations (meta.undo, meta.redo) work by referencing a target message:

function handleMetaOps(msg: CRDTMessage, journal: JournalEntry[]) {
  for (const op of msg.ops) {
    if (op.ot === 'meta.undo') {
      const target = journal.find(e => e.msg.id === op.targetMsgId);
      if (target) target.deletedAt = msg.timestamp;
    } else if (op.ot === 'meta.redo') {
      const target = journal.find(e => e.msg.id === op.targetMsgId);
      if (target) delete target.deletedAt;
    }
  }
}
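
A small worked example may make the effect clearer. The sketch below uses a toy journal in which each message adds to a single counter, so the result of an undo and a redo is easy to follow. All names (`ToyMsg`, `ToyEntry`, `record`, `total`) are hypothetical and simplified from the shapes used above.

```typescript
type ToyOp =
  | { ot: 'number.add'; value: number }
  | { ot: 'meta.undo'; targetMsgId: string }
  | { ot: 'meta.redo'; targetMsgId: string };

interface ToyMsg {
  id: string;
  timestamp: number;
  ops: ToyOp[];
}

interface ToyEntry {
  msg: ToyMsg;
  deletedAt?: number;
}

// Apply meta ops to earlier entries, then append the message itself.
function record(journal: ToyEntry[], msg: ToyMsg): void {
  for (const op of msg.ops) {
    if (op.ot === 'number.add') continue;
    const targetId = op.targetMsgId;
    const target = journal.find((entry) => entry.msg.id === targetId);
    if (!target) continue;
    if (op.ot === 'meta.undo') target.deletedAt = msg.timestamp;
    else delete target.deletedAt;
  }
  journal.push({ msg });
}

// Rebuild the counter from the journal, skipping undone entries.
function total(journal: ToyEntry[]): number {
  let sum = 0;
  for (const entry of journal) {
    if (entry.deletedAt !== undefined) continue;
    for (const op of entry.msg.ops) {
      if (op.ot === 'number.add') sum += op.value;
    }
  }
  return sum;
}

const journal: ToyEntry[] = [];
record(journal, { id: 'm1', timestamp: 1, ops: [{ ot: 'number.add', value: 10 }] });
record(journal, { id: 'm2', timestamp: 2, ops: [{ ot: 'number.add', value: 5 }] });
const beforeUndo = total(journal); // 15
record(journal, { id: 'm3', timestamp: 3, ops: [{ ot: 'meta.undo', targetMsgId: 'm1' }] });
const afterUndo = total(journal); // 5
record(journal, { id: 'm4', timestamp: 4, ops: [{ ot: 'meta.redo', targetMsgId: 'm1' }] });
const afterRedo = total(journal); // 15
```

Undo never removes anything from the journal; it only marks an entry so that the next rebuild skips it, which is what makes redo possible.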

Complete Example

import WebSocket from 'ws';
import { applyMessage, createEmptyGraph } from '@vuer-ai/vuer-rtc/operations';
import type { CRDTMessage, SceneGraph } from '@vuer-ai/vuer-rtc/operations';

interface JournalEntry {
  msg: CRDTMessage;
  deletedAt?: number;
}

class RTCServer {
  private state: SceneGraph = createEmptyGraph();
  private journal: JournalEntry[] = [];
  private processedIds = new Set<string>();
  private clients = new Set<WebSocket>();

  handleConnection(ws: WebSocket) {
    this.clients.add(ws);

    // Send current state to new client
    ws.send(JSON.stringify({
      mtype: 'init',
      state: this.state,
      journal: this.journal.map(e => e.msg),
    }));

    ws.on('message', (data) => {
      const msg = JSON.parse(data.toString()) as CRDTMessage;
      this.handleMessage(ws, msg);
    });

    ws.on('close', () => this.clients.delete(ws));
  }

  private handleMessage(sender: WebSocket, msg: CRDTMessage) {
    // Idempotent
    if (this.processedIds.has(msg.id)) {
      sender.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));
      return;
    }
    this.processedIds.add(msg.id);

    // Handle meta ops (undo/redo)
    for (const op of msg.ops) {
      if (op.ot === 'meta.undo') {
        const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
        if (target) target.deletedAt = msg.timestamp;
      } else if (op.ot === 'meta.redo') {
        const target = this.journal.find(e => e.msg.id === (op as any).targetMsgId);
        if (target) delete target.deletedAt;
      }
    }

    // Add to journal
    this.journal.push({ msg });

    // Rebuild state from journal (skip deleted entries)
    this.rebuildState();

    // Ack sender
    sender.send(JSON.stringify({ mtype: 'ack', msgId: msg.id }));

    // Broadcast to others
    for (const client of this.clients) {
      if (client !== sender && client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({ mtype: 'message', msg }));
      }
    }
  }

  private rebuildState() {
    let state = createEmptyGraph();
    for (const entry of this.journal) {
      if (entry.deletedAt) continue;
      const realOps = entry.msg.ops.filter(op => !op.ot.startsWith('meta.'));
      if (realOps.length > 0) {
        state = applyMessage(state, { ...entry.msg, ops: realOps });
      }
    }
    this.state = state;
  }
}

Source Files

dispatcher.ts (Operation Application)

/**
 * Operation Dispatcher
 *
 * Applies CRDTMessage operations to a SceneGraph.
 * Each operation is dispatched to its corresponding "apply" function.
 *
 * Uses shallow cloning for immutability without immer overhead.
 */

import type { SceneGraph, CRDTMessage, Operation } from './OperationTypes.js';
import type { OpMeta } from './apply/types.js';
import * as registry from './apply/index.js';

/**
 * Handler map: ot -> apply function
 */
const handlers: Record<string, (graph: SceneGraph, op: Operation, meta: OpMeta) => void> = {
  // Number operations
  'number.set': registry.NumberSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.add': registry.NumberAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.multiply': registry.NumberMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.min': registry.NumberMin as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'number.max': registry.NumberMax as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // String operations
  'string.set': registry.StringSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'string.concat': registry.StringConcat as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Text CRDT operations (character-level collaborative editing)
  'text.init': registry.TextInit as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'text.insert': registry.TextInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'text.delete': registry.TextDelete as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'text.replace': registry.TextReplace as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Boolean operations
  'boolean.set': registry.BooleanSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'boolean.or': registry.BooleanOr as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'boolean.and': registry.BooleanAnd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Vector3 operations
  'vector3.set': registry.Vector3Set as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.add': registry.Vector3Add as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.multiply': registry.Vector3Multiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.applyEuler': registry.Vector3ApplyEuler as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'vector3.applyQuaternion': registry.Vector3ApplyQuaternion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Euler operations
  'euler.set': registry.EulerSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'euler.add': registry.EulerAdd as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Quaternion operations
  'quaternion.set': registry.QuaternionSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'quaternion.multiply': registry.QuaternionMultiply as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Color operations
  'color.set': registry.ColorSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'color.blend': registry.ColorBlend as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Array operations
  'array.set': registry.ArraySet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'array.push': registry.ArrayPush as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'array.remove': registry.ArrayRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'array.union': registry.ArrayUnion as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Object operations
  'object.set': registry.ObjectSet as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'object.merge': registry.ObjectMerge as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,

  // Node operations
  'node.insert': registry.NodeInsert as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'node.remove': registry.NodeRemove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
  'node.move': registry.NodeMove as (graph: SceneGraph, op: Operation, meta: OpMeta) => void,
};

/**
 * Apply a single operation to a SceneGraph (mutates in place)
 */
export function applyOperation(
  graph: SceneGraph,
  op: Operation,
  meta: OpMeta
): void {
  const handler = handlers[op.ot];
  if (handler) {
    handler(graph, op, meta);
  } else {
    console.warn(`Unknown ot: ${op.ot}`);
  }
}

/**
 * Compare operations for sorting by Lamport timestamp.
 * CRDT invariant: operations must be applied in causal order (seq → ts → id).
 *
 * This ensures that:
 * 1. Operations with lower Lamport clocks are applied first
 * 2. If Lamport clocks are equal, wall-clock time breaks the tie
 * 3. If both are equal, lexicographic ID order ensures determinism
 *
 * @param a - First operation
 * @param b - Second operation
 * @returns Negative if a < b, positive if a > b, 0 if equal
 */
function compareOperations(a: Operation, b: Operation): number {
  const aOp = a as any;
  const bOp = b as any;

  // Compare by Lamport clock (seq)
  if (aOp.seq !== undefined && bOp.seq !== undefined) {
    if (aOp.seq !== bOp.seq) return aOp.seq - bOp.seq;
  }

  // If equal or missing, compare by wall-clock time (ts)
  if (aOp.ts !== undefined && bOp.ts !== undefined) {
    if (aOp.ts !== bOp.ts) return aOp.ts - bOp.ts;
  }

  // Final fallback: compare by ID (lexicographic)
  if (aOp.id && bOp.id) {
    return String(aOp.id).localeCompare(String(bOp.id));
  }

  // If no metadata, preserve original order
  return 0;
}

/**
 * Shallow clone the graph and modified nodes
 */
function shallowCloneGraph(graph: SceneGraph): SceneGraph {
  return {
    ...graph,
    nodes: { ...graph.nodes },
  };
}

/**
 * Apply a CRDTMessage to a SceneGraph (immutable)
 *
 * Creates a shallow clone of the graph before applying operations.
 *
 * @param graph - Current scene graph state
 * @param msg - CRDT message containing operations
 * @returns New scene graph state with operations applied
 */
export function applyMessage(graph: SceneGraph, msg: CRDTMessage): SceneGraph {
  // Shallow clone for immutability
  const newGraph = shallowCloneGraph(graph);

  const meta: OpMeta = {
    client: msg.client,
    clock: msg.clock,
    lt: msg.lt,
    ts: msg.ts,
  };

  // Sort operations by Lamport timestamp to ensure causal order
  // This is critical for CRDT correctness, especially for text operations
  // where replace ops may reference IDs from insert ops
  const sortedOps = [...msg.ops].sort(compareOperations);

  for (const op of sortedOps) {
    // Determine all node keys that this operation will mutate
    const keysToClone: string[] = [];
    if (op.key && newGraph.nodes[op.key]) {
      keysToClone.push(op.key);
    }
    if (op.ot === 'node.move') {
      const { nodeKey, newParent } = (op as any).value;
      if (nodeKey && newGraph.nodes[nodeKey]) keysToClone.push(nodeKey);
      if (newParent && newGraph.nodes[newParent]) keysToClone.push(newParent);
    }
    if (op.ot === 'node.remove') {
      const nodeKey = (op as any).value;
      if (typeof nodeKey === 'string' && newGraph.nodes[nodeKey]) keysToClone.push(nodeKey);
    }

    for (const k of new Set(keysToClone)) {
      const orig = newGraph.nodes[k];
      newGraph.nodes[k] = {
        ...orig,
        children: orig.children ? [...orig.children] : [],
      };
    }

    applyOperation(newGraph, op, meta);
  }

  return newGraph;
}

/**
 * Apply a CRDTMessage to a SceneGraph (mutable)
 *
 * Mutates the graph directly for better performance.
 *
 * @param graph - Scene graph state to mutate
 * @param msg - CRDT message containing operations
 */
export function applyMessageMut(graph: SceneGraph, msg: CRDTMessage): void {
  const meta: OpMeta = {
    client: msg.client,
    clock: msg.clock,
    lt: msg.lt,
    ts: msg.ts,
  };

  // Sort operations by Lamport timestamp to ensure causal order
  const sortedOps = [...msg.ops].sort(compareOperations);

  for (const op of sortedOps) {
    applyOperation(graph, op, meta);
  }
}

/**
 * Apply multiple CRDTMessages to a SceneGraph (immutable)
 *
 * @param graph - Current scene graph state
 * @param messages - Array of CRDT messages to apply
 * @returns New scene graph state with all operations applied
 */
export function applyMessages(graph: SceneGraph, messages: CRDTMessage[]): SceneGraph {
  let current = graph;
  for (const msg of messages) {
    current = applyMessage(current, msg);
  }
  return current;
}

/**
 * Apply multiple CRDTMessages to a SceneGraph (mutable)
 *
 * @param graph - Scene graph state to mutate
 * @param messages - Array of CRDT messages to apply
 */
export function applyMessagesMut(graph: SceneGraph, messages: CRDTMessage[]): void {
  for (const msg of messages) {
    applyMessageMut(graph, msg);
  }
}

/**
 * Create an empty scene graph
 */
export function createEmptyGraph(): SceneGraph {
  return {
    nodes: {},
    rootKey: '',
    lww: {},
    tombstones: {},
  };
}
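
The ordering rule in compareOperations (seq, then ts, then id) can be checked on a small input. The sketch below is a simplified, typed restatement of that comparator, not the source itself: `OrderedOp` and `compareOps` are hypothetical names, and unlike the original it assumes every operation has an id.

```typescript
interface OrderedOp {
  id: string;
  seq?: number; // Lamport clock
  ts?: number;  // wall-clock time in milliseconds
}

function compareOps(a: OrderedOp, b: OrderedOp): number {
  // 1. Lamport clock, when both sides have one and they differ
  if (a.seq !== undefined && b.seq !== undefined && a.seq !== b.seq) {
    return a.seq - b.seq;
  }
  // 2. Wall-clock time, when both sides have one and they differ
  if (a.ts !== undefined && b.ts !== undefined && a.ts !== b.ts) {
    return a.ts - b.ts;
  }
  // 3. Lexicographic ID as the deterministic tie-breaker
  return a.id.localeCompare(b.id);
}

const unsorted: OrderedOp[] = [
  { id: 'c', seq: 2, ts: 100 },
  { id: 'b', seq: 1, ts: 200 },
  { id: 'a', seq: 1, ts: 200 },
  { id: 'd', seq: 1, ts: 150 },
];

// Sort a copy so the input array is left untouched.
const sortedIds = [...unsorted].sort(compareOps).map((op) => op.id);
// sortedIds is ['d', 'a', 'b', 'c']
```

Operation c has the earliest wall-clock time but sorts last, because the Lamport clock takes precedence over wall-clock time.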

VectorClock.ts

/**
 * VectorClock - CRDT-inspired vector clock implementation
 *
 * Vector clocks provide causal ordering of operations in a distributed system.
 * Each session maintains a counter, and clocks are compared to detect:
 * - Causal ordering (A happened before B)
 * - Concurrent operations (A and B are independent)
 */

export type VectorClock = Record<string, number>;

export class VectorClockManager {
  /**
   * Create a new vector clock for a client
   * Initializes the client's counter to 0
   */
  create(client: string): VectorClock {
    return { [client]: 0 };
  }

  /**
   * Increment the counter for a client
   * Returns a new clock (immutable)
   */
  increment(clock: VectorClock, client: string): VectorClock {
    const currentValue = clock[client] || 0;
    return {
      ...clock,
      [client]: currentValue + 1,
    };
  }

  /**
   * Merge two vector clocks
   * Takes the maximum value for each client
   * Used when receiving remote operations
   */
  merge(clock1: VectorClock, clock2: VectorClock): VectorClock {
    const merged: VectorClock = { ...clock1 };

    Object.entries(clock2).forEach(([client, count]) => {
      merged[client] = Math.max(merged[client] || 0, count);
    });

    return merged;
  }

  /**
   * Compare two vector clocks
   *
   * Returns:
   *  1 if clock1 > clock2 (clock1 causally after clock2)
   * -1 if clock1 < clock2 (clock1 causally before clock2)
   *  0 if concurrent (neither causally precedes the other)
   */
  compare(clock1: VectorClock, clock2: VectorClock): number {
    const allSessionIds = new Set([
      ...Object.keys(clock1),
      ...Object.keys(clock2),
    ]);

    let clock1Greater = false;
    let clock2Greater = false;

    allSessionIds.forEach((client) => {
      const val1 = clock1[client] || 0;
      const val2 = clock2[client] || 0;

      if (val1 > val2) {
        clock1Greater = true;
      }
      if (val2 > val1) {
        clock2Greater = true;
      }
    });

    // If clock1 is ahead for at least one client and behind for none, it causally follows clock2
    if (clock1Greater && !clock2Greater) {
      return 1;
    }

    // If clock2 is ahead for at least one client and behind for none, it causally follows clock1
    if (clock2Greater && !clock1Greater) {
      return -1;
    }

    // Otherwise, they are concurrent (or identical)
    return 0;
  }

  /**
   * Check if two operations are concurrent
   * (neither causally precedes the other)
   */
  areConcurrent(clock1: VectorClock, clock2: VectorClock): boolean {
    return this.compare(clock1, clock2) === 0;
  }
}