Implement Phase 1 task 1.10 (Prometheus metrics + /healthz + /readyz)
Replaces the placeholder Metrics shim with a prom-client implementation in src/observability/metrics.ts: all 10 Phase 1 metrics from the wiki spec, plus nodejs_* defaults. Exposes /metrics, /healthz, /readyz over node:http on METRICS_PORT (9090); /readyz returns 503 when Redis status is not 'ready' or the TCP listener isn't bound. The Metrics interface in src/core/types.ts is unchanged — adapter call sites continue to use the same inc/observe shape. Only main.ts sees the extended type that adds serializeMetrics(). Side effects: - Dockerfile re-enables HEALTHCHECK pointing at /readyz, and EXPOSE 9090. - frame-ingested log downgraded back to debug now that teltonika_records_published_total is scrapeable. - 19 new unit tests covering exposition format, all metric types, and every HTTP endpoint path. Total now 98 passing. Note: deploy/compose.yaml still does not expose 9090 — separate decision about how Prometheus reaches the service (host port vs. internal scraper on the same Docker network).
This commit is contained in:
@@ -217,9 +217,9 @@ export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapte
|
||||
result: 'ok',
|
||||
});
|
||||
|
||||
// Pilot-stage visibility into per-frame ingest. Downgrade to debug
|
||||
// once task 1.10 wires up prom-client and frames_total is scrapeable.
|
||||
sessionLogger.info(
|
||||
// teltonika_frames_total{result="ok"} and teltonika_records_published_total
|
||||
// now carry this signal in Prometheus; keep the log at debug to avoid noise.
|
||||
sessionLogger.debug(
|
||||
{ codec: codecLabel, records: result.recordCount },
|
||||
'frame ingested',
|
||||
);
|
||||
|
||||
+22
-12
@@ -1,13 +1,14 @@
|
||||
import type { Redis } from 'ioredis';
|
||||
import type * as http from 'node:http';
|
||||
import type * as net from 'node:net';
|
||||
import { loadConfig } from './config/load.js';
|
||||
import type { Config } from './config/load.js';
|
||||
import { createLogger } from './observability/logger.js';
|
||||
import { createMetrics, startMetricsServer } from './observability/metrics.js';
|
||||
import { createPublisher, connectRedis } from './core/publish.js';
|
||||
import { startServer } from './core/server.js';
|
||||
import { createTeltonikaAdapter } from './adapters/teltonika/index.js';
|
||||
import { AllowAllAuthority } from './adapters/teltonika/device-authority.js';
|
||||
import type { Metrics } from './core/types.js';
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Startup: validate config (fail fast on bad env), build logger, boot server
|
||||
@@ -31,12 +32,8 @@ const logger = createLogger({
|
||||
|
||||
logger.info('tcp-ingestion starting');
|
||||
|
||||
// Placeholder metrics implementation — replaced in task 1.10.
|
||||
// Using the Metrics interface from types.ts (no prom-client yet).
|
||||
const metrics: Metrics = {
|
||||
inc: (name, labels) => logger.trace({ metric: name, labels }, 'metric inc'),
|
||||
observe: (name, value, labels) => logger.trace({ metric: name, value, labels }, 'metric observe'),
|
||||
};
|
||||
// Real prom-client metrics implementation (task 1.10).
|
||||
const metrics = createMetrics();
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Wire up the pipeline
|
||||
@@ -65,10 +62,18 @@ async function main(): Promise<void> {
|
||||
metrics,
|
||||
});
|
||||
|
||||
// 5. Install graceful shutdown (stub — full hardening in task 1.12)
|
||||
installGracefulShutdown({ server, redis, publisher, logger });
|
||||
// 5. Start metrics HTTP server (task 1.10).
|
||||
// readyzDeps use ioredis's synchronous `.status` field and net.Server's
|
||||
// `.listening` boolean — no I/O, so these closures are always cheap.
|
||||
const metricsServer = startMetricsServer(config.METRICS_PORT, metrics.serializeMetrics, {
|
||||
isRedisReady: () => redis.status === 'ready',
|
||||
isTcpListening: () => server.listening,
|
||||
});
|
||||
|
||||
logger.info({ port: config.TELTONIKA_PORT }, 'tcp-ingestion ready');
|
||||
// 6. Install graceful shutdown (stub — full hardening in task 1.12)
|
||||
installGracefulShutdown({ server, metricsServer, redis, publisher, logger });
|
||||
|
||||
logger.info({ port: config.TELTONIKA_PORT, metricsPort: config.METRICS_PORT }, 'tcp-ingestion ready');
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -77,13 +82,14 @@ async function main(): Promise<void> {
|
||||
|
||||
type ShutdownDeps = {
|
||||
readonly server: net.Server;
|
||||
readonly metricsServer: http.Server;
|
||||
readonly redis: Redis;
|
||||
readonly publisher: { drain(timeoutMs: number): Promise<void> };
|
||||
readonly logger: ReturnType<typeof createLogger>;
|
||||
};
|
||||
|
||||
function installGracefulShutdown(deps: ShutdownDeps): void {
|
||||
const { server, redis, publisher, logger: log } = deps;
|
||||
const { server, metricsServer, redis, publisher, logger: log } = deps;
|
||||
|
||||
let shuttingDown = false;
|
||||
|
||||
@@ -93,11 +99,15 @@ function installGracefulShutdown(deps: ShutdownDeps): void {
|
||||
|
||||
log.info({ signal }, 'shutdown signal received');
|
||||
|
||||
// Stop accepting new connections
|
||||
// Stop accepting new TCP connections
|
||||
server.close(() => {
|
||||
log.info('TCP server closed');
|
||||
});
|
||||
|
||||
// Close the metrics HTTP server before quitting Redis so /readyz reports
|
||||
// not-ready during the drain window (task 1.12 will tighten this further).
|
||||
metricsServer.close();
|
||||
|
||||
// Drain publisher queue then disconnect Redis
|
||||
publisher
|
||||
.drain(10_000)
|
||||
|
||||
@@ -0,0 +1,326 @@
|
||||
import * as http from 'node:http';
|
||||
import {
|
||||
Registry,
|
||||
Counter,
|
||||
Gauge,
|
||||
Histogram,
|
||||
collectDefaultMetrics,
|
||||
} from 'prom-client';
|
||||
import type { Metrics } from '../core/types.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Readiness probe dependencies — injected to keep this module free of
|
||||
// adapters/ and core/ imports that would violate the layering rule.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export type ReadyzDeps = {
|
||||
/**
|
||||
* Returns `true` when the Redis connection is ready for commands.
|
||||
* Typically: `() => redis.status === 'ready'`
|
||||
*/
|
||||
readonly isRedisReady: () => boolean;
|
||||
/**
|
||||
* Returns `true` when the TCP listener is bound.
|
||||
* Typically: `() => tcpServer.listening`
|
||||
*/
|
||||
readonly isTcpListening: () => boolean;
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Internal metric registry type — one typed field per metric in the inventory.
|
||||
// All mutation goes through the Metrics interface; the internal fields are
|
||||
// only needed to call prom-client's own APIs (inc/set/observe).
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type InternalRegistry = {
|
||||
readonly registry: Registry;
|
||||
readonly connectionsActive: Gauge;
|
||||
readonly handshakeTotal: Counter;
|
||||
readonly deviceAuthorityFailuresTotal: Counter;
|
||||
readonly framesTotal: Counter;
|
||||
readonly recordsPublishedTotal: Counter;
|
||||
readonly parseDurationSeconds: Histogram;
|
||||
readonly unknownCodecTotal: Counter;
|
||||
readonly publishQueueDepth: Gauge;
|
||||
readonly publishOverflowTotal: Counter;
|
||||
readonly publishDurationSeconds: Histogram;
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// createMetrics — builds the full prom-client registry and returns a Metrics
|
||||
// wrapper that satisfies the existing call-site interface.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Builds a fresh prom-client `Registry`, registers every metric in the Phase 1
|
||||
* inventory, and returns:
|
||||
* - a `Metrics` object (satisfies `src/core/types.ts:Metrics`) for injection
|
||||
* into adapters and the publisher
|
||||
* - a `serializeMetrics()` function for exposition format
|
||||
*
|
||||
* `collectDefaultMetrics` is called once to enable Node.js process metrics
|
||||
* (GC, event loop lag, heap stats, etc.) under the same registry.
|
||||
*/
|
||||
export function createMetrics(): Metrics & {
|
||||
serializeMetrics: () => Promise<string>;
|
||||
} {
|
||||
const internal = buildInternalRegistry();
|
||||
|
||||
// Expose default Node.js process metrics (nodejs_*) on the same registry.
|
||||
collectDefaultMetrics({ register: internal.registry });
|
||||
|
||||
const metricsImpl: Metrics & { serializeMetrics: () => Promise<string> } = {
|
||||
inc(name: string, labels?: Record<string, string>): void {
|
||||
dispatchInc(internal, name, labels);
|
||||
},
|
||||
|
||||
observe(name: string, value: number, labels?: Record<string, string>): void {
|
||||
dispatchObserve(internal, name, value, labels);
|
||||
},
|
||||
|
||||
serializeMetrics(): Promise<string> {
|
||||
return internal.registry.metrics();
|
||||
},
|
||||
};
|
||||
|
||||
return metricsImpl;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// serializeMetrics — standalone helper for use outside the Metrics wrapper.
|
||||
// Exported so startMetricsServer can call it without holding a reference to
|
||||
// the internal registry.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Note: this is accessed via the metricsImpl.serializeMetrics closure above.
|
||||
// A standalone export is not required; the server takes the bound method.
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// startMetricsServer — minimal node:http server for /metrics, /healthz, /readyz
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Starts the Prometheus metrics HTTP server on the given port.
|
||||
*
|
||||
* Endpoints:
|
||||
* GET /metrics — Prometheus exposition format (text/plain; version=0.0.4)
|
||||
* GET /healthz — 200 if the process is alive (liveness probe)
|
||||
* GET /readyz — 200 if Redis is connected AND TCP server is listening;
|
||||
* 503 otherwise (readiness probe)
|
||||
*
|
||||
* @param port Port to bind; 0 lets the OS pick (useful in tests).
|
||||
* @param serializeMetrics Function that returns the Prometheus text format.
|
||||
* @param readyzDeps Sync accessors for Redis and TCP readiness state.
|
||||
*/
|
||||
export function startMetricsServer(
|
||||
port: number,
|
||||
serializeMetrics: () => Promise<string>,
|
||||
readyzDeps: ReadyzDeps,
|
||||
): http.Server {
|
||||
const server = http.createServer((req, res) => {
|
||||
const url = req.url ?? '/';
|
||||
const method = req.method ?? 'GET';
|
||||
|
||||
// Reject non-GET requests for all endpoints.
|
||||
if (method !== 'GET') {
|
||||
res.writeHead(405, { 'Content-Type': 'text/plain' });
|
||||
res.end('Method Not Allowed');
|
||||
return;
|
||||
}
|
||||
|
||||
if (url === '/metrics') {
|
||||
serializeMetrics()
|
||||
.then((text) => {
|
||||
res.writeHead(200, { 'Content-Type': 'text/plain; version=0.0.4; charset=utf-8' });
|
||||
res.end(text);
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
res.writeHead(500, { 'Content-Type': 'text/plain' });
|
||||
res.end(`Internal Server Error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (url === '/healthz') {
|
||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: 'ok' }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (url === '/readyz') {
|
||||
const redisOk = readyzDeps.isRedisReady();
|
||||
const tcpOk = readyzDeps.isTcpListening();
|
||||
|
||||
if (redisOk && tcpOk) {
|
||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: 'ok' }));
|
||||
} else {
|
||||
res.writeHead(503, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: 'not ready', redis: redisOk, tcp: tcpOk }));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
res.writeHead(404, { 'Content-Type': 'text/plain' });
|
||||
res.end('Not Found');
|
||||
});
|
||||
|
||||
server.listen(port);
|
||||
return server;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Private: registry construction
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function buildInternalRegistry(): InternalRegistry {
|
||||
const registry = new Registry();
|
||||
|
||||
const connectionsActive = new Gauge({
|
||||
name: 'teltonika_connections_active',
|
||||
help: 'Currently open device sessions.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const handshakeTotal = new Counter({
|
||||
name: 'teltonika_handshake_total',
|
||||
help: 'IMEI handshake outcomes.',
|
||||
labelNames: ['result', 'known'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const deviceAuthorityFailuresTotal = new Counter({
|
||||
name: 'teltonika_device_authority_failures_total',
|
||||
help: 'Times a DeviceAuthority.check call threw or timed out.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const framesTotal = new Counter({
|
||||
name: 'teltonika_frames_total',
|
||||
help: 'Frame-level outcomes.',
|
||||
labelNames: ['codec', 'result'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const recordsPublishedTotal = new Counter({
|
||||
name: 'teltonika_records_published_total',
|
||||
help: 'AVL records emitted to Redis.',
|
||||
labelNames: ['codec'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const parseDurationSeconds = new Histogram({
|
||||
name: 'teltonika_parse_duration_seconds',
|
||||
help: 'Per-frame parse time.',
|
||||
labelNames: ['codec'],
|
||||
buckets: [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const unknownCodecTotal = new Counter({
|
||||
name: 'teltonika_unknown_codec_total',
|
||||
help: 'Canary for codec coverage drift.',
|
||||
labelNames: ['codec_id'],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const publishQueueDepth = new Gauge({
|
||||
name: 'teltonika_publish_queue_depth',
|
||||
help: 'Current bounded-queue depth.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const publishOverflowTotal = new Counter({
|
||||
name: 'teltonika_publish_overflow_total',
|
||||
help: 'Records dropped because the queue was full.',
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
const publishDurationSeconds = new Histogram({
|
||||
name: 'teltonika_publish_duration_seconds',
|
||||
help: 'XADD latency.',
|
||||
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
|
||||
registers: [registry],
|
||||
});
|
||||
|
||||
return {
|
||||
registry,
|
||||
connectionsActive,
|
||||
handshakeTotal,
|
||||
deviceAuthorityFailuresTotal,
|
||||
framesTotal,
|
||||
recordsPublishedTotal,
|
||||
parseDurationSeconds,
|
||||
unknownCodecTotal,
|
||||
publishQueueDepth,
|
||||
publishOverflowTotal,
|
||||
publishDurationSeconds,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Private: dispatch helpers — map string metric names to typed prom-client calls
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function dispatchInc(
|
||||
r: InternalRegistry,
|
||||
name: string,
|
||||
labels?: Record<string, string>,
|
||||
): void {
|
||||
switch (name) {
|
||||
case 'teltonika_handshake_total':
|
||||
r.handshakeTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_device_authority_failures_total':
|
||||
r.deviceAuthorityFailuresTotal.inc();
|
||||
break;
|
||||
case 'teltonika_frames_total':
|
||||
r.framesTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_records_published_total':
|
||||
r.recordsPublishedTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_unknown_codec_total':
|
||||
r.unknownCodecTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_publish_overflow_total':
|
||||
r.publishOverflowTotal.inc(labels ?? {});
|
||||
break;
|
||||
case 'teltonika_connections_active':
|
||||
// inc() on a gauge means +1 (connection opened)
|
||||
r.connectionsActive.inc();
|
||||
break;
|
||||
default:
|
||||
// Unknown metric name — silently ignore. This preserves the contract
|
||||
// that the Metrics interface never throws, and avoids crashing the
|
||||
// process when a call site references a metric not yet in the registry
|
||||
// (e.g. during staged rollouts or future tasks).
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
function dispatchObserve(
|
||||
r: InternalRegistry,
|
||||
name: string,
|
||||
value: number,
|
||||
labels?: Record<string, string>,
|
||||
): void {
|
||||
switch (name) {
|
||||
case 'teltonika_parse_duration_seconds':
|
||||
r.parseDurationSeconds.observe(labels ?? {}, value);
|
||||
break;
|
||||
case 'teltonika_publish_duration_seconds':
|
||||
r.publishDurationSeconds.observe(value);
|
||||
break;
|
||||
case 'teltonika_publish_queue_depth':
|
||||
r.publishQueueDepth.set(value);
|
||||
break;
|
||||
case 'teltonika_connections_active':
|
||||
// observe() on a gauge means set(value) — caller controls the value
|
||||
r.connectionsActive.set(value);
|
||||
break;
|
||||
default:
|
||||
// Unknown metric name — silently ignore (see dispatchInc comment).
|
||||
break;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user