julian c8a5f4cd68 Add Phase 1 and Phase 2 planning documents
ROADMAP plus granular task files per phase. Phase 1 (12 tasks + 1.13
device authority) covers Codec 8/8E/16 telemetry ingestion; Phase 2
(6 tasks) covers Codec 12/14 outbound commands; Phase 3 enumerates
deferred items.
2026-04-30 15:50:49 +02:00

Task 2.1 — Connection registry & heartbeat

Phase: 2 — Outbound commands
Status: Not started
Depends on: Phase 1 complete
Wiki refs: docs/wiki/concepts/phase-2-commands.md § 9.3

Goal

Maintain a Redis-backed registry mapping device IMEI → Ingestion instance ID, so Directus can route outbound commands to the instance currently holding the device's TCP socket.

Deliverables

  • src/core/connection-registry.ts:
    • ConnectionRegistry class with methods register(imei), unregister(imei), unregisterAll(), heartbeat().
    • Internal state: Set<string> of held IMEIs for graceful-shutdown bulk cleanup.
  • Hook into the Teltonika session lifecycle (in src/adapters/teltonika/index.ts):
    • After IMEI handshake succeeds: registry.register(imei).
    • On socket close (any cause): registry.unregister(imei).
  • Heartbeat ticker started in src/main.ts, runs every 30 seconds.
  • Graceful shutdown calls registry.unregisterAll() (task 1.12 hook updated to include this).

Specification

Redis layout

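  • Hash connections:registry: field = imei, value = instance_id. There is a single hash, shared by all instances. Liveness cannot live on the hash itself. Redis has no per-field TTL before 7.4, which added HEXPIRE. Even with HEXPIRE, every device entry would need its own periodic refresh. That is why liveness is tracked once per instance by the heartbeat key.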
  • Key instance:heartbeat:{instance_id}: written every 30s with EX 90. Existence proves the instance is alive.
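The two keys combine on the read side as follows. A consumer such as the Directus routing side could treat an IMEI as routable only if both conditions hold:

- its hash entry exists, and
- the owning instance's heartbeat key has not expired.

This is a sketch over in-memory maps standing in for Redis. `InMemoryState`, `writeHeartbeat`, `resolveRoute` and the millisecond clock are hypothetical names for illustration, not part of this task.

```typescript
// Hypothetical in-memory stand-in for the two Redis structures above.
interface InMemoryState {
  // connections:registry hash: imei -> instance_id
  registry: Map<string, string>;
  // instance:heartbeat:{instance_id} keys: instance_id -> expiry time (ms)
  heartbeatExpiry: Map<string, number>;
}

const HEARTBEAT_TTL_MS = 90_000; // mirrors EX 90

// Models `SET instance:heartbeat:{id} ... EX 90`: each write pushes the expiry out.
function writeHeartbeat(state: InMemoryState, instanceId: string, now: number): void {
  state.heartbeatExpiry.set(instanceId, now + HEARTBEAT_TTL_MS);
}

// Returns the instance that should receive a command for `imei`. Returns null when
// the device is unknown or its owner's heartbeat key has expired (owner presumed dead).
function resolveRoute(state: InMemoryState, imei: string, now: number): string | null {
  const instanceId = state.registry.get(imei); // HGET connections:registry <imei>
  if (instanceId === undefined) return null;
  const expiresAt = state.heartbeatExpiry.get(instanceId); // EXISTS instance:heartbeat:{id}
  if (expiresAt === undefined || expiresAt <= now) return null;
  return instanceId;
}
```

In this model, a heartbeat written at t = 0 keeps `resolveRoute` returning the instance ID at t = 60 s. At t = 120 s it returns null. Real Redis enforces the expiry itself, so a router would issue the HGET and an EXISTS rather than compare timestamps.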

Operations

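// Assumes ioredis (pipeline(), eval(), SET ... 'EX' seconds).
import Redis from 'ioredis';

const REGISTRY_KEY = 'connections:registry';

// Atomic compare-and-delete: HDEL the field only if it still names this instance.
const DELETE_IF_OWNER = `
if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then
  return redis.call('HDEL', KEYS[1], ARGV[1])
end
return 0
`;

export class ConnectionRegistry {
  private held = new Set<string>();
  constructor(private redis: Redis, private instanceId: string) {}

  async register(imei: string): Promise<void> {
    await this.redis.hset(REGISTRY_KEY, imei, this.instanceId);
    this.held.add(imei);
  }

  async unregister(imei: string): Promise<void> {
    // Only delete if the entry still points at us.
    // (Race: a device reconnected to a different instance between
    //  our session ending and this delete.)
    // Check and delete run in one Lua script. A separate HGET then HDEL would leave
    // a window in which the other instance's fresh entry gets deleted.
    await this.redis.eval(DELETE_IF_OWNER, 1, REGISTRY_KEY, imei, this.instanceId);
    this.held.delete(imei);
  }

  async unregisterAll(): Promise<void> {
    if (this.held.size === 0) return;
    const pipeline = this.redis.pipeline();
    for (const imei of this.held) {
      // Same ownership check as unregister(): never delete another instance's entry.
      pipeline.eval(DELETE_IF_OWNER, 1, REGISTRY_KEY, imei, this.instanceId);
    }
    await pipeline.exec();
    this.held.clear();
  }

  async heartbeat(): Promise<void> {
    // The value is informational; the key's existence (EX 90) is the liveness signal.
    await this.redis.set(
      `instance:heartbeat:${this.instanceId}`,
      Date.now().toString(),
      'EX',
      90,
    );
  }
}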

Heartbeat ticker

In main.ts:

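const logHeartbeatError = (err: unknown) => logger.warn({ err }, 'heartbeat failed');
// Initial beat so the instance is "alive" before the first 30s tick.
registry.heartbeat().catch(logHeartbeatError);
const heartbeatInterval = setInterval(() => {
  registry.heartbeat().catch(logHeartbeatError);
}, 30_000);
// In the shutdown handler (task 1.12): clearInterval(heartbeatInterval) before registry.unregisterAll().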

Run an initial heartbeat() immediately at startup so the instance is "alive" before the first 30s tick.
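The ticker and the initial beat can also be packaged as a small helper that returns a stop function for the shutdown hook. This is a sketch, and `startHeartbeat` and its parameters are hypothetical. It also skips a tick while the previous write is still in flight, which the spec above does not require.

```typescript
// Hypothetical helper: runs `beat` once immediately, then every `intervalMs`.
// Errors go to `onError` and never stop the ticker (the next tick is the retry).
function startHeartbeat(
  beat: () => Promise<void>,
  intervalMs: number,
  onError: (err: unknown) => void,
): () => void {
  let inFlight = false;

  const tick = (): void => {
    if (inFlight) return; // previous write still pending (e.g. slow Redis): skip this tick
    inFlight = true;
    beat()
      .catch(onError)
      .finally(() => {
        inFlight = false;
      });
  };

  tick(); // initial beat so the instance is "alive" before the first interval elapses
  const timer = setInterval(tick, intervalMs);
  return () => clearInterval(timer);
}
```

In main.ts this would be called as `const stopHeartbeat = startHeartbeat(() => registry.heartbeat(), 30_000, (err) => logger.warn({ err }, 'heartbeat failed'));`. The shutdown hook would then call `stopHeartbeat()` before `registry.unregisterAll()`.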

Race conditions to handle

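  • Same IMEI on two instances at once. This is possible when a device reconnects to another instance before the old one detects the close. The new instance's register overwrites the old entry. The old instance's unregister deletes the entry only if it still holds the old instance's ID, so the winner's entry survives. The check and the delete must be one atomic step (e.g. a Lua script). A separate HGET followed by HDEL leaves a window in which the new instance's entry can be deleted.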
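  • Heartbeat key expires while instance is alive. Consecutive heartbeat writes failed, e.g. during a network glitch. With EX 90 and a 30s tick, the key survives two missed writes; a third consecutive failure lets it expire. The janitor (task 2.2) then clears this instance's registry entries. This is acceptable: affected devices temporarily lose routability. However, those devices are still connected to this instance and will not reconnect on their own. Their entries return only when they next reconnect, unless the instance re-registers its held IMEIs once heartbeats succeed again.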
  • Hash entry without heartbeat. The instance died without graceful cleanup. Janitor handles this.
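The expired-heartbeat case and the missing-cleanup case reduce to the same sweep. Task 2.2 owns the janitor, so what follows is only a sketch of the check implied here, over plain in-memory data. Given the registry hash contents and the set of instance IDs whose heartbeat key still exists, it returns the stale entries. `findStaleEntries` is a hypothetical name.

```typescript
// Hypothetical janitor core (task 2.2 owns the real one).
// registry: contents of HGETALL connections:registry (imei -> instance_id)
// liveInstances: instance IDs whose instance:heartbeat:{id} key currently exists
function findStaleEntries(
  registry: Record<string, string>,
  liveInstances: ReadonlySet<string>,
): Array<{ imei: string; instanceId: string }> {
  const stale: Array<{ imei: string; instanceId: string }> = [];
  for (const [imei, instanceId] of Object.entries(registry)) {
    if (!liveInstances.has(instanceId)) stale.push({ imei, instanceId });
  }
  return stale;
}
```

The sketch returns the instance ID alongside each IMEI. That lets the actual delete reuse the same ownership check that unregister performs. An entry that a live instance re-registered between the scan and the delete is therefore left alone.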

Phase 1 impact

Phase 1 code in src/adapters/teltonika/index.ts needs three hook points:

  1. After successful handshake.
  2. On socket.on('close').
  3. On graceful shutdown (already wired in task 1.12).

These are additive — no Phase 1 logic changes, only new calls to the registry.
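The first two hook points can be sketched against a generic socket-like emitter. `wireRegistryHooks`, `RegistryLike` and the `onRegistryError` callback are hypothetical names. Following option B from the risks section, a registry failure is reported but never fails the session.

```typescript
import { EventEmitter } from 'node:events';

// Minimal surface of ConnectionRegistry used by the hooks (hypothetical name).
interface RegistryLike {
  register(imei: string): Promise<void>;
  unregister(imei: string): Promise<void>;
}

// Attaches registry calls to one device session. The close handler (hook 2) is
// installed here; the returned function is hook 1, called once the IMEI handshake
// succeeds. Hook 3 stays registry.unregisterAll() in the task 1.12 shutdown handler.
function wireRegistryHooks(
  socket: EventEmitter,
  registry: RegistryLike,
  onRegistryError: (op: 'register' | 'unregister', err: unknown) => void,
): (imei: string) => void {
  let imei: string | null = null;

  socket.once('close', () => {
    if (imei === null) return; // closed before handshake: nothing was registered
    registry.unregister(imei).catch((err) => onRegistryError('unregister', err));
  });

  return (handshakeImei: string) => {
    imei = handshakeImei;
    // Failure is reported, not thrown: telemetry must continue if Redis is down.
    registry.register(handshakeImei).catch((err) => onRegistryError('register', err));
  };
}
```

In the real adapter, `onRegistryError` is where the warning log and the teltonika_registry_failures_total increment would go.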

Acceptance criteria

  • After a device handshake completes, HGET connections:registry <imei> returns the local instance ID.
  • After the socket closes, HGET connections:registry <imei> returns nil.
  • If two simulated instances "race" on the same IMEI, the registry ends up pointing at whichever instance most recently registered, and the loser's unregister does not delete the winner's entry.
  • Heartbeat key has EX 90 and is refreshed every 30s.
  • On SIGTERM, all held IMEIs are unregistered before exit.
  • Registry calls stay off the per-frame hot path. register/unregister are awaited only inside session lifecycle callbacks (handshake, close), so they never block the TCP read path.
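The race criterion can be exercised without Redis. The sketch below models the hash as a `Map` and the compare-and-delete as one synchronous step, which is what an atomic Lua script provides in Redis. `FakeRegistryStore` and `InstanceRegistry` are hypothetical test doubles, not the production class.

```typescript
// Hypothetical stand-in for the connections:registry hash. Each method runs as one
// synchronous step, mirroring a single Redis command or an atomic Lua script.
class FakeRegistryStore {
  readonly hash = new Map<string, string>();

  hset(imei: string, instanceId: string): void {
    this.hash.set(imei, instanceId);
  }

  // Compare-and-delete: remove the field only if it still names `instanceId`.
  hdelIfOwner(imei: string, instanceId: string): boolean {
    if (this.hash.get(imei) !== instanceId) return false;
    return this.hash.delete(imei);
  }
}

// Hypothetical per-instance view following the spec's register/unregister rules.
class InstanceRegistry {
  private held = new Set<string>();
  constructor(private store: FakeRegistryStore, readonly instanceId: string) {}

  register(imei: string): void {
    this.store.hset(imei, this.instanceId);
    this.held.add(imei);
  }

  unregister(imei: string): void {
    this.store.hdelIfOwner(imei, this.instanceId);
    this.held.delete(imei);
  }
}
```

Consider a device on instance A that reconnects to instance B before A notices the close. The calls run in the order `a.register('imei-1')`, `b.register('imei-1')`, `a.unregister('imei-1')`. Afterwards, `store.hash.get('imei-1')` is still `'ingest-b'`.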

Risks / open questions

  • What if Redis is unavailable at registration time? Options: (A) fail the handshake, (B) accept the device but log + alert. Recommendation: B. Phase 1's "telemetry continues even if business plane is degraded" property must be preserved; command routing is a Phase 2 nice-to-have. Track via teltonika_registry_failures_total.
  • Heartbeat write failures: log at warn, retry on next tick. Don't crash.

Done

(Fill in once complete.)