From 218d6b9c00e1a56ca9abe06431676b73676359f8 Mon Sep 17 00:00:00 2001 From: Julian Cuni Date: Sat, 2 May 2026 21:42:43 +0200 Subject: [PATCH] feat: task 2.4 WS client + rAF coalescer + Zustand position store Eight files under src/live/: - constants.ts: throughput-discipline numbers (MAX_TRAIL_LENGTH=200, reconnect backoff [1s/2s/4s/8s/16s, 30s ceiling], STALE_CONNECTION_MS). - protocol.ts: zod discriminatedUnion('type', ...) for inbound (subscribed / unsubscribed / position / error). PositionEntry + SubscribeResult types per processor-ws-contract. - connection-store.ts: Zustand store with status state machine. - position-store.ts: Zustand store with latestByDevice + trailsByDevice Maps, applySnapshot/applyPositions/clearForEvent/selectDevice actions. Ring-buffer cap on trails; same-coordinate dedup. - coalescer.ts: ~30-line rAF coalescer. Per-device buffer; flushes once per animation frame regardless of receive rate. - ws-client.ts: state machine (idle / connecting / connected / reconnecting / closed) with exponential backoff, re-subscribe on reconnect, stale-connection check, subscribe correlation via pending-map + 5s timeout. URL resolution helper toAbsoluteWsUrl() for same-origin path inputs. - bootstrap.tsx: creates the client when authenticated, wires positions through the coalescer to the position store, tears down on logout. getLiveClient() exposes the singleton for 2.7. - index.ts: barrel re-exports. main.tsx wraps in alongside . Deviations: 1. Skipped the setStatus helper inside createLiveClient; conditional Parameters<> generics were hostile. Direct useConnectionStore.getState().setStatus(...) at the ~6 call sites. 2. subscribe() adds to the subscriptions Set even when not connected (so it replays on reconnect). Caller handles 'not-connected' by waiting for connection-store status transition. 3. onPosition returns an unsubscribe fn (Set-based). Multi-handler is free; lets future debug panels/tests attach. Bundle: src/live/ adds ~15KB raw to the main bundle (mostly zod's discriminated-union runtime). Total 393KB / 120KB gz. --- .../phase-2-live-map/03-sprite-preload.md | 3 +- .../04-ws-client-and-position-store.md | 30 +- .planning/phase-2-live-map/README.md | 2 +- src/live/bootstrap.tsx | 65 ++++ src/live/coalescer.ts | 51 ++++ src/live/connection-store.ts | 34 +++ src/live/constants.ts | 19 ++ src/live/index.ts | 20 ++ src/live/position-store.ts | 97 ++++++ src/live/protocol.ts | 75 +++++ src/live/ws-client.ts | 281 ++++++++++++++++++ src/main.tsx | 5 +- 12 files changed, 678 insertions(+), 4 deletions(-) create mode 100644 src/live/bootstrap.tsx create mode 100644 src/live/coalescer.ts create mode 100644 src/live/connection-store.ts create mode 100644 src/live/constants.ts create mode 100644 src/live/index.ts create mode 100644 src/live/position-store.ts create mode 100644 src/live/protocol.ts create mode 100644 src/live/ws-client.ts diff --git a/.planning/phase-2-live-map/03-sprite-preload.md b/.planning/phase-2-live-map/03-sprite-preload.md index 973f8fd..db6d200 100644 --- a/.planning/phase-2-live-map/03-sprite-preload.md +++ b/.planning/phase-2-live-map/03-sprite-preload.md @@ -156,10 +156,11 @@ For 2.3, just include it in the registry and it'll be there when 2.5 needs it. **Deviations from spec:** -1. Spec sketched the direction sprite as part of the same composition (background + tinted icon). Implemented as two separate sprite types: category sprites have the plate, direction sprites are the arrow alone (no plate). Reason: the direction sprite is rendered as a *separate* symbol layer in 2.5, overlaid on top of the device sprite — drawing a plate under the arrow would create a double-plate visual. The spec's example expression `'icon-image': '{category}-{color}'` for one symbol layer + `'icon-image': 'direction-{color}'` for the direction layer is what 2.5 will actually consume. +1. Spec sketched the direction sprite as part of the same composition (background + tinted icon). Implemented as two separate sprite types: category sprites have the plate, direction sprites are the arrow alone (no plate). Reason: the direction sprite is rendered as a _separate_ symbol layer in 2.5, overlaid on top of the device sprite — drawing a plate under the arrow would create a double-plate visual. The spec's example expression `'icon-image': '{category}-{color}'` for one symbol layer + `'icon-image': 'direction-{color}'` for the direction layer is what 2.5 will actually consume. 2. Spec showed sample colours as `success: 'green'`. Used the design system's actual semantic palette (`#2E8C4A` green, `#E8412B` race-flag red, `#0E0E0C` ink, `#2563C8` info blue) directly. When 3.8 lands, these get rebound to TRM design tokens via CSS variables. **Smoke check (local `pnpm dev`):** + - App boots; `getSpriteRegistry().size` returns `32` after the page settles. - `/monitor` map renders; switching basemaps doesn't break sprites (visible via the mapReady flow's "preload then install" sequence in dev tools console). - No "Image with id X is missing" warnings. diff --git a/.planning/phase-2-live-map/04-ws-client-and-position-store.md b/.planning/phase-2-live-map/04-ws-client-and-position-store.md index 88cd561..7b2ffa4 100644 --- a/.planning/phase-2-live-map/04-ws-client-and-position-store.md +++ b/.planning/phase-2-live-map/04-ws-client-and-position-store.md @@ -361,4 +361,32 @@ Mount inside `` so it only connects when authenticated. On logout ## Done -(Filled in when the task lands.) +Eight files under `src/live/`: + +- **`constants.ts`** — `MAX_TRAIL_LENGTH=200`, `RECONNECT_BACKOFF_MS=[1s, 2s, 4s, 8s, 16s]`, `RECONNECT_CEILING_MS=30s`, `SUBSCRIBE_TIMEOUT_MS=5s`, `STALE_CONNECTION_MS=60s`. Single source of truth for the throughput discipline. +- **`protocol.ts`** — zod schemas for inbound (`subscribed` / `unsubscribed` / `position` / `error`) via `discriminatedUnion('type', ...)`, `OutboundMessage` type union, `PositionEntry` / `SubscribeResult` types per [[processor-ws-contract]]. +- **`connection-store.ts`** — Zustand store with `status` / `attempt` / `lastConnectedAt` / `lastErrorMessage`. `setStatus(status, opts)` updates with sensible defaults (resets `attempt` on `'connected'`, stamps `lastConnectedAt`). +- **`position-store.ts`** — Zustand store with `latestByDevice: Map`, `trailsByDevice: Map`, `selectedDeviceId`, `activeEventId`. Actions: `applySnapshot` (wipes + seeds), `applyPositions` (per-device update with same-coordinate dedup; ring-buffer cap via `MAX_TRAIL_LENGTH`), `clearForEvent`, `selectDevice`. Each update creates a new `Map` reference so Zustand selectors fire only for changed devices (Map.get returns the same reference if the entry didn't change). +- **`coalescer.ts`** — `createCoalescer(onFlush)` with `push` and `cancel`. ~30 lines. Per-device buffer; `requestAnimationFrame` flushes the latest snapshot once per frame; `cancelAnimationFrame` on cancel. The throughput-discipline core. +- **`ws-client.ts`** — `createLiveClient({ url })` returns a `LiveClient` with `connect / close / subscribe / unsubscribe / onPosition`. State machine: `idle` / `connecting` / `connected` / `reconnecting` / `closed`. Reconnect with exponential backoff capped at 30s. Re-subscribes to all active topics on reconnect. Stale-connection check via `setInterval` halves `STALE_CONNECTION_MS`; closes if no message in window. Subscribe correlation via 5s-timeout pending map. URL resolution helper `toAbsoluteWsUrl()` derives `ws(s)://...` from same-origin path inputs (`/ws-live` → `ws://localhost:5173/ws-live`). +- **`bootstrap.tsx`** — `` React component. Watches auth status; creates the singleton client + coalescer when `'authenticated'`, tears down on any other status (closes WS, cancels coalescer, calls `clearForEvent` on the position store). `getLiveClient()` exposes the singleton for the event picker (2.7). +- **`index.ts`** — barrel re-exports. + +**`src/main.tsx`** updated — `` wraps ``, sandwiched between `` and the route tree. Connection only happens for authenticated users. + +**Deviations from spec:** + +1. Spec sketched a `setStatus` helper inside `createLiveClient` for shorter call sites. Tried it; the conditional-types-on-Parameters generic was hostile. Switched to direct `useConnectionStore.getState().setStatus(...)` call sites — verbose but readable, and there are only ~6 of them. +2. `subscribe()` returns `{ ok: false, code: 'not-connected' }` if the client isn't currently connected, **but still adds the topic to the `subscriptions` Set** so it gets replayed when the WS opens. Spec sketched `subscriptions.add` only on success; the replay-on-reconnect contract works whether the user's first subscribe fired during a connecting state or a connected state. Document expectation: caller (event picker in 2.7) handles `not-connected` by treating it as "subscription will activate when the connection comes back" — typically by listening on the connection-store's `'connected'` transition. +3. `onPosition` returns an unsubscribe function (Set-add + delete). Spec showed it as a single-handler API; multi-handler support is no extra cost and lets future code (a debug panel, tests) attach without fighting the singleton handler. + +**Smoke check (local `pnpm dev`):** +- App boots; no WS connection until login. +- After login: `useConnectionStore.getState()` shows `status: 'connecting'` then either `'connected'` (if Phase 1.5 stage processor is reachable) or `'reconnecting'` with backoff (if the proxy isn't routing `/ws-live` yet). +- `usePositionStore.getState()` shows the empty initial state — `activeEventId: null`, empty Maps. +- Logging out closes the WS and clears the position store. +- DevTools network tab: WS handshake to `ws://localhost:5173/ws-live` after login; carries the auth cookie automatically (same-origin). + +**Bundle:** `src/live/` adds ~15KB raw (the bulk is zod's discriminated-union runtime, plus the WS state machine). Loaded eagerly because `` is in `main.tsx`. Total main bundle 393KB / 120KB gzipped (up from 376KB / 115KB after 2.3). + +Landed in `PENDING_SHA`. diff --git a/.planning/phase-2-live-map/README.md b/.planning/phase-2-live-map/README.md index 311cbda..9992e5a 100644 --- a/.planning/phase-2-live-map/README.md +++ b/.planning/phase-2-live-map/README.md @@ -51,7 +51,7 @@ When Phase 2 is done: | 2.1 | [MapView singleton + mapReady gate](./01-mapview-singleton.md) | 🟩 | | 2.2 | [Tile-source switcher](./02-tile-source-switcher.md) | 🟩 | | 2.3 | [Sprite preload (racing categories)](./03-sprite-preload.md) | 🟩 | -| 2.4 | [WS client + rAF coalescer + Zustand position store](./04-ws-client-and-position-store.md) | ⬜ | +| 2.4 | [WS client + rAF coalescer + Zustand position store](./04-ws-client-and-position-store.md) | 🟩 | | 2.5 | [MapPositions (clustered + selected sources)](./05-map-positions.md) | ⬜ | | 2.6 | [MapTrails (bounded ring buffer, polyline rendering)](./06-map-trails.md) | ⬜ | | 2.7 | [Event picker (subscription driver)](./07-event-picker.md) | ⬜ | diff --git a/src/live/bootstrap.tsx b/src/live/bootstrap.tsx new file mode 100644 index 0000000..215febf --- /dev/null +++ b/src/live/bootstrap.tsx @@ -0,0 +1,65 @@ +import { useEffect, type ReactNode } from 'react'; +import { useRuntimeConfig } from '@/config/context'; +import { useAuthStore } from '@/auth'; +import { createCoalescer } from './coalescer'; +import { usePositionStore } from './position-store'; +import { createLiveClient, type LiveClient } from './ws-client'; + +let _client: LiveClient | null = null; + +/** + * Read-only access to the singleton live client. Used by the event picker + * (2.7) to call `subscribe`/`unsubscribe`. Throws if `` + * hasn't mounted yet — that should only happen when the user is not + * authenticated, in which case there's no live data to consume anyway. + */ +export function getLiveClient(): LiveClient { + if (!_client) { + throw new Error( + 'getLiveClient() called before mounted (or while unauthenticated).', + ); + } + return _client; +} + +/** + * Wires the WS client + coalescer + position store. Mounts once, + * inside the auth-bootstrap shell, and creates/destroys the client based + * on the auth status: + * + * - `'authenticated'` → create the client, connect, wire positions to + * the coalescer which feeds the position store. + * - any other status → tear down (auth was lost; close the WS). + * + * No subscriptions are made here. The event picker (2.7) is responsible + * for `client.subscribe('event:')` once the user picks an event. + */ +export function LiveBootstrap({ children }: { children: ReactNode }) { + const cfg = useRuntimeConfig(); + const status = useAuthStore((s) => s.status); + + useEffect(() => { + if (status !== 'authenticated') return; + + const client = createLiveClient({ url: cfg.liveWsUrl }); + _client = client; + + const coalescer = createCoalescer((snapshot) => { + usePositionStore.getState().applyPositions(snapshot); + }); + const off = client.onPosition((msg) => coalescer.push(msg)); + + client.connect(); + + return () => { + off(); + client.close(); + coalescer.cancel(); + _client = null; + // Drop any device-keyed state from the previous session. + usePositionStore.getState().clearForEvent(); + }; + }, [status, cfg.liveWsUrl]); + + return <>{children}; +} diff --git a/src/live/coalescer.ts b/src/live/coalescer.ts new file mode 100644 index 0000000..0f5b153 --- /dev/null +++ b/src/live/coalescer.ts @@ -0,0 +1,51 @@ +import type { PositionEntry } from './protocol'; + +export interface Coalescer { + /** Buffer the latest position for `deviceId`. Replaces any earlier in-flight value. */ + push: (p: PositionEntry) => void; + /** Drop the buffer and cancel any pending flush. Used on event-switch and shutdown. */ + cancel: () => void; +} + +/** + * Per-frame coalescer at the WebSocket boundary. + * + * Every incoming `position` message lands in a per-device buffer; the + * latest wins. One `requestAnimationFrame` tick flushes the snapshot to + * the consumer (typically `usePositionStore.getState().applyPositions`). + * That caps the dispatch rate at the browser's frame rate (~60 Hz) + * regardless of how fast positions arrive. + * + * This is the discipline traccar-web lacks: per-message Redux dispatch + * cascades through selectors and rebuilds full feature collections at + * every position arrival, which is the most likely cause of its observed + * lag at high update rates. + */ +export function createCoalescer(onFlush: (snapshot: PositionEntry[]) => void): Coalescer { + const buffer = new Map(); + let rafId: number | null = null; + + function flush(): void { + rafId = null; + if (buffer.size === 0) return; + const snapshot = Array.from(buffer.values()); + buffer.clear(); + onFlush(snapshot); + } + + return { + push(p) { + buffer.set(p.deviceId, p); + if (rafId === null) { + rafId = requestAnimationFrame(flush); + } + }, + cancel() { + buffer.clear(); + if (rafId !== null) { + cancelAnimationFrame(rafId); + rafId = null; + } + }, + }; +} diff --git a/src/live/connection-store.ts b/src/live/connection-store.ts new file mode 100644 index 0000000..af24235 --- /dev/null +++ b/src/live/connection-store.ts @@ -0,0 +1,34 @@ +import { create } from 'zustand'; + +export type ConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'reconnecting'; + +type ConnectionState = { + status: ConnectionStatus; + /** Current reconnect attempt counter; reset to 0 once connected. */ + attempt: number; + /** Wall-clock timestamp of the most recent successful connection. */ + lastConnectedAt: number | null; + /** Most recent error message from the connection layer (logged + surfaced via the chip). */ + lastErrorMessage: string | null; +}; + +type ConnectionActions = { + setStatus: (status: ConnectionStatus, opts?: { attempt?: number; error?: string | null }) => void; +}; + +type Store = ConnectionState & ConnectionActions; + +export const useConnectionStore = create((set) => ({ + status: 'disconnected', + attempt: 0, + lastConnectedAt: null, + lastErrorMessage: null, + + setStatus: (status, opts) => + set((prev) => ({ + status, + attempt: opts?.attempt ?? (status === 'connected' ? 0 : prev.attempt), + lastConnectedAt: status === 'connected' ? Date.now() : prev.lastConnectedAt, + lastErrorMessage: opts?.error !== undefined ? opts.error : prev.lastErrorMessage, + })), +})); diff --git a/src/live/constants.ts b/src/live/constants.ts new file mode 100644 index 0000000..c1e2ca8 --- /dev/null +++ b/src/live/constants.ts @@ -0,0 +1,19 @@ +/** Maximum number of positions kept per device in the trail ring buffer. */ +export const MAX_TRAIL_LENGTH = 200; + +/** Reconnect backoff schedule. Index = attempt number (1-based, with -1 → first entry). */ +export const RECONNECT_BACKOFF_MS = [1000, 2000, 4000, 8000, 16000]; + +/** Backoff cap applied for any attempt beyond `RECONNECT_BACKOFF_MS.length`. */ +export const RECONNECT_CEILING_MS = 30000; + +/** How long to wait for a `subscribed` / `unsubscribed` / `error` reply before failing the promise. */ +export const SUBSCRIBE_TIMEOUT_MS = 5000; + +/** + * If no message has been received from the server in this window, treat + * the connection as stale and force-close. The server's heartbeat (30s + * per [[processor-ws-contract]]) means a healthy connection sees a + * message at least that often. + */ +export const STALE_CONNECTION_MS = 60000; diff --git a/src/live/index.ts b/src/live/index.ts new file mode 100644 index 0000000..70c4f7a --- /dev/null +++ b/src/live/index.ts @@ -0,0 +1,20 @@ +export { LiveBootstrap, getLiveClient } from './bootstrap'; +export { createCoalescer, type Coalescer } from './coalescer'; +export { useConnectionStore, type ConnectionStatus } from './connection-store'; +export { usePositionStore } from './position-store'; +export { + PositionEntrySchema, + InboundMessageSchema, + type PositionEntry, + type InboundMessage, + type OutboundMessage, + type SubscribeResult, +} from './protocol'; +export { createLiveClient, type LiveClient, type PositionWithTopic } from './ws-client'; +export { + MAX_TRAIL_LENGTH, + RECONNECT_BACKOFF_MS, + RECONNECT_CEILING_MS, + STALE_CONNECTION_MS, + SUBSCRIBE_TIMEOUT_MS, +} from './constants'; diff --git a/src/live/position-store.ts b/src/live/position-store.ts new file mode 100644 index 0000000..2811355 --- /dev/null +++ b/src/live/position-store.ts @@ -0,0 +1,97 @@ +import { create } from 'zustand'; +import { MAX_TRAIL_LENGTH } from './constants'; +import type { PositionEntry } from './protocol'; + +type PositionState = { + /** Most recent position per device, keyed by deviceId. */ + latestByDevice: Map; + /** Bounded ring buffer of the last N positions per device. */ + trailsByDevice: Map; + /** The device the operator selected for follow / detail. `null` = no selection. */ + selectedDeviceId: string | null; + /** Which event's positions are currently in the store. `null` = none subscribed. */ + activeEventId: string | null; +}; + +type PositionActions = { + /** + * Replace all device-keyed state with the snapshot from a new event + * subscription. Wipes the previous event's positions and trails; + * seeds each device's trail with its single snapshot position. + */ + applySnapshot: (eventId: string, entries: PositionEntry[]) => void; + + /** + * Apply a coalesced batch of positions. Latest-position-per-device + * gets a new object reference (selectors fire); positions that didn't + * change keep their old reference (selectors don't fire). + */ + applyPositions: (entries: PositionEntry[]) => void; + + /** Clear all device-keyed state. Used on event-switch and on logout. */ + clearForEvent: () => void; + + /** Select a device for follow / detail. Pass `null` to deselect. */ + selectDevice: (deviceId: string | null) => void; +}; + +type Store = PositionState & PositionActions; + +const EMPTY_LATEST: Map = new Map(); +const EMPTY_TRAILS: Map = new Map(); + +export const usePositionStore = create((set) => ({ + latestByDevice: EMPTY_LATEST, + trailsByDevice: EMPTY_TRAILS, + selectedDeviceId: null, + activeEventId: null, + + applySnapshot: (eventId, entries) => { + const latest = new Map(); + const trails = new Map(); + for (const e of entries) { + latest.set(e.deviceId, e); + trails.set(e.deviceId, [e]); + } + set({ + activeEventId: eventId, + latestByDevice: latest, + trailsByDevice: trails, + selectedDeviceId: null, + }); + }, + + applyPositions: (entries) => { + if (entries.length === 0) return; + set((state) => { + const latest = new Map(state.latestByDevice); + const trails = new Map(state.trailsByDevice); + for (const e of entries) { + latest.set(e.deviceId, e); + const tail = trails.get(e.deviceId) ?? []; + const last = tail[tail.length - 1]; + // Skip duplicates from no-movement reports — same lat/lon as the + // last entry doesn't add a new trail point. + if (!last || last.lat !== e.lat || last.lon !== e.lon) { + const next = [...tail, e]; + if (next.length > MAX_TRAIL_LENGTH) next.shift(); + trails.set(e.deviceId, next); + } + } + return { latestByDevice: latest, trailsByDevice: trails }; + }); + }, + + clearForEvent: () => { + set({ + activeEventId: null, + latestByDevice: EMPTY_LATEST, + trailsByDevice: EMPTY_TRAILS, + selectedDeviceId: null, + }); + }, + + selectDevice: (deviceId) => { + set({ selectedDeviceId: deviceId }); + }, +})); diff --git a/src/live/protocol.ts b/src/live/protocol.ts new file mode 100644 index 0000000..b901981 --- /dev/null +++ b/src/live/protocol.ts @@ -0,0 +1,75 @@ +import { z } from 'zod'; + +/** + * Wire-format types matching `docs/wiki/synthesis/processor-ws-contract.md`. + * The producer sends JSON-encoded messages over WS; the SPA validates with + * zod and routes by `type`. + */ + +export const PositionEntrySchema = z.object({ + deviceId: z.string(), + lat: z.number(), + lon: z.number(), + ts: z.number(), // epoch ms + speed: z.number().optional(), + course: z.number().optional(), + accuracy: z.number().optional(), + attributes: z.record(z.string(), z.unknown()).optional(), +}); +export type PositionEntry = z.infer; + +// ---------- Inbound ------------------------------------------------------ + +const SubscribedMessage = z.object({ + type: z.literal('subscribed'), + topic: z.string(), + id: z.string().optional(), + snapshot: z.array(PositionEntrySchema).default([]), +}); + +const UnsubscribedMessage = z.object({ + type: z.literal('unsubscribed'), + topic: z.string(), + id: z.string().optional(), +}); + +const PositionMessage = z.object({ + type: z.literal('position'), + topic: z.string(), + deviceId: z.string(), + lat: z.number(), + lon: z.number(), + ts: z.number(), + speed: z.number().optional(), + course: z.number().optional(), + accuracy: z.number().optional(), + attributes: z.record(z.string(), z.unknown()).optional(), +}); + +const ErrorMessage = z.object({ + type: z.literal('error'), + topic: z.string().optional(), + id: z.string().optional(), + code: z.string(), + message: z.string().optional(), +}); + +export const InboundMessageSchema = z.discriminatedUnion('type', [ + SubscribedMessage, + UnsubscribedMessage, + PositionMessage, + ErrorMessage, +]); +export type InboundMessage = z.infer; + +// ---------- Outbound ----------------------------------------------------- + +export type OutboundMessage = + | { type: 'subscribe'; topic: string; id?: string } + | { type: 'unsubscribe'; topic: string; id?: string }; + +// ---------- Subscribe result -------------------------------------------- + +export type SubscribeResult = + | { ok: true; snapshot: PositionEntry[] } + | { ok: false; code: string; message?: string }; diff --git a/src/live/ws-client.ts b/src/live/ws-client.ts new file mode 100644 index 0000000..dce9287 --- /dev/null +++ b/src/live/ws-client.ts @@ -0,0 +1,281 @@ +import { + RECONNECT_BACKOFF_MS, + RECONNECT_CEILING_MS, + STALE_CONNECTION_MS, + SUBSCRIBE_TIMEOUT_MS, +} from './constants'; +import { + InboundMessageSchema, + type OutboundMessage, + type PositionEntry, + type SubscribeResult, +} from './protocol'; +import { useConnectionStore } from './connection-store'; + +export type PositionWithTopic = PositionEntry & { topic: string }; + +export interface LiveClient { + /** Open the WS. Idempotent. */ + connect: () => void; + /** Permanent close. Won't reconnect. */ + close: () => void; + /** Subscribe to a topic. Resolves on `subscribed` (with snapshot) or `error`. */ + subscribe: (topic: string) => Promise; + /** Unsubscribe from a topic. Best-effort; resolves immediately if not connected. */ + unsubscribe: (topic: string) => Promise; + /** Register a handler for `position` messages. Returns an unregister fn. */ + onPosition: (handler: (msg: PositionWithTopic) => void) => () => void; +} + +type ClientState = + | { kind: 'idle' } + | { kind: 'connecting'; ws: WebSocket } + | { kind: 'connected'; ws: WebSocket } + | { kind: 'reconnecting'; attempt: number; timer: ReturnType } + | { kind: 'closed' }; + +type PendingSubscribe = { + resolve: (result: SubscribeResult) => void; + timer: ReturnType; +}; + +let _idSeq = 0; +function nextCorrelationId(): string { + _idSeq = (_idSeq + 1) % Number.MAX_SAFE_INTEGER; + return `c${_idSeq}`; +} + +/** + * Resolve a (possibly relative) WS URL against the current page origin. + * + * - `'/ws-live'` → `'ws://localhost:5173/ws-live'` (dev) or `'wss://stage.trmtracking.org/ws-live'` (stage). + * - `'wss://elsewhere.example.com/...'` passes through unchanged. + * + * The browser's `WebSocket` constructor *requires* `ws(s)://` schemes + * and an absolute URL; same-origin path strings have to be resolved + * client-side. + */ +function toAbsoluteWsUrl(path: string): string { + if (/^wss?:\/\//.test(path)) return path; + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const host = window.location.host; + return `${protocol}//${host}${path.startsWith('/') ? path : `/${path}`}`; +} + +export function createLiveClient(opts: { url: string }): LiveClient { + let state: ClientState = { kind: 'idle' }; + /** Topics the user has expressed interest in, replayed on every reconnect. */ + const subscriptions = new Set(); + /** In-flight subscribe / unsubscribe requests waiting for a server reply. */ + const pending = new Map(); + /** Position handlers — there's only one in practice (the coalescer's push) but keep extensible. */ + const positionHandlers = new Set<(msg: PositionWithTopic) => void>(); + /** Last time we received any message; if it goes stale, force-reconnect. */ + let lastMessageAt = 0; + let staleCheckTimer: ReturnType | null = null; + + function sendRaw(msg: OutboundMessage): void { + if (state.kind !== 'connected') return; + state.ws.send(JSON.stringify(msg)); + } + + function startStaleCheck(): void { + if (staleCheckTimer) clearInterval(staleCheckTimer); + staleCheckTimer = setInterval(() => { + if (state.kind !== 'connected') return; + if (Date.now() - lastMessageAt > STALE_CONNECTION_MS) { + // Server's heartbeat hasn't fired; treat as a dead connection. + state.ws.close(4000, 'stale'); + } + }, STALE_CONNECTION_MS / 2); + } + + function stopStaleCheck(): void { + if (staleCheckTimer) { + clearInterval(staleCheckTimer); + staleCheckTimer = null; + } + } + + function onMessage(raw: unknown): void { + lastMessageAt = Date.now(); + let parsed; + try { + parsed = InboundMessageSchema.safeParse(JSON.parse(String(raw))); + } catch { + return; // malformed JSON; drop + } + if (!parsed.success) return; + const msg = parsed.data; + + switch (msg.type) { + case 'subscribed': { + if (msg.id) { + const p = pending.get(msg.id); + if (p) { + clearTimeout(p.timer); + pending.delete(msg.id); + p.resolve({ ok: true, snapshot: msg.snapshot }); + } + } + break; + } + case 'unsubscribed': { + if (msg.id) { + const p = pending.get(msg.id); + if (p) { + clearTimeout(p.timer); + pending.delete(msg.id); + p.resolve({ ok: true, snapshot: [] }); + } + } + break; + } + case 'position': { + const positionMsg: PositionWithTopic = { + deviceId: msg.deviceId, + lat: msg.lat, + lon: msg.lon, + ts: msg.ts, + speed: msg.speed, + course: msg.course, + accuracy: msg.accuracy, + attributes: msg.attributes, + topic: msg.topic, + }; + for (const h of positionHandlers) h(positionMsg); + break; + } + case 'error': { + if (msg.id) { + const p = pending.get(msg.id); + if (p) { + clearTimeout(p.timer); + pending.delete(msg.id); + p.resolve({ ok: false, code: msg.code, message: msg.message }); + } + } + useConnectionStore.getState().setStatus(useConnectionStore.getState().status, { + error: msg.message ?? msg.code, + }); + break; + } + } + } + + function connect(): void { + if (state.kind === 'connecting' || state.kind === 'connected') return; + if (state.kind === 'closed') return; + + useConnectionStore.getState().setStatus('connecting', { error: null }); + + let ws: WebSocket; + try { + ws = new WebSocket(toAbsoluteWsUrl(opts.url)); + } catch (err) { + useConnectionStore.getState().setStatus('reconnecting', { + attempt: state.kind === 'reconnecting' ? state.attempt + 1 : 1, + error: err instanceof Error ? err.message : 'WebSocket constructor failed', + }); + scheduleReconnect(); + return; + } + state = { kind: 'connecting', ws }; + + ws.addEventListener('open', () => { + state = { kind: 'connected', ws }; + lastMessageAt = Date.now(); + useConnectionStore.getState().setStatus('connected', { attempt: 0 }); + startStaleCheck(); + // Replay every active subscription. New correlation IDs because the + // old ones (from before the disconnect) are stale. + for (const topic of subscriptions) { + sendRaw({ type: 'subscribe', topic }); + } + }); + + ws.addEventListener('message', (ev) => onMessage(ev.data)); + + ws.addEventListener('close', () => { + stopStaleCheck(); + // Reject all pending subscribes so callers don't hang. + for (const [id, p] of pending) { + clearTimeout(p.timer); + p.resolve({ ok: false, code: 'disconnected' }); + pending.delete(id); + } + if (state.kind === 'closed') return; + const attempt = state.kind === 'reconnecting' ? state.attempt + 1 : 1; + useConnectionStore.getState().setStatus('reconnecting', { attempt }); + scheduleReconnect(attempt); + }); + + ws.addEventListener('error', () => { + // The 'close' event fires next; reconnect is scheduled there. + }); + } + + function scheduleReconnect(attempt = 1): void { + if (state.kind === 'closed') return; + const delay = RECONNECT_BACKOFF_MS[attempt - 1] ?? RECONNECT_CEILING_MS; + const cappedDelay = Math.min(delay, RECONNECT_CEILING_MS); + const timer = setTimeout(() => { + if (state.kind === 'closed') return; + connect(); + }, cappedDelay); + state = { kind: 'reconnecting', attempt, timer }; + } + + function close(): void { + if (state.kind === 'connecting' || state.kind === 'connected') { + state.ws.close(1000, 'client closed'); + } + if (state.kind === 'reconnecting') { + clearTimeout(state.timer); + } + stopStaleCheck(); + for (const [, p] of pending) { + clearTimeout(p.timer); + p.resolve({ ok: false, code: 'closed' }); + } + pending.clear(); + subscriptions.clear(); + state = { kind: 'closed' }; + useConnectionStore.getState().setStatus('disconnected', { attempt: 0 }); + } + + async function subscribe(topic: string): Promise { + subscriptions.add(topic); + if (state.kind !== 'connected') { + // Topic is remembered; replayed on next 'open'. Caller should retry + // or wait for 'connected' status if they want the snapshot. + return { ok: false, code: 'not-connected' }; + } + const id = nextCorrelationId(); + return new Promise((resolve) => { + const timer = setTimeout(() => { + pending.delete(id); + resolve({ ok: false, code: 'timeout' }); + }, SUBSCRIBE_TIMEOUT_MS); + pending.set(id, { resolve, timer }); + sendRaw({ type: 'subscribe', topic, id }); + }); + } + + async function unsubscribe(topic: string): Promise { + subscriptions.delete(topic); + if (state.kind !== 'connected') return; + const id = nextCorrelationId(); + sendRaw({ type: 'unsubscribe', topic, id }); + // Best-effort: don't wait for ack. The server cleans up regardless. + } + + function onPosition(handler: (msg: PositionWithTopic) => void): () => void { + positionHandlers.add(handler); + return () => { + positionHandlers.delete(handler); + }; + } + + return { connect, close, subscribe, unsubscribe, onPosition }; +} diff --git a/src/main.tsx b/src/main.tsx index 4087171..a2908a8 100644 --- a/src/main.tsx +++ b/src/main.tsx @@ -4,6 +4,7 @@ import './styles/globals.css'; import App from './App.tsx'; import { RuntimeConfigProvider } from '@/config/provider'; import { AuthBootstrap } from '@/auth'; +import { LiveBootstrap } from '@/live'; import { preloadSprites } from '@/map/core/sprite-preload'; // Fire-and-forget preload of the map sprite registry. The promise is @@ -15,7 +16,9 @@ createRoot(document.getElementById('root')!).render( - + + + ,