Files
tcp-ingestion/test/teltonika-session.test.ts
T
julian c33c7a4f6b Implement Phase 1 task 1.8 (Redis Streams publisher + main wiring)
- Bounded in-memory queue (default 10000); overflow throws PublishOverflowError
  so the framing layer skips ACK and the device retransmits.
- Background worker drains via XADD with MAXLEN ~ approximate trimming.
- JSON serialization with sentinel encoding for bigint/Buffer/Date; correctly
  handles Buffer.prototype.toJSON firing before the replacer.
- AdapterContext.publish(position, codec) with codec-label closure at dispatch
  in adapters/teltonika/index.ts; zero changes to the three codec parsers.
- connectRedis with retry-on-startup; main.ts wires the full pipeline.
- installGracefulShutdown stubbed (full hardening in task 1.12).
- 19 new tests (17 unit + 2 Docker-conditional integration). Total 81 passing.
2026-04-30 16:39:34 +02:00

283 lines
8.4 KiB
TypeScript

import { describe, it, expect, vi, beforeEach } from 'vitest';
import { EventEmitter } from 'node:events';
import type * as net from 'node:net';
import type { Logger } from 'pino';
import type { AdapterContext, Metrics, Position } from '../src/core/types.js';
import { createTeltonikaAdapter } from '../src/adapters/teltonika/index.js';
import { CodecRegistry } from '../src/adapters/teltonika/codec/registry.js';
import type { DeviceAuthority } from '../src/adapters/teltonika/device-authority.js';
import { crc16Ibm } from '../src/adapters/teltonika/crc.js';
// ---------------------------------------------------------------------------
// Mock helpers
// ---------------------------------------------------------------------------
type MockSocket = net.Socket & {
push(buf: Buffer): void;
simulateClose(): void;
getWritten(): Buffer[];
};
function makeMockSocket(): MockSocket {
const written: Buffer[] = [];
const emitter = new EventEmitter() as MockSocket & {
destroyed: boolean;
write: ReturnType<typeof vi.fn>;
destroy: ReturnType<typeof vi.fn>;
setNoDelay: ReturnType<typeof vi.fn>;
setKeepAlive: ReturnType<typeof vi.fn>;
remoteAddress: string;
remotePort: number;
};
emitter.destroyed = false;
emitter.remoteAddress = '127.0.0.1';
emitter.remotePort = 9999;
emitter.setNoDelay = vi.fn();
emitter.setKeepAlive = vi.fn();
emitter.write = vi.fn((data: Buffer | string) => {
written.push(Buffer.isBuffer(data) ? data : Buffer.from(data));
return true;
});
emitter.destroy = vi.fn(() => {
if (!emitter.destroyed) {
emitter.destroyed = true;
emitter.emit('close', false);
}
});
emitter.push = (buf: Buffer) => emitter.emit('data', buf);
emitter.simulateClose = () => {
if (!emitter.destroyed) {
emitter.destroyed = true;
emitter.emit('close', false);
}
};
emitter.getWritten = () => written;
return emitter;
}
function makeMockContext(): AdapterContext {
const logger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
child: vi.fn().mockReturnThis(),
} as unknown as Logger;
const metrics: Metrics = {
inc: vi.fn(),
observe: vi.fn(),
};
return {
publish: vi.fn(async (_p: Position, _codec) => {}),
logger,
metrics,
};
}
/**
* Encodes the Teltonika IMEI handshake wire format.
*/
function encodeImei(imei: string): Buffer {
const body = Buffer.from(imei, 'ascii');
const header = Buffer.alloc(2);
header.writeUInt16BE(body.length, 0);
return Buffer.concat([header, body]);
}
/**
* Builds a complete AVL frame. Body is CodecID + N1 + empty records + N2.
*/
function buildFrame(options: { codecId: number; recordCount?: number; crcOverride?: number }): Buffer {
const n = options.recordCount ?? 1;
const body = Buffer.from([options.codecId, n, n]);
const realCrc = crc16Ibm(body);
const crc = options.crcOverride !== undefined ? options.crcOverride : realCrc;
const preamble = Buffer.alloc(4, 0);
const lengthBuf = Buffer.alloc(4);
lengthBuf.writeUInt32BE(body.length, 0);
const crcBuf = Buffer.alloc(4);
crcBuf.writeUInt16BE(0, 0);
crcBuf.writeUInt16BE(crc, 2);
return Buffer.concat([preamble, lengthBuf, body, crcBuf]);
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
const IMEI = '356307042441013';
describe('Teltonika adapter — session', () => {
beforeEach(() => {
vi.clearAllMocks();
});
describe('unknown codec path', () => {
it('destroys the socket without writing an ACK when codec ID is not registered', async () => {
const socket = makeMockSocket();
const ctx = makeMockContext();
const registry = new CodecRegistry(); // empty — no codecs registered
const adapter = createTeltonikaAdapter({ port: 5027, codecRegistry: registry });
const sessionPromise = adapter.handleSession(socket, ctx);
// Push IMEI handshake — session writes 0x01 accept
socket.push(encodeImei(IMEI));
// Wait until the 0x01 accept byte is written
await vi.waitFor(
() => {
expect(socket.getWritten().length).toBeGreaterThan(0);
},
{ timeout: 2000 },
);
// Push a frame with an unregistered codec (0x99)
socket.push(buildFrame({ codecId: 0x99 }));
// Session should destroy the socket after unknown codec
await sessionPromise;
expect(socket.destroy).toHaveBeenCalled();
// The ONLY write must be the 0x01 accept byte — no 4-byte ACK
const allWritten = Buffer.concat(socket.getWritten());
expect(allWritten).toEqual(Buffer.from([0x01]));
}, 10_000);
});
describe('CRC mismatch path', () => {
it('does NOT write an ACK on CRC mismatch and keeps the socket open', async () => {
const socket = makeMockSocket();
const ctx = makeMockContext();
const registry = new CodecRegistry();
const adapter = createTeltonikaAdapter({ port: 5027, codecRegistry: registry });
const sessionPromise = adapter.handleSession(socket, ctx);
// Handshake
socket.push(encodeImei(IMEI));
// Wait for the accept byte
await vi.waitFor(
() => {
expect(socket.getWritten().length).toBeGreaterThan(0);
},
{ timeout: 2000 },
);
const writtenBeforeFrame = socket.getWritten().length;
// Push a frame with a deliberately wrong CRC
socket.push(buildFrame({ codecId: 0x08, crcOverride: 0xdead }));
// Give the event loop time to process the frame
await new Promise<void>((resolve) => setTimeout(resolve, 100));
// Socket must still be open
expect(socket.destroy).not.toHaveBeenCalled();
expect(socket.destroyed).toBe(false);
// No new writes after the frame (no ACK sent)
expect(socket.getWritten().length).toBe(writtenBeforeFrame);
// Clean up — simulate device disconnect
socket.simulateClose();
await sessionPromise;
}, 10_000);
});
describe('STRICT_DEVICE_AUTH', () => {
it('writes 0x00 and destroys socket for unknown device when STRICT_DEVICE_AUTH=true', async () => {
const socket = makeMockSocket();
const ctx = makeMockContext();
const strictAuthority: DeviceAuthority = {
check: vi.fn().mockResolvedValue('unknown'),
};
const adapter = createTeltonikaAdapter({
port: 5027,
deviceAuthority: strictAuthority,
strictDeviceAuth: true,
});
const sessionPromise = adapter.handleSession(socket, ctx);
socket.push(encodeImei(IMEI));
await sessionPromise;
const allWritten = Buffer.concat(socket.getWritten());
expect(allWritten).toEqual(Buffer.from([0x00]));
expect(socket.destroy).toHaveBeenCalled();
});
it('accepts unknown device when STRICT_DEVICE_AUTH=false (default)', async () => {
const socket = makeMockSocket();
const ctx = makeMockContext();
const unknownAuthority: DeviceAuthority = {
check: vi.fn().mockResolvedValue('unknown'),
};
const adapter = createTeltonikaAdapter({
port: 5027,
deviceAuthority: unknownAuthority,
strictDeviceAuth: false,
});
const sessionPromise = adapter.handleSession(socket, ctx);
socket.push(encodeImei(IMEI));
// Wait for the accept byte
await vi.waitFor(
() => {
expect(socket.getWritten().length).toBeGreaterThan(0);
},
{ timeout: 2000 },
);
const allWritten = Buffer.concat(socket.getWritten());
expect(allWritten[0]).toBe(0x01);
// Not destroyed
expect(socket.destroy).not.toHaveBeenCalled();
socket.simulateClose();
await sessionPromise;
}, 10_000);
});
describe('known device acceptance', () => {
it('writes 0x01 for a known device and stays connected', async () => {
const socket = makeMockSocket();
const ctx = makeMockContext();
const adapter = createTeltonikaAdapter({ port: 5027 });
const sessionPromise = adapter.handleSession(socket, ctx);
socket.push(encodeImei(IMEI));
// Wait for the accept byte
await vi.waitFor(
() => {
expect(socket.getWritten().length).toBeGreaterThan(0);
},
{ timeout: 2000 },
);
const allWritten = Buffer.concat(socket.getWritten());
expect(allWritten[0]).toBe(0x01);
socket.simulateClose();
await sessionPromise;
}, 10_000);
});
});