
Task 1.5.1 — WS server scaffold + heartbeat

Phase: 1.5 — Live broadcast
Status: Done (landed in b8ebbd0; see notes at the end)
Depends on: 1.8 (main wiring), 1.9 (observability)
Wiki refs: docs/wiki/synthesis/processor-ws-contract.md §Endpoint, §Transport; docs/wiki/concepts/live-channel-architecture.md

Goal

Stand up a WebSocket server inside the Processor process: bind to a configurable port, accept upgrades, dispatch incoming messages to a router, send 30s pings, hold a typed LiveConnection per client. No auth, no subscriptions yet — those land in 1.5.2 and 1.5.3. This task is the lifecycle and message-loop skeleton.

The WS server runs on its own HTTP server (separate from the Phase 1 metrics/health server on :9090) so the reverse proxy can route them to different paths and the failure modes don't entangle.

Deliverables

  • src/live/server.ts exporting:
    • createLiveServer(config, logger, metrics): LiveServer — factory.
    • LiveServer interface: start(): Promise<void> (binds and listens), stop(timeoutMs?: number): Promise<void> (closes new connections, sends a close frame to existing, waits for them to drain or force-closes after the timeout).
    • type LiveConnection = { id: string; ws: WebSocket; remoteAddr: string; openedAt: Date; lastSeenAt: Date } — opaque identity, augmented in later tasks.
    • A pluggable onMessage(conn: LiveConnection, msg: InboundMessage): Promise<void> handler, invoked with the already-validated envelope (matching the Specification below). For now it just logs at debug and replies with { type: 'error', code: 'not-implemented' }; tasks 1.5.2 and 1.5.3 attach the real handler. A type sketch of this surface follows the list.
  • src/live/protocol.ts — zod schemas for inbound message envelopes:
    const InboundMessage = z.discriminatedUnion('type', [
      z.object({ type: z.literal('subscribe'), topic: z.string(), id: z.string().optional() }),
      z.object({ type: z.literal('unsubscribe'), topic: z.string(), id: z.string().optional() }),
    ]);
    
    Outbound types declared but not constructed yet (subscribed/position/unsubscribed/error).
  • src/main.ts updated to create + start the live server alongside the existing consumer; SIGTERM stops both in the right order (live server first so no new connections during drain; consumer second so the durable-write path completes its in-flight batch).
  • New config keys (zod schema in src/config/load.ts):
    • LIVE_WS_PORT (default 8081).
    • LIVE_WS_HOST (default 0.0.0.0).
    • LIVE_WS_PING_INTERVAL_MS (default 30_000).
    • LIVE_WS_DRAIN_TIMEOUT_MS (default 5_000).
  • New Prometheus metrics (in src/observability/metrics.ts):
    • processor_live_connections{instance_id} (gauge) — current open connections.
    • processor_live_messages_inbound_total{instance_id, type} (counter).
    • processor_live_messages_outbound_total{instance_id, type} (counter).
  • test/live-server.test.ts:
    • Server starts on a random port, accepts a connection, sends a ping within LIVE_WS_PING_INTERVAL_MS + 100ms, and a pong updates lastSeenAt.
    • An inbound message that fails zod validation receives a { type: 'error', code: 'protocol-violation' } reply and the connection stays open.
    • stop() sends a close frame to existing connections and resolves within the drain timeout.
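
The src/live/server.ts surface above, as a type sketch (shapes only; the OutboundMessage payloads are assumptions until 1.5.2/1.5.3 pin them down, everything else is copied from the bullets):

import type { WebSocket } from 'ws';
import { z } from 'zod';
import { InboundMessage } from './protocol';

export type LiveConnection = {
  id: string;
  ws: WebSocket;
  remoteAddr: string;
  openedAt: Date;
  lastSeenAt: Date;
};

// Declared but not constructed yet; exact payloads firm up in 1.5.2/1.5.3.
export type OutboundMessage =
  | { type: 'subscribed'; topic: string; id?: string }
  | { type: 'unsubscribed'; topic: string; id?: string }
  | { type: 'position'; topic: string; payload: unknown }
  | { type: 'error'; code: string; message?: string };

export interface LiveServer {
  start(): Promise<void>;                   // binds and listens
  stop(timeoutMs?: number): Promise<void>;  // drain, then force-close
  onMessage: (conn: LiveConnection, msg: z.infer<typeof InboundMessage>) => Promise<void>;
}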

Specification

Library choice

ws (the package, not @types/ws alone). Lightweight, minimal API, and supports noServer: true for attaching to an existing http.createServer. Avoid uWebSockets.js: its performance is great, but the native C++ binding makes deployment and testcontainers-based testing fiddly.

Server attach pattern

import http from 'node:http';
import { WebSocket, WebSocketServer } from 'ws';
import { nanoid } from 'nanoid';

const httpServer = http.createServer((req, res) => {
  // Optional: a small /healthz endpoint specific to the live server, separate
  // from the Phase 1 metrics/health server. For now, return 404 on HTTP requests
  // — the only thing this server does is upgrade.
  res.writeHead(404).end();
});

const wss = new WebSocketServer({ noServer: true });

httpServer.on('upgrade', (req, socket, head) => {
  // Auth happens here in task 1.5.2. For now, just accept.
  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit('connection', ws, req);
  });
});

wss.on('connection', (ws, req) => {
  const conn: LiveConnection = {
    id: nanoid(),
    ws,
    remoteAddr: req.socket.remoteAddress ?? 'unknown',
    openedAt: new Date(),
    lastSeenAt: new Date(),
  };
  // ... attach handlers
});
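
The elided handler attachment might look like the sketch below. It assumes a module-level connections map and a gauge with inc()/dec(); note the landed code drives the gauge via set() instead (see Done).

connections.set(conn.id, conn);
metrics.liveConnections.inc();
logger.debug(`connection opened id=${conn.id} remote=${conn.remoteAddr}`);

ws.on('close', (code, reason) => {
  connections.delete(conn.id);
  metrics.liveConnections.dec();
  logger.debug(`connection closed id=${conn.id} code=${code} reason=${reason.toString()}`);
});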

Heartbeat

Use the ws library's built-in ping/pong:

const pingTimer = setInterval(() => {
  for (const conn of connections.values()) {
    if (conn.ws.readyState !== WebSocket.OPEN) continue;
    conn.ws.ping();
    // Optional: track outstanding pings; close if pong doesn't arrive in N seconds.
  }
}, config.LIVE_WS_PING_INTERVAL_MS);

ws automatically replies to inbound pings with pongs, and each server-side socket emits a 'pong' event when the client answers our ping. Update lastSeenAt in the pong handler. Don't roll your own ping in the application protocol — the WebSocket frame-level ping/pong is cheaper, handled natively by browsers, and doesn't pollute the message log.
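
A sketch of the pong handler plus the optional liveness cutoff (the two-interval threshold is an assumption for illustration, not part of the spec):

ws.on('pong', () => {
  conn.lastSeenAt = new Date(); // a frame-level pong proves the peer is alive
});

// Inside the ping timer above, optionally drop connections whose last pong
// is older than two ping intervals (threshold is illustrative):
const cutoff = Date.now() - 2 * config.LIVE_WS_PING_INTERVAL_MS;
for (const conn of connections.values()) {
  if (conn.lastSeenAt.getTime() < cutoff) conn.ws.terminate();
}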

Inbound message handling

ws.on('message', async (data) => {
  conn.lastSeenAt = new Date();
  const raw = data.toString('utf8');
  let parsed: z.infer<typeof InboundMessage>;
  try {
    parsed = InboundMessage.parse(JSON.parse(raw));
  } catch {
    metrics.liveMessagesInbound.inc({ type: 'invalid' });
    sendOutbound(conn, { type: 'error', code: 'protocol-violation', message: 'Invalid message envelope' });
    return;
  }
  metrics.liveMessagesInbound.inc({ type: parsed.type });
  await onMessage(conn, parsed);
});

onMessage is the pluggable handler that 1.5.2 (auth gate) and 1.5.3 (subscription registry) replace.
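
For this task the placeholder can be as small as the sketch below (names follow the Deliverables list):

let onMessage = async (conn: LiveConnection, msg: z.infer<typeof InboundMessage>): Promise<void> => {
  logger.debug(`inbound message id=${conn.id} type=${msg.type}`);
  sendOutbound(conn, { type: 'error', code: 'not-implemented' });
};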

Outbound message helper

function sendOutbound(conn: LiveConnection, msg: OutboundMessage): void {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}

Centralised so back-pressure handling (1.5.4) and message logging hook in one place later.
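
For instance, 1.5.4 could gate sends on bytes already queued for the socket; a hypothetical guard at the top of sendOutbound (MAX_BUFFERED_BYTES is illustrative, not spec'd here):

// Hypothetical back-pressure guard for 1.5.4:
if (conn.ws.bufferedAmount > MAX_BUFFERED_BYTES) {
  conn.ws.close(1013, 'backpressure'); // 1013 = Try Again Later
  return;
}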

Close codes

ws lets you specify close codes when calling ws.close(code, reason). Reserve these:

Code  Meaning            Where set
1000  Normal closure     Default for clean disconnect
1001  Server going away  stop() during shutdown
4401  Unauthorized       Task 1.5.2
4403  Forbidden          Task 1.5.3 (for revoked authorization; not used in pilot)

Document these in protocol.ts as constants.
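
For example (constant names are illustrative, not mandated by the spec):

export const CLOSE_NORMAL = 1000;        // clean disconnect
export const CLOSE_GOING_AWAY = 1001;    // stop() during shutdown
export const CLOSE_UNAUTHORIZED = 4401;  // task 1.5.2
export const CLOSE_FORBIDDEN = 4403;     // task 1.5.3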

Drain on stop()

import { setTimeout as sleep } from 'node:timers/promises';

async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS) {
  // 1. Stop accepting new connections (http.Server.close leaves existing sockets alone).
  httpServer.close();
  // 2. Send a close frame to every open connection.
  for (const conn of connections.values()) {
    conn.ws.close(1001, 'server shutting down');
  }
  // 3. Wait for them to finish, with timeout. The 'close' handler removes each
  //    connection from the map, so the loop exits once everyone has drained.
  const deadline = Date.now() + timeoutMs;
  while (connections.size > 0 && Date.now() < deadline) {
    await sleep(50);
  }
  // 4. Force-terminate any stragglers.
  for (const conn of connections.values()) {
    conn.ws.terminate();
  }
}

Stragglers happen when a client's TCP stack is slow or the network is partitioned. Force-terminate is the right call — we're shutting down anyway.

Logger conventions

  • info: live server starting on :8081, live server ready, live server stopping, live server stopped.
  • debug: connection opened id=... remote=..., connection closed id=... code=... reason=..., inbound message id=... type=....
  • trace: per-message routing detail.

Don't log full WS payloads by default — they may contain large snapshot arrays in later tasks.

Acceptance criteria

  • pnpm typecheck, pnpm lint, pnpm test clean.
  • pnpm dev boots, logs both the consumer lifecycle (Phase 1) and the live server lifecycle.
  • wscat -c ws://localhost:8081 connects; sending malformed JSON gets the protocol-violation error; sending {"type":"subscribe","topic":"foo"} gets {"type":"error","code":"not-implemented"}.
  • Server pings within LIVE_WS_PING_INTERVAL_MS of connect; client pong updates lastSeenAt.
  • kill -TERM <pid> exits cleanly within LIVE_WS_DRAIN_TIMEOUT_MS + 1s.
  • processor_live_connections gauge moves up on connect, down on disconnect.
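
The heartbeat criterion can be exercised roughly as below. The runner is assumed to be vitest, baseConfig/logger/metrics are test fixtures, and the port accessor on LiveServer is hypothetical (something like it is needed to discover an OS-assigned port).

import { it } from 'vitest';
import WebSocket from 'ws';
import { createLiveServer } from '../src/live/server';

it('sends a ping within the configured interval', async () => {
  const server = createLiveServer(
    { ...baseConfig, LIVE_WS_PORT: 0, LIVE_WS_PING_INTERVAL_MS: 200 }, // port 0 = OS-assigned
    logger,
    metrics,
  );
  await server.start();
  const ws = new WebSocket(`ws://127.0.0.1:${server.port}`); // `port` accessor is hypothetical
  await new Promise<void>((resolve) => ws.on('ping', () => resolve())); // ws clients emit 'ping'
  ws.close();
  await server.stop();
});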

Risks / open questions

  • Port conflict with metrics server. Phase 1 binds :9090 for metrics; the live server defaults to :8081. Only the live port needs to be host-published (the metrics server stays internal-only). Document this in the compose.yaml updates that follow.
  • Reverse-proxy upgrade path. Traefik / Caddy / nginx all support WS upgrade transparently if the path is configured for it. The proxy config lives in trm/deploy; this task doesn't touch it but the README's manual smoke test requires it for end-to-end.
  • Per-connection memory. Each LiveConnection record is small (~200 bytes) on top of the ws library's internal per-socket state. At 100 concurrent connections the records alone are tens of KB; even counting ws's buffers, this is not a concern at pilot scale.

Done

Landed in b8ebbd0. Key deviations from spec:

  • Used crypto.randomUUID() (Node 22 built-in) instead of nanoid — avoids adding a new npm dep beyond ws.
  • Metrics moved to src/shared/types.ts (re-exported from src/core/types.ts) so src/live/server.ts can import it without violating the ESLint import/no-restricted-paths rule.
  • processor_live_connections gauge and processor_live_subscriptions gauge are driven via metrics.observe() (which calls prom-client .set()) rather than inc/dec because the shared Metrics interface has no dec method.