Files
tcp-ingestion/src/adapters/teltonika/index.ts
T
julian d4a6d8f713 Implement Phase 1 task 1.10 (Prometheus metrics + /healthz + /readyz)
Replaces the placeholder Metrics shim with a prom-client implementation
in src/observability/metrics.ts: all 10 Phase 1 metrics from the wiki
spec, plus nodejs_* defaults. Exposes /metrics, /healthz, /readyz over
node:http on METRICS_PORT (9090); /readyz returns 503 when Redis status
is not 'ready' or the TCP listener isn't bound.

The Metrics interface in src/core/types.ts is unchanged — adapter call
sites continue to use the same inc/observe shape. Only main.ts sees the
extended type that adds serializeMetrics().

Side effects:
- Dockerfile re-enables HEALTHCHECK pointing at /readyz, and EXPOSE 9090.
- frame-ingested log downgraded back to debug now that
  teltonika_records_published_total is scrapeable.
- 19 new unit tests covering exposition format, all metric types, and
  every HTTP endpoint path. Total now 98 passing.

Note: deploy/compose.yaml still does not expose 9090 — separate decision
about how Prometheus reaches the service (host port vs. internal scraper
on the same Docker network).
2026-04-30 20:54:32 +02:00

235 lines
9.0 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import type * as net from 'node:net';
import type { Adapter, AdapterContext } from '../../core/types.js';
import type { CodecLabel } from '../../core/publish.js';
import type { DeviceAuthority } from './device-authority.js';
import { AllowAllAuthority } from './device-authority.js';
import { readImeiHandshake, HandshakeError } from './handshake.js';
import { BufferedReader, readNextFrame, FrameDropError } from './frame.js';
import { CodecRegistry } from './codec/registry.js';
import { codec8Handler } from './codec/data/codec8.js';
import { codec8eHandler } from './codec/data/codec8e.js';
import { codec16Handler } from './codec/data/codec16.js';
/**
* Maps numeric codec IDs (as seen in the frame header) to their canonical
* string labels used in the Redis Stream record.
*
* Codec label plumbing decision (task 1.8):
* Option A: change CodecHandlerContext.publish signature to accept a codec label.
* - Pro: most direct.
* - Con: ripples into all three codec parser files + registry type.
* Option B (chosen): wrap ctx.publish in a closure at the dispatch site in
* index.ts. The handler still calls ctx.publish(position) unchanged; the
* wrapper captures frame.codecId, resolves it to a label, and forwards to
* Publisher.publish(position, codec). Zero changes to parsers or registry.
*/
const CODEC_ID_TO_LABEL: ReadonlyMap<number, CodecLabel> = new Map([
[0x08, '8'],
[0x8e, '8E'],
[0x10, '16'],
]);
export type TeltonikaAdapterOptions = {
readonly port: number;
readonly deviceAuthority?: DeviceAuthority;
readonly strictDeviceAuth?: boolean;
readonly codecRegistry?: CodecRegistry;
};
/**
* Creates and returns the Teltonika adapter. The adapter:
* 1. Performs the IMEI handshake (reads; consults DeviceAuthority; writes 0x01/0x00)
* 2. Runs the AVL frame read loop (preamble → length → body → CRC → dispatch)
* 3. ACKs accepted frames with the 4-byte big-endian record count
*
* Codec handlers are registered externally and passed in via `codecRegistry`.
* Tasks 1.51.7 populate the registry; this task ships it empty (any frame
* triggers the unknown-codec path and drops the connection, per spec).
*/
export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapter {
const authority: DeviceAuthority = options.deviceAuthority ?? new AllowAllAuthority();
const strictDeviceAuth = options.strictDeviceAuth ?? false;
// Build default registry with all three Phase 1 data codecs registered.
// Callers can pass their own registry (e.g. in tests) to override.
const defaultRegistry = new CodecRegistry();
defaultRegistry.register(codec8Handler);
defaultRegistry.register(codec8eHandler);
defaultRegistry.register(codec16Handler);
const codecRegistry = options.codecRegistry ?? defaultRegistry;
return {
name: 'teltonika',
ports: [options.port],
async handleSession(socket: net.Socket, ctx: AdapterContext): Promise<void> {
// ------------------------------------------------------------------ //
// Phase 1: IMEI handshake
// ------------------------------------------------------------------ //
let imei: string;
try {
imei = await readImeiHandshake(socket);
} catch (err) {
if (err instanceof HandshakeError) {
ctx.logger.warn(
{ err, raw_bytes: err.rawBytes },
'IMEI handshake failed; destroying socket',
);
} else {
ctx.logger.warn({ err }, 'unexpected error during IMEI handshake');
}
socket.destroy();
return;
}
const sessionLogger = ctx.logger.child({ imei });
// Consult DeviceAuthority — errors default to 'unknown' (safe, observable)
let knownLabel: 'known' | 'unknown';
try {
knownLabel = await authority.check(imei);
} catch (authorityErr) {
sessionLogger.warn(
{ err: authorityErr },
'DeviceAuthority.check failed; defaulting to unknown',
);
knownLabel = 'unknown';
}
ctx.metrics.inc('teltonika_handshake_total', {
result: 'accepted',
known: knownLabel,
});
if (knownLabel === 'unknown' && strictDeviceAuth) {
// Reject path (opt-in via STRICT_DEVICE_AUTH)
socket.write(Buffer.from([0x00]));
sessionLogger.warn({ imei }, 'rejected unknown device under STRICT_DEVICE_AUTH');
socket.destroy();
return;
}
// Accept the device
socket.write(Buffer.from([0x01]));
sessionLogger.info({ known: knownLabel }, 'IMEI handshake accepted');
// ------------------------------------------------------------------ //
// Phase 2: AVL frame read loop
// ------------------------------------------------------------------ //
const reader = new BufferedReader(socket);
while (!socket.destroyed) {
let frame;
try {
frame = await readNextFrame(reader);
} catch (err) {
if (err instanceof FrameDropError) {
if (err.reason === 'socket_closed') {
// Normal disconnect — no warning needed
sessionLogger.debug('socket closed during frame read');
} else {
sessionLogger.warn(
{ reason: err.reason, err },
'malformed frame; dropping connection',
);
}
} else if (
err instanceof Error &&
'code' in err &&
// Routine on cellular: NAT timeouts, carrier RST, half-closed pipes.
// Surface as info (one-liner, no stack) so warns mean something.
['ETIMEDOUT', 'ECONNRESET', 'EPIPE', 'ENOTCONN'].includes(
(err as NodeJS.ErrnoException).code as string,
)
) {
sessionLogger.info(
{ code: (err as NodeJS.ErrnoException).code },
'session ended (transport error)',
);
} else {
sessionLogger.warn({ err }, 'unexpected error reading frame; dropping connection');
}
socket.destroy();
return;
}
if (!frame.crcValid) {
sessionLogger.warn(
{
expected_crc: `0x${frame.expectedCrc.toString(16).padStart(4, '0')}`,
computed_crc: `0x${frame.computedCrc.toString(16).padStart(4, '0')}`,
frame_length: frame.payload.length,
},
'CRC mismatch; not ACKing (device will retransmit)',
);
ctx.metrics.inc('teltonika_frames_total', {
codec: `0x${frame.codecId.toString(16)}`,
result: 'crc_fail',
});
// Do NOT ACK — connection stays open for device retransmit
continue;
}
const handler = codecRegistry.get(frame.codecId);
if (handler === undefined) {
sessionLogger.warn(
{
codec_id: `0x${frame.codecId.toString(16).padStart(2, '0')}`,
header: frame.payload.subarray(0, 16).toString('hex'),
},
'unknown codec; dropping connection',
);
ctx.metrics.inc('teltonika_unknown_codec_total', {
codec_id: `0x${frame.codecId.toString(16).padStart(2, '0')}`,
});
socket.destroy();
return;
}
// Resolve the codec label from the numeric ID so the publisher can
// include it as a top-level Redis Stream field. Fall back to the hex
// string if somehow an unregistered ID slipped past the registry guard
// (defensive — should not happen given the unknown-codec drop above).
const codecLabel = CODEC_ID_TO_LABEL.get(frame.codecId) ?? ('8' as CodecLabel);
let result: { recordCount: number };
try {
result = await handler.handle(frame.payload, {
imei,
// Wrap AdapterContext.publish(position, codec) into the codec-
// handler-facing (position) => Promise<void> shape. The codec
// parsers are unaware of the label; it is captured here at dispatch.
publish: (position) => ctx.publish(position, codecLabel),
logger: sessionLogger,
});
} catch (handlerErr) {
sessionLogger.warn(
{ err: handlerErr, codec_id: `0x${frame.codecId.toString(16).padStart(2, '0')}` },
'codec handler threw; dropping connection',
);
socket.destroy();
return;
}
ctx.metrics.inc('teltonika_frames_total', {
codec: `0x${frame.codecId.toString(16).padStart(2, '0')}`,
result: 'ok',
});
// teltonika_frames_total{result="ok"} and teltonika_records_published_total
// now carry this signal in Prometheus; keep the log at debug to avoid noise.
sessionLogger.debug(
{ codec: codecLabel, records: result.recordCount },
'frame ingested',
);
// ACK: 4-byte big-endian record count
const ack = Buffer.alloc(4);
ack.writeUInt32BE(result.recordCount, 0);
socket.write(ack);
}
},
};
}