# Task 1.5 — Redis Stream consumer (XREADGROUP)

**Phase:** 1 — Throughput pipeline
**Status:** ⬜ Not started
**Depends on:** 1.2, 1.3
**Wiki refs:** `docs/wiki/entities/redis-streams.md`, `docs/wiki/entities/processor.md`
## Goal

Build the Redis Stream consumer: join the consumer group, fetch batches via `XREADGROUP`, decode each entry to a `Position`, hand the batch to a sink callback, and return the successfully handled IDs to the caller for `XACK`.

This task does not wire in the Postgres writer or the in-memory state — those are tasks 1.7 and 1.6, joined to the consumer in 1.8. The consumer accepts a `sink: (records: ConsumedRecord[]) => Promise<string[]>` callback (sketched below) that returns the IDs it wants ACKed. Only those IDs are ACKed; failures stay pending and are claimed on the next loop.
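For concreteness, here is the handoff contract as a type sketch. The shapes come from the Deliverables below; the `Position` import path is an assumption:

```ts
import type { Position } from './types'; // decoded payload type from task 1.2; path is an assumption

// One successfully decoded stream entry, as handed to the sink.
export type ConsumedRecord = {
  id: string; // Redis stream entry ID, e.g. '1712345678901-0'
  position: Position;
  codec: string;
  ts: string;
};

// The sink resolves with the entry IDs it wants ACKed. Anything it omits
// stays in the PEL and is redelivered on a later claim.
export type Sink = (records: ConsumedRecord[]) => Promise<string[]>;
```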
## Deliverables

- `src/core/consumer.ts` exporting:
  - `createConsumer(redis, config, logger, metrics, sink): Consumer` — factory.
  - `Consumer` interface: `start(): Promise<void>` (returns once the consumer loop is running), `stop(): Promise<void>` (signals the loop to exit and waits for the in-flight batch).
  - `ensureConsumerGroup(redis, stream, group)` — `XGROUP CREATE ... MKSTREAM`, ignoring `BUSYGROUP` errors. Called once at start.
  - `type ConsumedRecord = { id: string; position: Position; codec: string; ts: string }` — what's passed to the sink.
- `test/consumer.test.ts` (mocked `ioredis`):
  - Decodes a synthetic stream entry into a `ConsumedRecord` with the right shape.
  - Calls `sink` with the decoded batch and ACKs only the IDs the sink returned.
  - On `BUSYGROUP` error from `XGROUP CREATE`, swallows the error and continues.
  - On a malformed payload, increments `processor_decode_errors_total`, logs at `error`, and does not ACK the bad entry — leaves it pending for inspection.
  - On `stop()`, the loop exits cleanly without losing in-flight work.
## Specification

### Consumer loop shape
```ts
async function runLoop(): Promise<void> {
  while (!stopping) {
    let reply: unknown;
    try {
      reply = await redis.xreadgroup(
        'GROUP', group, consumerName,
        'COUNT', batchSize,
        'BLOCK', batchBlockMs,
        'STREAMS', stream, '>',
      );
    } catch (err) {
      logger.error({ err }, 'XREADGROUP failed; backing off');
      await sleep(1000);
      continue;
    }
    if (!reply) continue; // BLOCK timed out with nothing new

    // XREADGROUP replies per stream; we read exactly one, so unwrap it.
    const [, entries] = (reply as Array<[string, Array<[string, string[]]>]>)[0];

    const records = decodeBatch(entries); // <— may emit decode errors (see below)
    const ackIds = await sink(records);   // <— writer + state (tasks 1.6/1.7)
    if (ackIds.length > 0) {
      await redis.xack(stream, group, ...ackIds);
    }
  }
}
```
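A possible shape for the factory and stop handshake around this loop, sketched under assumptions: the `ConsumerConfig` shape and the exact `Logger`/`Metrics` types are not pinned down by the Deliverables.

```ts
import type { Redis } from 'ioredis';

export function createConsumer(
  redis: Redis,
  config: ConsumerConfig, // stream, group, consumerName, batchSize, batchBlockMs (assumed shape)
  logger: Logger,
  metrics: Metrics,
  sink: Sink,
): Consumer {
  let stopping = false;
  let loop: Promise<void> = Promise.resolve();

  return {
    async start(): Promise<void> {
      await ensureConsumerGroup(redis, config.stream, config.group);
      loop = runLoop(); // runLoop closes over `stopping` and `sink`
    },
    async stop(): Promise<void> {
      stopping = true; // the loop notices after the current BLOCK tick
      await loop;      // wait for the in-flight batch to be sunk and ACKed
    },
  };
}
```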
### Decode error handling

`decodeBatch` calls `decodePosition` (from task 1.2) on each entry's `payload` field. If a single entry fails to decode:

- Increment `processor_decode_errors_total{stream=...}`.
- Log at `error` with the entry ID and a truncated raw payload (first 256 chars).
- Skip the entry — do not pass it to the sink, do not ACK it. It stays in the consumer's PEL (Pending Entries List) and will be re-attempted on the next claim. Phase 3 will route truly-poison entries to a dead-letter stream; for Phase 1, leaving them pending and visible in `XPENDING` is enough.
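A sketch of how `decodeBatch` might implement this, assuming ioredis-style `[id, [field, value, ...]]` tuples, a prom-client counter behind `metrics`, and that `codec`/`ts` arrive as entry fields alongside `payload`; all of these are assumptions:

```ts
function decodeBatch(entries: Array<[string, string[]]>): ConsumedRecord[] {
  const records: ConsumedRecord[] = [];
  for (const [id, flat] of entries) {
    // Fields arrive flattened as [key, value, key, value, ...]; index them.
    const fields = new Map<string, string>();
    for (let i = 0; i < flat.length; i += 2) fields.set(flat[i], flat[i + 1]);

    const raw = fields.get('payload');
    try {
      if (raw === undefined) throw new Error('missing payload field');
      const position = decodePosition(raw); // from task 1.2
      records.push({
        id,
        position,
        codec: fields.get('codec') ?? 'unknown', // assumed to be an entry field
        ts: fields.get('ts') ?? '',              // assumed to be an entry field
      });
    } catch (err) {
      metrics.decodeErrorsTotal.inc({ stream }); // processor_decode_errors_total{stream=...}
      logger.error({ err, id, payload: raw?.slice(0, 256) }, 'decode failed; entry left pending');
      // Skipped: not passed to the sink, not ACKed; visible via XPENDING.
    }
  }
  return records;
}
```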
### XACK semantics

ACK only what the sink returned. If the sink returns `['id1', 'id3']` from a batch of `[id1, id2, id3]`, then `id2` stays pending. A sink returns a partial list when it failed to write some of the records. The consumer must trust the sink's signal — never ACK speculatively.
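To illustrate, a toy sink that reports partial success. `writer.write` is a hypothetical stand-in for the batched writer of task 1.7 (the real writer batches; per-record writes keep the example small):

```ts
const sink: Sink = async (records) => {
  const acked: string[] = [];
  for (const record of records) {
    try {
      await writer.write(record.position); // hypothetical task-1.7 writer API
      acked.push(record.id);
    } catch (err) {
      // This record stays in the PEL and is redelivered later; do not ACK it.
      logger.warn({ err, id: record.id }, 'write failed; leaving entry pending');
    }
  }
  return acked; // e.g. ['id1', 'id3'] out of [id1, id2, id3]
};
```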
### Consumer group setup

On `start()`:

- `XGROUP CREATE <stream> <group> $ MKSTREAM` — creates the stream if missing, with the group positioned at "now" so we don't replay history. If the group already exists, the call fails with `BUSYGROUP Consumer Group name already exists` — catch and ignore.
- Log at `info` whether the group was created or already existed.
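A sketch of `ensureConsumerGroup` under these rules; the `BUSYGROUP` check matches on the error-message prefix, which is how ioredis surfaces Redis reply errors:

```ts
export async function ensureConsumerGroup(
  redis: Redis,
  stream: string,
  group: string,
): Promise<void> {
  try {
    // '$' = start the group at "now"; MKSTREAM creates the stream if missing.
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
    logger.info({ stream, group }, 'consumer group created');
  } catch (err) {
    if (err instanceof Error && err.message.startsWith('BUSYGROUP')) {
      logger.info({ stream, group }, 'consumer group already exists');
      return;
    }
    throw err; // anything else is a real failure
  }
}
```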
### Why `>`, not `0`, for the read ID

`>` means "deliver only new entries, not entries already pending for this consumer." That's what we want for the steady-state loop. Phase 3 will add an explicit `XAUTOCLAIM` step at startup (and periodically) to pull stuck pending entries from dead consumers; Phase 1 relies on natural redelivery via consumer-group resumption (when a dead instance restarts with the same name, it sees its old PEL).
## Acceptance criteria

- `pnpm typecheck`, `pnpm lint`, `pnpm test` all clean.
- Unit tests cover: happy path, `BUSYGROUP` swallow, decode-error skip, partial ACK, clean stop.
- A stop signal causes the loop to exit within one `BATCH_BLOCK_MS` tick.
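As one example, the partial-ACK case with a hand-rolled ioredis mock (vitest shown; the fixtures `validPayload`, `config`, `logger`, and `metrics` are hypothetical, and the single-tick drain before `stop()` is a simplification a real test may need to harden):

```ts
import { expect, it, vi } from 'vitest';
import type { Redis } from 'ioredis';
import { createConsumer, type ConsumedRecord } from '../src/core/consumer';

it('ACKs only the IDs the sink returns', async () => {
  const entry = (id: string): [string, string[]] =>
    [id, ['payload', validPayload, 'codec', 'test', 'ts', '2024-01-01T00:00:00Z']];

  const redis = {
    xgroup: vi.fn().mockResolvedValue('OK'),
    xreadgroup: vi
      .fn()
      .mockResolvedValueOnce([['positions', [entry('1-0'), entry('2-0'), entry('3-0')]]])
      .mockResolvedValue(null), // later reads look like BLOCK timeouts
    xack: vi.fn().mockResolvedValue(2),
  };

  // Sink "fails" to persist the middle record.
  const sink = vi.fn(async (records: ConsumedRecord[]) => [records[0].id, records[2].id]);

  const consumer = createConsumer(redis as unknown as Redis, config, logger, metrics, sink);
  await consumer.start();
  await new Promise((r) => setImmediate(r)); // let the first batch drain
  await consumer.stop();

  expect(redis.xack).toHaveBeenCalledWith(config.stream, config.group, '1-0', '3-0');
});
```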
## Risks / open questions

- **Consumer name uniqueness.** Two instances with the same `REDIS_CONSUMER_NAME` will both read from the same PEL, which is undefined behaviour. Task 1.3 already documents that `INSTANCE_ID` (which `REDIS_CONSUMER_NAME` defaults to) must be unique per instance — surface this again in the operator-facing README later.
- **Long sink calls block the loop.** If the Postgres writer takes 30 s, no new records are read. That's fine for Phase 1 (Postgres should be fast); Phase 3 may add a configurable max-in-flight limit if writer pressure becomes an issue.
## Done
(Fill in once complete: commit SHA, brief notes.)