diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md
new file mode 100644
index 0000000..c914d34
--- /dev/null
+++ b/.planning/ROADMAP.md
@@ -0,0 +1,90 @@
+# processor — Roadmap
+
+A Node.js worker service that consumes normalized `Position` records from a Redis Stream, maintains per-device runtime state, applies racing-domain rules, and writes durable state to Postgres / TimescaleDB.
+
+This file is the single navigation hub for all implementation planning. Each phase has its own folder with a README and granular task files. Update statuses here as work lands.
+
+## Status legend
+
+| Symbol | Meaning |
+|--------|---------|
+| ⬜ | Not started |
+| 🟦 | Planned (designed, not coded) |
+| 🟨 | In progress |
+| 🟩 | Done |
+| ⏸ | Paused / blocked |
+| ❄️ | Frozen / future / optional |
+
+## Architectural anchors
+
+The service is specified by the wiki at `../docs/wiki/`. Implementing agents should read these pages before starting any task:
+
+- **Architecture** — `docs/wiki/sources/gps-tracking-architecture.md`, `docs/wiki/concepts/plane-separation.md`, `docs/wiki/concepts/failure-domains.md`
+- **This service** — `docs/wiki/entities/processor.md`
+- **Upstream contract (input)** — `docs/wiki/concepts/position-record.md`, `docs/wiki/concepts/io-element-bag.md`, `docs/wiki/entities/redis-streams.md`
+- **Downstream contract (output)** — `docs/wiki/entities/postgres-timescaledb.md`, `docs/wiki/entities/directus.md`
+
+## Non-negotiable design rules
+
+These rules govern every task. Any deviation must be discussed and documented as a decision before code lands.
+
+1. **Domain logic is isolated.** `src/core/` (Stream consumer, Postgres writer, in-memory state plumbing) never imports from `src/domain/` (geofence engine, timing logic, IO interpretation). Phase 2 must be a pure addition layered on top of the Phase 1 throughput pipeline.
+2. **Schema authority lives in Directus**, with one exception: the `positions` hypertable is bulk-written by this service and its migration is owned here. All other tables the Processor writes to (`timing_records`, `stage_results`, etc.) are defined in Directus, and the Processor inserts rows respecting that schema.
+3. **Per-device state is in-memory, not durable.** The DB is the source of truth for replay/analysis; in-memory state is the source of truth for the current decision. On restart, hot state is rehydrated from the DB — Phase 1 does not implement rehydration yet; a restart loses state, which is acceptable until Phase 2 introduces stateful accumulators.
+4. **Consumer-group offsets drive work distribution.** No application-level coordination between Processor instances. `XACK` on success; failed batches stay pending and are claimed by surviving instances via `XAUTOCLAIM`.
+5. **Idempotent writes.** Records arriving twice (after a claim, replay, or retry) must not produce duplicate rows. The `positions` hypertable uses `(device_id, ts)` as a unique key with `ON CONFLICT DO NOTHING`. Derived tables follow the same pattern, scoped by their natural keys.
+6. **Bounded memory.** Per-device state is capped (LRU eviction by last-seen timestamp); replay-from-DB rehydrates an evicted device on its next packet. No unbounded `Map` growth.
+7. **Fail loudly.** Schema-incompatible records (e.g. malformed payload, unknown sentinel) are logged at `error` and dead-letter-streamed (Phase 3); they are **not** silently skipped.
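+
+Rule 1 is mechanically enforceable (see task 1.1). As a minimal sketch, assuming eslint-plugin-import's flat-config form; the zone paths are illustrative, not final:
+
+```ts
+// eslint.config.js (flat config): boundary guard sketch
+import importPlugin from 'eslint-plugin-import';
+
+export default [
+  {
+    files: ['src/**/*.ts'],
+    plugins: { import: importPlugin },
+    rules: {
+      // Design rule 1: src/core/ must never import from src/domain/
+      'import/no-restricted-paths': [
+        'error',
+        { zones: [{ target: './src/core', from: './src/domain', message: 'core must not depend on domain' }] },
+      ],
+    },
+  },
+];
+```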
+
+## Phases
+
+### Phase 1 — Throughput pipeline
+
+**Status:** ⬜ Not started
+**Outcome:** A Node.js Processor that joins a Redis Streams consumer group on `telemetry:t`, decodes each `Position` (including `__bigint`/`__buffer_b64` sentinel reversal), upserts it into a TimescaleDB `positions` hypertable, updates per-device in-memory state (last position, last seen), `XACK`s on successful write, and exposes Prometheus metrics plus health/readiness HTTP endpoints. An end-to-end, pilot-quality service; no domain logic yet.
+
+[**See `phase-1-throughput/README.md`**](./phase-1-throughput/README.md)
+
+| # | Task | Status | Landed in |
+|---|------|--------|-----------|
+| 1.1 | [Project scaffold](./phase-1-throughput/01-project-scaffold.md) | ⬜ | — |
+| 1.2 | [Core types & contracts](./phase-1-throughput/02-core-types.md) | ⬜ | — |
+| 1.3 | [Configuration & logging](./phase-1-throughput/03-config-and-logging.md) | ⬜ | — |
+| 1.4 | [Postgres connection & `positions` hypertable](./phase-1-throughput/04-postgres-schema.md) | ⬜ | — |
+| 1.5 | [Redis Stream consumer (XREADGROUP)](./phase-1-throughput/05-stream-consumer.md) | ⬜ | — |
+| 1.6 | [Per-device in-memory state](./phase-1-throughput/06-device-state.md) | ⬜ | — |
+| 1.7 | [Position writer (batched upsert)](./phase-1-throughput/07-position-writer.md) | ⬜ | — |
+| 1.8 | [Main wiring & ACK semantics](./phase-1-throughput/08-main-wiring.md) | ⬜ | — |
+| 1.9 | [Observability (Prometheus metrics + /healthz + /readyz)](./phase-1-throughput/09-observability.md) | ⬜ | — |
+| 1.10 | [Integration test (testcontainers Redis + Postgres)](./phase-1-throughput/10-integration-test.md) | ⬜ | — |
+| 1.11 | [Dockerfile & Gitea workflow](./phase-1-throughput/11-dockerfile-and-ci.md) | ⬜ | — |
+
+### Phase 2 — Domain logic
+
+**Status:** ⬜ Not started — blocks on Directus schema decisions
+**Outcome:** Geofence engine that detects entry/checkpoint/finish crossings; per-model Teltonika IO mapping driving derived attributes (`odometer_km`, `ignition`, etc.); timing record writer producing entries in the Directus-owned `timing_records` table; per-stage result aggregator. Layered on top of Phase 1 — no changes to the throughput pipeline.
+
+[**See `phase-2-domain/README.md`**](./phase-2-domain/README.md)
+
+Detailed task breakdown is deferred until the Directus schema is finalized (open questions about geofence ownership, IO mapping storage, stage vocabulary). Phase 1 can ship and run on stage without any Phase 2 work.
+
+### Phase 3 — Production hardening
+
+**Status:** ⬜ Not started
+**Outcome:** Graceful shutdown with consumer-group commit on SIGTERM; per-device state rehydration from Postgres (lazy: a device is loaded on its first packet); `XAUTOCLAIM` for stuck pending entries from a dead instance; dead-letter stream for poison records; multi-instance load-split verification; `OPERATIONS.md` runbook.
+
+[**See `phase-3-hardening/README.md`**](./phase-3-hardening/README.md)
+
+### Phase 4 — Future / optional
+
+**Status:** ❄️ Not committed
+[**See `phase-4-future/README.md`**](./phase-4-future/README.md)
+
+Ideas on the radar: Directus Flow trigger emission, replay tooling, derived-metric backfill, an alternate consumer for analytics export.
+
+## Operating model
+
+- **Implementation agent contract.** Each task file is self-sufficient: goal, deliverables, specification, acceptance criteria.
+  An agent should be able to complete one task without reading the whole wiki — but should skim the wiki references at the top of the task before starting.
+- **Sequence within a phase.** Task numbering reflects the intended order. Soft dependencies are explicit in each task's "Depends on" field. Tasks with no dependencies on each other can be done in parallel.
+- **Status updates.** When a task is started, change its row in this ROADMAP to 🟨 and the task file's status badge accordingly. When done, 🟩 plus a one-line note in the task file's "Done" section pointing at the merging commit/PR.
+- **Drift control.** If implementation diverges from a task's spec, update the task file *before* the diverging code lands, with a note explaining why. Do not let plans rot — either fix the plan or fix the code.
diff --git a/.planning/phase-1-throughput/01-project-scaffold.md b/.planning/phase-1-throughput/01-project-scaffold.md
new file mode 100644
index 0000000..337455b
--- /dev/null
+++ b/.planning/phase-1-throughput/01-project-scaffold.md
@@ -0,0 +1,58 @@
+# Task 1.1 — Project scaffold
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** None
+**Wiki refs:** `docs/wiki/entities/processor.md`
+
+## Goal
+
+Initialize the Node.js / TypeScript project with the directory layout from the Phase 1 README, install the agreed tooling, and produce a minimal `main.ts` that the rest of Phase 1 builds on. Mirror the `tcp-ingestion` scaffold conventions exactly so the two services feel uniform.
+
+## Deliverables
+
+- `package.json` declaring:
+  - `"type": "module"` (ESM only).
+  - `"engines": { "node": ">=22" }`.
+  - Scripts: `build`, `dev`, `start`, `test`, `test:watch`, `test:integration`, `lint`, `format`, `typecheck`.
+  - Dependencies: `ioredis`, `pg`, `pino`, `prom-client`, `zod`.
+  - Dev dependencies: `typescript`, `@types/node`, `@types/pg`, `vitest`, `@vitest/coverage-v8`, `eslint`, `@typescript-eslint/parser`, `@typescript-eslint/eslint-plugin`, `eslint-plugin-import`, `eslint-import-resolver-typescript`, `prettier`, `pino-pretty`, `tsx`, `testcontainers`.
+- `tsconfig.json` — same as `tcp-ingestion`: `strict: true`, `target: ES2022`, `module: NodeNext`, `moduleResolution: NodeNext`, `outDir: dist`, `rootDir: src`, `noUncheckedIndexedAccess: true`.
+- `eslint.config.js` (flat config) with `@typescript-eslint/recommended-type-checked`, `@typescript-eslint/no-floating-promises`, `@typescript-eslint/no-misused-promises`. Add `import/no-restricted-paths` enforcing **`src/core/` cannot import from `src/domain/`**. (`src/domain/` doesn't exist yet — the rule is preemptive so Phase 2 can't violate the boundary by accident.)
+- `.prettierrc` — match `tcp-ingestion` (2 spaces, single quotes, semicolons).
+- `.gitignore` — `node_modules/`, `dist/`, `coverage/`, `.env`, `.env.local`, `*.log`.
+- `.dockerignore` — same as `.gitignore` plus `.git/`, `.planning/`, `test/`, and `*.md` except `README.md`.
+- `vitest.config.ts` — unit-test config that excludes `*.integration.test.ts`.
+- `vitest.integration.config.ts` — integration-test config with `hookTimeout: 120_000`, `testTimeout: 60_000`. Includes only `*.integration.test.ts`.
+- `.env.example` documenting every env var (with descriptions and defaults). Only `REDIS_URL` and `POSTGRES_URL` are required; all others have sensible defaults.
+- Empty directories with `.gitkeep` files where Phase 1 will fill them in:
+  - `src/core/`, `src/db/migrations/`, `src/config/`, `src/observability/`
+  - `test/`
+- `src/main.ts` — minimal stub: imports nothing yet, prints `processor starting` to stdout, exits with code 0.
+- `README.md` — short description pointing at `.planning/ROADMAP.md` for the work plan, and at `../docs/wiki/entities/processor.md` for the architectural specification.
+
+## Specification
+
+- **Package manager:** pnpm. Commit `pnpm-lock.yaml`. The Dockerfile (task 1.11) will use `pnpm fetch` for layer-cache friendliness.
+- **Module style:** ESM throughout. Relative imports use the `.js` suffix per Node ESM resolution. No `paths` aliases.
+- **No bundler.** Build is `tsc` only. Runtime is plain Node consuming `dist/`.
+- **Linting style:** Configure ESLint to enforce `no-floating-promises` and `no-misused-promises` — both are critical in a stream consumer, where an unhandled rejection silently loses work.
+
+## Acceptance criteria
+
+- [ ] `pnpm install` succeeds with no warnings other than peer deps.
+- [ ] `pnpm typecheck` succeeds on the empty project.
+- [ ] `pnpm lint` succeeds.
+- [ ] `pnpm build` produces `dist/main.js`.
+- [ ] `pnpm start` runs the compiled output and prints the startup message.
+- [ ] `pnpm test` runs (with no tests) and exits successfully.
+- [ ] `pnpm dev` runs `main.ts` via `tsx` and prints the startup message.
+- [ ] The repository builds reproducibly: deleting `node_modules` and `dist`, then running `pnpm install --frozen-lockfile && pnpm build`, produces identical output.
+
+## Risks / open questions
+
+- The `import/no-restricted-paths` rule is preemptive and will be silently inert until Phase 2 introduces `src/domain/`. Verify it activates correctly with a temporary `src/domain/foo.ts` during scaffold setup, then remove it.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/02-core-types.md b/.planning/phase-1-throughput/02-core-types.md
new file mode 100644
index 0000000..278d1a3
--- /dev/null
+++ b/.planning/phase-1-throughput/02-core-types.md
@@ -0,0 +1,66 @@
+# Task 1.2 — Core types & contracts
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.1
+**Wiki refs:** `docs/wiki/concepts/position-record.md`, `docs/wiki/concepts/io-element-bag.md`
+
+## Goal
+
+Define the canonical TypeScript types for the data flowing through the Processor: the `Position` record (input from Redis), the per-device runtime state, the metrics interface, and the codec for reversing the JSON sentinels (`__bigint`, `__buffer_b64`) that `tcp-ingestion` produces.
+
+This task does **not** add behaviour — only types and a sentinel decoder with unit tests. Behaviour is layered on in subsequent tasks.
+
+## Deliverables
+
+- `src/core/types.ts` exporting:
+  - `Position` — must be byte-equivalent to `tcp-ingestion`'s output. Fields: `device_id: string`, `timestamp: Date`, `latitude: number`, `longitude: number`, `altitude: number`, `angle: number`, `speed: number`, `satellites: number`, `priority: number`, `attributes: Record<string, AttributeValue>`, where `AttributeValue = number | bigint | Buffer`.
+  - `StreamRecord` — what `XREADGROUP` actually returns: `{ id: string; ts: string; device_id: string; codec: string; payload: string }`. The `payload` field is the JSON-encoded `Position` (still sentinel-encoded — the consumer decodes it).
+  - `DeviceState` — `{ device_id: string; last_position: Position; last_seen: Date; position_count_session: number }`.
+  - `Metrics` interface — same shape as `tcp-ingestion`: `inc(name: string, labels?: Record<string, string>): void`, `observe(name: string, value: number, labels?: Record<string, string>): void`. Don't extend it in Phase 1; task 1.9 may add helpers, but the interface stays.
+- `src/core/codec.ts` exporting:
+  - `decodePosition(payload: string): Position` — JSON-parses with a reviver that detects `{ __bigint: "..." }` → `BigInt(...)` and `{ __buffer_b64: "..." }` → `Buffer.from(s, 'base64')`. Throws `CodecError` with a clear message on malformed payloads.
+  - `class CodecError extends Error` for failure cases.
+- `test/codec.test.ts` covering:
+  - Round-trip a Position with bigint and Buffer attributes through `tcp-ingestion`'s `serializePosition` (copy the helper into the test or inline-encode) → `decodePosition` → assert byte-equal.
+  - Decode a u64-max bigint sentinel.
+  - Decode a Buffer sentinel with non-UTF-8 bytes (e.g. `0xde 0xad 0xbe 0xef`).
+  - Reject malformed payloads (non-JSON, missing required fields, invalid sentinel shape).
+  - `device_id`, `timestamp` (ISO string → Date), and numeric fields all decode correctly.
+
+## Specification
+
+### Sentinel reversal — exact rules
+
+The reviver runs on every JSON value. For each value:
+
+1. If the value is an object with exactly one property `__bigint` whose value is a string of digits → return `BigInt(value.__bigint)`. Validate that the string parses, or throw.
+2. If the value is an object with exactly one property `__buffer_b64` whose value is a base64 string → return `Buffer.from(value.__buffer_b64, 'base64')`.
+3. If the key is `timestamp` and the value is a string → return `new Date(value)` (validate that it parsed; reject `Invalid Date`).
+4. Otherwise pass the value through.
+
+**Critical:** the reviver must not match shapes broader than the sentinels. A user-defined attribute `{ __bigint: "..." }` is by definition a sentinel — there is no ambiguity, because `tcp-ingestion` only uses these shapes for sentinel encoding. But validate the inner string strictly so a malformed attribute fails fast.
+
+### Why `Buffer`, not `Uint8Array`
+
+`tcp-ingestion` uses Node's `Buffer`, and we are also Node-only. Using `Buffer` avoids a conversion cost on every record. If the platform later needs to support browser-side decoding (e.g. for a debug tool), introduce a `Uint8Array`-based parallel path then; not now.
+
+### Why `bigint`, not `number`
+
+Some Teltonika IO elements are u64 values that exceed `Number.MAX_SAFE_INTEGER` (2^53 - 1). Silently truncating to `number` would corrupt those values. Phase 1 preserves them as `bigint`; the Position writer (task 1.7) decides how to store them in Postgres (likely `numeric` or stringified — see that task).
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck` succeeds.
+- [ ] `pnpm test` runs `codec.test.ts` and all cases pass.
+- [ ] A round-tripped Position with `bigint` and `Buffer` attributes matches the original byte-for-byte (including `Buffer` content equality, not just length).
+- [ ] Malformed payloads throw `CodecError` with a descriptive message that names the failing field.
+
+## Risks / open questions
+
+- The reviver runs on the full JSON tree, including the top-level object. Verify that nested attributes (rare, but possible if Teltonika ever sends nested IO bags in Codec 16) decode correctly. The spec doesn't currently allow nesting; treat unexpected nesting as an error.
+- TypeScript inference for revivers is awkward (`(key: string, value: any) => any`).
+  Use a typed wrapper to surface the result as `Position` without `any` leakage outside the codec module.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/03-config-and-logging.md b/.planning/phase-1-throughput/03-config-and-logging.md
new file mode 100644
index 0000000..f5122b4
--- /dev/null
+++ b/.planning/phase-1-throughput/03-config-and-logging.md
@@ -0,0 +1,76 @@
+# Task 1.3 — Configuration & logging
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.1
+**Wiki refs:** `docs/wiki/entities/processor.md`
+
+## Goal
+
+Validate environment variables on startup with `zod`, build the pino root logger with the same conventions as `tcp-ingestion` (ISO timestamps, string level labels, `instance_id` base field), and fail fast with a readable error message if the config is invalid.
+
+## Deliverables
+
+- `src/config/load.ts` exporting:
+  - `loadConfig(): Config` — reads `process.env`, runs the zod parse, returns a typed `Config`. Throws on invalid input with a multi-line message that names every invalid field.
+  - `Config` type derived from the zod schema.
+- `src/observability/logger.ts` exporting:
+  - `createLogger({ level, nodeEnv, instanceId }): Logger` — pino root logger with base fields `service: 'processor'` and `instance_id`. ISO timestamps via `pino.stdTimeFunctions.isoTime`. Level formatter that emits `"level":"info"`, not `"level":30`. When `nodeEnv === 'development'`, use the pino-pretty transport.
+  - `type Logger` re-exported from `pino`.
+- Wire both into `src/main.ts`: `loadConfig()` → `createLogger()` → `logger.info('processor starting')` → exit 0 (still a stub; consumer wiring lands in 1.8).
+
+## Specification
+
+### Environment variables
+
+| Var | Required | Default | Notes |
+|---|---|---|---|
+| `NODE_ENV` | no | `production` | `development` enables pino-pretty |
+| `INSTANCE_ID` | no | `processor-1` | Used in metrics and as a log base field |
+| `LOG_LEVEL` | no | `info` | `trace` / `debug` / `info` / `warn` / `error` |
+| `REDIS_URL` | yes | — | e.g. `redis://redis:6379` |
+| `POSTGRES_URL` | yes | — | e.g. `postgres://user:pass@db:5432/trm` |
+| `REDIS_TELEMETRY_STREAM` | no | `telemetry:t` | Must match `tcp-ingestion`'s `REDIS_TELEMETRY_STREAM` |
+| `REDIS_CONSUMER_GROUP` | no | `processor` | All Processor instances join this group |
+| `REDIS_CONSUMER_NAME` | no | `${INSTANCE_ID}` | Unique per instance — defaults to the instance id |
+| `METRICS_PORT` | no | `9090` | HTTP server port for `/metrics`, `/healthz`, `/readyz` |
+| `BATCH_SIZE` | no | `100` | Max records per `XREADGROUP` call |
+| `BATCH_BLOCK_MS` | no | `5000` | `BLOCK` timeout on `XREADGROUP` when the stream is empty |
+| `WRITE_BATCH_SIZE` | no | `50` | Max rows per Postgres `INSERT` |
+| `DEVICE_STATE_LRU_CAP` | no | `10000` | Max devices kept in memory; LRU eviction beyond this |
+
+### Validation rules
+
+- All defaults must be expressed in the zod schema with `.default(...)` so the parsed `Config` is fully typed and never has `undefined` for an optional field.
+- Numeric env vars must be coerced from strings and bounded: `BATCH_SIZE` 1–10000, `BATCH_BLOCK_MS` 0–60000, `WRITE_BATCH_SIZE` 1–1000, `DEVICE_STATE_LRU_CAP` 100–1_000_000.
+- `REDIS_URL` and `POSTGRES_URL` must parse as URLs with the expected protocol (`redis:` or `rediss:`; `postgres:` or `postgresql:`).
+- `LOG_LEVEL` must be one of pino's accepted levels.
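+
+As a sketch, the rules above compress to roughly the following schema. It assumes zod >= 3.20 for `z.coerce`; field names match the table, and the protocol checks are simplified to regexes:
+
+```ts
+import { z } from 'zod';
+
+const schema = z.object({
+  NODE_ENV: z.enum(['development', 'production']).default('production'),
+  INSTANCE_ID: z.string().min(1).default('processor-1'),
+  LOG_LEVEL: z.enum(['trace', 'debug', 'info', 'warn', 'error']).default('info'),
+  REDIS_URL: z.string().regex(/^rediss?:\/\//, 'must be redis:// or rediss://'),
+  POSTGRES_URL: z.string().regex(/^postgres(ql)?:\/\//, 'must be postgres:// or postgresql://'),
+  REDIS_TELEMETRY_STREAM: z.string().default('telemetry:t'),
+  REDIS_CONSUMER_GROUP: z.string().default('processor'),
+  METRICS_PORT: z.coerce.number().int().min(1).max(65535).default(9090),
+  BATCH_SIZE: z.coerce.number().int().min(1).max(10_000).default(100),
+  BATCH_BLOCK_MS: z.coerce.number().int().min(0).max(60_000).default(5000),
+  WRITE_BATCH_SIZE: z.coerce.number().int().min(1).max(1000).default(50),
+  DEVICE_STATE_LRU_CAP: z.coerce.number().int().min(100).max(1_000_000).default(10_000),
+});
+
+export type Config = z.infer<typeof schema> & { REDIS_CONSUMER_NAME: string };
+
+export function loadConfig(): Config {
+  const parsed = schema.safeParse(process.env);
+  if (!parsed.success) {
+    // Multi-line error naming every invalid field, per the deliverable above
+    const lines = parsed.error.issues.map((i) => `  ${i.path.join('.')}: ${i.message}`);
+    throw new Error(`invalid configuration:\n${lines.join('\n')}`);
+  }
+  // REDIS_CONSUMER_NAME defaults to INSTANCE_ID and must be unique per instance
+  return { ...parsed.data, REDIS_CONSUMER_NAME: process.env.REDIS_CONSUMER_NAME || parsed.data.INSTANCE_ID };
+}
+```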
+
+### Logger conventions
+
+Match `tcp-ingestion/src/observability/logger.ts` line for line where applicable. Future-you, grepping across services, should see the same shape:
+
+```ts
+const formatters = { level: (label: string) => ({ level: label }) };
+
+if (nodeEnv === 'development') {
+  return pino({
+    level, base, timestamp: pino.stdTimeFunctions.isoTime, formatters,
+    transport: { target: 'pino-pretty', options: { colorize: true, translateTime: 'SYS:standard', ignore: 'pid,hostname' } },
+  });
+}
+return pino({ level, base, timestamp: pino.stdTimeFunctions.isoTime, formatters });
+```
+
+## Acceptance criteria
+
+- [ ] `pnpm test` covers config validation: missing required vars throw with the right message; invalid URLs throw; bounded numerics throw on out-of-range values.
+- [ ] Running with valid env emits a single `processor starting` info log with `service=processor` and `instance_id=processor-1` base fields.
+- [ ] Running with `NODE_ENV=development` produces colorized output via pino-pretty.
+- [ ] Running with `NODE_ENV=production` produces JSON output with ISO `time` and string `level`.
+
+## Risks / open questions
+
+- `REDIS_CONSUMER_NAME` defaulting to `INSTANCE_ID` means `INSTANCE_ID` must be unique per instance for safe consumer-group operation. Document this in `.env.example` so operators don't accidentally run two instances with the same `INSTANCE_ID`.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/04-postgres-schema.md b/.planning/phase-1-throughput/04-postgres-schema.md
new file mode 100644
index 0000000..9a61d05
--- /dev/null
+++ b/.planning/phase-1-throughput/04-postgres-schema.md
@@ -0,0 +1,89 @@
+# Task 1.4 — Postgres connection & `positions` hypertable
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.1, 1.3
+**Wiki refs:** `docs/wiki/entities/postgres-timescaledb.md`
+
+## Goal
+
+Stand up the Postgres connection (a single `pg.Pool`) and define the `positions` hypertable migration. This is the only table whose schema the Processor owns directly (per the design rule in ROADMAP.md). Every other table is owned by Directus.
+
+## Deliverables
+
+- `src/db/pool.ts` exporting:
+  - `createPool(url: string): pg.Pool` — a single Pool with sane defaults (`max: 10`, `idleTimeoutMillis: 30_000`, `connectionTimeoutMillis: 5_000`). Sets `application_name = 'processor'` so connections are identifiable in `pg_stat_activity`.
+  - `connectWithRetry(pool, logger): Promise<void>` — runs `SELECT 1` with exponential backoff (3 attempts, up to 5s). Mirrors `tcp-ingestion`'s `connectRedis` pattern. Calls `process.exit(1)` on final failure.
+- `src/db/migrations/0001_positions.sql` containing:
+  - `CREATE EXTENSION IF NOT EXISTS timescaledb;` (a no-op if already enabled)
+  - `CREATE TABLE IF NOT EXISTS positions (...)` per the schema below
+  - `SELECT create_hypertable('positions', 'ts', if_not_exists => TRUE, chunk_time_interval => INTERVAL '1 day');`
+  - `CREATE UNIQUE INDEX IF NOT EXISTS positions_device_ts ON positions (device_id, ts);`
+  - `CREATE INDEX IF NOT EXISTS positions_ts ON positions (ts DESC);`
+- `src/db/migrate.ts` — a minimal runner that executes pending migration files in order. Tracks applied migrations in a `schema_migrations(version, applied_at)` table. Idempotent. Called from `main.ts` before the consumer starts.
+- `test/db/migrate.test.ts` covering: applying a fresh migration; applying twice is a no-op; bad SQL fails loudly.
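+
+A minimal sketch of `connectWithRetry` under those constraints. The pino `Logger` import and the exact delay values are assumptions; only the shape (3 attempts, backoff capped at 5s, `process.exit(1)` at the end) is contractual:
+
+```ts
+import type { Pool } from 'pg';
+import type { Logger } from 'pino';
+
+const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
+
+export async function connectWithRetry(pool: Pool, logger: Logger): Promise<void> {
+  const maxAttempts = 3;
+  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
+    try {
+      await pool.query('SELECT 1'); // cheap liveness probe
+      logger.info('Postgres connected');
+      return;
+    } catch (err) {
+      logger.error({ err, attempt }, 'Postgres connection attempt failed');
+      if (attempt < maxAttempts) {
+        // Exponential backoff capped at 5s: 1.25s, 2.5s, ...
+        await sleep(Math.min(1250 * 2 ** (attempt - 1), 5000));
+      }
+    }
+  }
+  logger.fatal('Postgres unreachable after retries; exiting');
+  process.exit(1);
+}
+```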
+
+## Specification
+
+### `positions` table schema
+
+```sql
+CREATE TABLE IF NOT EXISTS positions (
+  device_id   text        NOT NULL,
+  ts          timestamptz NOT NULL,               -- canonical event time from device GPS
+  ingested_at timestamptz NOT NULL DEFAULT now(), -- when Processor wrote the row
+  latitude    double precision NOT NULL,
+  longitude   double precision NOT NULL,
+  altitude    real        NOT NULL,
+  angle       real        NOT NULL,
+  speed       real        NOT NULL,
+  satellites  smallint    NOT NULL,
+  priority    smallint    NOT NULL,
+  codec       text        NOT NULL,               -- '8' | '8E' | '16'
+  attributes  jsonb       NOT NULL                -- the IO bag, sentinel-decoded
+);
+```
+
+### Why these column types
+
+- `device_id text` — IMEIs are 15 ASCII digits. It could be `bigint`, but `text` keeps the door open for non-IMEI device identifiers (future vendors) and avoids leading-zero loss.
+- `ts timestamptz` — the **device-reported** time, not ingestion time. This is the hypertable partitioning column.
+- `ingested_at timestamptz` — diagnostic: helps spot devices with clock skew or buffered records (the 55-record buffer flush we saw on stage). Not part of the natural key.
+- `altitude/angle/speed real` — float32 is plenty and saves space on a high-volume table.
+- `attributes jsonb` — preserves the IO bag verbatim. Per the design rule, no naming or unit conversion happens here; that's Phase 2 in `src/domain/`.
+
+### bigint and Buffer attributes — JSONB encoding
+
+The codec (task 1.2) decodes `__bigint` to `bigint` and `__buffer_b64` to `Buffer`. Postgres `jsonb` is JSON, so we re-encode for storage:
+- `bigint` → could be a JSON number when it fits in `Number.MAX_SAFE_INTEGER` and a JSON string otherwise, but always storing a string is simpler and unambiguous; **decision: always string for bigint**.
+- `Buffer` → base64 string.
+
+**Re-encoding loses the type tag.** Phase 2 IO interpretation (the per-model mapping table) is responsible for knowing that `attributes.io_240` is a u64 stored as a string. Phase 1 doesn't need to query individual attributes — it's pass-through storage.
+
+If this becomes painful later, options to revisit: a separate `attributes_typed` column with a structured shape; or store bigints as `numeric` and Buffers as `bytea` in dedicated columns. **Defer** — 80% of attributes are small ints, and the simple string approach unblocks Phase 1.
+
+### Migration runner
+
+Follow the simplest possible pattern. The runner:
+1. `CREATE TABLE IF NOT EXISTS schema_migrations (version text PRIMARY KEY, applied_at timestamptz NOT NULL DEFAULT now())`.
+2. Lists `*.sql` files in `src/db/migrations/` sorted by filename.
+3. For each, `SELECT 1 FROM schema_migrations WHERE version = $1`. If absent, run the SQL inside a transaction and insert the row.
+4. Logs each applied or skipped migration at `info`.
+
+Do **not** introduce a heavy framework (Knex, node-pg-migrate). The Processor has one migration file in Phase 1 — a 30-line runner is the right answer.
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
+- [ ] Integration test (testcontainers TimescaleDB): apply the migration; insert a row with a bigint-as-string attribute; query it back; verify the shape.
+- [ ] Re-running the migration on an already-migrated database is a no-op.
+- [ ] `connectWithRetry` retries 3 times with exponential backoff, then calls `process.exit(1)`. Verify with a unit test using a fake Pool.
+
+## Risks / open questions
+
+- **TimescaleDB extension availability.** The `deploy/` repo's Postgres container must be the `timescale/timescaledb` image, not stock `postgres`.
+  Document this explicitly in the deploy README when Phase 1 ships. Fall back to a regular table (no hypertable) if the extension is unavailable: `create_hypertable` will error, but the `IF NOT EXISTS` table creation succeeds. Performance falls off a cliff at scale, but it remains functional.
+- **Schema authority overlap with Directus.** Directus also speaks Postgres. When Directus connects and introspects the schema, it will see the `positions` table created by the Processor. That's fine — Directus can reflect tables it didn't create. But if an operator later modifies `positions` from the Directus admin UI, the migration may break. Document: `positions` is a Processor-owned table; do not edit it from Directus.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/05-stream-consumer.md b/.planning/phase-1-throughput/05-stream-consumer.md
new file mode 100644
index 0000000..3fc2cea
--- /dev/null
+++ b/.planning/phase-1-throughput/05-stream-consumer.md
@@ -0,0 +1,93 @@
+# Task 1.5 — Redis Stream consumer (XREADGROUP)
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.2, 1.3
+**Wiki refs:** `docs/wiki/entities/redis-streams.md`, `docs/wiki/entities/processor.md`
+
+## Goal
+
+Build the Redis Stream consumer: join the consumer group, fetch batches via `XREADGROUP`, decode each entry to a `Position`, hand off to a sink callback, and return successfully-handled IDs to the caller for `XACK`.
+
+This task does **not** wire in the Postgres writer or the in-memory state — those are tasks 1.7 and 1.6, joined to the consumer in 1.8. The consumer accepts a `sink: (records: ConsumedRecord[]) => Promise<string[]>` callback that returns the IDs it wants ACKed. Only those IDs are ACKed; failures stay pending and get claimed on the next loop.
+
+## Deliverables
+
+- `src/core/consumer.ts` exporting:
+  - `createConsumer(redis, config, logger, metrics, sink): Consumer` — factory.
+  - `Consumer` interface: `start(): Promise<void>` (resolves when the consumer loop starts), `stop(): Promise<void>` (signals the loop to exit, waits for the in-flight batch).
+  - `ensureConsumerGroup(redis, stream, group)` — `XGROUP CREATE ... MKSTREAM`, ignoring `BUSYGROUP` errors. Called once at start.
+  - `type ConsumedRecord = { id: string; position: Position; codec: string; ts: string }` — what's passed to the sink.
+- `test/consumer.test.ts` (mocked `ioredis`):
+  - Decodes a synthetic stream entry into a `ConsumedRecord` with the right shape.
+  - Calls `sink` with the decoded batch and ACKs only the IDs the sink returned.
+  - On a `BUSYGROUP` error from `XGROUP CREATE`, swallows the error and continues.
+  - On a malformed payload, increments `processor_decode_errors_total`, logs at `error`, and does **not** ACK the bad entry — leaves it pending for inspection.
+  - On `stop()`, the loop exits cleanly without losing in-flight work.
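+
+As a sketch, `ensureConsumerGroup` reduces to one guarded call. This assumes ioredis, which passes stream subcommands through verbatim:
+
+```ts
+import type { Redis } from 'ioredis';
+
+export async function ensureConsumerGroup(redis: Redis, stream: string, group: string): Promise<void> {
+  try {
+    // '$' starts the group at "now" so history is not replayed;
+    // MKSTREAM creates the stream if it does not exist yet.
+    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
+  } catch (err) {
+    // BUSYGROUP means the group already exists: the normal case on restart.
+    if (err instanceof Error && err.message.includes('BUSYGROUP')) return;
+    throw err;
+  }
+}
+```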
+
+## Specification
+
+### Consumer loop shape
+
+```ts
+async function runLoop() {
+  while (!stopping) {
+    let entries: StreamEntry[] | null;
+    try {
+      entries = await redis.xreadgroup(
+        'GROUP', group, consumerName,
+        'COUNT', batchSize,
+        'BLOCK', batchBlockMs,
+        'STREAMS', stream, '>',
+      );
+    } catch (err) {
+      logger.error({ err }, 'XREADGROUP failed; backing off');
+      await sleep(1000);
+      continue;
+    }
+    if (!entries) continue; // BLOCK timeout
+
+    const records = decodeBatch(entries); // <- may emit decode errors
+    const ackIds = await sink(records);   // <- writer + state
+    if (ackIds.length > 0) {
+      await redis.xack(stream, group, ...ackIds);
+    }
+  }
+}
+```
+
+### Decode error handling
+
+`decodeBatch` calls `decodePosition` (from task 1.2) on each entry's `payload` field. If a single entry fails to decode:
+- Increment `processor_decode_errors_total{stream=...}`.
+- Log at `error` with the entry ID and a truncated raw payload (first 256 chars).
+- **Skip** the entry — do not pass it to the sink, do not ACK. It stays in the consumer's PEL (Pending Entries List) and will be re-attempted on the next claim. Phase 3 will route truly-poison entries to a dead-letter stream; for Phase 1, leaving them pending and visible in `XPENDING` is enough.
+
+### `XACK` semantics
+
+ACK only what the sink returned. If the sink returns `['id1', 'id3']` from a batch of `[id1, id2, id3]`, then `id2` stays pending. Why a sink might return a partial list: it failed to write some records. The consumer must trust the sink's signal — never ACK speculatively.
+
+### Consumer group setup
+
+On `start()`:
+1. `XGROUP CREATE <stream> <group> $ MKSTREAM` — creates the stream if missing, with the group at "now" so we don't replay history. If the group already exists, the call returns `BUSYGROUP Consumer Group name already exists` — catch and ignore.
+2. Log at `info` whether the group was created or already existed.
+
+### Why `>` and not `0` for the read ID
+
+`>` means "deliver only new entries, not pending ones for this consumer." That's what we want for the steady-state loop. Phase 3 will add an explicit `XAUTOCLAIM` step at startup (and periodically) to pull stuck pending entries from dead consumers; Phase 1 relies on natural redelivery via consumer-group resumption (when a dead instance restarts with the same name, it sees its old PEL).
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
+- [ ] Unit tests cover: happy path, `BUSYGROUP` swallow, decode-error skip, partial ACK, clean stop.
+- [ ] The stop signal causes the loop to exit within one `BATCH_BLOCK_MS` tick.
+
+## Risks / open questions
+
+- **Consumer name uniqueness.** Two instances with the same `REDIS_CONSUMER_NAME` will both read from the same PEL, which is undefined behaviour. Task 1.3 already documents that `INSTANCE_ID` (which `REDIS_CONSUMER_NAME` defaults to) must be unique per instance — surface this again in the operator-facing README later.
+- **Long sink calls block the loop.** If the Postgres writer takes 30s, no new records are read. That's fine for Phase 1 (Postgres should be fast); Phase 3 may add a configurable max-in-flight if writer pressure becomes an issue.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/06-device-state.md b/.planning/phase-1-throughput/06-device-state.md
new file mode 100644
index 0000000..0d6f718
--- /dev/null
+++ b/.planning/phase-1-throughput/06-device-state.md
@@ -0,0 +1,81 @@
+# Task 1.6 — Per-device in-memory state
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.2
+**Wiki refs:** `docs/wiki/entities/processor.md` (§ State management)
+
+## Goal
+
+Maintain a bounded `Map<string, DeviceState>` updated on every accepted Position. Phase 1 only stores trivial state — `last_position`, `last_seen`, `position_count_session` — but the structure is built so Phase 2 (geofence accumulators, time-since-last-checkpoint, etc.) can extend it cleanly.
+
+## Deliverables
+
+- `src/core/state.ts` exporting:
+  - `createDeviceStateStore(config, logger): DeviceStateStore` — factory.
+  - `DeviceStateStore` interface:
+    - `update(position: Position): DeviceState` — applies the position, returns the new state. Touches LRU order.
+    - `get(device_id: string): DeviceState | undefined` — read without touching LRU order. (Used for diagnostics; the hot path uses `update`.)
+    - `size(): number` — for metrics.
+    - `evictedTotal(): number` — for metrics.
+- `test/state.test.ts` covering:
+  - The first update for a new device creates the entry; subsequent updates increment `position_count_session`.
+  - LRU eviction: with cap=3, after 4 distinct devices, the least-recently-updated is evicted.
+  - Eviction increments `evictedTotal()`.
+  - `last_seen` reflects the position's `timestamp` (the device-reported time), not the wall clock at update time.
+  - Out-of-order positions (a position with a `timestamp` older than `last_seen`) are still applied (we don't drop them), but `last_seen` only advances forward — i.e. `last_seen = max(prev_last_seen, position.timestamp)`. Document the rationale.
+
+## Specification
+
+### LRU implementation
+
+Use a plain `Map`. JavaScript `Map` preserves insertion order, and we exploit that: on every `update`, `delete` then `set` the entry — that bumps it to the most recent position in iteration order. When `size() > cap`, take `keys().next().value` (the oldest) and `delete` it. A sketch follows below.
+
+This is O(1) per update and avoids a third-party LRU dependency. **Do not** introduce `lru-cache` — the standard `Map` trick is sufficient for Phase 1's needs.
+
+### Why `last_seen = max(...)`, not `last_seen = position.timestamp`
+
+Devices buffer records when offline and replay them in bursts (we observed a 55-record buffer flush on stage). Within a single batch, timestamps may *decrease* between consecutive records if the device sorted them oddly. We want `last_seen` to mean "highest device timestamp seen so far for this device" — that's what downstream consumers want.
+
+### What about restart?
+
+On Processor restart, the in-memory state is empty. The first record from any device creates a fresh `DeviceState`. **Phase 1 accepts this** — it's a recovery path, not a hot path, and Phase 1 has no domain logic that would be wrong without rehydrated state.
+
+Phase 3 (production hardening) adds rehydration: on the first packet for an unknown device, query `positions WHERE device_id = $1 ORDER BY ts DESC LIMIT 1` to seed `last_position`. That's a Phase 3 task, not Phase 1.
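+
+A sketch of the store built around the delete-then-set trick described above. The `Position` / `DeviceState` types from task 1.2 are assumed in scope; config and metrics wiring are omitted:
+
+```ts
+export function createDeviceStateStore(cap: number) {
+  const map = new Map<string, DeviceState>();
+  let evicted = 0;
+
+  return {
+    update(position: Position): DeviceState {
+      const prev = map.get(position.device_id);
+      const state: DeviceState = {
+        device_id: position.device_id,
+        last_position: position,
+        // last_seen only advances forward: max(prev_last_seen, position.timestamp)
+        last_seen: prev && prev.last_seen > position.timestamp ? prev.last_seen : position.timestamp,
+        position_count_session: (prev?.position_count_session ?? 0) + 1,
+      };
+      map.delete(position.device_id); // delete-then-set bumps the key to most-recent
+      map.set(position.device_id, state);
+      if (map.size > cap) {
+        const oldest = map.keys().next().value; // Map iterates in insertion order
+        if (oldest !== undefined) {
+          map.delete(oldest);
+          evicted += 1;
+        }
+      }
+      return state;
+    },
+    get: (device_id: string) => map.get(device_id), // does not touch LRU order
+    size: () => map.size,
+    evictedTotal: () => evicted,
+  };
+}
+```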
+
+### What state lives here, what doesn't
+
+In Phase 1 the state is intentionally minimal:
+
+```ts
+type DeviceState = {
+  device_id: string;
+  last_position: Position;
+  last_seen: Date;                // = max(prev, position.timestamp)
+  position_count_session: number; // resets on restart
+};
+```
+
+**Not in Phase 1:**
+- Geofence membership (Phase 2)
+- Distance accumulators (Phase 2)
+- Time-in-stage (Phase 2)
+- Anything that would be wrong if dropped on restart (Phase 3 + rehydration)
+
+The interface is built to extend: Phase 2 may add fields, but the existing fields and method signatures should not change.
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
+- [ ] The LRU cap from `DEVICE_STATE_LRU_CAP` config is respected.
+- [ ] `evictedTotal()` increments correctly under eviction.
+- [ ] `last_seen` does not regress on out-of-order timestamps.
+
+## Risks / open questions
+
+- **Cap sizing.** Default `DEVICE_STATE_LRU_CAP=10000`. At 1KB per state entry, that's 10MB of resident memory — fine. Operators with unusually large fleets can raise it; the bound exists to prevent runaway growth from misbehaving devices flooding novel `device_id` values.
+- **No mutex.** State is updated only from the consumer loop, which is single-threaded. If Phase 2 introduces parallel sinks, revisit with proper synchronization.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/07-position-writer.md b/.planning/phase-1-throughput/07-position-writer.md
new file mode 100644
index 0000000..0a9e9da
--- /dev/null
+++ b/.planning/phase-1-throughput/07-position-writer.md
@@ -0,0 +1,94 @@
+# Task 1.7 — Position writer (batched upsert)
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.2, 1.4
+**Wiki refs:** `docs/wiki/entities/postgres-timescaledb.md`
+
+## Goal
+
+Write batches of `Position` records into the `positions` hypertable using `INSERT ... ON CONFLICT (device_id, ts) DO NOTHING` for idempotency. Return per-record success/failure so the consumer (task 1.8) can decide what to ACK.
+
+## Deliverables
+
+- `src/core/writer.ts` exporting:
+  - `createWriter(pool, config, logger, metrics): Writer` — factory.
+  - `Writer` interface:
+    - `write(records: ConsumedRecord[]): Promise<WriteResult[]>` — inserts the batch, returns per-record results: `WriteResult = { id: string; status: 'inserted' | 'duplicate' | 'failed'; error?: Error }`.
+- `test/writer.test.ts` (mocked `pg.Pool`):
+  - Happy path: all records insert.
+  - Duplicate key: `ON CONFLICT DO NOTHING` yields `'duplicate'` for those records.
+  - Mixed: half new, half duplicate.
+  - Pool error: all records in the batch return `'failed'`.
+  - A bigint attribute is stringified before serialization.
+  - A Buffer attribute is base64-encoded before serialization.
+
+## Specification
+
+### SQL pattern
+
+Use a single multi-row `INSERT` per batch with `RETURNING device_id, ts`:
+
+```sql
+INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes)
+VALUES
+  ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11),
+  ($12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22),
+  ...
+ON CONFLICT (device_id, ts) DO NOTHING
+RETURNING device_id, ts;
+```
+
+With `ON CONFLICT DO NOTHING`, Postgres returns only the rows it actually inserted (the `(xmax = 0) AS inserted` trick is only needed with `DO UPDATE`, where conflicting rows come back too). The `RETURNING` rows therefore give us a lookup of which `(device_id, ts)` pairs were inserted; everything else in the batch was a duplicate.
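+
+A sketch of that comparison, assuming `pg` hands back `ts` as a JS `Date` (keying on epoch millis) and `ConsumedRecord` from task 1.5:
+
+```ts
+type WriteResult = { id: string; status: 'inserted' | 'duplicate' | 'failed'; error?: Error };
+
+function classify(
+  records: ConsumedRecord[],
+  returned: Array<{ device_id: string; ts: Date }>, // rows from RETURNING
+): WriteResult[] {
+  const inserted = new Set(returned.map((r) => `${r.device_id}|${r.ts.getTime()}`));
+  return records.map((rec) => ({
+    id: rec.id,
+    // Present in RETURNING: newly inserted. Absent: hit ON CONFLICT DO NOTHING.
+    // 'failed' is assigned at batch level when the INSERT itself throws.
+    status: inserted.has(`${rec.position.device_id}|${rec.position.timestamp.getTime()}`)
+      ? 'inserted'
+      : 'duplicate',
+  }));
+}
+```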
+
+**Note:** rows that hit the conflict are NOT returned (Postgres does not return them with `ON CONFLICT DO NOTHING`). To classify each input record, compare the returned rows against the input by `(device_id, ts)`: anything in the input but missing from `RETURNING` is a `'duplicate'`.
+
+### bigint and Buffer attribute encoding
+
+Per task 1.4, `jsonb` storage:
+- `bigint` → JSON string. Use a custom replacer in `JSON.stringify`:
+  ```ts
+  JSON.stringify(attributes, (_k, v) =>
+    typeof v === 'bigint' ? v.toString() :
+    Buffer.isBuffer(v) ? v.toString('base64') : v
+  );
+  ```
+- `Buffer` → base64 string.
+
+Document this in `wiki/concepts/position-record.md` as a follow-up — the on-disk shape differs slightly from the in-flight shape, because JSON can't hold bigints or bytes natively.
+
+### Batching strategy
+
+The consumer (task 1.8) calls `write(batch)` with whatever batch the consumer received from `XREADGROUP`. Phase 1 doesn't internally batch further — the consumer's batch size (`BATCH_SIZE`, default 100) is the writer's batch size.
+
+If `BATCH_SIZE > WRITE_BATCH_SIZE` (default 50), the writer chunks internally: split the input into chunks of `WRITE_BATCH_SIZE` and run them sequentially. Don't parallelize chunks against the same Pool — `pg.Pool` has bounded connections, and we don't want to starve other queries (the migration runner, `/readyz` health checks, etc.).
+
+### Per-record status
+
+The consumer (task 1.8) takes the `WriteResult[]` and decides what to ACK:
+- `'inserted'` and `'duplicate'` → ACK (we got the data into Postgres, or already had it).
+- `'failed'` → do not ACK (let it stay pending for retry).
+
+If a transaction-wide failure occurs (dead Pool, transient network), all records in the chunk get `'failed'`. The consumer treats them all the same.
+
+### Metrics emitted by this module
+
+- `processor_position_writes_total{status="inserted"|"duplicate"|"failed"}` — counter
+- `processor_position_write_duration_seconds` — histogram (per-batch latency)
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
+- [ ] The mocked-Pool test verifies SQL parameter ordering and types are correct.
+- [ ] Bigint and Buffer attributes serialize as expected via the `JSON.stringify` replacer.
+- [ ] A mixed insert/conflict batch produces the correct per-record `WriteResult[]`.
+- [ ] Pool error → all records get `'failed'`; metrics reflect this.
+
+## Risks / open questions
+
+- **Parameter limit.** The Postgres protocol allows at most 65535 parameters per statement. With 11 columns per row, that caps us at ~5957 rows per statement. `WRITE_BATCH_SIZE=50` is well under. If the cap is ever raised, document the formula.
+- **`RETURNING` cost.** On a hypertable with many chunks, `RETURNING` has near-zero overhead. Verify with a benchmark in task 1.10 (integration test).
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/08-main-wiring.md b/.planning/phase-1-throughput/08-main-wiring.md
new file mode 100644
index 0000000..71c7fda
--- /dev/null
+++ b/.planning/phase-1-throughput/08-main-wiring.md
@@ -0,0 +1,100 @@
+# Task 1.8 — Main wiring & ACK semantics
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.5, 1.6, 1.7
+**Wiki refs:** `docs/wiki/entities/processor.md`
+
+## Goal
+
+Assemble the throughput pipeline in `src/main.ts`: connect Redis + Postgres → run migrations → build the device-state store → build the writer → build the consumer with a sink that calls `state.update()` then `writer.write()` → start. Establish the rule for what to ACK and when.
+
+## Deliverables
+
+- `src/main.ts` updated to:
+  1. `loadConfig()` (from task 1.3).
+  2. `createLogger()` (from task 1.3).
+  3. `createPool(config.POSTGRES_URL)` and `connectWithRetry()` (from task 1.4).
+  4. Run migrations via `migrate()` (from task 1.4) before any consumer activity.
+  5. Connect Redis with `connectRedis(...)` (re-implement the `tcp-ingestion` retry pattern; it's small enough to copy).
+  6. Build `state = createDeviceStateStore(config, logger)`.
+  7. Build `writer = createWriter(pool, config, logger, metrics)`.
+  8. Build `consumer = createConsumer(redis, config, logger, metrics, sink)`, where `sink` is the function defined below.
+  9. `await consumer.start()`.
+  10. Install the graceful shutdown stub (full Phase 3 hardening comes later): on SIGTERM/SIGINT, call `consumer.stop()`, await pending writes, close Redis + Pool, exit.
+- `src/main.ts` defines the **sink function** (the central decision point):
+
+  ```ts
+  async function sink(records: ConsumedRecord[]): Promise<string[]> {
+    // 1. Update in-memory state for every record (cheap, synchronous, can't fail meaningfully)
+    for (const r of records) state.update(r.position);
+
+    // 2. Write to Postgres
+    const results = await writer.write(records);
+
+    // 3. ACK only the IDs that succeeded or were duplicates
+    return results
+      .filter(r => r.status === 'inserted' || r.status === 'duplicate')
+      .map(r => r.id);
+  }
+  ```
+
+- A placeholder `metrics` shim — the same trace-logging stub that `tcp-ingestion` originally had (task 1.9 replaces it with prom-client). Use `Metrics` from `src/core/types.ts`.
+
+## Specification
+
+### State update happens before the write — by design
+
+The sink updates `state` first, *then* writes. If the write fails:
+- The state update has already happened.
+- The record is not ACKed, so it stays pending.
+- On re-delivery (the same instance retries, or another instance claims), the record will be processed again.
+- `state.update` is idempotent for a given position (the same record applied twice produces the same `last_position`; only `position_count_session` is double-counted — and that's a session counter that resets on restart anyway, so it's a non-issue).
+
+If we wrote *first* and updated state second, a successful write followed by a state-update crash would leave Postgres ahead of state — and since state drives the hot-path decisions, that's the worse inconsistency. The chosen order keeps state consistent with what has been seen, even if not yet persisted.
+
+### What the sink does NOT do
+
+- **No business logic.** No "is this a finish-line crossing" detection. That's Phase 2's domain.
+- **No multi-stream fanout.** No publishing to derived streams (e.g. for the SPA). The Phase 1 model is: positions go into Postgres, Directus reads them and pushes via WebSocket.
+  If that fanout proves insufficient at the SPA layer, Phase 4 considers a dedicated WebSocket gateway reading from Redis directly.
+
+### Graceful shutdown — Phase 1 stub vs. Phase 3 final
+
+The Phase 1 stub is enough to not lose data in the common case:
+1. Catch SIGTERM/SIGINT.
+2. `consumer.stop()` — exits the read loop after the current batch.
+3. Await any in-flight `writer.write()`.
+4. `redis.quit()` and `pool.end()`.
+5. `process.exit(0)`.
+6. A force-exit timer at 15s as a backstop.
+
+What Phase 1 does NOT do (deferred to Phase 3):
+- Explicit consumer-group offset commit on SIGTERM (the current model relies on `XACK` after each successful write, which is already the right thing — but Phase 3 documents and tests this rigorously).
+- Uncaught-exception / unhandled-rejection handlers that flush state to logs before crashing.
+- Multi-instance coordination on shutdown (drain mode).
+
+### Logger shape
+
+Match `tcp-ingestion`'s convention:
+- `info` for lifecycle: `processor starting`, `Postgres connected`, `Redis connected`, `migrations applied`, `consumer started on stream X group Y consumer Z`, `processor ready`.
+- `debug` for per-batch: `batch consumed n=42`, `batch written inserted=40 duplicates=2 failed=0`.
+- `warn` / `error` for the obvious.
+
+After this task lands you should be able to run `pnpm dev` against a local Redis + Postgres, publish a synthetic `Position` to `telemetry:t`, and watch a row appear in `positions` while seeing the lifecycle logs above.
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
+- [ ] `pnpm dev` (with local Redis + Postgres reachable) shows the lifecycle log sequence and `processor ready`.
+- [ ] Manually publishing a `Position` to `telemetry:t` results in a row in `positions` within seconds.
+- [ ] SIGTERM during idle exits cleanly (no error, no force-exit warning).
+- [ ] SIGTERM with in-flight writes waits for them to complete before exiting.
+
+## Risks / open questions
+
+- **The `metrics` placeholder is intentional.** Don't try to wire prom-client here; that's task 1.9. Use the trace-logging shim from `tcp-ingestion`'s pre-1.10 `main.ts` as the model.
+- **Migration during deploy.** Phase 1 runs migrations on every startup. With multiple instances, two starting at once would both try to migrate — Postgres advisory locks would solve this. **Defer to Phase 3** (it's a production-hardening concern); for the pilot with one instance, this is fine. Document the limitation.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/09-observability.md b/.planning/phase-1-throughput/09-observability.md
new file mode 100644
index 0000000..94d2748
--- /dev/null
+++ b/.planning/phase-1-throughput/09-observability.md
@@ -0,0 +1,82 @@
+# Task 1.9 — Observability (Prometheus metrics + /healthz + /readyz)
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.5, 1.6, 1.7, 1.8
+**Wiki refs:** `docs/wiki/entities/processor.md`, `docs/wiki/sources/gps-tracking-architecture.md` § 7.4
+
+## Goal
+
+Replace the placeholder `Metrics` shim with a real `prom-client` implementation. Expose `/metrics` (Prometheus exposition format), `/healthz` (liveness), and `/readyz` (readiness — Redis ready AND Postgres ready) on `METRICS_PORT`.
+
+This is **not** a deferral candidate (unlike `tcp-ingestion` task 1.10). The Processor has no other surface for measuring consumer lag, write throughput, or failure rates — without it, "is the pilot keeping up?"
+cannot be answered.
+
+## Deliverables
+
+- `src/observability/metrics.ts` — same shape as `tcp-ingestion/src/observability/metrics.ts`:
+  - `createMetrics(): Metrics & { serializeMetrics(): Promise<string> }` — wraps `prom-client` registries; calls `collectDefaultMetrics()` once for the `nodejs_*` process metrics.
+  - `startMetricsServer(port, metrics, deps): http.Server` — a `node:http` server with three endpoints. `deps` carries the readyz health checks: `{ isRedisReady(): boolean; isPostgresReady(): boolean }`.
+- Update `src/main.ts` to use the real `createMetrics()` and start the metrics server after Redis + Postgres are connected and the consumer is started. Wire it into graceful shutdown (close it before `redis.quit()`).
+- Tests: `test/metrics.test.ts` mirroring the `tcp-ingestion` test pattern — exposition format, counter/gauge/histogram behaviour, all HTTP endpoint paths including the `/readyz` 503 cases.
+
+## Specification
+
+### Metric inventory
+
+| Metric | Type | Labels | Description |
+|---|---|---|---|
+| `processor_consumer_reads_total` | counter | `result=ok\|empty\|error` | `XREADGROUP` calls; `empty` = BLOCK timeout, `error` = client error |
+| `processor_consumer_records_total` | counter | — | Total records pulled off the stream |
+| `processor_consumer_lag` | gauge | — | `XLEN` minus the consumer group's last-delivered ID position. Sampled every N seconds. |
+| `processor_decode_errors_total` | counter | — | Records that failed to decode (malformed payload, sentinel error) |
+| `processor_position_writes_total` | counter | `status=inserted\|duplicate\|failed` | Per-record write outcomes |
+| `processor_position_write_duration_seconds` | histogram | — | Per-batch write latency. Buckets `[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]` |
+| `processor_acks_total` | counter | — | Total IDs ACKed |
+| `processor_device_state_size` | gauge | — | Current count of devices in the LRU map |
+| `processor_device_state_evictions_total` | counter | — | Total LRU evictions since start |
+| `nodejs_*` | various | — | Default Node process metrics |
+
+### Naming convention
+
+- `processor_*` for service-specific metrics. `tcp-ingestion` uses `teltonika_*` because that's its adapter; the Processor isn't bound to a vendor, so the service-name prefix fits.
+- No external `service` label — the Prometheus scrape config adds it.
+
+### Health and readiness
+
+- `GET /healthz` → 200 if the process is alive. Always returns `{ "status": "ok" }`.
+- `GET /readyz` → 200 if Redis is ready (`redis.status === 'ready'`) AND Postgres is ready (last successful query within 30s, or a fresh `SELECT 1` succeeds quickly). 503 otherwise.
+- Both endpoints return tiny JSON bodies for diagnostic value.
+
+### `processor_consumer_lag` measurement
+
+Sample every 10s in a separate `setInterval` (don't compute it on every read — too noisy). Compute as:
+
+```
+lag = XLEN(stream) - position_of(group_last_delivered_id_in_stream)
+```
+
+Use `XINFO GROUPS <stream>` → its `lag` field (Redis 7.2+). If the field is absent, fall back to raw `XLEN` (a good-enough proxy when the group is keeping up; flag it as "approximate" in the metric description).
+
+If sampling fails (a Redis blip), log at `debug` and continue. Don't let metrics gathering break the consumer.
+
+### HTTP server — the same minimal node:http
+
+No Express. Roughly 30 lines. Match `tcp-ingestion`'s style.
+
+## Acceptance criteria
+
+- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
+- [ ] `curl http://localhost:9090/metrics` returns valid exposition format with every metric in the inventory present (some at zero).
+- [ ] After processing one record end-to-end, `processor_consumer_records_total` increments by 1, `processor_position_writes_total{status="inserted"}` increments by 1, and `processor_acks_total` increments by 1.
+- [ ] `/readyz` returns 503 while Redis is disconnected (simulate via `redis.disconnect()`), 200 once it reconnects.
+- [ ] `/readyz` returns 503 while the Pool fails its health probe, 200 when it recovers.
+- [ ] `nodejs_*` default metrics are exposed.
+
+## Risks / open questions
+
+- **Cardinality of label values.** None of the Phase 1 metrics use unbounded labels. Phase 2 may want per-stage metrics — be careful: hundreds of stages is fine, hundreds of devices as labels is not. Keep the same rule as `tcp-ingestion`: per-device labels never go on Prometheus metrics; logs/traces are the right place.
+- **`processor_consumer_lag` sampling cadence.** 10s is a guess. If alerts get jittery, lower it to 5s or raise it to 30s. Tunable later.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/10-integration-test.md b/.planning/phase-1-throughput/10-integration-test.md
new file mode 100644
index 0000000..9180ee4
--- /dev/null
+++ b/.planning/phase-1-throughput/10-integration-test.md
@@ -0,0 +1,58 @@
+# Task 1.10 — Integration test (testcontainers Redis + Postgres)
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.5, 1.7, 1.8, 1.9
+**Wiki refs:** —
+
+## Goal
+
+End-to-end pipeline test: spin up Redis 7 and TimescaleDB via testcontainers, boot the Processor against them, publish a synthetic `Position` to `telemetry:t`, and verify the row appears in `positions` with byte-equivalent attribute decoding (bigint and Buffer included).
+
+This is the integration test that proves the upstream contract from `tcp-ingestion` flows through end-to-end. Mirror `tcp-ingestion/test/publish.integration.test.ts`'s structure and its skip-on-no-Docker pattern.
+
+## Deliverables
+
+- `test/pipeline.integration.test.ts`:
+  - `beforeAll`: start the Redis container, start the TimescaleDB container, run migrations, build a Processor instance pointed at both. If Docker is unavailable, log a clear skip message and set a flag so all `it` blocks early-return without failing.
+  - `afterAll`: stop the Processor, stop the containers.
+  - Test 1: publish a Position with `bigint` and `Buffer` attributes via `XADD`; wait for the row in `positions` (poll, timeout 10s); assert `device_id`, `ts`, the GPS fields, and that a JSON round-trip of `attributes` matches the original (bigint as string, Buffer as base64).
+  - Test 2: publish two records with the same `(device_id, ts)`; verify only one row in `positions` (idempotency check).
+  - Test 3: publish a malformed payload (broken JSON) on the stream; verify `processor_decode_errors_total` increments and the bad entry stays in the PEL (not ACKed).
+  - Test 4: simulate the writer failing once (e.g. by temporarily shutting Postgres down mid-test, then bringing it back); verify the record gets retried and eventually lands.
+- Use the **TimescaleDB image**, not stock `postgres:16-alpine`. Suggested: `timescale/timescaledb:latest-pg16`. Confirm the migration's `CREATE EXTENSION IF NOT EXISTS timescaledb` no-ops (extension already loaded).
+- Use the same Vitest config split as `tcp-ingestion`: `vitest.integration.config.ts` with `hookTimeout: 120_000`, `testTimeout: 60_000`.
+
+## Specification
+
+### Skip-on-no-Docker pattern
+
+Copy the pattern from `tcp-ingestion/test/publish.integration.test.ts` verbatim:
+- Try to start the first container in `beforeAll`. On error, set `dockerAvailable = false`, log a warning, and return.
+- Each `it` block early-returns with a `console.warn` if `!dockerAvailable`.
+- This pattern was the fix for the CI test failure on the runner without Docker — keep it.
+
+### Synthetic Position publishing
+
+Reuse `serializePosition` from `tcp-ingestion`'s `publish.ts` if it can be imported (likely not — separate repos). Otherwise inline the encoding: a Position object → `JSON.stringify` with the bigint/Buffer replacer → `XADD telemetry:t * ts <ts> device_id <id> codec 8E payload <json>`. See the sketch after the deliverables above.
+
+### Why test 4 (writer failure → retry)
+
+This validates the core ACK semantics: if a write fails, the record stays pending, and re-delivery brings it back. Without this test, we have unit tests showing each piece behaves correctly, but no proof the pieces compose right. Skip condition: if simulating a Postgres failure mid-test proves too flaky with testcontainers, weaken it to: stop Postgres before publishing, publish, start Postgres, verify the row appears.
+
+## Acceptance criteria
+
+- [ ] `pnpm test:integration` runs all four scenarios green when Docker is available.
+- [ ] Without Docker, the suite logs skip messages and exits 0 (does not fail).
+- [ ] CI (`pnpm test`, unit only) does not run these — they are opt-in.
+- [ ] First-run container pull is reasonable; subsequent runs are fast (testcontainers caches the image).
+
+## Risks / open questions
+
+- **Image pull on first CI run.** The TimescaleDB image is large (~700 MB). If we ever wire integration tests into CI (separate job with Docker), pre-pulling may be required. Document but defer.
+- **Test flakiness from polling.** Polling for "row appears in `positions`" uses a 10s timeout. If CI is slow, raise it. Don't replace polling with `await sleep(2000)` — that's reliably wrong.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/11-dockerfile-and-ci.md b/.planning/phase-1-throughput/11-dockerfile-and-ci.md
new file mode 100644
index 0000000..5885303
--- /dev/null
+++ b/.planning/phase-1-throughput/11-dockerfile-and-ci.md
@@ -0,0 +1,86 @@
+# Task 1.11 — Dockerfile & Gitea workflow
+
+**Phase:** 1 — Throughput pipeline
+**Status:** ⬜ Not started
+**Depends on:** 1.10
+**Wiki refs:** —
+
+## Goal
+
+Containerize the service and add the Gitea Actions workflow that builds and publishes `git.dev.microservices.al/trm/processor:main` on every push to `main`. Mirror `tcp-ingestion`'s slim variant — same multi-stage Dockerfile, same single-job workflow with path filters.
+
+## Deliverables
+
+- `Dockerfile` — multi-stage: deps → build → runtime. Match `tcp-ingestion/Dockerfile` line for line, adjusting only:
+  - `EXPOSE 9090` only (the Processor has no TCP listener).
+  - `HEALTHCHECK` pointing at `/readyz` on `${METRICS_PORT}`.
+  - `CMD ["node", "dist/main.js"]`.
+- `.gitea/workflows/build.yml` — single-job workflow matching `tcp-ingestion/.gitea/workflows/build.yml`:
+  - Trigger: `push` to `main` (path filters: `src/`, `test/`, `package.json`, `pnpm-lock.yaml`, `tsconfig.json`, `Dockerfile`, `.gitea/workflows/build.yml`) + `workflow_dispatch`.
+  - Steps: checkout, setup-node@v4 (Node 22, pnpm), install, typecheck, lint, test (unit only), docker buildx build-push to `git.dev.microservices.al/trm/processor:main`.
+  - Uses `secrets.REGISTRY_USERNAME` / `secrets.REGISTRY_PASSWORD`.
+  - Final step: trigger the Portainer webhook on success (uncommented; same as `tcp-ingestion` once its `:main` → webhook auto-deploy was working).
+- `compose.dev.yaml` — local-build variant with `build: .`, named `processor-dev`, depends on a Redis service and a TimescaleDB service. Useful for verifying Dockerfile changes without the registry round-trip.
+- `README.md` (the repo-level one, already a stub) — flesh out with:
+  - Quick-start (local: `pnpm install && cp .env.example .env && pnpm dev`).
+  - "Run the Docker build locally" section (`docker compose -f compose.dev.yaml up --build`).
+  - Production-deployment note: the image is pulled by the `deploy/` repo's stack; do not run standalone.
+  - Pin to a specific commit via `PROCESSOR_TAG=<commit-sha>` in the deploy stack.
+  - Tests section (unit vs. integration).
+  - CI behavior summary.
+  - "Pilot deployment notes" section if anything is paused (Phase 1 has nothing paused — if that holds, omit the section).
+
+## Specification
+
+### Dockerfile parity with `tcp-ingestion`
+
+Open `tcp-ingestion/Dockerfile` and copy the structure verbatim. The only diffs for a Phase 1 Processor are:
+- No `EXPOSE 5027` — there's no TCP listener.
+- `HEALTHCHECK` URL path is `/readyz` (already true for `tcp-ingestion`).
+- Image label: `org.opencontainers.image.source` should point to the `processor` repo URL.
+
+This parity matters: when a future engineer needs to debug a build, having two services build the same way reduces cognitive load.
+
+### Workflow parity with `tcp-ingestion`
+
+Same. Open `tcp-ingestion/.gitea/workflows/build.yml`, copy, change the image name and (if needed) the path filters. The webhook step at the end should be uncommented so `:main` builds auto-deploy through Portainer.
+
+### Stage deploy
+
+Phase 1 ships ready to land in `deploy/compose.yaml` (`trm/deploy` repo) as a new service. **Do not edit `deploy/compose.yaml` from this task.** Surface it in the final report: "Add `processor` service to `deploy/compose.yaml` with image, env, depends_on Redis + Postgres." That is a deploy-side change, made by the user.
+
+The `deploy/compose.yaml` service block will look roughly like:
+
+```yaml
+processor:
+  image: git.dev.microservices.al/trm/processor:${PROCESSOR_TAG:-main}
+  depends_on:
+    redis: { condition: service_healthy }
+    postgres: { condition: service_healthy }
+  environment:
+    NODE_ENV: production
+    INSTANCE_ID: ${PROCESSOR_INSTANCE_ID:-processor-1}
+    REDIS_URL: redis://redis:6379
+    POSTGRES_URL: postgres://...
+    LOG_LEVEL: ${LOG_LEVEL:-info}
+  restart: unless-stopped
+```
+
+Plus a Postgres service (TimescaleDB image) added to the stack — the stack currently only has Redis + tcp-ingestion. That's the user's deploy decision to make.
+
+## Acceptance criteria
+
+- [ ] `docker build .` succeeds locally; the resulting image runs and exposes `/healthz` on 9090.
+- [ ] `docker compose -f compose.dev.yaml up --build` boots Redis + TimescaleDB + Processor; `/readyz` reports 200 once everything is up.
+- [ ] Pushing to `main` (or hitting `workflow_dispatch`) builds the image, runs typecheck/lint/test, and pushes `:main` to the registry.
+- [ ] The Portainer webhook fires on a successful push and the stage stack picks up the new image (assuming the `deploy/` stack is set up).
+- [ ] Image size is reasonable (target < 250 MB for the final stage; the `tcp-ingestion` slim variant lands around there).
+
+## Risks / open questions
+
+- **Re-pull on stack redeploy.** The same Portainer issue we hit with `tcp-ingestion` (a stack redeploy doesn't pull new images by default) will apply here. Make sure the same fix is in place ("Re-pull image" toggle, or per-commit-SHA tags) before this lands. Cross-reference the `tcp-ingestion` deploy note in `deploy/README.md`.
+- **HEALTHCHECK `wget` availability.** `node:22-alpine` includes `wget`. If we ever switch base image, revisit.
+
+## Done
+
+(Fill in once complete: commit SHA, brief notes.)
diff --git a/.planning/phase-1-throughput/README.md b/.planning/phase-1-throughput/README.md
new file mode 100644
index 0000000..ca523cf
--- /dev/null
+++ b/.planning/phase-1-throughput/README.md
@@ -0,0 +1,98 @@
+# Phase 1 — Throughput pipeline
+
+Implement a Node.js worker that joins a Redis Streams consumer group, decodes `Position` records, upserts them into a TimescaleDB hypertable, maintains per-device in-memory state, and ships with the operational baseline (Prometheus metrics, health/readiness endpoints, integration tests, Dockerfile, Gitea CI/CD pipeline).
+
+## Outcome statement
+
+When Phase 1 is done:
+
+- The Processor connects to Redis and joins consumer group `processor` on stream `telemetry:t` (configurable). On startup it creates the group with `MKSTREAM` if missing.
+- Every `Position` record published by `tcp-ingestion` lands as exactly one row in the `positions` hypertable, with `device_id`, `ts`, GPS fields, and the IO `attributes` bag preserved as `JSONB` (sentinel-decoded — bigint values become `numeric`, Buffer values become `bytea` or `text` per the spec in task 1.2).
+- Per-device in-memory state (`last_position`, `last_seen`, `position_count_session`) is updated on every record and bounded by an LRU cap.
+- `XACK` is sent only after the Postgres write succeeds. A crashed instance leaves work pending; on its next start it picks up via consumer-group resumption, and any other instance can claim its pending entries (full `XAUTOCLAIM` polish lives in Phase 3, but basic resumption works in Phase 1). A sketch of this loop follows the sequencing section below.
+- `GET /metrics` returns Prometheus exposition format with consumer lag, throughput, a write-latency histogram, and error counters. `GET /healthz` and `GET /readyz` cover liveness and readiness (Redis ready + Postgres ready).
+- The service builds reproducibly via a Gitea Actions workflow, publishing a Docker image to the Gitea container registry tagged `:main` (and per-commit SHA tags later if needed).
+- An integration test spins up Redis + Postgres via testcontainers, publishes a synthetic `Position` to the input stream, and verifies the resulting row in `positions` — an end-to-end byte-level round-trip including bigint and Buffer sentinel reversal.
+
+## Sequencing
+
+```
+1.1 Project scaffold
+ ├─→ 1.2 Core types & contracts
+ │    ├─→ 1.3 Configuration & logging
+ │    ├─→ 1.4 Postgres connection & positions hypertable
+ │    │    └─→ 1.7 Position writer (batched upsert)
+ │    └─→ 1.5 Redis Stream consumer
+ │         ├─→ 1.6 Per-device in-memory state
+ │         └─→ 1.8 Main wiring & ACK semantics (depends on 1.5, 1.6, 1.7)
+ │              └─→ 1.9 Observability
+ │                   └─→ 1.10 Integration test
+ │                        └─→ 1.11 Dockerfile & CI
+```
+
+Tasks 1.5/1.6/1.7 can be developed in parallel after 1.4 lands. Task 1.10 (integration test) should land *before* 1.11 because the Dockerfile and CI workflow depend on knowing what `pnpm test` and `pnpm test:integration` will do.
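+
+The ACK-after-write loop promised above, as a minimal sketch assuming ioredis (`decodePosition`, `state`, and `writer` are stand-ins for the task 1.2 / 1.6 / 1.7 deliverables):
+
+```ts
+import type { Redis } from 'ioredis';
+
+type Position = { device_id: string; ts: number; [key: string]: unknown };
+
+declare function decodePosition(fields: string[]): Position;         // task 1.2
+declare const state: { update(p: Position): void };                  // task 1.6
+declare const writer: { writeBatch(ps: Position[]): Promise<void> }; // task 1.7 (ON CONFLICT DO NOTHING)
+
+export async function consumeLoop(redis: Redis, stream: string, group: string, consumer: string) {
+  // Create the group up front; BUSYGROUP just means it already exists.
+  await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM').catch((err: Error) => {
+    if (!err.message.includes('BUSYGROUP')) throw err;
+  });
+
+  for (;;) {
+    // '>' = only entries never delivered to this group; BLOCK keeps idle loops cheap.
+    const reply = (await redis.xreadgroup(
+      'GROUP', group, consumer, 'COUNT', 100, 'BLOCK', 5000, 'STREAMS', stream, '>',
+    )) as [string, [string, string[]][]][] | null;
+    if (!reply) continue; // BLOCK timed out; nothing new
+
+    for (const [, entries] of reply) {
+      const batch = entries.map(([id, fields]) => ({ id, position: decodePosition(fields) }));
+      for (const { position } of batch) state.update(position);
+      await writer.writeBatch(batch.map((b) => b.position));
+      // ACK only after the write succeeds; a failed batch stays in the PEL for re-delivery.
+      await redis.xack(stream, group, ...batch.map((b) => b.id));
+    }
+  }
+}
+```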
+
+## Files modified
+
+Phase 1 produces this layout in `processor/`:
+
+```
+processor/
+├── .gitea/workflows/build.yml
+├── src/
+│   ├── core/
+│   │   ├── types.ts        # Position, DeviceState, Metrics
+│   │   ├── consumer.ts     # XREADGROUP loop + claim handler
+│   │   ├── writer.ts       # Postgres batched upsert
+│   │   ├── state.ts        # in-memory device state with LRU
+│   │   └── codec.ts        # sentinel decode (__bigint, __buffer_b64)
+│   ├── db/
+│   │   ├── pool.ts         # pg.Pool factory
+│   │   └── migrations/
+│   │       └── 0001_positions.sql  # hypertable creation
+│   ├── config/load.ts      # zod schema for env
+│   ├── observability/
+│   │   ├── logger.ts       # pino root logger
+│   │   └── metrics.ts      # prom-client + HTTP server
+│   └── main.ts
+├── test/
+│   ├── codec.test.ts
+│   ├── state.test.ts
+│   ├── consumer.test.ts    # mocked Redis behaviour
+│   ├── writer.test.ts      # mocked pg behaviour
+│   └── pipeline.integration.test.ts  # testcontainers Redis + Postgres
+├── Dockerfile
+├── compose.dev.yaml
+├── package.json
+├── pnpm-lock.yaml
+├── tsconfig.json
+├── vitest.config.ts
+├── vitest.integration.config.ts
+├── .dockerignore
+├── .gitignore
+├── .prettierrc
+├── eslint.config.js
+└── README.md
+```
+
+## Tech stack (decided)
+
+- **Node.js 22 LTS**, ESM-only.
+- **TypeScript 5.x** with `strict: true`, `noUncheckedIndexedAccess: true`.
+- **pnpm** for dependency management.
+- **vitest** for tests (unit + integration split — same pattern as `tcp-ingestion`).
+- **pino** for structured logging (ISO timestamps, string level labels — same config as `tcp-ingestion`).
+- **prom-client** for Prometheus metrics.
+- **ioredis** for Redis Streams (XREADGROUP, XACK, XAUTOCLAIM).
+- **pg** (the `pg` package, not `postgres.js`) for Postgres — battle-tested, simple Pool API.
+- **zod** for environment-variable validation.
+- **testcontainers** for integration tests (Redis 7 + TimescaleDB).
+
+If an implementer wants to deviate, they must update the relevant task file first.
+
+## Key design decisions inherited from `tcp-ingestion`
+
+- **ESLint `import/no-restricted-paths`** — `src/core/` cannot import from `src/domain/` (the boundary that protects Phase 1 from Phase 2 churn). `src/db/` is shared.
+- **Logger config** — `pino.stdTimeFunctions.isoTime` + level-as-string formatter (see the sketch below). Lifecycle events at `info`; high-frequency per-record events at `debug` or `trace`.
+- **Slim Dockerfile** — multi-stage with BuildKit cache mounts, `pnpm fetch` + `pnpm install --offline` in the build stage, `pnpm prune --prod` for runtime.
+- **CI workflow** — single-job pattern matching `tcp-ingestion/.gitea/workflows/build.yml`. No `services:` block; no separate test container.
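+
+The logger convention, as a sketch (same shape `tcp-ingestion` uses; verify against its `logger.ts` when porting):
+
+```ts
+import pino from 'pino';
+
+export const logger = pino({
+  timestamp: pino.stdTimeFunctions.isoTime, // ISO-8601 instead of epoch millis
+  formatters: {
+    level: (label) => ({ level: label }),   // "info" instead of 30
+  },
+});
+
+// Lifecycle at info; per-record events at debug/trace.
+logger.info({ stream: 'telemetry:t', group: 'processor' }, 'consumer started');
+```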
diff --git a/.planning/phase-2-domain/README.md b/.planning/phase-2-domain/README.md
new file mode 100644
index 0000000..adbf672
--- /dev/null
+++ b/.planning/phase-2-domain/README.md
@@ -0,0 +1,47 @@
+# Phase 2 — Domain logic
+
+**Status:** ⬜ Not started — blocks on Directus schema decisions
+
+The phase that makes the Processor *racing-aware*. Phase 1 produces a generic position firehose into Postgres; Phase 2 layers on the domain rules that turn raw positions into racing events: geofence crossings, timing records, IO interpretation, stage results.
+
+## Outcome statement
+
+When Phase 2 is done:
+
+- Per-model Teltonika IO mappings (e.g. `FMB920 IO 16 → odometer_km`) live in a Directus-managed collection that the Processor reads at startup and refreshes on a known cadence. Decoded attributes are written to a typed shape alongside the raw bag.
+- The geofence engine evaluates each incoming Position against the active geofences for the device's current event/stage and emits cross-events (entry/exit) when transitions happen.
+- A row is written to the `timing_records` table for each cross-event of interest (start gate, finish gate, intermediate splits), tied to the entry's bib/competitor/stage.
+- A `stage_results` rollup is maintained per `(entry, stage)` showing total time, position, and split times, updated on each new timing record.
+
+## Why this is a separate phase
+
+- **Throughput correctness is independent of domain correctness.** Phase 1 ships a working firehose; Phase 2 layers logic on top without touching the consumer/writer/state plumbing.
+- **The Directus schema gates everything in this phase.** Geofences, entries, classes, device_assignments — all live in Directus collections. Until those are designed and migrated, Phase 2 has no schema to write against.
+- **Multiple Phase 1 production milestones can pass before Phase 2 starts.** Real-device pilot, second tcp-ingestion instance, Redis high availability — none of those need Phase 2.
+
+## Tasks (sketched, not detailed)
+
+These tasks will get full task files once the Directus schema conversation is settled and we know the exact collection shapes. For now, this is the planned shape:
+
+| # | Task | Notes |
+|---|------|-------|
+| 2.1 | Directus reflection — read-only client for `geofences`, `device_assignments`, `entries`, `events`, `stages` | Cached in memory, refreshed on a cadence; the boundary that lets the Processor know "what is this device currently racing in" |
+| 2.2 | IO mapping table & per-model decoder | `device_models` collection in Directus → in-memory map → `decoded_attributes` JSONB column on `positions` (or a separate table) |
+| 2.3 | Geofence engine | Per position, evaluate the active geofences for the device's current entry. Use PostGIS `ST_Contains` for the cross-detection. Emit cross-events |
+| 2.4 | Timing record writer | Cross-events of interest → rows in `timing_records` (Directus-owned). Idempotent on `(entry_id, geofence_id, ts)` |
+| 2.5 | Stage result aggregator | On each new `timing_records` row, recompute `stage_results.{total_time, position}` for the affected entry. Materialized incrementally to avoid full recomputation |
+| 2.6 | Per-device runtime state extension | Phase 1's `DeviceState` extended with current entry, current stage, last geofence membership, accumulators. Note: Phase 3 rehydration becomes important once this state has substance |
+
+## Architectural boundary to maintain
+
+`src/core/` from Phase 1 stays untouched. Phase 2 lives in `src/domain/`. The wire-up point is the `sink` function in `src/main.ts`: after `state.update` and `writer.write`, the sink invokes the domain handlers. Per the ESLint rule from task 1.1, `src/core/` cannot import from `src/domain/` — only `main.ts` glues them.
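+
+A sketch of that glue (the domain handler names are illustrative, not settled API):
+
+```ts
+import type { Position } from './core/types.js';
+
+declare const state: { update(p: Position): void };            // core, task 1.6
+declare const writer: { write(p: Position): Promise<void> };   // core, task 1.7
+declare const ioDecoder: { decode(p: Position): void };        // domain, task 2.2 (illustrative)
+declare const geofenceEngine: { evaluate(p: Position): void }; // domain, task 2.3 (illustrative)
+
+// Runs once per record, after the consumer delivers it and before XACK.
+export async function sink(position: Position): Promise<void> {
+  state.update(position);          // Phase 1
+  await writer.write(position);    // Phase 1
+  ioDecoder.decode(position);      // Phase 2: pure addition below this line
+  geofenceEngine.evaluate(position);
+}
+```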
+
+## Open questions blocking task-level detail
+
+(These get answered in the Directus schema conversation.)
+
+1. Are `geofences` org-scoped, event-scoped, or both?
+2. Is `device_assignments` time-bounded (start_at + end_at) or just event-bounded?
+3. Where does the IO mapping table live — Directus collection, hardcoded in Processor, or in a config file?
+4. What's the canonical name for the sub-event unit — `stage`, `session`, `run`, `leg`?
+5. Is there a live leaderboard requirement, or is timing reviewed post-event?
diff --git a/.planning/phase-3-hardening/README.md b/.planning/phase-3-hardening/README.md
new file mode 100644
index 0000000..e7fa45c
--- /dev/null
+++ b/.planning/phase-3-hardening/README.md
@@ -0,0 +1,45 @@
+# Phase 3 — Production hardening
+
+**Status:** ⬜ Not started
+
+The set of operational features that turn a working pilot into something safe to leave running unattended through deploys, instance failures, and bad data.
+
+## Outcome statement
+
+When Phase 3 is done:
+
+- **Graceful shutdown** with a bounded in-flight drain: SIGTERM blocks new reads, awaits in-flight writes, ACKs anything still in the PEL whose write succeeded, exits cleanly.
+- **State rehydration on restart**: on the first packet for an unknown device, the Processor queries Postgres for the device's `last_position` and seeds `DeviceState` accordingly. Phase 2 accumulators get the same treatment (e.g. last geofence membership comes from the last `timing_records` row).
+- **`XAUTOCLAIM` for stuck pending entries**: at startup and on a cadence, the Processor claims entries that have been pending in another consumer's PEL for longer than `CLAIM_THRESHOLD_MS`. Lets a dead instance's work get picked up by survivors without manual intervention.
+- **Dead-letter stream for poison records**: records that fail to decode N times go to `telemetry:t:dlq` with the original payload + the error. Operators can inspect, fix, replay.
+- **Multi-instance load split verified**: spinning up two Processor instances against the same consumer group splits the work evenly. End-to-end test in CI (or at least a manual playbook).
+- **Migration safety with multiple instances**: Postgres advisory locks around the migration runner so two instances starting simultaneously don't race.
+- **Uncaught exception / unhandled rejection handlers**: log, flush in-memory state to a panic dump file, exit with a code Portainer treats as restart-worthy.
+- **`OPERATIONS.md` runbook**: exact commands for "claim stuck entries from a dead instance," "drain the DLQ," "force-rehydrate a single device," "view consumer lag," etc.
+
+## Tasks (sketched, not detailed)
+
+| # | Task | Notes |
+|---|------|-------|
+| 3.1 | Graceful shutdown — full | Replaces the Phase 1 stub. Drain budget configurable. Tested end-to-end |
+| 3.2 | Per-device state rehydration on first packet | Single `SELECT ... LIMIT 1` per cold device. Memoized by the LRU |
+| 3.3 | `XAUTOCLAIM` runner | Periodic + on-startup. Claims entries pending > `CLAIM_THRESHOLD_MS`. Re-runs the sink (sketch below) |
+| 3.4 | Dead-letter stream | After N failed decodes/writes, the record goes to `telemetry:t:dlq`; the original is ACKed off the main stream |
+| 3.5 | Migration advisory lock | `pg_advisory_lock()` around the migration runner; two instances can start simultaneously without racing |
+| 3.6 | Uncaught exception / unhandled rejection handlers | Log, flush, exit. Match `tcp-ingestion`'s eventual Phase 1 task 1.12 work when that lands |
+| 3.7 | OPERATIONS.md | The runbook |
+| 3.8 | Multi-instance load test | A test (manual or in CI) that proves two instances split the work; document expected lag behaviour during failover |
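+
+A sketch of the 3.3 runner, assuming ioredis (the `handleEntry` stand-in is the Phase 1 decode → sink → XACK path; `XAUTOCLAIM` needs Redis 6.2+, and the three-element reply destructured here is the Redis 7 shape):
+
+```ts
+import type { Redis } from 'ioredis';
+
+declare function handleEntry(id: string, fields: string[]): Promise<void>; // decode → sink → XACK
+
+export function startAutoclaimRunner(
+  redis: Redis,
+  stream: string,
+  group: string,
+  consumer: string,
+  claimThresholdMs: number,
+) {
+  // Sweep on the same cadence as the threshold, for simplicity.
+  return setInterval(async () => {
+    let cursor = '0-0';
+    try {
+      do {
+        // Claim up to 100 entries idle longer than the threshold, resuming at the cursor.
+        const [next, entries] = (await redis.call(
+          'XAUTOCLAIM', stream, group, consumer, claimThresholdMs, cursor, 'COUNT', '100',
+        )) as [string, [string, string[]][], string[]?];
+        for (const [id, fields] of entries) await handleEntry(id, fields);
+        cursor = next; // '0-0' again once the whole PEL has been scanned
+      } while (cursor !== '0-0');
+    } catch {
+      // A failed sweep is harmless; the next interval retries. Never crash the main loop.
+    }
+  }, claimThresholdMs);
+}
+```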
+
+## Why this is a separate phase
+
+Phase 1 + Phase 2 produce a service that *works*. Phase 3 is what you do *before you stop watching it*. None of these tasks change correctness — they change operational ergonomics.
+
+## Resume triggers
+
+Each Phase 3 task has its own resume trigger. The whole phase doesn't have to land at once:
+
+- **3.1, 3.5, 3.6** before adding a second Processor instance (rolling deploys become safe).
+- **3.2** before any Phase 2 task that depends on hot state (geofence membership) — without rehydration, a restart would forget which geofence each device is in until the device crosses a boundary again.
+- **3.3, 3.4** before the pilot is "always-on" (operators need a way to handle stuck/poison records without touching production).
+- **3.7** can land alongside whichever of the above ships first, and gets updated over time.
+- **3.8** before the second instance is added.
diff --git a/.planning/phase-4-future/README.md b/.planning/phase-4-future/README.md
new file mode 100644
index 0000000..3a3362e
--- /dev/null
+++ b/.planning/phase-4-future/README.md
@@ -0,0 +1,21 @@
+# Phase 4 — Future / optional
+
+**Status:** ❄️ Not committed
+
+Ideas on radar that may or may not become real tasks. Captured here so they don't get forgotten, and so we have a place to push scope creep that surfaces during Phases 1–3.
+
+## Candidates
+
+- **Directus Flow trigger emission.** When a domain event fires (timing record written, stage result computed, anomaly detected), publish a structured event that Directus Flows can subscribe to. Lets Directus orchestrate notifications, integrations, and derived workflows without polling the database.
+
+- **Replay tooling.** Read historical positions for a device + time range from Postgres, re-emit them through the domain pipeline (geofence engine, timing logic) without touching `positions`. Useful for: validating a new geofence layout against past races, regenerating timing records after a rule change, demoing.
+
+- **Derived-metric backfill.** When the IO mapping table changes (new model, corrected mapping), backfill `decoded_attributes` for affected devices over a chosen time range without touching `positions`.
+
+- **Alternate consumer for analytics export.** A second consumer group reading the same stream, writing to a parallel destination (Parquet on object storage, ClickHouse, etc.) for offline analytics. The Phase 1 architecture already supports this — it's a separate process joining the same stream with a different group name. No Processor changes needed; just operational scaffolding.
+
+- **WebSocket gateway for live updates.** If Directus's WebSocket subscriptions hit a fan-out ceiling for spectator-facing live leaderboards, a dedicated gateway reads from Redis and pushes to clients, bypassing Directus for the live channel only. REST/GraphQL stays in Directus. Mentioned in `wiki/entities/directus.md`.
+
+- **Per-instance sharding hint.** If consumer-group load distribution turns out to be uneven (one instance handles all the chatty devices), introduce hashing-by-device-id with explicit assignment. Probably overkill — Redis Streams' default round-robin works for most workloads.
+
+None of these are committed.
Move them out of Phase 4 and into a committed phase only when there's a concrete reason to do them.
diff --git a/README.md b/README.md
index e69de29..e752cfd 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,7 @@
+# processor
+
+Node.js worker that consumes `Position` records from a Redis Stream (produced by `tcp-ingestion`), maintains per-device runtime state, applies racing-domain rules, and writes durable state to Postgres / TimescaleDB.
+
+For the architectural specification see [`../docs/wiki/entities/processor.md`](../docs/wiki/entities/processor.md). For the work plan and task status see [`.planning/ROADMAP.md`](./.planning/ROADMAP.md).
+
+This service is part of the [TRM](https://git.dev.microservices.al/trm) (Time Racing Management) platform.