spa/.planning/phase-2-live-map/04-ws-client-and-position-store.md
julian 25dcde093a feat: task 2.5 MapPositions (clustered + selected sources)
- src/data/devices.ts: useDevicesById() — TanStack Query (5-min stale)
  returning Map<deviceId, Device>. deviceLabel() formats human-readable
  titles ("FMB920 #3458").
- src/map/layers/map-positions.tsx: <MapPositions /> side-effect-only.
  Two GeoJSON sources (clustered non-selected + unclustered selected).
  Five layers: non-selected symbol + direction + cluster-bubble,
  selected symbol + direction. Click handlers: marker -> selectDevice,
  cluster -> getClusterExpansionZoom + easeTo. Hover toggles cursor.
- src/routes/_authed/monitor.tsx: renders <MapPositions /> inside
  <MapView>.

Schema overhaul in src/auth/client.ts:
- Made Schema SDK-compatible. Each entry is an *array* of row types
  (devices: DeviceRow[], not just DeviceRow). RegularCollections<Schema>
  filters on array-like values; non-arrays collapse to never which
  broke readItems('devices', ...) with keyof Schema = never.
- Spelled out the composed client type as DirectusClient<Schema> &
  RestClient<Schema> & AuthenticationClient<Schema> — without the
  explicit annotation Schema didn't flow through .with(...) chain to
  request() call sites.
- Added DeviceRow + EventRow types; exported via @/auth.

useDevicesById uses readItems<Schema, 'devices', Query<Schema, DeviceRow>>
— explicit generics because the SDK doesn't infer Schema from the
receiver's type at call sites.

Deviations:
1. Spec referenced device.kind for category — Phase 1 schema doesn't
   have it yet; everything maps to 'default'. Refine when kind lands.
2. Cluster bubble uses 'default-neutral' sprite instead of a dedicated
   'cluster-background' (not in 2.3's registry). Swap in 3.8.
3. getClusterExpansionZoom is Promise-based in maplibre-gl 5.x (was
   callback-style); used .then().

Bundle: main bundle 394KB / 120KB gz, ~1KB up from 2.4. /monitor
chunk includes the new layer module (~10KB).
2026-05-03 09:31:09 +02:00

# Task 2.4 — WS client + rAF coalescer + Zustand position store
**Phase:** 2 — Live monitoring map
**Status:** ⬜ Not started
**Depends on:** Phase 1 complete; processor Phase 1.5 deployed and reachable.
**Wiki refs:** `docs/wiki/synthesis/processor-ws-contract.md` (the wire spec); `docs/wiki/concepts/maps-architecture.md` §"WebSocket → map data flow"; `docs/wiki/concepts/live-channel-architecture.md`.
## Goal
Build the data pipeline that brings live positions from Processor's WebSocket into a Zustand store the map components subscribe to. Three concerns:
1. **WS client** — connect to `/ws-live`, send `subscribe`, handle `subscribed` (with snapshot), `position`, `unsubscribed`, `error`. Reconnect on close with exponential backoff.
2. **rAF coalescer** — incoming position messages are buffered per device, and a single `requestAnimationFrame` flush per frame writes the latest snapshot to the store. Without it, we would reproduce traccar-web's per-message dispatch cascade.
3. **Zustand position store** — `latestByDevice: Map<deviceId, PositionEntry>`, `trailsByDevice: Map<deviceId, PositionEntry[]>` (each array a ring buffer capped at `MAX_TRAIL_LENGTH`), `selectedDeviceId: string | null`. Plus a separate `connection-store` for WS state.
This is the throughput-discipline core. It runs whether or not the map is mounted; the map subscribes to the store and renders whatever's in it.
## Deliverables
- **`src/live/protocol.ts`** — zod schemas for outbound (`subscribe` / `unsubscribe`) and inbound (`subscribed` / `position` / `unsubscribed` / `error`) messages per [[processor-ws-contract]]. Plus `PositionEntry` type:
  ```ts
  type PositionEntry = {
    deviceId: string;
    lat: number;
    lon: number;
    ts: number; // epoch ms
    speed?: number;
    course?: number;
    accuracy?: number;
    attributes?: Record<string, unknown>;
  };
  ```
- **`src/live/ws-client.ts`** exporting:
  - `createLiveClient(opts: { url: string }): LiveClient` — factory.
  - `LiveClient` interface:
    ```ts
    interface LiveClient {
      connect(): void; // idempotent
      close(): void;
      subscribe(
        topic: string,
      ): Promise<
        { ok: true; snapshot: PositionEntry[] } | { ok: false; code: string; message?: string }
      >;
      unsubscribe(topic: string): Promise<void>;
      onPosition(handler: (msg: PositionEntry & { topic: string }) => void): () => void;
    }
    ```
  - Reconnect with exponential backoff: 1s / 2s / 4s / 8s / 16s / 30s ceiling.
  - Re-subscribes to all previously-active topics on reconnect.
  - Heartbeat: client pings every 60s if no message in that window; closes if no pong in 10s.
- **`src/live/coalescer.ts`** exporting:
  - `createCoalescer(onFlush: (snapshot: PositionEntry[]) => void): Coalescer`
  - `Coalescer.push(p: PositionEntry): void` — non-blocking; writes to the buffer.
  - `Coalescer.cancel(): void` drops buffered positions and any pending frame (used on teardown).
  - Internally: a per-device map of latest positions; a rAF loop flushes it to the consumer once per frame, then clears the buffer.
- **`src/live/position-store.ts`** — Zustand store:
  ```ts
  type PositionState = {
    latestByDevice: Map<string, PositionEntry>;
    trailsByDevice: Map<string, PositionEntry[]>;
    selectedDeviceId: string | null;
    activeEventId: string | null; // 2.7 sets this
  };
  type PositionActions = {
    applySnapshot(eventId: string, entries: PositionEntry[]): void;
    applyPositions(entries: PositionEntry[]): void; // called by coalescer flush
    clearForEvent(): void; // on event switch
    selectDevice(deviceId: string | null): void;
  };
  ```
  The per-device trail is a ring buffer bounded by `MAX_TRAIL_LENGTH` (default 200; see **Constants** below).
- **`src/live/connection-store.ts`** — Zustand store:
  ```ts
  type ConnectionState = {
    status: 'disconnected' | 'connecting' | 'connected' | 'reconnecting';
    attempt: number; // current reconnect attempt (0 when connected)
    lastConnectedAt: number | null;
    lastErrorMessage: string | null;
  };
  ```
- **`src/live/index.ts`** — barrel re-exports + a `<LiveBootstrap>` React component that creates the singleton client (using `runtimeConfig.liveWsUrl`) and wires the coalescer to the store. Mounted inside `<AuthBootstrap>` in `main.tsx`, so it can gate on auth status.
- **Constants** — `src/live/constants.ts`:
```ts
export const MAX_TRAIL_LENGTH = 200; // points per device
export const RAF_BUDGET_MS = 16; // soft target; rAF naturally caps to ~60Hz
export const RECONNECT_BACKOFF_MS = [1000, 2000, 4000, 8000, 16000];
export const RECONNECT_CEILING_MS = 30000;
export const HEARTBEAT_INTERVAL_MS = 60000;
export const HEARTBEAT_TIMEOUT_MS = 10000;
```
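The backoff constants above turn into a per-attempt delay as follows. This is a minimal sketch: `reconnectDelay` is a hypothetical helper name, and it assumes attempts are counted from 1. Attempts past the end of the table fall back to the 30s ceiling.

```typescript
// Hypothetical helper mirroring src/live/constants.ts: 1-based attempt -> delay in ms.
const RECONNECT_BACKOFF_MS = [1000, 2000, 4000, 8000, 16000];
const RECONNECT_CEILING_MS = 30000;

function reconnectDelay(attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError(`attempt must be a positive integer, got ${attempt}`);
  }
  // Beyond the table, use the ceiling; Math.min also guards against a table
  // entry that is later bumped above the ceiling.
  return Math.min(RECONNECT_BACKOFF_MS[attempt - 1] ?? RECONNECT_CEILING_MS, RECONNECT_CEILING_MS);
}

// Delays for the first seven consecutive failures:
// 1000, 2000, 4000, 8000, 16000, 30000, 30000
const schedule = [1, 2, 3, 4, 5, 6, 7].map(reconnectDelay);
console.log(schedule.join(', '));
```

Keeping the table explicit rather than computing `2 ** n` makes the ceiling step (16s straight to 30s) visible at a glance.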
## Specification
### Connection lifecycle
```ts
// src/live/ws-client.ts (sketch)
// setConnectionState, sendRaw, logger and toAbsoluteWsUrl are helpers elided here.
import { RECONNECT_BACKOFF_MS, RECONNECT_CEILING_MS } from './constants';
import type { PositionEntry, SubscribeResult } from './protocol';

type ClientState =
  | { kind: 'idle' }
  | { kind: 'connecting'; ws: WebSocket }
  | { kind: 'connected'; ws: WebSocket }
  | { kind: 'reconnecting'; attempt: number; timer: ReturnType<typeof setTimeout> }
  | { kind: 'closed' };

function createLiveClient({ url }: { url: string }): LiveClient {
  let state: ClientState = { kind: 'idle' };
  // Consecutive failed attempts. Kept outside `state` so it survives the
  // reconnecting -> connecting hop; reset only on a successful 'open'.
  let reconnectAttempt = 0;
  const subscriptions = new Set<string>();
  const positionHandlers = new Set<(msg: PositionEntry & { topic: string }) => void>();
  const pendingSubscribes = new Map<
    string,
    { resolve: (r: SubscribeResult) => void; reject: (e: unknown) => void }
  >();

  function connect() {
    if (state.kind === 'connecting' || state.kind === 'connected') return;
    // A manual connect() during backoff supersedes the pending retry timer.
    if (state.kind === 'reconnecting') clearTimeout(state.timer);
    setConnectionState({ status: 'connecting' });
    const ws = new WebSocket(toAbsoluteWsUrl(url));
    state = { kind: 'connecting', ws };

    ws.addEventListener('open', () => {
      state = { kind: 'connected', ws };
      reconnectAttempt = 0;
      setConnectionState({ status: 'connected', attempt: 0, lastConnectedAt: Date.now() });
      // Re-subscribe to everything that was active before the disconnect.
      for (const topic of subscriptions) {
        sendRaw({ type: 'subscribe', topic });
      }
    });
    ws.addEventListener('message', (ev) => onMessage(ev.data));
    ws.addEventListener('close', () => scheduleReconnect());
    ws.addEventListener('error', (err) => {
      logger.warn({ err }, 'WS error');
      // The 'close' event fires next; reconnect from there.
    });
  }

  function scheduleReconnect() {
    if (state.kind === 'closed') return; // close() was called deliberately
    reconnectAttempt += 1;
    const delay = Math.min(
      RECONNECT_BACKOFF_MS[reconnectAttempt - 1] ?? RECONNECT_CEILING_MS,
      RECONNECT_CEILING_MS,
    );
    setConnectionState({ status: 'reconnecting', attempt: reconnectAttempt });
    const timer = setTimeout(() => connect(), delay);
    state = { kind: 'reconnecting', attempt: reconnectAttempt, timer };
  }

  // ... close, subscribe, unsubscribe, onMessage handlers below
}
```
The state machine keeps the reconnect logic linear and testable. Subscriptions are remembered across disconnects in the `subscriptions` Set, so we can replay them when the socket reopens.
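To make "linear and testable" concrete, the same lifecycle can be written as a pure transition function. This is a minimal sketch that assumes a hypothetical event vocabulary (`connect`, `open`, `close`, `retry`, `shutdown`). The real client would derive these events from WebSocket listeners and timers. Carrying the attempt count through `connecting` is what lets backoff keep growing across consecutive failures.

```typescript
type LifecycleState =
  | { kind: 'idle' }
  | { kind: 'connecting'; attempt: number } // reconnect attempts so far (0 on first connect)
  | { kind: 'connected' }
  | { kind: 'reconnecting'; attempt: number }
  | { kind: 'closed' };

type LifecycleEvent =
  | { type: 'connect' } // caller asked to connect (idempotent)
  | { type: 'open' } // WebSocket 'open' fired
  | { type: 'close' } // WebSocket 'close' fired (network drop, server close)
  | { type: 'retry' } // backoff timer elapsed
  | { type: 'shutdown' }; // caller invoked close(); terminal

function transition(state: LifecycleState, event: LifecycleEvent): LifecycleState {
  switch (event.type) {
    case 'connect':
      // Only an idle or closed client starts a fresh connection.
      return state.kind === 'idle' || state.kind === 'closed'
        ? { kind: 'connecting', attempt: 0 }
        : state;
    case 'open':
      return state.kind === 'connecting' ? { kind: 'connected' } : state;
    case 'close':
      // A failed reconnect bumps the counter; a drop from 'connected' starts at 1.
      if (state.kind === 'connecting') return { kind: 'reconnecting', attempt: state.attempt + 1 };
      if (state.kind === 'connected') return { kind: 'reconnecting', attempt: 1 };
      return state; // closed stays closed: no reconnect after shutdown
    case 'retry':
      return state.kind === 'reconnecting' ? { kind: 'connecting', attempt: state.attempt } : state;
    case 'shutdown':
      return { kind: 'closed' };
  }
}

// Connect, drop, one failed retry, then recover.
const events: LifecycleEvent[] = [
  { type: 'connect' }, { type: 'open' },
  { type: 'close' }, // -> reconnecting, attempt 1
  { type: 'retry' }, { type: 'close' }, // retry fails -> reconnecting, attempt 2
  { type: 'retry' }, { type: 'open' }, // back online
];
const final = events.reduce(transition, { kind: 'idle' } as LifecycleState);
console.log(final.kind); // connected
```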
### URL resolution
The runtime config gives `liveWsUrl: '/ws-live'` (relative; same-origin via the reverse proxy). Current browsers' `WebSocket` constructor resolves relative URLs against the page origin, but older engines throw a `SyntaxError` on them. The client therefore resolves explicitly with `toAbsoluteWsUrl()` (`http:` → `ws:`, `https:` → `wss:`). Unit tests run under jsdom, where `window.location` is whatever URL the test environment configures (typically a `localhost` origin). `/ws-live` therefore resolves to `ws://localhost:<port>/ws-live`, and the WS server fixture binds to localhost. Document this in the test files.
If we ever switch to absolute URLs in the runtime config, the same constructor handles it.
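Some engines and test environments have a `WebSocket` constructor that rejects relative URLs. For those, the `toAbsoluteWsUrl()` helper used in the lifecycle sketch can be very small. This is a minimal sketch under one assumption: the two-argument signature (explicit `base`) is hypothetical. In the browser you would pass `window.location.href`. Absolute `ws:`/`wss:` inputs pass through unchanged.

```typescript
// Hypothetical signature; the real helper may read window.location itself.
function toAbsoluteWsUrl(url: string, base: string): string {
  const u = new URL(url, base); // resolves '/ws-live' against the page URL
  // Map page schemes to socket schemes; ws:/wss: inputs keep their scheme.
  const scheme = u.protocol === 'https:' ? 'wss:' : u.protocol === 'http:' ? 'ws:' : u.protocol;
  return `${scheme}//${u.host}${u.pathname}${u.search}`;
}

console.log(toAbsoluteWsUrl('/ws-live', 'http://localhost:5173/monitor'));
// ws://localhost:5173/ws-live
console.log(toAbsoluteWsUrl('/ws-live', 'https://app.example.com/monitor'));
// wss://app.example.com/ws-live
```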
### `subscribe` flow with correlation IDs
```ts
// Inside createLiveClient (continued). nanoid generates the correlation id.
import { nanoid } from 'nanoid';

async function subscribe(topic: string): Promise<SubscribeResult> {
  if (state.kind !== 'connected') {
    return { ok: false, code: 'not-connected' };
  }
  subscriptions.add(topic);
  const id = nanoid();
  const promise = new Promise<SubscribeResult>((resolve, reject) => {
    pendingSubscribes.set(id, { resolve, reject });
    setTimeout(() => {
      const pending = pendingSubscribes.get(id);
      if (pending) {
        pending.reject(new Error('subscribe timeout'));
        pendingSubscribes.delete(id);
      }
    }, 5000);
  });
  sendRaw({ type: 'subscribe', topic, id });
  return promise;
}

function onMessage(raw: unknown) {
  // JSON.parse throws on non-JSON frames, so guard it before schema validation.
  let json: unknown;
  try {
    json = JSON.parse(String(raw));
  } catch (err) {
    logger.warn({ err }, 'dropping non-JSON WS frame');
    return;
  }
  const parsed = InboundMessage.safeParse(json);
  if (!parsed.success) {
    logger.warn({ issues: parsed.error.issues }, 'dropping malformed WS message');
    return;
  }
  const msg = parsed.data;
  switch (msg.type) {
    case 'subscribed': {
      const pending = msg.id ? pendingSubscribes.get(msg.id) : undefined;
      pending?.resolve({ ok: true, snapshot: msg.snapshot ?? [] });
      if (msg.id) pendingSubscribes.delete(msg.id);
      break;
    }
    case 'position':
      for (const h of positionHandlers) h({ ...msg, topic: msg.topic });
      break;
    case 'error': {
      const pending = msg.id ? pendingSubscribes.get(msg.id) : undefined;
      pending?.resolve({ ok: false, code: msg.code, message: msg.message });
      if (msg.id) pendingSubscribes.delete(msg.id);
      break;
    }
    case 'unsubscribed':
      // Idle for now; could resolve a pending unsubscribe promise if we track them.
      break;
  }
}
```
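The correlation logic can be pulled into a small helper that clears its timeout once a reply arrives. It can also fail everything still in flight when the socket drops, instead of waiting for each 5s timeout. This is a sketch: `createPendingMap` and its `register` / `settle` / `rejectAll` methods are hypothetical names, not part of the spec.

```typescript
type Pending<T> = {
  resolve: (value: T) => void;
  reject: (err: Error) => void;
  timer: ReturnType<typeof setTimeout>;
};

// Hypothetical helper: correlates request ids with replies and enforces a timeout.
function createPendingMap<T>(timeoutMs: number) {
  const pending = new Map<string, Pending<T>>();
  return {
    // Registers an id; the promise settles via settle(), rejectAll(), or the timeout.
    register(id: string): Promise<T> {
      return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => {
          pending.delete(id);
          reject(new Error(`request ${id} timed out after ${timeoutMs}ms`));
        }, timeoutMs);
        pending.set(id, { resolve, reject, timer });
      });
    },
    // Resolves a waiting request; false for unknown or already-settled ids.
    settle(id: string, value: T): boolean {
      const entry = pending.get(id);
      if (!entry) return false;
      clearTimeout(entry.timer); // no live timer left behind
      pending.delete(id);
      entry.resolve(value);
      return true;
    },
    // Rejects everything in flight, e.g. from the socket's 'close' handler.
    rejectAll(reason: string): void {
      for (const [id, entry] of pending) {
        clearTimeout(entry.timer);
        entry.reject(new Error(`${reason} (request ${id})`));
      }
      pending.clear();
    },
    get size(): number {
      return pending.size;
    },
  };
}

// Usage: a 'subscribed' reply carrying id 'req-1' settles the waiting promise.
const subs = createPendingMap<{ ok: boolean }>(5000);
subs.register('req-1').then((r) => console.log('subscribed:', r.ok));
subs.settle('req-1', { ok: true });
```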
### rAF coalescer
```ts
// src/live/coalescer.ts
import type { PositionEntry } from './protocol';

export type Coalescer = {
  push(p: PositionEntry): void;
  cancel(): void;
};

export function createCoalescer(onFlush: (snapshot: PositionEntry[]) => void): Coalescer {
  const buffer = new Map<string, PositionEntry>();
  let frame: number | null = null; // pending rAF handle, if any

  function flush() {
    frame = null;
    if (buffer.size === 0) return;
    const snapshot = Array.from(buffer.values());
    buffer.clear();
    onFlush(snapshot);
  }

  return {
    push(p) {
      buffer.set(p.deviceId, p); // keep latest per device
      if (frame === null) frame = requestAnimationFrame(flush);
    },
    cancel() {
      if (frame !== null) cancelAnimationFrame(frame); // don't flush after teardown
      frame = null;
      buffer.clear();
    },
  };
}
```
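This is the entire throughput-discipline pattern in about 30 lines. No matter how many messages arrive between frames, each device contributes at most one entry per flush, so the store updates at most once per frame. The store's `applyPositions` is the `onFlush` consumer.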
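To see the coalescing without a browser, the frame scheduler can be injected. This is a sketch that assumes a hypothetical `FrameScheduler` parameter, which in the app would be backed by `requestAnimationFrame` / `cancelAnimationFrame`. A manual scheduler stands in for frames, and the positions are simplified to four fields.

```typescript
type PositionEntry = { deviceId: string; lat: number; lon: number; ts: number };

// Hypothetical injection point; the app would pass rAF / cancelAnimationFrame.
type FrameScheduler = {
  request: (cb: () => void) => number;
  cancel: (handle: number) => void;
};

function createCoalescer(onFlush: (snapshot: PositionEntry[]) => void, frames: FrameScheduler) {
  const buffer = new Map<string, PositionEntry>();
  let handle: number | null = null;

  function flush() {
    handle = null;
    if (buffer.size === 0) return;
    const snapshot = Array.from(buffer.values());
    buffer.clear();
    onFlush(snapshot);
  }

  return {
    push(p: PositionEntry) {
      buffer.set(p.deviceId, p); // later messages overwrite earlier ones for the same device
      if (handle === null) handle = frames.request(flush);
    },
    cancel() {
      if (handle !== null) frames.cancel(handle);
      handle = null;
      buffer.clear();
    },
  };
}

// Manual scheduler: callbacks queue up until tick() runs them, standing in for one frame.
function createManualFrames() {
  let nextId = 1;
  const queued = new Map<number, () => void>();
  return {
    request(cb: () => void) {
      const id = nextId++;
      queued.set(id, cb);
      return id;
    },
    cancel(id: number) {
      queued.delete(id);
    },
    tick() {
      const cbs = Array.from(queued.values());
      queued.clear();
      for (const cb of cbs) cb();
    },
  };
}

const frames = createManualFrames();
const flushes: PositionEntry[][] = [];
const coalescer = createCoalescer((s) => flushes.push(s), frames);

// 1000 messages for 3 devices arrive between two frames...
for (let i = 0; i < 1000; i++) {
  coalescer.push({ deviceId: `dev-${i % 3}`, lat: i, lon: i, ts: i });
}
frames.tick();
// ...and produce one flush with the newest entry per device (ts 999, 997, 998).
console.log(flushes.length, flushes[0].map((p) => p.ts));
```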
### Trail ring buffer
```ts
// In the position store:
applyPositions(entries: PositionEntry[]) {
  set((state) => {
    const latest = new Map(state.latestByDevice);
    const trails = new Map(state.trailsByDevice);
    for (const e of entries) {
      latest.set(e.deviceId, e);
      const tail = trails.get(e.deviceId) ?? [];
      // Skip if same position as the last (don't add duplicates from no-movement reports).
      const last = tail[tail.length - 1];
      if (!last || last.lat !== e.lat || last.lon !== e.lon) {
        const next = [...tail, e];
        if (next.length > MAX_TRAIL_LENGTH) next.shift();
        trails.set(e.deviceId, next);
      }
    }
    return { latestByDevice: latest, trailsByDevice: trails };
  });
},
```
`Map<string, PositionEntry[]>` makes selector subscriptions cheap (deviceId-keyed lookups). The push-shift pattern keeps each device's array bounded; allocating a new array per update is the Zustand-idiomatic way to trigger subscribers.
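The same update can be pulled out as a pure function, which makes both properties easy to check: trails stay bounded, and untouched devices keep their object identity. This is a sketch outside Zustand with a simplified `PositionEntry`. The `slice(-maxTrail)` cap is a swapped-in equivalent of push-then-shift for single appends.

```typescript
type PositionEntry = { deviceId: string; lat: number; lon: number; ts: number };

const MAX_TRAIL_LENGTH = 200;

// Pure version of applyPositions: fresh outer Maps (so Zustand sees a new reference),
// while devices not in `entries` keep their existing entry and trail array.
function applyPositions(
  latestByDevice: Map<string, PositionEntry>,
  trailsByDevice: Map<string, PositionEntry[]>,
  entries: PositionEntry[],
  maxTrail = MAX_TRAIL_LENGTH,
) {
  const latest = new Map(latestByDevice);
  const trails = new Map(trailsByDevice);
  for (const e of entries) {
    latest.set(e.deviceId, e);
    const tail = trails.get(e.deviceId) ?? [];
    const last = tail[tail.length - 1];
    if (last && last.lat === e.lat && last.lon === e.lon) continue; // no movement: trail unchanged
    // Copy-on-write append; slice(-maxTrail) drops the oldest points past the cap.
    trails.set(e.deviceId, [...tail, e].slice(-maxTrail));
  }
  return { latestByDevice: latest, trailsByDevice: trails };
}

let state = {
  latestByDevice: new Map<string, PositionEntry>(),
  trailsByDevice: new Map<string, PositionEntry[]>(),
};
// 250 moving reports for dev-a, then one report for dev-b.
for (let i = 0; i < 250; i++) {
  state = applyPositions(state.latestByDevice, state.trailsByDevice, [
    { deviceId: 'dev-a', lat: i, lon: 0, ts: i },
  ]);
}
const before = state;
state = applyPositions(before.latestByDevice, before.trailsByDevice, [
  { deviceId: 'dev-b', lat: 1, lon: 1, ts: 300 },
]);
console.log(state.trailsByDevice.get('dev-a')?.length); // 200
console.log(state.latestByDevice.get('dev-a') === before.latestByDevice.get('dev-a')); // true
```

A per-device selector such as `s.latestByDevice.get('dev-a')` therefore returns the same reference after an update that only touched `dev-b`.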
### Snapshot vs streaming
```ts
applySnapshot(eventId: string, entries: PositionEntry[]) {
  // Reset all device-keyed state for the new event.
  const latest = new Map<string, PositionEntry>();
  const trails = new Map<string, PositionEntry[]>();
  for (const e of entries) {
    latest.set(e.deviceId, e);
    trails.set(e.deviceId, [e]);
  }
  set({
    activeEventId: eventId,
    latestByDevice: latest,
    trailsByDevice: trails,
    selectedDeviceId: null,
  });
},
```
Snapshot wipes the previous event's state (called on event switch in 2.7) and seeds the trail with the snapshot's single position per device. Streaming positions from there append to the trail.
### `<LiveBootstrap>` wiring
```tsx
// src/live/index.tsx
import { useEffect, type ReactNode } from 'react';

let _client: LiveClient | null = null;

export function getLiveClient(): LiveClient {
  if (!_client) throw new Error('LiveBootstrap has not mounted yet');
  return _client;
}

export function LiveBootstrap({ children }: { children: ReactNode }) {
  const cfg = useRuntimeConfig();
  const status = useAuthStore((s) => s.status);

  useEffect(() => {
    if (status !== 'authenticated') return;
    _client = createLiveClient({ url: cfg.liveWsUrl });
    const coalescer = createCoalescer((snapshot) => {
      usePositionStore.getState().applyPositions(snapshot);
    });
    const off = _client.onPosition((msg) => coalescer.push(msg));
    _client.connect();
    return () => {
      off();
      _client?.close();
      _client = null;
      coalescer.cancel();
    };
  }, [status, cfg.liveWsUrl]);

  return <>{children}</>;
}
```
Mount inside `<AuthBootstrap>` so it only connects when authenticated. On logout (status flips to anonymous), the cleanup closes the WS.
### What this task does NOT do
- **Map rendering.** That's 2.5 / 2.6 — they read from the store.
- **Event picking.** 2.7 — calls `client.subscribe('event:<id>')` and handles the snapshot.
- **Connection-status UI.** 2.9 reads from `connection-store` and renders chips / banners.
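- **Backpressure / drop-oldest.** Defer until measured. The rAF coalescer caps the _flush_ rate, not the _receive_ rate: every inbound frame is still parsed and validated. The buffer itself holds at most one entry per device, so if the receive path ever overwhelms the client, look at per-message parse/validate cost before adding any queue cap.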
## Acceptance criteria
- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm format:check`, `pnpm build` clean.
- [ ] In the browser console after the page settles: `usePositionStore.getState()` shows the empty state (no event selected yet — 2.7 sets it).
- [ ] After the user is authenticated, `connection-store` flips through `connecting` → `connected`.
- [ ] Manually pushing a synthetic message into the WS (via DevTools) reaches the position store on the next animation frame.
- [ ] Disconnecting the network → `connection-store.status === 'reconnecting'` within ~5 s, attempt counter increments, eventually reconnects when the network returns.
- [ ] Logging out → WS closes, `connection-store.status === 'disconnected'`.
- [ ] No unbounded memory growth across 1000 synthetic positions in a single session (verify that `latestByDevice.size` stays bounded by the device count and each per-device array in `trailsByDevice` stays ≤ 200 entries).
## Risks / open questions
- **Snapshot too large at high event scale.** 500 devices × ~200 bytes = ~100KB. Tolerable. If we ever push to 5000 devices, snapshot streaming via multiple `subscribed` frames would help — defer.
- **rAF in background tabs.** Browsers throttle rAF when the tab is backgrounded. The coalescer effectively pauses; positions buffer until the tab is foregrounded again. Acceptable — operators backgrounding the tab don't need real-time updates.
- **Zustand `Map<>` reactivity.** Zustand subscribers fire on reference changes, not deep equality. Copying `latest` and `trails` into a `new Map(...)` on every update is the right pattern; components select a single device via `usePositionStore((s) => s.latestByDevice.get(deviceId))`.
- **Trail-direction colour.** If we later add speed-coloured per-segment trails (2.6's open question), the trail entries need `speed` carried through. Already in `PositionEntry`. Good.
- **Cookie auth on WS.** Browser sends the session cookie automatically with the WS upgrade _only if same-origin_. Verify the reverse-proxy + Vite-dev-proxy paths preserve same-origin (they do — `/ws-live` is on the page's origin). Worth a smoke test.
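The snapshot-size estimate in the first risk can be sanity-checked by serialising a sample entry. This is a sketch: the field values are made up, and a real `position` frame also carries `type`, `topic` and possibly `attributes`. That leaves headroom below the ~200-byte-per-entry figure.

```typescript
// Hypothetical sample entry; field names follow PositionEntry, values are invented.
const sample = {
  deviceId: 'dev-0001',
  lat: 48.137154,
  lon: 11.576124,
  ts: 1714725069000,
  speed: 12.5,
  course: 270,
  accuracy: 5,
};

const bytesPerEntry = new TextEncoder().encode(JSON.stringify(sample)).length; // UTF-8 bytes
for (const devices of [500, 5000]) {
  const kb = (devices * bytesPerEntry) / 1000;
  console.log(`${devices} devices: ~${kb.toFixed(1)} KB before compression`);
}
```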
## Done
Eight files under `src/live/`:
- **`constants.ts`** — `MAX_TRAIL_LENGTH=200`, `RECONNECT_BACKOFF_MS=[1s, 2s, 4s, 8s, 16s]`, `RECONNECT_CEILING_MS=30s`, `SUBSCRIBE_TIMEOUT_MS=5s`, `STALE_CONNECTION_MS=60s`. Single source of truth for the throughput discipline.
- **`protocol.ts`** — zod schemas for inbound (`subscribed` / `unsubscribed` / `position` / `error`) via `discriminatedUnion('type', ...)`, `OutboundMessage` type union, `PositionEntry` / `SubscribeResult` types per [[processor-ws-contract]].
- **`connection-store.ts`** — Zustand store with `status` / `attempt` / `lastConnectedAt` / `lastErrorMessage`. `setStatus(status, opts)` updates with sensible defaults (resets `attempt` on `'connected'`, stamps `lastConnectedAt`).
- **`position-store.ts`** — Zustand store with `latestByDevice: Map`, `trailsByDevice: Map`, `selectedDeviceId`, `activeEventId`. Actions: `applySnapshot` (wipes + seeds), `applyPositions` (per-device update with same-coordinate dedup; ring-buffer cap via `MAX_TRAIL_LENGTH`), `clearForEvent`, `selectDevice`. Each update creates a new outer `Map` reference so Zustand notifies subscribers. Per-device selectors (`s.latestByDevice.get(id)`) only trigger a re-render when that device's entry changed, because `Map.get` returns the same reference for untouched entries.
- **`coalescer.ts`** — `createCoalescer(onFlush)` with `push` and `cancel`. ~30 lines. Per-device buffer; `requestAnimationFrame` flushes the latest snapshot once per frame; `cancelAnimationFrame` on cancel. The throughput-discipline core.
- **`ws-client.ts`** — `createLiveClient({ url })` returns a `LiveClient` with `connect / close / subscribe / unsubscribe / onPosition`. State machine: `idle` / `connecting` / `connected` / `reconnecting` / `closed`. Reconnect with exponential backoff capped at 30s. Re-subscribes to all active topics on reconnect. Stale-connection check: a `setInterval` running every `STALE_CONNECTION_MS / 2` closes the socket if no message has arrived within `STALE_CONNECTION_MS`. Subscribe correlation via a pending map with a 5s timeout. URL resolution helper `toAbsoluteWsUrl()` derives `ws(s)://...` from same-origin path inputs (in dev, `/ws-live` → `ws://localhost:5173/ws-live`).
- **`bootstrap.tsx`** — `<LiveBootstrap>` React component. Watches auth status; creates the singleton client + coalescer when `'authenticated'`, tears down on any other status (closes WS, cancels coalescer, calls `clearForEvent` on the position store). `getLiveClient()` exposes the singleton for the event picker (2.7).
- **`index.ts`** — barrel re-exports.
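The stale-connection check in `ws-client.ts` reduces to a timestamp comparison. This sketch assumes a hypothetical `createStaleWatchdog` helper with an injected clock so the logic is testable without timers. With a 60s window checked every 30s, a silent connection is detected between 60s and 90s after its last message.

```typescript
// Hypothetical helper; `now` is injectable so tests can use a fake clock.
function createStaleWatchdog(staleMs: number, now: () => number = Date.now) {
  let lastMessageAt = now();
  return {
    // Call from the WebSocket 'message' handler.
    touch() {
      lastMessageAt = now();
    },
    // Call from a setInterval running every staleMs / 2; true means close and reconnect.
    isStale() {
      return now() - lastMessageAt > staleMs;
    },
  };
}

// Fake clock in ms.
let clock = 0;
const watchdog = createStaleWatchdog(60_000, () => clock);
clock = 30_000;
const afterHalfWindow = watchdog.isStale(); // false: 30 s of silence
watchdog.touch();
clock = 95_000;
const afterQuietMinute = watchdog.isStale(); // true: 65 s since the last message
console.log(afterHalfWindow, afterQuietMinute);
```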
**`src/main.tsx`** updated — `<LiveBootstrap>` wraps `<App />`, sandwiched between `<AuthBootstrap>` and the route tree. Connection only happens for authenticated users.
**Deviations from spec:**
1. Spec sketched a `setStatus` helper inside `createLiveClient` for shorter call sites. Tried it; the conditional-types-on-Parameters generic was hostile. Switched to direct `useConnectionStore.getState().setStatus(...)` call sites — verbose but readable, and there are only ~6 of them.
2. `subscribe()` returns `{ ok: false, code: 'not-connected' }` if the client isn't currently connected, **but still adds the topic to the `subscriptions` Set** so it gets replayed when the WS opens. The spec sketch only added the topic after the connected check passed. With this change, the replay-on-reconnect contract holds whether the user's first subscribe fires while connecting or while connected. Documented expectation: the caller (event picker in 2.7) treats `not-connected` as "subscription will activate when the connection comes back". Typically it does this by listening for the connection-store's `'connected'` transition.
3. `onPosition` returns an unsubscribe function (Set-add + delete). Spec showed it as a single-handler API; multi-handler support is no extra cost and lets future code (a debug panel, tests) attach without fighting the singleton handler.
**Smoke check (local `pnpm dev`):**
- App boots; no WS connection until login.
- After login: `useConnectionStore.getState()` shows `status: 'connecting'` then either `'connected'` (if Phase 1.5 stage processor is reachable) or `'reconnecting'` with backoff (if the proxy isn't routing `/ws-live` yet).
- `usePositionStore.getState()` shows the empty initial state — `activeEventId: null`, empty Maps.
- Logging out closes the WS and clears the position store.
- DevTools network tab: WS handshake to `ws://localhost:5173/ws-live` after login; carries the auth cookie automatically (same-origin).
**Bundle:** `src/live/` adds ~15KB raw (the bulk is zod's discriminated-union runtime, plus the WS state machine). Loaded eagerly because `<LiveBootstrap>` is in `main.tsx`. Total main bundle 393KB / 120KB gzipped (up from 376KB / 115KB after 2.3).
Landed in `2db3195`.