julian fa50df3e27 docs(planning): mark Phase 1.5 live broadcast as Done
Tasks 1.5.4, 1.5.5, 1.5.6 marked 🟩 with commit hashes and implementation
notes. Phase 1.5 status updated to Done in ROADMAP.md.
2026-05-02 18:39:22 +02:00


Task 1.5.4 — Broadcast consumer group & fan-out

Phase: 1.5 — Live broadcast Status: 🟩 Done Depends on: 1.5.3 Wiki refs: docs/wiki/synthesis/processor-ws-contract.md §Streaming updates, §Multi-instance behaviour; docs/wiki/concepts/live-channel-architecture.md §Multi-instance Processor

Goal

Read the same telemetry:teltonika Redis stream as Phase 1's durable-write consumer, but on a per-instance consumer group live-broadcast-{instance_id}, and fan out each position record to the connections subscribed to its event. The durable-write path is unaffected; this is an additional read with a different group name and a different sink.

After this task, a position published to Redis arrives on the SPA's WebSocket within ~50ms (in-process Redis read + per-event index lookup + JSON serialise + WS send).

Deliverables

  • src/live/broadcast.ts exporting:
    • createBroadcastConsumer(redis, registry, deviceToEvent, config, logger, metrics): BroadcastConsumer — factory.
    • BroadcastConsumer interface: start(): Promise<void>, stop(): Promise<void>. Same lifecycle shape as Phase 1's Consumer.
    • The fan-out loop: read batch via XREADGROUP, for each record decode → look up the device's event → fetch subscribers → emit one position message per subscriber.
  • src/live/device-event-map.ts exporting:
    • createDeviceEventMap(pool, config, logger): DeviceEventMap — factory.
    • DeviceEventMap.lookup(deviceId: string): string[] — returns the event IDs the device is registered to right now. Cached in memory; refreshed on a cadence (default every 30s) and on demand (via a Redis Stream invalidation signal — same pattern as Phase 1's recompute:requests, but for entry-device assignments). For pilot, the cadence-based refresh is enough; manual invalidation can land later.
    • The query: SELECT entry_devices.device_id, entries.event_id FROM entry_devices JOIN entries ON entries.id = entry_devices.entry_id.
  • src/main.ts updated to wire and start the broadcast consumer alongside the existing throughput consumer; SIGTERM stops both (live server first, broadcast consumer second, durable-write consumer last).
  • New config keys (zod):
    • LIVE_BROADCAST_GROUP_PREFIX (default live-broadcast) — full group name is ${prefix}-${INSTANCE_ID}.
    • LIVE_BROADCAST_BATCH_SIZE (default 100).
    • LIVE_BROADCAST_BATCH_BLOCK_MS (default 1000).
    • LIVE_DEVICE_EVENT_REFRESH_MS (default 30_000).
  • New Prometheus metrics:
    • processor_live_broadcast_records_total{instance_id} (counter).
    • processor_live_broadcast_fanout_messages_total{instance_id} (counter) — per outbound position frame sent.
    • processor_live_broadcast_orphan_records_total{instance_id} (counter) — records for devices not registered to any event.
    • processor_live_broadcast_lag_ms (histogram) — time from XADD (record's ts field) to fan-out send.
  • test/live-broadcast.test.ts:
    • With a fake stream entry for a device registered to event:E1 and one subscriber to event:E1, fan-out sends one position message to that subscriber.
    • Multiple subscribers on the same event each receive the message.
    • A device registered to no event increments orphan_records_total and emits no message.
    • Devices registered to multiple events emit one message per subscribing connection per event (i.e. a connection subscribed to both events for the same device receives two messages — they're per-topic).
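
The per-topic fan-out semantics the test cases above pin down can be sketched with plain maps, independent of the real registry and DeviceEventMap (the names `fanOutTargets`, `deviceEvents`, and `topicSubscribers` are illustrative, not from the codebase):

```typescript
// Sketch: per-topic fan-out target resolution, mirroring the test cases above.
// deviceEvents stands in for DeviceEventMap, topicSubscribers for the registry.
type ConnId = string;

function fanOutTargets(
  deviceId: string,
  deviceEvents: Map<string, Set<string>>,      // deviceId -> eventIds
  topicSubscribers: Map<string, Set<ConnId>>,  // "event:<id>" -> connIds
): Array<{ topic: string; conn: ConnId }> {
  const out: Array<{ topic: string; conn: ConnId }> = [];
  for (const eventId of deviceEvents.get(deviceId) ?? []) {
    const topic = `event:${eventId}`;
    for (const conn of topicSubscribers.get(topic) ?? []) {
      out.push({ topic, conn });
    }
  }
  return out;
}

// A connection subscribed to both of a device's events gets two messages
// (one per topic); a device with no registration yields zero targets (orphan).
const devices = new Map([['dev-1', new Set(['E1', 'E2'])]]);
const subs = new Map([
  ['event:E1', new Set(['conn-a', 'conn-b'])],
  ['event:E2', new Set(['conn-a'])],
]);
console.log(fanOutTargets('dev-1', devices, subs).length); // 3
console.log(fanOutTargets('ghost', devices, subs).length); // 0 (orphan)
```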

Specification

Why a separate consumer group

Phase 1's durable-write consumer is on the processor group (name configurable; the default is set in tcp-ingestion and matched in Processor). Two instances share that group, and Redis splits records across them — exactly one instance handles each write.

Live broadcast needs the opposite: every instance must see every record, because each instance has its own connected clients. The clean way to do that with Redis Streams is one group per instance. Group name live-broadcast-{instance_id} ensures uniqueness; each instance's XREADGROUP gets the full firehose for that group.

The two reads are independent — the durable-write group's offset and the live-broadcast group's offset are separate. A slow durable write doesn't slow down broadcast and vice versa.
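
Creating the per-instance group is a one-time, idempotent step at startup. A sketch against a minimal client interface (`RedisLike` and `ensureGroup` are illustrative names, not the real module; with ioredis the client's variadic `xgroup(...)` call is compatible):

```typescript
// Sketch: idempotent creation of the per-instance broadcast group at startup.
interface RedisLike {
  xgroup(...args: (string | number)[]): Promise<unknown>;
}

async function ensureGroup(
  redis: RedisLike,
  stream: string,   // e.g. config.REDIS_TELEMETRY_STREAM
  group: string,    // e.g. `${LIVE_BROADCAST_GROUP_PREFIX}-${INSTANCE_ID}`
): Promise<void> {
  try {
    // '$' = start from new entries only; MKSTREAM creates the stream if absent.
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
  } catch (err) {
    // Redis replies BUSYGROUP when the group already exists -- fine on restart.
    if (err instanceof Error && err.message.includes('BUSYGROUP')) return;
    throw err;
  }
}
```

Starting at `$` means a restarted instance skips records published while it was down, which matches the broadcast semantics: stale positions are not worth replaying.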

Fan-out shape

async function runLoop() {
  while (!stopping) {
    let entries: StreamEntry[];
    try {
      entries = await redis.xreadgroup(
        'GROUP', groupName, consumerName,
        'COUNT', config.LIVE_BROADCAST_BATCH_SIZE,
        'BLOCK', config.LIVE_BROADCAST_BATCH_BLOCK_MS,
        'STREAMS', config.REDIS_TELEMETRY_STREAM, '>',
      );
    } catch (err) {
      logger.error({ err }, 'broadcast XREADGROUP failed; backing off');
      await sleep(1000);
      continue;
    }
    if (!entries) continue;

    for (const entry of decodeBatch(entries)) {
      metrics.broadcastRecords.inc();
      await fanOut(entry);
      // ACK immediately — broadcast doesn't need durability semantics.
      await redis.xack(config.REDIS_TELEMETRY_STREAM, groupName, entry.id);
    }
  }
}

async function fanOut(record: ConsumedRecord) {
  const eventIds = deviceToEvent.lookup(record.position.deviceId);
  if (eventIds.length === 0) {
    metrics.broadcastOrphans.inc();
    return;
  }

  const message = toPositionMessage(record.position);  // shape per processor-ws-contract

  for (const eventId of eventIds) {
    const topic = `event:${eventId}`;
    const conns = registry.connectionsForTopic(topic);
    for (const conn of conns) {
      sendOutbound(conn, { ...message, topic });
      metrics.broadcastFanout.inc();
    }
  }

  metrics.broadcastLag.observe(Date.now() - record.position.ts.getTime());  // ts is a Date (see toPositionMessage)
}

Why ACK immediately

Phase 1's durable-write consumer ACKs only after Postgres confirms the write — that's the XACK discipline that protects against data loss. The broadcast consumer has different durability semantics: a missed broadcast is acceptable. If a position fails to fan out (because the connection crashed mid-send, say), the next position is what matters. Don't keep a pending entry just to retry an obsolete record.

ACK-on-consume keeps the broadcast group's PEL empty, prevents pending-entry buildup, and avoids the "send the same position twice on retry" anti-feature. Phase 3 hardening can revisit if we ever need broadcast guarantees.

DeviceEventMap design

The fan-out path needs to answer "which events does this device belong to?" thousands of times per second. The naive answer — query Postgres on each record — is wrong. Two options:

Option A: In-process cache with periodic refresh. Load the full entry_devices–entries join at startup; refresh every 30s. Stale data window: up to 30s. Pick this for pilot.

Option B: Listen for changes. Add an entry-devices:changed Redis Stream (or use Directus Flows to publish on writes); broadcast invalidates affected entries. Stale data window: ~50ms. Adds protocol surface and a coordination point.

For pilot: Option A. The 30s staleness window is acceptable — operators register devices before the event starts, and "you registered a new device 30s ago and it's not on the map yet" is a tolerable UX. Phase 3+ can promote to Option B if real-time registration matters.

class DeviceEventMap {
  private map = new Map<string, Set<string>>();  // deviceId → Set<eventId>
  private timer: NodeJS.Timeout | null = null;

  async start() {
    await this.refresh();
    this.timer = setInterval(() => {
      this.refresh().catch(err => logger.warn({ err }, 'device-event map refresh failed'));
    }, config.LIVE_DEVICE_EVENT_REFRESH_MS);
  }

  async stop() {
    if (this.timer) clearInterval(this.timer);
    this.timer = null;  // make stop() idempotent
  }

  async refresh() {
    const start = performance.now();
    const result = await pool.query<{ device_id: string; event_id: string }>(
      `SELECT ed.device_id, e.event_id
       FROM entry_devices ed
       JOIN entries e ON e.id = ed.entry_id`
    );
    const next = new Map<string, Set<string>>();
    for (const row of result.rows) {
      if (!next.has(row.device_id)) next.set(row.device_id, new Set());
      next.get(row.device_id)!.add(row.event_id);
    }
    this.map = next;
    metrics.deviceEventRefreshLatency.observe(performance.now() - start);
    metrics.deviceEventEntries.set(next.size);
    logger.debug({ devices: next.size }, 'device-event map refreshed');
  }

  lookup(deviceId: string): string[] {
    return Array.from(this.map.get(deviceId) ?? []);
  }
}

Back-pressure

If a connection's send queue is backing up (slow client, slow network), the WS library queues messages in process memory. At 100 msg/s × 10s of slow consumer = 1000 queued messages × ~200 bytes each = 200KB per slow connection. Tolerable.

If we ever see real back-pressure problems: per-connection bounded queue (e.g. last 100 positions per device, dropping older), with a metric processor_live_broadcast_dropped_total. Document but defer.
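
If that deferred design is ever needed, the bounded queue could be as small as a per-connection ring that keeps only the newest N positions; a sketch under those assumptions (`BoundedQueue` and its `dropped` counter are illustrative, and `dropped` would feed processor_live_broadcast_dropped_total):

```typescript
// Sketch of the deferred bounded-queue design: keep at most `capacity`
// items per connection, dropping the oldest when full.
class BoundedQueue<T> {
  private items: T[] = [];
  public dropped = 0;

  constructor(private readonly capacity: number) {}

  push(item: T): void {
    this.items.push(item);
    if (this.items.length > this.capacity) {
      this.items.shift();  // the oldest position is obsolete; drop it
      this.dropped += 1;
    }
  }

  drain(): T[] {
    const out = this.items;
    this.items = [];
    return out;
  }
}

const q = new BoundedQueue<number>(3);
[1, 2, 3, 4, 5].forEach(n => q.push(n));
console.log(q.drain());  // [ 3, 4, 5 ] -- two oldest dropped
console.log(q.dropped);  // 2
```

Dropping from the head matches the "next position is what matters" semantics above: a slow client catches up to the latest state rather than replaying stale ones.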

For now: rely on ws.bufferedAmount to detect slow consumers; if it exceeds a threshold (say 1MB), close the connection with code 1008 (policy violation) and log. Client reconnects. Worth implementing as a defensive measure even for pilot — prevents one slow client from eating all the memory.

function sendOutbound(conn: LiveConnection, msg: OutboundMessage) {
  if (conn.ws.readyState !== WebSocket.OPEN) return;
  if (conn.ws.bufferedAmount > config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES) {
    logger.warn({ connId: conn.id, buffered: conn.ws.bufferedAmount }, 'closing slow connection');
    conn.ws.close(1008, 'back-pressure threshold exceeded');
    return;
  }
  conn.ws.send(JSON.stringify(msg));
  metrics.liveMessagesOutbound.inc({ type: msg.type });
}

(Update 1.5.1's sendOutbound to include this check; add LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES config with default 1_048_576.)

toPositionMessage

function toPositionMessage(p: Position): Omit<PositionMessage, 'topic'> {
  const msg: any = {
    type: 'position',
    deviceId: p.deviceId,
    lat: p.latitude,
    lon: p.longitude,
    ts: p.ts.getTime(),  // epoch ms; contract is number, not ISO string
  };
  if (p.speed != null) msg.speed = p.speed;
  if (p.course != null) msg.course = p.course;
  if (p.accuracy != null) msg.accuracy = p.accuracy;
  if (p.attributes && Object.keys(p.attributes).length > 0) msg.attributes = p.attributes;
  return msg;
}

Per the contract: omit fields rather than send null for absent values.

Acceptance criteria

  • pnpm typecheck, pnpm lint, pnpm test clean.
  • pnpm dev boots; logs show both consumer groups joining (processor and live-broadcast-{instance_id}).
  • With a subscribed wscat client and a synthetic position published to telemetry:teltonika, the client receives a {"type":"position",...} frame within ~100ms.
  • A second wscat client subscribed to the same event also receives the message.
  • An orphan position (device not in any entry_devices row) increments processor_live_broadcast_orphan_records_total and emits no WS message.
  • After 30s, modifying entry_devices directly in Postgres and publishing a position routes correctly to the new event's subscribers.
  • Broadcast lag p50 < 100ms, p95 < 500ms with a small subscriber count (≤20).

Risks / open questions

  • 30s staleness window is acceptable for pilot but worth surfacing in operator docs. "If you just registered a device, wait 30s before expecting it on the map" is a reasonable line in the dogfood README.
  • Memory cost of DeviceEventMap. For 500 devices × 10 events average, ~5000 entries. Trivial.
  • What about devices registered to multiple events at the same time? Schema allows it (one device on multiple entry_devices rows). Fan-out handles it: each event's subscribers get the message. The SPA may want to filter by event on its end if it's showing a single event.
  • Memory leak from topicConnections if registry isn't cleaning up. Defensive: log a warning if registry.stats().topics exceeds a sanity threshold (e.g. 1000) to surface a leak before it OOMs.

Done

Landed in c07ea0e. Key implementation decisions:

  • CodecError/decodePosition moved to src/shared/codec.ts; Position/AttributeValue moved to src/shared/types.ts. Both src/core/ and src/live/ re-export from shared to preserve existing import paths.
  • broadcast.ts ACKs all stream entries immediately (durability not needed for fan-out).
  • Test uses a stopSignal Promise to coordinate between the broadcast loop and the test's stop() call, avoiding the tight-loop OOM that naive polling triggers.