# Task 2.1 — Connection registry & heartbeat

**Phase:** 2 — Outbound commands

**Status:** ⬜ Not started

**Depends on:** Phase 1 complete

**Wiki refs:** `docs/wiki/concepts/phase-2-commands.md` § 9.3

## Goal

Maintain a Redis-backed registry mapping device IMEI → Ingestion instance ID, so Directus can route outbound commands to the instance currently holding the device's TCP socket.

## Deliverables

- `src/core/connection-registry.ts`:
  - `ConnectionRegistry` class with methods `register(imei)`, `unregister(imei)`, `unregisterAll()`, `heartbeat()`.
  - Internal state: `Set<string>` of held IMEIs for graceful-shutdown bulk cleanup.
- Hook into the Teltonika session lifecycle (in `src/adapters/teltonika/index.ts`):
  - After IMEI handshake succeeds: `registry.register(imei)`.
  - On socket close (any cause): `registry.unregister(imei)`.
- Heartbeat ticker started in `src/main.ts`, runs every 30 seconds.
- Graceful shutdown calls `registry.unregisterAll()` (task 1.12 hook updated to include this).

## Specification

### Redis layout

- **Hash** `connections:registry`: field = `imei`, value = `instance_id`. Single hash, all instances share it. Redis hashes have no per-field TTL (field expiry via `HEXPIRE` only arrived in Redis 7.4), which is why the heartbeat key exists.
- **Key** `instance:heartbeat:{instance_id}`: written every 30s with `EX 90`. Its existence shows the instance has refreshed it within the last 90 seconds, which is taken as proof that the instance is alive.
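
To show how the two keys combine, here is a minimal sketch of a read-side lookup, the kind of check a command router or the task 2.2 janitor would make: a registry entry is only trusted while its instance's heartbeat key still exists. `RouteStore`, `resolveRoute` and `MemoryStore` are hypothetical names for illustration, and the store is synchronous for brevity; against real Redis the two reads would be `HGET` and `EXISTS`.

```typescript
// Hypothetical minimal view of the two Redis reads used here (HGET and EXISTS).
interface RouteStore {
  hget(key: string, field: string): string | null;
  exists(key: string): boolean;
}

const REGISTRY_KEY = 'connections:registry';
const heartbeatKey = (instanceId: string) => `instance:heartbeat:${instanceId}`;

// Returns the instance that should receive a command for `imei`, or null if the
// device has no entry or the owning instance's heartbeat key has expired.
function resolveRoute(store: RouteStore, imei: string): string | null {
  const instanceId = store.hget(REGISTRY_KEY, imei);
  if (instanceId === null) return null;
  return store.exists(heartbeatKey(instanceId)) ? instanceId : null;
}

// In-memory stand-in for illustration: hashes as nested maps, plain keys as a set.
class MemoryStore implements RouteStore {
  hashes = new Map<string, Map<string, string>>();
  keys = new Set<string>();

  hget(key: string, field: string): string | null {
    return this.hashes.get(key)?.get(field) ?? null;
  }

  exists(key: string): boolean {
    return this.keys.has(key);
  }
}

// Usage: an entry is routable only while its instance's heartbeat key exists.
const store = new MemoryStore();
store.hashes.set(REGISTRY_KEY, new Map([['123456789012345', 'ingest-a']]));
store.keys.add(heartbeatKey('ingest-a'));
console.log(resolveRoute(store, '123456789012345')); // → ingest-a
store.keys.delete(heartbeatKey('ingest-a')); // simulate the heartbeat key expiring
console.log(resolveRoute(store, '123456789012345')); // → null
```

Checking the heartbeat on every lookup keeps a dead instance's leftover entries from attracting commands even before the janitor has cleaned them up.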
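
### Operations

```ts
// Assumes ioredis: `set(key, value, 'EX', seconds)`, `pipeline()` and `eval()` are its API.
import type { Redis } from 'ioredis';

const REGISTRY_KEY = 'connections:registry';

// Atomic compare-and-delete: remove the field only if it still holds our instance ID.
// KEYS[1] = registry hash, ARGV[1] = imei, ARGV[2] = instance ID.
const DELETE_IF_OWNER = `
if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then
  return redis.call('HDEL', KEYS[1], ARGV[1])
end
return 0
`;

export class ConnectionRegistry {
  // IMEIs whose sessions this instance currently holds.
  private held = new Set<string>();

  constructor(private redis: Redis, private instanceId: string) {}

  async register(imei: string): Promise<void> {
    await this.redis.hset(REGISTRY_KEY, imei, this.instanceId);
    this.held.add(imei);
  }

  async unregister(imei: string): Promise<void> {
    // Only delete if the entry still points at us.
    // (Race: a device reconnected to a different instance between
    // our session ending and this delete.)
    // Same check as `current === this.instanceId`, but run inside Redis: a separate
    // HGET + HDEL would let another instance's HSET land between the two calls.
    await this.redis.eval(DELETE_IF_OWNER, 1, REGISTRY_KEY, imei, this.instanceId);
    this.held.delete(imei);
  }

  async unregisterAll(): Promise<void> {
    if (this.held.size === 0) return;
    const pipeline = this.redis.pipeline();
    for (const imei of this.held) {
      // Same ownership check as unregister(): never delete an entry another instance now owns.
      pipeline.eval(DELETE_IF_OWNER, 1, REGISTRY_KEY, imei, this.instanceId);
    }
    await pipeline.exec();
    this.held.clear();
  }

  async heartbeat(): Promise<void> {
    // Refresh the liveness key; it disappears 90 seconds after the last refresh.
    await this.redis.set(
      `instance:heartbeat:${this.instanceId}`,
      Date.now().toString(),
      'EX',
      90,
    );
  }
}
```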
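
### Heartbeat ticker

In `main.ts`:

```ts
// Initial beat so the instance is marked alive before the first tick.
registry.heartbeat().catch((err) => logger.warn({ err }, 'heartbeat failed'));

const heartbeatInterval = setInterval(() => {
  // A failed write is logged and retried on the next tick; it must not crash the process.
  registry.heartbeat().catch((err) => logger.warn({ err }, 'heartbeat failed'));
}, 30_000);

// Ensure the ticker is cleared on shutdown, in the task 1.12 hook:
// clearInterval(heartbeatInterval);
```

Run an initial `heartbeat()` immediately at startup so the instance is "alive" before the first 30s tick.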

### Race conditions to handle

- **Same IMEI on two instances at once.** Possible when a device reconnects faster than the old instance detects the close. The new instance's `register` overwrites the old entry. The old instance's `unregister` deletes the entry only if it still holds the old instance's ID, so the new instance's entry survives.
- **Heartbeat key expires while the instance is alive.** With a 30s refresh and `EX 90`, this takes more than 90 seconds without a successful write (about three missed ticks), e.g. a network glitch between the instance and Redis. The janitor (task 2.2) then clears the instance's registry entries, and each affected device becomes routable again once it reconnects and its new entry is written. Acceptable: a temporary loss of routability for the affected devices only. A device that keeps its socket open stays unroutable until its next reconnect.
- **Hash entry without heartbeat.** The instance died without graceful cleanup. The janitor handles this.
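
A device whose socket stays open through a heartbeat gap never reconnects, so nothing rewrites its entry after the janitor clears it. One possible mitigation, not part of this task's spec, is for the instance to re-write its held entries on the first successful heartbeat after a failure. The sketch assumes the Redis call would be `HSETNX` (set a field only if it is absent), so an entry that another instance has taken in the meantime is left alone. `MemoryHash` and `reassertHeld` are hypothetical; with ioredis this would be a pipeline of `hsetnx(REGISTRY_KEY, imei, instanceId)` calls.

```typescript
// In-memory stand-in for one Redis hash; hsetnx mirrors HSETNX semantics
// (set only if the field is absent; returns 1 if set, 0 otherwise).
class MemoryHash {
  fields = new Map<string, string>();

  hsetnx(field: string, value: string): number {
    if (this.fields.has(field)) return 0;
    this.fields.set(field, value);
    return 1;
  }
}

// Re-adds registry entries for every IMEI this instance still holds.
// Entries that still exist (ours or another instance's) are not touched.
// Returns how many entries were restored.
function reassertHeld(hash: MemoryHash, instanceId: string, held: Iterable<string>): number {
  let restored = 0;
  for (const imei of held) restored += hash.hsetnx(imei, instanceId);
  return restored;
}

// Example: the janitor cleared ingest-a's entries, then imei-b reconnected to
// ingest-b before ingest-a's heartbeat recovered.
const hash = new MemoryHash();
hash.fields.set('imei-b', 'ingest-b');
const restored = reassertHeld(hash, 'ingest-a', ['imei-a', 'imei-b']);
console.log(restored, hash.fields.get('imei-a'), hash.fields.get('imei-b'));
// → 1 ingest-a ingest-b
```

Using set-if-absent rather than a plain `HSET` keeps the reassertion from undoing the "most recent registration wins" rule in the first race above.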

### Phase 1 impact

Phase 1 code in `src/adapters/teltonika/index.ts` needs three hook points:

1. After successful handshake.
2. On `socket.on('close')`.
3. On graceful shutdown (already wired in task 1.12).

These are additive: no Phase 1 logic changes, only new calls to the registry.
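
The first two hook points could be wired roughly as follows. This is a sketch, not the Phase 1 code: it assumes the session exposes a Node `EventEmitter` (a `net.Socket` in practice) and that the handshake code calls a hypothetical `onImeiAccepted` callback; `attachRegistry` and `onRegistryFailure` are also illustrative names. Failures are reported but never fail the session, in line with recommendation B under Risks; `onRegistryFailure` is where `teltonika_registry_failures_total` would be incremented.

```typescript
import { EventEmitter } from 'node:events';

// The subset of ConnectionRegistry the session hooks need.
interface Registry {
  register(imei: string): Promise<void>;
  unregister(imei: string): Promise<void>;
}

// Wires registry calls into one TCP session's lifecycle and returns the callback the
// handshake code should invoke once the IMEI has been accepted (hypothetical hook).
function attachRegistry(
  socket: EventEmitter,
  registry: Registry,
  onRegistryFailure: (op: 'register' | 'unregister', err: unknown) => void,
): (imei: string) => Promise<void> {
  let imei: string | null = null;

  // Hook 2: any close, clean or not. Nothing to undo if the handshake never completed.
  socket.once('close', async () => {
    if (imei === null) return;
    try {
      await registry.unregister(imei);
    } catch (err) {
      onRegistryFailure('unregister', err);
    }
  });

  // Hook 1: after a successful IMEI handshake. Errors are swallowed so that an
  // unavailable Redis never rejects the device (telemetry keeps flowing).
  return async (acceptedImei: string) => {
    imei = acceptedImei;
    try {
      await registry.register(acceptedImei);
    } catch (err) {
      onRegistryFailure('register', err);
    }
  };
}

// Usage with a fake socket and a registry that records calls.
const calls: string[] = [];
const fakeRegistry: Registry = {
  register: async (i) => { calls.push(`register ${i}`); },
  unregister: async (i) => { calls.push(`unregister ${i}`); },
};
const socket = new EventEmitter();
const onImeiAccepted = attachRegistry(socket, fakeRegistry, (op, err) => console.error(op, err));
void onImeiAccepted('123456789012345');
socket.emit('close');
console.log(calls.join(', ')); // → register 123456789012345, unregister 123456789012345
```

Both calls happen once per session, never per frame, so they stay off the hot path named in the acceptance criteria.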

## Acceptance criteria

- [ ] After a device handshake completes, `HGET connections:registry <imei>` returns the local instance ID.
- [ ] After the socket closes, `HGET connections:registry <imei>` returns nil.
- [ ] If two simulated instances "race" on the same IMEI, the registry ends up pointing at whichever instance most recently registered, and the loser's `unregister` does not delete the winner's entry.
- [ ] Heartbeat key has `EX 90` and is refreshed every 30s.
- [ ] On SIGTERM, all held IMEIs are unregistered before exit.
- [ ] Registry operations never block the TCP read path: `register`/`unregister` are awaited only inside session lifecycle callbacks (handshake, close), not in the per-frame hot path.
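
The race criterion can be checked without Redis. The sketch below uses a hypothetical in-memory stand-in, `FakeInstance`, that mirrors the registry's two ownership rules (register overwrites; unregister deletes only if the entry still names the caller), so the expected end state is easy to see. An integration test against real Redis would drive two `ConnectionRegistry` instances with different instance IDs through the same sequence.

```typescript
// Mirrors ConnectionRegistry's ownership rules against an in-memory stand-in
// for the shared `connections:registry` hash (hypothetical test double).
class FakeInstance {
  constructor(private hash: Map<string, string>, readonly instanceId: string) {}

  register(imei: string): void {
    this.hash.set(imei, this.instanceId); // like HSET: last writer wins
  }

  unregister(imei: string): void {
    // Compare-and-delete: leave the entry alone if another instance owns it now.
    if (this.hash.get(imei) === this.instanceId) this.hash.delete(imei);
  }
}

// The race: the device reconnects to B before A notices its old socket closed.
const registryHash = new Map<string, string>();
const a = new FakeInstance(registryHash, 'ingest-a');
const b = new FakeInstance(registryHash, 'ingest-b');
a.register('123456789012345');
b.register('123456789012345'); // B registered most recently
a.unregister('123456789012345'); // A's late cleanup must not remove B's entry
console.log(registryHash.get('123456789012345')); // → ingest-b

b.unregister('123456789012345');
console.log(registryHash.has('123456789012345')); // → false
```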

## Risks / open questions

- What if Redis is unavailable at registration time? Options: (A) fail the handshake, (B) accept the device but log + alert. **Recommendation: B.** Phase 1's "telemetry continues even if the business plane is degraded" property must be preserved; command routing is a Phase 2 nice-to-have. Track via `teltonika_registry_failures_total`.
- Heartbeat write failures: log at warn, retry on next tick. Don't crash.

## Done

(Fill in once complete.)