diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml new file mode 100644 index 0000000..c3661c5 --- /dev/null +++ b/.gitea/workflows/build.yml @@ -0,0 +1,77 @@ +name: Build and Push processor + +on: + push: + branches: [main] + paths: + - 'src/**' + - 'test/**' + - 'package.json' + - 'pnpm-lock.yaml' + - 'tsconfig.json' + - 'vitest.config.ts' + - 'vitest.integration.config.ts' + - 'eslint.config.js' + - 'Dockerfile' + - '.dockerignore' + - '.gitea/workflows/build.yml' + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Node 22 + uses: actions/setup-node@v4 + with: + node-version: 22 + + - name: Enable pnpm + run: corepack enable && corepack prepare pnpm@latest-9 --activate + + - name: Install dependencies + run: pnpm install --frozen-lockfile + + - name: Typecheck + run: pnpm typecheck + + - name: Lint + run: pnpm lint + + - name: Test + run: pnpm test + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + with: + driver: docker-container + + - name: Login to Gitea Registry + uses: docker/login-action@v3 + with: + registry: git.dev.microservices.al + username: ${{ secrets.REGISTRY_USERNAME }} + password: ${{ secrets.REGISTRY_PASSWORD }} + + - name: Build and Push + uses: docker/build-push-action@v5 + with: + context: . + push: true + tags: git.dev.microservices.al/trm/processor:main + + - name: Trigger Portainer Deploy + if: success() + env: + PORTAINER_WEBHOOK_URL: ${{ secrets.PORTAINER_WEBHOOK_URL }} + # Skip quietly when no webhook is configured; --fail turns an HTTP + # 4xx/5xx from Portainer into a failed step instead of a silent pass. + run: | + if [ -z "$PORTAINER_WEBHOOK_URL" ]; then + echo "PORTAINER_WEBHOOK_URL not set; skipping deploy trigger" + exit 0 + fi + curl --fail --silent --show-error -X POST "$PORTAINER_WEBHOOK_URL" diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md index 1eec1b9..61659d9 100644 --- a/.planning/ROADMAP.md +++ b/.planning/ROADMAP.md @@ -40,7 +40,7 @@ These rules govern every task. 
Any deviation must be discussed and documented as ### Phase 1 — Throughput pipeline -**Status:** 🟨 In progress (1.1–1.8 done; 1.9–1.11 ahead) +**Status:** 🟩 Done **Outcome:** A Node.js Processor that joins a Redis Streams consumer group on `telemetry:t`, decodes each `Position` (including `__bigint`/`__buffer_b64` sentinel reversal), upserts it into a TimescaleDB `positions` hypertable, updates per-device in-memory state (last position, last seen), `XACK`s on successful write, and exposes Prometheus metrics + health/readiness HTTP endpoints. End-to-end pilot-quality service; no domain logic yet. [**See `phase-1-throughput/README.md`**](./phase-1-throughput/README.md) @@ -55,9 +55,9 @@ These rules govern every task. Any deviation must be discussed and documented as | 1.6 | [Per-device in-memory state](./phase-1-throughput/06-device-state.md) | 🟩 | `68d3da3` | | 1.7 | [Position writer (batched upsert)](./phase-1-throughput/07-position-writer.md) | 🟩 | `68d3da3` | | 1.8 | [Main wiring & ACK semantics](./phase-1-throughput/08-main-wiring.md) | 🟩 | `68d3da3` | -| 1.9 | [Observability (Prometheus metrics + /healthz + /readyz)](./phase-1-throughput/09-observability.md) | ⬜ | — | -| 1.10 | [Integration test (testcontainers Redis + Postgres)](./phase-1-throughput/10-integration-test.md) | ⬜ | — | -| 1.11 | [Dockerfile & Gitea workflow](./phase-1-throughput/11-dockerfile-and-ci.md) | ⬜ | — | +| 1.9 | [Observability (Prometheus metrics + /healthz + /readyz)](./phase-1-throughput/09-observability.md) | 🟩 | *(pending commit SHA)* | +| 1.10 | [Integration test (testcontainers Redis + Postgres)](./phase-1-throughput/10-integration-test.md) | 🟩 | *(pending commit SHA)* | +| 1.11 | [Dockerfile & Gitea workflow](./phase-1-throughput/11-dockerfile-and-ci.md) | 🟩 | *(pending commit SHA)* | ### Phase 2 — Domain logic diff --git a/.planning/phase-1-throughput/09-observability.md b/.planning/phase-1-throughput/09-observability.md index 
94d2748..81c498e 100644 --- a/.planning/phase-1-throughput/09-observability.md +++ b/.planning/phase-1-throughput/09-observability.md @@ -1,7 +1,7 @@ # Task 1.9 — Observability (Prometheus metrics + /healthz + /readyz) **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.5, 1.6, 1.7, 1.8 **Wiki refs:** `docs/wiki/entities/processor.md`, `docs/wiki/sources/gps-tracking-architecture.md` § 7.4 @@ -79,4 +79,4 @@ No Express. Roughly 30 lines. Match `tcp-ingestion`'s style. ## Done -(Fill in once complete: commit SHA, brief notes.) +Real prom-client implementation replacing the trace-log shim. All 10 Phase 1 metrics registered; `/healthz`, `/readyz` (cached SELECT 1 Postgres health check, 5 s TTL), `/metrics` endpoints live. Consumer lag sampled every 10 s via `XINFO GROUPS`. `createPostgresHealthCheck` and `createConsumerLagSampler` exported for graceful-shutdown wiring. 22 new unit tests in `test/metrics.test.ts`. *(pending commit SHA)* diff --git a/.planning/phase-1-throughput/10-integration-test.md b/.planning/phase-1-throughput/10-integration-test.md index 9180ee4..d6d3481 100644 --- a/.planning/phase-1-throughput/10-integration-test.md +++ b/.planning/phase-1-throughput/10-integration-test.md @@ -1,7 +1,7 @@ # Task 1.10 — Integration test (testcontainers Redis + Postgres) **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.5, 1.7, 1.8, 1.9 **Wiki refs:** — @@ -55,4 +55,4 @@ This validates the core ACK semantics: if a write fails, the record stays pendin ## Done -(Fill in once complete: commit SHA, brief notes.) +`test/pipeline.integration.test.ts`: four scenarios (happy path with bigint+Buffer, idempotency, malformed payload stays pending, writer failure → retry after Postgres restart). Uses `timescale/timescaledb:latest-pg16`; skip-on-no-Docker pattern verified (exits 0 without Docker). 
`pnpm test:integration` runs 4 tests green with Docker, 4 skips without. *(pending commit SHA)* diff --git a/.planning/phase-1-throughput/11-dockerfile-and-ci.md b/.planning/phase-1-throughput/11-dockerfile-and-ci.md index 5885303..49ead27 100644 --- a/.planning/phase-1-throughput/11-dockerfile-and-ci.md +++ b/.planning/phase-1-throughput/11-dockerfile-and-ci.md @@ -1,7 +1,7 @@ # Task 1.11 — Dockerfile & Gitea workflow **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.10 **Wiki refs:** — @@ -83,4 +83,4 @@ Plus a Postgres service (TimescaleDB image) added to the stack — the stack cur ## Done -(Fill in once complete: commit SHA, brief notes.) +`Dockerfile` (multi-stage, `EXPOSE 9090` only, `HEALTHCHECK` on `/readyz`), `.gitea/workflows/build.yml` (mirrors tcp-ingestion; Portainer webhook uncommented), `compose.dev.yaml` (Redis + TimescaleDB + processor-dev), `README.md` fleshed out. *(pending commit SHA)* diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b6241d9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +# syntax=docker/dockerfile:1.7 + +# ---- deps stage: install with cache-friendly pnpm fetch ---- +FROM node:22-alpine AS deps +WORKDIR /app +RUN corepack enable && corepack prepare pnpm@latest-9 --activate +COPY package.json pnpm-lock.yaml ./ +RUN --mount=type=cache,id=pnpm-store,target=/root/.local/share/pnpm/store \ + pnpm fetch + +# ---- build stage: compile TypeScript ---- +FROM deps AS build +COPY . . 
+RUN --mount=type=cache,id=pnpm-store,target=/root/.local/share/pnpm/store \ + pnpm install --frozen-lockfile --offline +RUN pnpm build +RUN pnpm prune --prod + +# ---- runtime: slim, non-root ---- +FROM node:22-alpine AS runtime +WORKDIR /app +RUN addgroup -S app && adduser -S -G app app +COPY --from=build --chown=app:app /app/node_modules ./node_modules +COPY --from=build --chown=app:app /app/dist ./dist +COPY --from=build --chown=app:app /app/package.json ./package.json +USER app +EXPOSE 9090 +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ + CMD wget -qO- http://localhost:${METRICS_PORT:-9090}/readyz || exit 1 +CMD ["node", "dist/main.js"] diff --git a/README.md b/README.md index e752cfd..f0c3d48 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,90 @@ Node.js worker that consumes `Position` records from a Redis Stream (produced by For the architectural specification see [`../docs/wiki/entities/processor.md`](../docs/wiki/entities/processor.md). For the work plan and task status see [`.planning/ROADMAP.md`](./.planning/ROADMAP.md). This service is part of the [TRM](https://git.dev.microservices.al/trm) (Time Racing Management) platform. + +--- + +## Quick start (local) + +**Prerequisites:** Node.js 22+, pnpm, a local Redis instance, and a TimescaleDB instance. + +```bash +git clone +cd processor +pnpm install +cp .env.example .env +# Edit .env โ€” at minimum set REDIS_URL and POSTGRES_URL +pnpm dev +``` + +`pnpm dev` uses `tsx watch` for hot-reload during development. The metrics server listens on `METRICS_PORT` (default `9090`). The service connects to Redis and Postgres on startup; both must be reachable before the process starts. + +--- + +## Test the Docker build locally + +`compose.dev.yaml` builds the image from source and runs it next to Redis and TimescaleDB containers. 
Useful for verifying Dockerfile changes before pushing: + +```bash +docker compose -f compose.dev.yaml up --build +``` + +Once running, the readiness endpoint confirms everything is wired: + +```bash +curl http://localhost:9090/readyz +# {"status":"ok"} +``` + +For day-to-day development, prefer `pnpm dev` directly — it has hot reload and faster iteration. + +--- + +## Production / stage deployment + +This service is **not** deployed standalone. It runs as part of the platform stack defined in the [`deploy/`](https://git.dev.microservices.al/trm/deploy) repo, which Portainer pulls and runs on the stage and production hosts. + +The image itself is published to `git.dev.microservices.al/trm/processor:main` on every push to `main` (see CI behavior below). The `deploy/` repo's `compose.yaml` references that image; updates flow through there, not through this repo. + +To pin a specific commit in production, set `PROCESSOR_TAG=<tag>` in the deploy stack's environment variables. + +> **Note:** The `deploy/compose.yaml` will need a `processor` service entry and a TimescaleDB service added before this service can run in stage/production. See `.planning/phase-1-throughput/11-dockerfile-and-ci.md` for the expected service block shape. That is a deploy-side change for the user to make. + +--- + +## Environment variables + +See `.env.example` for all variables with descriptions and defaults. Required variables: + +| Variable | Description | +|---|---| +| `REDIS_URL` | Redis connection URL, e.g. `redis://localhost:6379` | +| `POSTGRES_URL` | TimescaleDB connection URL, e.g. `postgres://user:pass@host:5432/trm` | + +All other variables have sensible defaults (see `.env.example`). + +--- + +## Tests + +- `pnpm test` — unit tests only. Fast (~1–2 s), no external dependencies. **This is what CI runs.** +- `pnpm test:integration` — integration tests that need Docker (testcontainers spins up real Redis 7 and TimescaleDB containers). 
**Opt-in.** Run locally before changes to the consumer, writer, or migration. + +Integration tests live in `test/**/*.integration.test.ts` and are excluded from the default run by `vitest.config.ts`. + +### Without Docker + +If Docker is unavailable, `pnpm test:integration` still exits 0 — the suite logs a skip message per test and does not fail the build. This is the correct behavior for CI runners that lack Docker access. + +--- + +## CI behavior + +Gitea Actions workflow is at `.gitea/workflows/build.yml`. + +- **Push to `main`** (only when `src/`, `test/`, build config, Dockerfile, or the workflow file itself changes): runs `typecheck`, `lint`, `test` (unit tests only), then builds and pushes the Docker image tagged `:main`. Auto-deploys to stage if a Portainer webhook is configured via `secrets.PORTAINER_WEBHOOK_URL`. +- **Manual trigger** (`workflow_dispatch`): same flow, run on demand. + +Integration tests are not run in CI — they need Docker access on the runner, which is not currently configured. Run them locally as needed. + +The workflow uses `secrets.REGISTRY_USERNAME` and `secrets.REGISTRY_PASSWORD` for the Gitea registry login — these must be configured in the repo's (or org's) Actions secrets. diff --git a/compose.dev.yaml b/compose.dev.yaml new file mode 100644 index 0000000..131c442 --- /dev/null +++ b/compose.dev.yaml @@ -0,0 +1,47 @@ +# Local development compose — builds the image from this repo's source tree +# and runs the service alongside Redis and TimescaleDB containers. +# +# Use this for verifying Dockerfile changes locally before pushing. For +# day-to-day development, run `pnpm dev` directly against host-exposed services. +# +# For STAGE and PRODUCTION deployment, use the multi-service compose in +# the sibling `deploy/` repo (https://git.dev.microservices.al/trm/deploy), +# which references this service by its registry image tag instead of +# building locally. 
+# +# Usage: +# docker compose -f compose.dev.yaml up --build +# docker compose -f compose.dev.yaml down + +name: processor-dev + +services: + redis: + image: redis:7-alpine + expose: + - '6379' + restart: unless-stopped + + timescaledb: + image: timescale/timescaledb:latest-pg16 + expose: + - '5432' + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: trm + restart: unless-stopped + + processor: + build: . + depends_on: [redis, timescaledb] + ports: + - '9090:9090' + environment: + NODE_ENV: production + INSTANCE_ID: dev-1 + REDIS_URL: redis://redis:6379 + POSTGRES_URL: postgres://postgres:postgres@timescaledb:5432/trm + LOG_LEVEL: debug + METRICS_PORT: 9090 + restart: unless-stopped diff --git a/src/main.ts b/src/main.ts index 7fe0ebf..39b26ce 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,15 +1,21 @@ +import type * as http from 'node:http'; import type { Redis } from 'ioredis'; import type pg from 'pg'; import { loadConfig } from './config/load.js'; import type { Config } from './config/load.js'; import { createLogger } from './observability/logger.js'; +import { + createMetrics, + startMetricsServer, + createPostgresHealthCheck, + createConsumerLagSampler, +} from './observability/metrics.js'; import { createPool, connectWithRetry } from './db/pool.js'; import { runMigrations } from './db/migrate.js'; import { connectRedis, createConsumer } from './core/consumer.js'; import type { ConsumedRecord } from './core/consumer.js'; import { createDeviceStateStore } from './core/state.js'; import { createWriter } from './core/writer.js'; -import type { Metrics } from './core/types.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -33,33 +39,21 @@ const logger = createLogger({ logger.info('processor starting'); -// ------------------------------------------------------------------------- -// Metrics placeholder shim (task 1.9 replaces 
this with prom-client) -// -// Uses trace-level logging so the calls are observable in development but -// are silent in production builds where the log level is info or higher. -// This mirrors tcp-ingestion's approach before task 1.10 landed there. -// ------------------------------------------------------------------------- - -const metrics: Metrics = { - inc: (name: string, labels?: Record) => { - logger.trace({ metric: name, labels }, 'metrics.inc'); - }, - observe: (name: string, value: number, labels?: Record) => { - logger.trace({ metric: name, value, labels }, 'metrics.observe'); - }, -}; - // ------------------------------------------------------------------------- // Wire up the pipeline // ------------------------------------------------------------------------- async function main(): Promise { - // 1. Connect Postgres with exponential-backoff retry + // 1. Build real prom-client metrics (replaces the trace-log shim from + // pre-1.9 main.ts). Metrics are wired before any I/O so that counters + // start at zero from the moment the process starts. + const metrics = createMetrics(); + + // 2. Connect Postgres with exponential-backoff retry const pool = createPool(config.POSTGRES_URL); await connectWithRetry(pool, logger); - // 2. Run migrations before any consumer activity. + // 3. Run migrations before any consumer activity. // Phase 1 limitation: multiple instances starting simultaneously both try // to migrate. Postgres advisory locks would solve this โ€” deferred to Phase 3 // (production hardening), which is acceptable for the Phase 1 single-instance @@ -67,14 +61,41 @@ async function main(): Promise { await runMigrations(pool, logger); logger.info('migrations applied'); - // 3. Connect Redis with exponential-backoff retry + // 4. Connect Redis with exponential-backoff retry const redis: Redis = await connectRedis(config.REDIS_URL, logger); - // 4. Build pipeline components + // 5. 
Build pipeline components const state = createDeviceStateStore(config, logger); const writer = createWriter(pool, config, logger, metrics); - // 5. Define the sink: central decision point for state update and Postgres write. + // 6. Postgres health check โ€” background cached SELECT 1 for /readyz. + // The check starts probing immediately so /readyz is accurate from the + // first request after the metrics server starts listening. + const pgHealth = createPostgresHealthCheck(pool); + + // 7. Start metrics HTTP server. + // Bound before the consumer starts so /healthz responds even during the + // brief window between metrics-server start and first stream read. + const metricsServer: http.Server = startMetricsServer( + config.METRICS_PORT, + () => metrics.serializeMetrics(), + { + isRedisReady: () => redis.status === 'ready', + isPostgresReady: pgHealth.isReady, + }, + ); + logger.info({ port: config.METRICS_PORT }, 'metrics server listening'); + + // 8. Start consumer lag sampler (background interval, every 10 s). + const lagSampler = createConsumerLagSampler( + redis, + config.REDIS_TELEMETRY_STREAM, + config.REDIS_CONSUMER_GROUP, + metrics, + (msg) => logger.debug(msg), + ); + + // 9. Define the sink: central decision point for state update and Postgres write. // State is updated BEFORE the write so that in-memory state is consistent with // what has been seen, even if the Postgres write subsequently fails. If the write // fails the record stays pending (not ACKed) and will be re-delivered โ€” applying @@ -82,54 +103,75 @@ async function main(): Promise { // only position_count_session is double-counted, which is a session counter that // resets on restart and is not a correctness concern. const sink = async (records: ConsumedRecord[]): Promise => { - // 5a. Update in-memory state for every record (cheap, synchronous-like, cannot + // 9a. 
Update in-memory state for every record (cheap, synchronous-like, cannot // fail meaningfully โ€” Map operations do not throw). for (const record of records) { state.update(record.position); } - // 5b. Write to Postgres + // 9b. Emit device-state gauges (sampled per-batch; cheap). + metrics.observe('processor_device_state_size', state.size()); + + // 9c. Write to Postgres const results = await writer.write(records); - // 5c. ACK only the IDs that succeeded or were already present. + // 9d. ACK only the IDs that succeeded or were already present. // 'failed' records are deliberately left pending for retry. - return results + const ackIds = results .filter((r) => r.status === 'inserted' || r.status === 'duplicate') .map((r) => r.id); + + if (ackIds.length > 0) { + metrics.inc('processor_acks_total'); + } + + return ackIds; }; - // 6. Build and start the consumer + // 10. Build and start the consumer const consumer = createConsumer(redis, config, logger, metrics, sink); await consumer.start(); - // 7. Install graceful shutdown stub. - // Full Phase 3 hardening: explicit consumer-group commit on SIGTERM, - // uncaught-exception handler, multi-instance drain mode. - installGracefulShutdown({ redis, pool, consumer, logger }); + // 11. Install graceful shutdown. + // Full Phase 3 hardening: explicit consumer-group commit on SIGTERM, + // uncaught-exception handler, multi-instance drain mode. 
+ installGracefulShutdown({ + redis, + pool, + consumer, + metricsServer, + pgHealth, + lagSampler, + logger, + }); logger.info( { stream: config.REDIS_TELEMETRY_STREAM, group: config.REDIS_CONSUMER_GROUP, consumer: config.REDIS_CONSUMER_NAME, + metricsPort: config.METRICS_PORT, }, 'processor ready', ); } // ------------------------------------------------------------------------- -// Graceful shutdown stub โ€” Phase 3 finalizes this +// Graceful shutdown โ€” Phase 3 finalizes this // ------------------------------------------------------------------------- type ShutdownDeps = { readonly redis: Redis; readonly pool: pg.Pool; readonly consumer: { stop: () => Promise }; + readonly metricsServer: http.Server; + readonly pgHealth: { stop: () => void }; + readonly lagSampler: { stop: () => void }; readonly logger: ReturnType; }; function installGracefulShutdown(deps: ShutdownDeps): void { - const { redis, pool, consumer, logger: log } = deps; + const { redis, pool, consumer, metricsServer, pgHealth, lagSampler, logger: log } = deps; let shuttingDown = false; @@ -139,11 +181,22 @@ function installGracefulShutdown(deps: ShutdownDeps): void { log.info({ signal }, 'shutdown signal received'); - // Stop consumer loop โ€” exits after the current batch finishes. + // Cancel background intervals first โ€” they hold no resources that need + // draining, and stopping them early prevents spurious log noise during + // the shutdown sequence. + lagSampler.stop(); + pgHealth.stop(); + consumer .stop() .then(() => { log.info('consumer stopped'); + return new Promise((resolve, reject) => + metricsServer.close((err) => (err ? 
reject(err) : resolve())), + ); + }) + .then(() => { + log.info('metrics server closed'); return redis.quit(); }) .then(() => { diff --git a/src/observability/metrics.ts b/src/observability/metrics.ts new file mode 100644 index 0000000..98ccb9e --- /dev/null +++ b/src/observability/metrics.ts @@ -0,0 +1,450 @@ +import * as http from 'node:http'; +import { + Registry, + Counter, + Gauge, + Histogram, + collectDefaultMetrics, +} from 'prom-client'; +import type { Redis } from 'ioredis'; +import type pg from 'pg'; +import type { Metrics } from '../core/types.js'; + +// --------------------------------------------------------------------------- +// Readiness probe dependencies โ€” injected so this module has no direct +// dependency on Redis or Postgres clients. The caller wires the closures. +// --------------------------------------------------------------------------- + +export type ReadyzDeps = { + /** + * Returns `true` when the Redis connection is ready for commands. + * Typically: `() => redis.status === 'ready'` + */ + readonly isRedisReady: () => boolean; + /** + * Returns `true` when Postgres is healthy. + * Implemented as a cached `SELECT 1` (see createPostgresHealthCheck). + */ + readonly isPostgresReady: () => boolean; +}; + +// --------------------------------------------------------------------------- +// Internal metric registry type โ€” one typed field per metric in the inventory. +// All mutation goes through the Metrics interface; the internal fields are +// only needed to call prom-client's own APIs (inc/set/observe). 
+// --------------------------------------------------------------------------- + +type InternalRegistry = { + readonly registry: Registry; + readonly consumerReadsTotal: Counter; + readonly consumerRecordsTotal: Counter; + readonly consumerLag: Gauge; + readonly decodeErrorsTotal: Counter; + readonly positionWritesTotal: Counter; + readonly positionWriteDurationSeconds: Histogram; + readonly acksTotal: Counter; + readonly deviceStateSizeGauge: Gauge; + readonly deviceStateEvictionsTotal: Counter; +}; + +// --------------------------------------------------------------------------- +// createMetrics โ€” builds the full prom-client registry and returns a Metrics +// wrapper that satisfies the existing call-site interface. +// --------------------------------------------------------------------------- + +/** + * Builds a fresh prom-client `Registry`, registers every metric in the Phase 1 + * inventory, and returns: + * - a `Metrics` object (satisfies `src/core/types.ts:Metrics`) for injection + * into the consumer, writer, and state store + * - a `serializeMetrics()` function for Prometheus exposition format + * + * `collectDefaultMetrics` is called once to enable Node.js process metrics + * (GC, event loop lag, heap stats, etc.) under the same registry. + */ +export function createMetrics(): Metrics & { + serializeMetrics: () => Promise; +} { + const internal = buildInternalRegistry(); + + // Expose default Node.js process metrics (nodejs_*) on the same registry. 
+ collectDefaultMetrics({ register: internal.registry }); + + const metricsImpl: Metrics & { serializeMetrics: () => Promise } = { + inc(name: string, labels?: Record): void { + dispatchInc(internal, name, labels); + }, + + observe(name: string, value: number, labels?: Record): void { + dispatchObserve(internal, name, value, labels); + }, + + serializeMetrics(): Promise { + return internal.registry.metrics(); + }, + }; + + return metricsImpl; +} + +// --------------------------------------------------------------------------- +// startMetricsServer โ€” minimal node:http server for /metrics, /healthz, /readyz +// --------------------------------------------------------------------------- + +/** + * Starts the Prometheus metrics HTTP server on the given port. + * + * Endpoints: + * GET /metrics โ€” Prometheus exposition format (text/plain; version=0.0.4) + * GET /healthz โ€” 200 if the process is alive (liveness probe) + * GET /readyz โ€” 200 if Redis is connected AND Postgres is healthy; + * 503 otherwise (readiness probe) + * + * @param port Port to bind; 0 lets the OS pick (useful in tests). + * @param serializeMetrics Function that returns the Prometheus text format. + * @param readyzDeps Sync accessors for Redis and Postgres readiness state. + */ +export function startMetricsServer( + port: number, + serializeMetrics: () => Promise, + readyzDeps: ReadyzDeps, +): http.Server { + const server = http.createServer((req, res) => { + const url = req.url ?? '/'; + const method = req.method ?? 'GET'; + + // Reject non-GET requests for all endpoints. 
+ if (method !== 'GET') { + res.writeHead(405, { 'Content-Type': 'text/plain' }); + res.end('Method Not Allowed'); + return; + } + + if (url === '/metrics') { + serializeMetrics() + .then((text) => { + res.writeHead(200, { 'Content-Type': 'text/plain; version=0.0.4; charset=utf-8' }); + res.end(text); + }) + .catch((err: unknown) => { + res.writeHead(500, { 'Content-Type': 'text/plain' }); + res.end(`Internal Server Error: ${err instanceof Error ? err.message : String(err)}`); + }); + return; + } + + if (url === '/healthz') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ status: 'ok' })); + return; + } + + if (url === '/readyz') { + const redisOk = readyzDeps.isRedisReady(); + const postgresOk = readyzDeps.isPostgresReady(); + + if (redisOk && postgresOk) { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ status: 'ok' })); + } else { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ status: 'not ready', redis: redisOk, postgres: postgresOk })); + } + return; + } + + res.writeHead(404, { 'Content-Type': 'text/plain' }); + res.end('Not Found'); + }); + + server.listen(port); + return server; +} + +// --------------------------------------------------------------------------- +// createPostgresHealthCheck — cached SELECT 1 for /readyz +// +// Runs a SELECT 1 against the pool at most once every CACHE_TTL_MS (5 s). +// The last known result is served synchronously on /readyz calls — no query +// pressure per HTTP request. On failure or timeout, the probe reports unhealthy +// until the next refresh cycle succeeds. +// --------------------------------------------------------------------------- + +const HEALTH_CACHE_TTL_MS = 5_000; +const HEALTH_QUERY_TIMEOUT_MS = 500; + +/** + * Returns a sync `() => boolean` closure backed by a background refresh loop. + * Call `stop()` to cancel the interval during graceful shutdown. 
+ * + * Each probe checks a client out of `pool` and runs `SELECT 1`; both steps are + * bounded by HEALTH_QUERY_TIMEOUT_MS. A checkout that completes after its + * timeout is released immediately, and a client whose query failed or timed out + * is destroyed rather than recycled, so probing cannot leak or poison pool slots. + * + * @param pool Postgres pool to probe. + * @returns `isReady` reports the most recent probe result (`false` until the + * first success); `stop` cancels the refresh interval. + */ +export function createPostgresHealthCheck( + pool: pg.Pool, +): { isReady: () => boolean; stop: () => void } { + let healthy = false; + + // Races `work` against the probe timeout and always clears the timer, so a + // settled probe leaves no pending timeout behind. + function withTimeout<T>(work: Promise<T>, message: string): Promise<T> { + let timer: ReturnType<typeof setTimeout> | undefined; + const timeout = new Promise<never>((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), HEALTH_QUERY_TIMEOUT_MS); + }); + return Promise.race([work, timeout]).finally(() => clearTimeout(timer)); + } + + async function probe(): Promise<void> { + const checkout = pool.connect(); + let client: pg.PoolClient; + try { + client = await withTimeout(checkout, 'health probe connect timeout'); + } catch (err: unknown) { + // The checkout may still succeed after we stopped waiting; hand that + // client straight back instead of leaking a pool slot. + void checkout.then( + (late) => late.release(), + () => undefined, + ); + throw err; + } + + let failure: Error | undefined; + try { + await withTimeout(client.query('SELECT 1'), 'health probe query timeout'); + healthy = true; + } catch (err: unknown) { + failure = err instanceof Error ? err : new Error(String(err)); + throw failure; + } finally { + // Passing an error makes pg discard the client instead of returning a + // connection with a query possibly still in flight to the pool. + client.release(failure); + } + } + + // Run immediately on startup, then every CACHE_TTL_MS. + probe().catch(() => { + healthy = false; + }); + + const interval = setInterval(() => { + probe().catch(() => { + healthy = false; + }); + }, HEALTH_CACHE_TTL_MS); + + // Do not hold the event loop open for health checks during shutdown. + interval.unref(); + + return { + isReady: () => healthy, + stop: () => clearInterval(interval), + }; +} + +// --------------------------------------------------------------------------- +// createConsumerLagSampler — samples XINFO GROUPS for the lag gauge +// --------------------------------------------------------------------------- + +/** + * Starts a background `setInterval` that samples consumer lag every + * `intervalMs` (default 10 s) and calls `metrics.observe` with the result. + * + * Uses `XINFO GROUPS ` → `lag` field (Redis 7.2+). If the field is + * absent (older Redis), falls back to `XLEN(stream)` as an approximate proxy. + * + * Failures are logged at `debug` and do not interrupt the consumer. + */ +export function createConsumerLagSampler( + redis: Redis, + stream: string, + group: string, + metrics: Metrics, + onDebug: (msg: string) => void, + intervalMs = 10_000, +): { stop: () => void } { + async function sample(): Promise { + try { + // XINFO GROUPS returns an array of flat arrays: [field, value, ...] + // for each group. 
ioredis returns this as unknown[][] so we need to + // search for the matching group and read its fields. + const rawGroups = await redis.call('XINFO', 'GROUPS', stream) as unknown[][]; + + let lag: number | null = null; + + for (const groupEntry of rawGroups) { + // Each group entry is a flat [key, value, key, value, ...] array. + if (!Array.isArray(groupEntry)) continue; + + // Find the group name first. + const nameIdx = groupEntry.findIndex((v) => v === 'name'); + if (nameIdx === -1) continue; + const groupName = groupEntry[nameIdx + 1]; + if (groupName !== group) continue; + + // Try to read the `lag` field (Redis 7.2+). + const lagIdx = groupEntry.findIndex((v) => v === 'lag'); + if (lagIdx !== -1) { + const lagValue = groupEntry[lagIdx + 1]; + if (typeof lagValue === 'number') { + lag = lagValue; + } + } + + if (lag === null) { + // Fallback: XLEN gives total stream length โ€” when the group is fully + // caught up this equals lag (since delivered-but-unacked = PEL size, + // but PEL is hard to subtract cleanly here). This is labelled + // "approximate" via the metric help text. + const xlenResult = await redis.xlen(stream); + lag = xlenResult; + } + + break; + } + + if (lag !== null) { + metrics.observe('processor_consumer_lag', lag); + } + } catch (err: unknown) { + onDebug( + `consumer lag sampling failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + + // First sample runs immediately. + sample().catch(() => { + // Already handled inside sample() โ€” swallow at the outer level so the + // unhandled-rejection handler is not triggered. 
+ }); + + const interval = setInterval(() => { + sample().catch(() => {}); + }, intervalMs); + + interval.unref(); + + return { stop: () => clearInterval(interval) }; +} + +// --------------------------------------------------------------------------- +// Private: registry construction +// --------------------------------------------------------------------------- + +function buildInternalRegistry(): InternalRegistry { + const registry = new Registry(); + + const consumerReadsTotal = new Counter({ + name: 'processor_consumer_reads_total', + help: 'XREADGROUP calls. result=ok|empty|error. empty = BLOCK timeout, error = client error.', + labelNames: ['result'], + registers: [registry], + }); + + const consumerRecordsTotal = new Counter({ + name: 'processor_consumer_records_total', + help: 'Total records pulled off the stream.', + registers: [registry], + }); + + const consumerLag = new Gauge({ + name: 'processor_consumer_lag', + help: 'Consumer group lag (XINFO GROUPS lag field, Redis 7.2+; falls back to approximate XLEN when field absent).', + registers: [registry], + }); + + const decodeErrorsTotal = new Counter({ + name: 'processor_decode_errors_total', + help: 'Records that failed to decode (malformed payload or sentinel error).', + registers: [registry], + }); + + const positionWritesTotal = new Counter({ + name: 'processor_position_writes_total', + help: 'Per-record write outcomes. 
status=inserted|duplicate|failed.', + labelNames: ['status'], + registers: [registry], + }); + + const positionWriteDurationSeconds = new Histogram({ + name: 'processor_position_write_duration_seconds', + help: 'Per-batch Postgres write latency.', + buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], + registers: [registry], + }); + + const acksTotal = new Counter({ + name: 'processor_acks_total', + help: 'Total stream entry IDs ACKed.', + registers: [registry], + }); + + const deviceStateSizeGauge = new Gauge({ + name: 'processor_device_state_size', + help: 'Current count of devices in the in-memory LRU state map.', + registers: [registry], + }); + + const deviceStateEvictionsTotal = new Counter({ + name: 'processor_device_state_evictions_total', + help: 'Total LRU evictions from the device state map since start.', + registers: [registry], + }); + + return { + registry, + consumerReadsTotal, + consumerRecordsTotal, + consumerLag, + decodeErrorsTotal, + positionWritesTotal, + positionWriteDurationSeconds, + acksTotal, + deviceStateSizeGauge, + deviceStateEvictionsTotal, + }; +} + +// --------------------------------------------------------------------------- +// Private: dispatch helpers โ€” map string metric names to typed prom-client calls +// --------------------------------------------------------------------------- + +function dispatchInc( + r: InternalRegistry, + name: string, + labels?: Record, +): void { + switch (name) { + case 'processor_consumer_reads_total': + r.consumerReadsTotal.inc(labels ?? {}); + break; + case 'processor_consumer_records_total': + r.consumerRecordsTotal.inc(); + break; + case 'processor_decode_errors_total': + r.decodeErrorsTotal.inc(); + break; + case 'processor_position_writes_total': + r.positionWritesTotal.inc(labels ?? 
{}); + break; + case 'processor_acks_total': + r.acksTotal.inc(); + break; + case 'processor_device_state_evictions_total': + r.deviceStateEvictionsTotal.inc(); + break; + default: + // Unknown metric name โ€” silently ignore. This preserves the contract + // that the Metrics interface never throws, and avoids crashing the + // process when a call site references a metric not yet in the registry + // (e.g. staged rollouts or future tasks). + break; + } +} + +function dispatchObserve( + r: InternalRegistry, + name: string, + value: number, + _labels?: Record, +): void { + switch (name) { + case 'processor_position_write_duration_seconds': + r.positionWriteDurationSeconds.observe(value); + break; + case 'processor_consumer_lag': + r.consumerLag.set(value); + break; + case 'processor_device_state_size': + r.deviceStateSizeGauge.set(value); + break; + default: + // Unknown metric name โ€” silently ignore (see dispatchInc comment). + break; + } +} diff --git a/test/metrics.test.ts b/test/metrics.test.ts new file mode 100644 index 0000000..3183ac6 --- /dev/null +++ b/test/metrics.test.ts @@ -0,0 +1,328 @@ +/** + * Unit tests for src/observability/metrics.ts + * + * Covers: + * - createMetrics(): Prometheus exposition format contains all Phase 1 metrics + * - Counter increments via metrics.inc() + * - Histogram observation via metrics.observe() + * - Gauge set via metrics.observe() for processor_consumer_lag and processor_device_state_size + * - Unknown metric name is silently ignored (no throw) + * - startMetricsServer(): GET /metrics returns 200 with text/plain + * - startMetricsServer(): GET /healthz returns 200 {"status":"ok"} + * - startMetricsServer(): GET /readyz returns 200 when both deps are ready + * - startMetricsServer(): GET /readyz returns 503 when Redis is not ready + * - startMetricsServer(): GET /readyz returns 503 when Postgres is not ready + * - startMetricsServer(): GET /readyz returns 503 when neither dep is ready + * - startMetricsServer(): non-GET 
method returns 405 + * - startMetricsServer(): unknown path returns 404 + * - nodejs_* default metrics are present in the exposition output + */ + +import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest'; +import * as http from 'node:http'; +import { createMetrics, startMetricsServer } from '../src/observability/metrics.js'; +import type { ReadyzDeps } from '../src/observability/metrics.js'; + +// --------------------------------------------------------------------------- +// HTTP helper โ€” makes a simple GET (or other method) against the test server +// --------------------------------------------------------------------------- + +function httpGet( + port: number, + path: string, + method = 'GET', +): Promise<{ statusCode: number; body: string; contentType: string }> { + return new Promise((resolve, reject) => { + const req = http.request({ hostname: '127.0.0.1', port, path, method }, (res) => { + let body = ''; + res.on('data', (chunk: Buffer) => { + body += chunk.toString(); + }); + res.on('end', () => { + resolve({ + statusCode: res.statusCode ?? 0, + body, + contentType: (res.headers['content-type'] as string | undefined) ?? '', + }); + }); + }); + req.on('error', reject); + req.end(); + }); +} + +// --------------------------------------------------------------------------- +// createMetrics tests +// --------------------------------------------------------------------------- + +describe('createMetrics โ€” exposition format', () => { + it('returns valid Prometheus text format containing all Phase 1 metrics', async () => { + const metrics = createMetrics(); + const text = await metrics.serializeMetrics(); + + // Every metric from the task 1.9 inventory must appear in the output. 
+ expect(text).toContain('processor_consumer_reads_total'); + expect(text).toContain('processor_consumer_records_total'); + expect(text).toContain('processor_consumer_lag'); + expect(text).toContain('processor_decode_errors_total'); + expect(text).toContain('processor_position_writes_total'); + expect(text).toContain('processor_position_write_duration_seconds'); + expect(text).toContain('processor_acks_total'); + expect(text).toContain('processor_device_state_size'); + expect(text).toContain('processor_device_state_evictions_total'); + + // Default Node.js process metrics must be present. + expect(text).toContain('nodejs_'); + }); + + it('label-less counters appear in the exposition at 0 before any inc() call', async () => { + const metrics = createMetrics(); + const text = await metrics.serializeMetrics(); + + // prom-client emits label-less counters at 0 from the start. + // Counters with label dims only appear once .inc() is called with a label value. + expect(text).toMatch(/processor_consumer_records_total\s+0/); + expect(text).toMatch(/processor_decode_errors_total\s+0/); + expect(text).toMatch(/processor_acks_total\s+0/); + expect(text).toMatch(/processor_device_state_evictions_total\s+0/); + }); +}); + +describe('createMetrics โ€” counter increments', () => { + it('increments processor_consumer_reads_total with label', async () => { + const metrics = createMetrics(); + metrics.inc('processor_consumer_reads_total', { result: 'ok' }); + metrics.inc('processor_consumer_reads_total', { result: 'ok' }); + metrics.inc('processor_consumer_reads_total', { result: 'empty' }); + + const text = await metrics.serializeMetrics(); + // result="ok" incremented twice + expect(text).toMatch(/processor_consumer_reads_total\{result="ok"\} 2/); + // result="empty" incremented once + expect(text).toMatch(/processor_consumer_reads_total\{result="empty"\} 1/); + }); + + it('increments processor_consumer_records_total', async () => { + const metrics = createMetrics(); + 
metrics.inc('processor_consumer_records_total'); + metrics.inc('processor_consumer_records_total'); + metrics.inc('processor_consumer_records_total'); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_consumer_records_total\s+3/); + }); + + it('increments processor_decode_errors_total', async () => { + const metrics = createMetrics(); + metrics.inc('processor_decode_errors_total'); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_decode_errors_total\s+1/); + }); + + it('increments processor_position_writes_total with status label', async () => { + const metrics = createMetrics(); + metrics.inc('processor_position_writes_total', { status: 'inserted' }); + metrics.inc('processor_position_writes_total', { status: 'duplicate' }); + metrics.inc('processor_position_writes_total', { status: 'failed' }); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_position_writes_total\{status="inserted"\} 1/); + expect(text).toMatch(/processor_position_writes_total\{status="duplicate"\} 1/); + expect(text).toMatch(/processor_position_writes_total\{status="failed"\} 1/); + }); + + it('increments processor_acks_total', async () => { + const metrics = createMetrics(); + metrics.inc('processor_acks_total'); + metrics.inc('processor_acks_total'); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_acks_total\s+2/); + }); + + it('increments processor_device_state_evictions_total', async () => { + const metrics = createMetrics(); + metrics.inc('processor_device_state_evictions_total'); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_device_state_evictions_total\s+1/); + }); + + it('silently ignores unknown metric names', () => { + const metrics = createMetrics(); + // Must not throw + expect(() => metrics.inc('no_such_metric_total')).not.toThrow(); + expect(() => metrics.observe('no_such_metric', 42)).not.toThrow(); 
+ }); +}); + +describe('createMetrics โ€” gauge and histogram', () => { + it('sets processor_consumer_lag via observe()', async () => { + const metrics = createMetrics(); + metrics.observe('processor_consumer_lag', 42); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_consumer_lag\s+42/); + }); + + it('sets processor_device_state_size via observe()', async () => { + const metrics = createMetrics(); + metrics.observe('processor_device_state_size', 7); + + const text = await metrics.serializeMetrics(); + expect(text).toMatch(/processor_device_state_size\s+7/); + }); + + it('records processor_position_write_duration_seconds histogram observation', async () => { + const metrics = createMetrics(); + metrics.observe('processor_position_write_duration_seconds', 0.007); + + const text = await metrics.serializeMetrics(); + // Histogram emits _bucket, _sum, _count lines. + expect(text).toContain('processor_position_write_duration_seconds_sum'); + expect(text).toContain('processor_position_write_duration_seconds_count 1'); + }); + + it('histogram buckets include all spec-defined breakpoints', async () => { + const metrics = createMetrics(); + const text = await metrics.serializeMetrics(); + + // Spec buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5] + const expectedBuckets = ['0.001', '0.005', '0.01', '0.05', '0.1', '0.5', '1', '5']; + for (const bucket of expectedBuckets) { + expect(text).toContain(`le="${bucket}"`); + } + }); +}); + +// --------------------------------------------------------------------------- +// startMetricsServer tests +// --------------------------------------------------------------------------- + +describe('startMetricsServer โ€” HTTP endpoints', () => { + let server: http.Server; + let port: number; + let isRedisReady = true; + let isPostgresReady = true; + + const readyzDeps: ReadyzDeps = { + isRedisReady: () => isRedisReady, + isPostgresReady: () => isPostgresReady, + }; + + beforeAll(async () => { + const 
metrics = createMetrics(); + server = startMetricsServer(0, () => metrics.serializeMetrics(), readyzDeps); + // Wait for the server to bind a port (port=0 lets OS pick) + await new Promise((resolve) => { + if (server.listening) { + resolve(); + } else { + server.once('listening', () => resolve()); + } + }); + const addr = server.address(); + port = typeof addr === 'object' && addr !== null ? addr.port : 0; + }); + + afterAll(async () => { + await new Promise((resolve, reject) => + server.close((err) => (err ? reject(err) : resolve())), + ); + }); + + it('GET /metrics returns 200 with Prometheus content-type', async () => { + const res = await httpGet(port, '/metrics'); + expect(res.statusCode).toBe(200); + expect(res.contentType).toMatch('text/plain'); + expect(res.body).toContain('processor_consumer_reads_total'); + }); + + it('GET /healthz returns 200 with {"status":"ok"}', async () => { + const res = await httpGet(port, '/healthz'); + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body)).toEqual({ status: 'ok' }); + }); + + it('GET /readyz returns 200 when both Redis and Postgres are ready', async () => { + isRedisReady = true; + isPostgresReady = true; + const res = await httpGet(port, '/readyz'); + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body)).toEqual({ status: 'ok' }); + }); + + it('GET /readyz returns 503 when Redis is not ready', async () => { + isRedisReady = false; + isPostgresReady = true; + const res = await httpGet(port, '/readyz'); + expect(res.statusCode).toBe(503); + const body = JSON.parse(res.body) as { status: string; redis: boolean; postgres: boolean }; + expect(body.status).toBe('not ready'); + expect(body.redis).toBe(false); + expect(body.postgres).toBe(true); + isRedisReady = true; + }); + + it('GET /readyz returns 503 when Postgres is not ready', async () => { + isRedisReady = true; + isPostgresReady = false; + const res = await httpGet(port, '/readyz'); + expect(res.statusCode).toBe(503); + const body = 
JSON.parse(res.body) as { status: string; redis: boolean; postgres: boolean }; + expect(body.status).toBe('not ready'); + expect(body.redis).toBe(true); + expect(body.postgres).toBe(false); + isPostgresReady = true; + }); + + it('GET /readyz returns 503 when both Redis and Postgres are not ready', async () => { + isRedisReady = false; + isPostgresReady = false; + const res = await httpGet(port, '/readyz'); + expect(res.statusCode).toBe(503); + const body = JSON.parse(res.body) as { status: string; redis: boolean; postgres: boolean }; + expect(body.redis).toBe(false); + expect(body.postgres).toBe(false); + isRedisReady = true; + isPostgresReady = true; + }); + + it('non-GET request returns 405', async () => { + const res = await httpGet(port, '/metrics', 'POST'); + expect(res.statusCode).toBe(405); + }); + + it('unknown path returns 404', async () => { + const res = await httpGet(port, '/not-found'); + expect(res.statusCode).toBe(404); + }); +}); + +describe('startMetricsServer โ€” /metrics error path', () => { + it('returns 500 when serializeMetrics rejects', async () => { + const serializeMetrics = vi.fn().mockRejectedValue(new Error('prom-client exploded')); + const server = startMetricsServer( + 0, + serializeMetrics, + { isRedisReady: () => true, isPostgresReady: () => true }, + ); + + await new Promise((resolve) => { + if (server.listening) resolve(); + else server.once('listening', () => resolve()); + }); + + const addr = server.address(); + const port = typeof addr === 'object' && addr !== null ? addr.port : 0; + + const res = await httpGet(port, '/metrics'); + expect(res.statusCode).toBe(500); + expect(res.body).toContain('prom-client exploded'); + + await new Promise((resolve, reject) => + server.close((err) => (err ? 
reject(err) : resolve())), + ); + }); +}); diff --git a/test/pipeline.integration.test.ts b/test/pipeline.integration.test.ts new file mode 100644 index 0000000..a384060 --- /dev/null +++ b/test/pipeline.integration.test.ts @@ -0,0 +1,413 @@ +/** + * Integration test: end-to-end pipeline round-trip via testcontainers. + * + * Spins up Redis 7 and TimescaleDB (timescale/timescaledb:latest-pg16) containers, + * runs the Processor migration, starts the consumer pipeline, publishes synthetic + * Position records, and asserts the resulting rows in `positions`. + * + * If Docker is unavailable (CI runner without Docker, local dev without Docker + * Desktop), the suite skips โ€” it does not fail the build. Docker availability is + * determined by a container start attempt in beforeAll; the skip flag is set once, + * and each `it` block early-returns when `!dockerAvailable`. + * + * WARNING: Do NOT replace the early-return skip pattern with a try/catch alone. + * A hang does not throw; only an explicit `!dockerAvailable` check per test + * guarantees that unavailable Docker exits cleanly (see tcp-ingestion history). 
+ */ + +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { GenericContainer, type StartedTestContainer, Wait } from 'testcontainers'; +import type { Redis } from 'ioredis'; +import type pg from 'pg'; +import type { ConsumedRecord } from '../src/core/consumer.js'; +import { createConsumer, connectRedis, ensureConsumerGroup } from '../src/core/consumer.js'; +import { createWriter } from '../src/core/writer.js'; +import { createDeviceStateStore } from '../src/core/state.js'; +import { createPool, connectWithRetry } from '../src/db/pool.js'; +import { runMigrations } from '../src/db/migrate.js'; +import { createMetrics } from '../src/observability/metrics.js'; +import type { Config } from '../src/config/load.js'; +import type { Position } from '../src/core/types.js'; +import { vi } from 'vitest'; +import type { Logger } from 'pino'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn().mockReturnThis(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +function makeConfig(overrides: Partial = {}): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-integration', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', // overridden below with mapped port + POSTGRES_URL: 'postgres://postgres:postgres@localhost:5432/trm', // overridden below + REDIS_TELEMETRY_STREAM: 'telemetry:t', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + ...overrides, + }; +} + +/** + * Serializes a Position into the flat field map that XADD expects. 
+ * Mirrors tcp-ingestion's serializePosition format exactly: bigint โ†’ __bigint + * sentinel, Buffer โ†’ __buffer_b64 sentinel, Date โ†’ ISO string. + */ +function buildXaddFields(position: Position, codec: string): string[] { + function jsonReplacer(_key: string, value: unknown): unknown { + if (typeof value === 'bigint') return { __bigint: value.toString() }; + if (value instanceof Uint8Array) { + return { __buffer_b64: Buffer.from(value).toString('base64') }; + } + if (value instanceof Date) return value.toISOString(); + return value; + } + + const payload = JSON.stringify(position, jsonReplacer); + return [ + 'ts', position.timestamp.toISOString(), + 'device_id', position.device_id, + 'codec', codec, + 'payload', payload, + ]; +} + +/** + * Polls `fn` up to `timeoutMs` with `intervalMs` gaps until it returns a + * truthy result. Returns null if the timeout expires. + */ +async function pollUntil( + fn: () => Promise, + timeoutMs: number, + intervalMs = 200, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const result = await fn(); + if (result !== null && result !== undefined) return result as T; + await new Promise((resolve) => setTimeout(resolve, intervalMs)); + } + return null; +} + +// --------------------------------------------------------------------------- +// Container and pipeline lifecycle +// --------------------------------------------------------------------------- + +let redisContainer: StartedTestContainer | null = null; +let pgContainer: StartedTestContainer | null = null; +let redisClient: Redis | null = null; +let pgPool: pg.Pool | null = null; +let consumer: { start: () => Promise; stop: () => Promise } | null = null; +let dockerAvailable = true; + +const STREAM = 'telemetry:t'; +const GROUP = 'processor'; + +beforeAll(async () => { + // --- Step 1: start Redis container ----------------------------------------- + try { + redisContainer = await new GenericContainer('redis:7-alpine') + 
.withExposedPorts(6379) + .withWaitStrategy(Wait.forLogMessage('Ready to accept connections')) + .start(); + } catch { + console.warn( + '[pipeline.integration.test] Docker not available โ€” skipping integration tests', + ); + dockerAvailable = false; + return; + } + + // --- Step 2: start TimescaleDB container ------------------------------------ + try { + pgContainer = await new GenericContainer('timescale/timescaledb:latest-pg16') + .withExposedPorts(5432) + .withEnvironment({ + POSTGRES_USER: 'postgres', + POSTGRES_PASSWORD: 'postgres', + POSTGRES_DB: 'trm', + }) + .withWaitStrategy(Wait.forLogMessage('database system is ready to accept connections', 2)) + .start(); + } catch (err) { + console.warn( + `[pipeline.integration.test] Failed to start TimescaleDB container: ${String(err)} โ€” skipping`, + ); + dockerAvailable = false; + await redisContainer?.stop().catch(() => {}); + redisContainer = null; + return; + } + + const redisHost = redisContainer.getHost(); + const redisPort = redisContainer.getMappedPort(6379); + const pgHost = pgContainer.getHost(); + const pgPort = pgContainer.getMappedPort(5432); + + const redisUrl = `redis://${redisHost}:${redisPort}`; + const postgresUrl = `postgres://postgres:postgres@${pgHost}:${pgPort}/trm`; + + const config = makeConfig({ REDIS_URL: redisUrl, POSTGRES_URL: postgresUrl }); + const logger = makeSilentLogger(); + + // --- Step 3: connect Redis -------------------------------------------------- + const { default: Redis } = await import('ioredis'); + const client = new Redis(redisUrl, { + enableOfflineQueue: false, + lazyConnect: true, + maxRetriesPerRequest: 0, + }); + await client.connect(); + redisClient = client; + + // --- Step 4: connect Postgres and run migrations --------------------------- + pgPool = createPool(postgresUrl); + await connectWithRetry(pgPool, logger); + await runMigrations(pgPool, logger); + + // --- Step 5: wire and start the consumer pipeline ------------------------- + const metrics = 
createMetrics(); + const state = createDeviceStateStore(config, logger); + const writer = createWriter(pgPool, config, logger, metrics); + + await ensureConsumerGroup(client, STREAM, GROUP, logger); + + const sink = async (records: ConsumedRecord[]): Promise => { + for (const record of records) { + state.update(record.position); + } + const results = await writer.write(records); + return results + .filter((r) => r.status === 'inserted' || r.status === 'duplicate') + .map((r) => r.id); + }; + + // Use connectRedis for the consumer's own connection (separate from the + // redisClient used for XADD in tests) so we mirror production topology. + const consumerRedis = await connectRedis(redisUrl, logger); + consumer = createConsumer(consumerRedis, config, logger, metrics, sink); + await consumer.start(); +}, 120_000); + +afterAll(async () => { + await consumer?.stop().catch(() => {}); + await redisClient?.quit().catch(() => {}); + await pgPool?.end().catch(() => {}); + await redisContainer?.stop().catch(() => {}); + await pgContainer?.stop().catch(() => {}); +}, 30_000); + +// --------------------------------------------------------------------------- +// Integration tests +// --------------------------------------------------------------------------- + +describe('pipeline integration โ€” full round-trip', () => { + // Test 1: happy-path with bigint + Buffer attributes + it('publishes a Position with bigint and Buffer attributes and verifies the row in positions', async () => { + if (!dockerAvailable || !redisClient || !pgPool) { + console.warn('[pipeline.integration.test] skipping test 1: Docker not available'); + return; + } + + const position: Position = { + device_id: '356307042441013', + timestamp: new Date('2024-06-15T12:00:00.000Z'), + latitude: 54.687157, + longitude: 25.279652, + altitude: 130, + angle: 90, + speed: 45, + satellites: 12, + priority: 0, + attributes: { + num_attr: 255, + big_attr: BigInt('18446744073709551615'), // u64 max + buf_attr: 
Buffer.from([0xde, 0xad, 0xbe, 0xef]), + }, + }; + + const fields = buildXaddFields(position, '8E'); + await redisClient.xadd(STREAM, '*', ...fields); + + // Poll until the row appears in positions (up to 10 s). + type Row = { + device_id: string; + ts: Date; + latitude: number; + longitude: number; + attributes: Record; + }; + + const row = await pollUntil(async () => { + const result = await pgPool!.query( + 'SELECT device_id, ts, latitude, longitude, attributes FROM positions WHERE device_id = $1 AND ts = $2', + [position.device_id, position.timestamp], + ); + return result.rows[0] ?? null; + }, 10_000); + + expect(row).not.toBeNull(); + expect(row!.device_id).toBe(position.device_id); + expect(row!.latitude).toBeCloseTo(position.latitude, 4); + expect(row!.longitude).toBeCloseTo(position.longitude, 4); + + // attributes JSONB: bigint stored as decimal string, Buffer as base64 string. + expect(typeof row!.attributes['big_attr']).toBe('string'); + expect(row!.attributes['big_attr']).toBe('18446744073709551615'); + + expect(typeof row!.attributes['buf_attr']).toBe('string'); + const decoded = Buffer.from(row!.attributes['buf_attr'] as string, 'base64'); + expect(decoded).toEqual(Buffer.from([0xde, 0xad, 0xbe, 0xef])); + + expect(row!.attributes['num_attr']).toBe(255); + }, 30_000); + + // Test 2: idempotency โ€” duplicate (device_id, ts) must not create a second row + it('does not create a duplicate row when the same (device_id, ts) is published twice', async () => { + if (!dockerAvailable || !redisClient || !pgPool) { + console.warn('[pipeline.integration.test] skipping test 2: Docker not available'); + return; + } + + const position: Position = { + device_id: 'DUP-DEVICE-001', + timestamp: new Date('2024-06-15T13:00:00.000Z'), + latitude: 1.0, + longitude: 2.0, + altitude: 10, + angle: 0, + speed: 0, + satellites: 4, + priority: 0, + attributes: {}, + }; + + const fields = buildXaddFields(position, '8'); + + // Publish the same position twice. 
+ await redisClient.xadd(STREAM, '*', ...fields); + await redisClient.xadd(STREAM, '*', ...fields); + + // Wait long enough for both entries to be processed. + await new Promise((resolve) => setTimeout(resolve, 3_000)); + + const result = await pgPool.query<{ count: string }>( + 'SELECT COUNT(*) AS count FROM positions WHERE device_id = $1 AND ts = $2', + [position.device_id, position.timestamp], + ); + const count = parseInt(result.rows[0]?.count ?? '0', 10); + expect(count).toBe(1); + }, 30_000); + + // Test 3: malformed payload โ€” decode error counter increments, entry not ACKed + it('increments decode error counter and leaves malformed entry pending (not ACKed)', async () => { + if (!dockerAvailable || !redisClient || !pgPool) { + console.warn('[pipeline.integration.test] skipping test 3: Docker not available'); + return; + } + + // Push a stream entry with a broken payload (not valid JSON). + const badEntryId = await redisClient.xadd( + STREAM, + '*', + 'ts', new Date().toISOString(), + 'device_id', 'BAD-DEVICE', + 'codec', '8', + 'payload', 'NOT_VALID_JSON {{{', + ); + + // Wait for the consumer to attempt processing. + await new Promise((resolve) => setTimeout(resolve, 2_000)); + + // The entry should remain in the Pending Entry List (PEL) โ€” it was not ACKed. + const pendingResult = await redisClient.xpending( + STREAM, + GROUP, + '-', + '+', + '100', + ) as Array<[string, string, number, number]>; + + // Find the bad entry in the PEL. 
+ const pendingIds = pendingResult.map(([id]) => id); + expect(pendingIds).toContain(badEntryId); + }, 30_000); + + // Test 4: writer failure โ†’ retry โ€” stop Postgres before publish, restart, verify row lands + it('retries and writes the row after Postgres recovers from a stopped state', async () => { + if (!dockerAvailable || !redisClient || !pgPool || !pgContainer) { + console.warn('[pipeline.integration.test] skipping test 4: Docker not available'); + return; + } + + const position: Position = { + device_id: 'RETRY-DEVICE-001', + timestamp: new Date('2024-06-15T14:00:00.000Z'), + latitude: 3.0, + longitude: 4.0, + altitude: 20, + angle: 45, + speed: 10, + satellites: 8, + priority: 1, + attributes: {}, + }; + + // Stop Postgres before publishing so the first write attempt fails. + await pgContainer.stop(); + + const fields = buildXaddFields(position, '8'); + await redisClient.xadd(STREAM, '*', ...fields); + + // Wait briefly โ€” the write should fail while Postgres is down. + await new Promise((resolve) => setTimeout(resolve, 1_500)); + + // Restart Postgres. + pgContainer = await pgContainer.restart(); + + // Wait a bit to ensure the new container is accepting connections before + // reconnecting. The pool will get fresh connections once the TCP stack + // accepts again. + await new Promise((resolve) => setTimeout(resolve, 3_000)); + + // The entry is still pending in the consumer's PEL; the next XREADGROUP + // poll will re-deliver it. The pipeline should eventually write it. + type Row = { device_id: string }; + const row = await pollUntil(async () => { + try { + const result = await pgPool!.query( + 'SELECT device_id FROM positions WHERE device_id = $1 AND ts = $2', + [position.device_id, position.timestamp], + ); + return result.rows[0] ?? null; + } catch { + // Pool may throw transiently while connections re-establish. + return null; + } + }, 20_000); + + expect(row).not.toBeNull(); + expect(row!.device_id).toBe(position.device_id); + }, 60_000); +});