# Task 2.3: Per-socket write queue & outstanding-command tracker

**Phase:** 2 (Outbound commands)
**Status:** ⬜ Not started
**Depends on:** Phase 1 complete (specifically the session loop in 1.4)
**Wiki refs:** `docs/wiki/concepts/phase-2-commands.md` § 9.6, § 9.8

## Goal

Provide a per-socket serialization layer so that:

1. Outbound command frames do not interleave with codec ACK writes (which would corrupt the byte stream).
2. Only **one command is outstanding per socket at a time**. Teltonika's command codecs assume serial dispatch because the protocol carries no correlation ID.

## Deliverables

- `src/core/write-queue.ts`:
  - `SocketWriteQueue` class wrapping a `net.Socket` with an internal queue.
  - Methods: `writeAck(buf: Buffer): Promise<void>`, `writeCommand(commandId: string, buf: Buffer, timeoutMs?: number): Promise<Buffer>`, and `notifyResponse(buf: Buffer): void`.
  - Per-socket state: `outstandingCommand: PendingCommand | null`, where `PendingCommand` carries the `commandId`, the `timeout` handle, and the `resolve` and `reject` functions.
  - Response tracking: `writeCommand` registers the in-flight command and returns a `Promise<Buffer>` that settles when the response is delivered via the separate `notifyResponse(buf)` method, or rejects once `timeoutMs` elapses.
- Update `src/adapters/teltonika/index.ts` session struct to hold a `SocketWriteQueue` per session.
- Update Phase 1's framing layer (task 1.4 deliverable) to write ACKs through `queue.writeAck` instead of directly to the socket.

## Specification

### Why ACKs go through the queue too

Phase 1 wrote ACKs directly to the socket. Phase 2 must serialize ACKs with command writes, otherwise:

```
Time T+0: codec parser writes ACK = [00 00 00 01]
Time T+0: command consumer writes Codec 12 frame
```

Without serialization, the bytes interleave at the socket level, producing garbage on the wire. The fix is mandatory: every socket write goes through one queue.

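The single-queue rule can be illustrated with a promise chain in which each write starts only after the previous one has finished. This is a minimal sketch, not the task's implementation: `SerializedWriter`, `RawWrite`, and `buildAck` are hypothetical names, `RawWrite` stands in for `socket.write(buf, cb)`, and the ACK is assumed to be a 4-byte big-endian record count, as the `[00 00 00 01]` example suggests.

```ts
// Stand-in for socket.write(buf, cb); hypothetical, for illustration only.
type RawWrite = (chunk: Buffer, done: (err?: Error | null) => void) => void;

class SerializedWriter {
  private chain: Promise<void> = Promise.resolve();

  constructor(private rawWrite: RawWrite) {}

  // Appends one write to the chain; it starts only after every earlier write has finished.
  write(chunk: Buffer): Promise<void> {
    const result = this.chain.then(
      () =>
        new Promise<void>((resolve, reject) => {
          this.rawWrite(chunk, (err) => (err ? reject(err) : resolve()));
        }),
    );
    // Swallow the error on the shared chain only; the caller still sees it via `result`.
    this.chain = result.catch(() => {});
    return result;
  }
}

// Assumption: the ACK is the accepted-record count as a 4-byte big-endian integer.
function buildAck(acceptedRecords: number): Buffer {
  const ack = Buffer.alloc(4);
  ack.writeUInt32BE(acceptedRecords, 0);
  return ack;
}
```

Because both the ACK and the command frame pass through the same `write` method, the second write cannot begin until the first write's callback has fired, regardless of which component submitted it.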
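### Queue semantics

```ts
import * as net from "node:net";

// PendingCommand and makePending() are defined elsewhere; see the note after this block.
class SocketWriteQueue {
  // Every socket write (ACK or command) is appended here, so writes never overlap.
  private chain: Promise<void> = Promise.resolve();
  // Settles when the most recently submitted command has resolved, timed out, or failed.
  private commandChain: Promise<void> = Promise.resolve();
  private queued = 0; // commands waiting or in flight
  private outstanding: PendingCommand | null = null;

  constructor(private socket: net.Socket) {}

  writeAck(buf: Buffer): Promise<void> {
    return this.enqueueWrite(buf);
  }

  writeCommand(commandId: string, buf: Buffer, timeoutMs = 30_000): Promise<Buffer> {
    const start = () => this.sendCommand(commandId, buf, timeoutMs);
    // Idle socket: enqueue the bytes right away so submission order holds against writeAck.
    // Otherwise start only after every earlier command has settled.
    const result = this.queued === 0 ? start() : this.commandChain.then(start);
    this.queued++;
    const done = () => { this.queued--; };
    this.commandChain = result.then(done, done);
    return result;
  }

  notifyResponse(buf: Buffer): void {
    if (!this.outstanding) {
      // Unsolicited response. Log warn and ignore.
      return;
    }
    this.outstanding.resolveWith(buf);
    this.outstanding = null;
  }

  private async sendCommand(commandId: string, buf: Buffer, timeoutMs: number): Promise<Buffer> {
    const pending: PendingCommand = makePending(commandId, timeoutMs);
    this.outstanding = pending;
    try {
      await this.enqueueWrite(buf); // bytes have been handed to the socket
      return await pending.promise; // resolves on notifyResponse, rejects on timeout
    } finally {
      pending.cancel(); // stop the timer if the write itself failed
      if (this.outstanding === pending) this.outstanding = null;
    }
  }

  private enqueueWrite(buf: Buffer): Promise<void> {
    const write = this.chain.then(() => this.writeRaw(buf));
    this.chain = write.catch(() => {}); // a failed write must not poison later writes
    return write;
  }

  private writeRaw(buf: Buffer): Promise<void> {
    return new Promise((resolve, reject) => {
      this.socket.write(buf, (err) => (err ? reject(err) : resolve()));
    });
  }
}
```

`PendingCommand` exposes a promise that resolves when `resolveWith` is called and rejects with `CommandTimeoutError` when its `setTimeout` fires, plus a `cancel` method that clears the timer. Because the timer starts before the write completes, `makePending` should attach a no-op `catch` to the promise so that an early timeout is not reported as an unhandled rejection.

Commands are chained through `commandChain` rather than by awaiting `outstanding` directly: if several callers awaited the same outstanding promise, they would all resume together and overwrite each other's `outstanding` slot.
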
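The `PendingCommand` helper described above could be built as a small deferred with a timer. This is a sketch under assumptions: the `CommandTimeoutError` constructor arguments, the `cancel` method, and the pre-attached no-op `catch` are illustrative choices, not part of the original design.

```ts
class CommandTimeoutError extends Error {
  constructor(public readonly commandId: string, timeoutMs: number) {
    super(`command ${commandId} got no response within ${timeoutMs} ms`);
    this.name = "CommandTimeoutError";
  }
}

interface PendingCommand {
  commandId: string;
  promise: Promise<Buffer>;
  resolveWith(buf: Buffer): void;
  cancel(): void; // assumption: lets the queue stop the timer when the write itself fails
}

function makePending(commandId: string, timeoutMs: number): PendingCommand {
  let resolve!: (buf: Buffer) => void;
  let reject!: (err: Error) => void;
  const promise = new Promise<Buffer>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  // The caller attaches its own handler later; this no-op keeps an early timeout
  // from being reported as an unhandled rejection in the meantime.
  promise.catch(() => {});
  const timer = setTimeout(
    () => reject(new CommandTimeoutError(commandId, timeoutMs)),
    timeoutMs,
  );
  const cancel = () => clearTimeout(timer);
  return {
    commandId,
    promise,
    cancel,
    // Clearing the timer first means a delivered response can never be followed by a timeout.
    resolveWith: (buf) => {
      cancel();
      resolve(buf);
    },
  };
}
```

Calling `resolveWith` after the timer has already fired is harmless, since a settled promise ignores later `resolve` calls.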
### Backpressure on queued commands

A device with many queued commands could grow the queue unboundedly. Cap per-socket queue depth:

- Soft: log a warning at 5 queued commands.
- Hard: reject `writeCommand` with `WriteQueueFullError` at 20 queued commands. The command consumer publishes a failure to `commands:responses`.

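The two limits above could be enforced by a small counter wrapped around each command. The sketch below is illustrative only: `CommandDepthGuard` is a hypothetical name, and it assumes that "queued" counts in-flight commands as well as waiting ones and that the warning repeats for every command admitted at or above the soft limit.

```ts
class WriteQueueFullError extends Error {
  constructor(depth: number) {
    super(`command queue full (${depth} queued)`);
    this.name = "WriteQueueFullError";
  }
}

const SOFT_LIMIT = 5; // log a warning from this depth
const HARD_LIMIT = 20; // reject new commands at this depth

class CommandDepthGuard {
  private depth = 0;

  constructor(private warn: (msg: string) => void = console.warn) {}

  // Runs `send` if there is room; otherwise rejects without queueing anything.
  async run<T>(send: () => Promise<T>): Promise<T> {
    if (this.depth >= HARD_LIMIT) throw new WriteQueueFullError(this.depth);
    this.depth++;
    if (this.depth >= SOFT_LIMIT) this.warn(`command queue depth is ${this.depth}`);
    try {
      return await send();
    } finally {
      this.depth--; // released whether the command resolved, timed out, or failed
    }
  }

  get queued(): number {
    return this.depth;
  }
}
```

A caller would wrap each command as `guard.run(() => queue.writeCommand(id, buf))` and translate a `WriteQueueFullError` into the failure published to `commands:responses`.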
### Timeout default

30 seconds per command, overridable via `commandTimeoutMs` in the `commands` row. This is separate from `expires_at` in the Phase 2 design: `expires_at` is a clock-time deadline at the Directus level, while the per-write timeout is the protocol-level "device didn't respond" deadline.

When the timeout fires, the queue rejects the outstanding promise with `CommandTimeoutError`. The next queued command becomes the outstanding one and proceeds.

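Picking the effective timeout for one command might look like the following. The `CommandRow` shape is hypothetical and models only the `commandTimeoutMs` field named above; falling back to the default for invalid overrides is an assumption, not a stated requirement.

```ts
const DEFAULT_COMMAND_TIMEOUT_MS = 30_000;

// Hypothetical shape: only the field named in the text is modeled here.
interface CommandRow {
  commandTimeoutMs?: number | null;
}

function resolveTimeoutMs(row: CommandRow): number {
  const override = row.commandTimeoutMs;
  // Ignore missing, non-finite, or non-positive overrides rather than arming a zero-length timer.
  return typeof override === "number" && Number.isFinite(override) && override > 0
    ? override
    : DEFAULT_COMMAND_TIMEOUT_MS;
}
```

The result would be passed as the `timeoutMs` argument of `writeCommand`.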
## Acceptance criteria

- [ ] Two concurrent calls to `writeAck(buf1)` and `writeCommand(id, buf2)` produce bytes on the wire in submission order, no interleaving (verified with a TCP-level recording test).
- [ ] `writeCommand` blocks subsequent `writeCommand` calls until the first resolves or times out.
- [ ] `notifyResponse` correctly resolves the outstanding command's promise.
- [ ] Timeout firing rejects the outstanding promise; the next queued command starts.
- [ ] Queue depth is exported as `teltonika_command_queue_depth_total` (a gauge summed across sockets). Per-IMEI labels are forbidden by task 1.10's cardinality rule, so per-IMEI depth is logged in the warnings instead.
- [ ] On socket close, all pending command promises reject with `SocketClosedError`.

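The socket-close criterion could be met by tracking every pending rejecter and firing them all on `close`. This is a sketch under assumptions: `PendingRegistry` and `Closable` are hypothetical names, and `Closable` models only the `once("close", ...)` subscription that a `net.Socket` is expected to offer.

```ts
class SocketClosedError extends Error {
  constructor() {
    super("socket closed before the device responded");
    this.name = "SocketClosedError";
  }
}

// Minimal view of the socket: only the 'close' subscription is needed here.
interface Closable {
  once(event: "close", listener: () => void): unknown;
}

class PendingRegistry {
  private rejecters = new Set<(err: Error) => void>();

  constructor(socket: Closable) {
    socket.once("close", () => this.rejectAll(new SocketClosedError()));
  }

  // Returns a promise that settles when `resolve` is called or when the socket closes.
  track<T>(): { promise: Promise<T>; resolve: (value: T) => void } {
    let resolve!: (value: T) => void;
    let reject!: (err: Error) => void;
    const promise = new Promise<T>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    this.rejecters.add(reject);
    // Drop the rejecter once the promise settles so the set holds only live commands.
    const forget = () => { this.rejecters.delete(reject); };
    promise.then(forget, forget);
    return { promise, resolve };
  }

  private rejectAll(err: Error): void {
    this.rejecters.forEach((reject) => reject(err));
    this.rejecters.clear();
  }
}
```

Commands that have already resolved are unaffected, because rejecting a settled promise is a no-op.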
## Risks / open questions

- The "outstanding command" model assumes the device responds to commands in the order they were sent, which holds for Teltonika's protocol (one outstanding per socket). If we discover devices that don't, the usual fix would be correlation IDs, but the protocol doesn't carry them. The only fallback would be to stop queueing altogether: reject a new command while another is outstanding (fail fast).
- ACK write order vs. response delivery: when a device sends an AVL frame and we're mid-command, the AVL frame's ACK queues behind the command bytes. Worst case, the device receives the ACK for the AVL frame slightly later. Acceptable.

## Done

(Fill in once complete.)