julian c8a5f4cd68 Add Phase 1 and Phase 2 planning documents
ROADMAP plus granular task files per phase. Phase 1 (12 tasks + 1.13
device authority) covers Codec 8/8E/16 telemetry ingestion; Phase 2
(6 tasks) covers Codec 12/14 outbound commands; Phase 3 enumerates
deferred items.
2026-04-30 15:50:49 +02:00


Task 2.4 — Command consumer (Redis stream reader)

Phase: 2 (Outbound commands)
Status: Not started
Depends on: 2.1, 2.3
Wiki refs: docs/wiki/concepts/phase-2-commands.md § 9.6

Goal

Each Ingestion instance runs a worker that consumes commands from commands:outbound:{instance_id}, looks up the local socket for the target IMEI, and dispatches the command to the appropriate codec encoder + write queue.

Deliverables

  • src/adapters/teltonika/command-consumer.ts:
    • CommandConsumer class with start() and stop() methods.
    • Internal: a registry of imei → SocketWriteQueue for sessions held by this instance.
    • Methods exposed to the session lifecycle: attach(imei, queue), detach(imei).
    • Reads commands via XREADGROUP GROUP ingest {instance_id} COUNT 16 BLOCK 1000 STREAMS commands:outbound:{instance_id} >.
    • Calls codec-specific encoder/handler based on the command's codec field.
    • On terminal outcome (delivered, responded, failed), publishes to commands:responses.
  • src/adapters/teltonika/responses.ts:
    • publishResponse({ commandId, status, response?, failureReason? }) writes to commands:responses via XADD.

Specification

Stream consumption

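async start(): Promise<void> {
  // Ensure the consumer group exists. MKSTREAM creates the stream if absent.
  try {
    await this.redis.xgroup('CREATE', this.streamKey, 'ingest', '$', 'MKSTREAM');
  } catch (err: unknown) {
    // BUSYGROUP means the group already exists, which is fine.
    if (!(err instanceof Error) || !err.message.includes('BUSYGROUP')) throw err;
  }

  // Start at ID '0' to re-read entries this consumer received but never ACKed
  // (for example after a crash), then switch to '>' for new entries.
  // With '>' alone, pending entries are never redelivered.
  let cursor = '0';
  while (!this.stopping) {
    const messages = await this.redis.xreadgroup(
      'GROUP', 'ingest', this.instanceId,
      'COUNT', 16, 'BLOCK', 1000,
      'STREAMS', this.streamKey, cursor,
    );
    if (!messages) continue; // BLOCK timed out with nothing new
    let handled = 0;
    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        await this.handleCommand(id, fieldsToObject(fields));
        handled++;
      }
    }
    // An empty reply while reading from '0' means the pending backlog is drained.
    if (cursor === '0' && handled === 0) cursor = '>';
  }
}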

fieldsToObject converts Redis's flat [key, value, key, value, ...] array to a plain object.
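
A minimal sketch of that helper, assuming the client returns each entry's fields as a flat array of strings (the exact signature is not specified in this task and is an assumption here):

```typescript
// Sketch only: converts ['k1', 'v1', 'k2', 'v2'] into { k1: 'v1', k2: 'v2' }.
// The signature is an assumption; the task only names the helper.
function fieldsToObject(fields: string[]): Record<string, string> {
  if (fields.length % 2 !== 0) {
    throw new Error(`expected key/value pairs, got ${fields.length} elements`);
  }
  const out: Record<string, string> = {};
  for (let i = 0; i < fields.length; i += 2) {
    out[fields[i]] = fields[i + 1];
  }
  return out;
}
```

Every value comes back as a string, so numeric fields such as codec and expires_at still need converting before use.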

Command field shape

Per the Phase 2 design, Directus's Flow publishes:

XADD commands:outbound:{instance_id} *
  command_id   <uuid>
  target_imei  <string>
  codec        12 | 14
  payload      <ASCII command text>
  expires_at   <unix-seconds>
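
Because stream fields arrive as strings, the consumer probably wants to validate and convert them once, before dispatch. The sketch below shows one way to do that. The field names come from the layout above; the TypeScript types and the parseCommandMessage helper are assumptions made for illustration.

```typescript
// Assumed shape of CommandMessage after conversion; not confirmed by the design doc.
interface CommandMessage {
  command_id: string;
  target_imei: string;
  codec: 12 | 14;
  payload: string;
  expires_at: number; // Unix seconds
}

// Hypothetical helper: turns the raw string fields into a typed message,
// throwing on anything missing or malformed.
function parseCommandMessage(raw: Record<string, string | undefined>): CommandMessage {
  const { command_id, target_imei, payload } = raw;
  if (!command_id || !target_imei || payload === undefined) {
    throw new Error('missing command_id, target_imei, or payload');
  }
  const codec = raw.codec === '12' ? 12 : raw.codec === '14' ? 14 : undefined;
  if (codec === undefined) {
    throw new Error(`unsupported codec: ${raw.codec}`);
  }
  const expires_at = Number(raw.expires_at);
  if (!raw.expires_at || !Number.isFinite(expires_at)) {
    throw new Error(`invalid expires_at: ${raw.expires_at}`);
  }
  return { command_id, target_imei, codec, payload, expires_at };
}
```

A message that fails validation can never be delivered, so it would reasonably be reported as failed and ACKed rather than retried. That policy is not stated in the Phase 2 design and would need confirming.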

Dispatch

async handleCommand(streamId: string, cmd: CommandMessage): Promise<void> {
  const commandId = cmd.command_id;
  const queue = this.sessions.get(cmd.target_imei);
  let outcome: { commandId: string; status: 'responded' | 'failed'; response?: string; failureReason?: string };

  if (!queue) {
    // This instance does not hold a socket for the target IMEI.
    outcome = { commandId, status: 'failed', failureReason: 'socket_closed' };
  } else if (Date.now() / 1000 > Number(cmd.expires_at)) {
    // Stream fields arrive as strings, so convert expires_at before comparing.
    outcome = { commandId, status: 'failed', failureReason: 'expired_before_delivery' };
  } else {
    try {
      const frame = encodeCommand(cmd.codec, commandId, cmd.payload);
      const responseBuf = await queue.writeCommand(commandId, frame, /* timeoutMs */ 30_000);
      const parsed = parseCommandResponse(cmd.codec, responseBuf);
      outcome = {
        commandId,
        status: parsed.kind === 'ack' ? 'responded' : 'failed',
        response: parsed.text,
        failureReason: parsed.kind === 'nack' ? 'imei_mismatch' : undefined,
      };
    } catch (err) {
      // Encoding, write, timeout, and parse errors all map to a failure reason.
      outcome = { commandId, status: 'failed', failureReason: errToReason(err) };
    }
  }

  // Publish first, ACK second. If publishing throws, the entry stays pending
  // and is retried, instead of being ACKed from a finally block with no response recorded.
  await this.publishResponse(outcome);
  await this.redis.xack(this.streamKey, 'ingest', streamId);
}

encodeCommand and parseCommandResponse come from tasks 2.5 (Codec 12) and 2.6 (Codec 14).

commands:responses shape

XADD commands:responses *
  command_id      <uuid>
  status          delivered | responded | failed
  response        <ASCII response text, optional>
  failure_reason  socket_closed | expired_before_delivery | imei_mismatch | timeout | write_queue_full | ...
  responded_at    <ms>
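
As an illustration of how publishResponse could map its argument onto this layout, the sketch below builds the field list that would follow `XADD commands:responses *`. The type names and the responseToFields helper are hypothetical, and responded_at is assumed to be Unix epoch milliseconds.

```typescript
type CommandStatus = 'delivered' | 'responded' | 'failed';

// Mirrors the publishResponse argument from the deliverables; the type name is an assumption.
interface CommandResponse {
  commandId: string;
  status: CommandStatus;
  response?: string;
  failureReason?: string;
}

// Hypothetical helper: returns the flat field/value list for the stream entry.
// Optional fields are omitted instead of being written as empty strings.
function responseToFields(r: CommandResponse, nowMs: number = Date.now()): string[] {
  const fields = ['command_id', r.commandId, 'status', r.status];
  if (r.response !== undefined) fields.push('response', r.response);
  if (r.failureReason !== undefined) fields.push('failure_reason', r.failureReason);
  // Assumption: responded_at is epoch milliseconds.
  fields.push('responded_at', String(nowMs));
  return fields;
}
```

Keeping the mapping in a pure function makes the camelCase to snake_case translation easy to unit-test without a Redis connection.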

Lifecycle hooks

In the Teltonika session:

  • After successful handshake: commandConsumer.attach(imei, writeQueue).
  • On socket close: commandConsumer.detach(imei).
  • The consumer must reject any in-flight command for a detached IMEI with socket_closed.
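
The hooks above could sit on a small registry like the one sketched here. WriteQueueLike and its rejectAll method are hypothetical stand-ins for whatever SocketWriteQueue exposes for failing pending writes; only attach and detach come from the deliverables.

```typescript
// Minimal stand-in for SocketWriteQueue; only what this sketch needs.
interface WriteQueueLike {
  // Hypothetical: rejects every pending writeCommand promise with the given reason.
  rejectAll(reason: string): void;
}

class SessionRegistry<Q extends WriteQueueLike> {
  private readonly sessions = new Map<string, Q>();

  // Called after a successful handshake.
  attach(imei: string, queue: Q): void {
    this.sessions.set(imei, queue);
  }

  // Called on socket close: unregister first, then fail anything still in flight.
  detach(imei: string): void {
    const queue = this.sessions.get(imei);
    if (!queue) return;
    this.sessions.delete(imei);
    queue.rejectAll('socket_closed');
  }

  get(imei: string): Q | undefined {
    return this.sessions.get(imei);
  }
}
```

One caveat with the detach(imei) signature: if a device reconnects before the old socket's close event fires, the late detach would remove the new queue. Passing the queue to detach and comparing identity would guard against that, but it goes beyond the signatures listed in the deliverables.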

Concurrency

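The consumer reads up to 16 messages per XREADGROUP call. The loop in start() awaits each handleCommand call in turn (a plain for...of loop with await inside, not the for await syntax), and handleCommand does not return until the device responds or the 30 s timeout fires. As written, one slow device therefore delays every later entry in the batch, including commands for other IMEIs. For commands targeting different IMEIs to complete in parallel, their handlers must be started without awaiting each one before the next; each command still goes to a different SocketWriteQueue, and within a single IMEI the queue serializes them.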
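
One way to get per-IMEI ordering together with cross-IMEI parallelism inside a batch is to group entries by IMEI and run one sequential chain per group. This is a sketch of that idea, not the planned implementation; Entry, processBatch, and the handle callback are hypothetical names.

```typescript
// Hypothetical minimal entry shape: stream ID plus target IMEI.
interface Entry {
  id: string;
  imei: string;
}

async function processBatch(
  entries: Entry[],
  handle: (entry: Entry) => Promise<void>,
): Promise<void> {
  // Group entries by IMEI, preserving stream order within each group.
  const byImei = new Map<string, Entry[]>();
  for (const entry of entries) {
    const group = byImei.get(entry.imei);
    if (group) {
      group.push(entry);
    } else {
      byImei.set(entry.imei, [entry]);
    }
  }
  // One sequential chain per IMEI; the chains themselves run concurrently.
  const chains = Array.from(byImei.values()).map(async (group) => {
    for (const entry of group) {
      await handle(entry);
    }
  });
  await Promise.all(chains);
}
```

The batch still finishes only when its slowest chain does, so a 30 s timeout on one device holds up the next XREADGROUP call. If handle can throw, Promise.all rejects on the first failure while the other chains keep running, so the handler should catch its own errors.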

Acceptance criteria

  • Publishing a command via XADD commands:outbound:{instance_id} causes the consumer to call writeCommand on the right session.
  • If the IMEI is not held by this instance, the consumer publishes failed with socket_closed to commands:responses and ACKs the stream entry.
  • If expires_at has passed, the consumer publishes failed with expired_before_delivery and ACKs.
  • On stop(), the consumer drains the in-flight message and exits the read loop cleanly.
  • XACK happens only after the response is published (or terminal failure recorded), so a crash mid-handler causes the command to be redelivered.
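
The stop() criterion can be met by keeping a handle on the loop promise and awaiting it after setting the stop flag. The sketch below shows that pattern in isolation. LoopRunner and its step callback are hypothetical; step stands in for one XREADGROUP call plus handling of the entries it returns. Unlike the start() in the specification, this start() returns immediately instead of resolving when the loop ends.

```typescript
class LoopRunner {
  private stopping = false;
  private loop: Promise<void> | null = null;
  private readonly step: () => Promise<void>;

  constructor(step: () => Promise<void>) {
    this.step = step;
  }

  // Starts the loop in the background; a second call while running is a no-op.
  start(): void {
    if (this.loop) return;
    this.stopping = false;
    this.loop = this.run();
  }

  private async run(): Promise<void> {
    while (!this.stopping) {
      await this.step();
    }
  }

  // Signals the loop to exit and waits for the in-flight step to finish.
  async stop(): Promise<void> {
    this.stopping = true;
    if (this.loop) await this.loop;
    this.loop = null;
  }
}
```

With BLOCK 1000 and a 30 s command timeout, stop() in the real consumer could take up to roughly that long to resolve, which matters for shutdown deadlines.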

Risks / open questions

  • Crash mid-handler: the command was sent on the wire but the instance crashed before XACK. The entry stays in the consumer's Pending Entries List (PEL), and it is redelivered only if the restarted consumer re-reads its pending entries (XREADGROUP with ID 0 rather than >). The restarted instance won't have the device, so it publishes socket_closed. The result: the command was delivered to the device but Directus thinks it failed, and the operator re-issues it. Acceptable for v1; flagged in phase-2-commands as a sweeper concern. Idempotent device commands mitigate this.
  • Duplicate delivery via the PEL: v1 does no cross-consumer claiming (XCLAIM / XAUTOCLAIM). Redis never expires pending entries on its own; if a consumer crashes, its entries stay pending until another consumer in the same group claims them explicitly. Because the consumer name is instance_id, such cross-instance claiming would hand commands to an instance that does not hold the target device's socket. Decision: each instance is the only consumer in its own consumer group (group name = ingest, consumer name = instance_id), and because the stream itself is per-instance there is no cross-claiming risk. Verify this matches the Directus-side publishing logic.

Done

(Fill in once complete.)