# Task 1.5.4 — Broadcast consumer group & fan-out

**Phase:** 1.5 — Live broadcast
**Status:** ⬜ Not started
**Depends on:** 1.5.3
**Wiki refs:** `docs/wiki/synthesis/processor-ws-contract.md` §Streaming updates, §Multi-instance behaviour; `docs/wiki/concepts/live-channel-architecture.md` §Multi-instance Processor

## Goal

Read the same `telemetry:teltonika` Redis stream as Phase 1's durable-write consumer, but on a **per-instance** consumer group `live-broadcast-{instance_id}`, and fan out each position record to the connections subscribed to its event. The durable-write path is unaffected; this is an additional read with a different group name and a different sink.

After this task, a position published to Redis arrives on the SPA's WebSocket within ~50ms (in-process Redis read + per-event index lookup + JSON serialise + WS send).

## Deliverables

- `src/live/broadcast.ts` exporting:
  - `createBroadcastConsumer(redis, registry, deviceToEvent, config, logger, metrics): BroadcastConsumer` — factory.
  - `BroadcastConsumer` interface: `start(): Promise<void>`, `stop(): Promise<void>`. Same lifecycle shape as Phase 1's `Consumer`.
  - The fan-out loop: read a batch via `XREADGROUP`; for each record: decode → look up the device's event → fetch subscribers → emit one `position` message per subscriber.
- `src/live/device-event-map.ts` exporting:
  - `createDeviceEventMap(pool, config, logger): DeviceEventMap` — factory.
  - `DeviceEventMap.lookup(deviceId: string): string[]` — returns the event IDs the device is registered to *right now*. Cached in memory; refreshed on a cadence (default every 30s) and on demand (via a Redis Stream invalidation signal — same pattern as Phase 1's `recompute:requests`, but for entry-device assignments). For pilot, the cadence-based refresh is enough; manual invalidation can land later.
  - The query: `SELECT entry_devices.device_id, entries.event_id FROM entry_devices JOIN entries ON entries.id = entry_devices.entry_id`.
- `src/main.ts` updated to wire and start the broadcast consumer alongside the existing durable-write consumer; SIGTERM stops everything in order (live server first, broadcast consumer second, durable-write consumer last). (See the shutdown sketch after this list.)
- New config keys (zod; see the schema sketch after this list):
  - `LIVE_BROADCAST_GROUP_PREFIX` (default `live-broadcast`) — full group name is `${prefix}-${INSTANCE_ID}`.
  - `LIVE_BROADCAST_BATCH_SIZE` (default `100`).
  - `LIVE_BROADCAST_BATCH_BLOCK_MS` (default `1000`).
  - `LIVE_DEVICE_EVENT_REFRESH_MS` (default `30_000`).
- New Prometheus metrics (see the registration sketch after this list):
  - `processor_live_broadcast_records_total{instance_id}` (counter).
  - `processor_live_broadcast_fanout_messages_total{instance_id}` (counter) — per outbound `position` frame sent.
  - `processor_live_broadcast_orphan_records_total{instance_id}` (counter) — records for devices not registered to any event.
  - `processor_live_broadcast_lag_ms` (histogram) — time from `XADD` (record's `ts` field) to fan-out send.
- `test/live-broadcast.test.ts`:
  - With a fake stream entry for a device registered to `event:E1` and one subscriber to `event:E1`, fan-out sends one `position` message to that subscriber.
  - Multiple subscribers on the same event each receive the message.
  - A device registered to no event increments `orphan_records_total` and emits no message.
  - Devices registered to multiple events emit one message per subscribing connection per event (i.e. a connection subscribed to both events for the same device receives two messages — they're per-topic).
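A minimal sketch of the `main.ts` shutdown ordering. The handle names (`liveServer`, `broadcastConsumer`, `durableConsumer`) are illustrative; only the ordering comes from the bullet above:

```ts
// Shutdown order per the Deliverables: live server first, broadcast
// consumer second, durable-write consumer last. Handle names are
// illustrative: whatever Phase 1's main.ts already calls them.
process.once('SIGTERM', async () => {
  logger.info('SIGTERM received; shutting down');
  await liveServer.stop();        // stop serving WS clients first
  await broadcastConsumer.stop(); // then stop the fan-out read loop
  await durableConsumer.stop();   // durable writes drain last
  process.exit(0);
});
```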
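A sketch of the new config keys, assuming Phase 1's config module parses `process.env` through a zod schema (the export name and surrounding schema shape are illustrative):

```ts
import { z } from 'zod';

// Additions to the existing env schema; defaults per the list above.
// z.coerce turns string env values into numbers before validating.
export const liveBroadcastConfig = z.object({
  LIVE_BROADCAST_GROUP_PREFIX: z.string().default('live-broadcast'),
  LIVE_BROADCAST_BATCH_SIZE: z.coerce.number().int().positive().default(100),
  LIVE_BROADCAST_BATCH_BLOCK_MS: z.coerce.number().int().positive().default(1000),
  LIVE_DEVICE_EVENT_REFRESH_MS: z.coerce.number().int().positive().default(30_000),
});
```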
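And the matching metric registration, assuming prom-client (an assumption; the task doesn't name the metrics library, and the bucket boundaries are a guess to tune against the ~50ms goal). The consumer would bind `instance_id` once at startup, e.g. `broadcastRecords.labels(config.INSTANCE_ID).inc()`:

```ts
import { Counter, Histogram } from 'prom-client';

export const broadcastRecords = new Counter({
  name: 'processor_live_broadcast_records_total',
  help: 'Stream records consumed by the live-broadcast group',
  labelNames: ['instance_id'],
});

export const broadcastFanout = new Counter({
  name: 'processor_live_broadcast_fanout_messages_total',
  help: 'Outbound position frames sent to subscribers',
  labelNames: ['instance_id'],
});

export const broadcastOrphans = new Counter({
  name: 'processor_live_broadcast_orphan_records_total',
  help: 'Records for devices not registered to any event',
  labelNames: ['instance_id'],
});

export const broadcastLag = new Histogram({
  name: 'processor_live_broadcast_lag_ms',
  help: 'Time from record ts (XADD) to fan-out send, in milliseconds',
  buckets: [10, 25, 50, 100, 250, 500, 1000, 2500], // guess; tune later
});
```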
## Specification

### Why a separate consumer group

Phase 1's durable-write consumer is on group `processor` (configurable; the default is set in `tcp-ingestion` and matched in Processor). Two instances share that group and Redis splits records across them — exactly one instance handles each write. Live broadcast needs the opposite: **every instance must see every record**, because each instance has its own connected clients. The clean way to do that with Redis Streams is one group per instance. Group name `live-broadcast-{instance_id}` ensures uniqueness; each instance's `XREADGROUP` gets the full firehose for that group.

The two reads are independent — the durable-write group's offset and the live-broadcast group's offset are separate. A slow durable write doesn't slow down broadcast, and vice versa.
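Group creation itself is a small idempotent step, presumably done in `start()`. A sketch assuming ioredis (consistent with the lowercase `redis.xreadgroup(...)` style in the loop below); starting the group at `$` (no backfill for live broadcast) is an assumption, not something the task specifies:

```ts
import type { Redis } from 'ioredis';

// Idempotent per-instance group setup. `$` means only records XADDed after
// the group exists; MKSTREAM covers boot order, letting the group be
// created before tcp-ingestion has written anything.
async function ensureBroadcastGroup(redis: Redis, stream: string, groupName: string): Promise<void> {
  try {
    await redis.xgroup('CREATE', stream, groupName, '$', 'MKSTREAM');
  } catch (err) {
    // BUSYGROUP means the group (and its offset) survived a restart; fine.
    if (!(err instanceof Error && err.message.includes('BUSYGROUP'))) throw err;
  }
}
```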
### Fan-out shape

```ts
async function runLoop() {
  while (!stopping) {
    let entries: StreamEntry[] | null;
    try {
      entries = await redis.xreadgroup(
        'GROUP', groupName, consumerName,
        'COUNT', config.LIVE_BROADCAST_BATCH_SIZE,
        'BLOCK', config.LIVE_BROADCAST_BATCH_BLOCK_MS,
        'STREAMS', config.REDIS_TELEMETRY_STREAM, '>',
      );
    } catch (err) {
      logger.error({ err }, 'broadcast XREADGROUP failed; backing off');
      await sleep(1000);
      continue;
    }
    if (!entries) continue;

    for (const record of decodeBatch(entries)) {
      metrics.broadcastRecords.inc();
      await fanOut(record);
      // ACK immediately — broadcast doesn't need durability semantics.
      await redis.xack(config.REDIS_TELEMETRY_STREAM, groupName, record.id);
    }
  }
}

async function fanOut(record: ConsumedRecord) {
  const eventIds = deviceToEvent.lookup(record.position.deviceId);
  if (eventIds.length === 0) {
    metrics.broadcastOrphans.inc();
    return;
  }

  const message = toPositionMessage(record.position); // shape per processor-ws-contract

  for (const eventId of eventIds) {
    const topic = `event:${eventId}`;
    const conns = registry.connectionsForTopic(topic);
    for (const conn of conns) {
      sendOutbound(conn, { ...message, topic });
      metrics.broadcastFanout.inc();
    }
  }

  metrics.broadcastLag.observe(Date.now() - record.position.ts.getTime());
}
```

### Why ACK immediately

Phase 1's durable-write consumer ACKs only after Postgres confirms the write — that's the `XACK` discipline that protects against data loss. The broadcast consumer has different durability semantics: **a missed broadcast is acceptable.** If a position fails to fan out (because the connection crashed mid-send, say), the next position is what matters. Don't keep a pending entry just to retry an obsolete record. ACK-on-consume keeps the broadcast group's PEL empty, prevents pending-entry buildup, and avoids the "send the same position twice on retry" anti-feature. Phase 3 hardening can revisit this if we ever need broadcast guarantees.

### `DeviceEventMap` design

The fan-out path needs to answer "which events does this device belong to?" thousands of times per second. The naive answer — query Postgres on each record — is wrong. Two options:

**Option A: In-process cache with periodic refresh.** Load the full `entry_devices` ⨯ `entries` join at startup; refresh every 30s. Stale data window: up to 30s. **Pick this for pilot.**

**Option B: Listen for changes.** Add an `entry-devices:changed` Redis Stream (or use Directus Flows to publish on writes); broadcast invalidates affected entries. Stale data window: ~50ms. Adds protocol surface and a coordination point.

For pilot: Option A. The 30s staleness window is acceptable — operators register devices before the event starts, and "you registered a new device 30s ago and it's not on the map yet" is a tolerable UX. Phase 3+ can promote to Option B if real-time registration matters.

```ts
class DeviceEventMap {
  private map = new Map<string, Set<string>>(); // deviceId → set of eventIds
  private timer: NodeJS.Timeout | null = null;

  async start() {
    await this.refresh();
    this.timer = setInterval(() => {
      this.refresh().catch(err => logger.warn({ err }, 'device-event map refresh failed'));
    }, config.LIVE_DEVICE_EVENT_REFRESH_MS);
  }

  async stop() {
    if (this.timer) clearInterval(this.timer);
  }

  async refresh() {
    const start = performance.now();
    const result = await pool.query<{ device_id: string; event_id: string }>(
      `SELECT ed.device_id, e.event_id
         FROM entry_devices ed
         JOIN entries e ON e.id = ed.entry_id`
    );
    const next = new Map<string, Set<string>>();
    for (const row of result.rows) {
      if (!next.has(row.device_id)) next.set(row.device_id, new Set());
      next.get(row.device_id)!.add(row.event_id);
    }
    this.map = next;
    metrics.deviceEventRefreshLatency.observe(performance.now() - start);
    metrics.deviceEventEntries.set(next.size);
    logger.debug({ devices: next.size }, 'device-event map refreshed');
  }

  lookup(deviceId: string): string[] {
    return Array.from(this.map.get(deviceId) ?? []);
  }
}
```

### Back-pressure

If a connection's send queue is backing up (slow client, slow network), the WS library queues messages in process memory. At 100 msg/s × 10s of slow consumer = 1000 queued messages × ~200 bytes each = ~200KB per slow connection. Tolerable.

If we ever see real back-pressure problems: per-connection bounded queue (e.g. last 100 positions per device, dropping older), with a metric `processor_live_broadcast_dropped_total`. Document but defer.

For now: rely on `ws.bufferedAmount` to detect slow consumers; if it exceeds a threshold (say 1MB), close the connection with code 1008 (policy violation) and log. The client reconnects. Worth implementing as a defensive measure even for pilot — it prevents one slow client from eating all the memory.

```ts
function sendOutbound(conn: LiveConnection, msg: OutboundMessage) {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  if (conn.ws.bufferedAmount > config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES) {
    logger.warn({ connId: conn.id, buffered: conn.ws.bufferedAmount }, 'closing slow connection');
    conn.ws.close(1008, 'back-pressure threshold exceeded');
    return;
  }
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}
```

(Update 1.5.1's `sendOutbound` to include this check; add `LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES` config with default `1_048_576`.)

### `toPositionMessage`

```ts
function toPositionMessage(p: Position): Omit<PositionMessage, 'topic'> {
  const msg: any = {
    type: 'position',
    deviceId: p.deviceId,
    lat: p.latitude,
    lon: p.longitude,
    ts: p.ts.getTime(), // epoch ms; contract is number, not ISO string
  };
  if (p.speed != null) msg.speed = p.speed;
  if (p.course != null) msg.course = p.course;
  if (p.accuracy != null) msg.accuracy = p.accuracy;
  if (p.attributes && Object.keys(p.attributes).length > 0) msg.attributes = p.attributes;
  return msg;
}
```

Per the contract: omit fields rather than send `null` for absent values.
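For concreteness, a hypothetical frame as a subscriber to `event:E1` would receive it. Every value below is invented; `topic` is the field `fanOut` adds on top of `toPositionMessage`'s output:

```ts
// One wire frame, as sendOutbound JSON.stringifies it. speed/course/
// accuracy/attributes appear only when the record carried them.
const exampleFrame = {
  type: 'position',
  topic: 'event:E1',
  deviceId: '864636069999999', // hypothetical IMEI
  lat: 59.437,
  lon: 24.7536,
  ts: 1718000000000, // epoch ms, as a number
  speed: 8.2,
};
```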
## Acceptance criteria

- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
- [ ] `pnpm dev` boots; logs show both consumer groups joining (`processor` and `live-broadcast-{instance_id}`).
- [ ] With a subscribed `wscat` client and a synthetic position published to `telemetry:teltonika`, the client receives a `{"type":"position",...}` frame within ~100ms.
- [ ] A second `wscat` client subscribed to the same event also receives the message.
- [ ] An orphan position (device not in any `entry_devices` row) increments `processor_live_broadcast_orphan_records_total` and emits no WS message.
- [ ] After modifying `entry_devices` directly in Postgres and waiting for the next refresh (≤30s), a published position routes to the new event's subscribers.
- [ ] Broadcast lag p50 < 100ms, p95 < 500ms with a small subscriber count (≤20).

## Risks / open questions

- **30s staleness window** is acceptable for pilot but worth surfacing in operator docs. "If you just registered a device, wait 30s before expecting it on the map" is a reasonable line in the dogfood README.
- **Memory cost of `DeviceEventMap`.** For 500 devices × 10 events on average, ~5000 entries. Trivial.
- **What about devices registered to *multiple* events at the same time?** The schema allows it (one device on multiple `entry_devices` rows). Fan-out handles it: each event's subscribers get the message. The SPA may want to filter by event on its end if it's showing a single event.
- **Memory leak from `topicConnections` if the registry isn't cleaning up.** Defensive: log a warning if `registry.stats().topics` exceeds a sanity threshold (e.g. 1000) to surface a leak before it OOMs.

## Done

(Filled in when the task lands.)