julian fa50df3e27 docs(planning): mark Phase 1.5 live broadcast as Done
Tasks 1.5.4, 1.5.5, 1.5.6 marked 🟩 with commit hashes and implementation
notes. Phase 1.5 status updated to Done in ROADMAP.md.
2026-05-02 18:39:22 +02:00

# Task 1.5.4 — Broadcast consumer group & fan-out
**Phase:** 1.5 — Live broadcast
**Status:** 🟩 Done
**Depends on:** 1.5.3
**Wiki refs:** `docs/wiki/synthesis/processor-ws-contract.md` §Streaming updates, §Multi-instance behaviour; `docs/wiki/concepts/live-channel-architecture.md` §Multi-instance Processor
## Goal
Read the same `telemetry:teltonika` Redis stream as Phase 1's durable-write consumer, but on a **per-instance** consumer group `live-broadcast-{instance_id}`, and fan out each position record to the connections subscribed to its event. The durable-write path is unaffected; this is an additional read with a different group name and a different sink.
After this task, a position published to Redis arrives on the SPA's WebSocket within ~50ms (in-process Redis read + per-event index lookup + JSON serialise + WS send).
## Deliverables
- `src/live/broadcast.ts` exporting:
  - `createBroadcastConsumer(redis, registry, deviceToEvent, config, logger, metrics): BroadcastConsumer` — factory.
  - `BroadcastConsumer` interface: `start(): Promise<void>`, `stop(): Promise<void>`. Same lifecycle shape as Phase 1's `Consumer`.
  - The fan-out loop: read a batch via `XREADGROUP`; for each record, decode → look up the device's event → fetch subscribers → emit one `position` message per subscriber.
- `src/live/device-event-map.ts` exporting:
  - `createDeviceEventMap(pool, config, logger): DeviceEventMap` — factory.
  - `DeviceEventMap.lookup(deviceId: string): string[]` — returns the event IDs the device is registered to *right now*. Cached in memory; refreshed on a cadence (default every 30s) and on demand (via a Redis Stream invalidation signal — the same pattern as Phase 1's `recompute:requests`, but for entry-device assignments). For pilot, the cadence-based refresh is enough; manual invalidation can land later.
  - The query: `SELECT entry_devices.device_id, entries.event_id FROM entry_devices JOIN entries ON entries.id = entry_devices.entry_id`.
- `src/main.ts` updated to wire and start the broadcast consumer alongside the existing throughput consumer; on SIGTERM, shutdown proceeds in order: live server first, broadcast consumer second, durable-write consumer last.
- New config keys (zod):
  - `LIVE_BROADCAST_GROUP_PREFIX` (default `live-broadcast`) — the full group name is `${prefix}-${INSTANCE_ID}`.
  - `LIVE_BROADCAST_BATCH_SIZE` (default `100`).
  - `LIVE_BROADCAST_BATCH_BLOCK_MS` (default `1000`).
  - `LIVE_DEVICE_EVENT_REFRESH_MS` (default `30_000`).
- New Prometheus metrics:
  - `processor_live_broadcast_records_total{instance_id}` (counter).
  - `processor_live_broadcast_fanout_messages_total{instance_id}` (counter) — incremented per outbound `position` frame sent.
  - `processor_live_broadcast_orphan_records_total{instance_id}` (counter) — records for devices not registered to any event.
  - `processor_live_broadcast_lag_ms` (histogram) — time from `XADD` (the record's `ts` field) to fan-out send.
- `test/live-broadcast.test.ts`:
  - With a fake stream entry for a device registered to `event:E1` and one subscriber to `event:E1`, fan-out sends one `position` message to that subscriber.
  - Multiple subscribers on the same event each receive the message.
  - A device registered to no event increments `orphan_records_total` and emits no message.
  - Devices registered to multiple events emit one message per subscribing connection per event (i.e. a connection subscribed to both events for the same device receives two messages — they're per-topic).
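The new config keys might be declared roughly as follows — a sketch only; the schema name and how it merges into the existing Phase 1 config object are assumptions, while the key names and defaults come from this task:

```ts
import { z } from 'zod';

// Sketch: these keys would extend the existing Phase 1 config schema.
export const liveBroadcastConfigSchema = z.object({
  LIVE_BROADCAST_GROUP_PREFIX: z.string().default('live-broadcast'),
  LIVE_BROADCAST_BATCH_SIZE: z.coerce.number().int().positive().default(100),
  LIVE_BROADCAST_BATCH_BLOCK_MS: z.coerce.number().int().positive().default(1000),
  LIVE_DEVICE_EVENT_REFRESH_MS: z.coerce.number().int().positive().default(30_000),
});
```

`z.coerce.number()` keeps the env-var-as-string ergonomics without a manual `parseInt`.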
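With prom-client, the new metrics could be registered roughly like this (declaration-only fragment; the metric names are from this task, while the `help` strings and histogram buckets are assumptions — buckets sketched around the sub-500ms p95 target):

```ts
import { Counter, Histogram } from 'prom-client';

const broadcastRecords = new Counter({
  name: 'processor_live_broadcast_records_total',
  help: 'Stream records consumed by the broadcast group',
  labelNames: ['instance_id'],
});
const broadcastFanout = new Counter({
  name: 'processor_live_broadcast_fanout_messages_total',
  help: 'Outbound position frames sent',
  labelNames: ['instance_id'],
});
const broadcastOrphans = new Counter({
  name: 'processor_live_broadcast_orphan_records_total',
  help: 'Records for devices not registered to any event',
  labelNames: ['instance_id'],
});
const broadcastLag = new Histogram({
  name: 'processor_live_broadcast_lag_ms',
  help: 'Time from XADD (record ts) to fan-out send, in milliseconds',
  buckets: [10, 25, 50, 100, 250, 500, 1000, 2500], // assumption: tuned to the lag targets below
});
```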
## Specification
### Why a separate consumer group
Phase 1's durable-write consumer is on group `processor` (configurable; the default is set in `tcp-ingestion` and matched by Processor). Two instances share that group, and Redis splits records across them — exactly one instance handles each write.
Live broadcast needs the opposite: **every instance must see every record**, because each instance has its own connected clients. The clean way to do that with Redis Streams is one group per instance. Group name `live-broadcast-{instance_id}` ensures uniqueness; each instance's `XREADGROUP` gets the full firehose for that group.
The two reads are independent — the durable-write group's offset and the live-broadcast group's offset are separate. A slow durable write doesn't slow down broadcast and vice versa.
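Because the group is per-instance, it has to be created at startup. A sketch of that wiring — the `broadcastGroupName` helper mirrors the `${prefix}-${INSTANCE_ID}` rule above, but the choice to create the group at `$` (only new records) and the BUSYGROUP handling are standard Redis Streams practice, not taken from this task; the minimal client interface is hypothetical (ioredis satisfies it):

```ts
// Builds the per-instance group name described above: `${prefix}-${INSTANCE_ID}`.
export function broadcastGroupName(prefix: string, instanceId: string): string {
  return `${prefix}-${instanceId}`;
}

// Minimal client shape for the sketch (hypothetical).
interface StreamGroupClient {
  xgroup(...args: (string | number)[]): Promise<unknown>;
}

// Create the group at '$' — live broadcast only cares about new records.
// MKSTREAM tolerates the stream not existing yet; a BUSYGROUP error means a
// previous run of this instance already created the group, which is fine.
export async function ensureBroadcastGroup(
  redis: StreamGroupClient,
  stream: string,
  group: string,
): Promise<void> {
  try {
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
  } catch (err) {
    if (!(err instanceof Error && err.message.includes('BUSYGROUP'))) throw err;
  }
}
```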
### Fan-out shape
```ts
async function runLoop() {
  while (!stopping) {
    let entries: StreamEntry[];
    try {
      entries = await redis.xreadgroup(
        'GROUP', groupName, consumerName,
        'COUNT', config.LIVE_BROADCAST_BATCH_SIZE,
        'BLOCK', config.LIVE_BROADCAST_BATCH_BLOCK_MS,
        'STREAMS', config.REDIS_TELEMETRY_STREAM, '>',
      );
    } catch (err) {
      logger.error({ err }, 'broadcast XREADGROUP failed; backing off');
      await sleep(1000);
      continue;
    }
    if (!entries) continue;
    for (const entry of decodeBatch(entries)) {
      metrics.broadcastRecords.inc();
      await fanOut(entry);
      // ACK immediately — broadcast doesn't need durability semantics.
      await redis.xack(config.REDIS_TELEMETRY_STREAM, groupName, entry.id);
    }
  }
}

async function fanOut(record: ConsumedRecord) {
  const eventIds = deviceToEvent.lookup(record.position.deviceId);
  if (eventIds.length === 0) {
    metrics.broadcastOrphans.inc();
    return;
  }
  const message = toPositionMessage(record.position); // shape per processor-ws-contract
  for (const eventId of eventIds) {
    const topic = `event:${eventId}`;
    const conns = registry.connectionsForTopic(topic);
    for (const conn of conns) {
      sendOutbound(conn, { ...message, topic });
      metrics.broadcastFanout.inc();
    }
  }
  metrics.broadcastLag.observe(Date.now() - record.position.ts.getTime());
}
```
### Why ACK immediately
Phase 1's durable-write consumer ACKs only after Postgres confirms the write — that's the `XACK` discipline that protects against data loss. The broadcast consumer has different durability semantics: **a missed broadcast is acceptable.** If a position fails to fan out (because the connection crashed mid-send, say), the next position is what matters. Don't keep a pending entry just to retry an obsolete record.
ACK-on-consume keeps the broadcast group's PEL empty, prevents pending-entry buildup, and avoids the "send the same position twice on retry" anti-feature. Phase 3 hardening can revisit if we ever need broadcast guarantees.
### `DeviceEventMap` design
The fan-out path needs to answer "which events does this device belong to?" thousands of times per second. The naive answer — query Postgres on each record — is wrong. Two options:
**Option A: In-process cache with periodic refresh.** Load the full `entry_devices`/`entries` join at startup; refresh every 30s. Stale-data window: up to 30s. **Pick this for pilot.**
**Option B: Listen for changes.** Add an `entry-devices:changed` Redis Stream (or use Directus Flows to publish on writes); the broadcast consumer invalidates affected entries. Stale-data window: ~50ms. Adds protocol surface and a coordination point.
For pilot: Option A. The 30s staleness window is acceptable — operators register devices before the event starts, and "you registered a new device 30s ago and it's not on the map yet" is a tolerable UX. Phase 3+ can promote to Option B if real-time registration matters.
```ts
class DeviceEventMap {
  private map = new Map<string, Set<string>>(); // deviceId → Set<eventId>
  private timer: NodeJS.Timeout | null = null;

  async start() {
    await this.refresh();
    this.timer = setInterval(() => {
      this.refresh().catch(err => logger.warn({ err }, 'device-event map refresh failed'));
    }, config.LIVE_DEVICE_EVENT_REFRESH_MS);
  }

  async stop() {
    if (this.timer) clearInterval(this.timer);
  }

  async refresh() {
    const start = performance.now();
    const result = await pool.query<{ device_id: string; event_id: string }>(
      `SELECT ed.device_id, e.event_id
         FROM entry_devices ed
         JOIN entries e ON e.id = ed.entry_id`
    );
    const next = new Map<string, Set<string>>();
    for (const row of result.rows) {
      if (!next.has(row.device_id)) next.set(row.device_id, new Set());
      next.get(row.device_id)!.add(row.event_id);
    }
    this.map = next;
    metrics.deviceEventRefreshLatency.observe(performance.now() - start);
    metrics.deviceEventEntries.set(next.size);
    logger.debug({ devices: next.size }, 'device-event map refreshed');
  }

  lookup(deviceId: string): string[] {
    return Array.from(this.map.get(deviceId) ?? []);
  }
}
```
### Back-pressure
If a connection's send queue is backing up (slow client, slow network), the WS library queues messages in process memory. At 100 msg/s, a 10s stall queues 1000 messages; at ~200 bytes each that's ~200KB per slow connection. Tolerable.
If we ever see real back-pressure problems: per-connection bounded queue (e.g. last 100 positions per device, dropping older), with a metric `processor_live_broadcast_dropped_total`. Document but defer.
For now: rely on `ws.bufferedAmount` to detect slow consumers; if it exceeds a threshold (say 1MB), close the connection with code 1008 (policy violation) and log. Client reconnects. Worth implementing as a defensive measure even for pilot — prevents one slow client from eating all the memory.
```ts
function sendOutbound(conn: LiveConnection, msg: OutboundMessage) {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  if (conn.ws.bufferedAmount > config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES) {
    logger.warn({ connId: conn.id, buffered: conn.ws.bufferedAmount }, 'closing slow connection');
    conn.ws.close(1008, 'back-pressure threshold exceeded');
    return;
  }
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}
```
(Update 1.5.1's `sendOutbound` to include this check; add `LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES` config with default `1_048_576`.)
### `toPositionMessage`
```ts
function toPositionMessage(p: Position): Omit<PositionMessage, 'topic'> {
  const msg: any = {
    type: 'position',
    deviceId: p.deviceId,
    lat: p.latitude,
    lon: p.longitude,
    ts: p.ts.getTime(), // epoch ms; contract is number, not ISO string
  };
  if (p.speed != null) msg.speed = p.speed;
  if (p.course != null) msg.course = p.course;
  if (p.accuracy != null) msg.accuracy = p.accuracy;
  if (p.attributes && Object.keys(p.attributes).length > 0) msg.attributes = p.attributes;
  return msg;
}
```
Per the contract: omit fields rather than send `null` for absent values.
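To make the omission rule concrete, a self-contained check — the types here are pared-down stand-ins for the real `Position`/`PositionMessage`, just enough to exercise the mapping:

```ts
// Minimal stand-in for the real Position type (illustrative only).
interface Position {
  deviceId: string;
  latitude: number;
  longitude: number;
  ts: Date;
  speed?: number;
  course?: number;
  accuracy?: number;
  attributes?: Record<string, unknown>;
}

function toPositionMessage(p: Position): Record<string, unknown> {
  const msg: Record<string, unknown> = {
    type: 'position',
    deviceId: p.deviceId,
    lat: p.latitude,
    lon: p.longitude,
    ts: p.ts.getTime(),
  };
  if (p.speed != null) msg.speed = p.speed;
  if (p.course != null) msg.course = p.course;
  if (p.accuracy != null) msg.accuracy = p.accuracy;
  if (p.attributes && Object.keys(p.attributes).length > 0) msg.attributes = p.attributes;
  return msg;
}

// A fix with no speed/course/accuracy serialises without those keys at all:
const frame = toPositionMessage({
  deviceId: 'dev-1',
  latitude: 52.1,
  longitude: 4.3,
  ts: new Date(1_700_000_000_000),
});
// JSON.stringify(frame) → {"type":"position","deviceId":"dev-1","lat":52.1,"lon":4.3,"ts":1700000000000}
```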
## Acceptance criteria
- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
- [ ] `pnpm dev` boots; logs show both consumer groups joining (`processor` and `live-broadcast-{instance_id}`).
- [ ] With a subscribed `wscat` client and a synthetic position published to `telemetry:teltonika`, the client receives a `{"type":"position",...}` frame within ~100ms.
- [ ] A second `wscat` client subscribed to the same event also receives the message.
- [ ] An orphan position (device not in any `entry_devices` row) increments `processor_live_broadcast_orphan_records_total` and emits no WS message.
- [ ] After 30s, modifying `entry_devices` directly in Postgres and publishing a position routes correctly to the new event's subscribers.
- [ ] Broadcast lag p50 < 100ms, p95 < 500ms with a small subscriber count (≤20).
## Risks / open questions
- **30s staleness window** is acceptable for pilot but worth surfacing in operator docs. "If you just registered a device, wait 30s before expecting it on the map" is a reasonable line in the dogfood README.
- **Memory cost of `DeviceEventMap`.** For 500 devices × 10 events average, ~5000 entries. Trivial.
- **What about devices registered to *multiple* events at the same time?** Schema allows it (one device on multiple `entry_devices` rows). Fan-out handles it: each event's subscribers get the message. The SPA may want to filter by event on its end if it's showing a single event.
- **Memory leak from `topicConnections` if registry isn't cleaning up.** Defensive: log a warning if `registry.stats().topics` exceeds a sanity threshold (e.g. 1000) to surface a leak before it OOMs.
## Done
Landed in `c07ea0e`. Key implementation decisions:
- `CodecError`/`decodePosition` moved to `src/shared/codec.ts`; `Position`/`AttributeValue` moved to `src/shared/types.ts`. Both `src/core/` and `src/live/` re-export from shared to preserve existing import paths.
- `broadcast.ts` ACKs all stream entries immediately (durability not needed for fan-out).
- Test uses a `stopSignal` Promise to coordinate between the broadcast loop and the test's `stop()` call, avoiding the tight-loop OOM that naive polling triggers.