/**
 * WebSocket live-broadcast server.
 *
 * Lifecycle: `createLiveServer()` → `server.start()` → clients connect →
 * `server.stop()` drains existing connections and closes cleanly.
 *
 * Design notes:
 * - Runs on its own http.Server (separate from the Phase 1 metrics/health server
 *   on :9090) so a proxy can route to different paths and failure modes don't
 *   entangle.
 * - Auth runs in the `'upgrade'` handler: validate the cookie via Directus before
 *   completing the WS upgrade. Rejected upgrades get an HTTP 401 response
 *   (500 if the validator itself throws, 503 if the server is shutting down).
 * - Message dispatch is pluggable via the `onMessage` callback so task 1.5.3
 *   can attach the real subscription-registry handler.
 * - Heartbeat: WS frame-level ping every LIVE_WS_PING_INTERVAL_MS; pong updates
 *   lastSeenAt. Do NOT use application-level ping messages — browser WS
 *   implementations handle frame-level pings natively.
 * - cookieHeader is stored on the connection so the authz client (task 1.5.3)
 *   can forward it to Directus for per-event authorization. It is sensitive
 *   material; never log it.
 */

import * as http from 'node:http';
import * as crypto from 'node:crypto';
import type { Duplex } from 'node:stream';
import { WebSocketServer, WebSocket } from 'ws';
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Metrics } from '../shared/types.js';
import { InboundMessage, WsCloseCodes } from './protocol.js';
import type { OutboundMessage } from './protocol.js';
import type { AuthClient, AuthenticatedUser } from './auth.js';

// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------

/**
 * Per-connection identity object. Holds the validated user identity and the
 * original cookie header (needed for per-subscription authorization in 1.5.3).
 *
 * `cookieHeader` is sensitive — never log it.
 */
export type LiveConnection = {
  /** Server-generated UUID; key of the connection in the server's registry. */
  readonly id: string;
  readonly ws: WebSocket;
  /** Remote address of the underlying socket, or `'unknown'` if unavailable. */
  readonly remoteAddr: string;
  readonly openedAt: Date;
  /** Updated on every inbound message and on every pong frame. */
  lastSeenAt: Date;
  readonly user: AuthenticatedUser;
  /** The raw Cookie: header from the upgrade request. Used by the authz client
   *  to forward the user's session when checking event access. */
  readonly cookieHeader: string;
};

/**
 * Message handler callback. The server calls this once per successfully parsed
 * inbound message. The handler is responsible for sending replies.
 *
 * A rejected promise is caught and logged by the server; it does not close
 * the connection.
 *
 * Task 1.5.3 replaces the stub with the real subscription-registry handler.
 */
export type MessageHandler = (
  conn: LiveConnection,
  message: InboundMessage,
) => Promise<void>;

/**
 * Lifecycle handle returned by `createLiveServer`.
 */
export type LiveServer = {
  /**
   * Binds the HTTP server and begins accepting WebSocket upgrades.
   * Resolves once the port is bound and listening.
   */
  readonly start: () => Promise<void>;

  /**
   * Sends a close frame to every open connection, stops accepting new ones,
   * and waits for all connections to drain (or force-terminates after
   * `timeoutMs` milliseconds).
   */
  readonly stop: (timeoutMs?: number) => Promise<void>;
};

// ---------------------------------------------------------------------------
// sendOutbound helper
// ---------------------------------------------------------------------------

/**
 * Serialises and sends an outbound message to a single connection.
 *
 * - Returns immediately if the connection is not OPEN (avoids `send` errors
 *   on closing/closed sockets).
 * - Checks `bufferedAmount` against the configured back-pressure threshold;
 *   closes the connection with 1008 if the client is too slow to drain its
 *   queue. This prevents one slow client from consuming unbounded server memory.
 * - Increments the outbound message counter metric.
 *
 * @param conn - Target connection.
 * @param msg - Message to serialise with `JSON.stringify`.
 * @param metrics - Metrics sink; only touched when the message is actually sent.
 * @param backpressureThresholdBytes - Maximum tolerated `bufferedAmount`.
 */
export function sendOutbound(
  conn: LiveConnection,
  msg: OutboundMessage,
  metrics: Metrics,
  backpressureThresholdBytes: number,
): void {
  if (conn.ws.readyState !== WebSocket.OPEN) return;

  if (conn.ws.bufferedAmount > backpressureThresholdBytes) {
    // Client's send queue is backed up. Closing is preferable to silently
    // dropping messages, because it forces a reconnect and a fresh snapshot
    // (which is always more valuable than a stale backlog).
    conn.ws.close(
      WsCloseCodes.POLICY_VIOLATION,
      'back-pressure threshold exceeded',
    );
    return;
  }

  conn.ws.send(JSON.stringify(msg));
  metrics.inc('processor_live_messages_outbound_total', { type: msg.type });
}

// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------

/** Identity established by the upgrade handler for one upgrade request. */
type UpgradeIdentity = {
  readonly user: AuthenticatedUser;
  readonly cookieHeader: string;
};

/**
 * Answers a not-yet-upgraded socket with a bodiless HTTP response and tears
 * the socket down.
 *
 * @param socket - Raw socket handed to the `'upgrade'` listener.
 * @param status - Status code and reason phrase, e.g. `'401 Unauthorized'`.
 */
function rejectUpgrade(socket: Duplex, status: string): void {
  // The peer may already have gone away while auth was pending; only write
  // when the stream can still accept data.
  if (socket.writable) {
    socket.write(
      `HTTP/1.1 ${status}\r\n` +
        'Content-Length: 0\r\n' +
        'Connection: close\r\n' +
        '\r\n',
    );
  }
  socket.destroy();
}

// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------

/**
 * Builds a live-broadcast WebSocket server. Nothing is bound until
 * `start()` is called on the returned handle.
 *
 * @param config - Supplies host/port, ping interval, drain timeout,
 *   back-pressure threshold and the instance id used as a metric label.
 * @param logger - Structured logger. The cookie header is never passed to it.
 * @param metrics - Metrics sink for connection gauge and message counters.
 * @param onMessage - Invoked once per inbound message that parses successfully.
 * @param onClose - Optional hook invoked after a connection has been removed
 *   from the registry.
 * @param authClient - Optional cookie validator. When omitted every upgrade is
 *   accepted and the connection gets a placeholder user with a random id
 *   (used in tests that don't need auth).
 * @returns Lifecycle handle with `start` and `stop`.
 */
export function createLiveServer(
  config: Config,
  logger: Logger,
  metrics: Metrics,
  onMessage: MessageHandler,
  onClose?: (conn: LiveConnection) => void,
  authClient?: AuthClient,
): LiveServer {
  const connections = new Map<string, LiveConnection>();

  // Identity validated during the upgrade, keyed by the upgrade request so the
  // connection handler can pick it up without a second async call and without
  // mutating the request object.
  const upgradeIdentities = new WeakMap<http.IncomingMessage, UpgradeIdentity>();

  // Set by stop(). httpServer.close() only stops new TCP connections; sockets
  // that are already connected (or waiting on auth) could otherwise still
  // complete an upgrade after the drain loop has finished.
  let stopping = false;

  const httpServer = http.createServer((_req, res) => {
    // This HTTP server only handles WS upgrades. HTTP requests get a 404.
    res.writeHead(404).end();
  });

  const wss = new WebSocketServer({ noServer: true });

  // -------------------------------------------------------------------------
  // Upgrade handler — validates auth before completing the WS handshake
  // -------------------------------------------------------------------------

  httpServer.on('upgrade', (req, socket, head) => {
    if (stopping) {
      rejectUpgrade(socket, '503 Service Unavailable');
      return;
    }

    const cookieHeader = req.headers['cookie'] ?? '';

    if (!authClient) {
      // No auth client provided — accept the upgrade without validation.
      // Used in tests that don't need auth. The connection handler supplies
      // the placeholder user.
      wss.handleUpgrade(req, socket, head, (ws) => {
        wss.emit('connection', ws, req);
      });
      return;
    }

    // While validation is pending nothing else listens for 'error' on the raw
    // socket; without this listener a reset from the peer would surface as an
    // unhandled 'error' event.
    const onSocketError = (err: Error): void => {
      logger.debug({ err }, 'socket error during upgrade auth');
    };
    socket.on('error', onSocketError);

    // Validate the cookie asynchronously. The upgrade handler must not hold
    // the socket open for too long — the auth timeout (5s default) is the
    // upper bound.
    authClient
      .validate(cookieHeader)
      .then((user) => {
        if (!user) {
          rejectUpgrade(socket, '401 Unauthorized');
          return;
        }

        if (stopping) {
          // Shutdown began while validation was in flight.
          rejectUpgrade(socket, '503 Service Unavailable');
          return;
        }

        upgradeIdentities.set(req, { user, cookieHeader });

        // Hand error handling over to ws before it takes ownership of the socket.
        socket.off('error', onSocketError);
        wss.handleUpgrade(req, socket, head, (ws) => {
          wss.emit('connection', ws, req);
        });
      })
      .catch((err: unknown) => {
        logger.error({ err }, 'auth validation threw unexpectedly during upgrade');
        rejectUpgrade(socket, '500 Internal Server Error');
      });
  });

  // -------------------------------------------------------------------------
  // Connection handler
  // -------------------------------------------------------------------------

  wss.on('connection', (ws, req: http.IncomingMessage) => {
    // Retrieve the identity recorded by the upgrade handler. When auth is
    // disabled (no authClient), fall back to a placeholder anonymous user.
    const identity = upgradeIdentities.get(req);
    const user: AuthenticatedUser = identity?.user ?? {
      id: crypto.randomUUID(),
      email: null,
      role: null,
      first_name: null,
      last_name: null,
    };
    const cookieHeader = identity?.cookieHeader ?? '';

    const conn: LiveConnection = {
      id: crypto.randomUUID(),
      ws,
      remoteAddr: req.socket.remoteAddress ?? 'unknown',
      openedAt: new Date(),
      lastSeenAt: new Date(),
      user,
      cookieHeader,
    };

    connections.set(conn.id, conn);
    metrics.observe('processor_live_connections', connections.size);

    // cookieHeader is deliberately absent from the log fields.
    logger.debug(
      { connId: conn.id, remote: conn.remoteAddr, userId: user.id },
      'connection opened',
    );

    // -----------------------------------------------------------------------
    // Inbound message handler
    // -----------------------------------------------------------------------

    ws.on('message', (data) => {
      conn.lastSeenAt = new Date();

      const raw = data.toString('utf8');
      let parsed: InboundMessage;

      try {
        // Both malformed JSON and schema violations land in the catch below.
        parsed = InboundMessage.parse(JSON.parse(raw));
      } catch {
        metrics.inc('processor_live_messages_inbound_total', {
          type: 'invalid',
          instance_id: config.INSTANCE_ID,
        });
        sendOutbound(
          conn,
          {
            type: 'error',
            code: 'protocol-violation',
            message: 'Invalid message envelope',
          },
          metrics,
          config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
        );
        return;
      }

      metrics.inc('processor_live_messages_inbound_total', {
        type: parsed.type,
        instance_id: config.INSTANCE_ID,
      });

      logger.debug(
        { connId: conn.id, type: parsed.type },
        'inbound message',
      );

      // Fire-and-forget: a failing handler is logged, the connection stays up.
      onMessage(conn, parsed).catch((err: unknown) => {
        logger.error({ connId: conn.id, err }, 'onMessage handler threw');
      });
    });

    // -----------------------------------------------------------------------
    // Pong handler — update liveness timestamp
    // -----------------------------------------------------------------------

    ws.on('pong', () => {
      conn.lastSeenAt = new Date();
    });

    // -----------------------------------------------------------------------
    // Close handler
    // -----------------------------------------------------------------------

    ws.on('close', (code, reason) => {
      connections.delete(conn.id);
      metrics.observe('processor_live_connections', connections.size);
      logger.debug(
        {
          connId: conn.id,
          code,
          reason: reason.toString('utf8'),
        },
        'connection closed',
      );
      if (onClose) onClose(conn);
    });

    // -----------------------------------------------------------------------
    // Error handler — prevent uncaught-exception crash on socket errors
    // -----------------------------------------------------------------------

    ws.on('error', (err) => {
      logger.debug({ connId: conn.id, err }, 'connection error');
    });
  });

  // -------------------------------------------------------------------------
  // Heartbeat interval
  // -------------------------------------------------------------------------

  let pingTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * Starts the periodic frame-level ping. This loop only sends pings; the
   * matching pong handler refreshes `lastSeenAt`.
   */
  function startHeartbeat(): void {
    pingTimer = setInterval(() => {
      for (const conn of connections.values()) {
        if (conn.ws.readyState !== WebSocket.OPEN) continue;
        conn.ws.ping();
      }
    }, config.LIVE_WS_PING_INTERVAL_MS);

    // Do not hold the event loop open just for heartbeats during shutdown.
    pingTimer.unref();
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /**
   * Binds the HTTP server and starts the heartbeat.
   * Rejects with the listen error if the port cannot be bound.
   */
  async function start(): Promise<void> {
    logger.info(
      { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT },
      'live server starting',
    );

    await new Promise<void>((resolve, reject) => {
      httpServer.once('error', reject);
      httpServer.listen(config.LIVE_WS_PORT, config.LIVE_WS_HOST, () => {
        // Bound successfully — the one-shot startup error hook is no longer needed.
        httpServer.off('error', reject);
        resolve();
      });
    });

    startHeartbeat();

    logger.info(
      { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT },
      'live server ready',
    );
  }

  /**
   * Gracefully shuts the server down.
   *
   * @param timeoutMs - How long to wait for connections to close on their own
   *   before force-terminating them. Defaults to `LIVE_WS_DRAIN_TIMEOUT_MS`.
   */
  async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS): Promise<void> {
    logger.info('live server stopping');

    // From here on the upgrade handler refuses new WebSocket handshakes.
    stopping = true;

    // Stop heartbeat interval.
    if (pingTimer !== null) {
      clearInterval(pingTimer);
      pingTimer = null;
    }

    // Stop accepting new connections.
    httpServer.close();

    // Send close frame to every open connection.
    for (const conn of connections.values()) {
      conn.ws.close(WsCloseCodes.GOING_AWAY, 'server shutting down');
    }

    // Wait for connections to drain, up to the timeout. The per-connection
    // 'close' handlers shrink the map as clients acknowledge the close frame.
    const deadline = Date.now() + timeoutMs;
    while (connections.size > 0 && Date.now() < deadline) {
      await new Promise<void>((resolve) => setTimeout(resolve, 50));
    }

    // Force-terminate any stragglers (e.g. client with slow TCP stack).
    for (const conn of connections.values()) {
      conn.ws.terminate();
    }

    logger.info('live server stopped');
  }

  return { start, stop };
}