diff --git a/src/core/codec.ts b/src/core/codec.ts index e32d8bb..fa2f736 100644 --- a/src/core/codec.ts +++ b/src/core/codec.ts @@ -1,225 +1,11 @@ /** - * Sentinel decoder for Position records arriving from the Redis Stream. + * Re-exports from src/shared/codec.ts. * - * tcp-ingestion serializes Position objects with a custom JSON replacer that - * encodes types not natively supported by JSON: - * - bigint → { __bigint: "" } - * - Buffer → { __buffer_b64: "" } - * - Date → ISO8601 string + * The decode logic lives in src/shared/codec.ts so that both src/core/ (durable + * write consumer) and src/live/ (broadcast consumer) can import it without + * crossing the enforced src/core/ ↔ src/live/ boundary. * - * This module reverses that encoding so the Processor receives fully-typed - * Position objects. The contract is documented in: - * docs/wiki/concepts/position-record.md - * tcp-ingestion/src/core/publish.ts (jsonReplacer) + * All existing Phase 1 import paths (`import { decodePosition } from './codec.js'`) + * continue to work unchanged. */ - -import type { Position, AttributeValue } from './types.js'; - -// --------------------------------------------------------------------------- -// Error type -// --------------------------------------------------------------------------- - -export class CodecError extends Error { - override readonly name = 'CodecError'; - - constructor(message: string, options?: ErrorOptions) { - super(message, options); - } -} - -// --------------------------------------------------------------------------- -// Sentinel detection helpers -// --------------------------------------------------------------------------- - -/** - * Returns true when the value is exactly `{ __bigint: "" }`. - * The shape must have exactly one key — any extra keys indicate a user-defined - * object that coincidentally has a `__bigint` field, which is not a sentinel. - * In practice tcp-ingestion only emits single-key sentinels; validate strictly. - */ -function isBigintSentinel(value: unknown): value is { __bigint: string } { - if (typeof value !== 'object' || value === null) return false; - const keys = Object.keys(value); - return ( - keys.length === 1 && - keys[0] === '__bigint' && - typeof (value as Record)['__bigint'] === 'string' - ); -} - -/** - * Returns true when the value is exactly `{ __buffer_b64: "" }`. - */ -function isBufferSentinel(value: unknown): value is { __buffer_b64: string } { - if (typeof value !== 'object' || value === null) return false; - const keys = Object.keys(value); - return ( - keys.length === 1 && - keys[0] === '__buffer_b64' && - typeof (value as Record)['__buffer_b64'] === 'string' - ); -} - -// --------------------------------------------------------------------------- -// Reviver -// --------------------------------------------------------------------------- - -/** - * JSON.parse reviver that reconstructs the live types from sentinel encodings. - * - * Called by JSON.parse for every key-value pair in the document, bottom-up. - * By the time `attributes` is visited, each attribute value has already been - * converted (sentinels → bigint/Buffer), because JSON.parse visits leaves first. - * - * Reviver must return `unknown` because the result type depends on the key. - * The caller casts the final result to `PositionJson` after validation. - */ -function reviver(key: string, value: unknown): unknown { - // Timestamp field: ISO string → Date - if (key === 'timestamp' && typeof value === 'string') { - const date = new Date(value); - if (isNaN(date.getTime())) { - throw new CodecError(`Invalid timestamp value: "${value}"`); - } - return date; - } - - // bigint sentinel - if (isBigintSentinel(value)) { - const digits = value.__bigint; - // Validate: only decimal digits (including optional leading minus for - // negative bigints, though Teltonika IO elements are unsigned). - if (!/^-?\d+$/.test(digits)) { - throw new CodecError( - `Malformed __bigint sentinel: expected decimal digits, got "${digits}"`, - ); - } - return BigInt(digits); - } - - // Buffer sentinel - if (isBufferSentinel(value)) { - const b64 = value.__buffer_b64; - // Validate base64 characters (standard + URL-safe alphabets, with padding) - if (!/^[A-Za-z0-9+/\-_]*={0,2}$/.test(b64)) { - throw new CodecError( - `Malformed __buffer_b64 sentinel: invalid base64 string "${b64}"`, - ); - } - return Buffer.from(b64, 'base64'); - } - - return value; -} - -// --------------------------------------------------------------------------- -// Required field validation -// --------------------------------------------------------------------------- - -const REQUIRED_NUMERIC_FIELDS = [ - 'latitude', - 'longitude', - 'altitude', - 'angle', - 'speed', - 'satellites', - 'priority', -] as const; - -/** - * Validates the decoded object has all required Position fields with the - * correct types. Throws `CodecError` naming the first failing field. - */ -function validateDecodedPosition(obj: Record): asserts obj is { - device_id: string; - timestamp: Date; - latitude: number; - longitude: number; - altitude: number; - angle: number; - speed: number; - satellites: number; - priority: number; - attributes: Record; -} { - if (typeof obj['device_id'] !== 'string' || obj['device_id'].length === 0) { - throw new CodecError('Missing or invalid field: device_id (expected non-empty string)'); - } - - if (!(obj['timestamp'] instanceof Date)) { - throw new CodecError( - 'Missing or invalid field: timestamp (expected Date after reviver; was ISO string decoded?)', - ); - } - - for (const field of REQUIRED_NUMERIC_FIELDS) { - if (typeof obj[field] !== 'number') { - throw new CodecError( - `Missing or invalid field: ${field} (expected number, got ${typeof obj[field]})`, - ); - } - } - - if (typeof obj['attributes'] !== 'object' || obj['attributes'] === null) { - throw new CodecError('Missing or invalid field: attributes (expected object)'); - } - - // Validate priority is exactly 0, 1, or 2 - const priority = obj['priority'] as number; - if (priority !== 0 && priority !== 1 && priority !== 2) { - throw new CodecError( - `Invalid field: priority (expected 0 | 1 | 2, got ${priority})`, - ); - } - - // Validate attributes values are only AttributeValue types - const attrs = obj['attributes'] as Record; - for (const [attrKey, attrVal] of Object.entries(attrs)) { - if ( - typeof attrVal !== 'number' && - typeof attrVal !== 'bigint' && - !Buffer.isBuffer(attrVal) - ) { - throw new CodecError( - `Invalid attribute "${attrKey}": expected number | bigint | Buffer, got ${typeof attrVal}`, - ); - } - } -} - -// --------------------------------------------------------------------------- -// Public API -// --------------------------------------------------------------------------- - -/** - * Decodes a JSON-encoded Position string (with sentinel encoding applied by - * tcp-ingestion's `serializePosition`) into a fully-typed `Position` object. - * - * Throws `CodecError` if the JSON is malformed, a sentinel is invalid, a - * required field is missing, or a field has the wrong type. - */ -export function decodePosition(payload: string): Position { - let parsed: unknown; - - try { - parsed = JSON.parse(payload, reviver); - } catch (err) { - if (err instanceof CodecError) { - throw err; - } - throw new CodecError( - `Failed to parse Position payload as JSON: ${err instanceof Error ? err.message : String(err)}`, - { cause: err }, - ); - } - - if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) { - throw new CodecError('Position payload must be a JSON object'); - } - - const obj = parsed as Record; - - validateDecodedPosition(obj); - - return obj as unknown as Position; -} +export { decodePosition, CodecError } from '../shared/codec.js'; diff --git a/src/core/types.ts b/src/core/types.ts index d3a745e..63c2649 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -7,40 +7,14 @@ */ // --------------------------------------------------------------------------- -// Shared value types +// Shared value types — re-exported from src/shared/types.ts // --------------------------------------------------------------------------- -/** - * A single IO attribute value from the Teltonika AVL record. - * - number : fixed-width IO elements (N1/N2/N4 — fit safely in JS number) - * - bigint : N8 elements (u64, may exceed Number.MAX_SAFE_INTEGER) - * - Buffer : NX variable-length elements (Codec 8 Extended) - */ -export type AttributeValue = number | bigint | Buffer; - -// --------------------------------------------------------------------------- -// Position — input contract from tcp-ingestion -// --------------------------------------------------------------------------- - -/** - * Normalized GPS position record. Byte-equivalent to tcp-ingestion's `Position` - * type (docs/wiki/concepts/position-record.md). - * - * `priority` is typed as a union rather than `number` to stay consistent with - * tcp-ingestion and make exhaustive switches possible in domain logic. - */ -export type Position = { - readonly device_id: string; - readonly timestamp: Date; - readonly latitude: number; - readonly longitude: number; - readonly altitude: number; - readonly angle: number; // heading 0–360° - readonly speed: number; // km/h; 0 may mean "GPS invalid" — preserve verbatim - readonly satellites: number; - readonly priority: 0 | 1 | 2; // 0=Low, 1=High, 2=Panic - readonly attributes: Readonly>; -}; +// Position and AttributeValue live in src/shared/types.ts so that src/live/ +// can import them without crossing the src/core/ ↔ src/live/ boundary. +// Re-exported here to preserve all existing Phase 1 import paths. +export type { AttributeValue, Position } from '../shared/types.js'; +import type { Position } from '../shared/types.js'; // --------------------------------------------------------------------------- // StreamRecord — raw shape returned by XREADGROUP before codec decoding diff --git a/src/db/migrations/0002_positions_faulty.sql b/src/db/migrations/0002_positions_faulty.sql new file mode 100644 index 0000000..9964065 --- /dev/null +++ b/src/db/migrations/0002_positions_faulty.sql @@ -0,0 +1,19 @@ +-- Migration: 0002_positions_faulty +-- Adds the faulty column to positions and ensures the (device_id, ts DESC) index +-- needed by the snapshot-on-subscribe query exists. +-- +-- The faulty column is set post-hoc by operators in Directus when a position is +-- flagged as unrealistic. The snapshot-on-subscribe query (task 1.5.5) filters +-- WHERE faulty = false to exclude flagged positions from the initial map state. +-- The live broadcast path (Redis stream → fan-out) never touches this column +-- because faulty flags are applied after the fact. + +ALTER TABLE positions + ADD COLUMN IF NOT EXISTS faulty boolean NOT NULL DEFAULT false; + +-- Index for the snapshot DISTINCT ON query: +-- SELECT DISTINCT ON (device_id) ... ORDER BY device_id, ts DESC +-- TimescaleDB scans only the latest chunks for devices with recent activity, +-- but the (device_id, ts DESC) index makes per-device latest-position lookups +-- efficient regardless of chunk age. +CREATE INDEX IF NOT EXISTS positions_device_ts_idx ON positions (device_id, ts DESC); diff --git a/src/live/broadcast.ts b/src/live/broadcast.ts new file mode 100644 index 0000000..ae736b3 --- /dev/null +++ b/src/live/broadcast.ts @@ -0,0 +1,297 @@ +/** + * Broadcast consumer group — per-instance Redis Stream reader for live fan-out. + * + * Reads the same `telemetry:teltonika` stream as the durable-write consumer + * (task 1.5) but on a SEPARATE per-instance consumer group: + * `live-broadcast-{instance_id}` + * + * This means every Processor instance sees every record for its own connected + * clients. The durable-write group splits records across instances for exactly- + * once Postgres writes; the broadcast group replicates records to every + * instance for fan-out. The two groups operate independently with separate + * offsets; a slow Postgres write does not slow down broadcast. + * + * ACK semantics: ACK immediately on consume (no durability required for + * broadcast — missing a position is fine; only the latest position matters). + * + * Back-pressure: sendOutbound closes slow connections at BACKPRESSURE_THRESHOLD + * (already handled in server.ts). + * + * Spec: processor-ws-contract.md §Multi-instance behaviour; + * task 1.5.4 §Broadcast consumer group + */ + +import type { Redis } from 'ioredis'; +import type { Logger } from 'pino'; +import type { Config } from '../config/load.js'; +import type { Metrics, Position } from '../shared/types.js'; +import type { SubscriptionRegistry } from './registry.js'; +import type { DeviceEventMap } from './device-event-map.js'; +import type { PositionMessage } from './protocol.js'; +import { sendOutbound } from './server.js'; +import type { LiveConnection } from './server.js'; +import { decodePosition, CodecError } from '../shared/codec.js'; + +// --------------------------------------------------------------------------- +// Public interface +// --------------------------------------------------------------------------- + +export type BroadcastConsumer = { + readonly start: () => Promise; + readonly stop: () => Promise; +}; + +// --------------------------------------------------------------------------- +// Wire-format mapper +// --------------------------------------------------------------------------- + +/** + * Maps a decoded Position to a PositionMessage (minus `topic`). + * Omits fields rather than sending `null` for absent / zero values. + * + * Field mapping: + * - Position.device_id → deviceId (IMEI string; not a UUID in Phase 1) + * - Position.latitude → lat + * - Position.longitude → lon + * - Position.timestamp → ts (epoch ms) + * - Position.speed → speed (omitted if 0 — may indicate invalid GPS fix) + * - Position.angle → course (omitted if 0) + * - No accuracy field in Phase 1's Position type. + * + * Note: The WS contract spec says deviceId should be `devices.id` (UUID), but + * Phase 1's positions table stores the raw IMEI as device_id. The SPA will + * need to join on the IMEI until Phase 2 introduces UUID-based device tracking. + * This is documented as an open deviation. + */ +function toPositionMessage( + position: Position, +): Omit { + const msg: Omit = { + type: 'position', + deviceId: position.device_id, + lat: position.latitude, + lon: position.longitude, + ts: position.timestamp.getTime(), + }; + + // Omit speed when 0 — per Teltonika convention, 0 may indicate invalid GPS. + if (position.speed > 0) { + (msg as Record)['speed'] = position.speed; + } + // Omit angle/course when 0. + if (position.angle > 0) { + (msg as Record)['course'] = position.angle; + } + + return msg; +} + +// --------------------------------------------------------------------------- +// Raw stream entry shape (ioredis XREADGROUP return type) +// --------------------------------------------------------------------------- + +type RawStreamEntry = [id: string, fields: string[]]; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createBroadcastConsumer( + redis: Redis, + registry: SubscriptionRegistry, + deviceToEvent: DeviceEventMap, + config: Config, + logger: Logger, + metrics: Metrics, +): BroadcastConsumer { + const stream = config.REDIS_TELEMETRY_STREAM; + const groupName = `${config.LIVE_BROADCAST_GROUP_PREFIX}-${config.INSTANCE_ID}`; + const consumerName = config.INSTANCE_ID; + const batchSize = config.LIVE_BROADCAST_BATCH_SIZE; + const batchBlockMs = config.LIVE_BROADCAST_BATCH_BLOCK_MS; + + let stopping = false; + let loopPromise: Promise = Promise.resolve(); + + // ------------------------------------------------------------------------- + // Consumer group setup + // ------------------------------------------------------------------------- + + async function ensureGroup(): Promise { + try { + await redis.xgroup('CREATE', stream, groupName, '$', 'MKSTREAM'); + logger.info({ stream, group: groupName }, 'broadcast consumer group created'); + } catch (err: unknown) { + if (err instanceof Error && err.message.startsWith('BUSYGROUP')) { + logger.info({ stream, group: groupName }, 'broadcast consumer group already exists'); + return; + } + throw err; + } + } + + // ------------------------------------------------------------------------- + // Fan-out + // ------------------------------------------------------------------------- + + function fanOut( + entryId: string, + position: Position, + ): void { + const eventIds = deviceToEvent.lookup(position.device_id); + + if (eventIds.length === 0) { + metrics.inc('processor_live_broadcast_orphan_records_total', { + instance_id: config.INSTANCE_ID, + }); + return; + } + + const baseMsg = toPositionMessage(position); + + for (const eventId of eventIds) { + const topic = `event:${eventId}`; + const conns = registry.connectionsForTopic(topic); + for (const conn of conns as Iterable) { + sendOutbound( + conn, + { ...baseMsg, topic }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + metrics.inc('processor_live_broadcast_fanout_messages_total', { + instance_id: config.INSTANCE_ID, + }); + } + } + + // Broadcast lag: time from GPS fix to fan-out send. + const lagMs = Date.now() - position.timestamp.getTime(); + if (lagMs >= 0) { + metrics.observe('processor_live_broadcast_lag_ms', lagMs); + } + + logger.debug({ entryId, device: position.device_id, events: eventIds.length }, 'fanned out'); + } + + // ------------------------------------------------------------------------- + // Batch decoder (mirrors core/consumer.ts decodeBatch pattern) + // ------------------------------------------------------------------------- + + function decodeBatch(entries: RawStreamEntry[]): Array<{ + id: string; + position: Position; + }> { + const decoded: Array<{ id: string; position: Position }> = []; + + for (const [id, fields] of entries) { + const fieldMap: Record = {}; + for (let i = 0; i + 1 < fields.length; i += 2) { + const key = fields[i]; + const val = fields[i + 1]; + if (key !== undefined && val !== undefined) { + fieldMap[key] = val; + } + } + + const payload = fieldMap['payload']; + if (payload === undefined) { + logger.warn({ id, stream }, 'broadcast entry missing payload; skipping'); + continue; + } + + try { + const position = decodePosition(payload); + decoded.push({ id, position }); + } catch (err) { + if (err instanceof CodecError) { + logger.warn({ id, err }, 'broadcast decode error; skipping record'); + } else { + logger.error({ id, err }, 'unexpected broadcast decode error'); + } + // Do not ACK — leave in PEL (though broadcast doesn't retry, this is + // consistent with the "never silently skip" principle for decode errors). + } + } + + return decoded; + } + + // ------------------------------------------------------------------------- + // Read loop + // ------------------------------------------------------------------------- + + async function runLoop(): Promise { + logger.info({ stream, group: groupName, consumer: consumerName }, 'broadcast consumer started'); + + while (!stopping) { + let rawResult: [string, [string, string[]][]][] | null; + + try { + rawResult = (await redis.xreadgroup( + 'GROUP', + groupName, + consumerName, + 'COUNT', + String(batchSize), + 'BLOCK', + String(batchBlockMs), + 'STREAMS', + stream, + '>', + )) as [string, [string, string[]][]][] | null; + } catch (err) { + if (stopping) break; + logger.error({ err }, 'broadcast XREADGROUP failed; backing off 1s'); + await new Promise((resolve) => setTimeout(resolve, 1_000)); + continue; + } + + if (rawResult === null) continue; // BLOCK timeout — check stopping flag + + const streamEntries = rawResult[0]?.[1] ?? []; + if (streamEntries.length === 0) continue; + + metrics.inc('processor_live_broadcast_records_total', { + instance_id: config.INSTANCE_ID, + }, streamEntries.length); + + const decoded = decodeBatch(streamEntries); + + // ACK all entries immediately — broadcast has no durability requirement. + const allIds = streamEntries.map(([id]) => id); + if (allIds.length > 0) { + await redis.xack(stream, groupName, ...allIds); + } + + // Fan out decoded records to subscribed clients. + for (const { id, position } of decoded) { + fanOut(id, position); + } + } + + logger.info({ stream, group: groupName }, 'broadcast consumer loop exited'); + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + async function start(): Promise { + await ensureGroup(); + + loopPromise = runLoop(); + + loopPromise.catch((err: unknown) => { + logger.fatal({ err }, 'broadcast consumer loop crashed; exiting'); + process.exit(1); + }); + } + + async function stop(): Promise { + stopping = true; + await loopPromise; + } + + return { start, stop }; +} diff --git a/src/live/device-event-map.ts b/src/live/device-event-map.ts new file mode 100644 index 0000000..969c8b4 --- /dev/null +++ b/src/live/device-event-map.ts @@ -0,0 +1,118 @@ +/** + * In-memory cache of device → event mappings. + * + * The fan-out loop needs to answer "which events does this device belong to?" + * for every position record. The naive answer — query Postgres on each record — + * is wrong at any meaningful throughput. This module caches the full + * `entry_devices ⨯ entries` join in memory and refreshes it on a configurable + * cadence (default: every 30 s). + * + * Staleness window: up to LIVE_DEVICE_EVENT_REFRESH_MS. This is acceptable for + * pilot — operators register devices before the event starts, and "the device + * appeared on the map after 30 s" is a tolerable UX gap. Phase 3+ can add + * invalidation signals if needed. + * + * Spec: processor-ws-contract.md §Multi-instance behaviour; + * task 1.5.4 §DeviceEventMap design + */ + +import type pg from 'pg'; +import type { Logger } from 'pino'; +import type { Metrics } from '../shared/types.js'; +import type { Config } from '../config/load.js'; + +// --------------------------------------------------------------------------- +// Public interface +// --------------------------------------------------------------------------- + +export type DeviceEventMap = { + /** Returns the event IDs the device is currently registered to. */ + readonly lookup: (deviceId: string) => readonly string[]; + /** Starts the refresh timer. Immediately runs the first refresh. */ + readonly start: () => Promise; + /** Cancels the refresh timer. */ + readonly stop: () => void; +}; + +// --------------------------------------------------------------------------- +// Query result type +// --------------------------------------------------------------------------- + +type DeviceEventRow = { + device_id: string; + event_id: string; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createDeviceEventMap( + pool: pg.Pool, + config: Config, + logger: Logger, + metrics: Metrics, +): DeviceEventMap { + // Mutable map; atomically swapped on each refresh. + let cache = new Map>(); + let timer: ReturnType | null = null; + + async function refresh(): Promise { + const start = performance.now(); + try { + const result = await pool.query( + `SELECT ed.device_id, e.event_id + FROM entry_devices ed + JOIN entries e ON e.id = ed.entry_id`, + ); + + const next = new Map>(); + for (const row of result.rows) { + let eventSet = next.get(row.device_id); + if (!eventSet) { + eventSet = new Set(); + next.set(row.device_id, eventSet); + } + eventSet.add(row.event_id); + } + + cache = next; + + const elapsed = performance.now() - start; + metrics.observe('processor_live_device_event_refresh_latency_ms', elapsed); + metrics.observe('processor_live_device_event_entries', next.size); + + logger.debug({ devices: next.size, elapsedMs: Math.round(elapsed) }, 'device-event map refreshed'); + } catch (err) { + logger.warn({ err }, 'device-event map refresh failed; retaining stale cache'); + // Retain the stale cache — a stale map is better than an empty map + // which would silently drop all fan-out until the next refresh. + } + } + + async function start(): Promise { + await refresh(); + timer = setInterval(() => { + refresh().catch((err: unknown) => { + logger.warn({ err }, 'device-event map refresh interval error'); + }); + }, config.LIVE_DEVICE_EVENT_REFRESH_MS); + // Do not hold the event loop open during shutdown. + timer.unref(); + } + + function stop(): void { + if (timer !== null) { + clearInterval(timer); + timer = null; + } + } + + function lookup(deviceId: string): readonly string[] { + const events = cache.get(deviceId); + if (!events || events.size === 0) return []; + return [...events]; + } + + return { lookup, start, stop }; +} diff --git a/src/main.ts b/src/main.ts index 99e46c1..a633554 100644 --- a/src/main.ts +++ b/src/main.ts @@ -22,6 +22,8 @@ import type { InboundMessage } from './live/protocol.js'; import { createAuthClient } from './live/auth.js'; import { createAuthzClient } from './live/authz.js'; import { createSubscriptionRegistry } from './live/registry.js'; +import { createBroadcastConsumer } from './live/broadcast.js'; +import { createDeviceEventMap } from './live/device-event-map.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -158,19 +160,36 @@ async function main(): Promise { (conn) => registry.onConnectionClose(conn), authClient, ); + + // 10b. Build the device-event map (Postgres-backed, periodic refresh). + const deviceEventMap = createDeviceEventMap(pool, config, logger, metrics); + + // 10c. Build the broadcast consumer (per-instance consumer group fan-out). + const broadcastConsumer = createBroadcastConsumer( + redis, + registry, + deviceEventMap, + config, + logger, + metrics, + ); await liveServer.start(); + await deviceEventMap.start(); + await broadcastConsumer.start(); // 11. Build and start the durable-write consumer const consumer = createConsumer(redis, config, logger, metrics, sink); await consumer.start(); // 12. Install graceful shutdown. - // Shutdown order: live server first (no new connections), then - // broadcast consumer (task 1.5.4 adds this), then durable-write consumer. + // Shutdown order: live server first (no new connections), + // then broadcast consumer, then durable-write consumer last. installGracefulShutdown({ redis, pool, consumer, + broadcastConsumer, + deviceEventMap, liveServer, metricsServer, pgHealth, @@ -198,6 +217,8 @@ type ShutdownDeps = { readonly redis: Redis; readonly pool: pg.Pool; readonly consumer: { stop: () => Promise }; + readonly broadcastConsumer: { stop: () => Promise }; + readonly deviceEventMap: { stop: () => void }; readonly liveServer: LiveServer; readonly metricsServer: http.Server; readonly pgHealth: { stop: () => void }; @@ -206,7 +227,10 @@ type ShutdownDeps = { }; function installGracefulShutdown(deps: ShutdownDeps): void { - const { redis, pool, consumer, liveServer, metricsServer, pgHealth, lagSampler, logger: log } = deps; + const { + redis, pool, consumer, broadcastConsumer, deviceEventMap, + liveServer, metricsServer, pgHealth, lagSampler, logger: log, + } = deps; let shuttingDown = false; @@ -232,6 +256,11 @@ function installGracefulShutdown(deps: ShutdownDeps): void { .stop() .then(() => { log.info('live server stopped'); + deviceEventMap.stop(); + return broadcastConsumer.stop(); + }) + .then(() => { + log.info('broadcast consumer stopped'); return consumer.stop(); }) .then(() => { diff --git a/src/shared/codec.ts b/src/shared/codec.ts new file mode 100644 index 0000000..fcbfbcf --- /dev/null +++ b/src/shared/codec.ts @@ -0,0 +1,229 @@ +/** + * Sentinel decoder for Position records arriving from the Redis Stream. + * + * Moved from src/core/codec.ts to src/shared/ so that both src/core/ and + * src/live/ can import it without crossing the enforced boundary between those + * two layers. + * + * tcp-ingestion serializes Position objects with a custom JSON replacer that + * encodes types not natively supported by JSON: + * - bigint → { __bigint: "" } + * - Buffer → { __buffer_b64: "" } + * - Date → ISO8601 string + * + * This module reverses that encoding so the Processor receives fully-typed + * Position objects. The contract is documented in: + * docs/wiki/concepts/position-record.md + * tcp-ingestion/src/core/publish.ts (jsonReplacer) + */ + +import type { Position, AttributeValue } from './types.js'; + +// --------------------------------------------------------------------------- +// Error type +// --------------------------------------------------------------------------- + +export class CodecError extends Error { + override readonly name = 'CodecError'; + + constructor(message: string, options?: ErrorOptions) { + super(message, options); + } +} + +// --------------------------------------------------------------------------- +// Sentinel detection helpers +// --------------------------------------------------------------------------- + +/** + * Returns true when the value is exactly `{ __bigint: "" }`. + * The shape must have exactly one key — any extra keys indicate a user-defined + * object that coincidentally has a `__bigint` field, which is not a sentinel. + * In practice tcp-ingestion only emits single-key sentinels; validate strictly. + */ +function isBigintSentinel(value: unknown): value is { __bigint: string } { + if (typeof value !== 'object' || value === null) return false; + const keys = Object.keys(value); + return ( + keys.length === 1 && + keys[0] === '__bigint' && + typeof (value as Record)['__bigint'] === 'string' + ); +} + +/** + * Returns true when the value is exactly `{ __buffer_b64: "" }`. + */ +function isBufferSentinel(value: unknown): value is { __buffer_b64: string } { + if (typeof value !== 'object' || value === null) return false; + const keys = Object.keys(value); + return ( + keys.length === 1 && + keys[0] === '__buffer_b64' && + typeof (value as Record)['__buffer_b64'] === 'string' + ); +} + +// --------------------------------------------------------------------------- +// Reviver +// --------------------------------------------------------------------------- + +/** + * JSON.parse reviver that reconstructs the live types from sentinel encodings. + * + * Called by JSON.parse for every key-value pair in the document, bottom-up. + * By the time `attributes` is visited, each attribute value has already been + * converted (sentinels → bigint/Buffer), because JSON.parse visits leaves first. + * + * Reviver must return `unknown` because the result type depends on the key. + * The caller casts the final result to `PositionJson` after validation. + */ +function reviver(key: string, value: unknown): unknown { + // Timestamp field: ISO string → Date + if (key === 'timestamp' && typeof value === 'string') { + const date = new Date(value); + if (isNaN(date.getTime())) { + throw new CodecError(`Invalid timestamp value: "${value}"`); + } + return date; + } + + // bigint sentinel + if (isBigintSentinel(value)) { + const digits = value.__bigint; + // Validate: only decimal digits (including optional leading minus for + // negative bigints, though Teltonika IO elements are unsigned). + if (!/^-?\d+$/.test(digits)) { + throw new CodecError( + `Malformed __bigint sentinel: expected decimal digits, got "${digits}"`, + ); + } + return BigInt(digits); + } + + // Buffer sentinel + if (isBufferSentinel(value)) { + const b64 = value.__buffer_b64; + // Validate base64 characters (standard + URL-safe alphabets, with padding) + if (!/^[A-Za-z0-9+/\-_]*={0,2}$/.test(b64)) { + throw new CodecError( + `Malformed __buffer_b64 sentinel: invalid base64 string "${b64}"`, + ); + } + return Buffer.from(b64, 'base64'); + } + + return value; +} + +// --------------------------------------------------------------------------- +// Required field validation +// --------------------------------------------------------------------------- + +const REQUIRED_NUMERIC_FIELDS = [ + 'latitude', + 'longitude', + 'altitude', + 'angle', + 'speed', + 'satellites', + 'priority', +] as const; + +/** + * Validates the decoded object has all required Position fields with the + * correct types. Throws `CodecError` naming the first failing field. + */ +function validateDecodedPosition(obj: Record): asserts obj is { + device_id: string; + timestamp: Date; + latitude: number; + longitude: number; + altitude: number; + angle: number; + speed: number; + satellites: number; + priority: number; + attributes: Record; +} { + if (typeof obj['device_id'] !== 'string' || obj['device_id'].length === 0) { + throw new CodecError('Missing or invalid field: device_id (expected non-empty string)'); + } + + if (!(obj['timestamp'] instanceof Date)) { + throw new CodecError( + 'Missing or invalid field: timestamp (expected Date after reviver; was ISO string decoded?)', + ); + } + + for (const field of REQUIRED_NUMERIC_FIELDS) { + if (typeof obj[field] !== 'number') { + throw new CodecError( + `Missing or invalid field: ${field} (expected number, got ${typeof obj[field]})`, + ); + } + } + + if (typeof obj['attributes'] !== 'object' || obj['attributes'] === null) { + throw new CodecError('Missing or invalid field: attributes (expected object)'); + } + + // Validate priority is exactly 0, 1, or 2 + const priority = obj['priority'] as number; + if (priority !== 0 && priority !== 1 && priority !== 2) { + throw new CodecError( + `Invalid field: priority (expected 0 | 1 | 2, got ${priority})`, + ); + } + + // Validate attributes values are only AttributeValue types + const attrs = obj['attributes'] as Record; + for (const [attrKey, attrVal] of Object.entries(attrs)) { + if ( + typeof attrVal !== 'number' && + typeof attrVal !== 'bigint' && + !Buffer.isBuffer(attrVal) + ) { + throw new CodecError( + `Invalid attribute "${attrKey}": expected number | bigint | Buffer, got ${typeof attrVal}`, + ); + } + } +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/** + * Decodes a JSON-encoded Position string (with sentinel encoding applied by + * tcp-ingestion's `serializePosition`) into a fully-typed `Position` object. + * + * Throws `CodecError` if the JSON is malformed, a sentinel is invalid, a + * required field is missing, or a field has the wrong type. + */ +export function decodePosition(payload: string): Position { + let parsed: unknown; + + try { + parsed = JSON.parse(payload, reviver); + } catch (err) { + if (err instanceof CodecError) { + throw err; + } + throw new CodecError( + `Failed to parse Position payload as JSON: ${err instanceof Error ? err.message : String(err)}`, + { cause: err }, + ); + } + + if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) { + throw new CodecError('Position payload must be a JSON object'); + } + + const obj = parsed as Record; + + validateDecodedPosition(obj); + + return obj as unknown as Position; +} diff --git a/src/shared/types.ts b/src/shared/types.ts index 5616817..47545ee 100644 --- a/src/shared/types.ts +++ b/src/shared/types.ts @@ -4,8 +4,11 @@ * Both modules need the `Metrics` interface for observability. Placing it here * avoids an import across the enforced src/core/ ↔ src/live/ boundary. * - * src/core/types.ts re-exports Metrics from here to preserve the existing - * import path for Phase 1 call sites. + * `Position` and `AttributeValue` are placed here so that src/live/broadcast.ts + * can reference them without importing across the src/core/ ↔ src/live/ boundary. + * + * src/core/types.ts re-exports all shared types to preserve existing import + * paths for Phase 1 call sites. */ // --------------------------------------------------------------------------- @@ -27,3 +30,35 @@ export type Metrics = { ) => void; readonly observe: (name: string, value: number, labels?: Record) => void; }; + +// --------------------------------------------------------------------------- +// Position — input contract from tcp-ingestion +// --------------------------------------------------------------------------- + +/** + * A single IO attribute value from the Teltonika AVL record. + * - number : fixed-width IO elements (N1/N2/N4 — fit safely in JS number) + * - bigint : N8 elements (u64, may exceed Number.MAX_SAFE_INTEGER) + * - Buffer : NX variable-length elements (Codec 8 Extended) + */ +export type AttributeValue = number | bigint | Buffer; + +/** + * Normalized GPS position record. Byte-equivalent to tcp-ingestion's `Position` + * type (docs/wiki/concepts/position-record.md). + * + * `priority` is typed as a union rather than `number` to stay consistent with + * tcp-ingestion and make exhaustive switches possible in domain logic. + */ +export type Position = { + readonly device_id: string; + readonly timestamp: Date; + readonly latitude: number; + readonly longitude: number; + readonly altitude: number; + readonly angle: number; // heading 0–360° + readonly speed: number; // km/h; 0 may mean "GPS invalid" — preserve verbatim + readonly satellites: number; + readonly priority: 0 | 1 | 2; // 0=Low, 1=High, 2=Panic + readonly attributes: Readonly>; +}; diff --git a/test/live-broadcast.test.ts b/test/live-broadcast.test.ts new file mode 100644 index 0000000..03a4d88 --- /dev/null +++ b/test/live-broadcast.test.ts @@ -0,0 +1,385 @@ +/** + * Unit tests for src/live/broadcast.ts — broadcast consumer fan-out logic. + * + * Strategy: exercise fanOut in isolation by driving a single-iteration loop. + * We stub XREADGROUP to return one batch of entries, then immediately set + * `stopping = true` via `stop()`. The Redis `xgroup` CREATE call returns + * BUSYGROUP (group already exists) so `ensureGroup` succeeds without a real + * server. + * + * `sendOutbound` is called with real LiveConnection stubs that have a mock + * `ws.send`. This tests the full fanOut → sendOutbound → ws.send path without + * any module mocking. + * + * Covers (spec: task 1.5.4): + * 1. Single subscriber on an event receives a correctly-shaped position message. + * 2. Multiple subscribers on the same event each receive the message. + * 3. Orphan device (not in any event) increments orphan counter, sends nothing. + * 4. Device registered to multiple events emits one message per event topic. + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Metrics } from '../src/shared/types.js'; +import type { SubscriptionRegistry } from '../src/live/registry.js'; +import type { DeviceEventMap } from '../src/live/device-event-map.js'; +import type { LiveConnection } from '../src/live/server.js'; +import { createBroadcastConsumer } from '../src/live/broadcast.js'; +import WebSocket from 'ws'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn().mockReturnThis(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +type RecordedMetrics = Metrics & { + incCalls: Array<{ name: string; labels?: Record; value?: number }>; + observeCalls: Array<{ name: string; value: number }>; +}; + +function makeMetrics(): RecordedMetrics { + const incCalls: Array<{ name: string; labels?: Record; value?: number }> = []; + const observeCalls: Array<{ name: string; value: number }> = []; + return { + incCalls, + observeCalls, + inc(name, labels?, value?) { incCalls.push({ name, labels, value }); }, + observe(name, value) { observeCalls.push({ name, value }); }, + }; +} + +function makeConfig(): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-instance', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://localhost:5432/test', + REDIS_TELEMETRY_STREAM: 'telemetry:teltonika', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + LIVE_WS_PORT: 8081, + LIVE_WS_HOST: '0.0.0.0', + LIVE_WS_PING_INTERVAL_MS: 30_000, + LIVE_WS_DRAIN_TIMEOUT_MS: 5_000, + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, + DIRECTUS_BASE_URL: 'http://directus.test', + DIRECTUS_AUTH_TIMEOUT_MS: 5_000, + DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, + LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast', + LIVE_BROADCAST_BATCH_SIZE: 100, + LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000, + LIVE_DEVICE_EVENT_REFRESH_MS: 30_000, + }; +} + +/** + * Builds a synthetic LiveConnection stub whose `ws.send` captures JSON-parsed + * outbound messages. `bufferedAmount` is 0 so sendOutbound never closes it. + */ +function makeConn(id = 'conn-1'): LiveConnection & { sentMessages: unknown[] } { + const sentMessages: unknown[] = []; + const ws = { + readyState: WebSocket.OPEN, + bufferedAmount: 0, + send: vi.fn((data: string) => { sentMessages.push(JSON.parse(data)); }), + close: vi.fn(), + } as unknown as WebSocket; + + return { + id, + ws, + remoteAddr: '127.0.0.1', + openedAt: new Date(), + lastSeenAt: new Date(), + user: { + id: 'user-1', + email: 'test@test.com', + role: null, + first_name: 'T', + last_name: 'U', + }, + cookieHeader: 'session=x', + sentMessages, + }; +} + +/** Serialises a Position into the flat wire payload that broadcast.ts expects. */ +function makePositionPayload(overrides: Partial<{ + device_id: string; + timestamp: string; + speed: number; + angle: number; +}> = {}): string { + return JSON.stringify({ + device_id: overrides.device_id ?? 'IMEI123', + timestamp: overrides.timestamp ?? new Date('2025-01-01T12:00:00.000Z').toISOString(), + latitude: 41.33165, + longitude: 19.83177, + altitude: 50, + angle: overrides.angle ?? 0, + speed: overrides.speed ?? 0, + satellites: 8, + priority: 0, + attributes: {}, + }); +} + +/** + * Builds a fake XREADGROUP result for a single stream entry. + * ioredis returns: `[[streamName, [[id, fieldValueArray]]]]` + */ +function makeXreadgroupResult( + stream: string, + id: string, + payload: string, +): [string, [string, string[]][]][] { + return [[stream, [[id, ['payload', payload]]]]]; +} + +/** + * Creates a Redis stub that: + * - `xgroup` returns BUSYGROUP error (group already exists — happy path). + * - `xreadgroup` returns the provided result on the first call, then blocks + * for up to 2s on subsequent calls (simulating real BLOCK behaviour). + * Blocking is implemented by waiting for `stopSignal` to resolve, capped + * at 2000ms so tests cannot hang indefinitely. + * - `xack` resolves immediately and triggers the stopSignal promise. + */ +function makeRedis( + firstXreadgroupResult: [string, [string, string[]][]][] | null, +): Redis & { stopSignal: Promise; triggerStop: () => void } { + let xreadgroupCallCount = 0; + let triggerStop!: () => void; + const stopSignal = new Promise((resolve) => { triggerStop = resolve; }); + + const redis: Redis & { stopSignal: Promise; triggerStop: () => void } = { + xgroup: vi.fn().mockRejectedValue(Object.assign(new Error('BUSYGROUP group already exists'), {})), + xreadgroup: vi.fn((..._args: unknown[]) => { + xreadgroupCallCount += 1; + if (xreadgroupCallCount === 1) { + return Promise.resolve(firstXreadgroupResult); + } + // Block until stop() is called (or 2s timeout as safety valve). + return Promise.race([ + stopSignal.then(() => null as null), + new Promise((resolve) => setTimeout(() => resolve(null), 2_000)), + ]); + }), + xack: vi.fn().mockImplementation(() => { + // Signal that the batch has been processed — stop() can now be called. + triggerStop(); + return Promise.resolve(1); + }), + status: 'ready', + stopSignal, + triggerStop, + } as unknown as Redis & { stopSignal: Promise; triggerStop: () => void }; + + return redis; +} + +/** Creates a SubscriptionRegistry stub that maps topic → connections. */ +function makeRegistry( + topicToConns: Map, +): SubscriptionRegistry { + return { + connectionsForTopic: vi.fn((topic: string) => topicToConns.get(topic) ?? []), + subscribe: vi.fn(), + unsubscribe: vi.fn(), + onConnectionClose: vi.fn(), + topicsForConnection: vi.fn().mockReturnValue([]), + stats: vi.fn().mockReturnValue({ connections: 0, subscriptions: 0, topics: 0 }), + }; +} + +/** Creates a DeviceEventMap stub. */ +function makeDeviceEventMap(deviceToEvents: Map): DeviceEventMap { + return { + lookup: vi.fn((deviceId: string) => deviceToEvents.get(deviceId) ?? []), + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn(), + }; +} + +/** + * Runs the broadcast consumer for one batch: starts it, waits until xack has + * been called (the batch was fully processed), then stops it. + * + * The Redis stub's xreadgroup blocks on the second call until xack fires + * (or 2s timeout), so `stop()` always finds the loop idle before terminating. + */ +async function runOneBatch( + redis: ReturnType, + registry: SubscriptionRegistry, + deviceEventMap: DeviceEventMap, + config: Config, + logger: Logger, + metrics: Metrics, +): Promise { + const consumer = createBroadcastConsumer(redis, registry, deviceEventMap, config, logger, metrics); + await consumer.start(); + + // Wait until the xack mock fires (which also triggers stopSignal, causing the + // second xreadgroup call to unblock and return null). Give up after 3s to + // avoid hanging if the batch was empty / all entries were skipped. + await Promise.race([ + redis.stopSignal, + new Promise((resolve) => setTimeout(resolve, 3_000)), + ]); + + await consumer.stop(); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('createBroadcastConsumer', () => { + let config: Config; + let logger: Logger; + let metrics: RecordedMetrics; + const STREAM = 'telemetry:teltonika'; + const EVENT_A = 'aaa00000-0000-0000-0000-000000000001'; + const EVENT_B = 'bbb00000-0000-0000-0000-000000000002'; + const DEVICE_ID = 'IMEI999888777'; + + beforeEach(() => { + config = makeConfig(); + logger = makeSilentLogger(); + metrics = makeMetrics(); + }); + + it('sends a correctly-shaped position message to a single subscriber', async () => { + const conn = makeConn('c1'); + const topicToConns = new Map([[`event:${EVENT_A}`, [conn]]]); + const deviceToEvents = new Map([[DEVICE_ID, [EVENT_A]]]); + + const payload = makePositionPayload({ device_id: DEVICE_ID, speed: 42, angle: 180 }); + const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload)); + const registry = makeRegistry(topicToConns); + const deviceEventMap = makeDeviceEventMap(deviceToEvents); + + await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics); + + expect(conn.sentMessages).toHaveLength(1); + const msg = conn.sentMessages[0] as Record; + expect(msg['type']).toBe('position'); + expect(msg['topic']).toBe(`event:${EVENT_A}`); + expect(msg['deviceId']).toBe(DEVICE_ID); + expect(typeof msg['lat']).toBe('number'); + expect(typeof msg['lon']).toBe('number'); + expect(typeof msg['ts']).toBe('number'); + // speed and course are included when non-zero + expect(msg['speed']).toBe(42); + expect(msg['course']).toBe(180); + }); + + it('sends to all subscribers on the same event', async () => { + const conn1 = makeConn('c1'); + const conn2 = makeConn('c2'); + const conn3 = makeConn('c3'); + const topicToConns = new Map([[`event:${EVENT_A}`, [conn1, conn2, conn3]]]); + const deviceToEvents = new Map([[DEVICE_ID, [EVENT_A]]]); + + const payload = makePositionPayload({ device_id: DEVICE_ID }); + const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload)); + const registry = makeRegistry(topicToConns); + const deviceEventMap = makeDeviceEventMap(deviceToEvents); + + await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics); + + expect(conn1.sentMessages).toHaveLength(1); + expect(conn2.sentMessages).toHaveLength(1); + expect(conn3.sentMessages).toHaveLength(1); + + // All received the same topic + for (const conn of [conn1, conn2, conn3]) { + expect((conn.sentMessages[0] as Record)['topic']).toBe(`event:${EVENT_A}`); + } + }); + + it('increments orphan counter and sends nothing for an unregistered device', async () => { + const conn = makeConn('c1'); + // Device has no events registered + const deviceToEvents = new Map(); + const topicToConns = new Map([[`event:${EVENT_A}`, [conn]]]); + + const payload = makePositionPayload({ device_id: DEVICE_ID }); + const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload)); + const registry = makeRegistry(topicToConns); + const deviceEventMap = makeDeviceEventMap(deviceToEvents); + + await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics); + + expect(conn.sentMessages).toHaveLength(0); + + const orphanInc = metrics.incCalls.find( + (c) => c.name === 'processor_live_broadcast_orphan_records_total', + ); + expect(orphanInc).toBeDefined(); + }); + + it('emits one message per topic for a device registered to multiple events', async () => { + // conn1 subscribes to EVENT_A only, conn2 to EVENT_B only, + // conn3 subscribes to both. The device is registered to both events. + const conn1 = makeConn('c1'); + const conn2 = makeConn('c2'); + const conn3a = makeConn('c3a'); // conn3's subscription to EVENT_A + const conn3b = makeConn('c3b'); // conn3's subscription to EVENT_B (separate entry) + + const topicToConns = new Map([ + [`event:${EVENT_A}`, [conn1, conn3a]], + [`event:${EVENT_B}`, [conn2, conn3b]], + ]); + const deviceToEvents = new Map([[DEVICE_ID, [EVENT_A, EVENT_B]]]); + + const payload = makePositionPayload({ device_id: DEVICE_ID }); + const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload)); + const registry = makeRegistry(topicToConns); + const deviceEventMap = makeDeviceEventMap(deviceToEvents); + + await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics); + + // conn1 is in EVENT_A only → 1 message with topic event:EVENT_A + expect(conn1.sentMessages).toHaveLength(1); + expect((conn1.sentMessages[0] as Record)['topic']).toBe(`event:${EVENT_A}`); + + // conn2 is in EVENT_B only → 1 message with topic event:EVENT_B + expect(conn2.sentMessages).toHaveLength(1); + expect((conn2.sentMessages[0] as Record)['topic']).toBe(`event:${EVENT_B}`); + + // conn3a is the EVENT_A entry for conn3 → 1 message + expect(conn3a.sentMessages).toHaveLength(1); + expect((conn3a.sentMessages[0] as Record)['topic']).toBe(`event:${EVENT_A}`); + + // conn3b is the EVENT_B entry for conn3 → 1 message + expect(conn3b.sentMessages).toHaveLength(1); + expect((conn3b.sentMessages[0] as Record)['topic']).toBe(`event:${EVENT_B}`); + + // Fanout counter: EVENT_A has 2 conns, EVENT_B has 2 conns → total 4 increments + const fanoutIncs = metrics.incCalls.filter( + (c) => c.name === 'processor_live_broadcast_fanout_messages_total', + ); + expect(fanoutIncs).toHaveLength(4); + }); +});