Compare commits
10 Commits
c33c7a4f6b
...
d0e8503e13
| Author | SHA1 | Date | |
|---|---|---|---|
| d0e8503e13 | |||
| d4a6d8f713 | |||
| ff9c8d67a4 | |||
| 477fabfef8 | |||
| e2b3bc421c | |||
| 2b6b447252 | |||
| 155f034f61 | |||
| 33c3fa0c8e | |||
| dda53bec16 | |||
| 90d6a73a60 | |||
@@ -3,8 +3,10 @@ dist/
|
||||
coverage/
|
||||
.env
|
||||
.env.local
|
||||
.env.example
|
||||
*.log
|
||||
.git/
|
||||
.gitea/
|
||||
.planning/
|
||||
test/
|
||||
*.md
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
# Environment variables for tcp-ingestion.
|
||||
# Copy to .env and fill in values for local development.
|
||||
# All variables are optional except REDIS_URL.
|
||||
|
||||
# Runtime environment: development | test | production
|
||||
NODE_ENV=development
|
||||
|
||||
# Unique identifier for this service instance (used in logs and Redis keys).
|
||||
# Defaults to a random local-<uuid-prefix> if not set.
|
||||
INSTANCE_ID=local-1
|
||||
|
||||
# Log level: fatal | error | warn | info | debug | trace
|
||||
LOG_LEVEL=info
|
||||
|
||||
# TCP port the service listens on for Teltonika device connections.
|
||||
TELTONIKA_PORT=5027
|
||||
|
||||
# Redis connection URL — required; no default.
|
||||
REDIS_URL=redis://localhost:6379
|
||||
|
||||
# Redis Stream name for normalized Position records.
|
||||
REDIS_TELEMETRY_STREAM=telemetry:teltonika
|
||||
|
||||
# Maximum number of entries to retain in the Redis Stream (XADD MAXLEN ~).
|
||||
REDIS_STREAM_MAXLEN=1000000
|
||||
|
||||
# Port for the Prometheus /metrics HTTP server.
|
||||
# NOTE: the metrics HTTP server landed with task 1.10 — this port is now live (see src/observability/metrics.ts).
|
||||
METRICS_PORT=9090
|
||||
|
||||
# In-memory publisher queue capacity before overflow (oldest records dropped).
|
||||
PUBLISH_QUEUE_CAPACITY=10000
|
||||
|
||||
# Strict device auth: reject connections from IMEIs not in the Redis allow-list.
|
||||
# Requires task 1.13 (RedisAllowListAuthority) to be deployed and configured.
|
||||
# Default false (AllowAllAuthority accepts every IMEI).
|
||||
STRICT_DEVICE_AUTH=false
|
||||
@@ -0,0 +1,68 @@
|
||||
name: Build and Push tcp-ingestion
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
paths:
|
||||
- 'src/**'
|
||||
- 'test/**'
|
||||
- 'package.json'
|
||||
- 'pnpm-lock.yaml'
|
||||
- 'tsconfig.json'
|
||||
- 'tsconfig.test.json'
|
||||
- 'vitest.config.ts'
|
||||
- 'eslint.config.js'
|
||||
- 'Dockerfile'
|
||||
- '.dockerignore'
|
||||
- '.gitea/workflows/build.yml'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Node 22
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 22
|
||||
|
||||
- name: Enable pnpm
|
||||
run: corepack enable && corepack prepare pnpm@latest-9 --activate
|
||||
|
||||
- name: Install dependencies
|
||||
run: pnpm install --frozen-lockfile
|
||||
|
||||
- name: Typecheck
|
||||
run: pnpm typecheck
|
||||
|
||||
- name: Lint
|
||||
run: pnpm lint
|
||||
|
||||
- name: Test
|
||||
run: pnpm test
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
driver: docker-container
|
||||
|
||||
- name: Login to Gitea Registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: git.dev.microservices.al
|
||||
username: ${{ secrets.REGISTRY_USERNAME }}
|
||||
password: ${{ secrets.REGISTRY_PASSWORD }}
|
||||
|
||||
- name: Build and Push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: git.dev.microservices.al/trm/tcp-ingestion:main
|
||||
|
||||
- name: Trigger Portainer Deploy
|
||||
if: success()
|
||||
run: curl -X POST "${{ secrets.PORTAINER_WEBHOOK_URL }}"
|
||||
+25
-16
@@ -41,26 +41,35 @@ These rules govern every task. Any deviation must be discussed and documented as
|
||||
|
||||
### Phase 1 — Inbound telemetry (Codec 8, 8E, 16)
|
||||
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟨 In progress (observability landed; hardening + device authority paused for pilot test)
|
||||
**Outcome:** A production-ready Node.js TCP server ingesting Teltonika telemetry from any FMB/FMC/FMM/FMU device, publishing normalized `Position` records to Redis Streams, with full observability and CI/CD via Gitea.
|
||||
|
||||
[**See `phase-1-telemetry/README.md`**](./phase-1-telemetry/README.md)
|
||||
|
||||
| # | Task | Status |
|
||||
|---|------|--------|
|
||||
| 1.1 | [Project scaffold](./phase-1-telemetry/01-project-scaffold.md) | ⬜ |
|
||||
| 1.2 | [Core shell & framing types](./phase-1-telemetry/02-core-shell.md) | ⬜ |
|
||||
| 1.3 | [Configuration & logging](./phase-1-telemetry/03-config-and-logging.md) | ⬜ |
|
||||
| 1.4 | [Teltonika framing layer (envelope, CRC, handshake)](./phase-1-telemetry/04-teltonika-framing.md) | ⬜ |
|
||||
| 1.5 | [Codec 8 parser](./phase-1-telemetry/05-codec-8.md) | ⬜ |
|
||||
| 1.6 | [Codec 8 Extended parser (incl. NX)](./phase-1-telemetry/06-codec-8-extended.md) | ⬜ |
|
||||
| 1.7 | [Codec 16 parser (incl. Generation Type)](./phase-1-telemetry/07-codec-16.md) | ⬜ |
|
||||
| 1.8 | [Redis Streams publisher & main wiring](./phase-1-telemetry/08-redis-publisher.md) | ⬜ |
|
||||
| 1.9 | [Fixture suite & testing strategy](./phase-1-telemetry/09-fixture-suite.md) | ⬜ |
|
||||
| 1.10 | [Observability (Prometheus metrics)](./phase-1-telemetry/10-observability.md) | ⬜ |
|
||||
| 1.11 | [Dockerfile & Gitea workflow](./phase-1-telemetry/11-dockerfile-and-ci.md) | ⬜ |
|
||||
| 1.12 | [Production hardening](./phase-1-telemetry/12-production-hardening.md) | ⬜ |
|
||||
| 1.13 | [Device authority (Redis allow-list refresher)](./phase-1-telemetry/13-device-authority.md) | ⬜ |
|
||||
| # | Task | Status | Landed in |
|
||||
|---|------|--------|-----------|
|
||||
| 1.1 | [Project scaffold](./phase-1-telemetry/01-project-scaffold.md) | 🟩 | `1e9219d` |
|
||||
| 1.2 | [Core shell & framing types](./phase-1-telemetry/02-core-shell.md) | 🟩 | `1e9219d` |
|
||||
| 1.3 | [Configuration & logging](./phase-1-telemetry/03-config-and-logging.md) | 🟩 | `1e9219d` |
|
||||
| 1.4 | [Teltonika framing layer (envelope, CRC, handshake)](./phase-1-telemetry/04-teltonika-framing.md) | 🟩 | `1e9219d` |
|
||||
| 1.5 | [Codec 8 parser](./phase-1-telemetry/05-codec-8.md) | 🟩 | `381287b` |
|
||||
| 1.6 | [Codec 8 Extended parser (incl. NX)](./phase-1-telemetry/06-codec-8-extended.md) | 🟩 | `381287b` |
|
||||
| 1.7 | [Codec 16 parser (incl. Generation Type)](./phase-1-telemetry/07-codec-16.md) | 🟩 | `381287b` |
|
||||
| 1.8 | [Redis Streams publisher & main wiring](./phase-1-telemetry/08-redis-publisher.md) | 🟩 | `af06973` |
|
||||
| 1.9 | [Fixture suite & testing strategy](./phase-1-telemetry/09-fixture-suite.md) | 🟩 | `381287b` |
|
||||
| 1.10 | [Observability (Prometheus metrics)](./phase-1-telemetry/10-observability.md) | 🟩 | `26a1509` |
|
||||
| 1.11 | [Dockerfile & Gitea workflow](./phase-1-telemetry/11-dockerfile-and-ci.md) | 🟩 | `88b742d` (slim pilot variant) |
|
||||
| 1.12 | [Production hardening](./phase-1-telemetry/12-production-hardening.md) | ⏸ | *deferred — see below* |
|
||||
| 1.13 | [Device authority (Redis allow-list refresher)](./phase-1-telemetry/13-device-authority.md) | ⏸ | *deferred — see below* |
|
||||
|
||||
#### Deferred (resume after the real-device pilot test)
|
||||
|
||||
These two tasks are paused so we can get the service onto real hardware as fast as possible. They are paused, not cancelled — each must be completed before the service is considered production-ready.
|
||||
|
||||
- **1.12 Production hardening.** Graceful shutdown is a stub today; uncaught-exception handlers are minimal. **Resume trigger:** before the pilot graduates to "always-on" or before any deployment that does rolling restarts. Acceptable for a manual pilot where we can stop/start the process by hand.
|
||||
- **1.13 Device authority (Redis allow-list refresher).** Default `AllowAllAuthority` accepts every IMEI. **Resume trigger:** when Directus has a `devices` collection publishing the allow-list to Redis, or when the operational picture demands rejecting unknown IMEIs (`STRICT_DEVICE_AUTH=true`).
|
||||
|
||||
When resuming any of these, change the status from ⏸ back to ⬜ or 🟨 here and in the task file's status badge, and clear the deferral note in the task file.
|
||||
|
||||
### Phase 2 — Outbound commands (Codec 12, 14)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.1 — Project scaffold
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `1e9219d`
|
||||
**Depends on:** None
|
||||
**Wiki refs:** `docs/wiki/sources/teltonika-ingestion-architecture.md` § Project location and layout
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.2 — Core shell & framing types
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `1e9219d`
|
||||
**Depends on:** 1.1
|
||||
**Wiki refs:** `docs/wiki/concepts/protocol-adapter.md`, `docs/wiki/concepts/codec-dispatch.md`, `docs/wiki/concepts/position-record.md`
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.3 — Configuration & logging
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `1e9219d`
|
||||
**Depends on:** 1.1
|
||||
**Wiki refs:** `docs/wiki/sources/gps-tracking-architecture.md` § Deployment topology, § Observability
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.4 — Teltonika framing layer
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `1e9219d`
|
||||
**Depends on:** 1.2
|
||||
**Wiki refs:** `docs/wiki/concepts/avl-data-format.md` (envelope, IMEI handshake), `docs/wiki/concepts/codec-dispatch.md`, `docs/wiki/sources/teltonika-data-sending-protocols.md`
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.5 — Codec 8 parser
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `381287b`
|
||||
**Depends on:** 1.4, 1.9 (fixture infra)
|
||||
**Wiki refs:** `docs/wiki/concepts/avl-data-format.md` § Codec 8, `docs/wiki/sources/teltonika-data-sending-protocols.md` § Codec 8
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.6 — Codec 8 Extended parser
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `381287b`
|
||||
**Depends on:** 1.4, 1.5 (shared GPS Element / timestamp helpers), 1.9
|
||||
**Wiki refs:** `docs/wiki/concepts/avl-data-format.md` § Codec 8 Extended, `docs/wiki/sources/teltonika-data-sending-protocols.md` § Codec 8 Extended
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.7 — Codec 16 parser
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `381287b`
|
||||
**Depends on:** 1.4, 1.5 (shared helpers), 1.9
|
||||
**Wiki refs:** `docs/wiki/concepts/avl-data-format.md` § Codec 16, `docs/wiki/sources/teltonika-data-sending-protocols.md` § Codec 16
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.8 — Redis Streams publisher & main wiring
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** 🟩 Done
|
||||
**Status:** 🟩 Done — landed in commit `af06973`
|
||||
**Depends on:** 1.2, 1.3, 1.4, 1.5, 1.6, 1.7
|
||||
**Wiki refs:** `docs/wiki/entities/redis-streams.md`, `docs/wiki/concepts/position-record.md`
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.9 — Fixture suite & testing strategy
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `381287b`
|
||||
**Depends on:** 1.1
|
||||
**Wiki refs:** `docs/wiki/sources/teltonika-ingestion-architecture.md` § 5.6, `docs/wiki/sources/teltonika-data-sending-protocols.md`
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.10 — Observability (Prometheus metrics)
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done — landed in commit `26a1509`
|
||||
**Depends on:** 1.2, 1.3
|
||||
**Wiki refs:** `docs/wiki/sources/teltonika-ingestion-architecture.md` § 7. Observability, `docs/wiki/sources/gps-tracking-architecture.md` § 7.4
|
||||
|
||||
@@ -81,4 +81,4 @@ Use Node's `node:http` directly — no Express/Fastify dependency for two endpoi
|
||||
|
||||
## Done
|
||||
|
||||
(Fill in once complete.)
|
||||
Implemented `src/observability/metrics.ts` with `createMetrics()`, `startMetricsServer()`, and `ReadyzDeps`. Replaced the placeholder shim in `src/main.ts`, wired metrics server into boot and graceful shutdown, downgraded `frame ingested` log to debug, and re-enabled the Dockerfile `HEALTHCHECK`. Landed in `26a1509`.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.11 — Dockerfile & Gitea workflow
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** 🟩 Done (slim pilot variant — landed in commit `88b742d`; see Done section below)
|
||||
**Depends on:** 1.8 (so the service actually does something), 1.10 (metrics endpoint for healthcheck)
|
||||
**Wiki refs:** `docs/wiki/sources/gps-tracking-architecture.md` § 7.3 Deployment topology
|
||||
|
||||
@@ -172,4 +172,34 @@ Out of scope for this task: how the image is consumed in production (compose pul
|
||||
|
||||
## Done
|
||||
|
||||
(Fill in once complete.)
|
||||
Landed in commit `88b742d`.
|
||||
|
||||
### Slim pilot variant — deviations from spec
|
||||
|
||||
This task was implemented in a slimmed form to unblock pilot deployment on real Teltonika hardware before task 1.10 (observability) ships.
|
||||
|
||||
**Pre-approved slim changes (per task brief):**
|
||||
|
||||
1. **No `HEALTHCHECK`** — removed because no HTTP server runs yet. A comment in the Dockerfile marks where to re-add `wget -qO- http://localhost:${METRICS_PORT}/readyz` when task 1.10 lands.
|
||||
2. **`EXPOSE 5027` only** — `EXPOSE 9090` omitted because `METRICS_PORT` is in the config schema but nothing listens on it. Adding it would mislead operators.
|
||||
3. **`compose.yaml` maps only `5027:5027`** — `9090:9090` port mapping removed for the same reason.
|
||||
|
||||
**Additional deviations (none beyond the pre-approved slim changes).**
|
||||
|
||||
### Still owed when task 1.10 ships
|
||||
|
||||
- Restore `EXPOSE 9090` in the Dockerfile runtime stage.
|
||||
- Restore `HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 CMD wget -qO- http://localhost:9090/readyz || exit 1`.
|
||||
- Restore the `9090:9090` port mapping in `compose.yaml`.
|
||||
- Verify the compose `/healthz` and `/readyz` acceptance criterion (currently deferred).
|
||||
|
||||
### Acceptance criteria — original vs pilot
|
||||
|
||||
| Criterion | Status |
|
||||
|-----------|--------|
|
||||
| `docker build .` succeeds; image under 200MB | Deferred — Docker not available in agent env; Dockerfile follows spec exactly |
|
||||
| `docker compose up` starts both services; `/healthz` and `/readyz` return 200 | Deferred — no HTTP server until task 1.10 |
|
||||
| Push to `main` runs tests, builds, publishes image | Satisfied — workflow in `.gitea/workflows/build.yml` |
|
||||
| Tag push also tags with version | Deferred — the committed workflow pushes only the `:main` tag; no `docker/metadata-action` semver step is present yet |
|
||||
| PR: test job only, no push | Deferred — the workflow has no `pull_request` trigger, so PRs currently run no jobs at all |
|
||||
| BuildKit cache reduces no-change rebuild to under 30s | Partially — `--mount=type=cache` is present in both Dockerfile pnpm stages, but the workflow's build step configures no registry `cache-from`/`cache-to` |
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.12 — Production hardening
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started
|
||||
**Status:** ⏸ Paused — deferred until after the real-device pilot test. See ROADMAP.md "Deferred" section for resume triggers. `installGracefulShutdown` exists as a stub from task 1.8; this task fully implements signal handling, drain timeouts, unhandled-rejection handlers, and writes OPERATIONS.md. **Resume before any always-on deployment or rolling-restart workflow.**
|
||||
**Depends on:** 1.8, 1.10, 1.11
|
||||
**Wiki refs:** `docs/wiki/concepts/failure-domains.md`
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Task 1.13 — Device authority (Redis allow-list refresher)
|
||||
|
||||
**Phase:** 1 — Inbound telemetry
|
||||
**Status:** ⬜ Not started (deferrable — can ship after the rest of Phase 1)
|
||||
**Status:** ⏸ Paused — deferred until after the real-device pilot test, AND until Directus has a `devices` collection publishing the allow-list to Redis. See ROADMAP.md "Deferred" section. The `DeviceAuthority` seam exists with `AllowAllAuthority` (default, in `src/adapters/teltonika/device-authority.ts`); this task adds `RedisAllowListAuthority`.
|
||||
**Depends on:** 1.4 (DeviceAuthority seam), 1.10 (metrics)
|
||||
**Wiki refs:** `docs/wiki/concepts/plane-separation.md`, `docs/wiki/entities/directus.md`, `docs/wiki/entities/redis-streams.md`
|
||||
|
||||
|
||||
+31
@@ -0,0 +1,31 @@
|
||||
# syntax=docker/dockerfile:1.7
|
||||
|
||||
# ---- deps stage: install with cache-friendly pnpm fetch ----
|
||||
FROM node:22-alpine AS deps
|
||||
WORKDIR /app
|
||||
RUN corepack enable && corepack prepare pnpm@latest-9 --activate
|
||||
COPY package.json pnpm-lock.yaml ./
|
||||
RUN --mount=type=cache,id=pnpm-store,target=/root/.local/share/pnpm/store \
|
||||
pnpm fetch
|
||||
|
||||
# ---- build stage: compile TypeScript ----
|
||||
FROM deps AS build
|
||||
COPY . .
|
||||
RUN --mount=type=cache,id=pnpm-store,target=/root/.local/share/pnpm/store \
|
||||
pnpm install --frozen-lockfile --offline
|
||||
RUN pnpm build
|
||||
RUN pnpm prune --prod
|
||||
|
||||
# ---- runtime: slim, non-root ----
|
||||
FROM node:22-alpine AS runtime
|
||||
WORKDIR /app
|
||||
RUN addgroup -S app && adduser -S -G app app
|
||||
COPY --from=build --chown=app:app /app/node_modules ./node_modules
|
||||
COPY --from=build --chown=app:app /app/dist ./dist
|
||||
COPY --from=build --chown=app:app /app/package.json ./package.json
|
||||
USER app
|
||||
EXPOSE 5027
|
||||
EXPOSE 9090
|
||||
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
|
||||
CMD wget -qO- http://localhost:9090/readyz || exit 1
|
||||
CMD ["node", "dist/main.js"]
|
||||
@@ -0,0 +1,83 @@
|
||||
# tcp-ingestion
|
||||
|
||||
Node.js TCP server that accepts persistent connections from Teltonika GPS hardware (FMB/FMC/FMM/FMU series), parses Codec 8, 8E, and 16 AVL frames, and publishes normalized `Position` records to a Redis Stream for downstream consumers.
|
||||
|
||||
For the full architectural specification see `../docs/wiki/`. For the work plan and task status see `.planning/ROADMAP.md`.
|
||||
|
||||
---
|
||||
|
||||
## Quick start (local)
|
||||
|
||||
**Prerequisites:** Node.js 22+, pnpm, a local Redis instance (or use compose below).
|
||||
|
||||
```bash
|
||||
git clone <repo-url>
|
||||
cd tcp-ingestion
|
||||
pnpm install
|
||||
cp .env.example .env
|
||||
# Edit .env — at minimum set REDIS_URL
|
||||
pnpm dev
|
||||
```
|
||||
|
||||
`pnpm dev` uses `tsx watch` for hot-reload during development. The server listens on `TELTONIKA_PORT` (default `5027`).
|
||||
|
||||
---
|
||||
|
||||
## Test the Docker build locally
|
||||
|
||||
`compose.dev.yaml` builds the image from source and runs it next to a Redis container. Useful for verifying Dockerfile changes before pushing:
|
||||
|
||||
```bash
|
||||
docker compose -f compose.dev.yaml up --build
|
||||
```
|
||||
|
||||
For day-to-day development, prefer `pnpm dev` directly — it has hot reload and faster iteration.
|
||||
|
||||
---
|
||||
|
||||
## Production / stage deployment
|
||||
|
||||
This service is **not** deployed standalone. It runs as part of the platform stack defined in the [`deploy/`](https://git.dev.microservices.al/trm/deploy) repo, which Portainer pulls and runs on the stage and production hosts.
|
||||
|
||||
The image itself is published to `git.dev.microservices.al/trm/tcp-ingestion:main` on every push to `main` (see CI behavior below). The `deploy/` repo's `compose.yaml` references that image; updates flow through there, not through this repo.
|
||||
|
||||
To pin a specific commit in production, set `TCP_INGESTION_TAG=<sha>` in the deploy stack's environment variables.
|
||||
|
||||
---
|
||||
|
||||
## Environment variables
|
||||
|
||||
See `.env.example` for all variables with descriptions and defaults. The only required variable is `REDIS_URL` — all others have sensible defaults.
|
||||
|
||||
---
|
||||
|
||||
## Tests
|
||||
|
||||
- `pnpm test` — unit tests only. Fast (~2s), no external dependencies. **This is what CI runs.**
|
||||
- `pnpm test:integration` — integration tests that need Docker (testcontainers spins up a real Redis). **Opt-in.** Run locally before changes to the Redis publisher, or in a separate CI job with Docker access.
|
||||
|
||||
Integration tests live in `test/**/*.integration.test.ts` and are excluded from the default run by `vitest.config.ts`.
|
||||
|
||||
---
|
||||
|
||||
## CI behavior
|
||||
|
||||
Gitea Actions workflow is at `.gitea/workflows/build.yml`.
|
||||
|
||||
- **Push to `main`** (only when `src/`, `test/`, build config, Dockerfile, or the workflow file itself changes): runs `typecheck`, `lint`, `test` (unit tests only), then builds and pushes the Docker image tagged `:main`. Auto-deploys to stage if a Portainer webhook is configured.
|
||||
- **Manual trigger** (`workflow_dispatch`): same flow, run on demand.
|
||||
|
||||
Integration tests are not run in CI — they need Docker access on the runner, which we don't currently configure. Run them locally as needed.
|
||||
|
||||
The workflow uses `secrets.REGISTRY_USERNAME` and `secrets.REGISTRY_PASSWORD` for the Gitea registry login — these must be configured in the repo's (or org's) Actions secrets.
|
||||
|
||||
---
|
||||
|
||||
## Pilot deployment notes
|
||||
|
||||
This service is running in pilot form. The following tasks are **paused** — they are not missing by accident, they are deferred by design to get onto real Teltonika hardware first:
|
||||
|
||||
- **Production hardening (task 1.12):** Graceful shutdown is a functional stub; uncaught-exception handling is minimal.
|
||||
- **Device authority (task 1.13):** `AllowAllAuthority` is active — every IMEI is accepted. `STRICT_DEVICE_AUTH=true` is wired but the Redis allow-list refresher is not yet implemented.
|
||||
|
||||
See `.planning/ROADMAP.md` for the resume triggers for each deferred task.
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
# Local development compose — builds the image from this repo's source tree
|
||||
# and runs the service alongside a Redis container.
|
||||
#
|
||||
# Use this for verifying Dockerfile changes locally before pushing. For
|
||||
# day-to-day development, run `pnpm dev` directly against a host-exposed
|
||||
# Redis.
|
||||
#
|
||||
# For STAGE and PRODUCTION deployment, use the multi-service compose in
|
||||
# the sibling `deploy/` repo (https://git.dev.microservices.al/trm/deploy),
|
||||
# which references this service by its registry image tag instead of
|
||||
# building locally.
|
||||
#
|
||||
# Usage:
|
||||
# docker compose -f compose.dev.yaml up --build
|
||||
# docker compose -f compose.dev.yaml down
|
||||
|
||||
name: tcp-ingestion-dev
|
||||
|
||||
services:
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
expose:
|
||||
- '6379'
|
||||
restart: unless-stopped
|
||||
|
||||
ingestion:
|
||||
build: .
|
||||
depends_on: [redis]
|
||||
ports:
|
||||
- '5027:5027'
|
||||
environment:
|
||||
NODE_ENV: production
|
||||
INSTANCE_ID: dev-1
|
||||
REDIS_URL: redis://redis:6379
|
||||
LOG_LEVEL: debug
|
||||
restart: unless-stopped
|
||||
@@ -12,6 +12,7 @@
|
||||
"start": "node dist/main.js",
|
||||
"test": "vitest run",
|
||||
"test:watch": "vitest",
|
||||
"test:integration": "vitest run --config vitest.integration.config.ts",
|
||||
"lint": "eslint .",
|
||||
"format": "prettier --write .",
|
||||
"typecheck": "tsc --noEmit"
|
||||
|
||||
@@ -112,7 +112,7 @@ export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapte
|
||||
|
||||
// Accept the device
|
||||
socket.write(Buffer.from([0x01]));
|
||||
sessionLogger.debug({ known: knownLabel }, 'IMEI handshake accepted');
|
||||
sessionLogger.info({ known: knownLabel }, 'IMEI handshake accepted');
|
||||
|
||||
// ------------------------------------------------------------------ //
|
||||
// Phase 2: AVL frame read loop
|
||||
@@ -134,6 +134,19 @@ export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapte
|
||||
'malformed frame; dropping connection',
|
||||
);
|
||||
}
|
||||
} else if (
|
||||
err instanceof Error &&
|
||||
'code' in err &&
|
||||
// Routine on cellular: NAT timeouts, carrier RST, half-closed pipes.
|
||||
// Surface as info (one-liner, no stack) so warns mean something.
|
||||
['ETIMEDOUT', 'ECONNRESET', 'EPIPE', 'ENOTCONN'].includes(
|
||||
(err as NodeJS.ErrnoException).code as string,
|
||||
)
|
||||
) {
|
||||
sessionLogger.info(
|
||||
{ code: (err as NodeJS.ErrnoException).code },
|
||||
'session ended (transport error)',
|
||||
);
|
||||
} else {
|
||||
sessionLogger.warn({ err }, 'unexpected error reading frame; dropping connection');
|
||||
}
|
||||
@@ -204,6 +217,13 @@ export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapte
|
||||
result: 'ok',
|
||||
});
|
||||
|
||||
// teltonika_frames_total{result="ok"} and teltonika_records_published_total
|
||||
// now carry this signal in Prometheus; keep the log at debug to avoid noise.
|
||||
sessionLogger.debug(
|
||||
{ codec: codecLabel, records: result.recordCount },
|
||||
'frame ingested',
|
||||
);
|
||||
|
||||
// ACK: 4-byte big-endian record count
|
||||
const ack = Buffer.alloc(4);
|
||||
ack.writeUInt32BE(result.recordCount, 0);
|
||||
|
||||
+1
-1
@@ -22,7 +22,7 @@ export async function runSession(
|
||||
const remoteAddress = `${socket.remoteAddress ?? 'unknown'}:${socket.remotePort ?? '?'}`;
|
||||
const sessionLogger = ctx.logger.child({ remote_address: remoteAddress });
|
||||
|
||||
sessionLogger.debug({ adapter: adapter.name }, 'session opened');
|
||||
sessionLogger.info({ adapter: adapter.name }, 'session opened');
|
||||
|
||||
socket.on('error', (err) => {
|
||||
sessionLogger.debug({ err }, 'socket error');
|
||||
|
||||
+22
-12
@@ -1,13 +1,14 @@
|
||||
import type { Redis } from 'ioredis';
|
||||
import type * as http from 'node:http';
|
||||
import type * as net from 'node:net';
|
||||
import { loadConfig } from './config/load.js';
|
||||
import type { Config } from './config/load.js';
|
||||
import { createLogger } from './observability/logger.js';
|
||||
import { createMetrics, startMetricsServer } from './observability/metrics.js';
|
||||
import { createPublisher, connectRedis } from './core/publish.js';
|
||||
import { startServer } from './core/server.js';
|
||||
import { createTeltonikaAdapter } from './adapters/teltonika/index.js';
|
||||
import { AllowAllAuthority } from './adapters/teltonika/device-authority.js';
|
||||
import type { Metrics } from './core/types.js';
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Startup: validate config (fail fast on bad env), build logger, boot server
|
||||
@@ -31,12 +32,8 @@ const logger = createLogger({
|
||||
|
||||
logger.info('tcp-ingestion starting');
|
||||
|
||||
// Placeholder metrics implementation — replaced in task 1.10.
|
||||
// Using the Metrics interface from types.ts (no prom-client yet).
|
||||
const metrics: Metrics = {
|
||||
inc: (name, labels) => logger.debug({ metric: name, labels }, 'metric inc'),
|
||||
observe: (name, value, labels) => logger.debug({ metric: name, value, labels }, 'metric observe'),
|
||||
};
|
||||
// Real prom-client metrics implementation (task 1.10).
|
||||
const metrics = createMetrics();
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Wire up the pipeline
|
||||
@@ -65,10 +62,18 @@ async function main(): Promise<void> {
|
||||
metrics,
|
||||
});
|
||||
|
||||
// 5. Install graceful shutdown (stub — full hardening in task 1.12)
|
||||
installGracefulShutdown({ server, redis, publisher, logger });
|
||||
// 5. Start metrics HTTP server (task 1.10).
|
||||
// readyzDeps use ioredis's synchronous `.status` field and net.Server's
|
||||
// `.listening` boolean — no I/O, so these closures are always cheap.
|
||||
const metricsServer = startMetricsServer(config.METRICS_PORT, metrics.serializeMetrics, {
|
||||
isRedisReady: () => redis.status === 'ready',
|
||||
isTcpListening: () => server.listening,
|
||||
});
|
||||
|
||||
logger.info({ port: config.TELTONIKA_PORT }, 'tcp-ingestion ready');
|
||||
// 6. Install graceful shutdown (stub — full hardening in task 1.12)
|
||||
installGracefulShutdown({ server, metricsServer, redis, publisher, logger });
|
||||
|
||||
logger.info({ port: config.TELTONIKA_PORT, metricsPort: config.METRICS_PORT }, 'tcp-ingestion ready');
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -77,13 +82,14 @@ async function main(): Promise<void> {
|
||||
|
||||
type ShutdownDeps = {
|
||||
readonly server: net.Server;
|
||||
readonly metricsServer: http.Server;
|
||||
readonly redis: Redis;
|
||||
readonly publisher: { drain(timeoutMs: number): Promise<void> };
|
||||
readonly logger: ReturnType<typeof createLogger>;
|
||||
};
|
||||
|
||||
function installGracefulShutdown(deps: ShutdownDeps): void {
|
||||
const { server, redis, publisher, logger: log } = deps;
|
||||
const { server, metricsServer, redis, publisher, logger: log } = deps;
|
||||
|
||||
let shuttingDown = false;
|
||||
|
||||
@@ -93,11 +99,15 @@ function installGracefulShutdown(deps: ShutdownDeps): void {
|
||||
|
||||
log.info({ signal }, 'shutdown signal received');
|
||||
|
||||
// Stop accepting new connections
|
||||
// Stop accepting new TCP connections
|
||||
server.close(() => {
|
||||
log.info('TCP server closed');
|
||||
});
|
||||
|
||||
// Close the metrics HTTP server before quitting Redis so /readyz reports
|
||||
// not-ready during the drain window (task 1.12 will tighten this further).
|
||||
metricsServer.close();
|
||||
|
||||
// Drain publisher queue then disconnect Redis
|
||||
publisher
|
||||
.drain(10_000)
|
||||
|
||||
@@ -22,10 +22,19 @@ export function createLogger(options: {
|
||||
instance_id: instanceId,
|
||||
};
|
||||
|
||||
// Emit `"level":"info"` instead of pino's default `"level":30` so log
|
||||
// viewers (Portainer, etc.) show a human-readable label rather than the
|
||||
// numeric level.
|
||||
const formatters = {
|
||||
level: (label: string) => ({ level: label }),
|
||||
};
|
||||
|
||||
if (nodeEnv === 'development') {
|
||||
return pino({
|
||||
level,
|
||||
base,
|
||||
timestamp: pino.stdTimeFunctions.isoTime,
|
||||
formatters,
|
||||
transport: {
|
||||
target: 'pino-pretty',
|
||||
options: {
|
||||
@@ -37,6 +46,8 @@ export function createLogger(options: {
|
||||
});
|
||||
}
|
||||
|
||||
// Production and test: plain JSON — fast, no extra deps
|
||||
return pino({ level, base });
|
||||
// Production and test: plain JSON — fast, no extra deps.
|
||||
// ISO-8601 string timestamps (vs default epoch-ms) survive downstream
|
||||
// log renderers (Portainer, Docker --timestamps) without losing seconds.
|
||||
return pino({ level, base, timestamp: pino.stdTimeFunctions.isoTime, formatters });
|
||||
}
|
||||
|
||||
@@ -0,0 +1,326 @@
|
||||
import * as http from 'node:http';
|
||||
import {
|
||||
Registry,
|
||||
Counter,
|
||||
Gauge,
|
||||
Histogram,
|
||||
collectDefaultMetrics,
|
||||
} from 'prom-client';
|
||||
import type { Metrics } from '../core/types.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Readiness probe dependencies — injected to keep this module free of
|
||||
// adapters/ and core/ imports that would violate the layering rule.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Synchronous readiness accessors consumed by the `/readyz` endpoint.
 * Injected by the composition root so this module needs no adapters/ or
 * core/ imports. Both checks must return `true` for `/readyz` to answer 200.
 */
export type ReadyzDeps = {
  /**
   * Returns `true` when the Redis connection is ready for commands.
   * Typically: `() => redis.status === 'ready'`
   */
  readonly isRedisReady: () => boolean;
  /**
   * Returns `true` when the TCP listener is bound.
   * Typically: `() => tcpServer.listening`
   */
  readonly isTcpListening: () => boolean;
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Internal metric registry type — one typed field per metric in the inventory.
|
||||
// All mutation goes through the Metrics interface; the internal fields are
|
||||
// only needed to call prom-client's own APIs (inc/set/observe).
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Internal handle bundling the prom-client Registry with one typed field per
 * metric in the inventory. All mutation flows through the `Metrics` interface;
 * these fields exist only so the dispatch helpers can call prom-client's own
 * APIs (inc/set/observe).
 */
type InternalRegistry = {
  readonly registry: Registry;
  // Gauge: currently open device sessions.
  readonly connectionsActive: Gauge;
  // Counters for handshake outcomes, authority failures, frame outcomes,
  // published records, unknown codecs, and queue-overflow drops.
  readonly handshakeTotal: Counter;
  readonly deviceAuthorityFailuresTotal: Counter;
  readonly framesTotal: Counter;
  readonly recordsPublishedTotal: Counter;
  // Histogram: per-frame parse latency, labelled by codec.
  readonly parseDurationSeconds: Histogram;
  readonly unknownCodecTotal: Counter;
  // Gauge: current publisher queue depth (set via observe()).
  readonly publishQueueDepth: Gauge;
  readonly publishOverflowTotal: Counter;
  // Histogram: XADD publish latency.
  readonly publishDurationSeconds: Histogram;
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// createMetrics — builds the full prom-client registry and returns a Metrics
|
||||
// wrapper that satisfies the existing call-site interface.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Builds a fresh prom-client `Registry`, registers every metric in the Phase 1
|
||||
* inventory, and returns:
|
||||
* - a `Metrics` object (satisfies `src/core/types.ts:Metrics`) for injection
|
||||
* into adapters and the publisher
|
||||
* - a `serializeMetrics()` function for exposition format
|
||||
*
|
||||
* `collectDefaultMetrics` is called once to enable Node.js process metrics
|
||||
* (GC, event loop lag, heap stats, etc.) under the same registry.
|
||||
*/
|
||||
export function createMetrics(): Metrics & {
|
||||
serializeMetrics: () => Promise<string>;
|
||||
} {
|
||||
const internal = buildInternalRegistry();
|
||||
|
||||
// Expose default Node.js process metrics (nodejs_*) on the same registry.
|
||||
collectDefaultMetrics({ register: internal.registry });
|
||||
|
||||
const metricsImpl: Metrics & { serializeMetrics: () => Promise<string> } = {
|
||||
inc(name: string, labels?: Record<string, string>): void {
|
||||
dispatchInc(internal, name, labels);
|
||||
},
|
||||
|
||||
observe(name: string, value: number, labels?: Record<string, string>): void {
|
||||
dispatchObserve(internal, name, value, labels);
|
||||
},
|
||||
|
||||
serializeMetrics(): Promise<string> {
|
||||
return internal.registry.metrics();
|
||||
},
|
||||
};
|
||||
|
||||
return metricsImpl;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// serializeMetrics — standalone helper for use outside the Metrics wrapper.
|
||||
// Exported so startMetricsServer can call it without holding a reference to
|
||||
// the internal registry.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Note: this is accessed via the metricsImpl.serializeMetrics closure above.
|
||||
// A standalone export is not required; the server takes the bound method.
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// startMetricsServer — minimal node:http server for /metrics, /healthz, /readyz
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Starts the Prometheus metrics HTTP server on the given port.
|
||||
*
|
||||
* Endpoints:
|
||||
* GET /metrics — Prometheus exposition format (text/plain; version=0.0.4)
|
||||
* GET /healthz — 200 if the process is alive (liveness probe)
|
||||
* GET /readyz — 200 if Redis is connected AND TCP server is listening;
|
||||
* 503 otherwise (readiness probe)
|
||||
*
|
||||
* @param port Port to bind; 0 lets the OS pick (useful in tests).
|
||||
* @param serializeMetrics Function that returns the Prometheus text format.
|
||||
* @param readyzDeps Sync accessors for Redis and TCP readiness state.
|
||||
*/
|
||||
export function startMetricsServer(
|
||||
port: number,
|
||||
serializeMetrics: () => Promise<string>,
|
||||
readyzDeps: ReadyzDeps,
|
||||
): http.Server {
|
||||
const server = http.createServer((req, res) => {
|
||||
const url = req.url ?? '/';
|
||||
const method = req.method ?? 'GET';
|
||||
|
||||
// Reject non-GET requests for all endpoints.
|
||||
if (method !== 'GET') {
|
||||
res.writeHead(405, { 'Content-Type': 'text/plain' });
|
||||
res.end('Method Not Allowed');
|
||||
return;
|
||||
}
|
||||
|
||||
if (url === '/metrics') {
|
||||
serializeMetrics()
|
||||
.then((text) => {
|
||||
res.writeHead(200, { 'Content-Type': 'text/plain; version=0.0.4; charset=utf-8' });
|
||||
res.end(text);
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
res.writeHead(500, { 'Content-Type': 'text/plain' });
|
||||
res.end(`Internal Server Error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (url === '/healthz') {
|
||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: 'ok' }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (url === '/readyz') {
|
||||
const redisOk = readyzDeps.isRedisReady();
|
||||
const tcpOk = readyzDeps.isTcpListening();
|
||||
|
||||
if (redisOk && tcpOk) {
|
||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: 'ok' }));
|
||||
} else {
|
||||
res.writeHead(503, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: 'not ready', redis: redisOk, tcp: tcpOk }));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
res.writeHead(404, { 'Content-Type': 'text/plain' });
|
||||
res.end('Not Found');
|
||||
});
|
||||
|
||||
server.listen(port);
|
||||
return server;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Private: registry construction
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function buildInternalRegistry(): InternalRegistry {
|
||||
const registry = new Registry();
|
||||
|
||||
const connectionsActive = new Gauge({
|
||||
name: 'teltonika_connections_active',
|
||||
help: 'Currently open device sessions.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const handshakeTotal = new Counter({
|
||||
name: 'teltonika_handshake_total',
|
||||
help: 'IMEI handshake outcomes.',
|
||||
labelNames: ['result', 'known'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const deviceAuthorityFailuresTotal = new Counter({
|
||||
name: 'teltonika_device_authority_failures_total',
|
||||
help: 'Times a DeviceAuthority.check call threw or timed out.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const framesTotal = new Counter({
|
||||
name: 'teltonika_frames_total',
|
||||
help: 'Frame-level outcomes.',
|
||||
labelNames: ['codec', 'result'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const recordsPublishedTotal = new Counter({
|
||||
name: 'teltonika_records_published_total',
|
||||
help: 'AVL records emitted to Redis.',
|
||||
labelNames: ['codec'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const parseDurationSeconds = new Histogram({
|
||||
name: 'teltonika_parse_duration_seconds',
|
||||
help: 'Per-frame parse time.',
|
||||
labelNames: ['codec'],
|
||||
buckets: [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const unknownCodecTotal = new Counter({
|
||||
name: 'teltonika_unknown_codec_total',
|
||||
help: 'Canary for codec coverage drift.',
|
||||
labelNames: ['codec_id'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const publishQueueDepth = new Gauge({
|
||||
name: 'teltonika_publish_queue_depth',
|
||||
help: 'Current bounded-queue depth.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const publishOverflowTotal = new Counter({
|
||||
name: 'teltonika_publish_overflow_total',
|
||||
help: 'Records dropped because the queue was full.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const publishDurationSeconds = new Histogram({
|
||||
name: 'teltonika_publish_duration_seconds',
|
||||
help: 'XADD latency.',
|
||||
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
return {
|
||||
registry,
|
||||
connectionsActive,
|
||||
handshakeTotal,
|
||||
deviceAuthorityFailuresTotal,
|
||||
framesTotal,
|
||||
recordsPublishedTotal,
|
||||
parseDurationSeconds,
|
||||
unknownCodecTotal,
|
||||
publishQueueDepth,
|
||||
publishOverflowTotal,
|
||||
publishDurationSeconds,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Private: dispatch helpers — map string metric names to typed prom-client calls
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function dispatchInc(
|
||||
r: InternalRegistry,
|
||||
name: string,
|
||||
labels?: Record<string, string>,
|
||||
): void {
|
||||
switch (name) {
|
||||
case 'teltonika_handshake_total':
|
||||
r.handshakeTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_device_authority_failures_total':
|
||||
r.deviceAuthorityFailuresTotal.inc();
|
||||
break;
|
||||
case 'teltonika_frames_total':
|
||||
r.framesTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_records_published_total':
|
||||
r.recordsPublishedTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_unknown_codec_total':
|
||||
r.unknownCodecTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_publish_overflow_total':
|
||||
r.publishOverflowTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_connections_active':
|
||||
// inc() on a gauge means +1 (connection opened)
|
||||
r.connectionsActive.inc();
|
||||
break;
|
||||
default:
|
||||
// Unknown metric name — silently ignore. This preserves the contract
|
||||
// that the Metrics interface never throws, and avoids crashing the
|
||||
// process when a call site references a metric not yet in the registry
|
||||
// (e.g. during staged rollouts or future tasks).
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
function dispatchObserve(
|
||||
r: InternalRegistry,
|
||||
name: string,
|
||||
value: number,
|
||||
labels?: Record<string, string>,
|
||||
): void {
|
||||
switch (name) {
|
||||
case 'teltonika_parse_duration_seconds':
|
||||
r.parseDurationSeconds.observe(labels ?? {}, value);
|
||||
break;
|
||||
case 'teltonika_publish_duration_seconds':
|
||||
r.publishDurationSeconds.observe(value);
|
||||
break;
|
||||
case 'teltonika_publish_queue_depth':
|
||||
r.publishQueueDepth.set(value);
|
||||
break;
|
||||
case 'teltonika_connections_active':
|
||||
// observe() on a gauge means set(value) — caller controls the value
|
||||
r.connectionsActive.set(value);
|
||||
break;
|
||||
default:
|
||||
// Unknown metric name — silently ignore (see dispatchInc comment).
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,360 @@
|
||||
/**
|
||||
* Unit tests for src/observability/metrics.ts
|
||||
*
|
||||
* Covers:
|
||||
* - serializeMetrics() returns valid Prometheus exposition format with the
|
||||
* full metric inventory present (some at zero)
|
||||
* - counter increments are reflected in the serialized output
|
||||
* - teltonika_unknown_codec_total{codec_id="0xff"} appears after an unknown-codec event
|
||||
* - startMetricsServer responds correctly to /metrics, /healthz, and /readyz
|
||||
* - /readyz returns 503 when Redis is not ready or TCP is not listening
|
||||
*/
|
||||
|
||||
import { describe, it, expect, afterEach } from 'vitest';
|
||||
import * as http from 'node:http';
|
||||
import { createMetrics, startMetricsServer } from '../src/observability/metrics.js';
|
||||
import type { ReadyzDeps } from '../src/observability/metrics.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Makes an HTTP GET request and resolves with { statusCode, body }.
|
||||
* Uses the raw node:http module to match the server's implementation.
|
||||
*/
|
||||
function get(port: number, path: string): Promise<{ statusCode: number; body: string }> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const req = http.get(`http://localhost:${port}${path}`, (res) => {
|
||||
let body = '';
|
||||
res.on('data', (chunk: Buffer) => {
|
||||
body += chunk.toString();
|
||||
});
|
||||
res.on('end', () => {
|
||||
resolve({ statusCode: res.statusCode ?? 0, body });
|
||||
});
|
||||
});
|
||||
req.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a server to be listening, then runs `fn`, then closes the server.
|
||||
* Returns after the server is fully closed.
|
||||
*/
|
||||
function withServer(
|
||||
server: http.Server,
|
||||
fn: (port: number) => Promise<void>,
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
server.once('listening', () => {
|
||||
const addr = server.address();
|
||||
const port = typeof addr === 'object' && addr !== null ? addr.port : 0;
|
||||
|
||||
fn(port)
|
||||
.then(() => {
|
||||
server.close((err) => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
server.close(() => reject(err));
|
||||
});
|
||||
});
|
||||
server.once('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 1. serializeMetrics — Prometheus exposition format
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Each test builds its own createMetrics() instance: createMetrics constructs
// a fresh prom-client Registry per call, so counters start at zero and tests
// stay order-independent.
describe('createMetrics — serializeMetrics', () => {
  it('returns a non-empty string', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();
    expect(typeof text).toBe('string');
    expect(text.length).toBeGreaterThan(0);
  });

  it('contains every teltonika_* metric in the Phase 1 inventory', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();

    // Full Phase 1 inventory — keep in sync with buildInternalRegistry.
    // All appear in the exposition even at zero because they are registered
    // up front rather than lazily on first increment.
    const expectedMetrics = [
      'teltonika_connections_active',
      'teltonika_handshake_total',
      'teltonika_device_authority_failures_total',
      'teltonika_frames_total',
      'teltonika_records_published_total',
      'teltonika_parse_duration_seconds',
      'teltonika_unknown_codec_total',
      'teltonika_publish_queue_depth',
      'teltonika_publish_overflow_total',
      'teltonika_publish_duration_seconds',
    ];

    for (const name of expectedMetrics) {
      expect(text, `expected "${name}" in metrics output`).toContain(name);
    }
  });

  it('contains nodejs_* default process metrics', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();
    // prom-client collectDefaultMetrics registers nodejs_version_info at minimum
    expect(text).toContain('nodejs_');
  });

  it('exposition format contains HELP and TYPE lines', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();
    // Spot-check two counters for the # HELP / # TYPE preamble Prometheus
    // requires in the text exposition format.
    expect(text).toContain('# HELP teltonika_handshake_total');
    expect(text).toContain('# TYPE teltonika_handshake_total counter');
    expect(text).toContain('# HELP teltonika_frames_total');
    expect(text).toContain('# TYPE teltonika_frames_total counter');
  });
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 2. Counter increments
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('createMetrics — counter increments', () => {
  it('increments teltonika_frames_total and teltonika_records_published_total', async () => {
    const metrics = createMetrics();

    // Simulate a successful Codec 8E frame
    metrics.inc('teltonika_frames_total', { codec: '8E', result: 'ok' });
    metrics.inc('teltonika_records_published_total', { codec: '8E' });

    const text = await metrics.serializeMetrics();

    // The exposition format for a counter with labels looks like:
    //   teltonika_frames_total{codec="8E",result="ok"} 1
    // The [^}]* gaps tolerate extra labels between the expected ones, but the
    // expected labels must appear in this order.
    expect(text).toMatch(/teltonika_frames_total\{[^}]*codec="8E"[^}]*result="ok"[^}]*\}\s+1/);
    expect(text).toMatch(/teltonika_records_published_total\{[^}]*codec="8E"[^}]*\}\s+1/);
  });

  it('accumulates multiple increments on the same label set', async () => {
    const metrics = createMetrics();

    // Three increments on an identical label set accumulate in one series.
    metrics.inc('teltonika_frames_total', { codec: '8', result: 'ok' });
    metrics.inc('teltonika_frames_total', { codec: '8', result: 'ok' });
    metrics.inc('teltonika_frames_total', { codec: '8', result: 'ok' });

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/teltonika_frames_total\{[^}]*codec="8"[^}]*result="ok"[^}]*\}\s+3/);
  });

  it('tracks crc_fail result separately from ok', async () => {
    const metrics = createMetrics();

    // Different `result` label values must land in distinct series.
    metrics.inc('teltonika_frames_total', { codec: '8', result: 'ok' });
    metrics.inc('teltonika_frames_total', { codec: '8', result: 'crc_fail' });

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/teltonika_frames_total\{[^}]*codec="8"[^}]*result="ok"[^}]*\}\s+1/);
    expect(text).toMatch(/teltonika_frames_total\{[^}]*codec="8"[^}]*result="crc_fail"[^}]*\}\s+1/);
  });
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 3. Unknown codec canary
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('createMetrics — unknown codec canary', () => {
|
||||
it('records teltonika_unknown_codec_total{codec_id="0xff"} after an unknown-codec event', async () => {
|
||||
const metrics = createMetrics();
|
||||
|
||||
metrics.inc('teltonika_unknown_codec_total', { codec_id: '0xff' });
|
||||
|
||||
const text = await metrics.serializeMetrics();
|
||||
expect(text).toMatch(/teltonika_unknown_codec_total\{[^}]*codec_id="0xff"[^}]*\}\s+1/);
|
||||
});
|
||||
|
||||
it('distinguishes different codec_id values', async () => {
|
||||
const metrics = createMetrics();
|
||||
|
||||
metrics.inc('teltonika_unknown_codec_total', { codec_id: '0x0f' });
|
||||
metrics.inc('teltonika_unknown_codec_total', { codec_id: '0x0f' });
|
||||
metrics.inc('teltonika_unknown_codec_total', { codec_id: '0x20' });
|
||||
|
||||
const text = await metrics.serializeMetrics();
|
||||
expect(text).toMatch(/teltonika_unknown_codec_total\{[^}]*codec_id="0x0f"[^}]*\}\s+2/);
|
||||
expect(text).toMatch(/teltonika_unknown_codec_total\{[^}]*codec_id="0x20"[^}]*\}\s+1/);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 4. observe() — gauge and histogram
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('createMetrics — observe', () => {
|
||||
it('sets teltonika_publish_queue_depth gauge', async () => {
|
||||
const metrics = createMetrics();
|
||||
|
||||
metrics.observe('teltonika_publish_queue_depth', 42);
|
||||
|
||||
const text = await metrics.serializeMetrics();
|
||||
expect(text).toMatch(/teltonika_publish_queue_depth\s+42/);
|
||||
});
|
||||
|
||||
it('records teltonika_parse_duration_seconds histogram observation', async () => {
|
||||
const metrics = createMetrics();
|
||||
|
||||
metrics.observe('teltonika_parse_duration_seconds', 0.0003, { codec: '8E' });
|
||||
|
||||
const text = await metrics.serializeMetrics();
|
||||
// The histogram sum should contain the observed value
|
||||
expect(text).toMatch(/teltonika_parse_duration_seconds_sum\{[^}]*codec="8E"[^}]*\}\s+0\.0003/);
|
||||
});
|
||||
|
||||
it('ignores unknown metric names without throwing', () => {
|
||||
const metrics = createMetrics();
|
||||
// Must not throw — the Metrics interface contract is never-throw
|
||||
expect(() => metrics.inc('teltonika_nonexistent_metric')).not.toThrow();
|
||||
expect(() => metrics.observe('teltonika_nonexistent_metric', 1.0)).not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 5. startMetricsServer — HTTP endpoint behaviour
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('startMetricsServer', () => {
|
||||
// Track servers for cleanup in case of test failure
|
||||
const openServers: http.Server[] = [];
|
||||
|
||||
afterEach(() => {
|
||||
for (const s of openServers) {
|
||||
if (s.listening) s.close();
|
||||
}
|
||||
openServers.length = 0;
|
||||
});
|
||||
|
||||
it('GET /metrics returns 200 with Prometheus text', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => true,
|
||||
isTcpListening: () => true,
|
||||
};
|
||||
|
||||
// port=0 → OS picks a free port
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode, body } = await get(port, '/metrics');
|
||||
expect(statusCode).toBe(200);
|
||||
expect(body).toContain('teltonika_');
|
||||
expect(body).toContain('nodejs_');
|
||||
});
|
||||
});
|
||||
|
||||
it('GET /healthz returns 200 regardless of Redis/TCP state', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => false, // deliberately unhealthy
|
||||
isTcpListening: () => false,
|
||||
};
|
||||
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode, body } = await get(port, '/healthz');
|
||||
expect(statusCode).toBe(200);
|
||||
expect(JSON.parse(body)).toEqual({ status: 'ok' });
|
||||
});
|
||||
});
|
||||
|
||||
it('GET /readyz returns 200 when Redis is ready and TCP is listening', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => true,
|
||||
isTcpListening: () => true,
|
||||
};
|
||||
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode, body } = await get(port, '/readyz');
|
||||
expect(statusCode).toBe(200);
|
||||
expect(JSON.parse(body)).toEqual({ status: 'ok' });
|
||||
});
|
||||
});
|
||||
|
||||
it('GET /readyz returns 503 when Redis is not ready', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => false,
|
||||
isTcpListening: () => true,
|
||||
};
|
||||
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode, body } = await get(port, '/readyz');
|
||||
expect(statusCode).toBe(503);
|
||||
const parsed = JSON.parse(body) as { status: string; redis: boolean; tcp: boolean };
|
||||
expect(parsed.status).toBe('not ready');
|
||||
expect(parsed.redis).toBe(false);
|
||||
expect(parsed.tcp).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it('GET /readyz returns 503 when TCP server is not listening', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => true,
|
||||
isTcpListening: () => false,
|
||||
};
|
||||
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode } = await get(port, '/readyz');
|
||||
expect(statusCode).toBe(503);
|
||||
});
|
||||
});
|
||||
|
||||
it('GET /readyz returns 503 when both Redis and TCP are down', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => false,
|
||||
isTcpListening: () => false,
|
||||
};
|
||||
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode, body } = await get(port, '/readyz');
|
||||
expect(statusCode).toBe(503);
|
||||
const parsed = JSON.parse(body) as { status: string; redis: boolean; tcp: boolean };
|
||||
expect(parsed.redis).toBe(false);
|
||||
expect(parsed.tcp).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
it('GET /unknown-path returns 404', async () => {
|
||||
const metrics = createMetrics();
|
||||
const readyzDeps: ReadyzDeps = {
|
||||
isRedisReady: () => true,
|
||||
isTcpListening: () => true,
|
||||
};
|
||||
|
||||
const server = startMetricsServer(0, metrics.serializeMetrics, readyzDeps);
|
||||
openServers.push(server);
|
||||
|
||||
await withServer(server, async (port) => {
|
||||
const { statusCode } = await get(port, '/not-a-real-endpoint');
|
||||
expect(statusCode).toBe(404);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -3,6 +3,10 @@ import { defineConfig } from 'vitest/config';
|
||||
export default defineConfig({
|
||||
test: {
|
||||
include: ['test/**/*.test.ts'],
|
||||
// Integration tests need external services (Docker, real Redis). They are
|
||||
// opt-in via `pnpm test:integration` (see vitest.integration.config.ts).
|
||||
// Excluding them here keeps `pnpm test` fast and CI-safe.
|
||||
exclude: ['**/node_modules/**', 'test/**/*.integration.test.ts'],
|
||||
environment: 'node',
|
||||
coverage: {
|
||||
provider: 'v8',
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
import { defineConfig } from 'vitest/config';
|
||||
|
||||
/**
|
||||
* Vitest config for integration tests that depend on external services
|
||||
* (Docker, real Redis, etc.). Run via `pnpm test:integration`. Requires
|
||||
* a working Docker daemon — `testcontainers` will spin up the services
|
||||
* it needs, then tear them down.
|
||||
*
|
||||
* NOT run in default CI. Run locally before changes that touch the
|
||||
* Redis publisher, or run in a separate CI job that has Docker access.
|
||||
*/
|
||||
export default defineConfig({
  test: {
    // Only files named *.integration.test.ts — the unit config
    // (vitest.config.ts) excludes exactly this pattern, so the two suites
    // partition the test/ tree without overlap.
    include: ['test/**/*.integration.test.ts'],
    environment: 'node',
    // Container startup can be slow on first run (image pull, ryuk
    // container, etc). Allow generous hook + test timeouts.
    hookTimeout: 120_000,
    testTimeout: 60_000,
  },
  resolve: {
    // NOTE(review): overriding resolve.extensions replaces the bundler's
    // default extension list with just these two — confirm nothing under
    // test resolves .json/.mjs imports, or drop this override.
    extensions: ['.ts', '.js'],
  },
});
|
||||
Reference in New Issue
Block a user