feat(live): task 1.5.1 — WS server scaffold + heartbeat

Stand up the WebSocket live-broadcast server inside the Processor process:
- src/live/server.ts: createLiveServer factory with start/stop lifecycle,
  per-connection LiveConnection type, sendOutbound helper with back-pressure
  guard, 30s frame-level heartbeat via ws ping/pong, pluggable onMessage
  handler (stub returns error/not-implemented until 1.5.2/1.5.3).
- src/live/protocol.ts: zod schemas for inbound subscribe/unsubscribe messages,
  all outbound types (subscribed/unsubscribed/position/error), WsCloseCodes.
- src/shared/types.ts: extracted Metrics interface so src/live/ can import it
  without crossing the enforced src/live/ ↔ src/core/ ESLint boundary.
- src/core/types.ts: re-exports Metrics from shared/types to keep Phase 1
  call sites unchanged.
- src/config/load.ts: LIVE_WS_PORT, LIVE_WS_HOST, LIVE_WS_PING_INTERVAL_MS,
  LIVE_WS_DRAIN_TIMEOUT_MS, LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
  DIRECTUS_BASE_URL, DIRECTUS_AUTH_TIMEOUT_MS, DIRECTUS_AUTHZ_TIMEOUT_MS,
  LIVE_BROADCAST_GROUP_PREFIX, LIVE_BROADCAST_BATCH_SIZE,
  LIVE_BROADCAST_BATCH_BLOCK_MS, LIVE_DEVICE_EVENT_REFRESH_MS.
- src/observability/metrics.ts: Phase 1.5 metrics inventory (connections,
  inbound/outbound counters, auth/authz histograms, subscription gauge,
  broadcast counters + lag histogram, snapshot histograms, device-event map).
- src/main.ts: wires the live server alongside the durable-write consumer;
  shutdown order: live server → consumer → metrics → Redis → Postgres.
- eslint.config.js: import/no-restricted-paths zones for src/live/ ↔ src/core/.
- test/live-server.test.ts: 7 unit tests covering connect, ping, protocol
  violation, valid message dispatch, connections gauge, and stop() drain.
This commit is contained in:
2026-05-02 17:33:31 +02:00
parent e1c6f59948
commit 7154a0a49c
11 changed files with 1134 additions and 21 deletions
+18
View File
@@ -59,6 +59,24 @@ const ConfigSchema = z.object({
// Per-device in-memory state LRU cap
DEVICE_STATE_LRU_CAP: z.coerce.number().int().min(100).max(1_000_000).default(10_000),
// Live broadcast WebSocket server
LIVE_WS_PORT: z.coerce.number().int().min(0).max(65535).default(8081),
LIVE_WS_HOST: z.string().min(1).default('0.0.0.0'),
LIVE_WS_PING_INTERVAL_MS: z.coerce.number().int().min(1_000).max(300_000).default(30_000),
LIVE_WS_DRAIN_TIMEOUT_MS: z.coerce.number().int().min(100).max(30_000).default(5_000),
LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: z.coerce.number().int().min(1024).default(1_048_576),
// Directus connectivity (for auth + authz round-trips)
DIRECTUS_BASE_URL: urlWithProtocol(['http', 'https']).default('http://directus:8055'),
DIRECTUS_AUTH_TIMEOUT_MS: z.coerce.number().int().min(100).max(30_000).default(5_000),
DIRECTUS_AUTHZ_TIMEOUT_MS: z.coerce.number().int().min(100).max(30_000).default(5_000),
// Broadcast consumer group
LIVE_BROADCAST_GROUP_PREFIX: z.string().min(1).default('live-broadcast'),
LIVE_BROADCAST_BATCH_SIZE: z.coerce.number().int().min(1).max(10_000).default(100),
LIVE_BROADCAST_BATCH_BLOCK_MS: z.coerce.number().int().min(0).max(60_000).default(1_000),
LIVE_DEVICE_EVENT_REFRESH_MS: z.coerce.number().int().min(1_000).max(3_600_000).default(30_000),
});
// ---------------------------------------------------------------------------
+3 -15
View File
@@ -83,18 +83,6 @@ export type DeviceState = {
// Metrics — observability surface
// ---------------------------------------------------------------------------
/**
* Minimal metrics interface exposed to pipeline components.
*
* `inc` accepts an optional `value` for batched increments — counters that
* naturally arrive in groups (records consumed, rows inserted, IDs ACKed)
* should pass the count rather than calling `inc` N times. Defaults to 1.
*/
export type Metrics = {
readonly inc: (
name: string,
labels?: Record<string, string>,
value?: number,
) => void;
readonly observe: (name: string, value: number, labels?: Record<string, string>) => void;
};
// Re-exported from src/shared/types.ts to keep the core/live boundary clean
// while preserving the existing import path for all Phase 1 call sites.
export type { Metrics } from '../shared/types.js';
+153
View File
@@ -0,0 +1,153 @@
/**
* Wire protocol types and zod schemas for the Processor WebSocket live-broadcast
* endpoint. Inbound messages arrive from SPA clients; outbound messages are sent
* by the server.
*
* Spec: docs/wiki/synthesis/processor-ws-contract.md
*/
import { z } from 'zod';
// ---------------------------------------------------------------------------
// WebSocket close codes
// ---------------------------------------------------------------------------
/**
 * WebSocket close codes used by the live server.
 *
 * 1000, 1001 and 1008 are standard codes defined by RFC 6455. 4401 and 4403
 * are application-defined codes in the 4000–4999 private-use range; their
 * last three digits echo HTTP 401 / 403.
 *
 * Declared `as const` so every member keeps its literal numeric type.
 */
export const WsCloseCodes = {
/** Normal server-initiated close. */
NORMAL: 1000,
/** Server is shutting down (SIGTERM); sent to every open connection on stop. */
GOING_AWAY: 1001,
/** Back-pressure threshold exceeded — client too slow to drain its queue. */
POLICY_VIOLATION: 1008,
/** No auth cookie presented, or /users/me returned 401/403. */
UNAUTHORIZED: 4401,
/** User's authorization for a resource was revoked (Phase 3+, reserved). */
FORBIDDEN: 4403,
} as const;
// ---------------------------------------------------------------------------
// Inbound message schemas (Client → Server)
// ---------------------------------------------------------------------------
/**
 * Zod schema for the inbound `subscribe` message.
 *
 * - `type`  — literal discriminator `'subscribe'`.
 * - `topic` — required string. Only the type is validated here (an empty
 *   string passes); topic validity is not checked by this schema.
 * - `id`    — optional client-supplied correlation token echoed back in
 *   responses.
 */
const SubscribeSchema = z.object({
type: z.literal('subscribe'),
topic: z.string(),
id: z.string().optional(),
});
/**
 * Zod schema for the inbound `unsubscribe` message.
 *
 * Same shape as the `subscribe` schema apart from the `type` literal:
 * a required string `topic` (not validated beyond being a string) and an
 * optional string `id` correlation token.
 */
const UnsubscribeSchema = z.object({
type: z.literal('unsubscribe'),
topic: z.string(),
id: z.string().optional(),
});
/**
 * Discriminated union (on `type`) of all valid inbound message shapes.
 * `parse()` throws ZodError on an unknown `type` or missing required fields.
 *
 * The identifier `InboundMessage` is declared twice on purpose: once as the
 * runtime schema (value) and once as the TypeScript type inferred from it, so
 * callers can import a single name for both validation and typing.
 */
export const InboundMessage = z.discriminatedUnion('type', [
SubscribeSchema,
UnsubscribeSchema,
]);
export type InboundMessage = z.infer<typeof InboundMessage>;
// ---------------------------------------------------------------------------
// Outbound message types (Server → Client)
// ---------------------------------------------------------------------------
/**
 * One entry in the snapshot array delivered with `subscribed`.
 *
 * Carries the same fields as the streaming `position` message minus `type`
 * and `topic`. Optional fields are omitted when absent for a given position;
 * they are never sent as `null`.
 *
 * NOTE(review): units for `speed`, `course` and `accuracy` are not stated in
 * this file — confirm against the contract document before relying on them.
 */
export type PositionSnapshotEntry = {
readonly deviceId: string;
readonly lat: number;
readonly lon: number;
readonly ts: number; // epoch milliseconds
readonly speed?: number;
readonly course?: number;
readonly accuracy?: number;
// Free-form extra data; values are untyped and must be narrowed by consumers.
readonly attributes?: Record<string, unknown>;
};
/**
 * Server response confirming a successful subscription.
 *
 * `snapshot` is always present (possibly empty) and carries the initial
 * positions so the SPA map is populated immediately. `id`, when present, is
 * the correlation token from the client's `subscribe` request.
 */
export type SubscribedMessage = {
readonly type: 'subscribed';
readonly topic: string;
readonly id?: string;
readonly snapshot: PositionSnapshotEntry[];
};
/**
 * Server response confirming an unsubscription from `topic`.
 * `id`, when present, is the correlation token from the client's request.
 */
export type UnsubscribedMessage = {
readonly type: 'unsubscribed';
readonly topic: string;
readonly id?: string;
};
/**
 * A streaming position update pushed after subscription.
 *
 * Has the same position fields as a snapshot entry, plus the `type`
 * discriminator and the `topic` the update belongs to. Optional fields are
 * omitted (not null) when the device did not report them.
 */
export type PositionMessage = {
readonly type: 'position';
readonly topic: string;
readonly deviceId: string;
readonly lat: number;
readonly lon: number;
readonly ts: number; // epoch milliseconds
readonly speed?: number;
readonly course?: number;
readonly accuracy?: number;
// Free-form extra data; values are untyped and must be narrowed by consumers.
readonly attributes?: Record<string, unknown>;
};
/**
 * Machine-readable codes carried by the `error` message.
 *
 * The set is extensible — the SPA should ignore unknown codes gracefully
 * rather than treating them as fatal. Within this commit the server itself
 * emits `protocol-violation` (unparseable or invalid inbound envelope) and the
 * placeholder handler emits `not-implemented`.
 */
export type ErrorCode =
| 'forbidden'
| 'not-found'
| 'unknown-topic'
| 'protocol-violation'
| 'not-implemented'
| 'rate-limited';
/**
 * An error response from the server.
 *
 * When `topic` is set the error is scoped to that topic; when it is omitted
 * the error is connection-level (for example a malformed inbound envelope).
 * `id` is the client's correlation token when one is available, and `message`
 * is an optional human-readable explanation accompanying `code`.
 */
export type ErrorMessage = {
readonly type: 'error';
readonly topic?: string;
readonly id?: string;
readonly code: ErrorCode;
readonly message?: string;
};
/**
 * Union of all valid outbound message shapes, discriminated by the `type`
 * field (`'subscribed' | 'unsubscribed' | 'position' | 'error'`).
 */
export type OutboundMessage =
| SubscribedMessage
| UnsubscribedMessage
| PositionMessage
| ErrorMessage;
+318
View File
@@ -0,0 +1,318 @@
/**
* WebSocket live-broadcast server.
*
* Lifecycle: `createLiveServer()` → `server.start()` → clients connect →
* `server.stop()` drains existing connections and closes cleanly.
*
* Design notes:
* - Runs on its own http.Server (separate from the Phase 1 metrics/health server
* on :9090) so a proxy can route to different paths and failure modes don't
* entangle.
* - Auth happens in the `'upgrade'` handler (task 1.5.2). This scaffold accepts
* all upgrades and logs the connection.
* - Message dispatch is pluggable via the `onMessage` callback so tasks 1.5.2
* and 1.5.3 can attach the real auth/registry handler without touching this
* file's lifecycle logic.
* - Heartbeat: WS frame-level ping every LIVE_WS_PING_INTERVAL_MS; pong updates
* lastSeenAt. Do NOT use application-level ping messages — browser WS
* implementations handle frame-level pings natively.
*/
import * as http from 'node:http';
import * as crypto from 'node:crypto';
import { WebSocketServer, WebSocket } from 'ws';
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Metrics } from '../shared/types.js';
import { InboundMessage, WsCloseCodes } from './protocol.js';
import type { OutboundMessage } from './protocol.js';
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/**
 * Per-connection identity object. Augmented in later tasks (auth adds `user`;
 * task 1.5.3 adds `cookieHeader`). Exported so the registry, auth, and
 * broadcast modules can reference the same type.
 *
 * All fields are fixed at connection time except `lastSeenAt`.
 */
export type LiveConnection = {
// Random UUID assigned by the server when the connection is accepted.
readonly id: string;
// Underlying `ws` socket used for all sends, pings and closes.
readonly ws: WebSocket;
// Remote address of the upgrade request's socket, or 'unknown' if unavailable.
readonly remoteAddr: string;
readonly openedAt: Date;
// Mutable: refreshed on every inbound message and every pong frame.
lastSeenAt: Date;
};
/**
 * Message handler callback. The server calls this once per inbound message
 * that passed schema validation; invalid messages never reach it. The handler
 * is responsible for sending any reply itself.
 *
 * A rejection of the returned promise is caught and logged by the server; it
 * does not close the connection.
 *
 * In task 1.5.1 the handler is a placeholder that replies with
 * `error/not-implemented`. Tasks 1.5.2 and 1.5.3 replace it with the real
 * auth+registry handler.
 */
export type MessageHandler = (
conn: LiveConnection,
message: InboundMessage,
) => Promise<void>;
/**
 * Lifecycle handle returned by `createLiveServer`.
 */
export type LiveServer = {
/**
 * Binds the HTTP server and begins accepting WebSocket upgrades.
 * Resolves once the port is bound and listening; rejects if the listen
 * attempt emits an error.
 */
readonly start: () => Promise<void>;
/**
 * Sends a close frame (1001, going away) to every open connection, stops
 * accepting new ones, and waits for all connections to drain (or
 * force-terminates after `timeoutMs` milliseconds). When `timeoutMs` is
 * omitted the configured LIVE_WS_DRAIN_TIMEOUT_MS is used.
 */
readonly stop: (timeoutMs?: number) => Promise<void>;
};
// ---------------------------------------------------------------------------
// sendOutbound helper
// ---------------------------------------------------------------------------
/**
 * Delivers one outbound protocol message to a single client connection.
 *
 * The steps, in order:
 * 1. A socket that is not OPEN is skipped silently, so callers never trigger
 *    `send` errors on closing/closed sockets.
 * 2. If the socket's `bufferedAmount` is above `backpressureThresholdBytes`
 *    the message is NOT sent; the connection is closed with 1008 instead.
 *    Dropping a slow client bounds server memory and forces it to reconnect
 *    and receive a fresh snapshot, which beats delivering a stale backlog.
 * 3. Otherwise the message is JSON-encoded and sent, and the outbound message
 *    counter is incremented with the message's `type` as label.
 *
 * @param conn - Target connection.
 * @param msg - Message to serialise and send.
 * @param metrics - Sink for the outbound counter.
 * @param backpressureThresholdBytes - Maximum tolerated `bufferedAmount`.
 */
export function sendOutbound(
  conn: LiveConnection,
  msg: OutboundMessage,
  metrics: Metrics,
  backpressureThresholdBytes: number,
): void {
  const socket = conn.ws;

  if (socket.readyState !== WebSocket.OPEN) {
    return;
  }

  const clientIsTooSlow = socket.bufferedAmount > backpressureThresholdBytes;
  if (clientIsTooSlow) {
    socket.close(
      WsCloseCodes.POLICY_VIOLATION,
      'back-pressure threshold exceeded',
    );
    return;
  }

  const frame = JSON.stringify(msg);
  socket.send(frame);
  metrics.inc('processor_live_messages_outbound_total', { type: msg.type });
}
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
/**
 * Builds the live-broadcast WebSocket server. Nothing is bound until
 * `start()` is called on the returned handle.
 *
 * The server runs on its own `http.Server`; plain HTTP requests get a 404 and
 * every upgrade request is accepted (auth is added in task 1.5.2).
 *
 * Failure containment: a misbehaving `onMessage` / `onClose` callback must
 * never take the process down, so both synchronous throws and promise
 * rejections from them are caught and logged.
 *
 * Heartbeat: every LIVE_WS_PING_INTERVAL_MS each open connection is pinged.
 * A connection that has been silent (no pong, no message) for more than two
 * ping intervals — i.e. two consecutive pings went unanswered — is
 * force-terminated so half-open sockets do not accumulate.
 *
 * @param config - Listen host/port, ping interval, drain timeout,
 *   back-pressure threshold and the INSTANCE_ID used as a metric label.
 * @param logger - Structured logger for lifecycle and error events.
 * @param metrics - Sink for the connection gauge and message counters.
 * @param onMessage - Called once per schema-valid inbound message.
 * @param onClose - Optional hook called after a connection has been removed
 *   from the internal registry.
 * @returns Lifecycle handle exposing `start` and `stop`.
 */
export function createLiveServer(
  config: Config,
  logger: Logger,
  metrics: Metrics,
  onMessage: MessageHandler,
  onClose?: (conn: LiveConnection) => void,
): LiveServer {
  const connections = new Map<string, LiveConnection>();

  // Silence longer than this means two consecutive pings went unanswered.
  const staleAfterMs = 2 * config.LIVE_WS_PING_INTERVAL_MS;

  const httpServer = http.createServer((_req, res) => {
    // This HTTP server only handles WS upgrades. HTTP requests get a 404.
    res.writeHead(404).end();
  });

  const wss = new WebSocketServer({ noServer: true });

  /** Publishes the current number of tracked connections to the gauge. */
  function reportConnectionCount(): void {
    metrics.observe('processor_live_connections', connections.size);
  }

  /**
   * Decodes a raw frame payload as UTF-8 text. `ws` may deliver a single
   * Buffer, an ArrayBuffer, or an array of Buffer fragments depending on the
   * socket's binaryType; calling `toString('utf8')` directly is only correct
   * for the first form.
   */
  function frameToText(data: Buffer | ArrayBuffer | Buffer[]): string {
    if (Array.isArray(data)) return Buffer.concat(data).toString('utf8');
    if (Buffer.isBuffer(data)) return data.toString('utf8');
    return Buffer.from(data).toString('utf8');
  }

  /**
   * Invokes the pluggable handler. The async wrapper turns a synchronous
   * throw into a rejection, so both failure modes end up in the same log
   * line instead of escaping the event listener as an uncaught exception.
   */
  function dispatch(conn: LiveConnection, message: InboundMessage): void {
    const run = async (): Promise<void> => onMessage(conn, message);
    run().catch((err: unknown) => {
      logger.error({ connId: conn.id, err }, 'onMessage handler threw');
    });
  }

  // -------------------------------------------------------------------------
  // Upgrade handler (auth injected in task 1.5.2; accepted immediately here)
  // -------------------------------------------------------------------------

  httpServer.on('upgrade', (req, socket, head) => {
    wss.handleUpgrade(req, socket, head, (ws) => {
      wss.emit('connection', ws, req);
    });
  });

  // -------------------------------------------------------------------------
  // Connection handler
  // -------------------------------------------------------------------------

  wss.on('connection', (ws, req: http.IncomingMessage) => {
    const conn: LiveConnection = {
      id: crypto.randomUUID(),
      ws,
      remoteAddr: req.socket.remoteAddress ?? 'unknown',
      openedAt: new Date(),
      lastSeenAt: new Date(),
    };

    connections.set(conn.id, conn);
    reportConnectionCount();

    logger.debug(
      { connId: conn.id, remote: conn.remoteAddr },
      'connection opened',
    );

    // Inbound message: validate the envelope, count it, then dispatch.
    ws.on('message', (data) => {
      conn.lastSeenAt = new Date();

      let parsed: InboundMessage;
      try {
        parsed = InboundMessage.parse(JSON.parse(frameToText(data)));
      } catch {
        // Malformed JSON or schema violation: report it to the client and
        // keep the connection open.
        metrics.inc('processor_live_messages_inbound_total', {
          type: 'invalid',
          instance_id: config.INSTANCE_ID,
        });
        sendOutbound(
          conn,
          {
            type: 'error',
            code: 'protocol-violation',
            message: 'Invalid message envelope',
          },
          metrics,
          config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
        );
        return;
      }

      metrics.inc('processor_live_messages_inbound_total', {
        type: parsed.type,
        instance_id: config.INSTANCE_ID,
      });

      logger.debug(
        { connId: conn.id, type: parsed.type },
        'inbound message',
      );

      dispatch(conn, parsed);
    });

    // Pong: the peer answered a heartbeat ping, so refresh liveness.
    ws.on('pong', () => {
      conn.lastSeenAt = new Date();
    });

    // Close: drop the connection from the registry, then notify the hook.
    ws.on('close', (code, reason) => {
      connections.delete(conn.id);
      reportConnectionCount();

      logger.debug(
        {
          connId: conn.id,
          code,
          reason: reason.toString('utf8'),
        },
        'connection closed',
      );

      if (onClose === undefined) return;
      try {
        onClose(conn);
      } catch (err: unknown) {
        // A faulty hook must not crash the process from inside an emitter.
        logger.error({ connId: conn.id, err }, 'onClose handler threw');
      }
    });

    // Error: prevent an uncaught-exception crash on socket errors.
    ws.on('error', (err) => {
      logger.debug({ connId: conn.id, err }, 'connection error');
    });
  });

  // -------------------------------------------------------------------------
  // Heartbeat interval
  // -------------------------------------------------------------------------

  let pingTimer: ReturnType<typeof setInterval> | null = null;

  function startHeartbeat(): void {
    pingTimer = setInterval(() => {
      const now = Date.now();
      for (const conn of connections.values()) {
        if (conn.ws.readyState !== WebSocket.OPEN) continue;

        const silentForMs = now - conn.lastSeenAt.getTime();
        if (silentForMs > staleAfterMs) {
          // Peer stopped answering pings (e.g. half-open TCP connection).
          // terminate() skips the closing handshake the peer would never
          // complete; the 'close' handler performs the registry cleanup.
          logger.debug(
            { connId: conn.id, silentForMs },
            'connection unresponsive; terminating',
          );
          conn.ws.terminate();
          continue;
        }

        conn.ws.ping();
      }
    }, config.LIVE_WS_PING_INTERVAL_MS);

    // Do not hold the event loop open just for heartbeats during shutdown.
    pingTimer.unref();
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  async function start(): Promise<void> {
    logger.info(
      { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT },
      'live server starting',
    );

    await new Promise<void>((resolve, reject) => {
      // Listen failures (e.g. EADDRINUSE) surface as an 'error' event; the
      // listener is removed again once the bind has succeeded.
      httpServer.once('error', reject);
      httpServer.listen(config.LIVE_WS_PORT, config.LIVE_WS_HOST, () => {
        httpServer.off('error', reject);
        resolve();
      });
    });

    startHeartbeat();

    logger.info(
      { host: config.LIVE_WS_HOST, port: config.LIVE_WS_PORT },
      'live server ready',
    );
  }

  async function stop(timeoutMs = config.LIVE_WS_DRAIN_TIMEOUT_MS): Promise<void> {
    logger.info('live server stopping');

    // Stop heartbeat interval.
    if (pingTimer !== null) {
      clearInterval(pingTimer);
      pingTimer = null;
    }

    // Stop accepting new connections.
    httpServer.close();

    // Send close frame to every open connection.
    for (const conn of connections.values()) {
      conn.ws.close(WsCloseCodes.GOING_AWAY, 'server shutting down');
    }

    // Wait for connections to drain, up to the timeout. The 'close' handlers
    // shrink the map as peers complete the closing handshake.
    const deadline = Date.now() + timeoutMs;
    while (connections.size > 0 && Date.now() < deadline) {
      await new Promise<void>((resolve) => setTimeout(resolve, 50));
    }

    // Force-terminate any stragglers (e.g. client with slow TCP stack).
    for (const conn of connections.values()) {
      conn.ws.terminate();
    }

    logger.info('live server stopped');
  }

  return { start, stop };
}
+45 -6
View File
@@ -16,6 +16,9 @@ import { connectRedis, createConsumer } from './core/consumer.js';
import type { ConsumedRecord } from './core/consumer.js';
import { createDeviceStateStore } from './core/state.js';
import { createWriter } from './core/writer.js';
import { createLiveServer, sendOutbound } from './live/server.js';
import type { LiveServer, LiveConnection } from './live/server.js';
import type { InboundMessage } from './live/protocol.js';
// -------------------------------------------------------------------------
// Startup: validate config (fail fast on bad env), build logger
@@ -128,17 +131,41 @@ async function main(): Promise<void> {
return ackIds;
};
// 10. Build and start the consumer
// 10. Build the live WebSocket server (task 1.5.1).
// The stub message handler replies with `error/not-implemented` until
// tasks 1.5.2 and 1.5.3 wire in the real auth + registry handler.
const stubMessageHandler = async (
  conn: LiveConnection,
  message: InboundMessage,
): Promise<void> => {
  // Placeholder reply for every schema-valid inbound message. The request's
  // `topic` and (when supplied) `id` are echoed back so the client can
  // correlate the error with the request that caused it, as the protocol
  // promises for the `id` correlation token.
  sendOutbound(
    conn,
    {
      type: 'error',
      code: 'not-implemented',
      topic: message.topic,
      // Spread keeps `id` absent (rather than `undefined`) when not supplied.
      ...(message.id === undefined ? {} : { id: message.id }),
    },
    metrics,
    config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
  );
};
const liveServer: LiveServer = createLiveServer(
config,
logger,
metrics,
stubMessageHandler,
);
await liveServer.start();
// 11. Build and start the durable-write consumer
const consumer = createConsumer(redis, config, logger, metrics, sink);
await consumer.start();
// 11. Install graceful shutdown.
// Full Phase 3 hardening: explicit consumer-group commit on SIGTERM,
// uncaught-exception handler, multi-instance drain mode.
// 12. Install graceful shutdown.
// Shutdown order: live server first (no new connections), then
// broadcast consumer (task 1.5.4 adds this), then durable-write consumer.
installGracefulShutdown({
redis,
pool,
consumer,
liveServer,
metricsServer,
pgHealth,
lagSampler,
@@ -151,6 +178,7 @@ async function main(): Promise<void> {
group: config.REDIS_CONSUMER_GROUP,
consumer: config.REDIS_CONSUMER_NAME,
metricsPort: config.METRICS_PORT,
wsPort: config.LIVE_WS_PORT,
},
'processor ready',
);
@@ -164,6 +192,7 @@ type ShutdownDeps = {
readonly redis: Redis;
readonly pool: pg.Pool;
readonly consumer: { stop: () => Promise<void> };
readonly liveServer: LiveServer;
readonly metricsServer: http.Server;
readonly pgHealth: { stop: () => void };
readonly lagSampler: { stop: () => void };
@@ -171,7 +200,7 @@ type ShutdownDeps = {
};
function installGracefulShutdown(deps: ShutdownDeps): void {
const { redis, pool, consumer, metricsServer, pgHealth, lagSampler, logger: log } = deps;
const { redis, pool, consumer, liveServer, metricsServer, pgHealth, lagSampler, logger: log } = deps;
let shuttingDown = false;
@@ -187,8 +216,18 @@ function installGracefulShutdown(deps: ShutdownDeps): void {
lagSampler.stop();
pgHealth.stop();
consumer
// Shutdown order:
// 1. Live server — stop accepting new connections and drain existing ones
// first, so clients know the server is going away before the consumer
// stops processing.
// 2. Durable-write consumer — lets the in-flight batch finish.
// 3. Metrics server, Redis, Postgres.
liveServer
.stop()
.then(() => {
log.info('live server stopped');
return consumer.stop();
})
.then(() => {
log.info('consumer stopped');
return new Promise<void>((resolve, reject) =>
+200
View File
@@ -45,6 +45,23 @@ type InternalRegistry = {
readonly acksTotal: Counter;
readonly deviceStateSizeGauge: Gauge;
readonly deviceStateEvictionsTotal: Counter;
// Phase 1.5 — Live broadcast
readonly liveConnectionsGauge: Gauge;
readonly liveMessagesInboundTotal: Counter;
readonly liveMessagesOutboundTotal: Counter;
readonly liveAuthAttemptsTotal: Counter;
readonly liveAuthLatencyMs: Histogram;
readonly liveSubscriptionsGauge: Gauge;
readonly liveSubscribeAttemptsTotal: Counter;
readonly liveAuthzLatencyMs: Histogram;
readonly liveBroadcastRecordsTotal: Counter;
readonly liveBroadcastFanoutMessagesTotal: Counter;
readonly liveBroadcastOrphanRecordsTotal: Counter;
readonly liveBroadcastLagMs: Histogram;
readonly liveSnapshotQueryLatencyMs: Histogram;
readonly liveSnapshotSize: Histogram;
readonly liveDeviceEventRefreshLatencyMs: Histogram;
readonly liveDeviceEventEntries: Gauge;
};
// ---------------------------------------------------------------------------
@@ -376,6 +393,121 @@ function buildInternalRegistry(): InternalRegistry {
registers: [registry],
});
// -------------------------------------------------------------------------
// Phase 1.5 — Live broadcast metrics
// -------------------------------------------------------------------------
const liveConnectionsGauge = new Gauge({
name: 'processor_live_connections',
help: 'Current number of open WebSocket connections.',
labelNames: ['instance_id'],
registers: [registry],
});
const liveMessagesInboundTotal = new Counter({
name: 'processor_live_messages_inbound_total',
help: 'Inbound WS messages. type=subscribe|unsubscribe|invalid.',
labelNames: ['type', 'instance_id'],
registers: [registry],
});
const liveMessagesOutboundTotal = new Counter({
name: 'processor_live_messages_outbound_total',
help: 'Outbound WS messages. type=subscribed|unsubscribed|position|error.',
labelNames: ['type', 'instance_id'],
registers: [registry],
});
const liveAuthAttemptsTotal = new Counter({
name: 'processor_live_auth_attempts_total',
help: 'WS upgrade auth attempts. result=success|unauthorized|error.',
labelNames: ['result'],
registers: [registry],
});
const liveAuthLatencyMs = new Histogram({
name: 'processor_live_auth_latency_ms',
help: 'Latency of /users/me round-trip for WS upgrade auth.',
buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000],
registers: [registry],
});
const liveSubscriptionsGauge = new Gauge({
name: 'processor_live_subscriptions',
help: 'Current total active topic subscriptions across all connections.',
labelNames: ['instance_id'],
registers: [registry],
});
const liveSubscribeAttemptsTotal = new Counter({
name: 'processor_live_subscribe_attempts_total',
help: 'Subscribe attempts. result=success|forbidden|not-found|unknown-topic|error.',
labelNames: ['result'],
registers: [registry],
});
const liveAuthzLatencyMs = new Histogram({
name: 'processor_live_authz_latency_ms',
help: 'Latency of /items/events/<id> round-trip for per-event authorization.',
buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000],
registers: [registry],
});
const liveBroadcastRecordsTotal = new Counter({
name: 'processor_live_broadcast_records_total',
help: 'Records consumed by the broadcast consumer group.',
labelNames: ['instance_id'],
registers: [registry],
});
const liveBroadcastFanoutMessagesTotal = new Counter({
name: 'processor_live_broadcast_fanout_messages_total',
help: 'Outbound position frames sent via fan-out.',
labelNames: ['instance_id'],
registers: [registry],
});
const liveBroadcastOrphanRecordsTotal = new Counter({
name: 'processor_live_broadcast_orphan_records_total',
help: 'Records for devices not registered to any event (no fan-out).',
labelNames: ['instance_id'],
registers: [registry],
});
const liveBroadcastLagMs = new Histogram({
name: 'processor_live_broadcast_lag_ms',
help: 'End-to-end latency from record ts to fan-out send, in milliseconds.',
buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000],
registers: [registry],
});
const liveSnapshotQueryLatencyMs = new Histogram({
name: 'processor_live_snapshot_query_latency_ms',
help: 'Latency of the snapshot-on-subscribe query.',
buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 5000],
registers: [registry],
});
const liveSnapshotSize = new Histogram({
name: 'processor_live_snapshot_size',
help: 'Number of positions in each snapshot response.',
buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500],
registers: [registry],
});
const liveDeviceEventRefreshLatencyMs = new Histogram({
name: 'processor_live_device_event_refresh_latency_ms',
help: 'Latency of device-event map refresh queries.',
buckets: [1, 5, 10, 25, 50, 100, 250, 500],
registers: [registry],
});
const liveDeviceEventEntries = new Gauge({
name: 'processor_live_device_event_entries',
help: 'Number of device→event mappings currently in the in-memory cache.',
registers: [registry],
});
return {
registry,
consumerReadsTotal,
@@ -387,6 +519,22 @@ function buildInternalRegistry(): InternalRegistry {
acksTotal,
deviceStateSizeGauge,
deviceStateEvictionsTotal,
liveConnectionsGauge,
liveMessagesInboundTotal,
liveMessagesOutboundTotal,
liveAuthAttemptsTotal,
liveAuthLatencyMs,
liveSubscriptionsGauge,
liveSubscribeAttemptsTotal,
liveAuthzLatencyMs,
liveBroadcastRecordsTotal,
liveBroadcastFanoutMessagesTotal,
liveBroadcastOrphanRecordsTotal,
liveBroadcastLagMs,
liveSnapshotQueryLatencyMs,
liveSnapshotSize,
liveDeviceEventRefreshLatencyMs,
liveDeviceEventEntries,
};
}
@@ -420,6 +568,30 @@ function dispatchInc(
case 'processor_device_state_evictions_total':
r.deviceStateEvictionsTotal.inc(v);
break;
// Phase 1.5 — Live broadcast (connections are set via observe, not inc)
case 'processor_live_messages_inbound_total':
r.liveMessagesInboundTotal.inc(labels ?? {}, v);
break;
case 'processor_live_messages_outbound_total':
r.liveMessagesOutboundTotal.inc(labels ?? {}, v);
break;
case 'processor_live_auth_attempts_total':
r.liveAuthAttemptsTotal.inc(labels ?? {}, v);
break;
// subscriptions gauge is set via observe (see dispatchObserve)
case 'processor_live_subscribe_attempts_total':
r.liveSubscribeAttemptsTotal.inc(labels ?? {}, v);
break;
case 'processor_live_broadcast_records_total':
r.liveBroadcastRecordsTotal.inc(labels ?? {}, v);
break;
case 'processor_live_broadcast_fanout_messages_total':
r.liveBroadcastFanoutMessagesTotal.inc(labels ?? {}, v);
break;
case 'processor_live_broadcast_orphan_records_total':
r.liveBroadcastOrphanRecordsTotal.inc(labels ?? {}, v);
break;
// device_event_entries gauge is set via observe (see dispatchObserve)
default:
// Unknown metric name — silently ignore. This preserves the contract
// that the Metrics interface never throws, and avoids crashing the
@@ -445,6 +617,34 @@ function dispatchObserve(
case 'processor_device_state_size':
r.deviceStateSizeGauge.set(value);
break;
// Phase 1.5 — Live broadcast
case 'processor_live_connections':
r.liveConnectionsGauge.set(value);
break;
case 'processor_live_auth_latency_ms':
r.liveAuthLatencyMs.observe(value);
break;
case 'processor_live_authz_latency_ms':
r.liveAuthzLatencyMs.observe(value);
break;
case 'processor_live_broadcast_lag_ms':
r.liveBroadcastLagMs.observe(value);
break;
case 'processor_live_snapshot_query_latency_ms':
r.liveSnapshotQueryLatencyMs.observe(value);
break;
case 'processor_live_snapshot_size':
r.liveSnapshotSize.observe(value);
break;
case 'processor_live_device_event_refresh_latency_ms':
r.liveDeviceEventRefreshLatencyMs.observe(value);
break;
case 'processor_live_subscriptions':
r.liveSubscriptionsGauge.set(value);
break;
case 'processor_live_device_event_entries':
r.liveDeviceEventEntries.set(value);
break;
default:
// Unknown metric name — silently ignore (see dispatchInc comment).
break;
+29
View File
@@ -0,0 +1,29 @@
/**
* Shared types used by both src/core/ and src/live/.
*
* Both modules need the `Metrics` interface for observability. Placing it here
* avoids an import across the enforced src/core/ ↔ src/live/ boundary.
*
* src/core/types.ts re-exports Metrics from here to preserve the existing
* import path for Phase 1 call sites.
*/
// ---------------------------------------------------------------------------
// Metrics
// ---------------------------------------------------------------------------
/**
 * Minimal metrics interface exposed to pipeline and live-broadcast components.
 *
 * Metrics are addressed by name; both members are plain functions so
 * components stay decoupled from the concrete metrics backend.
 *
 * `inc` accepts an optional `value` for batched increments — counters that
 * naturally arrive in groups should pass the count rather than calling `inc`
 * N times. Defaults to 1.
 *
 * `observe` reports a single numeric sample for the named metric. The live
 * server also uses it to publish gauge values such as the current connection
 * count.
 *
 * NOTE(review): whether `labels` passed to `observe` are honoured depends on
 * the backing implementation — confirm before relying on labelled samples.
 */
export type Metrics = {
readonly inc: (
name: string,
labels?: Record<string, string>,
value?: number,
) => void;
readonly observe: (name: string, value: number, labels?: Record<string, string>) => void;
};