Implement Phase 1 tasks 1.1-1.4 (scaffold + core shell + Teltonika framing)
- Project scaffold (Node 22 + TS 5 + pnpm + vitest + ESLint flat config) - Core shell: TCP server, session loop, adapter registry, types - Configuration (zod-validated env) and pino logger - Teltonika adapter: IMEI handshake, frame envelope, CRC-16/IBM, codec dispatch registry, DeviceAuthority seam (AllowAllAuthority default) Codec data parsers (1.5-1.7), Redis publisher (1.8), and downstream tasks remain. 36 tests covering CRC, framing, handshake, device authority, config, and core server. typecheck/lint/test/build all clean.
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
import type { Logger } from 'pino';
|
||||
import type { Position } from '../../../core/types.js';
|
||||
|
||||
/**
|
||||
* Context passed to codec data handlers. Narrow contract: parsers receive
|
||||
* only what they need (IMEI for device_id, publish for emitting records,
|
||||
* logger for structured output).
|
||||
*
|
||||
* Phase 2 will add a `respond(bytes: Buffer) => void` to this ctx for command
|
||||
* codecs that write back to the socket. Data codecs (8, 8E, 16) never write.
|
||||
*/
|
||||
export type CodecHandlerContext = {
|
||||
readonly imei: string;
|
||||
readonly publish: (position: Position) => Promise<void>;
|
||||
readonly logger: Logger;
|
||||
};
|
||||
|
||||
/**
|
||||
* A codec data handler registered in the flat dispatch registry.
|
||||
* Each handler owns one codec ID and is responsible for:
|
||||
* - Parsing N1/N2 count validation (within the body it receives)
|
||||
* - Producing Position records via ctx.publish
|
||||
* - Returning the number of records accepted (used for the TCP ACK)
|
||||
*/
|
||||
export interface CodecDataHandler {
|
||||
readonly codec_id: number;
|
||||
handle(
|
||||
body: Buffer, // Full body: CodecID (1B) + N1 (1B) + records + N2 (1B)
|
||||
ctx: CodecHandlerContext,
|
||||
): Promise<{ recordCount: number }>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flat registry mapping codec IDs to their handlers.
|
||||
* Phase 1 registers 0x08, 0x8E, 0x10. Phase 2 will register 0x0C, 0x0E
|
||||
* alongside without modifying Phase 1 paths.
|
||||
*/
|
||||
export class CodecRegistry {
|
||||
private readonly handlers = new Map<number, CodecDataHandler>();
|
||||
|
||||
register(handler: CodecDataHandler): void {
|
||||
if (this.handlers.has(handler.codec_id)) {
|
||||
throw new Error(
|
||||
`Codec 0x${handler.codec_id.toString(16).toUpperCase()} is already registered.`,
|
||||
);
|
||||
}
|
||||
this.handlers.set(handler.codec_id, handler);
|
||||
}
|
||||
|
||||
get(codecId: number): CodecDataHandler | undefined {
|
||||
return this.handlers.get(codecId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
/**
|
||||
* CRC-16/IBM (also known as CRC-16/ARC).
|
||||
* Polynomial: 0xA001 (reflected form of 0x8005).
|
||||
* Initial value: 0x0000. No final XOR.
|
||||
*
|
||||
* Teltonika uses this to protect the AVL data body from CodecID through N2
|
||||
* inclusive. The 4-byte CRC field in the frame carries the result in the lower
|
||||
* 2 bytes; upper 2 bytes are always zero.
|
||||
*
|
||||
* Implementation uses a precomputed 256-entry lookup table for performance —
|
||||
* protocol parsing is on the hot path and frame bodies can be up to ~1280 bytes.
|
||||
*/
|
||||
|
||||
// Build lookup table at module load time (one-off cost, not per-frame)
|
||||
const CRC_TABLE: Uint16Array = buildTable();
|
||||
|
||||
function buildTable(): Uint16Array {
|
||||
const table = new Uint16Array(256);
|
||||
for (let i = 0; i < 256; i++) {
|
||||
let crc = i;
|
||||
for (let j = 0; j < 8; j++) {
|
||||
if ((crc & 0x0001) !== 0) {
|
||||
crc = (crc >>> 1) ^ 0xa001;
|
||||
} else {
|
||||
crc = crc >>> 1;
|
||||
}
|
||||
}
|
||||
table[i] = crc;
|
||||
}
|
||||
return table;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes CRC-16/IBM over the given buffer.
|
||||
* Returns a 16-bit unsigned integer in the range [0x0000, 0xFFFF].
|
||||
*/
|
||||
export function crc16Ibm(buf: Buffer): number {
|
||||
let crc = 0x0000;
|
||||
for (let i = 0; i < buf.length; i++) {
|
||||
const byte = buf[i] ?? 0;
|
||||
const tableIndex = (crc ^ byte) & 0xff;
|
||||
crc = (crc >>> 8) ^ (CRC_TABLE[tableIndex] ?? 0);
|
||||
}
|
||||
return crc & 0xffff;
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Determines whether a connecting device is a recognized member of the fleet.
|
||||
*
|
||||
* Phase 1 default: AllowAllAuthority — every IMEI returns 'known'. This keeps
|
||||
* all current devices working while making the label available for metrics.
|
||||
*
|
||||
* Phase 1.13 will add RedisAllowListAuthority, which reads a Directus-published
|
||||
* allow-list from Redis and returns 'unknown' for devices not in the list.
|
||||
*
|
||||
* The STRICT_DEVICE_AUTH flag causes 'unknown' devices to be rejected (0x00
|
||||
* handshake response + socket destroy). It is off by default.
|
||||
*/
|
||||
export interface DeviceAuthority {
|
||||
check(imei: string): Promise<'known' | 'unknown'>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Permissive default authority — always returns 'known'.
|
||||
* Used until a real authority is configured (task 1.13).
|
||||
*/
|
||||
export class AllowAllAuthority implements DeviceAuthority {
|
||||
async check(_imei: string): Promise<'known'> {
|
||||
return 'known';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,209 @@
|
||||
import type * as net from 'node:net';
|
||||
import { crc16Ibm } from './crc.js';
|
||||
|
||||
/**
|
||||
* Maximum permitted DataFieldLength value. Covers the 1280-byte max AVL packet
|
||||
* for most devices, with headroom. Frames above this threshold are structurally
|
||||
* malformed — we drop the connection rather than allocating unbounded memory.
|
||||
*/
|
||||
export const MAX_AVL_PACKET_SIZE = 1300;
|
||||
|
||||
/**
|
||||
* Reason a frame read attempt failed in a way that should close the connection.
|
||||
*/
|
||||
export type FrameDropReason =
|
||||
| 'invalid_preamble'
|
||||
| 'oversized_frame'
|
||||
| 'n1_n2_mismatch'
|
||||
| 'socket_closed';
|
||||
|
||||
export class FrameDropError extends Error {
|
||||
constructor(
|
||||
public readonly reason: FrameDropReason,
|
||||
message: string,
|
||||
) {
|
||||
super(message);
|
||||
this.name = 'FrameDropError';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parsed AVL frame envelope. CRC validity is surfaced here so the session loop
|
||||
* can decide whether to ACK without the frame handler needing to re-compute.
|
||||
*/
|
||||
export type Frame = {
|
||||
readonly codecId: number;
|
||||
readonly payload: Buffer; // full body: CodecID + N1 + records + N2
|
||||
readonly crcValid: boolean;
|
||||
readonly expectedCrc: number;
|
||||
readonly computedCrc: number;
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// BufferedReader
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type PendingRead = {
|
||||
needed: number;
|
||||
resolve: (buf: Buffer) => void;
|
||||
reject: (err: Error) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Accumulates bytes arriving across multiple `data` events and satisfies
|
||||
* exact-byte-count read requests. This handles the reality that TCP is a stream
|
||||
* — a 100-byte read may arrive as five 20-byte chunks.
|
||||
*
|
||||
* Design: a single pending read at a time. The session loop is sequential
|
||||
* (each `await reader.readExact(n)` completes before the next starts) so
|
||||
* concurrent reads are not needed.
|
||||
*/
|
||||
export class BufferedReader {
|
||||
private readonly chunks: Buffer[] = [];
|
||||
private buffered = 0;
|
||||
private pending: PendingRead | null = null;
|
||||
private closed = false;
|
||||
private closeError: Error | null = null;
|
||||
|
||||
constructor(private readonly socket: net.Socket) {
|
||||
socket.on('data', (chunk: Buffer) => this.onData(chunk));
|
||||
socket.on('close', () => this.onClose());
|
||||
socket.on('error', (err) => this.onSocketError(err));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads exactly `n` bytes. Returns a Promise that resolves once the bytes are
|
||||
* available. Rejects if the socket closes before the read completes.
|
||||
*/
|
||||
readExact(n: number): Promise<Buffer> {
|
||||
// Fast path: data already buffered
|
||||
if (this.buffered >= n) {
|
||||
return Promise.resolve(this.consume(n));
|
||||
}
|
||||
|
||||
if (this.closed) {
|
||||
return Promise.reject(
|
||||
this.closeError ?? new FrameDropError('socket_closed', 'Socket closed before read completed'),
|
||||
);
|
||||
}
|
||||
|
||||
return new Promise<Buffer>((resolve, reject) => {
|
||||
this.pending = { needed: n, resolve, reject };
|
||||
});
|
||||
}
|
||||
|
||||
private onData(chunk: Buffer): void {
|
||||
this.chunks.push(chunk);
|
||||
this.buffered += chunk.length;
|
||||
this.tryFlush();
|
||||
}
|
||||
|
||||
private tryFlush(): void {
|
||||
if (this.pending !== null && this.buffered >= this.pending.needed) {
|
||||
const { needed, resolve } = this.pending;
|
||||
this.pending = null;
|
||||
resolve(this.consume(needed));
|
||||
}
|
||||
}
|
||||
|
||||
private consume(n: number): Buffer {
|
||||
// Concatenate chunks into a single contiguous buffer, take n bytes, put any
|
||||
// remainder back as the first chunk.
|
||||
const combined = Buffer.concat(this.chunks);
|
||||
this.chunks.length = 0;
|
||||
const taken = combined.subarray(0, n);
|
||||
const rest = combined.subarray(n);
|
||||
if (rest.length > 0) {
|
||||
this.chunks.push(rest);
|
||||
}
|
||||
this.buffered = rest.length;
|
||||
return Buffer.from(taken); // copy so callers own the memory
|
||||
}
|
||||
|
||||
private onClose(): void {
|
||||
this.closed = true;
|
||||
if (this.pending !== null) {
|
||||
const { reject } = this.pending;
|
||||
this.pending = null;
|
||||
reject(new FrameDropError('socket_closed', 'Socket closed before read completed'));
|
||||
}
|
||||
}
|
||||
|
||||
private onSocketError(err: Error): void {
|
||||
this.closed = true;
|
||||
this.closeError = err;
|
||||
if (this.pending !== null) {
|
||||
const { reject } = this.pending;
|
||||
this.pending = null;
|
||||
reject(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Frame envelope reader
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Reads the next complete AVL frame envelope from the socket via `reader`.
|
||||
* Validates preamble, length sanity, and CRC. Does NOT dispatch on codec ID —
|
||||
* that is the session loop's responsibility.
|
||||
*
|
||||
* Throws `FrameDropError` when the connection must be closed (invalid preamble,
|
||||
* oversized frame, N1≠N2). On CRC mismatch the frame is returned with
|
||||
* `crcValid: false` — the caller should not ACK but should keep the connection.
|
||||
*/
|
||||
export async function readNextFrame(reader: BufferedReader): Promise<Frame> {
|
||||
// 1. Preamble: 4 zero bytes
|
||||
const preamble = await reader.readExact(4);
|
||||
if (preamble.readUInt32BE(0) !== 0) {
|
||||
throw new FrameDropError(
|
||||
'invalid_preamble',
|
||||
`Expected preamble 0x00000000, got 0x${preamble.toString('hex')}`,
|
||||
);
|
||||
}
|
||||
|
||||
// 2. DataFieldLength: number of bytes from CodecID through N2
|
||||
const lengthBuf = await reader.readExact(4);
|
||||
const dataFieldLength = lengthBuf.readUInt32BE(0);
|
||||
|
||||
if (dataFieldLength < 2 || dataFieldLength > MAX_AVL_PACKET_SIZE) {
|
||||
throw new FrameDropError(
|
||||
'oversized_frame',
|
||||
`DataFieldLength ${dataFieldLength} out of bounds [2, ${MAX_AVL_PACKET_SIZE}]`,
|
||||
);
|
||||
}
|
||||
|
||||
// 3. Body: CodecID (1B) + N1 (1B) + AVL records + N2 (1B)
|
||||
const body = await reader.readExact(dataFieldLength);
|
||||
|
||||
// 4. CRC field: 4 bytes, value in lower 2 bytes (upper 2 are always 0)
|
||||
const crcField = await reader.readExact(4);
|
||||
const expectedCrc = crcField.readUInt16BE(2); // bytes 2–3 carry the 16-bit value
|
||||
const computedCrc = crc16Ibm(body);
|
||||
|
||||
// 5. Validate N1 === N2 (record count repeated for integrity)
|
||||
// Body layout: [CodecID 1B][N1 1B][...records...][N2 1B]
|
||||
// N2 is the last byte of body.
|
||||
if (body.length < 2) {
|
||||
throw new FrameDropError('oversized_frame', 'Body too short to contain CodecID and N1');
|
||||
}
|
||||
const n1 = body[1] ?? 0;
|
||||
const n2 = body[body.length - 1] ?? 0;
|
||||
if (n1 !== n2) {
|
||||
throw new FrameDropError(
|
||||
'n1_n2_mismatch',
|
||||
`N1 (${n1}) !== N2 (${n2}): structural mismatch, dropping connection`,
|
||||
);
|
||||
}
|
||||
|
||||
const codecId = body[0] ?? 0;
|
||||
|
||||
return {
|
||||
codecId,
|
||||
payload: body,
|
||||
crcValid: expectedCrc === computedCrc,
|
||||
expectedCrc,
|
||||
computedCrc,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
import type * as net from 'node:net';
|
||||
import { BufferedReader } from './frame.js';
|
||||
|
||||
/**
|
||||
* IMEI validation pattern. Teltonika devices send 15-digit IMEIs; we allow
|
||||
* 14–16 to cover edge cases (14-digit test units, 16-digit future variants).
|
||||
*/
|
||||
const IMEI_PATTERN = /^\d{14,16}$/;
|
||||
|
||||
/**
|
||||
* Maximum IMEI byte length we accept. Teltonika spec says 15; we allow up to
|
||||
* 32 as headroom before rejecting as malformed.
|
||||
*/
|
||||
const MAX_IMEI_LENGTH = 32;
|
||||
|
||||
export class HandshakeError extends Error {
|
||||
constructor(
|
||||
message: string,
|
||||
public readonly rawBytes?: string,
|
||||
) {
|
||||
super(message);
|
||||
this.name = 'HandshakeError';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads and validates the Teltonika IMEI handshake from the socket.
|
||||
* Wire format: [length 2B big-endian][IMEI ASCII bytes (length B)]
|
||||
*
|
||||
* Returns the IMEI string on success. Does NOT write the accept/reject byte —
|
||||
* that decision belongs to the session loop after consulting DeviceAuthority.
|
||||
*
|
||||
* Throws HandshakeError for any malformed input without writing to the socket.
|
||||
*/
|
||||
export async function readImeiHandshake(socket: net.Socket): Promise<string> {
|
||||
const reader = new BufferedReader(socket);
|
||||
|
||||
// 1. Read 2-byte length field
|
||||
const lengthBuf = await reader.readExact(2);
|
||||
const imeiLength = lengthBuf.readUInt16BE(0);
|
||||
|
||||
if (imeiLength === 0 || imeiLength > MAX_IMEI_LENGTH) {
|
||||
throw new HandshakeError(
|
||||
`IMEI length ${imeiLength} is outside valid range [1, ${MAX_IMEI_LENGTH}]`,
|
||||
lengthBuf.toString('hex'),
|
||||
);
|
||||
}
|
||||
|
||||
// 2. Read IMEI bytes
|
||||
const imeiBuf = await reader.readExact(imeiLength);
|
||||
const imei = imeiBuf.toString('ascii');
|
||||
|
||||
// 3. Validate: must be all digits, 14–16 chars
|
||||
if (!IMEI_PATTERN.test(imei)) {
|
||||
throw new HandshakeError(
|
||||
`IMEI "${imei}" does not match expected pattern (14–16 digits)`,
|
||||
imeiBuf.toString('hex'),
|
||||
);
|
||||
}
|
||||
|
||||
return imei;
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
import type * as net from 'node:net';
|
||||
import type { Adapter, AdapterContext } from '../../core/types.js';
|
||||
import type { DeviceAuthority } from './device-authority.js';
|
||||
import { AllowAllAuthority } from './device-authority.js';
|
||||
import { readImeiHandshake, HandshakeError } from './handshake.js';
|
||||
import { BufferedReader, readNextFrame, FrameDropError } from './frame.js';
|
||||
import { CodecRegistry } from './codec/registry.js';
|
||||
|
||||
export type TeltonikaAdapterOptions = {
|
||||
readonly port: number;
|
||||
readonly deviceAuthority?: DeviceAuthority;
|
||||
readonly strictDeviceAuth?: boolean;
|
||||
readonly codecRegistry?: CodecRegistry;
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates and returns the Teltonika adapter. The adapter:
|
||||
* 1. Performs the IMEI handshake (reads; consults DeviceAuthority; writes 0x01/0x00)
|
||||
* 2. Runs the AVL frame read loop (preamble → length → body → CRC → dispatch)
|
||||
* 3. ACKs accepted frames with the 4-byte big-endian record count
|
||||
*
|
||||
* Codec handlers are registered externally and passed in via `codecRegistry`.
|
||||
* Tasks 1.5–1.7 populate the registry; this task ships it empty (any frame
|
||||
* triggers the unknown-codec path and drops the connection, per spec).
|
||||
*/
|
||||
export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapter {
|
||||
const authority: DeviceAuthority = options.deviceAuthority ?? new AllowAllAuthority();
|
||||
const strictDeviceAuth = options.strictDeviceAuth ?? false;
|
||||
const codecRegistry = options.codecRegistry ?? new CodecRegistry();
|
||||
|
||||
return {
|
||||
name: 'teltonika',
|
||||
ports: [options.port],
|
||||
|
||||
async handleSession(socket: net.Socket, ctx: AdapterContext): Promise<void> {
|
||||
// ------------------------------------------------------------------ //
|
||||
// Phase 1: IMEI handshake
|
||||
// ------------------------------------------------------------------ //
|
||||
let imei: string;
|
||||
|
||||
try {
|
||||
imei = await readImeiHandshake(socket);
|
||||
} catch (err) {
|
||||
if (err instanceof HandshakeError) {
|
||||
ctx.logger.warn(
|
||||
{ err, raw_bytes: err.rawBytes },
|
||||
'IMEI handshake failed; destroying socket',
|
||||
);
|
||||
} else {
|
||||
ctx.logger.warn({ err }, 'unexpected error during IMEI handshake');
|
||||
}
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
const sessionLogger = ctx.logger.child({ imei });
|
||||
|
||||
// Consult DeviceAuthority — errors default to 'unknown' (safe, observable)
|
||||
let knownLabel: 'known' | 'unknown';
|
||||
try {
|
||||
knownLabel = await authority.check(imei);
|
||||
} catch (authorityErr) {
|
||||
sessionLogger.warn(
|
||||
{ err: authorityErr },
|
||||
'DeviceAuthority.check failed; defaulting to unknown',
|
||||
);
|
||||
knownLabel = 'unknown';
|
||||
}
|
||||
|
||||
ctx.metrics.inc('teltonika_handshake_total', {
|
||||
result: 'accepted',
|
||||
known: knownLabel,
|
||||
});
|
||||
|
||||
if (knownLabel === 'unknown' && strictDeviceAuth) {
|
||||
// Reject path (opt-in via STRICT_DEVICE_AUTH)
|
||||
socket.write(Buffer.from([0x00]));
|
||||
sessionLogger.warn({ imei }, 'rejected unknown device under STRICT_DEVICE_AUTH');
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
// Accept the device
|
||||
socket.write(Buffer.from([0x01]));
|
||||
sessionLogger.debug({ known: knownLabel }, 'IMEI handshake accepted');
|
||||
|
||||
// ------------------------------------------------------------------ //
|
||||
// Phase 2: AVL frame read loop
|
||||
// ------------------------------------------------------------------ //
|
||||
const reader = new BufferedReader(socket);
|
||||
|
||||
while (!socket.destroyed) {
|
||||
let frame;
|
||||
try {
|
||||
frame = await readNextFrame(reader);
|
||||
} catch (err) {
|
||||
if (err instanceof FrameDropError) {
|
||||
if (err.reason === 'socket_closed') {
|
||||
// Normal disconnect — no warning needed
|
||||
sessionLogger.debug('socket closed during frame read');
|
||||
} else {
|
||||
sessionLogger.warn(
|
||||
{ reason: err.reason, err },
|
||||
'malformed frame; dropping connection',
|
||||
);
|
||||
}
|
||||
} else {
|
||||
sessionLogger.warn({ err }, 'unexpected error reading frame; dropping connection');
|
||||
}
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!frame.crcValid) {
|
||||
sessionLogger.warn(
|
||||
{
|
||||
expected_crc: `0x${frame.expectedCrc.toString(16).padStart(4, '0')}`,
|
||||
computed_crc: `0x${frame.computedCrc.toString(16).padStart(4, '0')}`,
|
||||
frame_length: frame.payload.length,
|
||||
},
|
||||
'CRC mismatch; not ACKing (device will retransmit)',
|
||||
);
|
||||
ctx.metrics.inc('teltonika_frames_total', {
|
||||
codec: `0x${frame.codecId.toString(16)}`,
|
||||
result: 'crc_fail',
|
||||
});
|
||||
// Do NOT ACK — connection stays open for device retransmit
|
||||
continue;
|
||||
}
|
||||
|
||||
const handler = codecRegistry.get(frame.codecId);
|
||||
if (handler === undefined) {
|
||||
sessionLogger.warn(
|
||||
{
|
||||
codec_id: `0x${frame.codecId.toString(16).padStart(2, '0')}`,
|
||||
header: frame.payload.subarray(0, 16).toString('hex'),
|
||||
},
|
||||
'unknown codec; dropping connection',
|
||||
);
|
||||
ctx.metrics.inc('teltonika_unknown_codec_total', {
|
||||
codec_id: `0x${frame.codecId.toString(16).padStart(2, '0')}`,
|
||||
});
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
let result: { recordCount: number };
|
||||
try {
|
||||
result = await handler.handle(frame.payload, {
|
||||
imei,
|
||||
publish: ctx.publish,
|
||||
logger: sessionLogger,
|
||||
});
|
||||
} catch (handlerErr) {
|
||||
sessionLogger.warn(
|
||||
{ err: handlerErr, codec_id: `0x${frame.codecId.toString(16).padStart(2, '0')}` },
|
||||
'codec handler threw; dropping connection',
|
||||
);
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
ctx.metrics.inc('teltonika_frames_total', {
|
||||
codec: `0x${frame.codecId.toString(16).padStart(2, '0')}`,
|
||||
result: 'ok',
|
||||
});
|
||||
|
||||
// ACK: 4-byte big-endian record count
|
||||
const ack = Buffer.alloc(4);
|
||||
ack.writeUInt32BE(result.recordCount, 0);
|
||||
socket.write(ack);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { z } from 'zod';
|
||||
|
||||
const ConfigSchema = z.object({
|
||||
NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
|
||||
INSTANCE_ID: z.string().min(1).default(() => `local-${randomUUID().slice(0, 8)}`),
|
||||
LOG_LEVEL: z
|
||||
.enum(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])
|
||||
.default('info'),
|
||||
|
||||
// Vendor port bindings — extend as adapters are added
|
||||
TELTONIKA_PORT: z.coerce.number().int().min(1).max(65535).default(5027),
|
||||
|
||||
// Redis — required in all environments; no silent default
|
||||
REDIS_URL: z.string().url(),
|
||||
REDIS_TELEMETRY_STREAM: z.string().min(1).default('telemetry:teltonika'),
|
||||
REDIS_STREAM_MAXLEN: z.coerce.number().int().min(0).default(1_000_000),
|
||||
|
||||
// Observability
|
||||
METRICS_PORT: z.coerce.number().int().min(0).max(65535).default(9090),
|
||||
|
||||
// Device authority — off by default; opt-in for strict reject-on-unknown
|
||||
STRICT_DEVICE_AUTH: z
|
||||
.string()
|
||||
.transform((v) => v === 'true' || v === '1')
|
||||
.default('false'),
|
||||
});
|
||||
|
||||
export type Config = z.infer<typeof ConfigSchema>;
|
||||
|
||||
/**
|
||||
* Validates process.env at startup and returns a typed Config.
|
||||
* Throws with a human-readable error listing every missing/invalid key if
|
||||
* validation fails — the intent is loud, fast failure rather than running
|
||||
* with bad configuration.
|
||||
*/
|
||||
export function loadConfig(env: Record<string, string | undefined> = process.env): Config {
|
||||
const result = ConfigSchema.safeParse(env);
|
||||
|
||||
if (!result.success) {
|
||||
const issues = result.error.issues
|
||||
.map((issue) => ` ${issue.path.join('.')}: ${issue.message}`)
|
||||
.join('\n');
|
||||
throw new Error(`Configuration error — invalid or missing environment variables:\n${issues}`);
|
||||
}
|
||||
|
||||
return result.data;
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
import type { Logger } from 'pino';
|
||||
import type { Position } from './types.js';
|
||||
|
||||
/**
|
||||
* Stub publish function — logs the position at debug level.
|
||||
* Real implementation (Redis Streams XADD) lands in task 1.8.
|
||||
* The signature is already the final shape so adapter types stabilize now.
|
||||
*/
|
||||
export function makePublisher(logger: Logger): (position: Position) => Promise<void> {
|
||||
return async (position: Position): Promise<void> => {
|
||||
logger.debug(
|
||||
{
|
||||
device_id: position.device_id,
|
||||
timestamp: position.timestamp.toISOString(),
|
||||
latitude: position.latitude,
|
||||
longitude: position.longitude,
|
||||
speed: position.speed,
|
||||
},
|
||||
'publish position (stub)',
|
||||
);
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
import type { Adapter } from './types.js';
|
||||
|
||||
/**
|
||||
* Maps TCP port numbers to their owning adapter. Populated at startup by main.ts
|
||||
* before the server begins accepting connections.
|
||||
*/
|
||||
export class AdapterRegistry {
|
||||
private readonly portMap = new Map<number, Adapter>();
|
||||
|
||||
register(adapter: Adapter): void {
|
||||
for (const port of adapter.ports) {
|
||||
if (this.portMap.has(port)) {
|
||||
throw new Error(
|
||||
`Port ${port} is already registered by adapter "${this.portMap.get(port)!.name}". ` +
|
||||
`Attempted to register adapter "${adapter.name}".`,
|
||||
);
|
||||
}
|
||||
this.portMap.set(port, adapter);
|
||||
}
|
||||
}
|
||||
|
||||
get(port: number): Adapter | undefined {
|
||||
return this.portMap.get(port);
|
||||
}
|
||||
|
||||
ports(): ReadonlyArray<number> {
|
||||
return Array.from(this.portMap.keys());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
import * as net from 'node:net';
|
||||
import type { Adapter, AdapterContext } from './types.js';
|
||||
import { runSession } from './session.js';
|
||||
|
||||
/**
|
||||
* Starts a TCP server for a single adapter. Returns the underlying net.Server
|
||||
* so callers can attach error listeners and call server.close() on shutdown.
|
||||
*/
|
||||
export function startServer(port: number, adapter: Adapter, ctx: AdapterContext): net.Server {
|
||||
const server = net.createServer((socket) => {
|
||||
// runSession is async but we must not await it here — that would serialize
|
||||
// connections. Fire-and-forget with an explicit rejection guard.
|
||||
runSession(socket, adapter, ctx).catch((err) => {
|
||||
ctx.logger.warn({ err, adapter: adapter.name }, 'unhandled error escaping runSession');
|
||||
});
|
||||
});
|
||||
|
||||
server.on('error', (err) => {
|
||||
ctx.logger.error({ err, port, adapter: adapter.name }, 'TCP server error');
|
||||
});
|
||||
|
||||
server.on('close', () => {
|
||||
ctx.logger.info({ port, adapter: adapter.name }, 'TCP server closed');
|
||||
});
|
||||
|
||||
server.listen(port, () => {
|
||||
ctx.logger.info({ port, adapter: adapter.name }, 'TCP server listening');
|
||||
});
|
||||
|
||||
return server;
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
import type * as net from 'node:net';
|
||||
import type { Adapter, AdapterContext } from './types.js';
|
||||
|
||||
/**
|
||||
* Wraps a single adapter session with standard socket configuration, error
|
||||
* handling, and lifecycle logging. Deliberately knows nothing about IMEI,
|
||||
* framing, or codecs — those are entirely the adapter's business.
|
||||
*/
|
||||
export async function runSession(
|
||||
socket: net.Socket,
|
||||
adapter: Adapter,
|
||||
ctx: AdapterContext,
|
||||
): Promise<void> {
|
||||
// Disable Nagle algorithm — we send small ACK bytes where latency matters
|
||||
// more than throughput efficiency.
|
||||
socket.setNoDelay(true);
|
||||
|
||||
// TCP keepalive: detect dead connections (idle NAT timeouts, crashed devices)
|
||||
// after 60s. Safe because devices already retransmit on reconnect.
|
||||
socket.setKeepAlive(true, 60_000);
|
||||
|
||||
const remoteAddress = `${socket.remoteAddress ?? 'unknown'}:${socket.remotePort ?? '?'}`;
|
||||
const sessionLogger = ctx.logger.child({ remote_address: remoteAddress });
|
||||
|
||||
sessionLogger.debug({ adapter: adapter.name }, 'session opened');
|
||||
|
||||
socket.on('error', (err) => {
|
||||
sessionLogger.debug({ err }, 'socket error');
|
||||
});
|
||||
|
||||
socket.on('end', () => {
|
||||
sessionLogger.debug('socket end');
|
||||
});
|
||||
|
||||
socket.on('close', (hadError) => {
|
||||
sessionLogger.debug({ had_error: hadError }, 'socket closed');
|
||||
});
|
||||
|
||||
try {
|
||||
await adapter.handleSession(socket, { ...ctx, logger: sessionLogger });
|
||||
} catch (err) {
|
||||
sessionLogger.warn({ err }, 'session handler threw; destroying socket');
|
||||
} finally {
|
||||
if (!socket.destroyed) {
|
||||
socket.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
import type { Logger } from 'pino';
|
||||
import type * as net from 'node:net';
|
||||
|
||||
/**
|
||||
* Normalized GPS position record — the boundary contract between vendor adapters
|
||||
* and all downstream consumers. Shape is Teltonika-canonical; other adapters must
|
||||
* map to the same type.
|
||||
*/
|
||||
export type Position = {
|
||||
readonly device_id: string;
|
||||
readonly timestamp: Date;
|
||||
readonly latitude: number;
|
||||
readonly longitude: number;
|
||||
readonly altitude: number;
|
||||
readonly angle: number; // heading 0–360°
|
||||
readonly speed: number; // km/h; 0 may mean "GPS invalid" — preserve verbatim
|
||||
readonly satellites: number;
|
||||
readonly priority: 0 | 1 | 2; // 0=Low, 1=High, 2=Panic
|
||||
readonly attributes: Readonly<Record<string, number | bigint | Buffer>>;
|
||||
};
|
||||
|
||||
/**
|
||||
* Minimal metrics surface exposed to adapters. Concrete implementation lands in
|
||||
* task 1.10; this placeholder keeps types stable through tasks 1.2–1.9.
|
||||
*/
|
||||
export type Metrics = {
|
||||
readonly inc: (name: string, labels?: Record<string, string>) => void;
|
||||
readonly observe: (name: string, value: number, labels?: Record<string, string>) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Narrow context object passed into each adapter's session handler.
|
||||
* Adapters receive everything they need here; shell internals are not exposed.
|
||||
*/
|
||||
export type AdapterContext = {
|
||||
readonly publish: (position: Position) => Promise<void>;
|
||||
readonly logger: Logger;
|
||||
readonly metrics: Metrics;
|
||||
};
|
||||
|
||||
/**
|
||||
* Vendor adapter interface. Each adapter declares the port(s) it owns and
|
||||
* implements the session lifecycle. The core shell calls handleSession once per
|
||||
* accepted TCP connection and never touches vendor-specific framing.
|
||||
*/
|
||||
export type Adapter = {
|
||||
readonly name: string;
|
||||
readonly ports: readonly number[];
|
||||
handleSession(socket: net.Socket, ctx: AdapterContext): Promise<void>;
|
||||
};
|
||||
+73
@@ -0,0 +1,73 @@
|
||||
import { loadConfig } from './config/load.js';
|
||||
import { createLogger } from './observability/logger.js';
|
||||
import { makePublisher } from './core/publish.js';
|
||||
import { startServer } from './core/server.js';
|
||||
import { createTeltonikaAdapter } from './adapters/teltonika/index.js';
|
||||
import { AllowAllAuthority } from './adapters/teltonika/device-authority.js';
|
||||
import { CodecRegistry } from './adapters/teltonika/codec/registry.js';
|
||||
import type { Metrics } from './core/types.js';
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Startup: validate config (fail fast on bad env), build logger, boot server
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
let config;
|
||||
try {
|
||||
config = loadConfig();
|
||||
} catch (err) {
|
||||
// Config validation failures print a human-readable message and exit 1.
|
||||
// Logger is not available yet — process.stderr is the only output channel.
|
||||
process.stderr.write(`${err instanceof Error ? err.message : String(err)}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const logger = createLogger({
|
||||
level: config.LOG_LEVEL,
|
||||
nodeEnv: config.NODE_ENV,
|
||||
instanceId: config.INSTANCE_ID,
|
||||
});
|
||||
|
||||
logger.info('tcp-ingestion starting');
|
||||
|
||||
// Placeholder metrics implementation — replaced in task 1.10
|
||||
const metrics: Metrics = {
|
||||
inc: (name, labels) => logger.debug({ metric: name, labels }, 'metric inc'),
|
||||
observe: (name, value, labels) => logger.debug({ metric: name, value, labels }, 'metric observe'),
|
||||
};
|
||||
|
||||
const publisher = makePublisher(logger);
|
||||
|
||||
// Codec registry — empty until tasks 1.5–1.7 register handlers
|
||||
const codecRegistry = new CodecRegistry();
|
||||
|
||||
const teltonikaAdapter = createTeltonikaAdapter({
|
||||
port: config.TELTONIKA_PORT,
|
||||
deviceAuthority: new AllowAllAuthority(),
|
||||
strictDeviceAuth: config.STRICT_DEVICE_AUTH,
|
||||
codecRegistry,
|
||||
});
|
||||
|
||||
const ctx = {
|
||||
publish: publisher,
|
||||
logger,
|
||||
metrics,
|
||||
};
|
||||
|
||||
const server = startServer(config.TELTONIKA_PORT, teltonikaAdapter, ctx);
|
||||
|
||||
// Graceful shutdown
|
||||
function shutdown(signal: string): void {
|
||||
logger.info({ signal }, 'shutdown signal received; closing server');
|
||||
server.close(() => {
|
||||
logger.info('server closed; exiting');
|
||||
process.exit(0);
|
||||
});
|
||||
// Force exit after 10s if connections are still open
|
||||
setTimeout(() => {
|
||||
logger.warn('forced exit after timeout');
|
||||
process.exit(1);
|
||||
}, 10_000).unref();
|
||||
}
|
||||
|
||||
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
||||
process.on('SIGINT', () => shutdown('SIGINT'));
|
||||
@@ -0,0 +1,42 @@
|
||||
import pino from 'pino';
|
||||
import type { Logger } from 'pino';
|
||||
|
||||
export type { Logger };
|
||||
|
||||
/**
|
||||
* Builds the root pino logger. Called once at startup with the config values.
|
||||
*
|
||||
* In development, pino-pretty is used for human-readable output (lazy import
|
||||
* so it is never bundled in production paths). In test/production, raw JSON is
|
||||
* emitted — fast and parseable by log aggregators.
|
||||
*/
|
||||
export function createLogger(options: {
|
||||
level: string;
|
||||
nodeEnv: string;
|
||||
instanceId: string;
|
||||
}): Logger {
|
||||
const { level, nodeEnv, instanceId } = options;
|
||||
|
||||
const base = {
|
||||
service: 'tcp-ingestion',
|
||||
instance_id: instanceId,
|
||||
};
|
||||
|
||||
if (nodeEnv === 'development') {
|
||||
return pino({
|
||||
level,
|
||||
base,
|
||||
transport: {
|
||||
target: 'pino-pretty',
|
||||
options: {
|
||||
colorize: true,
|
||||
translateTime: 'SYS:standard',
|
||||
ignore: 'pid,hostname',
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Production and test: plain JSON — fast, no extra deps
|
||||
return pino({ level, base });
|
||||
}
|
||||
Reference in New Issue
Block a user