spa/.planning/phase-2-live-map/04-ws-client-and-position-store.md
julian 05543529e4 docs(planning): file Phase 2 task specs (live monitoring map) (2026-05-03 09:28:16 +02:00)
Nine task files matching Phase 1's shape (Goal / Deliverables / Spec /
Acceptance / Risks / Done). README updated with full sequencing diagram,
files-modified outline, tech stack additions, design rules, and phase
acceptance.

| #   | Task                                                                  |
| --- | --------------------------------------------------------------------- |
| 2.1 | MapView singleton + mapReady gate                                     |
| 2.2 | Tile-source switcher (Esri / OpenTopoMap / OSM / optional Google)     |
| 2.3 | Sprite preload — 7 racing categories x 4 colour variants              |
| 2.4 | WS client + rAF coalescer + Zustand position store + connection store |
| 2.5 | MapPositions — clustered + selected sources                           |
| 2.6 | MapTrails — bounded ring buffer, polyline rendering                   |
| 2.7 | Event picker — TanStack Query + WS subscription orchestration         |
| 2.8 | Camera control trio — default-fit / selected-follow / one-shot        |
| 2.9 | Connection status + per-device last-seen indicators                   |

Sequencing: 2.1 and 2.4 are parallel foundations (singleton vs data
pipeline). Once both land, 2.5 / 2.6 / 2.7 / 2.9 fan out independently.
2.2 / 2.3 only need 2.1. 2.8 sits at the end on top of 2.1 + 2.5.

Each task documents its deliverables down to file paths + interface
shapes, includes concrete code sketches in the Specification, lists
explicit out-of-scope items, and surfaces risks for the implementer
to think about. An agent (or future me) can pick up any single task
and ship it without re-deriving the design from the wiki.

Resolved Phase 2 design decisions baked into the task files:
- Trails: flat-colour-per-device for v1, defer speed-coloured segments
  to a Phase 3 polish task.
- Cluster params: 14/50 (traccar default); tune after seeing real data.
- Event picker placement: top-left dropdown.
- Multi-event: out — single-select, one event at a time.
- Stale-position visual: fade icon opacity; defer warning badges.


Task 2.4 — WS client + rAF coalescer + Zustand position store

Phase: 2 — Live monitoring map
Status: Not started
Depends on: Phase 1 complete; processor Phase 1.5 deployed and reachable.
Wiki refs: docs/wiki/synthesis/processor-ws-contract.md (the wire spec); docs/wiki/concepts/maps-architecture.md §"WebSocket → map data flow"; docs/wiki/concepts/live-channel-architecture.md.

Goal

Build the data pipeline that brings live positions from Processor's WebSocket into a Zustand store the map components subscribe to. Three concerns:

  1. WS client — connect to /ws-live, send subscribe, handle subscribed (with snapshot), position, unsubscribed, error. Reconnect on close with exponential backoff.
  2. rAF coalescer — incoming position messages buffer per-device; one requestAnimationFrame flush per frame writes the latest snapshot to the store. Without this, traccar-web's per-message dispatch cascade is reproduced.
  3. Zustand position store — latestByDevice: Map<deviceId, PositionEntry>, trailsByDevice: Map<deviceId, RingBuffer<PositionEntry>>, selectedDeviceId: string | null. Plus a separate connection-store for WS state.

This is the throughput-discipline core. It runs whether or not the map is mounted; the map subscribes to the store and renders whatever's in it.

Deliverables

  • src/live/protocol.ts — zod schemas for outbound (subscribe / unsubscribe) and inbound (subscribed / position / unsubscribed / error) messages per processor-ws-contract. Plus PositionEntry type:
    type PositionEntry = {
      deviceId: string;
      lat: number;
      lon: number;
      ts: number;       // epoch ms
      speed?: number;
      course?: number;
      accuracy?: number;
      attributes?: Record<string, unknown>;
    };
    
  • src/live/ws-client.ts exporting:
    • createLiveClient(opts: { url: string }): LiveClient — factory.
    • LiveClient interface:
      interface LiveClient {
        connect(): void;             // idempotent
        close(): void;
        subscribe(topic: string): Promise<{ ok: true; snapshot: PositionEntry[] } | { ok: false; code: string; message?: string }>;
        unsubscribe(topic: string): Promise<void>;
        onPosition(handler: (msg: PositionEntry & { topic: string }) => void): () => void;
      }
      
    • Reconnect with exponential backoff: 1s / 2s / 4s / 8s / 16s / 30s ceiling.
    • Re-subscribes to all previously-active topics on reconnect.
    • Heartbeat: client pings every 60s if no message in that window; closes if no pong in 10s.
  • src/live/coalescer.ts exporting:
    • createCoalescer(onFlush: (snapshot: PositionEntry[]) => void): Coalescer
    • Coalescer.push(p: PositionEntry): void — non-blocking; writes to the buffer.
    • Internally: per-device map of latest-position; rAF loop flushes to the consumer once per frame; clears the buffer.
  • src/live/position-store.ts — Zustand store:
    type PositionState = {
      latestByDevice: Map<string, PositionEntry>;
      trailsByDevice: Map<string, PositionEntry[]>;
      selectedDeviceId: string | null;
      activeEventId: string | null;     // 2.7 sets this
    };
    
    type PositionActions = {
      applySnapshot(eventId: string, entries: PositionEntry[]): void;
      applyPositions(entries: PositionEntry[]): void;     // called by coalescer flush
      clearForEvent(): void;                              // on event switch
      selectDevice(deviceId: string | null): void;
    };
    
    Trail ring buffer is bounded by MAX_TRAIL_LENGTH (default 200; see src/live/constants.ts below).
  • src/live/connection-store.ts — Zustand store:
    type ConnectionState = {
      status: 'disconnected' | 'connecting' | 'connected' | 'reconnecting';
      attempt: number;          // current reconnect attempt (0 when connected)
      lastConnectedAt: number | null;
      lastErrorMessage: string | null;
    };
    
  • src/live/index.tsx — barrel re-exports + a <LiveBootstrap> React component that creates the singleton client (using runtimeConfig.liveWsUrl) and wires the coalescer to the store. Mounted inside <AuthBootstrap> in main.tsx.
  • Constants — src/live/constants.ts:
    export const MAX_TRAIL_LENGTH = 200;          // points per device
    export const RAF_BUDGET_MS = 16;              // soft target; rAF already caps flushes to the display refresh rate
    export const RECONNECT_BACKOFF_MS = [1000, 2000, 4000, 8000, 16000];
    export const RECONNECT_CEILING_MS = 30000;
    export const HEARTBEAT_INTERVAL_MS = 60000;
    export const HEARTBEAT_TIMEOUT_MS = 10000;
    
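A minimal sketch of src/live/connection-store.ts plus the setConnectionState helper that the ws-client sketches below call. The state shape is the deliverable above; the partial-update helper is an assumption about how the lifecycle code patches it.

// src/live/connection-store.ts (sketch)
import { create } from 'zustand';

type ConnectionState = {
  status: 'disconnected' | 'connecting' | 'connected' | 'reconnecting';
  attempt: number;          // current reconnect attempt (0 when connected)
  lastConnectedAt: number | null;
  lastErrorMessage: string | null;
};

export const useConnectionStore = create<ConnectionState>(() => ({
  status: 'disconnected',
  attempt: 0,
  lastConnectedAt: null,
  lastErrorMessage: null,
}));

// Partial-update helper used by the connection-lifecycle sketch in the Specification.
export function setConnectionState(patch: Partial<ConnectionState>) {
  useConnectionStore.setState(patch);
}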

Specification

Connection lifecycle

// src/live/ws-client.ts (sketch)

type ClientState =
  | { kind: 'idle' }
  | { kind: 'connecting'; ws: WebSocket }
  | { kind: 'connected'; ws: WebSocket }
  | { kind: 'reconnecting'; attempt: number; timer: ReturnType<typeof setTimeout> }
  | { kind: 'closed' };

function createLiveClient({ url }: { url: string }): LiveClient {
  let state: ClientState = { kind: 'idle' };
  const subscriptions = new Set<string>();
  const positionHandlers = new Set<(msg: PositionEntry & { topic: string }) => void>();
  const pendingSubscribes = new Map<string, { resolve: (r: SubscribeResult) => void; reject: (e: unknown) => void }>();
  let reconnectAttempt = 0; // survives state transitions so the backoff can grow (see scheduleReconnect)

  function connect() {
    if (state.kind === 'connecting' || state.kind === 'connected') return;
    setConnectionState({ status: 'connecting' });

    const ws = new WebSocket(toAbsoluteWsUrl(url));
    state = { kind: 'connecting', ws };

    ws.addEventListener('open', () => {
      state = { kind: 'connected', ws };
      reconnectAttempt = 0; // connected: reset the backoff
      setConnectionState({ status: 'connected', attempt: 0, lastConnectedAt: Date.now() });
      // Re-subscribe to everything that was active before disconnect.
      for (const topic of subscriptions) {
        sendRaw({ type: 'subscribe', topic });
      }
    });

    ws.addEventListener('message', (ev) => onMessage(ev.data));
    ws.addEventListener('close', () => scheduleReconnect());
    ws.addEventListener('error', (err) => {
      logger.warn({ err }, 'WS error');
      // The 'close' event will fire next; reconnect from there.
    });
  }

  function scheduleReconnect() {
    if (state.kind === 'closed') return;
    // Track attempts in a counter that survives connect()'s state overwrite;
    // deriving it from state.kind would reset the backoff to 1 every time a
    // reconnect attempt itself fails ('reconnecting' -> 'connecting' -> close).
    reconnectAttempt += 1;
    const delay = Math.min(
      RECONNECT_BACKOFF_MS[reconnectAttempt - 1] ?? RECONNECT_CEILING_MS,
      RECONNECT_CEILING_MS,
    );
    setConnectionState({ status: 'reconnecting', attempt: reconnectAttempt });
    const timer = setTimeout(() => connect(), delay);
    state = { kind: 'reconnecting', attempt: reconnectAttempt, timer };
  }

  // ... close, subscribe, unsubscribe, onMessage handlers below
}

The state machine keeps the reconnect logic linear and testable. Subscriptions are remembered across disconnects in the subscriptions set so we can replay them on reopen.
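The heartbeat from the deliverables list, sketched inside createLiveClient. One loud assumption: the keepalive frames are modeled here as JSON { type: 'ping' } / { type: 'pong' } messages; confirm the actual mechanism against processor-ws-contract before implementing.

// Heartbeat (sketch): ping after a silent interval; force-close if the server
// stays silent past the timeout. close() fires the 'close' event, which
// routes into scheduleReconnect above.
let lastMessageAt = Date.now(); // refreshed on every inbound message in onMessage
let pongTimer: ReturnType<typeof setTimeout> | null = null;

const heartbeatTimer = setInterval(() => {
  if (state.kind !== 'connected') return;
  if (Date.now() - lastMessageAt < HEARTBEAT_INTERVAL_MS) return;
  sendRaw({ type: 'ping' }); // hypothetical frame shape; see note above
  pongTimer ??= setTimeout(() => {
    if (state.kind === 'connected') state.ws.close();
  }, HEARTBEAT_TIMEOUT_MS);
}, HEARTBEAT_INTERVAL_MS);

// onMessage must also do: lastMessageAt = Date.now(); clearTimeout(pongTimer); pongTimer = null.
// close() must clearInterval(heartbeatTimer).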

URL resolution

The runtime config gives liveWsUrl: '/ws-live' (relative — same-origin via reverse proxy). Modern WebSocket constructors resolve relative URLs against the page to ws(s)://<origin>/..., and the toAbsoluteWsUrl helper in the sketch above simply makes that resolution explicit. One test-path caveat: when running unit tests under jsdom, window.location is localhost, so '/ws-live' resolves to ws://localhost/ws-live; the WS server fixture for unit tests therefore binds to localhost. Document this in the test files.

If we ever switch to absolute URLs in the runtime config, the same constructor handles it.
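For completeness, a sketch of the toAbsoluteWsUrl helper the lifecycle code calls (assumes window.location is available, i.e. no SSR):

// src/live/url.ts (sketch): normalize a possibly-relative WS URL.
export function toAbsoluteWsUrl(url: string): string {
  if (url.startsWith('ws://') || url.startsWith('wss://')) return url;
  // Resolve relative paths against the page, then map http(s) to ws(s).
  const resolved = new URL(url, window.location.href);
  resolved.protocol = resolved.protocol === 'https:' ? 'wss:' : 'ws:';
  return resolved.toString();
}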

subscribe flow with correlation IDs

async subscribe(topic: string): Promise<SubscribeResult> {
  if (state.kind !== 'connected') {
    return { ok: false, code: 'not-connected' };
  }
  subscriptions.add(topic);
  const id = nanoid();
  const promise = new Promise<SubscribeResult>((resolve, reject) => {
    pendingSubscribes.set(id, { resolve, reject });
    setTimeout(() => {
      const pending = pendingSubscribes.get(id);
      if (pending) {
        pending.reject(new Error('subscribe timeout'));
        pendingSubscribes.delete(id);
      }
    }, 5000);
  });
  sendRaw({ type: 'subscribe', topic, id });
  return promise;
}

function onMessage(raw: unknown) {
  // Guard JSON.parse: a malformed frame must not throw out of the message handler.
  let json: unknown;
  try {
    json = JSON.parse(String(raw));
  } catch {
    return; // not JSON; log and drop
  }
  const parsed = InboundMessage.safeParse(json);
  if (!parsed.success) return; // schema mismatch; log and drop

  const msg = parsed.data;
  switch (msg.type) {
    case 'subscribed': {
      const pending = msg.id ? pendingSubscribes.get(msg.id) : null;
      pending?.resolve({ ok: true, snapshot: msg.snapshot ?? [] });
      if (msg.id) pendingSubscribes.delete(msg.id);
      break;
    }
    case 'position':
      for (const h of positionHandlers) h({ ...msg, topic: msg.topic });
      break;
    case 'error': {
      const pending = msg.id ? pendingSubscribes.get(msg.id) : null;
      pending?.resolve({ ok: false, code: msg.code, message: msg.message });
      if (msg.id) pendingSubscribes.delete(msg.id);
      break;
    }
    case 'unsubscribed':
      // Idle for now; could resolve a pending unsubscribe promise if we track them.
      break;
  }
}
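For reference, a minimal zod sketch of the InboundMessage schema the handler above parses. Field names beyond type / topic / id / code / message / snapshot mirror PositionEntry; verify every one of them against processor-ws-contract (outbound schemas are analogous).

// src/live/protocol.ts (sketch): inbound messages as a discriminated union.
import { z } from 'zod';

const PositionEntrySchema = z.object({
  deviceId: z.string(),
  lat: z.number(),
  lon: z.number(),
  ts: z.number(), // epoch ms
  speed: z.number().optional(),
  course: z.number().optional(),
  accuracy: z.number().optional(),
  attributes: z.record(z.unknown()).optional(),
});

export const InboundMessage = z.discriminatedUnion('type', [
  z.object({
    type: z.literal('subscribed'),
    id: z.string().optional(),
    topic: z.string(),
    snapshot: z.array(PositionEntrySchema).optional(),
  }),
  PositionEntrySchema.extend({ type: z.literal('position'), topic: z.string() }),
  z.object({ type: z.literal('unsubscribed'), topic: z.string() }),
  z.object({
    type: z.literal('error'),
    id: z.string().optional(),
    code: z.string(),
    message: z.string().optional(),
  }),
]);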

rAF coalescer

// src/live/coalescer.ts
export function createCoalescer(onFlush: (snapshot: PositionEntry[]) => void) {
  const buffer = new Map<string, PositionEntry>();
  let rafHandle: number | null = null;

  function flush() {
    rafHandle = null;
    if (buffer.size === 0) return;
    const snapshot = Array.from(buffer.values());
    buffer.clear();
    onFlush(snapshot);
  }

  return {
    push(p: PositionEntry) {
      buffer.set(p.deviceId, p); // keep latest per device
      if (rafHandle === null) {
        rafHandle = requestAnimationFrame(flush);
      }
    },
    cancel() {
      // Also cancel the pending frame: a flush scheduled before cancel() must
      // not deliver stale positions after teardown.
      if (rafHandle !== null) cancelAnimationFrame(rafHandle);
      rafHandle = null;
      buffer.clear();
    },
  };
}

This is the magic — the entire throughput-discipline pattern in <30 lines. The store's applyPositions is the onFlush consumer.

Trail ring buffer

// In the position store:
applyPositions(entries: PositionEntry[]) {
  set((state) => {
    const latest = new Map(state.latestByDevice);
    const trails = new Map(state.trailsByDevice);
    for (const e of entries) {
      latest.set(e.deviceId, e);
      const tail = trails.get(e.deviceId) ?? [];
      // Skip if same position as the last (don't add duplicates from no-movement reports).
      const last = tail[tail.length - 1];
      if (!last || last.lat !== e.lat || last.lon !== e.lon) {
        const next = [...tail, e];
        if (next.length > MAX_TRAIL_LENGTH) next.shift();
        trails.set(e.deviceId, next);
      }
    }
    return { latestByDevice: latest, trailsByDevice: trails };
  });
},

Map<string, PositionEntry[]> makes selector subscriptions cheap (deviceId-keyed lookups). The push-shift pattern keeps each device's array bounded; allocating a new array per update is the Zustand-idiomatic way to trigger subscribers.
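By way of illustration, the selector hooks 2.5 / 2.6 might define (hook names hypothetical):

// Subscribe a component to one device's slice of the store. Zustand compares
// the selected value by reference, so the component re-renders only when that
// device's entry or trail array is replaced, not on every store write.
export function useLatestPosition(deviceId: string): PositionEntry | undefined {
  return usePositionStore((s) => s.latestByDevice.get(deviceId));
}

export function useTrail(deviceId: string): PositionEntry[] | undefined {
  return usePositionStore((s) => s.trailsByDevice.get(deviceId));
}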

Snapshot vs streaming

applySnapshot(eventId: string, entries: PositionEntry[]) {
  // Reset all device-keyed state for the new event.
  const latest = new Map<string, PositionEntry>();
  const trails = new Map<string, PositionEntry[]>();
  for (const e of entries) {
    latest.set(e.deviceId, e);
    trails.set(e.deviceId, [e]);
  }
  set({
    activeEventId: eventId,
    latestByDevice: latest,
    trailsByDevice: trails,
    selectedDeviceId: null,
  });
},

Snapshot wipes the previous event's state (called on event switch in 2.7) and seeds the trail with the snapshot's single position per device. Streaming positions from there append to the trail.
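Illustrative only, since 2.7 owns event switching: the intended call order against this store, using the event:<id> topic format noted under out-of-scope below.

// Sketch of 2.7's switch flow (function name hypothetical).
async function switchEvent(eventId: string) {
  const store = usePositionStore.getState();
  store.clearForEvent(); // drop the old event's devices immediately
  const res = await getLiveClient().subscribe(`event:${eventId}`);
  if (res.ok) {
    store.applySnapshot(eventId, res.snapshot); // seed latest + one-point trails
  }
  // res.ok === false (e.g. 'not-connected') is 2.7's error surface to handle.
}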

<LiveBootstrap> wiring

// src/live/index.tsx (sketch): createLiveClient, createCoalescer,
// usePositionStore, useAuthStore, and useRuntimeConfig come from sibling modules.
import { useEffect, type ReactNode } from 'react';

let _client: LiveClient | null = null;

export function getLiveClient(): LiveClient {
  if (!_client) throw new Error('LiveBootstrap has not mounted yet');
  return _client;
}

export function LiveBootstrap({ children }: { children: ReactNode }) {
  const cfg = useRuntimeConfig();
  const status = useAuthStore((s) => s.status);

  useEffect(() => {
    if (status !== 'authenticated') return;
    _client = createLiveClient({ url: cfg.liveWsUrl });
    const coalescer = createCoalescer((snapshot) => {
      usePositionStore.getState().applyPositions(snapshot);
    });
    const off = _client.onPosition((msg) => coalescer.push(msg));
    _client.connect();
    return () => {
      off();
      _client?.close();
      _client = null;
      coalescer.cancel();
    };
  }, [status, cfg.liveWsUrl]);

  return <>{children}</>;
}

Mount inside <AuthBootstrap> so it only connects when authenticated. On logout (status flips to anonymous), the cleanup closes the WS.
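For concreteness, the intended nesting in main.tsx; a sketch where the import paths, <App>, and the root element id are assumptions:

// main.tsx (sketch). LiveBootstrap sits inside AuthBootstrap so the connect
// effect runs only once auth status is known.
import { createRoot } from 'react-dom/client';
import { AuthBootstrap } from './auth';   // Phase 1
import { LiveBootstrap } from './live';
import { App } from './App';

createRoot(document.getElementById('root')!).render(
  <AuthBootstrap>
    <LiveBootstrap>
      <App />
    </LiveBootstrap>
  </AuthBootstrap>,
);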

What this task does NOT do

  • Map rendering. That's 2.5 / 2.6 — they read from the store.
  • Event picking. 2.7 — calls client.subscribe('event:<id>') and handles the snapshot.
  • Connection-status UI. 2.9 reads from connection-store and renders chips / banners.
  • Backpressure / drop-oldest. Defer until measured; the rAF coalescer caps the flush rate, not the receive rate. If receive ever overwhelms the buffer, add a per-device queue cap.

Acceptance criteria

  • pnpm typecheck, pnpm lint, pnpm format:check, pnpm build clean.
  • In the browser console after the page settles: usePositionStore.getState() shows the empty state (no event selected yet — 2.7 sets it).
  • After the user is authenticated, connection-store flips through connecting → connected.
  • Pushing a synthetic message into the WS (via DevTools) lands in the position store on the next animation frame.
  • Disconnecting the network → connection-store.status === 'reconnecting' within ~5 s, attempt counter increments, eventually reconnects when the network returns.
  • Logging out → WS closes, connection-store.status === 'disconnected'.
  • No memory growth across 1000 synthetic positions in a single session (verify via latestByDevice.size stays bounded by device count, trailsByDevice per-device array length stays ≤ 200).
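A quick console probe for the memory criterion (assumes the store is reachable from DevTools, as the earlier console criterion already implies):

// Paste after pushing the synthetic positions:
const s = usePositionStore.getState();
console.log('devices tracked:', s.latestByDevice.size);
console.log('longest trail:', Math.max(0, ...Array.from(s.trailsByDevice.values(), (t) => t.length)));
// Expect: devices tracked ≤ device count; longest trail ≤ MAX_TRAIL_LENGTH (200).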

Risks / open questions

  • Snapshot too large at high event scale. 500 devices × ~200 bytes = ~100KB. Tolerable. If we ever push to 5000 devices, snapshot streaming via multiple subscribed frames would help — defer.
  • rAF in background tabs. Browsers throttle rAF when the tab is backgrounded. The coalescer effectively pauses; positions buffer until the tab is foregrounded again. Acceptable — operators backgrounding the tab don't need real-time updates.
  • Zustand Map<> reactivity. Zustand subscribers fire on reference changes, not deep equality. Wrapping latest and trails in new Map(...) per update is the right pattern; selectors derive specific deviceIds via usePositionStore((s) => s.latestByDevice.get(deviceId)).
  • Trail-direction colour. If we later add speed-coloured per-segment trails (2.6's open question), the trail entries need speed carried through. Already in PositionEntry. Good.
  • Cookie auth on WS. Browser sends the session cookie automatically with the WS upgrade only if same-origin. Verify the reverse-proxy + Vite-dev-proxy paths preserve same-origin (they do — /ws-live is on the page's origin). Worth a smoke test.

Done

(Filled in when the task lands.)