diff --git a/.env.example b/.env.example
index 0f8df1a..df77d6d 100644
--- a/.env.example
+++ b/.env.example
@@ -22,7 +22,9 @@ REDIS_URL=redis://localhost:6379
 POSTGRES_URL=postgres://postgres:postgres@localhost:5432/trm
 
 # Redis Stream name to consume from. Must match tcp-ingestion's REDIS_TELEMETRY_STREAM.
-REDIS_TELEMETRY_STREAM=telemetry:t
+# In the deploy stack this is pinned via the shared REDIS_TELEMETRY_STREAM env
+# var so neither service can drift from the other.
+REDIS_TELEMETRY_STREAM=telemetry:teltonika
 
 # Redis consumer group name. All Processor instances join this group.
 REDIS_CONSUMER_GROUP=processor
diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md
index 205b5f5..3cf3a92 100644
--- a/.planning/ROADMAP.md
+++ b/.planning/ROADMAP.md
@@ -41,7 +41,7 @@ These rules govern every task. Any deviation must be discussed and documented as
 ### Phase 1 — Throughput pipeline
 
 **Status:** 🟩 Done
-**Outcome:** A Node.js Processor that joins a Redis Streams consumer group on `telemetry:t`, decodes each `Position` (including `__bigint`/`__buffer_b64` sentinel reversal), upserts it into a TimescaleDB `positions` hypertable, updates per-device in-memory state (last position, last seen), `XACK`s on successful write, and exposes Prometheus metrics + health/readiness HTTP endpoints. End-to-end pilot-quality service; no domain logic yet.
+**Outcome:** A Node.js Processor that joins a Redis Streams consumer group on `telemetry:teltonika`, decodes each `Position` (including `__bigint`/`__buffer_b64` sentinel reversal), upserts it into a TimescaleDB `positions` hypertable, updates per-device in-memory state (last position, last seen), `XACK`s on successful write, and exposes Prometheus metrics + health/readiness HTTP endpoints. End-to-end pilot-quality service; no domain logic yet.
 
 [**See `phase-1-throughput/README.md`**](./phase-1-throughput/README.md)
 
diff --git a/.planning/phase-1-throughput/03-config-and-logging.md b/.planning/phase-1-throughput/03-config-and-logging.md
index 171e230..0e2461b 100644
--- a/.planning/phase-1-throughput/03-config-and-logging.md
+++ b/.planning/phase-1-throughput/03-config-and-logging.md
@@ -30,7 +30,7 @@ Validate environment variables on startup with `zod`, build the pino root logger
 | `LOG_LEVEL` | no | `info` | `trace` / `debug` / `info` / `warn` / `error` |
 | `REDIS_URL` | yes | — | e.g. `redis://redis:6379` |
 | `POSTGRES_URL` | yes | — | e.g. `postgres://user:pass@db:5432/trm` |
-| `REDIS_TELEMETRY_STREAM` | no | `telemetry:t` | Must match `tcp-ingestion`'s `REDIS_TELEMETRY_STREAM` |
+| `REDIS_TELEMETRY_STREAM` | no | `telemetry:teltonika` | Must match `tcp-ingestion`'s `REDIS_TELEMETRY_STREAM`. Pinned via the deploy-stack shared env var so the two services cannot drift from each other. |
 | `REDIS_CONSUMER_GROUP` | no | `processor` | All Processor instances join this group |
 | `REDIS_CONSUMER_NAME` | no | `${INSTANCE_ID}` | Unique per instance — defaults to instance id |
 | `METRICS_PORT` | no | `9090` | HTTP server port for `/metrics`, `/healthz`, `/readyz` |
diff --git a/.planning/phase-1-throughput/08-main-wiring.md b/.planning/phase-1-throughput/08-main-wiring.md
index dea150e..ab2e832 100644
--- a/.planning/phase-1-throughput/08-main-wiring.md
+++ b/.planning/phase-1-throughput/08-main-wiring.md
@@ -80,13 +80,13 @@ Match `tcp-ingestion`'s convention:
 - `debug` for per-batch: `batch consumed n=42`, `batch written inserted=40 duplicates=2 failed=0`.
 - `warn` / `error` for the obvious.
 
-After this task lands you should be able to run `pnpm dev` against a local Redis + Postgres, publish a synthetic `Position` to `telemetry:t`, and watch a row appear in `positions` while seeing the lifecycle logs above.
+After this task lands you should be able to run `pnpm dev` against a local Redis + Postgres, publish a synthetic `Position` to `telemetry:teltonika`, and watch a row appear in `positions` while seeing the lifecycle logs above.
 
 ## Acceptance criteria
 
 - [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
 - [ ] `pnpm dev` (with local Redis + Postgres reachable) shows the lifecycle log sequence and `processor ready`.
-- [ ] Manually publishing a `Position` to `telemetry:t` results in a row in `positions` within seconds.
+- [ ] Manually publishing a `Position` to `telemetry:teltonika` results in a row in `positions` within seconds.
 - [ ] SIGTERM during idle exits cleanly (no error, no force-exit warning).
 - [ ] SIGTERM with in-flight writes waits for them to complete before exiting.
 
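For the "manually publishing a `Position`" acceptance check above, one possible publish helper, sketched with `ioredis`. The entry layout (`ts` / `device_id` / `codec` / `payload`) follows the `XADD` shape described in 10-integration-test.md below; every Position field in the sketch is a hypothetical stand-in for tcp-ingestion's real contract.

```ts
// publish-synthetic.ts -- a sketch only; Position fields are hypothetical.
import Redis from 'ioredis';

// The documented sentinel convention: bigint -> { __bigint }, Buffer -> { __buffer_b64 }.
// Gotcha: Buffer#toJSON runs *before* a replacer sees the value, so the raw
// value must be read off the holder (`this`) to detect Buffers at all.
function sentinelReplacer(this: Record<string, unknown>, key: string, value: unknown): unknown {
  const raw = this[key];
  if (Buffer.isBuffer(raw)) return { __buffer_b64: raw.toString('base64') };
  if (typeof value === 'bigint') return { __bigint: value.toString() };
  return value;
}

async function main(): Promise<void> {
  const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');
  const position = {
    deviceId: '358480081234567', // hypothetical IMEI
    ts: new Date().toISOString(),
    lat: 52.52,
    lon: 13.405,
    attributes: { odometer: 123456789n, raw: Buffer.from([0x8e, 0x01]) },
  };
  await redis.xadd(
    process.env.REDIS_TELEMETRY_STREAM ?? 'telemetry:teltonika',
    '*',
    'ts', position.ts,
    'device_id', position.deviceId,
    'codec', '8E',
    'payload', JSON.stringify(position, sentinelReplacer),
  );
  await redis.quit();
}

main().catch((err) => { console.error(err); process.exit(1); });
```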
diff --git a/.planning/phase-1-throughput/10-integration-test.md b/.planning/phase-1-throughput/10-integration-test.md
index 947483b..ca396d1 100644
--- a/.planning/phase-1-throughput/10-integration-test.md
+++ b/.planning/phase-1-throughput/10-integration-test.md
@@ -7,7 +7,7 @@
 ## Goal
 
-End-to-end pipeline test: spin up Redis 7 and TimescaleDB via testcontainers, boot the Processor against them, publish a synthetic `Position` to `telemetry:t`, verify the row appears in `positions` with byte-equivalent attribute decoding (bigint, Buffer included).
+End-to-end pipeline test: spin up Redis 7 and TimescaleDB via testcontainers, boot the Processor against them, publish a synthetic `Position` to `telemetry:teltonika`, verify the row appears in `positions` with byte-equivalent attribute decoding (bigint, Buffer included).
 
 This is the integration test that proves the upstream contract from `tcp-ingestion` flows through end-to-end.
 
 Mirror `tcp-ingestion/test/publish.integration.test.ts`'s structure and skip-on-no-Docker pattern.
@@ -35,7 +35,7 @@ Copy `tcp-ingestion/test/publish.integration.test.ts`'s pattern verbatim:
 
 ### Synthetic Position publishing
 
-Reuse `serializePosition` from `tcp-ingestion`'s `publish.ts` if it can be imported (likely not — separate repos). Otherwise inline the encoding: a Position object → JSON.stringify with the bigint/Buffer replacer → `XADD telemetry:t * ts <ts> device_id <device_id> codec 8E payload <json>`.
+Reuse `serializePosition` from `tcp-ingestion`'s `publish.ts` if it can be imported (likely not — separate repos). Otherwise inline the encoding: a Position object → JSON.stringify with the bigint/Buffer replacer → `XADD telemetry:teltonika * ts <ts> device_id <device_id> codec 8E payload <json>`.
 
 ### Why test 4 (writer failure → retry)
 
diff --git a/.planning/phase-1-throughput/README.md b/.planning/phase-1-throughput/README.md
index ca523cf..88441a7 100644
--- a/.planning/phase-1-throughput/README.md
+++ b/.planning/phase-1-throughput/README.md
@@ -6,7 +6,7 @@ Implement a Node.js worker that joins a Redis Streams consumer group, decodes `P
 
 When Phase 1 is done:
 
-- The Processor connects to Redis and joins consumer group `processor` on stream `telemetry:t` (configurable). On startup it creates the group with `MKSTREAM` if missing.
+- The Processor connects to Redis and joins consumer group `processor` on stream `telemetry:teltonika` (configurable; must match tcp-ingestion's compiled default). On startup it creates the group with `MKSTREAM` if missing.
 - Every `Position` record published by `tcp-ingestion` lands as exactly one row in the `positions` hypertable, with `device_id`, `ts`, GPS fields, and the IO `attributes` bag preserved as `JSONB` (sentinel-decoded — bigint values become `numeric`, Buffer values become `bytea` or `text` per the spec in task 1.2).
 - Per-device in-memory state (`last_position`, `last_seen`, `position_count_session`) is updated on every record and bounded by an LRU cap.
 - `XACK` is sent only after the Postgres write succeeds. A crashed instance leaves work pending; on its next start it picks up via consumer-group resumption, and any other instance can claim its pending entries (full `XAUTOCLAIM` polish lives in Phase 3, but the basic resumption works in Phase 1).
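The "sentinel-decoded" step the Phase 1 outcome bullet promises is roughly the inverse of the publish-side replacer. A minimal sketch, assuming only the documented `__bigint` / `__buffer_b64` shapes and nothing else about the Position type:

```ts
// Decode-side sketch: reverses the __bigint / __buffer_b64 sentinels. The real
// Position type (and any further sentinel kinds) lives in tcp-ingestion's
// contract; this shows only the reversal mechanics.
function sentinelReviver(_key: string, value: unknown): unknown {
  if (value !== null && typeof value === 'object') {
    const v = value as { __bigint?: unknown; __buffer_b64?: unknown };
    if (typeof v.__bigint === 'string') return BigInt(v.__bigint);
    if (typeof v.__buffer_b64 === 'string') return Buffer.from(v.__buffer_b64, 'base64');
  }
  return value;
}

// `payload` is the serialized Position carried in each stream entry.
export function decodePosition(payload: string): unknown {
  return JSON.parse(payload, sentinelReviver);
}
```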
diff --git a/.planning/phase-3-hardening/README.md b/.planning/phase-3-hardening/README.md
index e7fa45c..2d434ee 100644
--- a/.planning/phase-3-hardening/README.md
+++ b/.planning/phase-3-hardening/README.md
@@ -11,7 +11,7 @@ When Phase 3 is done:
 - **Graceful shutdown** with bounded in-flight drain: SIGTERM blocks new reads, awaits in-flight writes, ACKs anything still in PEL whose write succeeded, exits clean.
 - **State rehydration on restart**: on first packet for an unknown device, the Processor queries Postgres for the device's `last_position` and seeds `DeviceState` accordingly. Phase 2 accumulators get the same treatment (e.g. last geofence membership comes from the last `timing_records` row).
 - **`XAUTOCLAIM` for stuck pending entries**: at startup and on a cadence, the Processor claims entries that have been pending in another consumer's PEL for longer than `CLAIM_THRESHOLD_MS`. Lets a dead instance's work get picked up by survivors without manual intervention.
-- **Dead-letter stream for poison records**: records that fail to decode N times go to `telemetry:t:dlq` with the original payload + the error. Operators can inspect, fix, replay.
+- **Dead-letter stream for poison records**: records that fail to decode N times go to `telemetry:teltonika:dlq` with the original payload + the error. Operators can inspect, fix, replay.
 - **Multi-instance load split verified**: spinning up two Processor instances against the same consumer group splits the work evenly. End-to-end test in CI (or at least a manual playbook).
 - **Migration safety with multiple instances**: Postgres advisory locks around the migration runner so two instances starting simultaneously don't race.
 - **Uncaught exception / unhandled rejection handlers**: log, flush in-memory state to a panic dump file, exit with a code Portainer treats as restart-worthy.
@@ -24,7 +24,7 @@ When Phase 3 is done:
 | 3.1 | Graceful shutdown — full | Replaces the Phase 1 stub. Drain budget configurable. Tested end-to-end |
 | 3.2 | Per-device state rehydration on first-packet | Single `SELECT ... LIMIT 1` per cold device. Memoized by LRU |
 | 3.3 | `XAUTOCLAIM` runner | Periodic + on-startup. Claims entries pending > `CLAIM_THRESHOLD_MS`. Re-runs the sink |
-| 3.4 | Dead-letter stream | After N failed decodes/writes, record goes to `telemetry:t:dlq`; original ACKed off the main stream |
+| 3.4 | Dead-letter stream | After N failed decodes/writes, record goes to `telemetry:teltonika:dlq`; original ACKed off the main stream |
 | 3.5 | Migration advisory lock | `pg_advisory_lock()` around the migrate runner; two instances can start simultaneously |
 | 3.6 | Uncaught exception / unhandled rejection handlers | Log, flush, exit. Match `tcp-ingestion`'s eventual Phase 1 task 1.12 work when that lands |
 | 3.7 | OPERATIONS.md | The runbook |
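For task 3.4, the dead-letter move itself can be small. A sketch under the stream/group defaults in this diff, assuming `ioredis`; `deadLetter` and its signature are hypothetical, and the N-failures counting that triggers it (per-entry retry metadata, PEL delivery count, etc.) is the task's real design work and is not shown:

```ts
// Hypothetical DLQ helper: parks a poison record with its original fields plus
// the error, then ACKs it off the main stream so it stops blocking the PEL.
import Redis from 'ioredis';

const STREAM = 'telemetry:teltonika';
const GROUP = 'processor';

async function deadLetter(
  redis: Redis,
  entryId: string,
  fields: Record<string, string>, // the original flattened entry fields
  error: Error,
): Promise<void> {
  await redis.xadd(
    `${STREAM}:dlq`,
    '*',
    ...Object.entries(fields).flat(),
    'error', error.message,
    'source_id', entryId,
  );
  // ACK only after the DLQ write succeeded: the same "ack follows a durable
  // write" rule the main sink uses.
  await redis.xack(STREAM, GROUP, entryId);
}
```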
diff --git a/src/config/load.ts b/src/config/load.ts
index 8da0b07..d38c8dd 100644
--- a/src/config/load.ts
+++ b/src/config/load.ts
@@ -44,7 +44,7 @@ const ConfigSchema = z.object({
   POSTGRES_URL: urlWithProtocol(['postgres', 'postgresql']),
 
   // Redis stream / group config — must match tcp-ingestion's output stream
-  REDIS_TELEMETRY_STREAM: z.string().min(1).default('telemetry:t'),
+  REDIS_TELEMETRY_STREAM: z.string().min(1).default('telemetry:teltonika'),
   REDIS_CONSUMER_GROUP: z.string().min(1).default('processor'),
   // Consumer name defaults to INSTANCE_ID; resolved after schema parse (see below)
   REDIS_CONSUMER_NAME: z.string().min(1).optional(),
diff --git a/test/config.test.ts b/test/config.test.ts
index 20e710b..dd6be8d 100644
--- a/test/config.test.ts
+++ b/test/config.test.ts
@@ -52,9 +52,9 @@ describe('loadConfig — defaults', () => {
     expect(config.LOG_LEVEL).toBe('info');
  });
 
-  it('applies default REDIS_TELEMETRY_STREAM=telemetry:t', () => {
+  it('applies default REDIS_TELEMETRY_STREAM=telemetry:teltonika', () => {
     const config = loadConfig(validEnv());
-    expect(config.REDIS_TELEMETRY_STREAM).toBe('telemetry:t');
+    expect(config.REDIS_TELEMETRY_STREAM).toBe('telemetry:teltonika');
   });
 
   it('applies default REDIS_CONSUMER_GROUP=processor', () => {
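If the suite doesn't already cover it, an override test is a natural companion to the defaults tests touched above. A sketch reusing `loadConfig` and `validEnv()` exactly as they appear in `test/config.test.ts`; the stream name is a hypothetical example value:

```ts
// Verifies the env var still wins over the new compiled default, which is the
// knob the deploy stack uses to pin both services to one stream name.
it('lets REDIS_TELEMETRY_STREAM override the telemetry:teltonika default', () => {
  const config = loadConfig({ ...validEnv(), REDIS_TELEMETRY_STREAM: 'telemetry:custom' });
  expect(config.REDIS_TELEMETRY_STREAM).toBe('telemetry:custom');
});
```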