d4a6d8f713
Replaces the placeholder Metrics shim with a prom-client implementation in src/observability/metrics.ts: all 10 Phase 1 metrics from the wiki spec, plus the nodejs_* defaults. Exposes /metrics, /healthz, and /readyz over node:http on METRICS_PORT (9090); /readyz returns 503 when the Redis status is not 'ready' or the TCP listener isn't bound.

The Metrics interface in src/core/types.ts is unchanged — adapter call sites continue to use the same inc/observe shape. Only main.ts sees the extended type that adds serializeMetrics().

Side effects:
- Dockerfile re-enables HEALTHCHECK pointing at /readyz, and adds EXPOSE 9090.
- The frame-ingested log is downgraded back to debug now that teltonika_records_published_total is scrapeable.
- 19 new unit tests covering the exposition format, all metric types, and every HTTP endpoint path. Total is now 98 passing.

Note: deploy/compose.yaml still does not expose 9090 — that is a separate decision about how Prometheus reaches the service (host port vs. an internal scraper on the same Docker network).
143 lines
5.1 KiB
TypeScript
143 lines
5.1 KiB
TypeScript
import type { Redis } from 'ioredis';
|
|
import type * as http from 'node:http';
|
|
import type * as net from 'node:net';
|
|
import { loadConfig } from './config/load.js';
|
|
import type { Config } from './config/load.js';
|
|
import { createLogger } from './observability/logger.js';
|
|
import { createMetrics, startMetricsServer } from './observability/metrics.js';
|
|
import { createPublisher, connectRedis } from './core/publish.js';
|
|
import { startServer } from './core/server.js';
|
|
import { createTeltonikaAdapter } from './adapters/teltonika/index.js';
|
|
import { AllowAllAuthority } from './adapters/teltonika/device-authority.js';
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Startup: validate config (fail fast on bad env), build logger, boot server
|
|
// -------------------------------------------------------------------------
|
|
|
|
/**
 * Loads the service configuration, or terminates the process when loading
 * fails.
 *
 * The logger has not been built yet at this point, so the failure message is
 * written straight to stderr before exiting with status 1.
 */
function loadConfigOrExit(): Config {
  try {
    return loadConfig();
  } catch (cause) {
    const reason = cause instanceof Error ? cause.message : String(cause);
    process.stderr.write(`${reason}\n`);
    return process.exit(1);
  }
}

const config: Config = loadConfigOrExit();
|
|
|
|
// Logger settings are taken from the already-loaded config.
const loggerOptions = {
  level: config.LOG_LEVEL,
  nodeEnv: config.NODE_ENV,
  instanceId: config.INSTANCE_ID,
};

const logger = createLogger(loggerOptions);
logger.info('tcp-ingestion starting');

// prom-client backed metrics (task 1.10); shared by the publisher, the TCP
// server context, and the metrics HTTP server started in main().
const metrics = createMetrics();
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Wire up the pipeline
|
|
// -------------------------------------------------------------------------
|
|
|
|
/**
 * Builds and starts the ingestion pipeline in order: Redis connection,
 * publisher, Teltonika adapter, TCP server, metrics HTTP server, and finally
 * the shutdown signal handlers.
 *
 * The returned promise rejects if the Redis connection (or any later startup
 * step) fails; the caller decides how to report that.
 */
async function main(): Promise<void> {
  const { REDIS_URL, TELTONIKA_PORT, METRICS_PORT, STRICT_DEVICE_AUTH } = config;

  // Step 1 — Redis. Connects with exponential-backoff retry
  // (3 attempts, up to 5s backoff).
  const redisClient = await connectRedis(REDIS_URL, logger);

  // Step 2 — publisher: bounded queue + XADD worker on top of the Redis client.
  const recordPublisher = createPublisher(redisClient, config, logger, metrics);

  // Step 3 — Teltonika adapter. No codecRegistry is passed, so
  // createTeltonikaAdapter falls back to its defaultRegistry, which has
  // codec8Handler, codec8eHandler and codec16Handler pre-registered.
  const adapter = createTeltonikaAdapter({
    port: TELTONIKA_PORT,
    deviceAuthority: new AllowAllAuthority(),
    strictDeviceAuth: STRICT_DEVICE_AUTH,
  });

  // Step 4 — TCP server. The publisher's publish function serves as the
  // AdapterContext.publish implementation.
  const tcpServer = startServer(TELTONIKA_PORT, adapter, {
    publish: recordPublisher.publish,
    logger,
    metrics,
  });

  // Step 5 — metrics HTTP server (task 1.10). Both readiness probes read a
  // synchronous field (ioredis `.status`, net.Server `.listening`) and do no
  // I/O, so they are cheap to evaluate.
  const isRedisReady = () => redisClient.status === 'ready';
  const isTcpListening = () => tcpServer.listening;
  const metricsHttpServer = startMetricsServer(METRICS_PORT, metrics.serializeMetrics, {
    isRedisReady,
    isTcpListening,
  });

  // Step 6 — graceful shutdown hooks (stub; full hardening lands in task 1.12).
  installGracefulShutdown({
    server: tcpServer,
    metricsServer: metricsHttpServer,
    redis: redisClient,
    publisher: recordPublisher,
    logger,
  });

  logger.info({ port: TELTONIKA_PORT, metricsPort: METRICS_PORT }, 'tcp-ingestion ready');
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Graceful shutdown stub — task 1.12 finalizes this
|
|
// -------------------------------------------------------------------------
|
|
|
|
/**
 * Handles that installGracefulShutdown releases when a termination signal
 * arrives. Every field is read-only: the shutdown code only calls methods on
 * these objects and never swaps them out.
 */
type ShutdownDeps = Readonly<{
  /** TCP listener that is closed so no new connections are accepted. */
  server: net.Server;
  /** HTTP server for the metrics endpoints; closed during shutdown. */
  metricsServer: http.Server;
  /** Redis client; quit once the publisher has drained. */
  redis: Redis;
  /** Publisher whose queue is drained (with a timeout in ms) before Redis quits. */
  publisher: { drain(timeoutMs: number): Promise<void> };
  /** Logger used to report shutdown progress and failures. */
  logger: ReturnType<typeof createLogger>;
}>;
|
|
|
|
/**
 * Registers SIGTERM and SIGINT handlers that shut the service down in order:
 * stop accepting TCP connections, close the metrics HTTP server, drain the
 * publisher queue, quit Redis, then exit the process.
 *
 * Exit codes: 0 after a clean drain and Redis quit; 1 if the drain/quit chain
 * rejects or the 15s watchdog fires first.
 *
 * This is a stub — task 1.12 finalizes the shutdown sequence.
 *
 * @param deps - Servers, Redis client, publisher and logger to release.
 */
function installGracefulShutdown(deps: ShutdownDeps): void {
  const { server, metricsServer, redis, publisher, logger: log } = deps;

  let shuttingDown = false;

  function shutdown(signal: string): void {
    // Only the first signal starts the sequence; later signals are ignored.
    if (shuttingDown) return;
    shuttingDown = true;

    log.info({ signal }, 'shutdown signal received');

    // Arm the watchdog before anything else so it is in place no matter how
    // the steps below behave. unref() stops the timer itself from keeping the
    // process alive once everything else has finished.
    setTimeout(() => {
      log.warn('forced exit after shutdown timeout');
      process.exit(1);
    }, 15_000).unref();

    // Stop accepting new TCP connections. The callback receives an Error when
    // the server was not open at the time close() was called, so only report
    // success when no error is passed.
    server.close((err?: Error) => {
      if (err) {
        log.warn({ err }, 'TCP server close failed');
        return;
      }
      log.info('TCP server closed');
    });

    // Close the metrics HTTP server before quitting Redis. Once closed it
    // accepts no new connections, so /readyz is unreachable during the drain
    // window (task 1.12 will tighten this further).
    metricsServer.close();

    // Drain the publisher queue (up to 10s), then disconnect Redis and exit.
    publisher
      .drain(10_000)
      .then(() => redis.quit())
      .then(() => {
        log.info('graceful shutdown complete');
        process.exit(0);
      })
      .catch((err: unknown) => {
        log.error({ err }, 'error during shutdown');
        process.exit(1);
      });
  }

  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Entry point
|
|
// -------------------------------------------------------------------------
|
|
|
|
/**
 * Last-resort handler for a rejected main(): writes the failure message to
 * stderr and exits with status 1.
 */
function reportFatalStartupError(err: unknown): void {
  const detail = err instanceof Error ? err.message : String(err);
  process.stderr.write(`Fatal startup error: ${detail}\n`);
  process.exit(1);
}

main().catch(reportFatalStartupError);
|