# Task 2.3 — Per-socket write queue & outstanding-command tracker

**Phase:** 2 — Outbound commands
**Status:** ⬜ Not started
**Depends on:** Phase 1 complete (specifically the session loop in 1.4)
**Wiki refs:** `docs/wiki/concepts/phase-2-commands.md` § 9.6, § 9.8

## Goal

Provide a per-socket serialization layer so that:

1. Outbound command frames do not interleave with codec ACK writes (which would corrupt the byte stream).
2. Only **one command is outstanding per socket at a time** (Teltonika's command codecs assume serial dispatch; there is no correlation ID in the protocol).

## Deliverables

- `src/core/write-queue.ts`:
  - `SocketWriteQueue` class wrapping a `net.Socket` with an internal queue.
  - Methods: `writeAck(buf: Buffer): Promise<void>`, `writeCommand(commandId: string, buf: Buffer, timeoutMs?: number): Promise<Buffer>`.
  - Per-socket state: `outstandingCommand: PendingCommand | null` with `commandId`, `timeout`, `resolve`, `reject` functions.
  - `awaitResponse(commandId, timeoutMs): Promise<Buffer>`: registers the in-flight command and waits for a response delivered via a separate `notifyResponse(buf)` method.
- Update the session struct in `src/adapters/teltonika/index.ts` to hold a `SocketWriteQueue` per session.
- Update Phase 1's framing layer (task 1.4 deliverable) to write ACKs through `queue.writeAck` instead of directly to the socket.

## Specification

### Why ACKs go through the queue too

Phase 1 wrote ACKs directly to the socket. Phase 2 must serialize ACKs with command writes, otherwise:

```
Time T+0: codec parser writes ACK = [00 00 00 01]
Time T+0: command consumer writes Codec 12 frame
```

Without serialization, the bytes of the two writes can interleave at the socket level, producing garbage on the wire. The fix is mandatory: every socket write goes through one queue.

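
The one-queue rule can be illustrated in isolation. The following is a minimal sketch, not the task's implementation: `RecordingSink` and `SerialWriter` are hypothetical names, and the sink is assumed to emit each frame in several asynchronous chunks (real `net.Socket` behaviour is not modelled). It shows that chaining every write on a single promise keeps each frame contiguous.

```typescript
// Hypothetical in-memory sink standing in for the socket (assumption, not the task's API).
class RecordingSink {
  readonly wire: string[] = [];

  // Emits a frame one chunk at a time, yielding to the event loop between chunks.
  async writeFrame(label: string, chunks: number): Promise<void> {
    for (let i = 0; i < chunks; i++) {
      await new Promise<void>((resolve) => setTimeout(resolve, 0));
      this.wire.push(`${label}${i}`);
    }
  }
}

// Illustrative serializer: each write starts only after the previous one has settled.
class SerialWriter {
  private chain: Promise<void> = Promise.resolve();
  private readonly sink: RecordingSink;

  constructor(sink: RecordingSink) {
    this.sink = sink;
  }

  write(label: string, chunks: number): Promise<void> {
    const result = this.chain.then(() => this.sink.writeFrame(label, chunks));
    // Keep the chain usable even if this write fails; the caller still sees the error.
    this.chain = result.catch(() => {});
    return result;
  }
}

async function demo(): Promise<{ unserialized: string[]; serialized: string[] }> {
  // Two writers hitting the sink directly: their chunks can mix.
  const raw = new RecordingSink();
  await Promise.all([raw.writeFrame("ack", 2), raw.writeFrame("cmd", 2)]);

  // The same two writes routed through one chain: frames stay contiguous.
  const queued = new RecordingSink();
  const writer = new SerialWriter(queued);
  await Promise.all([writer.write("ack", 2), writer.write("cmd", 2)]);

  return { unserialized: raw.wire, serialized: queued.wire };
}
```

In the serialized run the recorded order is `ack0, ack1, cmd0, cmd1`; in the unserialized run the chunks of the two frames are mixed.
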
### Queue semantics

```ts
import * as net from "node:net";

class SocketWriteQueue {
  // Serializes every socket write (ACKs and command frames) in submission order.
  private writeChain: Promise<void> = Promise.resolve();
  // Serializes commands: the next one starts only after the previous one settles.
  private commandChain: Promise<void> = Promise.resolve();
  private outstanding: PendingCommand | null = null;

  constructor(private socket: net.Socket) {}

  writeAck(buf: Buffer): Promise<void> {
    return this.enqueueWrite(buf);
  }

  writeCommand(commandId: string, buf: Buffer, timeoutMs = 30_000): Promise<Buffer> {
    // Chaining (rather than awaiting `outstanding`) keeps concurrent callers
    // from all proceeding at once when the previous command settles.
    const result = this.commandChain.then(() => {
      const pending: PendingCommand = makePending(commandId, timeoutMs);
      this.outstanding = pending;
      // A failed write rejects the command right away instead of waiting for the timeout.
      this.enqueueWrite(buf).catch((err) => pending.rejectWith(err));
      // Resolves when notifyResponse is called; rejects on timeout or write failure.
      return pending.promise.finally(() => {
        if (this.outstanding === pending) this.outstanding = null;
      });
    });
    // A failed or timed-out command must not block the commands queued behind it.
    this.commandChain = result.then(() => {}, () => {});
    return result;
  }

  notifyResponse(buf: Buffer): void {
    if (!this.outstanding) {
      // Unsolicited response. Log warn and ignore.
      return;
    }
    this.outstanding.resolveWith(buf);
    this.outstanding = null;
  }

  private enqueueWrite(buf: Buffer): Promise<void> {
    const result = this.writeChain.then(() => this.writeRaw(buf));
    // Keep the chain usable after a failed write; the caller still sees the error via `result`.
    this.writeChain = result.catch(() => {});
    return result;
  }

  private writeRaw(buf: Buffer): Promise<void> {
    return new Promise((resolve, reject) => {
      this.socket.write(buf, (err) => (err ? reject(err) : resolve()));
    });
  }
}
```

`PendingCommand` exposes a promise that resolves when `resolveWith` is called and rejects when its `setTimeout` fires or when `rejectWith` is called (for example, after a failed write).

### Backpressure on queued commands

Many commands queued for one device could grow the queue without bound. Cap the per-socket queue depth:

- Soft: log a warning at 5 queued commands.
- Hard: reject `writeCommand` with `WriteQueueFullError` at 20 queued commands. The command consumer publishes a failure to `commands:responses`.

### Timeout default

30 seconds per command. Override via `commandTimeoutMs` in the `commands` row. This is distinct from `expires_at` in the Phase 2 design: `expires_at` is a clock-time deadline at the Directus level, whereas the per-write timeout is the protocol-level "device didn't respond" deadline.
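
The `makePending` helper used by the queue is not defined in this task. The following is a minimal sketch of one possible shape, under stated assumptions: the constructor of `CommandTimeoutError` is invented here, `rejectWith` is a hypothetical addition for write failures and socket close, and the payload is typed as `Uint8Array` (which `Buffer` extends) to keep the block free of Node-specific types. The real helper would likely use `Buffer`.

```typescript
// Assumed error shape; the task names this error but does not define it.
class CommandTimeoutError extends Error {
  readonly commandId: string;

  constructor(commandId: string, timeoutMs: number) {
    super(`command ${commandId} timed out after ${timeoutMs} ms`);
    this.name = "CommandTimeoutError";
    this.commandId = commandId;
  }
}

interface PendingCommand {
  readonly commandId: string;
  readonly promise: Promise<Uint8Array>;
  resolveWith(buf: Uint8Array): void;
  rejectWith(err: Error): void; // hypothetical: used for write failure or socket close
}

function makePending(commandId: string, timeoutMs = 30_000): PendingCommand {
  let resolve!: (buf: Uint8Array) => void;
  let reject!: (err: Error) => void;
  const promise = new Promise<Uint8Array>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  // Mark rejections as handled so a timeout nobody is awaiting does not
  // surface as an unhandled rejection; awaiting callers still see the error.
  promise.catch(() => {});

  // Protocol-level "device didn't respond" deadline.
  const timer = setTimeout(() => {
    reject(new CommandTimeoutError(commandId, timeoutMs));
  }, timeoutMs);

  return {
    commandId,
    promise,
    resolveWith(buf: Uint8Array): void {
      clearTimeout(timer); // settle once and stop the deadline
      resolve(buf);
    },
    rejectWith(err: Error): void {
      clearTimeout(timer);
      reject(err);
    },
  };
}
```

Because a promise settles only once, calling `resolveWith` after the timeout has already fired is a no-op, so a late response cannot revive a timed-out command.
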
When the timeout fires, the queue rejects the outstanding promise with `CommandTimeoutError`. The next queued command then becomes the outstanding one and proceeds.

## Acceptance criteria

- [ ] Two concurrent calls, `writeAck(buf1)` and `writeCommand(id, buf2)`, produce bytes on the wire in submission order with no interleaving (verified with a TCP-level recording test).
- [ ] `writeCommand` blocks subsequent `writeCommand` calls until the first resolves or times out.
- [ ] `notifyResponse` resolves the outstanding command's promise.
- [ ] Timeout firing rejects the outstanding promise; the next queued command starts.
- [ ] Queue depth is exposed as `teltonika_command_queue_depth_total` (a gauge summed across sockets). A per-IMEI label such as `teltonika_command_queue_depth{imei=...}` is forbidden by task 1.10's cardinality rule; log per-IMEI depth in warnings instead.
- [ ] On socket close, all pending command promises reject with `SocketClosedError`.

## Risks / open questions

- The "outstanding command" model assumes the device responds to commands in order, which Teltonika's protocol does (one outstanding per socket). If we discover devices that don't, we would need correlation IDs, but the protocol doesn't carry them, so responses could not be matched to commands. The fallback would be a queue depth limit of 0 (never queue; fail fast).
- ACK write order vs response delivery: when a device sends an AVL frame while a command write is in progress, the AVL frame's ACK queues behind the command bytes. Worst case: the device receives the ACK for the AVL frame slightly later. Acceptable.

## Done

(Fill in once complete.)