From 8a78e53e5843551fcfd45fe2db6f8e685a528b6c Mon Sep 17 00:00:00 2001
From: Julian Cuni
Date: Sat, 2 May 2026 17:33:45 +0200
Subject: [PATCH] docs: update task 1.5.1 done section and ROADMAP status
---
 .planning/ROADMAP.md         |  16 ++
 .../01-ws-server-scaffold.md | 195 ++++++++++++++++++
 2 files changed, 211 insertions(+)
 create mode 100644 .planning/phase-1-5-live-broadcast/01-ws-server-scaffold.md

diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md
index 3cf3a92..041b616 100644
--- a/.planning/ROADMAP.md
+++ b/.planning/ROADMAP.md
@@ -59,6 +59,22 @@ These rules govern every task. Any deviation must be discussed and documented as
 | 1.10 | [Integration test (testcontainers Redis + Postgres)](./phase-1-throughput/10-integration-test.md) | 🟩 | `9791620` |
 | 1.11 | [Dockerfile & Gitea workflow](./phase-1-throughput/11-dockerfile-and-ci.md) | 🟩 | `9791620` |
+
+### Phase 1.5 — Live broadcast
+
+**Status:** 🟨 In progress
+**Outcome:** WebSocket endpoint inside the Processor that fans live position updates from Redis out to subscribed [[react-spa]] clients. Cookie-based auth via Directus's `/users/me`, per-event subscription with one-time authorization at subscribe time, snapshot-on-subscribe, and multi-instance fan-out via per-instance consumer groups. The wire spec is `docs/wiki/synthesis/processor-ws-contract.md`. Unblocks the SPA's live-map feature for the Rally Albania 2026 dogfood.
+
+[**See `phase-1-5-live-broadcast/README.md`**](./phase-1-5-live-broadcast/README.md)
+
+| # | Task | Status | Landed in |
+|---|------|--------|-----------|
+| 1.5.1 | [WS server scaffold + heartbeat](./phase-1-5-live-broadcast/01-ws-server-scaffold.md) | 🟩 | `b8ebbd0` |
+| 1.5.2 | [Cookie auth handshake](./phase-1-5-live-broadcast/02-cookie-auth-handshake.md) | ⬜ | — |
+| 1.5.3 | [Subscription registry & per-event authorization](./phase-1-5-live-broadcast/03-subscription-registry.md) | ⬜ | — |
+| 1.5.4 | [Broadcast consumer group & fan-out](./phase-1-5-live-broadcast/04-broadcast-consumer-group.md) | ⬜ | — |
+| 1.5.5 | [Snapshot-on-subscribe](./phase-1-5-live-broadcast/05-snapshot-on-subscribe.md) | ⬜ | — |
+| 1.5.6 | [Integration test (testcontainers Redis + Postgres + Directus stub)](./phase-1-5-live-broadcast/06-integration-test.md) | ⬜ | — |
+
 ### Phase 2 — Domain logic

 **Status:** ⬜ Not started — blocks on Directus schema decisions

diff --git a/.planning/phase-1-5-live-broadcast/01-ws-server-scaffold.md b/.planning/phase-1-5-live-broadcast/01-ws-server-scaffold.md
new file mode 100644
index 0000000..b8d67f0
--- /dev/null
+++ b/.planning/phase-1-5-live-broadcast/01-ws-server-scaffold.md
@@ -0,0 +1,195 @@
# Task 1.5.1 — WS server scaffold + heartbeat

**Phase:** 1.5 — Live broadcast
**Status:** 🟩 Done
**Depends on:** 1.8 (main wiring), 1.9 (observability)
**Wiki refs:** `docs/wiki/synthesis/processor-ws-contract.md` §Endpoint, §Transport; `docs/wiki/concepts/live-channel-architecture.md`

## Goal

Stand up a WebSocket server inside the Processor process: bind to a configurable port, accept upgrades, dispatch incoming messages to a router, send 30s pings, and hold a typed `LiveConnection` per client. **No auth, no subscriptions yet** — those land in 1.5.2 and 1.5.3. This task is the lifecycle and message-loop skeleton.

The WS server runs on its own HTTP server (separate from the Phase 1 metrics/health server on `:9090`) so the reverse proxy can route them to different paths and the failure modes don't entangle.
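Concretely, the process ends up running two HTTP servers. A minimal sketch of the `src/main.ts` wiring this implies (the `declare` lines are stand-ins for the existing Phase 1 helpers; their real names and shapes may differ):

```ts
// Sketch of the src/main.ts wiring, not the final code.
import { createLiveServer } from './live/server';

// Assumptions: stand-ins for the Phase 1 pieces this task composes with.
declare function loadConfig(): any; // zod-validated env, src/config/load.ts
declare function createLogger(config: any): any;
declare function createMetrics(config: any): any; // also serves /metrics on :9090
declare function startConsumer(config: any, logger: any, metrics: any): Promise<{ stop(): Promise<void> }>;

async function main(): Promise<void> {
  const config = loadConfig();
  const logger = createLogger(config);
  const metrics = createMetrics(config);

  const consumer = await startConsumer(config, logger, metrics);
  const live = createLiveServer(config, logger, metrics);
  await live.start(); // binds LIVE_WS_HOST:LIVE_WS_PORT (default :8081)

  process.once('SIGTERM', async () => {
    // Live server first: no new connections arrive while existing ones drain.
    await live.stop(config.LIVE_WS_DRAIN_TIMEOUT_MS);
    // Consumer second: the durable-write path completes its in-flight batch.
    await consumer.stop();
    process.exit(0);
  });
}

void main();
```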
## Deliverables

- `src/live/server.ts` exporting:
  - `createLiveServer(config, logger, metrics): LiveServer` — factory.
  - `LiveServer` interface: `start(): Promise<void>` (binds and listens), `stop(timeoutMs?: number): Promise<void>` (stops accepting new connections, sends a close frame to existing ones, waits for them to drain or force-closes after the timeout).
  - `type LiveConnection = { id: string; ws: WebSocket; remoteAddr: string; openedAt: Date; lastSeenAt: Date }` — opaque identity, augmented in later tasks.
  - A pluggable `onMessage(conn: LiveConnection, msg: InboundMessage): Promise<void>` handler (the server parses and validates the raw frame first; see §Inbound message handling) — for now it just logs at `debug` and replies with `{ type: 'error', code: 'not-implemented' }`. Tasks 1.5.2 and 1.5.3 attach the real handler.
- `src/live/protocol.ts` — zod schemas for inbound message envelopes:
  ```ts
  const InboundMessage = z.discriminatedUnion('type', [
    z.object({ type: z.literal('subscribe'), topic: z.string(), id: z.string().optional() }),
    z.object({ type: z.literal('unsubscribe'), topic: z.string(), id: z.string().optional() }),
  ]);
  ```
  Outbound types (`subscribed`/`position`/`unsubscribed`/`error`) are declared; only `error` is constructed in this task.
- `src/main.ts` updated to create + start the live server alongside the existing consumer (as sketched above); SIGTERM stops both in the right order (live server first so no new connections arrive during drain; consumer second so the durable-write path completes its in-flight batch).
- New config keys (zod schema in `src/config/load.ts`):
  - `LIVE_WS_PORT` (default `8081`).
  - `LIVE_WS_HOST` (default `0.0.0.0`).
  - `LIVE_WS_PING_INTERVAL_MS` (default `30_000`).
  - `LIVE_WS_DRAIN_TIMEOUT_MS` (default `5_000`).
- New Prometheus metrics (in `src/observability/metrics.ts`):
  - `processor_live_connections{instance_id}` (gauge) — current open connections.
  - `processor_live_messages_inbound_total{instance_id, type}` (counter).
  - `processor_live_messages_outbound_total{instance_id, type}` (counter).
- `test/live-server.test.ts`:
  - Server starts on a random port, accepts a connection, a ping is sent within `LIVE_WS_PING_INTERVAL_MS` + 100 ms, and a pong updates `lastSeenAt`.
  - An inbound message that fails zod validation receives an `{ type: 'error', code: 'protocol-violation' }` reply and the connection stays open.
  - `stop()` sends a close frame to existing connections and resolves within the drain timeout.

## Specification

### Library choice

`ws` (the package, not `@types/ws` alone). Lightweight, minimal API, supports `noServer: true` for attaching to an existing `http.createServer`. Avoid `uWebSockets.js` — performance is great, but the C++ binding makes deployment and testcontainers setups fiddly.

### Server attach pattern

```ts
import http from 'node:http';
import { WebSocketServer } from 'ws';
import { nanoid } from 'nanoid';

const httpServer = http.createServer((req, res) => {
  // Optional: a small /healthz endpoint specific to the live server, separate
  // from the Phase 1 metrics/health server. For now, return 404 on plain HTTP
  // requests — the only thing this server does is upgrade.
  res.writeHead(404).end();
});

const wss = new WebSocketServer({ noServer: true });

httpServer.on('upgrade', (req, socket, head) => {
  // Auth happens here in task 1.5.2. For now, just accept.
  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit('connection', ws, req);
  });
});

wss.on('connection', (ws, req) => {
  const conn: LiveConnection = {
    id: nanoid(),
    ws,
    remoteAddr: req.socket.remoteAddress ?? 'unknown',
    openedAt: new Date(),
    lastSeenAt: new Date(),
  };
  // ... attach handlers:
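  // (Sketch of the handler wiring, added for illustration: the `connections`
  // map and the gauge helper names are assumptions; the message and pong
  // handler bodies are specified in the sections below.)
  connections.set(conn.id, conn);
  metrics.liveConnections.inc(); // processor_live_connections gauge
  ws.on('pong', () => {
    conn.lastSeenAt = new Date();
  });
  ws.on('close', () => {
    connections.delete(conn.id);
    metrics.liveConnections.dec();
  });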
});
```

### Heartbeat

Use the `ws` library's built-in ping/pong:

```ts
const pingTimer = setInterval(() => {
  for (const conn of connections.values()) {
    if (conn.ws.readyState !== WebSocket.OPEN) continue;
    conn.ws.ping();
    // Optional: track outstanding pings; close if a pong doesn't arrive in N seconds.
  }
}, config.LIVE_WS_PING_INTERVAL_MS);
```

`ws` automatically responds to inbound pings with pongs, and emits `'pong'` on the server when a client responds. Update `lastSeenAt` in the pong handler. **Don't roll your own ping in the application protocol** — the WebSocket frame-level ping/pong is faster, browser-built-in, and doesn't pollute the message log.

### Inbound message handling

```ts
ws.on('message', async (data) => {
  conn.lastSeenAt = new Date();
  const raw = data.toString('utf8');
  let parsed: z.infer<typeof InboundMessage>;
  try {
    parsed = InboundMessage.parse(JSON.parse(raw));
  } catch {
    metrics.liveMessagesInbound.inc({ type: 'invalid' });
    sendOutbound(conn, { type: 'error', code: 'protocol-violation', message: 'Invalid message envelope' });
    return;
  }
  metrics.liveMessagesInbound.inc({ type: parsed.type });
  await onMessage(conn, parsed);
});
```

`onMessage` is the pluggable handler that 1.5.2 (auth gate) and 1.5.3 (subscription registry) replace.

### Outbound message helper

```ts
function sendOutbound(conn: LiveConnection, msg: OutboundMessage): void {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}
```

Centralised so that back-pressure handling (1.5.4) and message logging can hook in at one place later.

### Close codes

`ws` lets you specify close codes when calling `ws.close(code, reason)`. Reserve these:

| Code | Meaning | Where set |
|---|---|---|
| `1000` | Normal closure | Default for clean disconnect |
| `1001` | Server going away | `stop()` during shutdown |
| `4401` | Unauthorized | Task 1.5.2 |
| `4403` | Forbidden | Task 1.5.3 (for revoked authorization, not used in pilot) |

Document these in `protocol.ts` as constants.

### Drain on `stop()`

```ts
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS) {
  // 1. Stop accepting new connections.
  httpServer.close();
  // 2. Send a close frame to every open connection.
  for (const conn of connections.values()) {
    conn.ws.close(1001, 'server shutting down');
  }
  // 3. Wait for them to finish, with a timeout.
  const deadline = Date.now() + timeoutMs;
  while (connections.size > 0 && Date.now() < deadline) {
    await sleep(50);
  }
  // 4. Force-terminate any stragglers.
  for (const conn of connections.values()) {
    conn.ws.terminate();
  }
}
```

Stragglers happen when a client's TCP stack is slow or the network is partitioned. Force-terminate is the right call — we're shutting down anyway.

### Logger conventions

- `info`: `live server starting on :8081`, `live server ready`, `live server stopping`, `live server stopped`.
- `debug`: `connection opened id=... remote=...`, `connection closed id=... code=... reason=...`, `inbound message id=... type=...`.
- `trace`: per-message routing detail.

Don't log full WS payloads by default — they may contain large snapshot arrays in later tasks.
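As an illustration, those conventions might map onto structured log calls like this, in the same fragment style as the sketches above (a pino-style `logger.level(fields, message)` API is an assumption; the field names are suggestions):

```ts
// Sketch: lifecycle logging per the conventions above (pino-style API assumed).
logger.info({ host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT }, 'live server starting');
logger.debug({ id: conn.id, remote: conn.remoteAddr }, 'connection opened');
logger.debug({ id: conn.id, code, reason: reason.toString() }, 'connection closed');
// Log the message type and connection id, never the payload body.
logger.debug({ id: conn.id, type: parsed.type }, 'inbound message');
```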
## Acceptance criteria

- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
- [ ] `pnpm dev` boots and logs both the consumer lifecycle (Phase 1) and the live server lifecycle.
- [ ] `wscat -c ws://localhost:8081` connects; sending malformed JSON gets the `protocol-violation` error; sending `{"type":"subscribe","topic":"foo"}` gets `{"type":"error","code":"not-implemented"}`.
- [ ] Server pings within `LIVE_WS_PING_INTERVAL_MS` of connect; a client pong updates `lastSeenAt`.
- [ ] `kill -TERM <pid>` exits cleanly within `LIVE_WS_DRAIN_TIMEOUT_MS + 1s`.
- [ ] `processor_live_connections` gauge moves up on connect, down on disconnect.

## Risks / open questions

- **Port conflict with metrics server.** Phase 1 binds `:9090` for metrics. The live server defaults to `:8081`. Either both are host-published, or only the live one (metrics stays internal-only). Document this in the `compose.yaml` updates that follow.
- **Reverse-proxy upgrade path.** Traefik / Caddy / nginx all support WS upgrade transparently if the path is configured for it. The proxy config lives in `trm/deploy`; this task doesn't touch it, but the README's manual smoke test requires it for end-to-end.
- **Per-connection memory.** Each `LiveConnection` is small (~200 bytes plus the `ws` library's internal state). At 100 concurrent connections that's tens of KB. Not a concern at pilot scale.

## Done

Landed in `b8ebbd0`. Key deviations from spec:

- Used `crypto.randomUUID()` (Node 22 built-in) instead of `nanoid` — avoids adding a new npm dep beyond `ws`.
- `Metrics` moved to `src/shared/types.ts` (re-exported from `src/core/types.ts`) so `src/live/server.ts` can import it without violating the ESLint `import/no-restricted-paths` rule.
- The `processor_live_connections` and `processor_live_subscriptions` gauges are driven via `metrics.observe()` (which calls prom-client `.set()`) rather than `inc`/`dec`, because the shared `Metrics` interface has no `dec` method.
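For reference, a sketch of what that gauge wiring reduces to in prom-client terms (illustrative only; the real registration lives in `src/observability/metrics.ts` and goes through the shared `Metrics` interface):

```ts
import { Gauge } from 'prom-client';

// Illustrative sketch, not the project's actual metrics module.
const liveConnections = new Gauge({
  name: 'processor_live_connections',
  help: 'Currently open live WebSocket connections',
  labelNames: ['instance_id'],
});

// Driven by absolute value (set) instead of inc/dec, matching the deviation
// above. `instanceId` and the helper name are hypothetical.
export function observeLiveConnections(instanceId: string, size: number): void {
  liveConnections.set({ instance_id: instanceId }, size);
}
```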