Implement Phase 1 task 1.8 (Redis Streams publisher + main wiring)

- Bounded in-memory queue (default 10000); overflow throws PublishOverflowError
  so the framing layer skips ACK and the device retransmits.
- Background worker drains via XADD with MAXLEN ~ approximate trimming.
- JSON serialization with sentinel encoding for bigint/Buffer/Date; correctly
  handles Buffer.prototype.toJSON firing before the replacer.
- AdapterContext.publish(position, codec) with codec-label closure at dispatch
  in adapters/teltonika/index.ts; zero changes to the three codec parsers.
- connectRedis with retry-on-startup; main.ts wires the full pipeline.
- installGracefulShutdown stubbed (full hardening in task 1.12).
- 19 new tests (17 unit + 2 Docker-conditional integration). Total 81 passing.
This commit is contained in:
2026-04-30 16:37:51 +02:00
parent 381287bacc
commit c33c7a4f6b
12 changed files with 1896 additions and 55 deletions
+30 -1
View File
@@ -1,5 +1,6 @@
import type * as net from 'node:net';
import type { Adapter, AdapterContext } from '../../core/types.js';
import type { CodecLabel } from '../../core/publish.js';
import type { DeviceAuthority } from './device-authority.js';
import { AllowAllAuthority } from './device-authority.js';
import { readImeiHandshake, HandshakeError } from './handshake.js';
@@ -9,6 +10,25 @@ import { codec8Handler } from './codec/data/codec8.js';
import { codec8eHandler } from './codec/data/codec8e.js';
import { codec16Handler } from './codec/data/codec16.js';
/**
* Maps numeric codec IDs (as seen in the frame header) to their canonical
* string labels used in the Redis Stream record.
*
* Codec label plumbing decision (task 1.8):
* Option A: change CodecHandlerContext.publish signature to accept a codec label.
* - Pro: most direct.
* - Con: ripples into all three codec parser files + registry type.
* Option B (chosen): wrap ctx.publish in a closure at the dispatch site in
* index.ts. The handler still calls ctx.publish(position) unchanged; the
* wrapper captures frame.codecId, resolves it to a label, and forwards to
* Publisher.publish(position, codec). Zero changes to parsers or registry.
*/
// Lookup table from the numeric codec ID in the frame header to the canonical
// string label carried in the Redis Stream record. Unregistered IDs yield
// undefined from .get(); the dispatch site decides the fallback.
const CODEC_ID_TO_LABEL: ReadonlyMap<number, CodecLabel> = new Map<number, CodecLabel>([
  [0x08, '8'], // Codec 8  (0x08)
  [0x8e, '8E'], // Codec 8 Extended (0x8e)
  [0x10, '16'], // Codec 16 (0x10 = 16 decimal)
]);
export type TeltonikaAdapterOptions = {
readonly port: number;
readonly deviceAuthority?: DeviceAuthority;
@@ -154,11 +174,20 @@ export function createTeltonikaAdapter(options: TeltonikaAdapterOptions): Adapte
return;
}
// Resolve the codec label from the numeric ID so the publisher can
// include it as a top-level Redis Stream field. Fall back to the '8' label
// if somehow an unregistered ID slipped past the registry guard
// (defensive — should not happen given the unknown-codec drop above).
const codecLabel = CODEC_ID_TO_LABEL.get(frame.codecId) ?? ('8' as CodecLabel);
let result: { recordCount: number };
try {
result = await handler.handle(frame.payload, {
imei,
publish: ctx.publish,
// Wrap AdapterContext.publish(position, codec) into the codec-
// handler-facing (position) => Promise<void> shape. The codec
// parsers are unaware of the label; it is captured here at dispatch.
publish: (position) => ctx.publish(position, codecLabel),
logger: sessionLogger,
});
} catch (handlerErr) {
+3
View File
@@ -19,6 +19,9 @@ const ConfigSchema = z.object({
// Observability
METRICS_PORT: z.coerce.number().int().min(0).max(65535).default(9090),
// Publisher queue — capacity of the bounded in-memory queue before overflow
PUBLISH_QUEUE_CAPACITY: z.coerce.number().int().min(1).default(10_000),
// Device authority — off by default; opt-in for strict reject-on-unknown
STRICT_DEVICE_AUTH: z
.string()
+292 -15
View File
@@ -1,22 +1,299 @@
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import type { Position } from './types.js';
import type { Config } from '../config/load.js';
import type { Metrics, Position } from './types.js';
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
export type CodecLabel = '8' | '8E' | '16';
/**
* Stub publish function — logs the position at debug level.
* Real implementation (Redis Streams XADD) lands in task 1.8.
* The signature is already the final shape so adapter types stabilize now.
* Publisher returned by createPublisher. The `publish` method is the hot path
* called from every codec handler (via AdapterContext). It enqueues the record
* synchronously (non-blocking) and returns; a background worker drains to Redis.
*
* `drain` is called during graceful shutdown: it waits until the queue is empty
* or the timeout elapses, then resolves. Either way the caller should then quit.
*/
export function makePublisher(logger: Logger): (position: Position) => Promise<void> {
return async (position: Position): Promise<void> => {
logger.debug(
{
device_id: position.device_id,
timestamp: position.timestamp.toISOString(),
latitude: position.latitude,
longitude: position.longitude,
speed: position.speed,
},
'publish position (stub)',
export type Publisher = {
  // Hot path: synchronous bounded-queue enqueue (no Redis I/O). Throws
  // PublishOverflowError when the queue is at capacity so the framing layer
  // can skip the TCP ACK and let the device retransmit.
  readonly publish: (position: Position, codec: CodecLabel) => Promise<void>;
  // Graceful-shutdown hook: resolves once the queue is empty or timeoutMs
  // elapses (a timeout is logged as a warning, not thrown).
  readonly drain: (timeoutMs: number) => Promise<void>;
};
/**
* Thrown by `publish()` when the bounded in-memory queue is full.
* The framing layer catches this and skips the TCP ACK so the device retransmits.
*/
/**
 * Thrown by `publish()` when the bounded in-memory queue is full.
 * The framing layer catches this and skips the TCP ACK so the device retransmits.
 *
 * The observed depth and configured capacity are exposed as readonly fields so
 * callers can log or emit metrics without parsing the message string.
 */
export class PublishOverflowError extends Error {
  override readonly name = 'PublishOverflowError';

  constructor(
    /** Queue depth observed at the moment of the rejected enqueue. */
    readonly queueDepth: number,
    /** Configured capacity (PUBLISH_QUEUE_CAPACITY). */
    readonly capacity: number,
  ) {
    super(
      `Publish queue full: ${queueDepth}/${capacity} entries. Skipping ACK — device will retransmit.`,
    );
  }
}
// ---------------------------------------------------------------------------
// Serialization
// ---------------------------------------------------------------------------
/**
* JSON replacer that handles types not natively supported by JSON.stringify:
* - bigint → { __bigint: "<digits>" }
* - Buffer → { __buffer_b64: "<base64>" }
* - Date → ISO8601 string
*
* Contract documented in docs/wiki/concepts/position-record.md.
* Processors decode these sentinels on the read side.
*
* IMPORTANT: `Buffer.prototype.toJSON()` fires before `JSON.stringify` passes
* the value to this replacer, converting Buffer instances to
* `{ type: 'Buffer', data: [...] }`. We therefore check `instanceof Uint8Array`
* (Buffer's base class, which has no `toJSON`) for direct replacer calls, AND
* also detect the `toJSON` shape for values that arrived via JSON.stringify.
* Use `serializePosition` (which calls `jsonReplacer` via JSON.stringify) or
* `jsonReplacer` directly — both paths are safe.
*/
/**
 * JSON replacer handling types JSON.stringify does not support natively:
 * - bigint → { __bigint: "<digits>" }
 * - Buffer/Uint8Array → { __buffer_b64: "<base64>" }
 * - Date → ISO8601 string
 *
 * Contract documented in docs/wiki/concepts/position-record.md; processors
 * decode the sentinels on the read side.
 *
 * IMPORTANT: `Buffer.prototype.toJSON()` fires before `JSON.stringify` passes
 * the value to this replacer, converting nested Buffers to
 * `{ type: 'Buffer', data: [...] }`. We therefore check `instanceof Uint8Array`
 * (Buffer's base class, which has no `toJSON`) for direct replacer calls, AND
 * detect the `toJSON` shape for values that arrived via JSON.stringify.
 *
 * @param _key  JSON.stringify key (unused).
 * @param value Candidate value; returned unchanged when no sentinel applies.
 */
export function jsonReplacer(_key: string, value: unknown): unknown {
  if (typeof value === 'bigint') {
    return { __bigint: value.toString() };
  }
  // Direct Buffer/Uint8Array instance (e.g. when calling jsonReplacer directly
  // in tests, or when Buffer.toJSON hasn't fired yet).
  if (value instanceof Uint8Array) {
    return { __buffer_b64: Buffer.from(value).toString('base64') };
  }
  // Buffer.toJSON() shape — what JSON.stringify hands the replacer for a
  // nested Buffer, because toJSON fires first.
  if (isBufferToJsonShape(value)) {
    return { __buffer_b64: Buffer.from(value.data).toString('base64') };
  }
  // Only reached on direct calls: Date.prototype.toJSON fires before the
  // replacer under JSON.stringify, already yielding an ISO string.
  if (value instanceof Date) {
    return value.toISOString();
  }
  return value;
}

type BufferToJsonShape = { type: 'Buffer'; data: number[] };

/**
 * True only for the exact `Buffer.prototype.toJSON()` shape. Every element of
 * `data` must be a number: without that check, an unrelated user object such
 * as `{ type: 'Buffer', data: ['x'] }` would be mangled into garbage base64
 * instead of passing through unchanged.
 */
function isBufferToJsonShape(value: unknown): value is BufferToJsonShape {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    candidate['type'] === 'Buffer' &&
    Array.isArray(candidate['data']) &&
    candidate['data'].every((byte) => typeof byte === 'number')
  );
}
/**
* Produces the flat Redis Stream field-value record for a Position.
* The top-level ts/device_id/codec fields allow downstream filtering without
* JSON parsing; payload is the source of truth.
*/
/**
 * Builds the flat Redis Stream field-value record for one Position.
 * `ts`, `device_id` and `codec` are duplicated at the top level so downstream
 * consumers can filter without JSON-parsing; `payload` remains the source of
 * truth (serialized via jsonReplacer for bigint/Buffer/Date support).
 */
export function serializePosition(
  position: Position,
  codec: CodecLabel,
): Record<string, string> {
  const payload = JSON.stringify(position, jsonReplacer);
  const record: Record<string, string> = {
    ts: position.timestamp.toISOString(),
    device_id: position.device_id,
    codec,
    payload,
  };
  return record;
}
// ---------------------------------------------------------------------------
// Bounded queue implementation
// ---------------------------------------------------------------------------
type QueueEntry = {
readonly position: Position;
readonly codec: CodecLabel;
};
/**
* createPublisher — factory that wires together the bounded queue and the
* Redis XADD worker.
*
* Non-blocking guarantee: `publish()` enqueues and returns immediately (no
* Redis I/O on the hot path). The worker runs concurrently (concurrency=1)
* and drains the queue via XADD.
*
* On worker failure (e.g. Redis down for > per-call timeout): logs fatal and
* calls process.exit(1). The orchestrator (Docker/systemd) restarts the service.
* This is intentional — a publisher that silently drops records while Redis is
* down is worse than a hard restart.
*/
/**
 * createPublisher — factory that wires together the bounded queue and the
 * Redis XADD worker.
 *
 * Non-blocking guarantee: `publish()` enqueues and returns immediately (no
 * Redis I/O on the hot path). The worker runs concurrently (concurrency=1)
 * and drains the queue via XADD with MAXLEN ~ approximate trimming.
 *
 * On worker failure (e.g. Redis down for > per-call timeout): logs fatal and
 * calls process.exit(1). The orchestrator (Docker/systemd) restarts the service.
 * This is intentional — a publisher that silently drops records while Redis is
 * down is worse than a hard restart.
 */
export function createPublisher(
  redis: Redis,
  config: Config,
  logger: Logger,
  metrics: Metrics,
): Publisher {
  const capacity = config.PUBLISH_QUEUE_CAPACITY;
  const stream = config.REDIS_TELEMETRY_STREAM;
  const maxlen = config.REDIS_STREAM_MAXLEN;
  // ioredis has no native per-call timeout; each XADD races a watchdog timer
  // (cleared on settle so a pending timer is not leaked per record).
  const XADD_TIMEOUT_MS = 2_000;

  const queue: QueueEntry[] = [];
  // Entries popped off the queue whose XADD has not settled yet. drain() must
  // wait for these too — otherwise the final record can be cut off mid-write
  // by redis.quit() during shutdown.
  let inFlight = 0;
  // Signals the parked worker loop that there is work to do.
  let workerNotify: (() => void) | null = null;
  // Captures unexpected worker crashes (the loop itself never resolves).
  let workerIdle: Promise<void> = Promise.resolve();

  // ---------------------------------------------------------------------------
  // Worker — drains queue entries one-at-a-time via XADD
  // ---------------------------------------------------------------------------
  async function worker(): Promise<void> {
    while (true) {
      if (queue.length === 0) {
        // Park until enqueue wakes us. The executor runs synchronously, so
        // workerNotify is assigned before we yield — publish() cannot race it.
        await new Promise<void>((resolve) => {
          workerNotify = resolve;
        });
        workerNotify = null;
      }
      const entry = queue.shift();
      if (entry === undefined) continue;
      inFlight += 1;
      metrics.observe('teltonika_publish_queue_depth', queue.length);

      const fields = serializePosition(entry.position, entry.codec);
      // Flatten into the alternating [field, value, ...] array ioredis expects
      const args: string[] = [];
      for (const [k, v] of Object.entries(fields)) {
        args.push(k, v);
      }

      let watchdog: ReturnType<typeof setTimeout> | undefined;
      try {
        await Promise.race([
          redis.xadd(stream, 'MAXLEN', '~', String(maxlen), '*', ...args),
          new Promise<never>((_, reject) => {
            watchdog = setTimeout(
              () => reject(new Error(`XADD timed out after ${XADD_TIMEOUT_MS}ms`)),
              XADD_TIMEOUT_MS,
            );
            watchdog.unref();
          }),
        ]);
      } catch (err) {
        logger.fatal(
          { err, stream, device_id: entry.position.device_id },
          'Redis XADD failed; exiting for orchestrator restart',
        );
        // Exit rather than silently drop — orchestrator restarts the process.
        process.exit(1);
      } finally {
        // Clear the watchdog so each successful XADD doesn't leave a 2s timer
        // pending; decrement inFlight so drain() can observe completion.
        if (watchdog !== undefined) clearTimeout(watchdog);
        inFlight -= 1;
      }
    }
  }

  // Start the worker immediately. Store the idle promise so unexpected crashes
  // are surfaced. The infinite loop never resolves on its own — intentional.
  workerIdle = worker().catch((err) => {
    logger.fatal({ err }, 'Publisher worker crashed unexpectedly; exiting');
    process.exit(1);
  });

  // ---------------------------------------------------------------------------
  // publish — non-blocking enqueue (called on the TCP hot path)
  // ---------------------------------------------------------------------------
  async function publish(position: Position, codec: CodecLabel): Promise<void> {
    if (queue.length >= capacity) {
      metrics.inc('teltonika_publish_overflow_total', { codec });
      throw new PublishOverflowError(queue.length, capacity);
    }
    queue.push({ position, codec });
    metrics.observe('teltonika_publish_queue_depth', queue.length);
    // Wake the worker if it is parked
    workerNotify?.();
  }

  // ---------------------------------------------------------------------------
  // drain — called during graceful shutdown; waits until the queue is empty
  // AND any in-flight XADD has settled, or the deadline passes.
  // ---------------------------------------------------------------------------
  async function drain(timeoutMs: number): Promise<void> {
    const deadline = Date.now() + timeoutMs;
    while ((queue.length > 0 || inFlight > 0) && Date.now() < deadline) {
      await new Promise<void>((resolve) => setTimeout(resolve, 50).unref());
    }
    const remaining = queue.length + inFlight;
    if (remaining > 0) {
      logger.warn(
        { remaining },
        'Publisher drain timed out; some records may be lost',
      );
    }
    // workerIdle is intentionally not awaited — the worker loop never resolves;
    // we only keep the promise to surface unexpected crashes above.
    void workerIdle;
  }

  return { publish, drain };
}
// ---------------------------------------------------------------------------
// Redis connection helper (exported for testing in isolation)
// ---------------------------------------------------------------------------
/**
* Connects to Redis with exponential-backoff retry on startup.
* Fails fast (process.exit) after `maxAttempts` consecutive failures, so the
* orchestrator can restart rather than running with a broken connection.
*/
/**
 * Connects to Redis with exponential-backoff retry on startup.
 *
 * @param redisUrl    Connection string passed straight to the ioredis constructor.
 * @param logger      Structured logger for attempt/failure reporting.
 * @param maxAttempts Consecutive failures tolerated before giving up (default 3).
 * @returns A connected ioredis client.
 *
 * Fails fast (process.exit) after `maxAttempts` consecutive failures, so the
 * orchestrator can restart rather than running with a broken connection.
 */
export async function connectRedis(
  redisUrl: string,
  logger: Logger,
  maxAttempts = 3,
): Promise<Redis> {
  // Dynamic import keeps ioredis out of the module graph for tests that
  // don't need it.
  const { default: Redis } = await import('ioredis');
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const redis = new Redis(redisUrl, {
      // lazyConnect defers the TCP connection to the explicit connect() call
      // below, so each attempt is a single deterministic try.
      // maxRetriesPerRequest: 0 makes commands fail immediately when
      // disconnected; enableOfflineQueue: false keeps commands from
      // buffering while offline.
      // NOTE(review): none of these options disable ioredis's automatic
      // reconnect after an *established* connection later drops — that is
      // governed by retryStrategy, which is not set here. Confirm post-startup
      // reconnect behavior is the intended one.
      enableOfflineQueue: false,
      lazyConnect: true,
      maxRetriesPerRequest: 0,
      connectTimeout: 5_000,
    });
    try {
      await redis.connect();
      logger.info({ attempt }, 'Redis connected');
      return redis;
    } catch (err) {
      // Release the failed client's socket before retrying or exiting.
      await redis.quit().catch(() => {
        // best-effort; ignore quit errors
      });
      if (attempt === maxAttempts) {
        logger.fatal({ err, url: redisUrl }, 'Redis connection failed after all retries; exiting');
        process.exit(1);
      }
      // Exponential backoff: 200ms, 400ms, 800ms, ... capped at 5s.
      const backoffMs = Math.min(200 * 2 ** (attempt - 1), 5_000);
      logger.warn(
        { err, attempt, maxAttempts, backoffMs },
        'Redis connection failed; retrying',
      );
      await new Promise<void>((resolve) => setTimeout(resolve, backoffMs));
    }
  }
  // TypeScript: unreachable after process.exit above, but needed for type safety
  /* c8 ignore next */
  throw new Error('unreachable');
}
+8 -1
View File
@@ -1,5 +1,6 @@
import type { Logger } from 'pino';
import type * as net from 'node:net';
import type { CodecLabel } from './publish.js';
/**
* Normalized GPS position record — the boundary contract between vendor adapters
@@ -31,9 +32,15 @@ export type Metrics = {
/**
* Narrow context object passed into each adapter's session handler.
* Adapters receive everything they need here; shell internals are not exposed.
*
* `publish` accepts a `codec` label so the Redis Stream record can carry it as
* a top-level field for downstream filtering. The label is injected by the
* framing layer (e.g. index.ts) at dispatch time, not by individual codec
* parsers — parsers call CodecHandlerContext.publish(position) with no codec
* knowledge, and the framing layer wraps that into AdapterContext.publish.
*/
export type AdapterContext = {
  // Note: the pre-change single-argument `publish` signature was left behind
  // as diff residue here; a duplicate `publish` member is a TS compile error,
  // so only the two-argument form (position + codec label) is kept.
  readonly publish: (position: Position, codec: CodecLabel) => Promise<void>;
  readonly logger: Logger;
  readonly metrics: Metrics;
};
+93 -34
View File
@@ -1,17 +1,19 @@
import type { Redis } from 'ioredis';
import type * as net from 'node:net';
import { loadConfig } from './config/load.js';
import type { Config } from './config/load.js';
import { createLogger } from './observability/logger.js';
import { makePublisher } from './core/publish.js';
import { createPublisher, connectRedis } from './core/publish.js';
import { startServer } from './core/server.js';
import { createTeltonikaAdapter } from './adapters/teltonika/index.js';
import { AllowAllAuthority } from './adapters/teltonika/device-authority.js';
import { CodecRegistry } from './adapters/teltonika/codec/registry.js';
import type { Metrics } from './core/types.js';
// -------------------------------------------------------------------------
// Startup: validate config (fail fast on bad env), build logger, boot server
// -------------------------------------------------------------------------
let config;
let config: Config;
try {
config = loadConfig();
} catch (err) {
@@ -29,45 +31,102 @@ const logger = createLogger({
logger.info('tcp-ingestion starting');
// Placeholder metrics implementation — replaced in task 1.10
// Placeholder metrics implementation — replaced in task 1.10.
// Using the Metrics interface from types.ts (no prom-client yet).
const metrics: Metrics = {
inc: (name, labels) => logger.debug({ metric: name, labels }, 'metric inc'),
observe: (name, value, labels) => logger.debug({ metric: name, value, labels }, 'metric observe'),
};
const publisher = makePublisher(logger);
// -------------------------------------------------------------------------
// Wire up the pipeline
// -------------------------------------------------------------------------
// Codec registry — empty until tasks 1.5–1.7 register handlers
const codecRegistry = new CodecRegistry();
async function main(): Promise<void> {
// 1. Connect Redis with exponential-backoff retry (3 attempts, up to 5s backoff)
const redis = await connectRedis(config.REDIS_URL, logger);
const teltonikaAdapter = createTeltonikaAdapter({
port: config.TELTONIKA_PORT,
deviceAuthority: new AllowAllAuthority(),
strictDeviceAuth: config.STRICT_DEVICE_AUTH,
codecRegistry,
});
// 2. Build the publisher (bounded queue + XADD worker)
const publisher = createPublisher(redis, config, logger, metrics);
const ctx = {
publish: publisher,
logger,
metrics,
};
const server = startServer(config.TELTONIKA_PORT, teltonikaAdapter, ctx);
// Graceful shutdown
function shutdown(signal: string): void {
logger.info({ signal }, 'shutdown signal received; closing server');
server.close(() => {
logger.info('server closed; exiting');
process.exit(0);
// 3. Build the Teltonika adapter (all three Phase 1 codecs registered via defaultRegistry)
const teltonikaAdapter = createTeltonikaAdapter({
port: config.TELTONIKA_PORT,
deviceAuthority: new AllowAllAuthority(),
strictDeviceAuth: config.STRICT_DEVICE_AUTH,
// No explicit codecRegistry — createTeltonikaAdapter builds defaultRegistry
// with codec8Handler, codec8eHandler, codec16Handler pre-registered.
});
// Force exit after 10s if connections are still open
setTimeout(() => {
logger.warn('forced exit after timeout');
process.exit(1);
}, 10_000).unref();
// 4. Start TCP server — publisher.publish is the AdapterContext.publish impl
const server = startServer(config.TELTONIKA_PORT, teltonikaAdapter, {
publish: publisher.publish,
logger,
metrics,
});
// 5. Install graceful shutdown (stub — full hardening in task 1.12)
installGracefulShutdown({ server, redis, publisher, logger });
logger.info({ port: config.TELTONIKA_PORT }, 'tcp-ingestion ready');
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// -------------------------------------------------------------------------
// Graceful shutdown stub — task 1.12 finalizes this
// -------------------------------------------------------------------------
type ShutdownDeps = {
  readonly server: net.Server;
  readonly redis: Redis;
  readonly publisher: { drain(timeoutMs: number): Promise<void> };
  readonly logger: ReturnType<typeof createLogger>;
};

/**
 * Registers SIGTERM/SIGINT handlers that stop the TCP listener, drain the
 * publisher queue (10s budget), close Redis, then exit. A 15s unref'd timer
 * forces exit if the graceful path stalls. Stub — task 1.12 hardens this.
 */
function installGracefulShutdown(deps: ShutdownDeps): void {
  const { server, redis, publisher, logger: log } = deps;
  let shutdownStarted = false;

  const beginShutdown = (signal: string): void => {
    // Idempotent: a second signal while shutting down is ignored.
    if (shutdownStarted) {
      return;
    }
    shutdownStarted = true;
    log.info({ signal }, 'shutdown signal received');

    // Refuse new connections; close() fires its callback once existing
    // sockets have ended.
    server.close(() => {
      log.info('TCP server closed');
    });

    // Drain the publisher queue, then disconnect Redis, then exit.
    void (async () => {
      try {
        await publisher.drain(10_000);
        await redis.quit();
        log.info('graceful shutdown complete');
        process.exit(0);
      } catch (err) {
        log.error({ err }, 'error during shutdown');
        process.exit(1);
      }
    })();

    // Safety net: hard-exit if the graceful path stalls.
    setTimeout(() => {
      log.warn('forced exit after shutdown timeout');
      process.exit(1);
    }, 15_000).unref();
  };

  process.on('SIGTERM', () => beginShutdown('SIGTERM'));
  process.on('SIGINT', () => beginShutdown('SIGINT'));
}
// -------------------------------------------------------------------------
// Entry point
// -------------------------------------------------------------------------
// Fatal startup errors bypass the structured logger (it may not exist yet)
// and go straight to stderr before exiting non-zero.
main().catch((err: unknown) => {
  const message = err instanceof Error ? err.message : String(err);
  process.stderr.write(`Fatal startup error: ${message}\n`);
  process.exit(1);
});