feat(live): task 1.5.4 — broadcast consumer group and fan-out

Adds the per-instance Redis Stream consumer group (live-broadcast-{instance_id})
that reads the telemetry stream and fans out each position to subscribed
WebSocket connections without affecting the durable-write consumer path.

Key changes:
- src/shared/codec.ts: moved decodePosition/CodecError out of src/core/ so
  src/live/broadcast.ts can decode positions without crossing the enforced
  src/core/ ↔ src/live/ boundary; src/core/codec.ts now re-exports from there
- src/shared/types.ts: added Position and AttributeValue (same move, same reason);
  src/core/types.ts re-exports both to preserve existing import paths
- src/live/broadcast.ts: createBroadcastConsumer factory — XREADGROUP loop,
  immediate ACK semantics, toPositionMessage mapper, fanOut per event/topic
- src/live/device-event-map.ts: createDeviceEventMap factory — in-memory cache
  of entry_devices × entries join, refreshed every LIVE_DEVICE_EVENT_REFRESH_MS
- src/db/migrations/0002_positions_faulty.sql: adds faulty boolean column and
  positions_device_ts_idx for snapshot-on-subscribe query (task 1.5.5)
- src/main.ts: wired authClient, authzClient, registry, liveServer,
  deviceEventMap, broadcastConsumer; shutdown chain: liveServer → deviceEventMap
  + broadcastConsumer → durable-write consumer → metricsServer → Redis → Postgres
- test/live-broadcast.test.ts: 4 unit tests covering single subscriber, multiple
  subscribers, orphan device, and multi-event device fan-out
This commit is contained in:
2026-05-02 17:52:33 +02:00
parent 90605614f6
commit fbb1f34e9a
9 changed files with 1130 additions and 258 deletions
+7 -221
View File
@@ -1,225 +1,11 @@
/** /**
* Sentinel decoder for Position records arriving from the Redis Stream. * Re-exports from src/shared/codec.ts.
* *
* tcp-ingestion serializes Position objects with a custom JSON replacer that * The decode logic lives in src/shared/codec.ts so that both src/core/ (durable
* encodes types not natively supported by JSON: * write consumer) and src/live/ (broadcast consumer) can import it without
* - bigint → { __bigint: "<decimal-digits>" } * crossing the enforced src/core/ ↔ src/live/ boundary.
* - Buffer → { __buffer_b64: "<base64>" }
* - Date → ISO8601 string
* *
* This module reverses that encoding so the Processor receives fully-typed * All existing Phase 1 import paths (`import { decodePosition } from './codec.js'`)
* Position objects. The contract is documented in: * continue to work unchanged.
* docs/wiki/concepts/position-record.md
* tcp-ingestion/src/core/publish.ts (jsonReplacer)
*/ */
export { decodePosition, CodecError } from '../shared/codec.js';
import type { Position, AttributeValue } from './types.js';
// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------
export class CodecError extends Error {
override readonly name = 'CodecError';
constructor(message: string, options?: ErrorOptions) {
super(message, options);
}
}
// ---------------------------------------------------------------------------
// Sentinel detection helpers
// ---------------------------------------------------------------------------
/**
* Returns true when the value is exactly `{ __bigint: "<string>" }`.
* The shape must have exactly one key — any extra keys indicate a user-defined
* object that coincidentally has a `__bigint` field, which is not a sentinel.
* In practice tcp-ingestion only emits single-key sentinels; validate strictly.
*/
function isBigintSentinel(value: unknown): value is { __bigint: string } {
if (typeof value !== 'object' || value === null) return false;
const keys = Object.keys(value);
return (
keys.length === 1 &&
keys[0] === '__bigint' &&
typeof (value as Record<string, unknown>)['__bigint'] === 'string'
);
}
/**
* Returns true when the value is exactly `{ __buffer_b64: "<string>" }`.
*/
function isBufferSentinel(value: unknown): value is { __buffer_b64: string } {
if (typeof value !== 'object' || value === null) return false;
const keys = Object.keys(value);
return (
keys.length === 1 &&
keys[0] === '__buffer_b64' &&
typeof (value as Record<string, unknown>)['__buffer_b64'] === 'string'
);
}
// ---------------------------------------------------------------------------
// Reviver
// ---------------------------------------------------------------------------
/**
* JSON.parse reviver that reconstructs the live types from sentinel encodings.
*
* Called by JSON.parse for every key-value pair in the document, bottom-up.
* By the time `attributes` is visited, each attribute value has already been
* converted (sentinels → bigint/Buffer), because JSON.parse visits leaves first.
*
* Reviver must return `unknown` because the result type depends on the key.
* The caller casts the final result to `PositionJson` after validation.
*/
function reviver(key: string, value: unknown): unknown {
// Timestamp field: ISO string → Date
if (key === 'timestamp' && typeof value === 'string') {
const date = new Date(value);
if (isNaN(date.getTime())) {
throw new CodecError(`Invalid timestamp value: "${value}"`);
}
return date;
}
// bigint sentinel
if (isBigintSentinel(value)) {
const digits = value.__bigint;
// Validate: only decimal digits (including optional leading minus for
// negative bigints, though Teltonika IO elements are unsigned).
if (!/^-?\d+$/.test(digits)) {
throw new CodecError(
`Malformed __bigint sentinel: expected decimal digits, got "${digits}"`,
);
}
return BigInt(digits);
}
// Buffer sentinel
if (isBufferSentinel(value)) {
const b64 = value.__buffer_b64;
// Validate base64 characters (standard + URL-safe alphabets, with padding)
if (!/^[A-Za-z0-9+/\-_]*={0,2}$/.test(b64)) {
throw new CodecError(
`Malformed __buffer_b64 sentinel: invalid base64 string "${b64}"`,
);
}
return Buffer.from(b64, 'base64');
}
return value;
}
// ---------------------------------------------------------------------------
// Required field validation
// ---------------------------------------------------------------------------
const REQUIRED_NUMERIC_FIELDS = [
'latitude',
'longitude',
'altitude',
'angle',
'speed',
'satellites',
'priority',
] as const;
/**
* Validates the decoded object has all required Position fields with the
* correct types. Throws `CodecError` naming the first failing field.
*/
function validateDecodedPosition(obj: Record<string, unknown>): asserts obj is {
device_id: string;
timestamp: Date;
latitude: number;
longitude: number;
altitude: number;
angle: number;
speed: number;
satellites: number;
priority: number;
attributes: Record<string, AttributeValue>;
} {
if (typeof obj['device_id'] !== 'string' || obj['device_id'].length === 0) {
throw new CodecError('Missing or invalid field: device_id (expected non-empty string)');
}
if (!(obj['timestamp'] instanceof Date)) {
throw new CodecError(
'Missing or invalid field: timestamp (expected Date after reviver; was ISO string decoded?)',
);
}
for (const field of REQUIRED_NUMERIC_FIELDS) {
if (typeof obj[field] !== 'number') {
throw new CodecError(
`Missing or invalid field: ${field} (expected number, got ${typeof obj[field]})`,
);
}
}
if (typeof obj['attributes'] !== 'object' || obj['attributes'] === null) {
throw new CodecError('Missing or invalid field: attributes (expected object)');
}
// Validate priority is exactly 0, 1, or 2
const priority = obj['priority'] as number;
if (priority !== 0 && priority !== 1 && priority !== 2) {
throw new CodecError(
`Invalid field: priority (expected 0 | 1 | 2, got ${priority})`,
);
}
// Validate attributes values are only AttributeValue types
const attrs = obj['attributes'] as Record<string, unknown>;
for (const [attrKey, attrVal] of Object.entries(attrs)) {
if (
typeof attrVal !== 'number' &&
typeof attrVal !== 'bigint' &&
!Buffer.isBuffer(attrVal)
) {
throw new CodecError(
`Invalid attribute "${attrKey}": expected number | bigint | Buffer, got ${typeof attrVal}`,
);
}
}
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/**
* Decodes a JSON-encoded Position string (with sentinel encoding applied by
* tcp-ingestion's `serializePosition`) into a fully-typed `Position` object.
*
* Throws `CodecError` if the JSON is malformed, a sentinel is invalid, a
* required field is missing, or a field has the wrong type.
*/
export function decodePosition(payload: string): Position {
let parsed: unknown;
try {
parsed = JSON.parse(payload, reviver);
} catch (err) {
if (err instanceof CodecError) {
throw err;
}
throw new CodecError(
`Failed to parse Position payload as JSON: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err },
);
}
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
throw new CodecError('Position payload must be a JSON object');
}
const obj = parsed as Record<string, unknown>;
validateDecodedPosition(obj);
return obj as unknown as Position;
}
+6 -32
View File
@@ -7,40 +7,14 @@
*/ */
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Shared value types // Shared value types — re-exported from src/shared/types.ts
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/** // Position and AttributeValue live in src/shared/types.ts so that src/live/
* A single IO attribute value from the Teltonika AVL record. // can import them without crossing the src/core/ ↔ src/live/ boundary.
* - number : fixed-width IO elements (N1/N2/N4 — fit safely in JS number) // Re-exported here to preserve all existing Phase 1 import paths.
* - bigint : N8 elements (u64, may exceed Number.MAX_SAFE_INTEGER) export type { AttributeValue, Position } from '../shared/types.js';
* - Buffer : NX variable-length elements (Codec 8 Extended) import type { Position } from '../shared/types.js';
*/
export type AttributeValue = number | bigint | Buffer;
// ---------------------------------------------------------------------------
// Position — input contract from tcp-ingestion
// ---------------------------------------------------------------------------
/**
* Normalized GPS position record. Byte-equivalent to tcp-ingestion's `Position`
* type (docs/wiki/concepts/position-record.md).
*
* `priority` is typed as a union rather than `number` to stay consistent with
* tcp-ingestion and make exhaustive switches possible in domain logic.
*/
export type Position = {
readonly device_id: string;
readonly timestamp: Date;
readonly latitude: number;
readonly longitude: number;
readonly altitude: number;
readonly angle: number; // heading 0–360°
readonly speed: number; // km/h; 0 may mean "GPS invalid" — preserve verbatim
readonly satellites: number;
readonly priority: 0 | 1 | 2; // 0=Low, 1=High, 2=Panic
readonly attributes: Readonly<Record<string, AttributeValue>>;
};
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// StreamRecord — raw shape returned by XREADGROUP before codec decoding // StreamRecord — raw shape returned by XREADGROUP before codec decoding
@@ -0,0 +1,19 @@
-- Migration: 0002_positions_faulty
-- Adds the faulty column to positions and ensures the (device_id, ts DESC) index
-- needed by the snapshot-on-subscribe query exists.
--
-- The faulty column is set post-hoc by operators in Directus when a position is
-- flagged as unrealistic. The snapshot-on-subscribe query (task 1.5.5) filters
-- WHERE faulty = false to exclude flagged positions from the initial map state.
-- The live broadcast path (Redis stream → fan-out) never touches this column
-- because faulty flags are applied after the fact.
-- IF NOT EXISTS keeps the statement idempotent, so re-running the migration
-- (or running it against a database that was patched by hand) is safe.
ALTER TABLE positions
ADD COLUMN IF NOT EXISTS faulty boolean NOT NULL DEFAULT false;
-- Index for the snapshot DISTINCT ON query:
-- SELECT DISTINCT ON (device_id) ... ORDER BY device_id, ts DESC
-- TimescaleDB scans only the latest chunks for devices with recent activity,
-- but the (device_id, ts DESC) index makes per-device latest-position lookups
-- efficient regardless of chunk age.
-- NOTE(review): plain CREATE INDEX blocks writes while building; acceptable at
-- pilot data volumes, but consider CREATE INDEX CONCURRENTLY before applying
-- to a large production hypertable — confirm.
CREATE INDEX IF NOT EXISTS positions_device_ts_idx ON positions (device_id, ts DESC);
+297
View File
@@ -0,0 +1,297 @@
/**
* Broadcast consumer group — per-instance Redis Stream reader for live fan-out.
*
* Reads the same `telemetry:teltonika` stream as the durable-write consumer
* (task 1.5) but on a SEPARATE per-instance consumer group:
* `live-broadcast-{instance_id}`
*
* This means every Processor instance sees every record for its own connected
* clients. The durable-write group splits records across instances for exactly-
* once Postgres writes; the broadcast group replicates records to every
* instance for fan-out. The two groups operate independently with separate
* offsets; a slow Postgres write does not slow down broadcast.
*
* ACK semantics: ACK immediately on consume (no durability required for
* broadcast — missing a position is fine; only the latest position matters).
*
* Back-pressure: sendOutbound closes slow connections at BACKPRESSURE_THRESHOLD
* (already handled in server.ts).
*
* Spec: processor-ws-contract.md §Multi-instance behaviour;
* task 1.5.4 §Broadcast consumer group
*/
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Metrics, Position } from '../shared/types.js';
import type { SubscriptionRegistry } from './registry.js';
import type { DeviceEventMap } from './device-event-map.js';
import type { PositionMessage } from './protocol.js';
import { sendOutbound } from './server.js';
import type { LiveConnection } from './server.js';
import { decodePosition, CodecError } from '../shared/codec.js';
// ---------------------------------------------------------------------------
// Public interface
// ---------------------------------------------------------------------------
/** Lifecycle handle returned by {@link createBroadcastConsumer}. */
export type BroadcastConsumer = {
  /** Ensures the consumer group exists (idempotent) and launches the read loop. */
  readonly start: () => Promise<void>;
  /** Signals the loop to exit and resolves once it has drained. */
  readonly stop: () => Promise<void>;
};
// ---------------------------------------------------------------------------
// Wire-format mapper
// ---------------------------------------------------------------------------
/**
 * Maps a decoded Position to a PositionMessage (minus `topic`).
 * Omits fields rather than sending `null` for absent / zero values.
 *
 * Field mapping:
 * - Position.device_id → deviceId (IMEI string; not a UUID in Phase 1)
 * - Position.latitude → lat
 * - Position.longitude → lon
 * - Position.timestamp → ts (epoch ms)
 * - Position.speed → speed (omitted if 0 — may indicate invalid GPS fix)
 * - Position.angle → course (omitted if 0)
 * - No accuracy field in Phase 1's Position type.
 *
 * Note: The WS contract spec says deviceId should be `devices.id` (UUID), but
 * Phase 1's positions table stores the raw IMEI as device_id. The SPA will
 * need to join on the IMEI until Phase 2 introduces UUID-based device tracking.
 * This is documented as an open deviation.
 *
 * @param position - fully decoded Position from the telemetry stream
 * @returns wire-format message without the `topic` field (added per fan-out)
 */
function toPositionMessage(
  position: Position,
): Omit<PositionMessage, 'topic'> {
  // Conditional spreads build the exact final shape in one expression instead
  // of mutating a typed object through `as Record<string, unknown>` casts,
  // which bypassed the type checker in the original.
  return {
    type: 'position',
    deviceId: position.device_id,
    lat: position.latitude,
    lon: position.longitude,
    ts: position.timestamp.getTime(),
    // Omit speed when 0 — per Teltonika convention, 0 may indicate invalid GPS.
    ...(position.speed > 0 ? { speed: position.speed } : {}),
    // Omit angle/course when 0.
    ...(position.angle > 0 ? { course: position.angle } : {}),
  };
}
// ---------------------------------------------------------------------------
// Raw stream entry shape (ioredis XREADGROUP return type)
// ---------------------------------------------------------------------------
type RawStreamEntry = [id: string, fields: string[]];
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
/**
 * Builds the per-instance broadcast consumer.
 *
 * Reads `config.REDIS_TELEMETRY_STREAM` through the consumer group
 * `{LIVE_BROADCAST_GROUP_PREFIX}-{INSTANCE_ID}`, decodes each entry's
 * `payload` field, and fans the position out to every connection subscribed
 * to an `event:{id}` topic that the device belongs to.
 *
 * @param redis         - ioredis client used for XGROUP / XREADGROUP / XACK
 * @param registry      - topic → connections lookup for fan-out
 * @param deviceToEvent - device_id → event IDs cache (periodically refreshed)
 * @param config        - validated environment configuration
 * @param logger        - structured logger
 * @param metrics       - counter / histogram sink
 */
export function createBroadcastConsumer(
  redis: Redis,
  registry: SubscriptionRegistry,
  deviceToEvent: DeviceEventMap,
  config: Config,
  logger: Logger,
  metrics: Metrics,
): BroadcastConsumer {
  const stream = config.REDIS_TELEMETRY_STREAM;
  const groupName = `${config.LIVE_BROADCAST_GROUP_PREFIX}-${config.INSTANCE_ID}`;
  const consumerName = config.INSTANCE_ID;
  const batchSize = config.LIVE_BROADCAST_BATCH_SIZE;
  const batchBlockMs = config.LIVE_BROADCAST_BATCH_BLOCK_MS;

  let stopping = false;
  let loopPromise: Promise<void> = Promise.resolve();

  // -------------------------------------------------------------------------
  // Consumer group setup
  // -------------------------------------------------------------------------

  /**
   * Creates the consumer group at the stream tail (`$` — only records arriving
   * after startup). Idempotent: BUSYGROUP from a previous run of this instance
   * is treated as success.
   */
  async function ensureGroup(): Promise<void> {
    try {
      await redis.xgroup('CREATE', stream, groupName, '$', 'MKSTREAM');
      logger.info({ stream, group: groupName }, 'broadcast consumer group created');
    } catch (err: unknown) {
      if (err instanceof Error && err.message.startsWith('BUSYGROUP')) {
        logger.info({ stream, group: groupName }, 'broadcast consumer group already exists');
        return;
      }
      throw err;
    }
  }

  // -------------------------------------------------------------------------
  // Fan-out
  // -------------------------------------------------------------------------

  /**
   * Sends one decoded position to every connection subscribed to any of the
   * device's events. Devices with no event mapping are counted as orphans and
   * dropped.
   */
  function fanOut(
    entryId: string,
    position: Position,
  ): void {
    const eventIds = deviceToEvent.lookup(position.device_id);
    if (eventIds.length === 0) {
      metrics.inc('processor_live_broadcast_orphan_records_total', {
        instance_id: config.INSTANCE_ID,
      });
      return;
    }
    const baseMsg = toPositionMessage(position);
    for (const eventId of eventIds) {
      const topic = `event:${eventId}`;
      const conns = registry.connectionsForTopic(topic);
      for (const conn of conns as Iterable<LiveConnection>) {
        // sendOutbound enforces the back-pressure threshold (closes slow
        // connections) — no additional handling needed here.
        sendOutbound(
          conn,
          { ...baseMsg, topic },
          metrics,
          config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
        );
        metrics.inc('processor_live_broadcast_fanout_messages_total', {
          instance_id: config.INSTANCE_ID,
        });
      }
    }
    // Broadcast lag: time from GPS fix to fan-out send. Negative values
    // (device clock ahead of ours) are dropped rather than skewing the
    // histogram.
    const lagMs = Date.now() - position.timestamp.getTime();
    if (lagMs >= 0) {
      metrics.observe('processor_live_broadcast_lag_ms', lagMs);
    }
    logger.debug({ entryId, device: position.device_id, events: eventIds.length }, 'fanned out');
  }

  // -------------------------------------------------------------------------
  // Batch decoder (mirrors core/consumer.ts decodeBatch pattern)
  // -------------------------------------------------------------------------

  /**
   * Decodes a batch of raw stream entries into Positions. Entries missing a
   * `payload` field or failing to decode are logged and dropped; the caller
   * ACKs ALL entry IDs regardless (see runLoop) — broadcast never retries, so
   * a bad record is simply superseded by the device's next fix.
   */
  function decodeBatch(entries: RawStreamEntry[]): Array<{
    id: string;
    position: Position;
  }> {
    const decoded: Array<{ id: string; position: Position }> = [];
    for (const [id, fields] of entries) {
      // XREADGROUP returns fields as a flat [key, value, key, value, …] list.
      const fieldMap: Record<string, string> = {};
      for (let i = 0; i + 1 < fields.length; i += 2) {
        const key = fields[i];
        const val = fields[i + 1];
        if (key !== undefined && val !== undefined) {
          fieldMap[key] = val;
        }
      }
      const payload = fieldMap['payload'];
      if (payload === undefined) {
        logger.warn({ id, stream }, 'broadcast entry missing payload; skipping');
        continue;
      }
      try {
        const position = decodePosition(payload);
        decoded.push({ id, position });
      } catch (err) {
        if (err instanceof CodecError) {
          logger.warn({ id, err }, 'broadcast decode error; skipping record');
        } else {
          logger.error({ id, err }, 'unexpected broadcast decode error');
        }
      }
    }
    return decoded;
  }

  // -------------------------------------------------------------------------
  // Read loop
  // -------------------------------------------------------------------------

  /** Blocking XREADGROUP loop; exits when `stopping` is set. */
  async function runLoop(): Promise<void> {
    logger.info({ stream, group: groupName, consumer: consumerName }, 'broadcast consumer started');
    while (!stopping) {
      let rawResult: [string, [string, string[]][]][] | null;
      try {
        rawResult = (await redis.xreadgroup(
          'GROUP',
          groupName,
          consumerName,
          'COUNT',
          String(batchSize),
          'BLOCK',
          String(batchBlockMs),
          'STREAMS',
          stream,
          '>',
        )) as [string, [string, string[]][]][] | null;
      } catch (err) {
        if (stopping) break;
        logger.error({ err }, 'broadcast XREADGROUP failed; backing off 1s');
        await new Promise<void>((resolve) => setTimeout(resolve, 1_000));
        continue;
      }
      if (rawResult === null) continue; // BLOCK timeout — re-check stopping flag
      const streamEntries = rawResult[0]?.[1] ?? [];
      if (streamEntries.length === 0) continue;
      metrics.inc('processor_live_broadcast_records_total', {
        instance_id: config.INSTANCE_ID,
      }, streamEntries.length);
      const decoded = decodeBatch(streamEntries);
      // ACK every entry immediately, including ones that failed to decode:
      // broadcast has no durability requirement, and entries left in the PEL
      // would never be re-claimed — they would only accumulate.
      const allIds = streamEntries.map(([id]) => id);
      if (allIds.length > 0) {
        try {
          await redis.xack(stream, groupName, ...allIds);
        } catch (err) {
          // An ACK failure is non-fatal for broadcast: fan-out can still
          // proceed, and the un-ACKed IDs merely linger in this instance's
          // PEL. Crashing the loop here (and thus the process) would be
          // inconsistent with the XREADGROUP backoff path above.
          logger.warn({ err, count: allIds.length }, 'broadcast XACK failed; continuing');
        }
      }
      // Fan out decoded records to subscribed clients.
      for (const { id, position } of decoded) {
        fanOut(id, position);
      }
    }
    logger.info({ stream, group: groupName }, 'broadcast consumer loop exited');
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /** Ensures the group exists, then launches the read loop in the background. */
  async function start(): Promise<void> {
    await ensureGroup();
    loopPromise = runLoop();
    // The loop handles read/decode/ack failures inline, so a rejection here is
    // truly unexpected — fail fast rather than running blind.
    loopPromise.catch((err: unknown) => {
      logger.fatal({ err }, 'broadcast consumer loop crashed; exiting');
      process.exit(1);
    });
  }

  /** Signals the loop to exit and waits (≤ batchBlockMs) for it to drain. */
  async function stop(): Promise<void> {
    stopping = true;
    await loopPromise;
  }

  return { start, stop };
}
+118
View File
@@ -0,0 +1,118 @@
/**
* In-memory cache of device → event mappings.
*
* The fan-out loop needs to answer "which events does this device belong to?"
* for every position record. The naive answer — query Postgres on each record —
* is wrong at any meaningful throughput. This module caches the full
 * `entry_devices × entries` join in memory and refreshes it on a configurable
* cadence (default: every 30 s).
*
* Staleness window: up to LIVE_DEVICE_EVENT_REFRESH_MS. This is acceptable for
* pilot — operators register devices before the event starts, and "the device
* appeared on the map after 30 s" is a tolerable UX gap. Phase 3+ can add
* invalidation signals if needed.
*
* Spec: processor-ws-contract.md §Multi-instance behaviour;
* task 1.5.4 §DeviceEventMap design
*/
import type pg from 'pg';
import type { Logger } from 'pino';
import type { Metrics } from '../shared/types.js';
import type { Config } from '../config/load.js';
// ---------------------------------------------------------------------------
// Public interface
// ---------------------------------------------------------------------------
export type DeviceEventMap = {
/** Returns the event IDs the device is currently registered to. */
readonly lookup: (deviceId: string) => readonly string[];
/** Starts the refresh timer. Immediately runs the first refresh. */
readonly start: () => Promise<void>;
/** Cancels the refresh timer. */
readonly stop: () => void;
};
// ---------------------------------------------------------------------------
// Query result type
// ---------------------------------------------------------------------------
/** Row shape returned by the entry_devices ⋈ entries query in refresh(). */
type DeviceEventRow = {
  device_id: string;
  event_id: string;
};
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
/**
 * Builds the device → event cache.
 *
 * `lookup` answers from an in-memory snapshot that `refresh` replaces
 * wholesale on a fixed cadence; a failed refresh keeps the previous snapshot
 * rather than emptying the map.
 *
 * @param pool    - Postgres pool used for the periodic join query
 * @param config  - supplies LIVE_DEVICE_EVENT_REFRESH_MS
 * @param logger  - structured logger
 * @param metrics - latency / size observation sink
 */
export function createDeviceEventMap(
  pool: pg.Pool,
  config: Config,
  logger: Logger,
  metrics: Metrics,
): DeviceEventMap {
  // Current snapshot; swapped atomically by refresh(), never mutated in place.
  let snapshot = new Map<string, Set<string>>();
  // Periodic refresh handle; null before start() and after stop().
  let refreshTimer: ReturnType<typeof setInterval> | null = null;

  /** Re-queries the join and swaps in a freshly built map on success. */
  async function refresh(): Promise<void> {
    const startedAt = performance.now();
    try {
      const result = await pool.query<DeviceEventRow>(
        `SELECT ed.device_id, e.event_id
FROM entry_devices ed
JOIN entries e ON e.id = ed.entry_id`,
      );
      const rebuilt = new Map<string, Set<string>>();
      for (const { device_id, event_id } of result.rows) {
        const events = rebuilt.get(device_id) ?? new Set<string>();
        events.add(event_id);
        rebuilt.set(device_id, events);
      }
      snapshot = rebuilt;
      const elapsed = performance.now() - startedAt;
      metrics.observe('processor_live_device_event_refresh_latency_ms', elapsed);
      metrics.observe('processor_live_device_event_entries', rebuilt.size);
      logger.debug({ devices: rebuilt.size, elapsedMs: Math.round(elapsed) }, 'device-event map refreshed');
    } catch (err) {
      // Retain the stale snapshot — stale beats empty, which would silently
      // drop all fan-out until the next successful refresh.
      logger.warn({ err }, 'device-event map refresh failed; retaining stale cache');
    }
  }

  /** Runs one immediate refresh, then schedules the recurring one. */
  async function start(): Promise<void> {
    await refresh();
    refreshTimer = setInterval(() => {
      refresh().catch((err: unknown) => {
        logger.warn({ err }, 'device-event map refresh interval error');
      });
    }, config.LIVE_DEVICE_EVENT_REFRESH_MS);
    // Do not hold the event loop open during shutdown.
    refreshTimer.unref();
  }

  /** Cancels the recurring refresh; safe to call more than once. */
  function stop(): void {
    if (refreshTimer !== null) {
      clearInterval(refreshTimer);
      refreshTimer = null;
    }
  }

  /** Returns a fresh array of event IDs for the device (empty if unknown). */
  function lookup(deviceId: string): readonly string[] {
    const events = snapshot.get(deviceId);
    return events !== undefined && events.size > 0 ? Array.from(events) : [];
  }

  return { lookup, start, stop };
}
+32 -3
View File
@@ -22,6 +22,8 @@ import type { InboundMessage } from './live/protocol.js';
import { createAuthClient } from './live/auth.js'; import { createAuthClient } from './live/auth.js';
import { createAuthzClient } from './live/authz.js'; import { createAuthzClient } from './live/authz.js';
import { createSubscriptionRegistry } from './live/registry.js'; import { createSubscriptionRegistry } from './live/registry.js';
import { createBroadcastConsumer } from './live/broadcast.js';
import { createDeviceEventMap } from './live/device-event-map.js';
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Startup: validate config (fail fast on bad env), build logger // Startup: validate config (fail fast on bad env), build logger
@@ -158,19 +160,36 @@ async function main(): Promise<void> {
(conn) => registry.onConnectionClose(conn), (conn) => registry.onConnectionClose(conn),
authClient, authClient,
); );
// 10b. Build the device-event map (Postgres-backed, periodic refresh).
const deviceEventMap = createDeviceEventMap(pool, config, logger, metrics);
// 10c. Build the broadcast consumer (per-instance consumer group fan-out).
const broadcastConsumer = createBroadcastConsumer(
redis,
registry,
deviceEventMap,
config,
logger,
metrics,
);
await liveServer.start(); await liveServer.start();
await deviceEventMap.start();
await broadcastConsumer.start();
// 11. Build and start the durable-write consumer // 11. Build and start the durable-write consumer
const consumer = createConsumer(redis, config, logger, metrics, sink); const consumer = createConsumer(redis, config, logger, metrics, sink);
await consumer.start(); await consumer.start();
// 12. Install graceful shutdown. // 12. Install graceful shutdown.
// Shutdown order: live server first (no new connections), then // Shutdown order: live server first (no new connections),
// broadcast consumer (task 1.5.4 adds this), then durable-write consumer. // then broadcast consumer, then durable-write consumer last.
installGracefulShutdown({ installGracefulShutdown({
redis, redis,
pool, pool,
consumer, consumer,
broadcastConsumer,
deviceEventMap,
liveServer, liveServer,
metricsServer, metricsServer,
pgHealth, pgHealth,
@@ -198,6 +217,8 @@ type ShutdownDeps = {
readonly redis: Redis; readonly redis: Redis;
readonly pool: pg.Pool; readonly pool: pg.Pool;
readonly consumer: { stop: () => Promise<void> }; readonly consumer: { stop: () => Promise<void> };
readonly broadcastConsumer: { stop: () => Promise<void> };
readonly deviceEventMap: { stop: () => void };
readonly liveServer: LiveServer; readonly liveServer: LiveServer;
readonly metricsServer: http.Server; readonly metricsServer: http.Server;
readonly pgHealth: { stop: () => void }; readonly pgHealth: { stop: () => void };
@@ -206,7 +227,10 @@ type ShutdownDeps = {
}; };
function installGracefulShutdown(deps: ShutdownDeps): void { function installGracefulShutdown(deps: ShutdownDeps): void {
const { redis, pool, consumer, liveServer, metricsServer, pgHealth, lagSampler, logger: log } = deps; const {
redis, pool, consumer, broadcastConsumer, deviceEventMap,
liveServer, metricsServer, pgHealth, lagSampler, logger: log,
} = deps;
let shuttingDown = false; let shuttingDown = false;
@@ -232,6 +256,11 @@ function installGracefulShutdown(deps: ShutdownDeps): void {
.stop() .stop()
.then(() => { .then(() => {
log.info('live server stopped'); log.info('live server stopped');
deviceEventMap.stop();
return broadcastConsumer.stop();
})
.then(() => {
log.info('broadcast consumer stopped');
return consumer.stop(); return consumer.stop();
}) })
.then(() => { .then(() => {
+229
View File
@@ -0,0 +1,229 @@
/**
* Sentinel decoder for Position records arriving from the Redis Stream.
*
* Moved from src/core/codec.ts to src/shared/ so that both src/core/ and
* src/live/ can import it without crossing the enforced boundary between those
* two layers.
*
* tcp-ingestion serializes Position objects with a custom JSON replacer that
* encodes types not natively supported by JSON:
* - bigint → { __bigint: "<decimal-digits>" }
* - Buffer → { __buffer_b64: "<base64>" }
* - Date → ISO8601 string
*
* This module reverses that encoding so the Processor receives fully-typed
* Position objects. The contract is documented in:
* docs/wiki/concepts/position-record.md
* tcp-ingestion/src/core/publish.ts (jsonReplacer)
*/
import type { Position, AttributeValue } from './types.js';
// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------
/**
 * Error thrown for any malformed Position payload: invalid JSON, a bad
 * sentinel, an invalid timestamp, a missing required field, or a field of the
 * wrong type. `name` is overridden so logs show `CodecError` rather than
 * `Error`.
 */
export class CodecError extends Error {
  override readonly name = 'CodecError';
  constructor(message: string, options?: ErrorOptions) {
    super(message, options);
  }
}
// ---------------------------------------------------------------------------
// Sentinel detection helpers
// ---------------------------------------------------------------------------
/**
* Returns true when the value is exactly `{ __bigint: "<string>" }`.
* The shape must have exactly one key — any extra keys indicate a user-defined
* object that coincidentally has a `__bigint` field, which is not a sentinel.
* In practice tcp-ingestion only emits single-key sentinels; validate strictly.
*/
/**
 * Returns true when the value is exactly `{ __bigint: "<string>" }`.
 * Strict single-key check: an object with extra keys is a user value that
 * merely happens to contain a `__bigint` field, not a sentinel.
 */
function isBigintSentinel(value: unknown): value is { __bigint: string } {
  if (value === null || typeof value !== 'object') return false;
  const keys = Object.keys(value);
  if (keys.length !== 1 || keys[0] !== '__bigint') return false;
  return typeof (value as Record<string, unknown>)['__bigint'] === 'string';
}
/**
* Returns true when the value is exactly `{ __buffer_b64: "<string>" }`.
*/
/**
 * Returns true when the value is exactly `{ __buffer_b64: "<string>" }`.
 * Same strict single-key rule as {@link isBigintSentinel}.
 */
function isBufferSentinel(value: unknown): value is { __buffer_b64: string } {
  if (value === null || typeof value !== 'object') return false;
  const keys = Object.keys(value);
  if (keys.length !== 1 || keys[0] !== '__buffer_b64') return false;
  return typeof (value as Record<string, unknown>)['__buffer_b64'] === 'string';
}
// ---------------------------------------------------------------------------
// Reviver
// ---------------------------------------------------------------------------

// Validation patterns, hoisted and named for readability.
const BIGINT_DIGITS_RE = /^-?\d+$/;
// Accepts both the standard and URL-safe base64 alphabets with up to two
// padding characters. NOTE(review): this does not enforce padding position or
// a length that is a multiple of 4 (e.g. "A==" passes); Buffer.from tolerates
// such input, so decoding still succeeds — confirm stricter validation is not
// required.
const BASE64_RE = /^[A-Za-z0-9+/\-_]*={0,2}$/;

/**
 * JSON.parse reviver that reconstructs the live types from sentinel encodings.
 *
 * Called by JSON.parse for every key-value pair in the document, bottom-up.
 * By the time `attributes` is visited, each attribute value has already been
 * converted (sentinels → bigint/Buffer), because JSON.parse visits leaves first.
 *
 * Reviver must return `unknown` because the result type depends on the key.
 * The caller casts the final result to `PositionJson` after validation.
 *
 * @throws CodecError on an unparseable timestamp or a malformed sentinel.
 */
function reviver(key: string, value: unknown): unknown {
  // Timestamp field: ISO string → Date
  if (key === 'timestamp' && typeof value === 'string') {
    const date = new Date(value);
    // Number.isNaN over global isNaN: no implicit numeric coercion.
    if (Number.isNaN(date.getTime())) {
      throw new CodecError(`Invalid timestamp value: "${value}"`);
    }
    return date;
  }

  // bigint sentinel
  if (isBigintSentinel(value)) {
    const digits = value.__bigint;
    // Validate: only decimal digits (including optional leading minus for
    // negative bigints, though Teltonika IO elements are unsigned).
    if (!BIGINT_DIGITS_RE.test(digits)) {
      throw new CodecError(
        `Malformed __bigint sentinel: expected decimal digits, got "${digits}"`,
      );
    }
    return BigInt(digits);
  }

  // Buffer sentinel
  if (isBufferSentinel(value)) {
    const b64 = value.__buffer_b64;
    if (!BASE64_RE.test(b64)) {
      throw new CodecError(
        `Malformed __buffer_b64 sentinel: invalid base64 string "${b64}"`,
      );
    }
    return Buffer.from(b64, 'base64');
  }

  return value;
}
// ---------------------------------------------------------------------------
// Required field validation
// ---------------------------------------------------------------------------

const REQUIRED_NUMERIC_FIELDS = [
  'latitude',
  'longitude',
  'altitude',
  'angle',
  'speed',
  'satellites',
  'priority',
] as const;

/**
 * Asserts that the decoded object carries every required Position field with
 * the expected runtime type. Throws `CodecError` naming the first field that
 * fails validation.
 */
function validateDecodedPosition(obj: Record<string, unknown>): asserts obj is {
  device_id: string;
  timestamp: Date;
  latitude: number;
  longitude: number;
  altitude: number;
  angle: number;
  speed: number;
  satellites: number;
  priority: number;
  attributes: Record<string, AttributeValue>;
} {
  // device_id: non-empty string is mandatory.
  const deviceId = obj['device_id'];
  if (typeof deviceId !== 'string' || deviceId === '') {
    throw new CodecError('Missing or invalid field: device_id (expected non-empty string)');
  }

  // timestamp must already be a Date — the reviver converts the ISO string.
  if (!(obj['timestamp'] instanceof Date)) {
    throw new CodecError(
      'Missing or invalid field: timestamp (expected Date after reviver; was ISO string decoded?)',
    );
  }

  // All scalar numeric fields must be JS numbers.
  for (const field of REQUIRED_NUMERIC_FIELDS) {
    if (typeof obj[field] === 'number') continue;
    throw new CodecError(
      `Missing or invalid field: ${field} (expected number, got ${typeof obj[field]})`,
    );
  }

  const attributes = obj['attributes'];
  if (attributes === null || typeof attributes !== 'object') {
    throw new CodecError('Missing or invalid field: attributes (expected object)');
  }

  // priority must be exactly one of the three defined levels.
  const priority = obj['priority'] as number;
  if (priority !== 0 && priority !== 1 && priority !== 2) {
    throw new CodecError(
      `Invalid field: priority (expected 0 | 1 | 2, got ${priority})`,
    );
  }

  // Every attribute value must be one of the AttributeValue variants.
  for (const [attrKey, attrVal] of Object.entries(attributes as Record<string, unknown>)) {
    const valid =
      typeof attrVal === 'number' ||
      typeof attrVal === 'bigint' ||
      Buffer.isBuffer(attrVal);
    if (!valid) {
      throw new CodecError(
        `Invalid attribute "${attrKey}": expected number | bigint | Buffer, got ${typeof attrVal}`,
      );
    }
  }
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/**
 * Decodes a JSON-encoded Position string (with sentinel encoding applied by
 * tcp-ingestion's `serializePosition`) into a fully-typed `Position` object.
 *
 * @throws CodecError when the JSON is malformed, a sentinel is invalid, a
 *         required field is missing, or a field has the wrong type.
 */
export function decodePosition(payload: string): Position {
  let decoded: unknown;
  try {
    decoded = JSON.parse(payload, reviver);
  } catch (err) {
    // The reviver throws CodecError itself; pass those through untouched.
    if (err instanceof CodecError) {
      throw err;
    }
    const detail = err instanceof Error ? err.message : String(err);
    throw new CodecError(`Failed to parse Position payload as JSON: ${detail}`, { cause: err });
  }

  // Only a plain JSON object can be a Position; arrays and primitives are not.
  if (typeof decoded !== 'object' || decoded === null || Array.isArray(decoded)) {
    throw new CodecError('Position payload must be a JSON object');
  }

  const record = decoded as Record<string, unknown>;
  validateDecodedPosition(record);
  return record as unknown as Position;
}
+37 -2
View File
@@ -4,8 +4,11 @@
 * Both modules need the `Metrics` interface for observability. Placing it here
 * avoids an import across the enforced src/core/ ↔ src/live/ boundary.
 *
 * `Position` and `AttributeValue` are placed here so that src/live/broadcast.ts
 * can reference them without importing across the src/core/ ↔ src/live/ boundary.
 *
 * src/core/types.ts re-exports all shared types to preserve existing import
 * paths for Phase 1 call sites.
 */

// ---------------------------------------------------------------------------
@@ -27,3 +30,35 @@ export type Metrics = {
  ) => void;
  readonly observe: (name: string, value: number, labels?: Record<string, string>) => void;
};
// ---------------------------------------------------------------------------
// Position — input contract from tcp-ingestion
// ---------------------------------------------------------------------------

/**
 * A single IO attribute value from the Teltonika AVL record.
 * - number : fixed-width IO elements (N1/N2/N4 — fit safely in JS number)
 * - bigint : N8 elements (u64, may exceed Number.MAX_SAFE_INTEGER)
 * - Buffer : NX variable-length elements (Codec 8 Extended)
 */
export type AttributeValue = number | bigint | Buffer;

/**
 * Normalized GPS position record. Byte-equivalent to tcp-ingestion's `Position`
 * type (docs/wiki/concepts/position-record.md).
 *
 * `priority` is typed as a union rather than `number` to stay consistent with
 * tcp-ingestion and make exhaustive switches possible in domain logic.
 *
 * `timestamp` is a Date: the codec's reviver converts the ISO-8601 string on
 * the wire before validation.
 */
export type Position = {
  readonly device_id: string;
  readonly timestamp: Date;
  readonly latitude: number;
  readonly longitude: number;
  readonly altitude: number;
  readonly angle: number; // heading 0–360°
  readonly speed: number; // km/h; 0 may mean "GPS invalid" — preserve verbatim
  readonly satellites: number;
  readonly priority: 0 | 1 | 2; // 0=Low, 1=High, 2=Panic
  readonly attributes: Readonly<Record<string, AttributeValue>>;
};
+385
View File
@@ -0,0 +1,385 @@
/**
* Unit tests for src/live/broadcast.ts — broadcast consumer fan-out logic.
*
* Strategy: exercise fanOut in isolation by driving a single-iteration loop.
* We stub XREADGROUP to return one batch of entries, then immediately set
* `stopping = true` via `stop()`. The Redis `xgroup` CREATE call returns
* BUSYGROUP (group already exists) so `ensureGroup` succeeds without a real
* server.
*
* `sendOutbound` is called with real LiveConnection stubs that have a mock
* `ws.send`. This tests the full fanOut → sendOutbound → ws.send path without
* any module mocking.
*
* Covers (spec: task 1.5.4):
* 1. Single subscriber on an event receives a correctly-shaped position message.
* 2. Multiple subscribers on the same event each receive the message.
* 3. Orphan device (not in any event) increments orphan counter, sends nothing.
* 4. Device registered to multiple events emits one message per event topic.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import WebSocket from 'ws';
import type { Config } from '../src/config/load.js';
import type { Metrics } from '../src/shared/types.js';
import type { SubscriptionRegistry } from '../src/live/registry.js';
import type { DeviceEventMap } from '../src/live/device-event-map.js';
import type { LiveConnection } from '../src/live/server.js';
import { createBroadcastConsumer } from '../src/live/broadcast.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Builds a no-op pino-compatible logger; `child()` returns the same stub. */
function makeSilentLogger(): Logger {
  const stub = {
    level: 'silent',
    trace: vi.fn(),
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    fatal: vi.fn(),
    silent: vi.fn(),
    child: vi.fn().mockReturnThis(),
  };
  return stub as unknown as Logger;
}
/** Metrics stub that records every inc/observe call for later assertions. */
type RecordedMetrics = Metrics & {
  incCalls: Array<{ name: string; labels?: Record<string, string>; value?: number }>;
  observeCalls: Array<{ name: string; value: number }>;
};

function makeMetrics(): RecordedMetrics {
  const recorded: RecordedMetrics = {
    incCalls: [],
    observeCalls: [],
    inc(name, labels?, value?) {
      recorded.incCalls.push({ name, labels, value });
    },
    observe(name, value) {
      recorded.observeCalls.push({ name, value });
    },
  };
  return recorded;
}
/** Full Config fixture with deterministic test values for every field. */
function makeConfig(): Config {
  return {
    // Process identity / logging
    NODE_ENV: 'test',
    INSTANCE_ID: 'test-instance',
    LOG_LEVEL: 'silent',
    // Backing stores
    REDIS_URL: 'redis://localhost:6379',
    POSTGRES_URL: 'postgres://localhost:5432/test',
    // Durable-write consumer
    REDIS_TELEMETRY_STREAM: 'telemetry:teltonika',
    REDIS_CONSUMER_GROUP: 'processor',
    REDIS_CONSUMER_NAME: 'test-consumer',
    METRICS_PORT: 0,
    BATCH_SIZE: 100,
    BATCH_BLOCK_MS: 500,
    WRITE_BATCH_SIZE: 50,
    DEVICE_STATE_LRU_CAP: 10_000,
    // Live WebSocket server
    LIVE_WS_PORT: 8081,
    LIVE_WS_HOST: '0.0.0.0',
    LIVE_WS_PING_INTERVAL_MS: 30_000,
    LIVE_WS_DRAIN_TIMEOUT_MS: 5_000,
    LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576,
    // Directus auth/authz
    DIRECTUS_BASE_URL: 'http://directus.test',
    DIRECTUS_AUTH_TIMEOUT_MS: 5_000,
    DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000,
    // Broadcast consumer
    LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast',
    LIVE_BROADCAST_BATCH_SIZE: 100,
    LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000,
    LIVE_DEVICE_EVENT_REFRESH_MS: 30_000,
  };
}
/**
 * Builds a synthetic LiveConnection stub whose `ws.send` captures JSON-parsed
 * outbound messages. `bufferedAmount` is 0 so sendOutbound never closes it.
 */
function makeConn(id = 'conn-1'): LiveConnection & { sentMessages: unknown[] } {
  const sentMessages: unknown[] = [];

  // Fake socket: OPEN, zero backpressure, and a send() that records payloads.
  const fakeSocket = {
    readyState: WebSocket.OPEN,
    bufferedAmount: 0,
    send: vi.fn((data: string) => {
      sentMessages.push(JSON.parse(data));
    }),
    close: vi.fn(),
  } as unknown as WebSocket;

  return {
    id,
    ws: fakeSocket,
    remoteAddr: '127.0.0.1',
    openedAt: new Date(),
    lastSeenAt: new Date(),
    user: {
      id: 'user-1',
      email: 'test@test.com',
      role: null,
      first_name: 'T',
      last_name: 'U',
    },
    cookieHeader: 'session=x',
    sentMessages,
  };
}
/** Serialises a Position into the flat wire payload that broadcast.ts expects. */
function makePositionPayload(overrides: Partial<{
  device_id: string;
  timestamp: string;
  speed: number;
  angle: number;
}> = {}): string {
  // Destructuring defaults fill in any field the caller did not override.
  const {
    device_id = 'IMEI123',
    timestamp = new Date('2025-01-01T12:00:00.000Z').toISOString(),
    angle = 0,
    speed = 0,
  } = overrides;
  return JSON.stringify({
    device_id,
    timestamp,
    latitude: 41.33165,
    longitude: 19.83177,
    altitude: 50,
    angle,
    speed,
    satellites: 8,
    priority: 0,
    attributes: {},
  });
}
/**
 * Builds a fake XREADGROUP result for a single stream entry.
 * ioredis returns: `[[streamName, [[id, fieldValueArray]]]]`
 */
function makeXreadgroupResult(
  stream: string,
  id: string,
  payload: string,
): [string, [string, string[]][]][] {
  const entry: [string, string[]] = [id, ['payload', payload]];
  return [[stream, [entry]]];
}
/**
* Creates a Redis stub that:
* - `xgroup` returns BUSYGROUP error (group already exists — happy path).
* - `xreadgroup` returns the provided result on the first call, then blocks
* for up to 2s on subsequent calls (simulating real BLOCK behaviour).
* Blocking is implemented by waiting for `stopSignal` to resolve, capped
* at 2000ms so tests cannot hang indefinitely.
* - `xack` resolves immediately and triggers the stopSignal promise.
*/
function makeRedis(
  firstXreadgroupResult: [string, [string, string[]][]][] | null,
): Redis & { stopSignal: Promise<void>; triggerStop: () => void } {
  // Tracks how many times the consumer has polled the stream.
  let xreadgroupCallCount = 0;
  // Definite-assignment: the Promise executor below runs synchronously and
  // always sets triggerStop before it is read.
  let triggerStop!: () => void;
  const stopSignal = new Promise<void>((resolve) => { triggerStop = resolve; });
  const redis: Redis & { stopSignal: Promise<void>; triggerStop: () => void } = {
    // BUSYGROUP rejection = "consumer group already exists", which the
    // consumer's ensureGroup treats as success.
    xgroup: vi.fn().mockRejectedValue(Object.assign(new Error('BUSYGROUP group already exists'), {})),
    xreadgroup: vi.fn((..._args: unknown[]) => {
      xreadgroupCallCount += 1;
      // First poll: hand the test batch to the consumer.
      if (xreadgroupCallCount === 1) {
        return Promise.resolve(firstXreadgroupResult);
      }
      // Block until stop() is called (or 2s timeout as safety valve).
      return Promise.race([
        stopSignal.then(() => null as null),
        new Promise<null>((resolve) => setTimeout(() => resolve(null), 2_000)),
      ]);
    }),
    xack: vi.fn().mockImplementation(() => {
      // Signal that the batch has been processed — stop() can now be called.
      // This also unblocks the second xreadgroup call above.
      triggerStop();
      return Promise.resolve(1);
    }),
    status: 'ready',
    stopSignal,
    triggerStop,
  } as unknown as Redis & { stopSignal: Promise<void>; triggerStop: () => void };
  return redis;
}
/** Creates a SubscriptionRegistry stub that maps topic → connections. */
function makeRegistry(
  topicToConns: Map<string, LiveConnection[]>,
): SubscriptionRegistry {
  const lookupByTopic = (topic: string) => topicToConns.get(topic) ?? [];
  return {
    subscribe: vi.fn(),
    unsubscribe: vi.fn(),
    onConnectionClose: vi.fn(),
    connectionsForTopic: vi.fn(lookupByTopic),
    topicsForConnection: vi.fn().mockReturnValue([]),
    stats: vi.fn().mockReturnValue({ connections: 0, subscriptions: 0, topics: 0 }),
  };
}
/** Creates a DeviceEventMap stub. */
function makeDeviceEventMap(deviceToEvents: Map<string, string[]>): DeviceEventMap {
  const lookupEvents = (deviceId: string) => deviceToEvents.get(deviceId) ?? [];
  return {
    start: vi.fn().mockResolvedValue(undefined),
    stop: vi.fn(),
    lookup: vi.fn(lookupEvents),
  };
}
/**
 * Runs the broadcast consumer for one batch: starts it, waits until xack has
 * been called (the batch was fully processed), then stops it.
 *
 * The Redis stub's xreadgroup blocks on the second call until xack fires
 * (or 2s timeout), so `stop()` always finds the loop idle before terminating.
 */
async function runOneBatch(
  redis: ReturnType<typeof makeRedis>,
  registry: SubscriptionRegistry,
  deviceEventMap: DeviceEventMap,
  config: Config,
  logger: Logger,
  metrics: Metrics,
): Promise<void> {
  const consumer = createBroadcastConsumer(redis, registry, deviceEventMap, config, logger, metrics);
  await consumer.start();

  // Wait for the stub's xack to fire (it resolves stopSignal, which also
  // unblocks the second xreadgroup call). The 3s timeout is a safety valve
  // for batches that were empty or entirely skipped.
  const safetyValve = new Promise<void>((resolve) => setTimeout(resolve, 3_000));
  await Promise.race([redis.stopSignal, safetyValve]);

  await consumer.stop();
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('createBroadcastConsumer', () => {
  let config: Config;
  let logger: Logger;
  let metrics: RecordedMetrics;

  // Fixed IDs shared by all cases; the stream name matches makeConfig().
  const STREAM = 'telemetry:teltonika';
  const EVENT_A = 'aaa00000-0000-0000-0000-000000000001';
  const EVENT_B = 'bbb00000-0000-0000-0000-000000000002';
  const DEVICE_ID = 'IMEI999888777';

  beforeEach(() => {
    // Fresh fixtures per test so recorded metrics never leak between cases.
    config = makeConfig();
    logger = makeSilentLogger();
    metrics = makeMetrics();
  });

  it('sends a correctly-shaped position message to a single subscriber', async () => {
    const conn = makeConn('c1');
    const topicToConns = new Map([[`event:${EVENT_A}`, [conn]]]);
    const deviceToEvents = new Map([[DEVICE_ID, [EVENT_A]]]);
    const payload = makePositionPayload({ device_id: DEVICE_ID, speed: 42, angle: 180 });
    const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload));
    const registry = makeRegistry(topicToConns);
    const deviceEventMap = makeDeviceEventMap(deviceToEvents);
    await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics);
    expect(conn.sentMessages).toHaveLength(1);
    const msg = conn.sentMessages[0] as Record<string, unknown>;
    expect(msg['type']).toBe('position');
    expect(msg['topic']).toBe(`event:${EVENT_A}`);
    expect(msg['deviceId']).toBe(DEVICE_ID);
    expect(typeof msg['lat']).toBe('number');
    expect(typeof msg['lon']).toBe('number');
    expect(typeof msg['ts']).toBe('number');
    // speed and course are included when non-zero
    expect(msg['speed']).toBe(42);
    expect(msg['course']).toBe(180);
  });

  it('sends to all subscribers on the same event', async () => {
    const conn1 = makeConn('c1');
    const conn2 = makeConn('c2');
    const conn3 = makeConn('c3');
    const topicToConns = new Map([[`event:${EVENT_A}`, [conn1, conn2, conn3]]]);
    const deviceToEvents = new Map([[DEVICE_ID, [EVENT_A]]]);
    const payload = makePositionPayload({ device_id: DEVICE_ID });
    const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload));
    const registry = makeRegistry(topicToConns);
    const deviceEventMap = makeDeviceEventMap(deviceToEvents);
    await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics);
    expect(conn1.sentMessages).toHaveLength(1);
    expect(conn2.sentMessages).toHaveLength(1);
    expect(conn3.sentMessages).toHaveLength(1);
    // All received the same topic
    for (const conn of [conn1, conn2, conn3]) {
      expect((conn.sentMessages[0] as Record<string, unknown>)['topic']).toBe(`event:${EVENT_A}`);
    }
  });

  it('increments orphan counter and sends nothing for an unregistered device', async () => {
    const conn = makeConn('c1');
    // Device has no events registered
    const deviceToEvents = new Map<string, string[]>();
    const topicToConns = new Map([[`event:${EVENT_A}`, [conn]]]);
    const payload = makePositionPayload({ device_id: DEVICE_ID });
    const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload));
    const registry = makeRegistry(topicToConns);
    const deviceEventMap = makeDeviceEventMap(deviceToEvents);
    await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics);
    // The subscriber exists but the device maps to no event → no fan-out.
    expect(conn.sentMessages).toHaveLength(0);
    const orphanInc = metrics.incCalls.find(
      (c) => c.name === 'processor_live_broadcast_orphan_records_total',
    );
    expect(orphanInc).toBeDefined();
  });

  it('emits one message per topic for a device registered to multiple events', async () => {
    // conn1 subscribes to EVENT_A only, conn2 to EVENT_B only,
    // conn3 subscribes to both. The device is registered to both events.
    const conn1 = makeConn('c1');
    const conn2 = makeConn('c2');
    const conn3a = makeConn('c3a'); // conn3's subscription to EVENT_A
    const conn3b = makeConn('c3b'); // conn3's subscription to EVENT_B (separate entry)
    const topicToConns = new Map([
      [`event:${EVENT_A}`, [conn1, conn3a]],
      [`event:${EVENT_B}`, [conn2, conn3b]],
    ]);
    const deviceToEvents = new Map([[DEVICE_ID, [EVENT_A, EVENT_B]]]);
    const payload = makePositionPayload({ device_id: DEVICE_ID });
    const redis = makeRedis(makeXreadgroupResult(STREAM, '1-0', payload));
    const registry = makeRegistry(topicToConns);
    const deviceEventMap = makeDeviceEventMap(deviceToEvents);
    await runOneBatch(redis, registry, deviceEventMap, config, logger, metrics);
    // conn1 is in EVENT_A only → 1 message with topic event:EVENT_A
    expect(conn1.sentMessages).toHaveLength(1);
    expect((conn1.sentMessages[0] as Record<string, unknown>)['topic']).toBe(`event:${EVENT_A}`);
    // conn2 is in EVENT_B only → 1 message with topic event:EVENT_B
    expect(conn2.sentMessages).toHaveLength(1);
    expect((conn2.sentMessages[0] as Record<string, unknown>)['topic']).toBe(`event:${EVENT_B}`);
    // conn3a is the EVENT_A entry for conn3 → 1 message
    expect(conn3a.sentMessages).toHaveLength(1);
    expect((conn3a.sentMessages[0] as Record<string, unknown>)['topic']).toBe(`event:${EVENT_A}`);
    // conn3b is the EVENT_B entry for conn3 → 1 message
    expect(conn3b.sentMessages).toHaveLength(1);
    expect((conn3b.sentMessages[0] as Record<string, unknown>)['topic']).toBe(`event:${EVENT_B}`);
    // Fanout counter: EVENT_A has 2 conns, EVENT_B has 2 conns → total 4 increments
    const fanoutIncs = metrics.incCalls.filter(
      (c) => c.name === 'processor_live_broadcast_fanout_messages_total',
    );
    expect(fanoutIncs).toHaveLength(4);
  });
});
});