c8a5f4cd68
ROADMAP plus granular task files per phase. Phase 1 (12 tasks + 1.13 device authority) covers Codec 8/8E/16 telemetry ingestion; Phase 2 (6 tasks) covers Codec 12/14 outbound commands; Phase 3 enumerates deferred items.
142 lines
6.1 KiB
Markdown
142 lines
6.1 KiB
Markdown
# Task 2.4 — Command consumer (Redis stream reader)
|
|
|
|
**Phase:** 2 — Outbound commands
|
|
**Status:** ⬜ Not started
|
|
**Depends on:** 2.1, 2.3
|
|
**Wiki refs:** `docs/wiki/concepts/phase-2-commands.md` § 9.6
|
|
|
|
## Goal
|
|
|
|
Each Ingestion instance runs a worker that consumes commands from `commands:outbound:{instance_id}`, looks up the local socket for the target IMEI, and dispatches the command to the appropriate codec encoder + write queue.
|
|
|
|
## Deliverables
|
|
|
|
- `src/adapters/teltonika/command-consumer.ts`:
|
|
- `CommandConsumer` class with `start()` and `stop()` methods.
|
|
- Internal: a registry of `imei → SocketWriteQueue` for sessions held by this instance.
|
|
- Methods exposed to the session lifecycle: `attach(imei, queue)`, `detach(imei)`.
|
|
- Reads commands via `XREADGROUP commands:outbound:{instance_id} GROUP ingest {instance_id} COUNT 16 BLOCK 1000`.
|
|
- Calls codec-specific encoder/handler based on the command's `codec` field.
|
|
- On terminal outcome (delivered, responded, failed), publishes to `commands:responses`.
|
|
- `src/adapters/teltonika/responses.ts`:
|
|
- `publishResponse({ commandId, status, response?, failureReason? })` writes to `commands:responses` via `XADD`.
|
|
|
|
## Specification
|
|
|
|
### Stream consumption
|
|
|
|
```ts
|
|
async start(): Promise<void> {
|
|
// Ensure the consumer group exists. MKSTREAM creates the stream if absent.
|
|
try {
|
|
await this.redis.xgroup('CREATE', this.streamKey, 'ingest', '$', 'MKSTREAM');
|
|
} catch (err: any) {
|
|
if (!err.message?.includes('BUSYGROUP')) throw err;
|
|
}
|
|
|
|
while (!this.stopping) {
|
|
const messages = await this.redis.xreadgroup(
|
|
'GROUP', 'ingest', this.instanceId,
|
|
'COUNT', 16, 'BLOCK', 1000,
|
|
'STREAMS', this.streamKey, '>',
|
|
);
|
|
if (!messages) continue;
|
|
for (const [, entries] of messages) {
|
|
for (const [id, fields] of entries) {
|
|
await this.handleCommand(id, fieldsToObject(fields));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
`fieldsToObject` converts Redis's flat `[key, value, key, value, ...]` array to a plain object.
|
|
|
|
### Command field shape
|
|
|
|
Per the Phase 2 design, Directus's Flow publishes:
|
|
|
|
```
|
|
XADD commands:outbound:{instance_id} *
|
|
command_id <uuid>
|
|
target_imei <string>
|
|
codec 12 | 14
|
|
payload <ASCII command text>
|
|
expires_at <unix-seconds>
|
|
```
|
|
|
|
### Dispatch
|
|
|
|
```ts
|
|
async handleCommand(streamId: string, cmd: CommandMessage): Promise<void> {
|
|
const queue = this.sessions.get(cmd.target_imei);
|
|
if (!queue) {
|
|
await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: 'socket_closed' });
|
|
await this.redis.xack(this.streamKey, 'ingest', streamId);
|
|
return;
|
|
}
|
|
if (Date.now() / 1000 > cmd.expires_at) {
|
|
await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: 'expired_before_delivery' });
|
|
await this.redis.xack(this.streamKey, 'ingest', streamId);
|
|
return;
|
|
}
|
|
try {
|
|
const frame = encodeCommand(cmd.codec, cmd.command_id, cmd.payload);
|
|
const responseBuf = await queue.writeCommand(cmd.command_id, frame, /* timeoutMs */ 30_000);
|
|
const parsed = parseCommandResponse(cmd.codec, responseBuf);
|
|
await this.publishResponse({
|
|
commandId: cmd.command_id,
|
|
status: parsed.kind === 'ack' ? 'responded' : 'failed',
|
|
response: parsed.text,
|
|
failureReason: parsed.kind === 'nack' ? 'imei_mismatch' : undefined,
|
|
});
|
|
} catch (err) {
|
|
await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: errToReason(err) });
|
|
} finally {
|
|
await this.redis.xack(this.streamKey, 'ingest', streamId);
|
|
}
|
|
}
|
|
```
|
|
|
|
`encodeCommand` and `parseCommandResponse` come from tasks 2.5 (Codec 12) and 2.6 (Codec 14).
|
|
|
|
### `commands:responses` shape
|
|
|
|
```
|
|
XADD commands:responses *
|
|
command_id <uuid>
|
|
status delivered | responded | failed
|
|
response <ASCII response text, optional>
|
|
failure_reason socket_closed | expired_before_delivery | imei_mismatch | timeout | write_queue_full | ...
|
|
responded_at <ms>
|
|
```
|
|
|
|
### Lifecycle hooks
|
|
|
|
In the Teltonika session:
|
|
|
|
- After successful handshake: `commandConsumer.attach(imei, writeQueue)`.
|
|
- On socket close: `commandConsumer.detach(imei)`.
|
|
- The consumer must reject any in-flight command for a detached IMEI with `socket_closed`.
|
|
|
|
### Concurrency
|
|
|
|
The consumer reads up to 16 messages per `XREADGROUP` call. Process them sequentially per call (`for await`). Multiple commands targeting different IMEIs can complete in parallel naturally because each goes to a different `SocketWriteQueue`. Within a single IMEI, the queue serializes them.
|
|
|
|
## Acceptance criteria
|
|
|
|
- [ ] Publishing a command via `XADD commands:outbound:{instance_id}` causes the consumer to call `writeCommand` on the right session.
|
|
- [ ] If the IMEI is not held by this instance, the consumer publishes `failed` with `socket_closed` to `commands:responses` and ACKs the stream entry.
|
|
- [ ] If `expires_at` has passed, the consumer publishes `failed` with `expired_before_delivery` and ACKs.
|
|
- [ ] On `stop()`, the consumer drains the in-flight message and exits the read loop cleanly.
|
|
- [ ] `XACK` happens only after the response is published (or terminal failure recorded), so a crash mid-handler causes the command to be redelivered.
|
|
|
|
## Risks / open questions
|
|
|
|
- Crash mid-handler: the command was sent on the wire but we crashed before `XACK`. After restart, the consumer will redeliver; the new instance won't have the device, so it publishes `socket_closed`. The result: command was delivered to the device but Directus thinks it failed. Operator re-issues. Acceptable v1; flagged in [[phase-2-commands]] as a sweeper concern. Idempotent device commands mitigate.
|
|
- Duplicate delivery via `XPENDING`: not handling Pending Entries List explicitly in v1. If a consumer crashes, its claims time out and another consumer in the group can claim — but we're using `instance_id` as the consumer name, so cross-instance claiming would deliver commands to the wrong device. **Decision:** each instance is the only consumer in its own consumer group (group name = `ingest`, consumer name = `instance_id`, but stream is per-instance so no cross-claiming risk). Verify this matches the Directus-side publishing logic.
|
|
|
|
## Done
|
|
|
|
(Fill in once complete.)
|