Task 1.5.1 — WS server scaffold + heartbeat
Phase: 1.5 — Live broadcast
Status: ✅ Done (see the Done section; landed in b8ebbd0)
Depends on: 1.8 (main wiring), 1.9 (observability)
Wiki refs: docs/wiki/synthesis/processor-ws-contract.md §Endpoint, §Transport; docs/wiki/concepts/live-channel-architecture.md
Goal
Stand up a WebSocket server inside the Processor process: bind to a configurable port, accept upgrades, dispatch incoming messages to a router, send 30s pings, hold a typed LiveConnection per client. No auth, no subscriptions yet — those land in 1.5.2 and 1.5.3. This task is the lifecycle and message-loop skeleton.
The WS server runs on its own HTTP server (separate from the Phase 1 metrics/health server on :9090) so the reverse proxy can route them to different paths and the failure modes don't entangle.
Deliverables
- `src/live/server.ts` exporting (sketched just after this list):
  - `createLiveServer(config, logger, metrics): LiveServer` — factory.
  - `LiveServer` interface: `start(): Promise<void>` (binds and listens), `stop(timeoutMs?: number): Promise<void>` (closes new connections, sends a close frame to existing ones, waits for them to drain or force-closes after the timeout).
  - `type LiveConnection = { id: string; ws: WebSocket; remoteAddr: string; openedAt: Date; lastSeenAt: Date }` — opaque identity, augmented in later tasks.
  - A pluggable `onMessage(conn: LiveConnection, raw: string): Promise<void>` handler — for now, just logs at `debug` and replies with `{ type: 'error', code: 'not-implemented' }`. Tasks 1.5.2 and 1.5.3 attach the real handler.
- `src/live/protocol.ts` — zod schemas for inbound message envelopes, with outbound types (`subscribed`/`position`/`unsubscribed`/`error`) declared but not constructed yet:

  ```ts
  const InboundMessage = z.discriminatedUnion('type', [
    z.object({ type: z.literal('subscribe'), topic: z.string(), id: z.string().optional() }),
    z.object({ type: z.literal('unsubscribe'), topic: z.string(), id: z.string().optional() }),
  ]);
  ```

- `src/main.ts` updated to create + start the live server alongside the existing consumer; SIGTERM stops both in the right order (live server first so no new connections arrive during drain; consumer second so the durable-write path completes its in-flight batch).
- New config keys (zod schema in `src/config/load.ts`):
  - `LIVE_WS_PORT` (default `8081`)
  - `LIVE_WS_HOST` (default `0.0.0.0`)
  - `LIVE_WS_PING_INTERVAL_MS` (default `30_000`)
  - `LIVE_WS_DRAIN_TIMEOUT_MS` (default `5_000`)
- New Prometheus metrics (in `src/observability/metrics.ts`):
  - `processor_live_connections{instance_id}` (gauge) — current open connections.
  - `processor_live_messages_inbound_total{instance_id, type}` (counter).
  - `processor_live_messages_outbound_total{instance_id, type}` (counter).
- `test/live-server.test.ts`:
  - Server starts on a random port, accepts a connection, a ping is sent within `LIVE_WS_PING_INTERVAL_MS + 100ms`, and a pong updates `lastSeenAt`.
  - An inbound message that fails zod validation receives an `{ type: 'error', code: 'protocol-violation' }` reply and the connection stays open.
  - `stop()` sends a close frame to existing connections and resolves within the drain timeout.
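A minimal sketch of that exported surface, assuming placeholder `LiveConfig`/`Logger`/`Metrics` aliases for the Phase 1 types and a `setOnMessage` hook whose exact shape this task is free to choose:

```ts
// Shape-only sketch of src/live/server.ts; the factory body wires up the upgrade
// handler, heartbeat timer, and metrics shown later in this spec.
import type { WebSocket } from 'ws';

// Placeholder aliases: the real types come from src/config/load.ts and the
// Phase 1 logger/metrics modules.
type LiveConfig = {
  LIVE_WS_HOST: string;
  LIVE_WS_PORT: number;
  LIVE_WS_PING_INTERVAL_MS: number;
  LIVE_WS_DRAIN_TIMEOUT_MS: number;
};
type Logger = { info(msg: string): void; debug(msg: string): void };
type Metrics = Record<string, { inc(labels?: Record<string, string>): void }>;

export type LiveConnection = {
  id: string;
  ws: WebSocket;
  remoteAddr: string;
  openedAt: Date;
  lastSeenAt: Date;
};

export type OnMessage = (conn: LiveConnection, raw: string) => Promise<void>;

export interface LiveServer {
  start(): Promise<void>;                  // bind LIVE_WS_HOST:LIVE_WS_PORT and listen
  stop(timeoutMs?: number): Promise<void>; // close-frame open connections, drain, force-close on timeout
  setOnMessage(handler: OnMessage): void;  // hypothetical hook name; 1.5.2/1.5.3 attach the real handler
}

export function createLiveServer(config: LiveConfig, logger: Logger, metrics: Metrics): LiveServer {
  // ... construct the http server, WebSocketServer, connections map, and ping timer (see Specification)
  throw new Error('sketch only');
}
```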
Specification
Library choice
`ws` (the package, not `@types/ws` alone). Lightweight, minimal API, supports `noServer: true` for attaching to an existing `http.createServer`. Avoid `uWebSockets.js` — its performance is great, but the C++ binding makes deployment and testcontainers-based testing fiddly.
Server attach pattern
```ts
import http from 'node:http';
import { WebSocketServer } from 'ws';
import { nanoid } from 'nanoid';

const httpServer = http.createServer((req, res) => {
  // Optional: a small /healthz endpoint specific to the live server, separate
  // from the Phase 1 metrics/health server. For now, return 404 on HTTP requests
  // — the only thing this server does is upgrade.
  res.writeHead(404).end();
});

const wss = new WebSocketServer({ noServer: true });

httpServer.on('upgrade', (req, socket, head) => {
  // Auth happens here in task 1.5.2. For now, just accept.
  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit('connection', ws, req);
  });
});

wss.on('connection', (ws, req) => {
  const conn: LiveConnection = {
    id: nanoid(),
    ws,
    remoteAddr: req.socket.remoteAddress ?? 'unknown',
    openedAt: new Date(),
    lastSeenAt: new Date(),
  };
  // ... attach message / pong / close handlers and register conn in the connections map
});
```
Heartbeat
Use the ws library's built-in ping/pong:
```ts
const pingTimer = setInterval(() => {
  for (const conn of connections.values()) {
    if (conn.ws.readyState !== WebSocket.OPEN) continue;
    conn.ws.ping();
    // Optional: track outstanding pings; close if pong doesn't arrive in N seconds.
  }
}, config.LIVE_WS_PING_INTERVAL_MS);
```
`ws` automatically responds to inbound pings with pongs, and emits `'pong'` on the server when a client responds. Update `lastSeenAt` in the pong handler. Don't roll your own ping in the application protocol — the WebSocket frame-level ping/pong is cheaper, handled natively by browsers, and doesn't pollute the message log.
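Concretely, a sketch of the per-connection pong listener, attached in the connection handler above:

```ts
// A frame-level pong from the client refreshes liveness; a later task can also use
// this to close connections whose pings go unanswered.
conn.ws.on('pong', () => {
  conn.lastSeenAt = new Date();
});
```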
Inbound message handling
```ts
ws.on('message', async (data) => {
  conn.lastSeenAt = new Date();
  const raw = data.toString('utf8');
  let parsed;
  try {
    parsed = InboundMessage.parse(JSON.parse(raw));
  } catch (err) {
    metrics.liveMessagesInbound.inc({ type: 'invalid' });
    sendOutbound(conn, { type: 'error', code: 'protocol-violation', message: 'Invalid message envelope' });
    return;
  }
  metrics.liveMessagesInbound.inc({ type: parsed.type });
  await onMessage(conn, parsed);
});
```
onMessage is the pluggable handler that 1.5.2 (auth gate) and 1.5.3 (subscription registry) replace.
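A sketch of the placeholder handler this task ships with (the name is illustrative); per the message loop above it receives the zod-parsed envelope:

```ts
// Default onMessage until 1.5.2/1.5.3 attach the real handler: log at debug and refuse.
const notImplementedHandler = async (conn: LiveConnection, msg: { type: string }): Promise<void> => {
  logger.debug(`inbound message id=${conn.id} type=${msg.type} (no handler attached yet)`);
  sendOutbound(conn, { type: 'error', code: 'not-implemented' });
};
```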
Outbound message helper
```ts
function sendOutbound(conn: LiveConnection, msg: OutboundMessage): void {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}
```
Centralised so that back-pressure handling (1.5.4) and message logging can hook in at one place later.
Close codes
`ws` lets you specify close codes when calling `ws.close(code, reason)`. Reserve these:

| Code | Meaning | Where set |
|---|---|---|
| 1000 | Normal closure | Default for clean disconnect |
| 1001 | Server going away | `stop()` during shutdown |
| 4401 | Unauthorized | Task 1.5.2 |
| 4403 | Forbidden | Task 1.5.3 (for revoked authorization, not used in pilot) |
Document these in protocol.ts as constants.
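A minimal sketch of those constants (the names are illustrative):

```ts
// src/live/protocol.ts: reserved close codes. 1000/1001 are standard RFC 6455 codes;
// codes in the 4000-4999 range are free for application use.
export const CLOSE_NORMAL = 1000;       // clean disconnect
export const CLOSE_GOING_AWAY = 1001;   // stop() during shutdown
export const CLOSE_UNAUTHORIZED = 4401; // task 1.5.2
export const CLOSE_FORBIDDEN = 4403;    // task 1.5.3; revoked authorization, unused in pilot
```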
Drain on stop()
```ts
async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS) {
  // 1. Stop accepting new connections.
  httpServer.close();
  // 2. Send close frame to every open connection.
  for (const conn of connections.values()) {
    conn.ws.close(1001, 'server shutting down');
  }
  // 3. Wait for them to finish, with timeout. The per-connection 'close' handler is
  //    expected to remove entries from the map, so the loop exits once drained.
  const deadline = Date.now() + timeoutMs;
  while (connections.size > 0 && Date.now() < deadline) {
    await sleep(50); // sleep: small delay helper, e.g. (ms) => new Promise((r) => setTimeout(r, ms))
  }
  // 4. Force-terminate any stragglers.
  for (const conn of connections.values()) {
    conn.ws.terminate();
  }
}
```
Stragglers happen when a client's TCP stack is slow or the network is partitioned. Force-terminate is the right call — we're shutting down anyway.
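For context, a rough sketch of the `src/main.ts` shutdown ordering the deliverables call for; `consumer` and its `stop()` are placeholders for the existing Phase 1 wiring:

```ts
// SIGTERM: stop the live server first so no new connections arrive during the drain,
// then stop the consumer so the durable-write path finishes its in-flight batch.
process.once('SIGTERM', async () => {
  await liveServer.stop(); // close frames + drain, bounded by LIVE_WS_DRAIN_TIMEOUT_MS
  await consumer.stop();   // placeholder for the Phase 1 consumer shutdown
  process.exit(0);
});
```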
Logger conventions
- `info`: `live server starting on :8081`, `live server ready`, `live server stopping`, `live server stopped`.
- `debug`: `connection opened id=... remote=...`, `connection closed id=... code=... reason=...`, `inbound message id=... type=...`.
- `trace`: per-message routing detail.
Don't log full WS payloads by default — they may contain large snapshot arrays in later tasks.
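If a payload preview is ever needed while debugging, one option is to log only a truncated slice (the helper name and cutoff are illustrative):

```ts
// Return at most the first 200 characters of a payload for debug logs.
function preview(raw: string, max = 200): string {
  return raw.length <= max ? raw : `${raw.slice(0, max)}... (${raw.length} chars total)`;
}
// usage: logger.debug(`inbound message id=${conn.id} payload=${preview(raw)}`);
```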
Acceptance criteria
- `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
- `pnpm dev` boots, logs both the consumer lifecycle (Phase 1) and the live server lifecycle.
- `wscat -c ws://localhost:8081` connects; sending malformed JSON gets the `protocol-violation` error; sending `{"type":"subscribe","topic":"foo"}` gets `{"type":"error","code":"not-implemented"}` (an automated version of this check is sketched below).
- Server pings within `LIVE_WS_PING_INTERVAL_MS` of connect; client pong updates `lastSeenAt`.
- `kill -TERM <pid>` exits cleanly within `LIVE_WS_DRAIN_TIMEOUT_MS + 1s`.
- `processor_live_connections` gauge moves up on connect, down on disconnect.
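A rough vitest sketch of the automated `protocol-violation` check; the stub logger/metrics shapes and the fixed test port are assumptions, not the repo's actual helpers:

```ts
// test/live-server.test.ts (sketch); assumes vitest and the ws client package.
import { expect, it } from 'vitest';
import WebSocket from 'ws';
import { createLiveServer } from '../src/live/server';

// Minimal stand-ins for the real logger/metrics; shapes guessed from the snippets above.
const logger = { info: () => {}, debug: () => {} };
const metrics = {
  liveMessagesInbound: { inc: () => {} },
  liveMessagesOutbound: { inc: () => {} },
};

it('replies protocol-violation to malformed JSON and keeps the connection open', async () => {
  const config = {
    LIVE_WS_HOST: '127.0.0.1',
    LIVE_WS_PORT: 18081, // arbitrary test port; a random free port would be better in practice
    LIVE_WS_PING_INTERVAL_MS: 30_000,
    LIVE_WS_DRAIN_TIMEOUT_MS: 5_000,
  };
  const server = createLiveServer(config, logger, metrics);
  await server.start();

  const ws = new WebSocket(`ws://127.0.0.1:${config.LIVE_WS_PORT}`);
  await new Promise((resolve) => ws.once('open', resolve));

  const reply = new Promise<string>((resolve) => ws.once('message', (data) => resolve(data.toString())));
  ws.send('not json');
  expect(JSON.parse(await reply)).toMatchObject({ type: 'error', code: 'protocol-violation' });
  expect(ws.readyState).toBe(WebSocket.OPEN);

  ws.close();
  await server.stop();
});
```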
Risks / open questions
- Port conflict with metrics server. Phase 1 binds `:9090` for metrics; the live server defaults to `:8081`. Both can be host-published, or only the live one (metrics is internal-only). Document in the `compose.yaml` updates that follow.
- Reverse-proxy upgrade path. Traefik / Caddy / nginx all support WS upgrade transparently if the path is configured for it. The proxy config lives in `trm/deploy`; this task doesn't touch it, but the README's manual smoke test requires it for end-to-end.
- Per-connection memory. Each `LiveConnection` is small (~200 bytes plus the `ws` library's internal state). At 100 concurrent connections that's tens of KB. Not a concern at pilot scale.
Done
Landed in b8ebbd0. Key deviations from spec:
- Used `crypto.randomUUID()` (Node 22 built-in) instead of `nanoid` — avoids adding a new npm dep beyond `ws`.
- `Metrics` moved to `src/shared/types.ts` (re-exported from `src/core/types.ts`) so `src/live/server.ts` can import it without violating the ESLint `import/no-restricted-paths` rule.
- The `processor_live_connections` gauge and `processor_live_subscriptions` gauge are driven via `metrics.observe()` (which calls prom-client `.set()`) rather than `inc`/`dec`, because the shared `Metrics` interface has no `dec` method.