ROADMAP plus granular task files per phase. Phase 1 (12 tasks + 1.13 device authority) covers Codec 8/8E/16 telemetry ingestion; Phase 2 (6 tasks) covers Codec 12/14 outbound commands; Phase 3 enumerates deferred items.
6.1 KiB
Task 2.4 — Command consumer (Redis stream reader)
Phase: 2 — Outbound commands
Status: ⬜ Not started
Depends on: 2.1, 2.3
Wiki refs: docs/wiki/concepts/phase-2-commands.md § 9.6
Goal
Each Ingestion instance runs a worker that consumes commands from commands:outbound:{instance_id}, looks up the local socket for the target IMEI, and dispatches the command to the appropriate codec encoder + write queue.
Deliverables
src/adapters/teltonika/command-consumer.ts:CommandConsumerclass withstart()andstop()methods.- Internal: a registry of
imei → SocketWriteQueuefor sessions held by this instance. - Methods exposed to the session lifecycle:
attach(imei, queue),detach(imei). - Reads commands via
XREADGROUP commands:outbound:{instance_id} GROUP ingest {instance_id} COUNT 16 BLOCK 1000. - Calls codec-specific encoder/handler based on the command's
codecfield. - On terminal outcome (delivered, responded, failed), publishes to
commands:responses.
src/adapters/teltonika/responses.ts:publishResponse({ commandId, status, response?, failureReason? })writes tocommands:responsesviaXADD.
Specification
Stream consumption
async start(): Promise<void> {
// Ensure the consumer group exists. MKSTREAM creates the stream if absent.
try {
await this.redis.xgroup('CREATE', this.streamKey, 'ingest', '$', 'MKSTREAM');
} catch (err: any) {
if (!err.message?.includes('BUSYGROUP')) throw err;
}
while (!this.stopping) {
const messages = await this.redis.xreadgroup(
'GROUP', 'ingest', this.instanceId,
'COUNT', 16, 'BLOCK', 1000,
'STREAMS', this.streamKey, '>',
);
if (!messages) continue;
for (const [, entries] of messages) {
for (const [id, fields] of entries) {
await this.handleCommand(id, fieldsToObject(fields));
}
}
}
}
fieldsToObject converts Redis's flat [key, value, key, value, ...] array to a plain object.
Command field shape
Per the Phase 2 design, Directus's Flow publishes:
XADD commands:outbound:{instance_id} *
command_id <uuid>
target_imei <string>
codec 12 | 14
payload <ASCII command text>
expires_at <unix-seconds>
Dispatch
async handleCommand(streamId: string, cmd: CommandMessage): Promise<void> {
const queue = this.sessions.get(cmd.target_imei);
if (!queue) {
await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: 'socket_closed' });
await this.redis.xack(this.streamKey, 'ingest', streamId);
return;
}
if (Date.now() / 1000 > cmd.expires_at) {
await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: 'expired_before_delivery' });
await this.redis.xack(this.streamKey, 'ingest', streamId);
return;
}
try {
const frame = encodeCommand(cmd.codec, cmd.command_id, cmd.payload);
const responseBuf = await queue.writeCommand(cmd.command_id, frame, /* timeoutMs */ 30_000);
const parsed = parseCommandResponse(cmd.codec, responseBuf);
await this.publishResponse({
commandId: cmd.command_id,
status: parsed.kind === 'ack' ? 'responded' : 'failed',
response: parsed.text,
failureReason: parsed.kind === 'nack' ? 'imei_mismatch' : undefined,
});
} catch (err) {
await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: errToReason(err) });
} finally {
await this.redis.xack(this.streamKey, 'ingest', streamId);
}
}
encodeCommand and parseCommandResponse come from tasks 2.5 (Codec 12) and 2.6 (Codec 14).
commands:responses shape
XADD commands:responses *
command_id <uuid>
status delivered | responded | failed
response <ASCII response text, optional>
failure_reason socket_closed | expired_before_delivery | imei_mismatch | timeout | write_queue_full | ...
responded_at <ms>
Lifecycle hooks
In the Teltonika session:
- After successful handshake:
commandConsumer.attach(imei, writeQueue). - On socket close:
commandConsumer.detach(imei). - The consumer must reject any in-flight command for a detached IMEI with
socket_closed.
Concurrency
The consumer reads up to 16 messages per XREADGROUP call. Process them sequentially per call (for await). Multiple commands targeting different IMEIs can complete in parallel naturally because each goes to a different SocketWriteQueue. Within a single IMEI, the queue serializes them.
Acceptance criteria
- Publishing a command via
XADD commands:outbound:{instance_id}causes the consumer to callwriteCommandon the right session. - If the IMEI is not held by this instance, the consumer publishes
failedwithsocket_closedtocommands:responsesand ACKs the stream entry. - If
expires_athas passed, the consumer publishesfailedwithexpired_before_deliveryand ACKs. - On
stop(), the consumer drains the in-flight message and exits the read loop cleanly. XACKhappens only after the response is published (or terminal failure recorded), so a crash mid-handler causes the command to be redelivered.
Risks / open questions
- Crash mid-handler: the command was sent on the wire but we crashed before
XACK. After restart, the consumer will redeliver; the new instance won't have the device, so it publishessocket_closed. The result: command was delivered to the device but Directus thinks it failed. Operator re-issues. Acceptable v1; flagged in phase-2-commands as a sweeper concern. Idempotent device commands mitigate. - Duplicate delivery via
XPENDING: not handling Pending Entries List explicitly in v1. If a consumer crashes, its claims time out and another consumer in the group can claim — but we're usinginstance_idas the consumer name, so cross-instance claiming would deliver commands to the wrong device. Decision: each instance is the only consumer in its own consumer group (group name =ingest, consumer name =instance_id, but stream is per-instance so no cross-claiming risk). Verify this matches the Directus-side publishing logic.
Done
(Fill in once complete.)