# processor — Roadmap
A Node.js worker service that consumes normalized Position records from a Redis Stream, maintains per-device runtime state, applies racing-domain rules, and writes durable state to Postgres / TimescaleDB.
This file is the single navigation hub for all implementation planning. Each phase has its own folder with a README and granular task files. Update statuses here as work lands.
## Status legend
| Symbol | Meaning |
|---|---|
| ⬜ | Not started |
| 🟦 | Planned (designed, not coded) |
| 🟨 | In progress |
| 🟩 | Done |
| ⏸ | Paused / blocked |
| ❄️ | Frozen / future / optional |
## Architectural anchors
The service is specified by the wiki at `../docs/wiki/`. Implementing agents should read these pages before starting any task:
- Architecture — `docs/wiki/sources/gps-tracking-architecture.md`, `docs/wiki/concepts/plane-separation.md`, `docs/wiki/concepts/failure-domains.md`
- This service — `docs/wiki/entities/processor.md`
- Upstream contract (input) — `docs/wiki/concepts/position-record.md`, `docs/wiki/concepts/io-element-bag.md`, `docs/wiki/entities/redis-streams.md`
- Downstream contract (output) — `docs/wiki/entities/postgres-timescaledb.md`, `docs/wiki/entities/directus.md`
## Non-negotiable design rules
These rules govern every task. Any deviation must be discussed and documented as a decision before code lands.
- **Domain logic is isolated.** `src/core/` (stream consumer, Postgres writer, in-memory state plumbing) never imports from `src/domain/` (geofence engine, timing logic, IO interpretation). Phase 2 must be a pure addition layered on top of the Phase 1 throughput pipeline.
- **Schema authority lives in Directus,** with one exception: the `positions` hypertable is bulk-written by this service, and its migration is owned here. All other tables Processor writes to (`timing_records`, `stage_results`, etc.) are defined in Directus, and Processor inserts respecting that schema.
- **Per-device state is in-memory, not durable.** The DB is the source of truth for replay/analysis; in-memory state is the source of truth for the current decision. On restart, hot state is rehydrated from the DB — Phase 1 does not implement rehydration; restart loses state, which is acceptable until Phase 2 introduces stateful accumulators.
- **Consumer-group offsets drive work distribution.** No application-level coordination between Processor instances. `XACK` on success; failed batches stay pending and are claimed by surviving instances via `XAUTOCLAIM`.
- **Idempotent writes.** Records arriving twice (after a claim, replay, or retry) must not produce duplicate rows. The `positions` hypertable uses `(device_id, ts)` as a unique key with `ON CONFLICT DO NOTHING`. Derived tables follow the same pattern, scoped by their natural keys.
- **Bounded memory.** Per-device state is capped (LRU eviction by last-seen timestamp); replay-from-DB rehydrates an evicted device on its next packet. No unbounded `Map<imei, ...>` growth.
- **Fail loudly.** Schema-incompatible records (e.g. a malformed payload or an unknown sentinel) are logged at `error` level and dead-letter-streamed (Phase 3); they are never silently skipped.
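The consumer-group rule above can be sketched as a minimal read/process/ack loop. Only the stream name `telemetry:teltonika` and the `XACK`-on-success, pending-until-claimed semantics come from this roadmap; the group name, consumer name, batch size, and helper names are illustrative assumptions:

```typescript
type StreamEntry = { id: string; fields: Record<string, string> };

// Minimal surface of a Redis client (ioredis-style variadic commands).
interface RedisLike {
  xreadgroup(...args: (string | number)[]): Promise<unknown>;
  xack(key: string, group: string, ...ids: string[]): Promise<number>;
}

// Pure helper: flatten the raw XREADGROUP reply, which has the shape
// [[stream, [[id, [k1, v1, k2, v2, ...]], ...]], ...], into typed entries.
export function parseStreamReply(reply: unknown): StreamEntry[] {
  if (!reply) return []; // a BLOCK timeout returns null
  const entries: StreamEntry[] = [];
  for (const [, items] of reply as [string, [string, string[]][]][]) {
    for (const [id, flat] of items) {
      const fields: Record<string, string> = {};
      for (let i = 0; i < flat.length; i += 2) fields[flat[i]] = flat[i + 1];
      entries.push({ id, fields });
    }
  }
  return entries;
}

// Read, process, ACK. If handleBatch throws, the entries are NOT acked:
// they stay in the pending entries list, where a surviving instance can
// XAUTOCLAIM them later.
export async function consumeLoop(
  redis: RedisLike,
  handleBatch: (entries: StreamEntry[]) => Promise<void>,
): Promise<never> {
  const stream = "telemetry:teltonika";
  const group = "processor";                   // assumed group name
  const consumer = `processor-${process.pid}`; // unique per instance
  for (;;) {
    const reply = await redis.xreadgroup(
      "GROUP", group, consumer,
      "COUNT", 100, "BLOCK", 5000,
      "STREAMS", stream, ">",
    );
    const entries = parseStreamReply(reply);
    if (entries.length === 0) continue;
    await handleBatch(entries); // the durable write happens in here
    await redis.xack(stream, group, ...entries.map((e) => e.id));
  }
}
```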
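The idempotent-write rule reduces to a single multi-row `INSERT ... ON CONFLICT DO NOTHING`. A sketch of the statement builder, assuming an illustrative four-column row shape (the real Position record is defined in `docs/wiki/concepts/position-record.md`):

```typescript
// Illustrative row shape, not the authoritative Position record.
type PositionRow = { device_id: string; ts: string; lat: number; lon: number };

// Build one multi-row INSERT for a batch. (device_id, ts) is the unique key,
// so a replayed or re-claimed batch inserts zero duplicate rows.
export function buildPositionsInsert(batch: PositionRow[]): { text: string; values: unknown[] } {
  const cols = 4;
  const placeholders = batch
    .map((_, i) => `($${i * cols + 1}, $${i * cols + 2}, $${i * cols + 3}, $${i * cols + 4})`)
    .join(", ");
  return {
    text:
      `INSERT INTO positions (device_id, ts, lat, lon) ` +
      `VALUES ${placeholders} ` +
      `ON CONFLICT (device_id, ts) DO NOTHING`,
    values: batch.flatMap((p) => [p.device_id, p.ts, p.lat, p.lon]),
  };
}
```

The `{ text, values }` pair matches the parameterized-query shape accepted by `pg`'s `client.query`.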
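The bounded-memory rule can be implemented with a plain `Map` used as an LRU, since JavaScript `Map`s iterate in insertion order. A sketch; the cap value and class name are illustrative:

```typescript
// Illustrative cap, not a value from the spec.
const MAX_DEVICES = 10_000;

type DeviceState = { lastSeen: number; lastPosition?: unknown };

export class DeviceStateCache {
  private map = new Map<string, DeviceState>();
  constructor(private max = MAX_DEVICES) {}

  get(id: string): DeviceState | undefined {
    const state = this.map.get(id);
    if (state) {
      // Re-insert to mark as most recently used (Map preserves insertion order).
      this.map.delete(id);
      this.map.set(id, state);
    }
    return state;
  }

  set(id: string, state: DeviceState): void {
    this.map.delete(id);
    this.map.set(id, state);
    if (this.map.size > this.max) {
      // Evict the least recently used device; it is rehydrated from the DB
      // on its next packet.
      const oldest = this.map.keys().next().value;
      if (oldest !== undefined) this.map.delete(oldest);
    }
  }
}
```

A cache miss for a known device is the signal to replay that device's state from Postgres (the Phase 3 rehydration path).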
## Phases

### Phase 1 — Throughput pipeline
Status: 🟩 Done
Outcome: A Node.js Processor that joins a Redis Streams consumer group on `telemetry:teltonika`, decodes each Position (including `__bigint`/`__buffer_b64` sentinel reversal), upserts it into a TimescaleDB `positions` hypertable, updates per-device in-memory state (last position, last seen), `XACK`s on successful write, and exposes Prometheus metrics plus health/readiness HTTP endpoints. An end-to-end, pilot-quality service; no domain logic yet.
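The sentinel reversal step can be sketched as a recursive revive pass. This assumes sentinels are encoded as single-key objects of the shape `{ "__bigint": "123" }` and `{ "__buffer_b64": "AAEC" }`; the authoritative encoding is `docs/wiki/concepts/io-element-bag.md`:

```typescript
// Walk a decoded JSON value and reverse the wire sentinels: single-key
// { __bigint } objects become BigInt, single-key { __buffer_b64 } objects
// become Buffer. Everything else passes through unchanged.
export function reviveSentinels(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(reviveSentinels);
  if (value && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    if (typeof obj.__bigint === "string" && Object.keys(obj).length === 1) {
      return BigInt(obj.__bigint);
    }
    if (typeof obj.__buffer_b64 === "string" && Object.keys(obj).length === 1) {
      return Buffer.from(obj.__buffer_b64, "base64");
    }
    return Object.fromEntries(
      Object.entries(obj).map(([k, v]) => [k, reviveSentinels(v)]),
    );
  }
  return value;
}
```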
See [phase-1-throughput/README.md](phase-1-throughput/README.md)
| # | Task | Status | Landed in |
|---|---|---|---|
| 1.1 | Project scaffold | 🟩 | 290a08e |
| 1.2 | Core types & contracts | 🟩 | 290a08e |
| 1.3 | Configuration & logging | 🟩 | 290a08e |
| 1.4 | Postgres connection & positions hypertable | 🟩 | 290a08e |
| 1.5 | Redis Stream consumer (XREADGROUP) | 🟩 | 68d3da3 |
| 1.6 | Per-device in-memory state | 🟩 | 68d3da3 |
| 1.7 | Position writer (batched upsert) | 🟩 | 68d3da3 |
| 1.8 | Main wiring & ACK semantics | 🟩 | 68d3da3 |
| 1.9 | Observability (Prometheus metrics + /healthz + /readyz) | 🟩 | 9791620 |
| 1.10 | Integration test (testcontainers Redis + Postgres) | 🟩 | 9791620 |
| 1.11 | Dockerfile & Gitea workflow | 🟩 | 9791620 |
### Phase 1.5 — Live broadcast
Status: 🟩 Done
Outcome: A WebSocket endpoint inside the Processor that fans live position updates out from Redis to subscribed react-spa clients. Cookie-based auth via Directus's `/users/me`, per-event subscription with one-time authorization at subscribe time, snapshot-on-subscribe, and a per-instance consumer group so each instance fans out to its own clients in multi-instance deployments. The wire spec is `docs/wiki/synthesis/processor-ws-contract.md`. Unblocks the SPA's live-map feature for the Rally Albania 2026 dogfood.
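The cookie handshake can be sketched by forwarding the upgrade request's cookie header to Directus. The base-URL environment variable and helper name are assumptions; the actual handshake is specified in `docs/wiki/synthesis/processor-ws-contract.md`:

```typescript
// Assumed deployment detail: where Directus is reachable from the Processor.
const DIRECTUS_URL = process.env.DIRECTUS_URL ?? "http://directus:8055";

// On WebSocket upgrade, forward the browser's session cookie to Directus's
// /users/me. A 200 means the socket belongs to a logged-in user; Directus
// wraps REST responses in { data: ... }, so the user id is data.id.
export async function authenticateCookie(
  cookieHeader: string | undefined,
): Promise<string | null> {
  if (!cookieHeader) return null; // no cookie: reject the upgrade
  const res = await fetch(`${DIRECTUS_URL}/users/me`, {
    headers: { cookie: cookieHeader },
  });
  if (!res.ok) return null; // expired / invalid session
  const body = (await res.json()) as { data?: { id?: string } };
  return body.data?.id ?? null;
}
```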
See [phase-1-5-live-broadcast/README.md](phase-1-5-live-broadcast/README.md)
| # | Task | Status | Landed in |
|---|---|---|---|
| 1.5.1 | WS server scaffold + heartbeat | 🟩 | b8ebbd0 |
| 1.5.2 | Cookie auth handshake | 🟩 | 190254d |
| 1.5.3 | Subscription registry & per-event authorization | 🟩 | 38de4bc |
| 1.5.4 | Broadcast consumer group & fan-out | 🟩 | c07ea0e |
| 1.5.5 | Snapshot-on-subscribe | 🟩 | f4b50ca |
| 1.5.6 | Integration test (testcontainers Redis + Postgres + Directus stub) | 🟩 | 2f2cf5c |
### Phase 2 — Domain logic
Status: ⬜ Not started — blocks on Directus schema decisions
Outcome: Geofence engine that detects entry/checkpoint/finish crossings; per-model Teltonika IO mapping driving derived attributes (`odometer_km`, `ignition`, etc.); timing record writer producing entries in the Directus-owned `timing_records` table; per-stage result aggregator. Layered on top of Phase 1 — no changes to the throughput pipeline.
Detailed task breakdown deferred until the Directus schema is finalized (open questions about geofence ownership, IO mapping storage, stage vocabulary). Phase 1 can ship and run on stage without any Phase 2 work.
### Phase 3 — Production hardening
Status: ⬜ Not started
Outcome: Graceful shutdown with consumer-group commit on SIGTERM; per-device state rehydration from Postgres (lazy: a device's state is loaded on its first packet after restart); `XAUTOCLAIM` for stuck pending entries from a dead instance; a dead-letter stream for poison records; multi-instance load-split verification; an `OPERATIONS.md` runbook.
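The SIGTERM flow described above can be sketched as a shutdown hook; the function and flag names are illustrative, not from the codebase:

```typescript
// On SIGTERM/SIGINT, flip a flag so the consume loop stops fetching new
// batches, let the in-flight batch finish and be XACKed via onDrain, then
// exit. A second signal while draining is ignored.
export function installShutdown(
  onDrain: () => Promise<void>,
): { stopping: () => boolean } {
  let stopping = false;
  const handler = () => {
    if (stopping) return;
    stopping = true;
    void onDrain()
      .catch(() => {}) // never let a drain error block exit
      .finally(() => process.exit(0));
  };
  process.on("SIGTERM", handler);
  process.on("SIGINT", handler);
  return { stopping: () => stopping };
}
```

The consume loop would check `stopping()` before each `XREADGROUP` call, so no new work is claimed once shutdown begins.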
See [phase-3-hardening/README.md](phase-3-hardening/README.md)
### Phase 4 — Future / optional
Status: ❄️ Not committed
See [phase-4-future/README.md](phase-4-future/README.md)
Ideas on radar: Directus Flow trigger emission, replay tooling, derived-metric backfill, alternate consumer for analytics export.
## Operating model
- **Implementation agent contract.** Each task file is self-sufficient: goal, deliverables, specification, acceptance criteria. An agent should be able to complete one task without reading the whole wiki — but should skim the wiki references at the top of the task before starting.
- **Sequence within a phase.** Task numbering reflects the intended order. Soft dependencies are explicit in each task's "Depends on" field; tasks with no dependencies on each other can be done in parallel.
- **Status updates.** When a task is started, change its row in this ROADMAP to 🟨 and update the task file's status badge accordingly. When done, mark it 🟩 and add a one-line note in the task file's "Done" section pointing at the merging commit/PR.
- **Drift control.** If implementation diverges from a task's spec, update the task file before the diverging code lands, with a note explaining why. Do not let plans rot — either fix the plan or fix the code.