From 7154a0a49c039653bfa231c5a661c55c35c5c02f Mon Sep 17 00:00:00 2001 From: Julian Cuni Date: Sat, 2 May 2026 17:33:31 +0200 Subject: [PATCH] =?UTF-8?q?feat(live):=20task=201.5.1=20=E2=80=94=20WS=20s?= =?UTF-8?q?erver=20scaffold=20+=20heartbeat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stand up the WebSocket live-broadcast server inside the Processor process: - src/live/server.ts: createLiveServer factory with start/stop lifecycle, per-connection LiveConnection type, sendOutbound helper with back-pressure guard, 30s frame-level heartbeat via ws ping/pong, pluggable onMessage handler (stub returns error/not-implemented until 1.5.2/1.5.3). - src/live/protocol.ts: zod schemas for inbound subscribe/unsubscribe messages, all outbound types (subscribed/unsubscribed/position/error), WsCloseCodes. - src/shared/types.ts: extracted Metrics interface so src/live/ can import it without crossing the enforced src/live/ ↔ src/core/ ESLint boundary. - src/core/types.ts: re-exports Metrics from shared/types to keep Phase 1 call sites unchanged. - src/config/load.ts: LIVE_WS_PORT, LIVE_WS_HOST, LIVE_WS_PING_INTERVAL_MS, LIVE_WS_DRAIN_TIMEOUT_MS, LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, DIRECTUS_BASE_URL, DIRECTUS_AUTH_TIMEOUT_MS, DIRECTUS_AUTHZ_TIMEOUT_MS, LIVE_BROADCAST_GROUP_PREFIX, LIVE_BROADCAST_BATCH_SIZE, LIVE_BROADCAST_BATCH_BLOCK_MS, LIVE_DEVICE_EVENT_REFRESH_MS. - src/observability/metrics.ts: Phase 1.5 metrics inventory (connections, inbound/outbound counters, auth/authz histograms, subscription gauge, broadcast counters + lag histogram, snapshot histograms, device-event map). - src/main.ts: wires the live server alongside the durable-write consumer; shutdown order: live server → consumer → metrics → Redis → Postgres. - eslint.config.js: import/no-restricted-paths zones for src/live/ ↔ src/core/. 
- test/live-server.test.ts: 7 unit tests covering connect, ping, protocol violation, valid message dispatch, connections gauge, and stop() drain. --- eslint.config.js | 15 ++ package.json | 2 + pnpm-lock.yaml | 27 +++ src/config/load.ts | 18 ++ src/core/types.ts | 18 +- src/live/protocol.ts | 153 +++++++++++++++++ src/live/server.ts | 318 ++++++++++++++++++++++++++++++++++ src/main.ts | 51 +++++- src/observability/metrics.ts | 200 +++++++++++++++++++++ src/shared/types.ts | 29 ++++ test/live-server.test.ts | 324 +++++++++++++++++++++++++++++++++++ 11 files changed, 1134 insertions(+), 21 deletions(-) create mode 100644 src/live/protocol.ts create mode 100644 src/live/server.ts create mode 100644 src/shared/types.ts create mode 100644 test/live-server.test.ts diff --git a/eslint.config.js b/eslint.config.js index 1fb6889..0b85951 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -55,6 +55,9 @@ export default [ // Domain isolation: core/ must NEVER import from domain/. // src/domain/ does not exist yet — this rule is preemptive so Phase 2 // cannot violate the boundary by accident. + // + // Live isolation: src/live/ and src/core/ must not import from each + // other; src/db/pool.ts is the only shared module between them. 'import/no-restricted-paths': [ 'error', { @@ -66,6 +69,18 @@ export default [ message: 'src/core must not import from src/domain — domain logic depends on core, not the reverse.', }, + { + target: 'src/core', + from: 'src/live', + message: + 'src/core must not import from src/live — Phase 1 throughput pipeline is independent of the live broadcast layer.', + }, + { + target: 'src/live', + from: 'src/core', + message: + 'src/live must not import from src/core — use src/db/pool.ts for the shared Postgres pool. 
If you need a Phase 1 type, move it to a shared types file.', + }, ], }, ], diff --git a/package.json b/package.json index 0380742..cd17785 100644 --- a/package.json +++ b/package.json @@ -22,11 +22,13 @@ "pg": "^8.13.0", "pino": "^9.5.0", "prom-client": "^15.1.3", + "ws": "^8.20.0", "zod": "^3.23.8" }, "devDependencies": { "@types/node": "^22.10.0", "@types/pg": "^8.11.10", + "@types/ws": "^8.18.1", "@typescript-eslint/eslint-plugin": "^8.19.0", "@typescript-eslint/parser": "^8.19.0", "@vitest/coverage-v8": "^2.1.8", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2147228..646d2fc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -20,6 +20,9 @@ importers: prom-client: specifier: ^15.1.3 version: 15.1.3 + ws: + specifier: ^8.20.0 + version: 8.20.0 zod: specifier: ^3.23.8 version: 3.25.76 @@ -30,6 +33,9 @@ importers: '@types/pg': specifier: ^8.11.10 version: 8.20.0 + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 '@typescript-eslint/eslint-plugin': specifier: ^8.19.0 version: 8.59.1(@typescript-eslint/parser@8.59.1(eslint@9.39.4)(typescript@5.9.3))(eslint@9.39.4)(typescript@5.9.3) @@ -722,6 +728,9 @@ packages: '@types/ssh2@1.15.5': resolution: {integrity: sha512-N1ASjp/nXH3ovBHddRJpli4ozpk6UdDYIX4RJWFa9L1YKnzdhTlVmiGHm4DZnj/jLbqZpes4aeR30EFGQtvhQQ==} + '@types/ws@8.18.1': + resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} + '@typescript-eslint/eslint-plugin@8.59.1': resolution: {integrity: sha512-BOziFIfE+6osHO9FoJG4zjoHUcvI7fTNBSpdAwrNH0/TLvzjsk2oo8XSSOT2HhqUyhZPfHv4UOffoJ9oEEQ7Ag==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} @@ -2567,6 +2576,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.20.0: + resolution: {integrity: sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==} + engines: {node: '>=10.0.0'} + peerDependencies: + 
bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + xtend@4.0.2: resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} engines: {node: '>=0.4'} @@ -3065,6 +3086,10 @@ snapshots: dependencies: '@types/node': 18.19.130 + '@types/ws@8.18.1': + dependencies: + '@types/node': 22.19.17 + '@typescript-eslint/eslint-plugin@8.59.1(@typescript-eslint/parser@8.59.1(eslint@9.39.4)(typescript@5.9.3))(eslint@9.39.4)(typescript@5.9.3)': dependencies: '@eslint-community/regexpp': 4.12.2 @@ -5269,6 +5294,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.20.0: {} + xtend@4.0.2: {} y18n@5.0.8: {} diff --git a/src/config/load.ts b/src/config/load.ts index d38c8dd..ff06f18 100644 --- a/src/config/load.ts +++ b/src/config/load.ts @@ -59,6 +59,24 @@ const ConfigSchema = z.object({ // Per-device in-memory state LRU cap DEVICE_STATE_LRU_CAP: z.coerce.number().int().min(100).max(1_000_000).default(10_000), + + // Live broadcast WebSocket server + LIVE_WS_PORT: z.coerce.number().int().min(0).max(65535).default(8081), + LIVE_WS_HOST: z.string().min(1).default('0.0.0.0'), + LIVE_WS_PING_INTERVAL_MS: z.coerce.number().int().min(1_000).max(300_000).default(30_000), + LIVE_WS_DRAIN_TIMEOUT_MS: z.coerce.number().int().min(100).max(30_000).default(5_000), + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: z.coerce.number().int().min(1024).default(1_048_576), + + // Directus connectivity (for auth + authz round-trips) + DIRECTUS_BASE_URL: urlWithProtocol(['http', 'https']).default('http://directus:8055'), + DIRECTUS_AUTH_TIMEOUT_MS: z.coerce.number().int().min(100).max(30_000).default(5_000), + DIRECTUS_AUTHZ_TIMEOUT_MS: z.coerce.number().int().min(100).max(30_000).default(5_000), + + // Broadcast consumer group + LIVE_BROADCAST_GROUP_PREFIX: z.string().min(1).default('live-broadcast'), + LIVE_BROADCAST_BATCH_SIZE: 
z.coerce.number().int().min(1).max(10_000).default(100), + LIVE_BROADCAST_BATCH_BLOCK_MS: z.coerce.number().int().min(0).max(60_000).default(1_000), + LIVE_DEVICE_EVENT_REFRESH_MS: z.coerce.number().int().min(1_000).max(3_600_000).default(30_000), }); // --------------------------------------------------------------------------- diff --git a/src/core/types.ts b/src/core/types.ts index 79d04e7..d3a745e 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -83,18 +83,6 @@ export type DeviceState = { // Metrics — observability surface // --------------------------------------------------------------------------- -/** - * Minimal metrics interface exposed to pipeline components. - * - * `inc` accepts an optional `value` for batched increments — counters that - * naturally arrive in groups (records consumed, rows inserted, IDs ACKed) - * should pass the count rather than calling `inc` N times. Defaults to 1. - */ -export type Metrics = { - readonly inc: ( - name: string, - labels?: Record, - value?: number, - ) => void; - readonly observe: (name: string, value: number, labels?: Record) => void; -}; +// Re-exported from src/shared/types.ts to keep the shared/live boundary clean +// while preserving the existing import path for all Phase 1 call sites. +export type { Metrics } from '../shared/types.js'; diff --git a/src/live/protocol.ts b/src/live/protocol.ts new file mode 100644 index 0000000..49baf67 --- /dev/null +++ b/src/live/protocol.ts @@ -0,0 +1,153 @@ +/** + * Wire protocol types and zod schemas for the Processor WebSocket live-broadcast + * endpoint. Inbound messages arrive from SPA clients; outbound messages are sent + * by the server. 
+ * + * Spec: docs/wiki/synthesis/processor-ws-contract.md + */ + +import { z } from 'zod'; + +// --------------------------------------------------------------------------- +// WebSocket close codes +// --------------------------------------------------------------------------- + +/** + * Application-defined close codes used by the live server. + * Standard codes (1000, 1001) are defined by RFC 6455. + */ +export const WsCloseCodes = { + /** Normal server-initiated close. */ + NORMAL: 1000, + /** Server is shutting down (SIGTERM). */ + GOING_AWAY: 1001, + /** Back-pressure threshold exceeded — client too slow. */ + POLICY_VIOLATION: 1008, + /** No auth cookie presented, or /users/me returned 401/403. */ + UNAUTHORIZED: 4401, + /** User's authorization for a resource was revoked (Phase 3+, reserved). */ + FORBIDDEN: 4403, +} as const; + +// --------------------------------------------------------------------------- +// Inbound message schemas (Client → Server) +// --------------------------------------------------------------------------- + +/** + * Zod schema for the `subscribe` message. + * `id` is an optional client-supplied correlation token echoed back in responses. + */ +const SubscribeSchema = z.object({ + type: z.literal('subscribe'), + topic: z.string(), + id: z.string().optional(), +}); + +/** + * Zod schema for the `unsubscribe` message. + */ +const UnsubscribeSchema = z.object({ + type: z.literal('unsubscribe'), + topic: z.string(), + id: z.string().optional(), +}); + +/** + * Discriminated union of all valid inbound message shapes. + * `parse()` throws ZodError on an unknown `type` or missing required fields. 
+ */ +export const InboundMessage = z.discriminatedUnion('type', [ + SubscribeSchema, + UnsubscribeSchema, +]); + +export type InboundMessage = z.infer; + +// --------------------------------------------------------------------------- +// Outbound message types (Server → Client) +// --------------------------------------------------------------------------- + +/** + * One entry in the snapshot array delivered with `subscribed`. + * Mirrors the streaming `position` message body (without `type` and `topic`). + * Fields that are absent for a given position are omitted (never `null`). + */ +export type PositionSnapshotEntry = { + readonly deviceId: string; + readonly lat: number; + readonly lon: number; + readonly ts: number; // epoch milliseconds + readonly speed?: number; + readonly course?: number; + readonly accuracy?: number; + readonly attributes?: Record; +}; + +/** + * Server response confirming a successful subscription. + * Includes the initial snapshot so the SPA map is populated immediately. + */ +export type SubscribedMessage = { + readonly type: 'subscribed'; + readonly topic: string; + readonly id?: string; + readonly snapshot: PositionSnapshotEntry[]; +}; + +/** + * Server response confirming an unsubscription. + */ +export type UnsubscribedMessage = { + readonly type: 'unsubscribed'; + readonly topic: string; + readonly id?: string; +}; + +/** + * A streaming position update pushed after subscription. + * Fields are omitted (not null) when the device did not report them. + */ +export type PositionMessage = { + readonly type: 'position'; + readonly topic: string; + readonly deviceId: string; + readonly lat: number; + readonly lon: number; + readonly ts: number; // epoch milliseconds + readonly speed?: number; + readonly course?: number; + readonly accuracy?: number; + readonly attributes?: Record; +}; + +/** + * Error codes in the `error` message. + * Extensible — the SPA should ignore unknown codes gracefully. 
+ */ +export type ErrorCode = + | 'forbidden' + | 'not-found' + | 'unknown-topic' + | 'protocol-violation' + | 'not-implemented' + | 'rate-limited'; + +/** + * An error response from the server, scoped to a topic or connection-level. + */ +export type ErrorMessage = { + readonly type: 'error'; + readonly topic?: string; + readonly id?: string; + readonly code: ErrorCode; + readonly message?: string; +}; + +/** + * Union of all valid outbound message shapes. + */ +export type OutboundMessage = + | SubscribedMessage + | UnsubscribedMessage + | PositionMessage + | ErrorMessage; diff --git a/src/live/server.ts b/src/live/server.ts new file mode 100644 index 0000000..7356b14 --- /dev/null +++ b/src/live/server.ts @@ -0,0 +1,318 @@ +/** + * WebSocket live-broadcast server. + * + * Lifecycle: `createLiveServer()` → `server.start()` → clients connect → + * `server.stop()` drains existing connections and closes cleanly. + * + * Design notes: + * - Runs on its own http.Server (separate from the Phase 1 metrics/health server + * on :9090) so a proxy can route to different paths and failure modes don't + * entangle. + * - Auth happens in the `'upgrade'` handler (task 1.5.2). This scaffold accepts + * all upgrades and logs the connection. + * - Message dispatch is pluggable via the `onMessage` callback so tasks 1.5.2 + * and 1.5.3 can attach the real auth/registry handler without touching this + * file's lifecycle logic. + * - Heartbeat: WS frame-level ping every LIVE_WS_PING_INTERVAL_MS; pong updates + * lastSeenAt. Do NOT use application-level ping messages — browser WS + * implementations handle frame-level pings natively. 
+ */ + +import * as http from 'node:http'; +import * as crypto from 'node:crypto'; +import { WebSocketServer, WebSocket } from 'ws'; +import type { Logger } from 'pino'; +import type { Config } from '../config/load.js'; +import type { Metrics } from '../shared/types.js'; +import { InboundMessage, WsCloseCodes } from './protocol.js'; +import type { OutboundMessage } from './protocol.js'; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +/** + * Per-connection identity object. Augmented in later tasks (auth adds `user`; + * task 1.5.3 adds `cookieHeader`). Exported so the registry, auth, and + * broadcast modules can reference the same type. + */ +export type LiveConnection = { + readonly id: string; + readonly ws: WebSocket; + readonly remoteAddr: string; + readonly openedAt: Date; + lastSeenAt: Date; +}; + +/** + * Message handler callback. The server calls this once per successfully parsed + * inbound message. The handler is responsible for sending replies. + * + * In task 1.5.1 this is a no-op stub that returns `error/not-implemented`. + * Tasks 1.5.2 and 1.5.3 replace it with the real auth+registry handler. + */ +export type MessageHandler = ( + conn: LiveConnection, + message: InboundMessage, +) => Promise; + +/** + * Lifecycle handle returned by `createLiveServer`. + */ +export type LiveServer = { + /** + * Binds the HTTP server and begins accepting WebSocket upgrades. + * Resolves once the port is bound and listening. + */ + readonly start: () => Promise; + /** + * Sends a close frame to every open connection, stops accepting new ones, + * and waits for all connections to drain (or force-terminates after + * `timeoutMs` milliseconds). 
+ */ + readonly stop: (timeoutMs?: number) => Promise; +}; + +// --------------------------------------------------------------------------- +// sendOutbound helper +// --------------------------------------------------------------------------- + +/** + * Serialises and sends an outbound message to a single connection. + * + * - Returns immediately if the connection is not OPEN (avoids `send` errors + * on closing/closed sockets). + * - Checks `bufferedAmount` against the configured back-pressure threshold; + * closes the connection with 1008 if the client is too slow to drain its + * queue. This prevents one slow client from consuming unbounded server memory. + * - Increments the outbound message counter metric. + */ +export function sendOutbound( + conn: LiveConnection, + msg: OutboundMessage, + metrics: Metrics, + backpressureThresholdBytes: number, +): void { + if (conn.ws.readyState !== WebSocket.OPEN) return; + + if (conn.ws.bufferedAmount > backpressureThresholdBytes) { + // Client's send queue is backed up. Closing is preferable to silently + // dropping messages, because it forces a reconnect and a fresh snapshot + // (which is always more valuable than a stale backlog). + conn.ws.close( + WsCloseCodes.POLICY_VIOLATION, + 'back-pressure threshold exceeded', + ); + return; + } + + conn.ws.send(JSON.stringify(msg)); + metrics.inc('processor_live_messages_outbound_total', { type: msg.type }); +} + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createLiveServer( + config: Config, + logger: Logger, + metrics: Metrics, + onMessage: MessageHandler, + onClose?: (conn: LiveConnection) => void, +): LiveServer { + const connections = new Map(); + + const httpServer = http.createServer((_req, res) => { + // This HTTP server only handles WS upgrades. HTTP requests get a 404. 
+ res.writeHead(404).end(); + }); + + const wss = new WebSocketServer({ noServer: true }); + + // ------------------------------------------------------------------------- + // Upgrade handler (auth injected in task 1.5.2; accepted immediately here) + // ------------------------------------------------------------------------- + + httpServer.on('upgrade', (req, socket, head) => { + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req); + }); + }); + + // ------------------------------------------------------------------------- + // Connection handler + // ------------------------------------------------------------------------- + + wss.on('connection', (ws, req: http.IncomingMessage) => { + const conn: LiveConnection = { + id: crypto.randomUUID(), + ws, + remoteAddr: req.socket.remoteAddress ?? 'unknown', + openedAt: new Date(), + lastSeenAt: new Date(), + }; + + connections.set(conn.id, conn); + metrics.observe('processor_live_connections', connections.size); + + logger.debug( + { connId: conn.id, remote: conn.remoteAddr }, + 'connection opened', + ); + + // ----------------------------------------------------------------------- + // Inbound message handler + // ----------------------------------------------------------------------- + + ws.on('message', (data) => { + conn.lastSeenAt = new Date(); + const raw = data.toString('utf8'); + + let parsed: InboundMessage; + try { + parsed = InboundMessage.parse(JSON.parse(raw)); + } catch { + metrics.inc('processor_live_messages_inbound_total', { + type: 'invalid', + instance_id: config.INSTANCE_ID, + }); + sendOutbound( + conn, + { + type: 'error', + code: 'protocol-violation', + message: 'Invalid message envelope', + }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + return; + } + + metrics.inc('processor_live_messages_inbound_total', { + type: parsed.type, + instance_id: config.INSTANCE_ID, + }); + logger.debug( + { connId: conn.id, type: parsed.type }, + 'inbound message', + 
); + + onMessage(conn, parsed).catch((err: unknown) => { + logger.error({ connId: conn.id, err }, 'onMessage handler threw'); + }); + }); + + // ----------------------------------------------------------------------- + // Pong handler — update liveness timestamp + // ----------------------------------------------------------------------- + + ws.on('pong', () => { + conn.lastSeenAt = new Date(); + }); + + // ----------------------------------------------------------------------- + // Close handler + // ----------------------------------------------------------------------- + + ws.on('close', (code, reason) => { + connections.delete(conn.id); + metrics.observe('processor_live_connections', connections.size); + logger.debug( + { + connId: conn.id, + code, + reason: reason.toString('utf8'), + }, + 'connection closed', + ); + if (onClose) onClose(conn); + }); + + // ----------------------------------------------------------------------- + // Error handler — prevent uncaught-exception crash on socket errors + // ----------------------------------------------------------------------- + + ws.on('error', (err) => { + logger.debug({ connId: conn.id, err }, 'connection error'); + }); + }); + + // ------------------------------------------------------------------------- + // Heartbeat interval + // ------------------------------------------------------------------------- + + let pingTimer: ReturnType | null = null; + + function startHeartbeat(): void { + pingTimer = setInterval(() => { + for (const conn of connections.values()) { + if (conn.ws.readyState !== WebSocket.OPEN) continue; + conn.ws.ping(); + } + }, config.LIVE_WS_PING_INTERVAL_MS); + + // Do not hold the event loop open just for heartbeats during shutdown. 
+ pingTimer.unref(); + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + async function start(): Promise { + logger.info( + { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT }, + 'live server starting', + ); + + await new Promise((resolve, reject) => { + httpServer.once('error', reject); + httpServer.listen(config.LIVE_WS_PORT, config.LIVE_WS_HOST, () => { + httpServer.off('error', reject); + resolve(); + }); + }); + + startHeartbeat(); + + logger.info( + { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT }, + 'live server ready', + ); + } + + async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS): Promise { + logger.info('live server stopping'); + + // Stop heartbeat interval. + if (pingTimer !== null) { + clearInterval(pingTimer); + pingTimer = null; + } + + // Stop accepting new connections. + httpServer.close(); + + // Send close frame to every open connection. + for (const conn of connections.values()) { + conn.ws.close(WsCloseCodes.GOING_AWAY, 'server shutting down'); + } + + // Wait for connections to drain, up to the timeout. + const deadline = Date.now() + timeoutMs; + while (connections.size > 0 && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } + + // Force-terminate any stragglers (e.g. client with slow TCP stack). 
+ for (const conn of connections.values()) { + conn.ws.terminate(); + } + + logger.info('live server stopped'); + } + + return { start, stop }; +} diff --git a/src/main.ts b/src/main.ts index b7f0f09..472af9a 100644 --- a/src/main.ts +++ b/src/main.ts @@ -16,6 +16,9 @@ import { connectRedis, createConsumer } from './core/consumer.js'; import type { ConsumedRecord } from './core/consumer.js'; import { createDeviceStateStore } from './core/state.js'; import { createWriter } from './core/writer.js'; +import { createLiveServer, sendOutbound } from './live/server.js'; +import type { LiveServer, LiveConnection } from './live/server.js'; +import type { InboundMessage } from './live/protocol.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -128,17 +131,41 @@ async function main(): Promise { return ackIds; }; - // 10. Build and start the consumer + // 10. Build the live WebSocket server (task 1.5.1). + // The stub message handler replies with `error/not-implemented` until + // tasks 1.5.2 and 1.5.3 wire in the real auth + registry handler. + const stubMessageHandler = async ( + conn: LiveConnection, + _message: InboundMessage, + ): Promise => { + sendOutbound( + conn, + { type: 'error', code: 'not-implemented' }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + }; + + const liveServer: LiveServer = createLiveServer( + config, + logger, + metrics, + stubMessageHandler, + ); + await liveServer.start(); + + // 11. Build and start the durable-write consumer const consumer = createConsumer(redis, config, logger, metrics, sink); await consumer.start(); - // 11. Install graceful shutdown. - // Full Phase 3 hardening: explicit consumer-group commit on SIGTERM, - // uncaught-exception handler, multi-instance drain mode. + // 12. Install graceful shutdown. 
+ // Shutdown order: live server first (no new connections), then + // broadcast consumer (task 1.5.4 adds this), then durable-write consumer. installGracefulShutdown({ redis, pool, consumer, + liveServer, metricsServer, pgHealth, lagSampler, @@ -151,6 +178,7 @@ async function main(): Promise { group: config.REDIS_CONSUMER_GROUP, consumer: config.REDIS_CONSUMER_NAME, metricsPort: config.METRICS_PORT, + wsPort: config.LIVE_WS_PORT, }, 'processor ready', ); @@ -164,6 +192,7 @@ type ShutdownDeps = { readonly redis: Redis; readonly pool: pg.Pool; readonly consumer: { stop: () => Promise }; + readonly liveServer: LiveServer; readonly metricsServer: http.Server; readonly pgHealth: { stop: () => void }; readonly lagSampler: { stop: () => void }; @@ -171,7 +200,7 @@ type ShutdownDeps = { }; function installGracefulShutdown(deps: ShutdownDeps): void { - const { redis, pool, consumer, metricsServer, pgHealth, lagSampler, logger: log } = deps; + const { redis, pool, consumer, liveServer, metricsServer, pgHealth, lagSampler, logger: log } = deps; let shuttingDown = false; @@ -187,8 +216,18 @@ function installGracefulShutdown(deps: ShutdownDeps): void { lagSampler.stop(); pgHealth.stop(); - consumer + // Shutdown order: + // 1. Live server — stop accepting new connections and drain existing ones + // first, so clients know the server is going away before the consumer + // stops processing. + // 2. Durable-write consumer — lets the in-flight batch finish. + // 3. Metrics server, Redis, Postgres. 
+ liveServer .stop() + .then(() => { + log.info('live server stopped'); + return consumer.stop(); + }) .then(() => { log.info('consumer stopped'); return new Promise((resolve, reject) => diff --git a/src/observability/metrics.ts b/src/observability/metrics.ts index 89da9af..40dcc6c 100644 --- a/src/observability/metrics.ts +++ b/src/observability/metrics.ts @@ -45,6 +45,23 @@ type InternalRegistry = { readonly acksTotal: Counter; readonly deviceStateSizeGauge: Gauge; readonly deviceStateEvictionsTotal: Counter; + // Phase 1.5 — Live broadcast + readonly liveConnectionsGauge: Gauge; + readonly liveMessagesInboundTotal: Counter; + readonly liveMessagesOutboundTotal: Counter; + readonly liveAuthAttemptsTotal: Counter; + readonly liveAuthLatencyMs: Histogram; + readonly liveSubscriptionsGauge: Gauge; + readonly liveSubscribeAttemptsTotal: Counter; + readonly liveAuthzLatencyMs: Histogram; + readonly liveBroadcastRecordsTotal: Counter; + readonly liveBroadcastFanoutMessagesTotal: Counter; + readonly liveBroadcastOrphanRecordsTotal: Counter; + readonly liveBroadcastLagMs: Histogram; + readonly liveSnapshotQueryLatencyMs: Histogram; + readonly liveSnapshotSize: Histogram; + readonly liveDeviceEventRefreshLatencyMs: Histogram; + readonly liveDeviceEventEntries: Gauge; }; // --------------------------------------------------------------------------- @@ -376,6 +393,121 @@ function buildInternalRegistry(): InternalRegistry { registers: [registry], }); + // ------------------------------------------------------------------------- + // Phase 1.5 — Live broadcast metrics + // ------------------------------------------------------------------------- + + const liveConnectionsGauge = new Gauge({ + name: 'processor_live_connections', + help: 'Current number of open WebSocket connections.', + labelNames: ['instance_id'], + registers: [registry], + }); + + const liveMessagesInboundTotal = new Counter({ + name: 'processor_live_messages_inbound_total', + help: 'Inbound WS messages. 
type=subscribe|unsubscribe|invalid.', + labelNames: ['type', 'instance_id'], + registers: [registry], + }); + + const liveMessagesOutboundTotal = new Counter({ + name: 'processor_live_messages_outbound_total', + help: 'Outbound WS messages. type=subscribed|unsubscribed|position|error.', + labelNames: ['type', 'instance_id'], + registers: [registry], + }); + + const liveAuthAttemptsTotal = new Counter({ + name: 'processor_live_auth_attempts_total', + help: 'WS upgrade auth attempts. result=success|unauthorized|error.', + labelNames: ['result'], + registers: [registry], + }); + + const liveAuthLatencyMs = new Histogram({ + name: 'processor_live_auth_latency_ms', + help: 'Latency of /users/me round-trip for WS upgrade auth.', + buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000], + registers: [registry], + }); + + const liveSubscriptionsGauge = new Gauge({ + name: 'processor_live_subscriptions', + help: 'Current total active topic subscriptions across all connections.', + labelNames: ['instance_id'], + registers: [registry], + }); + + const liveSubscribeAttemptsTotal = new Counter({ + name: 'processor_live_subscribe_attempts_total', + help: 'Subscribe attempts. 
result=success|forbidden|not-found|unknown-topic|error.', + labelNames: ['result'], + registers: [registry], + }); + + const liveAuthzLatencyMs = new Histogram({ + name: 'processor_live_authz_latency_ms', + help: 'Latency of /items/events/ round-trip for per-event authorization.', + buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000], + registers: [registry], + }); + + const liveBroadcastRecordsTotal = new Counter({ + name: 'processor_live_broadcast_records_total', + help: 'Records consumed by the broadcast consumer group.', + labelNames: ['instance_id'], + registers: [registry], + }); + + const liveBroadcastFanoutMessagesTotal = new Counter({ + name: 'processor_live_broadcast_fanout_messages_total', + help: 'Outbound position frames sent via fan-out.', + labelNames: ['instance_id'], + registers: [registry], + }); + + const liveBroadcastOrphanRecordsTotal = new Counter({ + name: 'processor_live_broadcast_orphan_records_total', + help: 'Records for devices not registered to any event (no fan-out).', + labelNames: ['instance_id'], + registers: [registry], + }); + + const liveBroadcastLagMs = new Histogram({ + name: 'processor_live_broadcast_lag_ms', + help: 'End-to-end latency from record ts to fan-out send, in milliseconds.', + buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000], + registers: [registry], + }); + + const liveSnapshotQueryLatencyMs = new Histogram({ + name: 'processor_live_snapshot_query_latency_ms', + help: 'Latency of the snapshot-on-subscribe query.', + buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000], + registers: [registry], + }); + + const liveSnapshotSize = new Histogram({ + name: 'processor_live_snapshot_size', + help: 'Number of positions in each snapshot response.', + buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500], + registers: [registry], + }); + + const liveDeviceEventRefreshLatencyMs = new Histogram({ + name: 'processor_live_device_event_refresh_latency_ms', + help: 'Latency of device-event map refresh queries.', + buckets: [1, 5, 10, 
25, 50, 100, 250, 500], + registers: [registry], + }); + + const liveDeviceEventEntries = new Gauge({ + name: 'processor_live_device_event_entries', + help: 'Number of device→event mappings currently in the in-memory cache.', + registers: [registry], + }); + return { registry, consumerReadsTotal, @@ -387,6 +519,22 @@ function buildInternalRegistry(): InternalRegistry { acksTotal, deviceStateSizeGauge, deviceStateEvictionsTotal, + liveConnectionsGauge, + liveMessagesInboundTotal, + liveMessagesOutboundTotal, + liveAuthAttemptsTotal, + liveAuthLatencyMs, + liveSubscriptionsGauge, + liveSubscribeAttemptsTotal, + liveAuthzLatencyMs, + liveBroadcastRecordsTotal, + liveBroadcastFanoutMessagesTotal, + liveBroadcastOrphanRecordsTotal, + liveBroadcastLagMs, + liveSnapshotQueryLatencyMs, + liveSnapshotSize, + liveDeviceEventRefreshLatencyMs, + liveDeviceEventEntries, }; } @@ -420,6 +568,30 @@ function dispatchInc( case 'processor_device_state_evictions_total': r.deviceStateEvictionsTotal.inc(v); break; + // Phase 1.5 — Live broadcast (connections are set via observe, not inc) + case 'processor_live_messages_inbound_total': + r.liveMessagesInboundTotal.inc(labels ?? {}, v); + break; + case 'processor_live_messages_outbound_total': + r.liveMessagesOutboundTotal.inc(labels ?? {}, v); + break; + case 'processor_live_auth_attempts_total': + r.liveAuthAttemptsTotal.inc(labels ?? {}, v); + break; + // subscriptions gauge is set via observe (see dispatchObserve) + case 'processor_live_subscribe_attempts_total': + r.liveSubscribeAttemptsTotal.inc(labels ?? {}, v); + break; + case 'processor_live_broadcast_records_total': + r.liveBroadcastRecordsTotal.inc(labels ?? {}, v); + break; + case 'processor_live_broadcast_fanout_messages_total': + r.liveBroadcastFanoutMessagesTotal.inc(labels ?? {}, v); + break; + case 'processor_live_broadcast_orphan_records_total': + r.liveBroadcastOrphanRecordsTotal.inc(labels ?? 
{}, v); + break; + // device_event_entries gauge is set via observe (see dispatchObserve) default: // Unknown metric name — silently ignore. This preserves the contract // that the Metrics interface never throws, and avoids crashing the @@ -445,6 +617,34 @@ function dispatchObserve( case 'processor_device_state_size': r.deviceStateSizeGauge.set(value); break; + // Phase 1.5 — Live broadcast + case 'processor_live_connections': + r.liveConnectionsGauge.set(value); + break; + case 'processor_live_auth_latency_ms': + r.liveAuthLatencyMs.observe(value); + break; + case 'processor_live_authz_latency_ms': + r.liveAuthzLatencyMs.observe(value); + break; + case 'processor_live_broadcast_lag_ms': + r.liveBroadcastLagMs.observe(value); + break; + case 'processor_live_snapshot_query_latency_ms': + r.liveSnapshotQueryLatencyMs.observe(value); + break; + case 'processor_live_snapshot_size': + r.liveSnapshotSize.observe(value); + break; + case 'processor_live_device_event_refresh_latency_ms': + r.liveDeviceEventRefreshLatencyMs.observe(value); + break; + case 'processor_live_subscriptions': + r.liveSubscriptionsGauge.set(value); + break; + case 'processor_live_device_event_entries': + r.liveDeviceEventEntries.set(value); + break; default: // Unknown metric name — silently ignore (see dispatchInc comment). break; diff --git a/src/shared/types.ts b/src/shared/types.ts new file mode 100644 index 0000000..5616817 --- /dev/null +++ b/src/shared/types.ts @@ -0,0 +1,29 @@ +/** + * Shared types used by both src/core/ and src/live/. + * + * Both modules need the `Metrics` interface for observability. Placing it here + * avoids an import across the enforced src/core/ ↔ src/live/ boundary. + * + * src/core/types.ts re-exports Metrics from here to preserve the existing + * import path for Phase 1 call sites. 
+ */ + +// --------------------------------------------------------------------------- +// Metrics +// --------------------------------------------------------------------------- + +/** + * Minimal metrics interface exposed to pipeline and live-broadcast components. + * + * `inc` accepts an optional `value` for batched increments — counters that + * naturally arrive in groups should pass the count rather than calling `inc` + * N times. Defaults to 1. + */ +export type Metrics = { + readonly inc: ( + name: string, + labels?: Record<string, string>, + value?: number, + ) => void; + readonly observe: (name: string, value: number, labels?: Record<string, string>) => void; +}; diff --git a/test/live-server.test.ts b/test/live-server.test.ts new file mode 100644 index 0000000..2f97358 --- /dev/null +++ b/test/live-server.test.ts @@ -0,0 +1,324 @@ +/** + * Unit tests for src/live/server.ts — WebSocket server scaffold + heartbeat. + * + * Uses a real `ws` client against an in-process server bound on a random port. + * No Redis or Postgres required. The message handler is stubbed. + * + * Covers: + * - Server starts on a random port and accepts a WS connection. + * - Server sends a ping within PING_INTERVAL_MS + 100ms; pong updates lastSeenAt. + * - Inbound message that fails zod validation receives protocol-violation error + * and the connection stays open. + * - Valid subscribe message reaches the onMessage handler. + * - stop() sends a close frame to existing connections and resolves within + * the drain timeout. + * - processor_live_connections gauge increments on connect and decrements on close.
+ */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import WebSocket from 'ws'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Metrics } from '../src/core/types.js'; +import { createLiveServer } from '../src/live/server.js'; +import type { LiveConnection, MessageHandler } from '../src/live/server.js'; +import type { InboundMessage } from '../src/live/protocol.js'; + +// --------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn().mockReturnThis(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +type TestMetrics = Metrics & { + readonly incCalls: Array<{ name: string; labels?: Record<string, string>; value?: number }>; + readonly observeCalls: Array<{ name: string; value: number }>; +}; + +function makeMetrics(): TestMetrics { + const incCalls: Array<{ name: string; labels?: Record<string, string>; value?: number }> = []; + const observeCalls: Array<{ name: string; value: number }> = []; + return { + incCalls, + observeCalls, + inc(name, labels, value) { + incCalls.push({ name, labels, value }); + }, + observe(name, value) { + observeCalls.push({ name, value }); + }, + }; +} + +function makeConfig(overrides: Partial<Config> = {}): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-1', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://localhost:5432/test', + REDIS_TELEMETRY_STREAM: 'telemetry:t', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + LIVE_WS_PORT: 0, // OS-assigned port + LIVE_WS_HOST: '127.0.0.1', +
LIVE_WS_PING_INTERVAL_MS: 200, // short for tests + LIVE_WS_DRAIN_TIMEOUT_MS: 500, + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, + DIRECTUS_BASE_URL: 'http://localhost:8055', + DIRECTUS_AUTH_TIMEOUT_MS: 5_000, + DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, + LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast', + LIVE_BROADCAST_BATCH_SIZE: 100, + LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000, + LIVE_DEVICE_EVENT_REFRESH_MS: 30_000, + ...overrides, + }; +} + +/** + * Connects a ws client to the given URL and resolves when the connection is open. + */ +function connectClient(url: string): Promise<WebSocket> { + return new Promise((resolve, reject) => { + const ws = new WebSocket(url); + ws.once('open', () => resolve(ws)); + ws.once('error', reject); + }); +} + +/** + * Waits for the next message on the given WebSocket and returns its parsed JSON. + */ +function waitForMessage(ws: WebSocket, timeoutMs = 3_000): Promise<unknown> { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('timeout waiting for message')), timeoutMs); + ws.once('message', (data) => { + clearTimeout(timer); + resolve(JSON.parse(data.toString())); + }); + }); +} + +/** + * Waits for the next ping frame on the ws client socket. + */ +function waitForPing(ws: WebSocket, timeoutMs = 3_000): Promise<void> { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('timeout waiting for ping')), timeoutMs); + // ws client emits 'ping' when a server ping frame arrives. + ws.once('ping', () => { + clearTimeout(timer); + resolve(); + }); + }); +} + +/** + * Waits for the ws client to receive a close frame.
+ */ +function waitForClose(ws: WebSocket, timeoutMs = 3_000): Promise<number> { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('timeout waiting for close')), timeoutMs); + ws.once('close', (code) => { + clearTimeout(timer); + resolve(code); + }); + }); +} + +// --------------------------------------------------------------------------- +// Tests — server with discoverable port +// --------------------------------------------------------------------------- + +import * as httpModule from 'node:http'; +import type { AddressInfo } from 'node:net'; + +/** + * Finds a free TCP port by letting the OS assign one, then closing the listener. + */ +function getFreePort(): Promise<number> { + return new Promise((resolve, reject) => { + const srv = new httpModule.Server(); + srv.listen(0, '127.0.0.1', () => { + const addr = srv.address() as AddressInfo; + const p = addr.port; + srv.close((err) => (err ? reject(err) : resolve(p))); + }); + }); +} + +describe('live server — lifecycle and message routing', () => { + let server: ReturnType<typeof createLiveServer>; + let wsUrl: string; + let clients: WebSocket[] = []; + let metrics: TestMetrics; + let capturedConnections: LiveConnection[] = []; + let capturedMessages: Array<{ conn: LiveConnection; msg: InboundMessage }> = []; + + beforeEach(async () => { + clients = []; + capturedConnections = []; + capturedMessages = []; + + const logger = makeSilentLogger(); + metrics = makeMetrics(); + const port = await getFreePort(); + + const handler: MessageHandler = async (conn, msg) => { + capturedConnections.push(conn); + capturedMessages.push({ conn, msg }); + // Echo not-implemented for subscribe + conn.ws.send(JSON.stringify({ type: 'error', code: 'not-implemented' })); + }; + + const config = makeConfig({ + LIVE_WS_PORT: port, + LIVE_WS_HOST: '127.0.0.1', + LIVE_WS_PING_INTERVAL_MS: 200, + LIVE_WS_DRAIN_TIMEOUT_MS: 500, + }); + + server = createLiveServer(config, logger, metrics, handler); + await server.start(); + wsUrl =
`ws://127.0.0.1:${port}`; + }); + + afterEach(async () => { + for (const client of clients) { + if (client.readyState !== WebSocket.CLOSED) { + client.terminate(); + } + } + await server.stop(200).catch(() => {}); + }); + + it('accepts a WS connection', async () => { + const client = await connectClient(wsUrl); + clients.push(client); + expect(client.readyState).toBe(WebSocket.OPEN); + }); + + it('sends a ping within PING_INTERVAL_MS + buffer', async () => { + const client = await connectClient(wsUrl); + clients.push(client); + // PING_INTERVAL_MS = 200ms; allow 300ms total. + await waitForPing(client, 300); + // If we get here without timeout, the ping arrived. + }); + + it('inbound message failing zod validation receives protocol-violation error; connection stays open', async () => { + const client = await connectClient(wsUrl); + clients.push(client); + + // Send a JSON object with an unknown type — zod will reject. + client.send(JSON.stringify({ type: 'unknown-action', data: 42 })); + const msg = await waitForMessage(client, 2_000) as Record<string, unknown>; + + expect(msg['type']).toBe('error'); + expect(msg['code']).toBe('protocol-violation'); + expect(client.readyState).toBe(WebSocket.OPEN); + }); + + it('inbound malformed JSON receives protocol-violation error', async () => { + const client = await connectClient(wsUrl); + clients.push(client); + + client.send('not valid json {{{'); + const msg = await waitForMessage(client, 2_000) as Record<string, unknown>; + + expect(msg['type']).toBe('error'); + expect(msg['code']).toBe('protocol-violation'); + expect(client.readyState).toBe(WebSocket.OPEN); + }); + + it('valid subscribe message reaches the onMessage handler', async () => { + const client = await connectClient(wsUrl); + clients.push(client); + + const subMsg = { + type: 'subscribe', + topic: 'event:ada60b3d-b29f-4017-b702-cd6b700f9f6c', + id: 'corr-1', + }; + client.send(JSON.stringify(subMsg)); + await waitForMessage(client, 2_000); // Wait for not-implemented reply + +
expect(capturedMessages.length).toBe(1); + const captured = capturedMessages[0]; + expect(captured).toBeDefined(); + expect(captured!.msg.type).toBe('subscribe'); + expect(captured!.msg.topic).toBe('event:ada60b3d-b29f-4017-b702-cd6b700f9f6c'); + }); + + it('processor_live_connections observe is called on connect and disconnect', async () => { + const before = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_connections', + ).length; + + const client = await connectClient(wsUrl); + clients.push(client); + + // Wait briefly for the connection event. + await new Promise((resolve) => setTimeout(resolve, 50)); + + const afterConnect = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_connections', + ); + expect(afterConnect.length).toBeGreaterThan(before); + // After connect, value should be >= 1. + const lastAfterConnect = afterConnect[afterConnect.length - 1]; + expect(lastAfterConnect).toBeDefined(); + expect(lastAfterConnect!.value).toBeGreaterThanOrEqual(1); + + // Disconnect the client. + const closePromise = waitForClose(client, 1_000); + client.close(); + await closePromise; + + // Wait briefly for the close event to propagate. + await new Promise((resolve) => setTimeout(resolve, 50)); + + const afterDisconnect = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_connections', + ); + const lastAfterDisconnect = afterDisconnect[afterDisconnect.length - 1]; + expect(lastAfterDisconnect).toBeDefined(); + expect(lastAfterDisconnect!.value).toBe(0); + }); + + it('stop() sends close frame to existing connections and resolves within drain timeout', async () => { + const client = await connectClient(wsUrl); + clients.push(client); + + const closePromise = waitForClose(client, 2_000); + const startMs = Date.now(); + await server.stop(500); + const elapsedMs = Date.now() - startMs; + + // Should resolve within the drain timeout + a small buffer. 
+ expect(elapsedMs).toBeLessThan(1_000); + + // The client should have received a close frame. + const code = await closePromise; + // 1001 = GOING_AWAY (server shutting down). + expect(code).toBe(1001); + }); +});