Files
processor/.planning/phase-1-throughput/05-stream-consumer.md
T
julian c314ba0902 Add planning documents for Phase 1 (throughput pipeline) and stub Phases 2-4
ROADMAP.md establishes status legend, architectural anchors pointing at the
wiki, and seven non-negotiable design rules — most importantly the
core/domain boundary that protects Phase 1 from Phase 2 churn, the
schema-authority split (positions hypertable owned here; everything else
owned by Directus), and idempotent-writes via (device_id, ts) ON CONFLICT.

Phase 1 (throughput pipeline) is fully detailed across 11 task files:
scaffold, core types + sentinel decoder, config + logging, Postgres
hypertable, Redis Stream consumer, per-device LRU state, batched writer,
main wiring, observability, integration test, Dockerfile + Gitea CI.
Observability is in Phase 1 (not deferred) — lesson learned from
tcp-ingestion task 1.10.

Phases 2-4 are stub READMEs. Phase 2 (domain logic) blocks on Directus
schema decisions and lists those open questions explicitly. Phase 3
(production hardening) and Phase 4 (future) sketch the task shape.
2026-04-30 21:16:59 +02:00

5.1 KiB

Task 1.5 — Redis Stream consumer (XREADGROUP)

Phase: 1 — Throughput pipeline Status: Not started Depends on: 1.2, 1.3 Wiki refs: docs/wiki/entities/redis-streams.md, docs/wiki/entities/processor.md

Goal

Build the Redis Stream consumer: join the consumer group, fetch batches via XREADGROUP, decode each entry to a Position, hand off to a sink callback, and return successfully-handled IDs to the caller for XACK.

This task does not wire in the Postgres writer or the in-memory state — those are tasks 1.7 and 1.6, joined to the consumer in 1.8. The consumer accepts a sink: (records: ConsumedRecord[]) => Promise<string[]> callback that returns the IDs it wants ACKed. Only those IDs are ACKed; failures stay pending and get claimed on the next loop.

Deliverables

  • src/core/consumer.ts exporting:
    • createConsumer(redis, config, logger, metrics, sink): Consumer — factory.
    • Consumer interface: start(): Promise<void> (returns when the consumer loop starts), stop(): Promise<void> (signals the loop to exit, waits for the in-flight batch).
    • ensureConsumerGroup(redis, stream, group)XGROUP CREATE ... MKSTREAM ignoring BUSYGROUP errors. Called once at start.
    • type ConsumedRecord = { id: string; position: Position; codec: string; ts: string } — what's passed to the sink.
  • test/consumer.test.ts (mocked ioredis):
    • Decodes a synthetic stream entry into a ConsumedRecord with the right shape.
    • Calls sink with the decoded batch and ACKs only the IDs the sink returned.
    • On BUSYGROUP error from XGROUP CREATE, swallows the error and continues.
    • On a malformed payload, increments consumer_decode_errors_total, logs at error, and does not ACK the bad entry — leaves it pending for inspection.
    • On stop(), the loop exits cleanly without losing in-flight work.

Specification

Consumer loop shape

async function runLoop() {
  while (!stopping) {
    let entries: StreamEntry[];
    try {
      entries = await redis.xreadgroup(
        'GROUP', group, consumerName,
        'COUNT', batchSize,
        'BLOCK', batchBlockMs,
        'STREAMS', stream, '>',
      );
    } catch (err) {
      logger.error({ err }, 'XREADGROUP failed; backing off');
      await sleep(1000);
      continue;
    }
    if (!entries) continue;  // BLOCK timeout

    const records = decodeBatch(entries);              // <— may emit decode errors
    const ackIds = await sink(records);                // <— writer + state
    if (ackIds.length > 0) {
      await redis.xack(stream, group, ...ackIds);
    }
  }
}

Decode error handling

decodeBatch calls decodePosition (from task 1.2) on each entry's payload field. If a single entry fails to decode:

  • Increment processor_decode_errors_total{stream=...}.
  • Log at error with the entry ID and a truncated raw payload (first 256 chars).
  • Skip the entry — do not pass to sink, do not ACK. It stays in the consumer's PEL (Pending Entries List) and will be re-attempted on next claim. Phase 3 will route truly-poison entries to a dead-letter stream; for Phase 1, leaving them pending and visible in XPENDING is enough.

XACK semantics

ACK only what the sink returned. If the sink returns ['id1', 'id3'] from a batch of [id1, id2, id3], then id2 stays pending. Why a sink might return a partial list: it failed to write some records. The consumer must trust the sink's signal — never ACK speculatively.

Consumer group setup

On start():

  1. XGROUP CREATE <stream> <group> $ MKSTREAM — creates the stream if missing, group at "now" so we don't replay history. If the group already exists, the call returns BUSYGROUP Consumer Group name already exists — catch and ignore.
  2. Log at info whether the group was created or already existed.

Why > not 0 for the read ID

> means "deliver only new entries, not pending ones for this consumer." That's what we want for the steady-state loop. Phase 3 will add an explicit XAUTOCLAIM step at startup (and periodically) to pull stuck pending entries from dead consumers; Phase 1 relies on the natural redelivery via consumer-group resumption (when a dead instance restarts with the same name, it sees its old PEL).

Acceptance criteria

  • pnpm typecheck, pnpm lint, pnpm test clean.
  • Unit tests cover: happy path, BUSYGROUP swallow, decode error skip, partial-ACK, clean stop.
  • Stop signal causes the loop to exit within one BATCH_BLOCK_MS tick.

Risks / open questions

  • Consumer name uniqueness. Two instances with the same REDIS_CONSUMER_NAME will both read from the same PEL, which is undefined behaviour. Task 1.3 already documents that INSTANCE_ID (which defaults REDIS_CONSUMER_NAME) must be unique per instance — surface this again in the operator-facing README later.
  • Long sink calls block the loop. If the Postgres writer takes 30s, no new records are read. That's fine for Phase 1 (Postgres should be fast); Phase 3 may add a configurable max-in-flight if writer pressure becomes an issue.

Done

(Fill in once complete: commit SHA, brief notes.)