Task 2.1 — Connection registry & heartbeat
Phase: 2 — Outbound commands
Status: ⬜ Not started
Depends on: Phase 1 complete
Wiki refs: docs/wiki/concepts/phase-2-commands.md § 9.3
Goal
Maintain a Redis-backed registry mapping device IMEI → Ingestion instance ID, so Directus can route outbound commands to the instance currently holding the device's TCP socket.
Deliverables
- `src/core/connection-registry.ts`: `ConnectionRegistry` class with methods `register(imei)`, `unregister(imei)`, `unregisterAll()`, `heartbeat()`.
  - Internal state: a `Set<string>` of held IMEIs, used for bulk cleanup on graceful shutdown.
- Hook into the Teltonika session lifecycle (in `src/adapters/teltonika/index.ts`):
  - After the IMEI handshake succeeds: `registry.register(imei)`.
  - On socket close (any cause): `registry.unregister(imei)`.
- Heartbeat ticker started in `src/main.ts`, running every 30 seconds.
- Graceful shutdown calls `registry.unregisterAll()` (the task 1.12 hook is updated to include this).
Specification
Redis layout
- Hash `connections:registry`: field = `imei`, value = `instance_id`. A single hash shared by all instances. Redis hashes do not offer per-field TTL (`HEXPIRE` only arrived in Redis 7.4, and this design does not depend on it), so liveness is tracked by a separate heartbeat key.
- Key `instance:heartbeat:{instance_id}`: written every 30 s with `EX 90`. Its existence proves the instance is alive.
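To make the layout concrete, here is one plausible read path for a router such as Directus: look up the IMEI in `connections:registry`, then treat the entry as routable only if that instance's heartbeat key still exists. This is an assumption about the consumer side, which this task does not specify. `KeyValueView` and `resolveInstance` are hypothetical names, and in-memory collections stand in for `HGET` and `EXISTS` so the logic runs without a Redis server.

```typescript
// Hypothetical read-only view of the two Redis structures above.
// In production these would be HGET / EXISTS calls; here they are in-memory.
interface KeyValueView {
  registry: Map<string, string>; // connections:registry: imei -> instance_id
  liveKeys: Set<string>; // keys that currently exist (not yet expired)
}

const heartbeatKey = (instanceId: string): string =>
  `instance:heartbeat:${instanceId}`;

// Returns the instance holding the device's socket, or null when the device
// is unknown or its registered instance has no live heartbeat key.
function resolveInstance(view: KeyValueView, imei: string): string | null {
  const instanceId = view.registry.get(imei);
  if (instanceId === undefined) return null;
  return view.liveKeys.has(heartbeatKey(instanceId)) ? instanceId : null;
}

const snapshot: KeyValueView = {
  registry: new Map([
    ['356307042441013', 'ingest-a'],
    ['356307042441014', 'ingest-b'],
  ]),
  liveKeys: new Set([heartbeatKey('ingest-a')]), // ingest-b's heartbeat expired
};

console.log(resolveInstance(snapshot, '356307042441013')); // ingest-a
console.log(resolveInstance(snapshot, '356307042441014')); // null
console.log(resolveInstance(snapshot, '000000000000000')); // null
```

A stale entry whose heartbeat has lapsed is treated as unroutable here even before the janitor (task 2.2) removes it.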
Operations
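```typescript
import type { Redis } from 'ioredis'; // assumes ioredis v5 (named export)

const REGISTRY_KEY = 'connections:registry';

// Atomic compare-and-delete: remove the field only if it still holds our
// instance ID. A separate HGET followed by HDEL would leave a window in which
// another instance could register the IMEI and then lose its entry.
const DELETE_IF_OWNER = `
if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then
  return redis.call('HDEL', KEYS[1], ARGV[1])
end
return 0
`;

export class ConnectionRegistry {
  // IMEIs registered by this instance; used for bulk cleanup on shutdown.
  private held = new Set<string>();

  constructor(private redis: Redis, private instanceId: string) {}

  async register(imei: string): Promise<void> {
    await this.redis.hset(REGISTRY_KEY, imei, this.instanceId);
    this.held.add(imei);
  }

  async unregister(imei: string): Promise<void> {
    // Only delete if the entry still points at us.
    // (Race: the device reconnected to a different instance between
    // our session ending and this delete.)
    await this.redis.eval(DELETE_IF_OWNER, 1, REGISTRY_KEY, imei, this.instanceId);
    this.held.delete(imei);
  }

  async unregisterAll(): Promise<void> {
    if (this.held.size === 0) return;
    const pipeline = this.redis.pipeline();
    for (const imei of this.held) {
      // Same ownership check as unregister(): never delete an entry that
      // another instance has taken over since we registered it.
      pipeline.eval(DELETE_IF_OWNER, 1, REGISTRY_KEY, imei, this.instanceId);
    }
    await pipeline.exec();
    this.held.clear();
  }

  async heartbeat(): Promise<void> {
    // Key existence marks this instance alive; 90 s TTL, refreshed every 30 s.
    await this.redis.set(
      `instance:heartbeat:${this.instanceId}`,
      Date.now().toString(),
      'EX',
      90,
    );
  }
}
```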
Heartbeat ticker
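In `main.ts`:

```typescript
const heartbeatInterval = setInterval(() => {
  // Failures are logged at warn and retried on the next tick; never crash.
  registry.heartbeat().catch((err) => logger.warn({ err }, 'heartbeat failed'));
}, 30_000);
// Cleared by the graceful-shutdown hook (task 1.12): clearInterval(heartbeatInterval)
```

Run an initial `heartbeat()` immediately at startup so the instance is "alive" before the first 30 s tick.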
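The 30 s interval and 90 s TTL leave slack: a single failed write cannot let the key lapse, because the gap between successful writes is then only 60 s. The sketch below simulates that, assuming each successful `SET ... EX 90` resets expiry to write time + 90 s and ignoring timer drift, network latency and Redis-side expiry precision. `isAliveAt` and `aliveThroughout` are hypothetical helpers for illustration only.

```typescript
const INTERVAL_S = 30;
const TTL_S = 90;

// tickSucceeded[i] says whether the write at i * INTERVAL_S seconds succeeded
// (tick 0 is the immediate startup beat). Returns whether the heartbeat key
// exists at time t: each successful write sets expiry to writtenAt + TTL_S,
// and failed writes leave the previous expiry unchanged.
function isAliveAt(tickSucceeded: boolean[], t: number): boolean {
  let expiresAt = -Infinity;
  for (let i = 0; i < tickSucceeded.length; i++) {
    const writtenAt = i * INTERVAL_S;
    if (writtenAt > t) break;
    if (tickSucceeded[i]) expiresAt = writtenAt + TTL_S;
  }
  return t < expiresAt;
}

// Samples every second over the first two minutes.
const aliveThroughout = (ticks: boolean[]): boolean =>
  Array.from({ length: 120 }, (_, t) => isAliveAt(ticks, t)).every(Boolean);

const oneMiss = [true, true, false, true, true]; // write at 60 s fails
const threeMisses = [true, false, false, false, true]; // writes at 30, 60, 90 s fail

console.log(aliveThroughout(oneMiss)); // true
console.log(isAliveAt(threeMisses, 100)); // false: expired at 90 s, next write at 120 s
```

Two consecutive misses leave a gap exactly equal to the TTL, so in practice any timer drift lets the key lapse at that point.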
Race conditions to handle
- Same IMEI on two instances at once. Possible when a device reconnects to another instance before the old one has detected the close. The new instance's `register` overwrites the old entry, and the old instance's `unregister` deletes the entry only if it still holds the old instance's ID, so the new entry survives. Good.
- Heartbeat key expires while the instance is alive. A network glitch made at least two consecutive heartbeat writes fail (a single failure is absorbed, since the 90 s TTL spans three 30 s ticks). The janitor (task 2.2) will clear the registry entries; devices reconnect and the new entries get written. Acceptable: affected devices are temporarily unroutable, recoverable in seconds.
- Hash entry without heartbeat. The instance died without graceful cleanup; the janitor handles this.
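The first race can be replayed without Redis. This sketch assumes an in-memory `Map` in place of the `connections:registry` hash and an owner-conditional delete with the same semantics as the ownership check in `unregister`. `FakeRegistryHash` and `deleteIfOwner` are hypothetical names for illustration.

```typescript
// In-memory stand-in for the connections:registry hash (imei -> instance_id).
class FakeRegistryHash {
  private fields = new Map<string, string>();

  hset(imei: string, instanceId: string): void {
    this.fields.set(imei, instanceId);
  }

  hget(imei: string): string | undefined {
    return this.fields.get(imei);
  }

  // Owner-conditional delete: removes the field only if it still holds
  // instanceId. Returns true if a field was deleted.
  deleteIfOwner(imei: string, instanceId: string): boolean {
    if (this.fields.get(imei) !== instanceId) return false;
    return this.fields.delete(imei);
  }
}

const hash = new FakeRegistryHash();
const imei = '356307042441013';

hash.hset(imei, 'ingest-a'); // 1. device handshakes on instance A
hash.hset(imei, 'ingest-b'); // 2. device reconnects to B; B overwrites
const aDeleted = hash.deleteIfOwner(imei, 'ingest-a'); // 3. A's close handler runs late

console.log(aDeleted); // false: A no longer owns the entry
console.log(hash.hget(imei)); // ingest-b

const bDeleted = hash.deleteIfOwner(imei, 'ingest-b'); // 4. B's socket closes
console.log(bDeleted, hash.hget(imei)); // true undefined
```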
Phase 1 impact
Phase 1 code in `src/adapters/teltonika/index.ts` needs three hook points:
- After a successful handshake.
- On `socket.on('close')`.
- On graceful shutdown (already wired in task 1.12).

These are additive: no Phase 1 logic changes, only new calls to the registry.
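A minimal sketch of the two session-level hooks, assuming the adapter holds one `EventEmitter`-style socket per device (as a Node `net.Socket` is) and has a point where the IMEI handshake is accepted. `attachRegistryHooks`, `RegistryLike` and the returned accept callback are hypothetical names; only the `register`/`unregister` calls come from this task. Registry errors are caught and logged rather than propagated, following the recommendation under Risks. The shutdown hook lives in `main.ts` and is not shown.

```typescript
import { EventEmitter } from 'node:events';

// Subset of ConnectionRegistry the adapter needs (hypothetical interface).
interface RegistryLike {
  register(imei: string): Promise<void>;
  unregister(imei: string): Promise<void>;
}

// Wires registry calls into one session's lifecycle and returns the callback
// the adapter would invoke once the IMEI handshake has been accepted.
function attachRegistryHooks(
  socket: EventEmitter,
  registry: RegistryLike,
  logError: (msg: string, err: unknown) => void,
): (imei: string) => void {
  let imei: string | null = null;

  // Hook 2: net.Socket emits 'close' once fully closed, including after 'error'.
  socket.once('close', () => {
    const current = imei;
    if (current === null) return; // handshake never completed; nothing registered
    registry.unregister(current).catch((err) => logError('unregister failed', err));
  });

  // Hook 1: called after a successful handshake; never blocks the session.
  return (acceptedImei: string) => {
    imei = acceptedImei;
    registry.register(acceptedImei).catch((err) => logError('register failed', err));
  };
}

// Usage with a fake socket and a registry that records calls.
const calls: string[] = [];
const fakeRegistry: RegistryLike = {
  register: async (i) => { calls.push(`register ${i}`); },
  unregister: async (i) => { calls.push(`unregister ${i}`); },
};
const socket = new EventEmitter();
const onImeiAccepted = attachRegistryHooks(socket, fakeRegistry, console.error);
onImeiAccepted('356307042441013');
socket.emit('close', false);
console.log(calls.join(', ')); // register 356307042441013, unregister 356307042441013
```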
Acceptance criteria
- After a device handshake completes, `HGET connections:registry <imei>` returns the local instance ID.
- After the socket closes, `HGET connections:registry <imei>` returns nil.
- If two simulated instances "race" on the same IMEI, the registry ends up pointing at whichever instance most recently registered, and the loser's `unregister` does not delete the winner's entry.
- The heartbeat key has `EX 90` and is refreshed every 30 s.
- On SIGTERM, all held IMEIs are unregistered before exit.
- Registry operations never block the TCP read path: `register`/`unregister` are awaited only inside session lifecycle callbacks, never in the per-frame hot path.
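The SIGTERM criterion implies an ordering inside the task 1.12 shutdown hook: stop the heartbeat ticker, await `unregisterAll()`, and only then exit. A hypothetical sketch of that step follows, with every dependency injected so it can run without Redis or a real signal. `makeShutdown` and `ShutdownDeps` are illustrative names, not the task 1.12 API, and exiting with code 0 after a failed cleanup is an assumption.

```typescript
interface ShutdownDeps {
  stopHeartbeat: () => void; // e.g. () => clearInterval(heartbeatInterval)
  unregisterAll: () => Promise<void>; // e.g. () => registry.unregisterAll()
  exit: (code: number) => void; // e.g. (code) => process.exit(code)
  logError: (msg: string, err: unknown) => void;
}

// Returns an idempotent handler suitable for process.once('SIGTERM', ...).
function makeShutdown(deps: ShutdownDeps): () => Promise<void> {
  let started = false;
  return async () => {
    if (started) return; // a second signal must not run cleanup twice
    started = true;
    deps.stopHeartbeat(); // stop refreshing the heartbeat key first
    try {
      await deps.unregisterAll(); // release held IMEIs before exiting
    } catch (err) {
      // Redis unavailable: entries remain until the janitor (task 2.2) sees
      // the heartbeat key lapse.
      deps.logError('unregisterAll failed', err);
    }
    deps.exit(0);
  };
}
```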
Risks / open questions
- What if Redis is unavailable at registration time? Options: (A) fail the handshake; (B) accept the device, but log and alert. Recommendation: B. Phase 1's "telemetry continues even if the business plane is degraded" property must be preserved, and command routing is a Phase 2 nice-to-have. Track failures via `teltonika_registry_failures_total`.
- Heartbeat write failures: log at warn and retry on the next tick. Don't crash.
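Option B can be sketched as a wrapper that never lets a registry failure abort the handshake: the error is logged, a failure counter is incremented, and the caller carries on. The `FailureCounter` interface is a stand-in for whatever metrics client backs `teltonika_registry_failures_total` (this task does not name one), and `registerBestEffort` is a hypothetical name.

```typescript
// Minimal stand-in for a metrics counter (hypothetical interface).
interface FailureCounter {
  inc(): void;
}

// Attempts registration. On failure it logs, counts, and resolves anyway, so
// the session proceeds and telemetry ingestion is unaffected.
// Resolves to true if the device is routable, false otherwise.
async function registerBestEffort(
  register: (imei: string) => Promise<void>,
  imei: string,
  failures: FailureCounter,
  logError: (msg: string, err: unknown) => void,
): Promise<boolean> {
  try {
    await register(imei);
    return true;
  } catch (err) {
    failures.inc();
    logError('registry register failed; device accepted without command routing', err);
    return false;
  }
}
```

The handshake handler would call this and continue the session whatever it resolves to; the boolean only tells the caller whether the device is currently routable for commands.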
Done
(Fill in once complete.)