# Task 1.5.1 — WS server scaffold + heartbeat

**Phase:** 1.5 — Live broadcast
**Status:** ⬜ Not started
**Depends on:** 1.8 (main wiring), 1.9 (observability)
**Wiki refs:** `docs/wiki/synthesis/processor-ws-contract.md` §Endpoint, §Transport; `docs/wiki/concepts/live-channel-architecture.md`

## Goal

Stand up a WebSocket server inside the Processor process: bind to a configurable port, accept upgrades, dispatch incoming messages to a router, send 30s pings, and hold a typed `LiveConnection` per client. **No auth, no subscriptions yet** — those land in 1.5.2 and 1.5.3. This task is the lifecycle and message-loop skeleton.

The WS server runs on its own HTTP server (separate from the Phase 1 metrics/health server on `:9090`) so the reverse proxy can route them to different paths and the failure modes don't entangle.

## Deliverables

- `src/live/server.ts` exporting:
  - `createLiveServer(config, logger, metrics): LiveServer` — factory.
  - `LiveServer` interface: `start(): Promise<void>` (binds and listens), `stop(timeoutMs?: number): Promise<void>` (stops accepting new connections, sends a close frame to existing ones, waits for them to drain or force-closes after the timeout).
  - `type LiveConnection = { id: string; ws: WebSocket; remoteAddr: string; openedAt: Date; lastSeenAt: Date }` — opaque identity, augmented in later tasks.
  - A pluggable `onMessage(conn: LiveConnection, raw: string): Promise<void>` handler — for now, just logs at `debug` and replies with `{ type: 'error', code: 'not-implemented' }`. Tasks 1.5.2 and 1.5.3 attach the real handler.
- `src/live/protocol.ts` — zod schemas for inbound message envelopes:

  ```ts
  const InboundMessage = z.discriminatedUnion('type', [
    z.object({ type: z.literal('subscribe'), topic: z.string(), id: z.string().optional() }),
    z.object({ type: z.literal('unsubscribe'), topic: z.string(), id: z.string().optional() }),
  ]);
  ```

  Outbound types declared but not constructed yet (`subscribed`/`position`/`unsubscribed`/`error`).
- `src/main.ts` updated to create and start the live server alongside the existing consumer; SIGTERM stops both in the right order (live server first so no new connections arrive during drain, consumer second so the durable-write path completes its in-flight batch).
- New config keys (zod schema in `src/config/load.ts`; sketched under Config keys in the Specification below):
  - `LIVE_WS_PORT` (default `8081`).
  - `LIVE_WS_HOST` (default `0.0.0.0`).
  - `LIVE_WS_PING_INTERVAL_MS` (default `30_000`).
  - `LIVE_WS_DRAIN_TIMEOUT_MS` (default `5_000`).
- New Prometheus metrics (in `src/observability/metrics.ts`):
  - `processor_live_connections{instance_id}` (gauge) — current open connections.
  - `processor_live_messages_inbound_total{instance_id, type}` (counter).
  - `processor_live_messages_outbound_total{instance_id, type}` (counter).
- `test/live-server.test.ts`:
  - Server starts on a random port, accepts a connection, a ping is sent within `LIVE_WS_PING_INTERVAL_MS + 100ms`, and a pong updates `lastSeenAt`.
  - An inbound message that fails zod validation receives an `{ type: 'error', code: 'protocol-violation' }` reply and the connection stays open.
  - `stop()` sends a close frame to existing connections and resolves within the drain timeout.

## Specification

### Library choice

`ws` (the package, not `@types/ws` alone). Lightweight, minimal API, and supports `noServer: true` for attaching to an existing `http.createServer`. Avoid `uWebSockets.js` — its performance is great, but the C++ binding makes deployment and testcontainers-based testing fiddly.
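### Config keys

The Deliverables list names four new keys. A minimal sketch of how they might be expressed in `src/config/load.ts`, assuming config is parsed from `process.env` with a flat zod object; the schema and type names here are illustrative, not fixed by this task:

```ts
import { z } from 'zod';

// Illustrative names; the real schema lives in src/config/load.ts and merges these
// keys into the existing Phase 1 config object.
export const liveWsConfigSchema = z.object({
  LIVE_WS_PORT: z.coerce.number().int().positive().default(8081),
  LIVE_WS_HOST: z.string().default('0.0.0.0'),
  LIVE_WS_PING_INTERVAL_MS: z.coerce.number().int().positive().default(30_000),
  LIVE_WS_DRAIN_TIMEOUT_MS: z.coerce.number().int().positive().default(5_000),
});

export type LiveWsConfig = z.infer<typeof liveWsConfigSchema>;

// Usage: liveWsConfigSchema.parse(process.env) coerces the string env values to numbers,
// applies the defaults when a key is unset, and strips unrecognised keys.
```

`z.coerce` matters here because env values always arrive as strings.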
### Server attach pattern

```ts
const httpServer = http.createServer((req, res) => {
  // Optional: a small /healthz endpoint specific to the live server, separate
  // from the Phase 1 metrics/health server. For now, return 404 on HTTP requests
  // — the only thing this server does is upgrade.
  res.writeHead(404).end();
});

const wss = new WebSocketServer({ noServer: true });

httpServer.on('upgrade', (req, socket, head) => {
  // Auth happens here in task 1.5.2. For now, just accept.
  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit('connection', ws, req);
  });
});

wss.on('connection', (ws, req) => {
  const conn: LiveConnection = {
    id: nanoid(),
    ws,
    remoteAddr: req.socket.remoteAddress ?? 'unknown',
    openedAt: new Date(),
    lastSeenAt: new Date(),
  };
  // ... attach handlers
});
```

### Heartbeat

Use the `ws` library's built-in ping/pong:

```ts
const pingTimer = setInterval(() => {
  for (const conn of connections.values()) {
    if (conn.ws.readyState !== WebSocket.OPEN) continue;
    conn.ws.ping();
    // Optional: track outstanding pings; close if pong doesn't arrive in N seconds.
  }
}, config.LIVE_WS_PING_INTERVAL_MS);
```

`ws` automatically responds to inbound pings with pongs, and emits `'pong'` on the connection when a client responds. Update `lastSeenAt` in the pong handler (sketched under Connection handlers below). **Don't roll your own ping in the application protocol** — the WebSocket frame-level ping/pong is cheaper, handled automatically by browsers, and doesn't pollute the message log.

### Inbound message handling

```ts
ws.on('message', async (data) => {
  conn.lastSeenAt = new Date();
  const raw = data.toString('utf8');

  let parsed;
  try {
    parsed = InboundMessage.parse(JSON.parse(raw));
  } catch (err) {
    metrics.liveMessagesInbound.inc({ type: 'invalid' });
    sendOutbound(conn, { type: 'error', code: 'protocol-violation', message: 'Invalid message envelope' });
    return;
  }

  metrics.liveMessagesInbound.inc({ type: parsed.type });
  await onMessage(conn, parsed);
});
```

`onMessage` is the pluggable handler that 1.5.2 (auth gate) and 1.5.3 (subscription registry) replace.

### Outbound message helper

```ts
function sendOutbound(conn: LiveConnection, msg: OutboundMessage): void {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}
```

Centralised so that back-pressure handling (1.5.4) and message logging can hook in at one place later.

### Close codes

`ws` lets you specify close codes when calling `ws.close(code, reason)`. Reserve these:

| Code | Meaning | Where set |
|---|---|---|
| `1000` | Normal closure | Default for clean disconnect |
| `1001` | Server going away | `stop()` during shutdown |
| `4401` | Unauthorized | Task 1.5.2 |
| `4403` | Forbidden | Task 1.5.3 (for revoked authorization, not used in pilot) |

Document these in `protocol.ts` as constants (sketched under Close code constants below).

### Drain on `stop()`

```ts
async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS) {
  // 1. Stop accepting new connections.
  httpServer.close();

  // 2. Send a close frame to every open connection.
  for (const conn of connections.values()) {
    conn.ws.close(1001, 'server shutting down');
  }

  // 3. Wait for them to finish, with a timeout.
  const deadline = Date.now() + timeoutMs;
  while (connections.size > 0 && Date.now() < deadline) {
    await sleep(50);
  }

  // 4. Force-terminate any stragglers.
  for (const conn of connections.values()) {
    conn.ws.terminate();
  }
}
```

Stragglers happen when a client's TCP stack is slow or the network is partitioned. Force-terminating is the right call — we're shutting down anyway.
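### Connection handlers

The pong handler mentioned in the Heartbeat subsection, plus the matching close handler, are small enough to sketch. This assumes they sit inside the `'connection'` callback from the attach pattern, where `conn` and the `connections` map are in scope; logger and metrics wiring is elided:

```ts
// Sketch only: runs inside wss.on('connection', ...) from the attach pattern above.
connections.set(conn.id, conn);

ws.on('pong', () => {
  // Frame-level pong from the client: refresh liveness for this connection.
  conn.lastSeenAt = new Date();
});

ws.on('close', () => {
  // Remove the connection from the registry so stop()'s drain loop can converge
  // and the connections gauge can be updated.
  connections.delete(conn.id);
});
```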
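### Close code constants

A sketch of how the close codes might be captured as constants in `protocol.ts`; the names are illustrative, the table above is the source of truth:

```ts
export const CLOSE_CODES = {
  NORMAL: 1000,        // clean client disconnect
  GOING_AWAY: 1001,    // stop() during shutdown
  UNAUTHORIZED: 4401,  // task 1.5.2
  FORBIDDEN: 4403,     // task 1.5.3 (revoked authorization, not used in pilot)
} as const;

export type CloseCode = (typeof CLOSE_CODES)[keyof typeof CLOSE_CODES];
```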
### Logger conventions

- `info`: `live server starting on :8081`, `live server ready`, `live server stopping`, `live server stopped`.
- `debug`: `connection opened id=... remote=...`, `connection closed id=... code=... reason=...`, `inbound message id=... type=...`.
- `trace`: per-message routing detail.

Don't log full WS payloads by default — they may contain large snapshot arrays in later tasks.

## Acceptance criteria

- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
- [ ] `pnpm dev` boots and logs both the consumer lifecycle (Phase 1) and the live server lifecycle.
- [ ] `wscat -c ws://localhost:8081` connects; sending malformed JSON gets the `protocol-violation` error; sending `{"type":"subscribe","topic":"foo"}` gets `{"type":"error","code":"not-implemented"}`.
- [ ] The server pings within `LIVE_WS_PING_INTERVAL_MS` of connect; a client pong updates `lastSeenAt`.
- [ ] `kill -TERM <pid>` exits cleanly within `LIVE_WS_DRAIN_TIMEOUT_MS + 1s`.
- [ ] `processor_live_connections` gauge moves up on connect, down on disconnect.

## Risks / open questions

- **Port conflict with metrics server.** Phase 1 binds `:9090` for metrics; the live server defaults to `:8081`. Either both can be host-published or only the live one (metrics is internal-only). Document the choice in the `compose.yaml` updates that follow.
- **Reverse-proxy upgrade path.** Traefik / Caddy / nginx all support WS upgrade transparently if the path is configured for it. The proxy config lives in `trm/deploy`; this task doesn't touch it, but the README's manual smoke test needs it for an end-to-end check.
- **Per-connection memory.** Each `LiveConnection` is small (~200 bytes plus the `ws` library's internal state). At 100 concurrent connections that's tens of KB. Not a concern at pilot scale.

## Done

Landed in `b8ebbd0`. Key deviations from spec:

- Used `crypto.randomUUID()` (Node 22 built-in) instead of `nanoid` — avoids adding a new npm dep beyond `ws`.
- `Metrics` moved to `src/shared/types.ts` (re-exported from `src/core/types.ts`) so `src/live/server.ts` can import it without violating the ESLint `import/no-restricted-paths` rule.
- The `processor_live_connections` and `processor_live_subscriptions` gauges are driven via `metrics.observe()` (which calls prom-client `.set()`) rather than `inc`/`dec`, because the shared `Metrics` interface has no `dec` method.