# Phase 1 — Throughput pipeline

Implement a Node.js worker that joins a Redis Streams consumer group, decodes `Position` records, upserts them into a TimescaleDB hypertable, maintains per-device in-memory state, and ships with the operational baseline (Prometheus metrics, health/readiness endpoints, integration tests, Dockerfile, Gitea CI/CD pipeline).

## Outcome statement

When Phase 1 is done:

- The Processor connects to Redis and joins consumer group `processor` on stream `telemetry:teltonika` (configurable; must match tcp-ingestion's compiled default). On startup it creates the group with `MKSTREAM` if missing.
- Every `Position` record published by `tcp-ingestion` lands as exactly one row in the `positions` hypertable, with `device_id`, `ts`, GPS fields, and the IO `attributes` bag preserved as `JSONB` (sentinel-decoded — bigint values become `numeric`, Buffer values become `bytea` or `text` per the spec in task 1.2).
- Per-device in-memory state (`last_position`, `last_seen`, `position_count_session`) is updated on every record and bounded by an LRU cap.
- `XACK` is sent only after the Postgres write succeeds. A crashed instance leaves work pending; on its next start it picks up via consumer-group resumption, and any other instance can claim its pending entries (full `XAUTOCLAIM` polish lives in Phase 3, but the basic resumption works in Phase 1).
- `GET /metrics` returns Prometheus exposition format with consumer lag, throughput, write-latency histogram, error counters. `GET /healthz` and `GET /readyz` cover liveness and readiness (Redis ready + Postgres ready).
- The service builds reproducibly via a Gitea Actions workflow, publishing a Docker image to the Gitea container registry tagged `:main` (and per-commit SHA tags later if needed).
- An integration test spins up Redis + Postgres via testcontainers, publishes a synthetic `Position` to the input stream, and verifies the resulting row in `positions`.
  The test exercises the end-to-end, byte-level round-trip, including bigint and Buffer sentinel reversal.

## Sequencing

```
1.1 Project scaffold
 ├─→ 1.2 Core types & contracts
 │    ├─→ 1.3 Configuration & logging
 │    ├─→ 1.4 Postgres connection & positions hypertable
 │    │    └─→ 1.7 Position writer (batched upsert)
 │    └─→ 1.5 Redis Stream consumer
 │         ├─→ 1.6 Per-device in-memory state
 │         └─→ 1.8 Main wiring & ACK semantics (depends on 1.5, 1.6, 1.7)
 │              └─→ 1.9 Observability
 │                   └─→ 1.10 Integration test
 │                        └─→ 1.11 Dockerfile & CI
```

Tasks 1.5/1.6/1.7 can be developed in parallel after 1.4 lands. Task 1.10 (integration test) should land *before* 1.11 because the Dockerfile depends on knowing what `pnpm test` and `pnpm test:integration` will do.

## Files modified

Phase 1 produces this layout in `processor/`:

```
processor/
├── .gitea/workflows/build.yml
├── src/
│   ├── core/
│   │   ├── types.ts        # Position, DeviceState, Metrics
│   │   ├── consumer.ts     # XREADGROUP loop + claim handler
│   │   ├── writer.ts       # Postgres batched upsert
│   │   ├── state.ts        # in-memory device state with LRU
│   │   └── codec.ts        # sentinel decode (__bigint, __buffer_b64)
│   ├── db/
│   │   ├── pool.ts         # pg.Pool factory
│   │   └── migrations/
│   │       └── 0001_positions.sql   # hypertable creation
│   ├── config/load.ts      # zod schema for env
│   ├── observability/
│   │   ├── logger.ts       # pino root logger
│   │   └── metrics.ts      # prom-client + HTTP server
│   └── main.ts
├── test/
│   ├── codec.test.ts
│   ├── state.test.ts
│   ├── consumer.test.ts    # mocked Redis behaviour
│   ├── writer.test.ts      # mocked pg behaviour
│   └── pipeline.integration.test.ts   # testcontainers Redis + Postgres
├── Dockerfile
├── compose.dev.yaml
├── package.json
├── pnpm-lock.yaml
├── tsconfig.json
├── vitest.config.ts
├── vitest.integration.config.ts
├── .dockerignore
├── .gitignore
├── .prettierrc
├── eslint.config.js
└── README.md
```

## Tech stack (decided)

- **Node.js 22 LTS**, ESM-only.
- **TypeScript 5.x** with `strict: true`, `noUncheckedIndexedAccess: true`.
- **pnpm** for dependency management.
- **vitest** for tests (unit + integration split — same pattern as `tcp-ingestion`).
- **pino** for structured logging (ISO timestamps, string level labels — same config as `tcp-ingestion`).
- **prom-client** for Prometheus metrics.
- **ioredis** for Redis Streams (XREADGROUP, XACK, XAUTOCLAIM).
- **pg** (`pg` package, not `postgres.js`) for Postgres — battle-tested, simple Pool API.
- **zod** for environment-variable validation.
- **testcontainers** for integration tests (Redis 7 + TimescaleDB).

If an implementer wants to deviate, they must update the relevant task file first.

## Key design decisions inherited from `tcp-ingestion`

- **ESLint `import/no-restricted-paths`** — `src/core/` cannot import from `src/domain/` (the boundary that protects Phase 1 from Phase 2 churn). `src/db/` is shared.
- **Logger config** — `pino.stdTimeFunctions.isoTime` + level-as-string formatter. Lifecycle events at `info`; high-frequency per-record events at `debug` or `trace`.
- **Slim Dockerfile** — multi-stage with BuildKit cache mounts, `pnpm fetch` + `pnpm install --offline` in the build stage, `pnpm prune --prod` for runtime.
- **CI workflow** — single-job pattern matching `tcp-ingestion/.gitea/workflows/build.yml`. No `services:` block; no separate test container.
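
The sentinel reversal named in the outcome statement (`__bigint`, `__buffer_b64` in `src/core/codec.ts`) might look roughly like the sketch below. The wire shapes are an assumption made for illustration: a single-key object `{ "__bigint": "<decimal string>" }` or `{ "__buffer_b64": "<base64 string>" }`. The function name `decodeSentinels` and the `Decoded` type are hypothetical; the spec in task 1.2 remains authoritative, and the mapping to `numeric` / `bytea` / `text` at write time is not shown here.

```typescript
// Hypothetical decoded value type: JSON values plus bigint and Buffer.
type Decoded =
  | null
  | boolean
  | number
  | string
  | bigint
  | Buffer
  | Decoded[]
  | { [key: string]: Decoded };

function isPlainObject(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

// Recursively walks a parsed JSON value and reverses the two sentinels.
// ASSUMPTION: a sentinel is an object with exactly one key whose value
// is a string; anything else is treated as ordinary data.
function decodeSentinels(value: unknown): Decoded {
  if (Array.isArray(value)) {
    return value.map((item) => decodeSentinels(item));
  }
  if (isPlainObject(value)) {
    const keys = Object.keys(value);
    const onlyKey = keys.length === 1 ? keys[0] : undefined;
    if (onlyKey !== undefined) {
      const inner = value[onlyKey];
      if (onlyKey === "__bigint" && typeof inner === "string") {
        return BigInt(inner);
      }
      if (onlyKey === "__buffer_b64" && typeof inner === "string") {
        return Buffer.from(inner, "base64");
      }
    }
    const out: { [key: string]: Decoded } = {};
    for (const [k, v] of Object.entries(value)) {
      out[k] = decodeSentinels(v);
    }
    return out;
  }
  // Primitives (null, boolean, number, string) pass through unchanged.
  return value as Decoded;
}
```

Requiring exactly one key keeps an ordinary attribute that happens to be called `__bigint` inside a larger object from being misread as a sentinel; whether the real encoder in `tcp-ingestion` guarantees that shape needs to be checked against task 1.2.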