/**
 * WebSocket live-broadcast server.
 *
 * Lifecycle: `createLiveServer()` → `server.start()` → clients connect →
 * `server.stop()` drains existing connections and closes cleanly.
 *
 * Design notes:
 * - Runs on its own http.Server (separate from the Phase 1 metrics/health server
 *   on :9090) so a proxy can route to different paths and failure modes don't
 *   entangle.
 * - Auth happens in the `'upgrade'` handler (task 1.5.2). This scaffold accepts
 *   all upgrades (except while the server is shutting down) and logs the
 *   connection.
 * - Message dispatch is pluggable via the `onMessage` callback so tasks 1.5.2
 *   and 1.5.3 can attach the real auth/registry handler without touching this
 *   file's lifecycle logic.
 * - Heartbeat: WS frame-level ping every LIVE_WS_PING_INTERVAL_MS; a pong (or
 *   any inbound message) updates lastSeenAt. Connections silent for more than
 *   two ping intervals are terminated so half-open TCP sockets do not linger.
 *   Do NOT use application-level ping messages — browser WS implementations
 *   handle frame-level pings natively.
 */
import * as http from 'node:http';
import * as crypto from 'node:crypto';
import { WebSocketServer, WebSocket } from 'ws';
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Metrics } from '../shared/types.js';
import { InboundMessage, WsCloseCodes } from './protocol.js';
import type { OutboundMessage } from './protocol.js';

// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------

/**
 * Per-connection identity object. Augmented in later tasks (auth adds `user`;
 * task 1.5.3 adds `cookieHeader`). Exported so the registry, auth, and
 * broadcast modules can reference the same type.
 */
export type LiveConnection = {
  readonly id: string;
  readonly ws: WebSocket;
  readonly remoteAddr: string;
  readonly openedAt: Date;
  /** Updated on every inbound message and every pong frame. */
  lastSeenAt: Date;
};

/**
 * Message handler callback. The server calls this once per successfully parsed
 * inbound message. The handler is responsible for sending replies.
 *
 * In task 1.5.1 this is a no-op stub that returns `error/not-implemented`.
 * Tasks 1.5.2 and 1.5.3 replace it with the real auth+registry handler.
 *
 * Rejections and synchronous throws are caught and logged by the server; they
 * never close the connection.
 */
export type MessageHandler = (
  conn: LiveConnection,
  message: InboundMessage,
) => Promise<void>;

/**
 * Lifecycle handle returned by `createLiveServer`.
 */
export type LiveServer = {
  /**
   * Binds the HTTP server and begins accepting WebSocket upgrades.
   * Resolves once the port is bound and listening; rejects if binding fails.
   */
  readonly start: () => Promise<void>;
  /**
   * Sends a close frame to every open connection, stops accepting new ones,
   * and waits for all connections to drain (or force-terminates after
   * `timeoutMs` milliseconds).
   */
  readonly stop: (timeoutMs?: number) => Promise<void>;
};

// ---------------------------------------------------------------------------
// sendOutbound helper
// ---------------------------------------------------------------------------

/**
 * Serialises and sends an outbound message to a single connection.
 *
 * - Returns immediately if the connection is not OPEN (avoids `send` errors
 *   on closing/closed sockets).
 * - Checks `bufferedAmount` against the configured back-pressure threshold;
 *   closes the connection with 1008 if the client is too slow to drain its
 *   queue. This prevents one slow client from consuming unbounded server memory.
 * - Increments the outbound message counter metric.
 *
 * @param conn - Target connection.
 * @param msg - Message to serialise as JSON.
 * @param metrics - Metrics sink for the outbound counter.
 * @param backpressureThresholdBytes - Max queued bytes before the client is
 *   disconnected.
 */
export function sendOutbound(
  conn: LiveConnection,
  msg: OutboundMessage,
  metrics: Metrics,
  backpressureThresholdBytes: number,
): void {
  if (conn.ws.readyState !== WebSocket.OPEN) return;

  if (conn.ws.bufferedAmount > backpressureThresholdBytes) {
    // Client's send queue is backed up. Closing is preferable to silently
    // dropping messages, because it forces a reconnect and a fresh snapshot
    // (which is always more valuable than a stale backlog).
    conn.ws.close(
      WsCloseCodes.POLICY_VIOLATION,
      'back-pressure threshold exceeded',
    );
    return;
  }

  conn.ws.send(JSON.stringify(msg));
  metrics.inc('processor_live_messages_outbound_total', { type: msg.type });
}

// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------

/**
 * Builds (but does not start) the live WebSocket server.
 *
 * @param config - Reads LIVE_WS_HOST/PORT, LIVE_WS_PING_INTERVAL_MS,
 *   LIVE_WS_DRAIN_TIMEOUT_MS, LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES and
 *   INSTANCE_ID.
 * @param logger - Structured logger.
 * @param metrics - Metrics sink (connection gauge, inbound/outbound counters).
 * @param onMessage - Invoked once per valid inbound message.
 * @param onClose - Optional hook invoked after a connection is removed from
 *   the connection map.
 * @returns Lifecycle handle with `start` and `stop`.
 */
export function createLiveServer(
  config: Config,
  logger: Logger,
  metrics: Metrics,
  onMessage: MessageHandler,
  onClose?: (conn: LiveConnection) => void,
): LiveServer {
  const connections = new Map<string, LiveConnection>();

  // A connection that has produced neither a pong nor a message for this long
  // is presumed dead (e.g. a half-open TCP socket) and is terminated by the
  // heartbeat. Two intervals tolerate a single missed pong.
  const livenessTimeoutMs = config.LIVE_WS_PING_INTERVAL_MS * 2;

  // Set by stop(). Upgrades arriving afterwards are refused, and connections
  // that complete their handshake afterwards are closed immediately, so none
  // escape the shutdown close-frame broadcast.
  let stopping = false;

  const httpServer = http.createServer((_req, res) => {
    // This HTTP server only handles WS upgrades. HTTP requests get a 404.
    res.writeHead(404).end();
  });

  // Once listening, server-level errors would otherwise be an unhandled
  // 'error' event and crash the process. Bind-time errors (not yet listening)
  // are surfaced through start()'s rejection instead, so skip them here.
  httpServer.on('error', (err) => {
    if (httpServer.listening) {
      logger.error({ err }, 'live server error');
    }
  });

  const wss = new WebSocketServer({ noServer: true });

  // -------------------------------------------------------------------------
  // Upgrade handler (auth injected in task 1.5.2; accepted immediately here)
  // -------------------------------------------------------------------------

  httpServer.on('upgrade', (req, socket, head) => {
    if (stopping) {
      // Refuse the handshake and release the socket once the reply is flushed.
      socket.once('finish', () => socket.destroy());
      socket.end(
        'HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\n\r\n',
      );
      return;
    }
    wss.handleUpgrade(req, socket, head, (ws) => {
      wss.emit('connection', ws, req);
    });
  });

  // -------------------------------------------------------------------------
  // onMessage dispatch
  // -------------------------------------------------------------------------

  // Calls onMessage so that both rejected promises AND synchronous throws are
  // logged rather than escaping the ws 'message' listener.
  async function dispatch(
    conn: LiveConnection,
    message: InboundMessage,
  ): Promise<void> {
    try {
      await onMessage(conn, message);
    } catch (err: unknown) {
      logger.error({ connId: conn.id, err }, 'onMessage handler threw');
    }
  }

  // -------------------------------------------------------------------------
  // Connection handler
  // -------------------------------------------------------------------------

  wss.on('connection', (ws: WebSocket, req: http.IncomingMessage) => {
    const conn: LiveConnection = {
      id: crypto.randomUUID(),
      ws,
      remoteAddr: req.socket.remoteAddress ?? 'unknown',
      openedAt: new Date(),
      lastSeenAt: new Date(),
    };

    connections.set(conn.id, conn);
    metrics.observe('processor_live_connections', connections.size);
    logger.debug(
      { connId: conn.id, remote: conn.remoteAddr },
      'connection opened',
    );

    // -----------------------------------------------------------------------
    // Inbound message handler
    // -----------------------------------------------------------------------

    ws.on('message', (data) => {
      conn.lastSeenAt = new Date();

      const raw = data.toString('utf8');
      let parsed: InboundMessage;
      try {
        parsed = InboundMessage.parse(JSON.parse(raw));
      } catch {
        metrics.inc('processor_live_messages_inbound_total', {
          type: 'invalid',
          instance_id: config.INSTANCE_ID,
        });
        sendOutbound(
          conn,
          {
            type: 'error',
            code: 'protocol-violation',
            message: 'Invalid message envelope',
          },
          metrics,
          config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
        );
        return;
      }

      metrics.inc('processor_live_messages_inbound_total', {
        type: parsed.type,
        instance_id: config.INSTANCE_ID,
      });
      logger.debug(
        { connId: conn.id, type: parsed.type },
        'inbound message',
      );

      void dispatch(conn, parsed);
    });

    // -----------------------------------------------------------------------
    // Pong handler — update liveness timestamp
    // -----------------------------------------------------------------------

    ws.on('pong', () => {
      conn.lastSeenAt = new Date();
    });

    // -----------------------------------------------------------------------
    // Close handler
    // -----------------------------------------------------------------------

    ws.on('close', (code, reason) => {
      connections.delete(conn.id);
      metrics.observe('processor_live_connections', connections.size);
      logger.debug(
        {
          connId: conn.id,
          code,
          reason: reason.toString('utf8'),
        },
        'connection closed',
      );
      if (onClose) {
        // A throwing hook must not escape the ws event listener.
        try {
          onClose(conn);
        } catch (err: unknown) {
          logger.error({ connId: conn.id, err }, 'onClose handler threw');
        }
      }
    });

    // -----------------------------------------------------------------------
    // Error handler — prevent uncaught-exception crash on socket errors
    // -----------------------------------------------------------------------

    ws.on('error', (err) => {
      logger.debug({ connId: conn.id, err }, 'connection error');
    });

    // The handshake finished after stop() began (e.g. a keep-alive socket
    // upgraded late): close it now so the drain loop in stop() can finish.
    if (stopping) {
      ws.close(WsCloseCodes.GOING_AWAY, 'server shutting down');
    }
  });

  // -------------------------------------------------------------------------
  // Heartbeat interval
  // -------------------------------------------------------------------------

  let pingTimer: ReturnType<typeof setInterval> | null = null;

  function startHeartbeat(): void {
    // Idempotent: a repeated start() must not leak the previous interval.
    if (pingTimer !== null) clearInterval(pingTimer);

    pingTimer = setInterval(() => {
      const now = Date.now();
      for (const conn of connections.values()) {
        if (conn.ws.readyState !== WebSocket.OPEN) continue;
        if (now - conn.lastSeenAt.getTime() > livenessTimeoutMs) {
          // No pong/message for too long: the peer is gone. terminate()
          // destroys the socket; the 'close' handler then removes the entry.
          logger.debug(
            { connId: conn.id },
            'connection unresponsive; terminating',
          );
          conn.ws.terminate();
          continue;
        }
        conn.ws.ping();
      }
    }, config.LIVE_WS_PING_INTERVAL_MS);

    // Do not hold the event loop open just for heartbeats during shutdown.
    pingTimer.unref();
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  async function start(): Promise<void> {
    stopping = false;
    logger.info(
      { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT },
      'live server starting',
    );

    await new Promise<void>((resolve, reject) => {
      httpServer.once('error', reject);
      httpServer.listen(config.LIVE_WS_PORT, config.LIVE_WS_HOST, () => {
        httpServer.off('error', reject);
        resolve();
      });
    });

    startHeartbeat();

    logger.info(
      { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT },
      'live server ready',
    );
  }

  async function stop(
    timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS,
  ): Promise<void> {
    logger.info('live server stopping');
    stopping = true;

    // Stop heartbeat interval.
    if (pingTimer !== null) {
      clearInterval(pingTimer);
      pingTimer = null;
    }

    // Stop accepting new connections.
    httpServer.close();

    // Send close frame to every open connection.
    for (const conn of connections.values()) {
      conn.ws.close(WsCloseCodes.GOING_AWAY, 'server shutting down');
    }

    // Wait for connections to drain, up to the timeout.
    const deadline = Date.now() + timeoutMs;
    while (connections.size > 0 && Date.now() < deadline) {
      await new Promise<void>((resolve) => setTimeout(resolve, 50));
    }

    // Force-terminate any stragglers (e.g. client with slow TCP stack).
    for (const conn of connections.values()) {
      conn.ws.terminate();
    }

    logger.info('live server stopped');
  }

  return { start, stop };
}