# Task 2.4 — Command consumer (Redis stream reader)

**Phase:** 2 — Outbound commands
**Status:** ⬜ Not started
**Depends on:** 2.1, 2.3
**Wiki refs:** `docs/wiki/concepts/phase-2-commands.md` § 9.6

## Goal

Each Ingestion instance runs a worker that consumes commands from `commands:outbound:{instance_id}`, looks up the local socket for the target IMEI, and dispatches the command to the appropriate codec encoder and write queue.

## Deliverables

- `src/adapters/teltonika/command-consumer.ts`:
  - `CommandConsumer` class with `start()` and `stop()` methods.
  - Internal: a registry of `imei → SocketWriteQueue` for sessions held by this instance.
  - Methods exposed to the session lifecycle: `attach(imei, queue)`, `detach(imei)`.
  - Reads commands via `XREADGROUP GROUP ingest {instance_id} COUNT 16 BLOCK 1000 STREAMS commands:outbound:{instance_id} >`.
  - Calls the codec-specific encoder/handler based on the command's `codec` field.
  - On terminal outcome (delivered, responded, failed), publishes to `commands:responses`.
- `src/adapters/teltonika/responses.ts`:
  - `publishResponse({ commandId, status, response?, failureReason? })` writes to `commands:responses` via `XADD`.

## Specification

### Stream consumption

```ts
async start(): Promise<void> {
  // Ensure the consumer group exists. MKSTREAM creates the stream if absent.
  try {
    await this.redis.xgroup('CREATE', this.streamKey, 'ingest', '$', 'MKSTREAM');
  } catch (err: any) {
    // BUSYGROUP means the group already exists (e.g. after a restart).
    if (!err.message?.includes('BUSYGROUP')) throw err;
  }

  while (!this.stopping) {
    // Resolves to null when BLOCK times out with no new entries.
    const messages = await this.redis.xreadgroup(
      'GROUP', 'ingest', this.instanceId,
      'COUNT', 16,
      'BLOCK', 1000,
      'STREAMS', this.streamKey, '>',
    );
    if (!messages) continue;

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        await this.handleCommand(id, fieldsToObject(fields));
      }
    }
  }
}
```

`fieldsToObject` converts Redis's flat `[key, value, key, value, ...]` array to a plain object.
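A minimal sketch of what `fieldsToObject` could look like, assuming the Redis client returns each entry's fields as a flat array of strings. The function name comes from the spec; the body and the odd-length check are illustrative assumptions, not the project's actual helper.

```ts
// Hypothetical implementation: converts ['k1', 'v1', 'k2', 'v2'] into { k1: 'v1', k2: 'v2' }.
function fieldsToObject(fields: readonly string[]): Record<string, string> {
  // A well-formed stream entry always has an even number of elements.
  if (fields.length % 2 !== 0) {
    throw new Error(`expected key/value pairs, got ${fields.length} elements`);
  }
  const out: Record<string, string> = {};
  for (let i = 0; i + 1 < fields.length; i += 2) {
    const key = fields[i];
    const value = fields[i + 1];
    // Unreachable given the bounds above; keeps strict index checks satisfied.
    if (key === undefined || value === undefined) continue;
    out[key] = value;
  }
  return out;
}
```

Every value comes back as a string, so numeric fields such as `expires_at` need an explicit `Number(...)` conversion before they are compared.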
### Command field shape

Per the Phase 2 design, Directus's Flow publishes:

```
XADD commands:outbound:{instance_id} *
  command_id
  target_imei
  codec           12 | 14
  payload
  expires_at
```

### Dispatch

```ts
async handleCommand(streamId: string, cmd: CommandMessage): Promise<void> {
  // No local session for this IMEI: fail fast and ACK.
  const queue = this.sessions.get(cmd.target_imei);
  if (!queue) {
    await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: 'socket_closed' });
    await this.redis.xack(this.streamKey, 'ingest', streamId);
    return;
  }

  // Stream field values arrive as strings; expires_at is compared in Unix seconds.
  if (Date.now() / 1000 > Number(cmd.expires_at)) {
    await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: 'expired_before_delivery' });
    await this.redis.xack(this.streamKey, 'ingest', streamId);
    return;
  }

  try {
    const frame = encodeCommand(cmd.codec, cmd.command_id, cmd.payload);
    const responseBuf = await queue.writeCommand(cmd.command_id, frame, /* timeoutMs */ 30_000);
    const parsed = parseCommandResponse(cmd.codec, responseBuf);
    await this.publishResponse({
      commandId: cmd.command_id,
      status: parsed.kind === 'ack' ? 'responded' : 'failed',
      response: parsed.text,
      failureReason: parsed.kind === 'nack' ? 'imei_mismatch' : undefined,
    });
  } catch (err) {
    await this.publishResponse({ commandId: cmd.command_id, status: 'failed', failureReason: errToReason(err) });
  } finally {
    // Runs on every path, including when publishResponse above threw.
    await this.redis.xack(this.streamKey, 'ingest', streamId);
  }
}
```

`encodeCommand` and `parseCommandResponse` come from tasks 2.5 (Codec 12) and 2.6 (Codec 14).

### `commands:responses` shape

```
XADD commands:responses *
  command_id
  status          delivered | responded | failed
  response
  failure_reason  socket_closed | expired_before_delivery | imei_mismatch | timeout | write_queue_full | ...
  responded_at
```

### Lifecycle hooks

In the Teltonika session:

- After successful handshake: `commandConsumer.attach(imei, writeQueue)`.
- On socket close: `commandConsumer.detach(imei)`.
- The consumer must reject any in-flight command for a detached IMEI with `socket_closed`.
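One possible way to satisfy the lifecycle rules above, sketched under assumptions: `SessionRegistry`, `SocketClosedError`, and `track` are hypothetical names that do not appear in the design, and the queue type is left generic because `SocketWriteQueue` is defined in another task. The idea is that each in-flight write is raced against `detach`, so a socket close rejects it immediately with a `socket_closed` reason.

```ts
// Hypothetical error type; a helper like errToReason could read `reason` from it.
class SocketClosedError extends Error {
  readonly reason = 'socket_closed';
  constructor(imei: string) {
    super(`socket closed for IMEI ${imei}`);
    this.name = 'SocketClosedError';
  }
}

// Hypothetical registry of imei -> queue, plus reject handles for in-flight commands.
class SessionRegistry<Q> {
  private readonly sessions = new Map<string, Q>();
  private readonly inFlight = new Map<string, Set<(err: Error) => void>>();

  attach(imei: string, queue: Q): void {
    this.sessions.set(imei, queue);
  }

  detach(imei: string): void {
    this.sessions.delete(imei);
    const pending = this.inFlight.get(imei);
    this.inFlight.delete(imei);
    // Reject every command still waiting on this IMEI.
    pending?.forEach((reject) => reject(new SocketClosedError(imei)));
  }

  get(imei: string): Q | undefined {
    return this.sessions.get(imei);
  }

  // Settles with `work`, or rejects with SocketClosedError if detach(imei) happens first.
  track<T>(imei: string, work: Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const pending = this.inFlight.get(imei) ?? new Set<(err: Error) => void>();
      this.inFlight.set(imei, pending);
      pending.add(reject);
      const cleanup = (): void => {
        pending.delete(reject);
        // Drop the empty set unless detach already replaced or removed it.
        if (pending.size === 0 && this.inFlight.get(imei) === pending) {
          this.inFlight.delete(imei);
        }
      };
      work.then(
        (value) => { cleanup(); resolve(value); },
        (err) => { cleanup(); reject(err); },
      );
    });
  }
}
```

In `handleCommand`, the write could then be wrapped as `await registry.track(cmd.target_imei, queue.writeCommand(...))`, so a detach during the 30 s wait surfaces in the existing `catch` branch.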
### Concurrency

The consumer reads up to 16 messages per `XREADGROUP` call and processes them sequentially within each call (a `for...of` loop that awaits `handleCommand`). Commands targeting different IMEIs go to different `SocketWriteQueue`s, so they do not contend with one another; within a single IMEI, the queue serializes them. Note that because `handleCommand` awaits `writeCommand` (up to the 30 s timeout), a strictly sequential loop finishes one command before starting the next; commands for different IMEIs only overlap if their handlers are started without awaiting each one in turn.

## Acceptance criteria

- [ ] Publishing a command via `XADD commands:outbound:{instance_id}` causes the consumer to call `writeCommand` on the right session.
- [ ] If the IMEI is not held by this instance, the consumer publishes `failed` with `socket_closed` to `commands:responses` and ACKs the stream entry.
- [ ] If `expires_at` has passed, the consumer publishes `failed` with `expired_before_delivery` and ACKs.
- [ ] On `stop()`, the consumer drains the in-flight message and exits the read loop cleanly.
- [ ] `XACK` happens only after the response is published (or terminal failure recorded), so a crash mid-handler causes the command to be redelivered.

## Risks / open questions

- Crash mid-handler: the command was sent on the wire but the instance crashed before `XACK`. The entry stays in the consumer's Pending Entries List. Reading with `>` returns only never-delivered entries, so redelivery after restart requires the consumer to read its own pending entries (ID `0`) first. Once the entry is redelivered, the restarted instance won't have the device, so it publishes `socket_closed`. The result: the command was delivered to the device but Directus thinks it failed, and the operator re-issues it. Acceptable for v1; flagged in [[phase-2-commands]] as a sweeper concern. Idempotent device commands mitigate.
- Duplicate delivery via `XPENDING`: the Pending Entries List is not handled explicitly in v1. If a consumer crashes, its pending entries can be claimed by another consumer in the group (via `XCLAIM` / `XAUTOCLAIM`) once they have been idle long enough. Because `instance_id` is the consumer name, cross-instance claiming would deliver commands to an instance that does not hold the device. **Decision:** each instance is the only consumer in its own consumer group (group name = `ingest`, consumer name = `instance_id`); the stream is per-instance, so there is no cross-claiming risk. Verify this matches the Directus-side publishing logic.

## Done

(Fill in once complete.)