import { RECONNECT_BACKOFF_MS, RECONNECT_CEILING_MS, STALE_CONNECTION_MS, SUBSCRIBE_TIMEOUT_MS, } from './constants'; import { InboundMessageSchema, type OutboundMessage, type PositionEntry, type SubscribeResult, } from './protocol'; import { useConnectionStore } from './connection-store'; export type PositionWithTopic = PositionEntry & { topic: string }; export interface LiveClient { /** Open the WS. Idempotent. */ connect: () => void; /** Permanent close. Won't reconnect. */ close: () => void; /** Subscribe to a topic. Resolves on `subscribed` (with snapshot) or `error`. */ subscribe: (topic: string) => Promise; /** Unsubscribe from a topic. Best-effort; resolves immediately if not connected. */ unsubscribe: (topic: string) => Promise; /** Register a handler for `position` messages. Returns an unregister fn. */ onPosition: (handler: (msg: PositionWithTopic) => void) => () => void; } type ClientState = | { kind: 'idle' } | { kind: 'connecting'; ws: WebSocket } | { kind: 'connected'; ws: WebSocket } | { kind: 'reconnecting'; attempt: number; timer: ReturnType } | { kind: 'closed' }; type PendingSubscribe = { resolve: (result: SubscribeResult) => void; timer: ReturnType; }; let _idSeq = 0; function nextCorrelationId(): string { _idSeq = (_idSeq + 1) % Number.MAX_SAFE_INTEGER; return `c${_idSeq}`; } /** * Resolve a (possibly relative) WS URL against the current page origin. * * - `'/ws-live'` → `'ws://localhost:5173/ws-live'` (dev) or `'wss://stage.trmtracking.org/ws-live'` (stage). * - `'wss://elsewhere.example.com/...'` passes through unchanged. * * The browser's `WebSocket` constructor *requires* `ws(s)://` schemes * and an absolute URL; same-origin path strings have to be resolved * client-side. */ function toAbsoluteWsUrl(path: string): string { if (/^wss?:\/\//.test(path)) return path; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const host = window.location.host; return `${protocol}//${host}${path.startsWith('/') ? path : `/${path}`}`; } export function createLiveClient(opts: { url: string }): LiveClient { let state: ClientState = { kind: 'idle' }; /** Topics the user has expressed interest in, replayed on every reconnect. */ const subscriptions = new Set(); /** In-flight subscribe / unsubscribe requests waiting for a server reply. */ const pending = new Map(); /** Position handlers — there's only one in practice (the coalescer's push) but keep extensible. */ const positionHandlers = new Set<(msg: PositionWithTopic) => void>(); /** Last time we received any message; if it goes stale, force-reconnect. */ let lastMessageAt = 0; let staleCheckTimer: ReturnType | null = null; function sendRaw(msg: OutboundMessage): void { if (state.kind !== 'connected') return; state.ws.send(JSON.stringify(msg)); } function startStaleCheck(): void { if (staleCheckTimer) clearInterval(staleCheckTimer); staleCheckTimer = setInterval(() => { if (state.kind !== 'connected') return; if (Date.now() - lastMessageAt > STALE_CONNECTION_MS) { // Server's heartbeat hasn't fired; treat as a dead connection. state.ws.close(4000, 'stale'); } }, STALE_CONNECTION_MS / 2); } function stopStaleCheck(): void { if (staleCheckTimer) { clearInterval(staleCheckTimer); staleCheckTimer = null; } } function onMessage(raw: unknown): void { lastMessageAt = Date.now(); let parsed; try { parsed = InboundMessageSchema.safeParse(JSON.parse(String(raw))); } catch { return; // malformed JSON; drop } if (!parsed.success) return; const msg = parsed.data; switch (msg.type) { case 'subscribed': { if (msg.id) { const p = pending.get(msg.id); if (p) { clearTimeout(p.timer); pending.delete(msg.id); p.resolve({ ok: true, snapshot: msg.snapshot }); } } break; } case 'unsubscribed': { if (msg.id) { const p = pending.get(msg.id); if (p) { clearTimeout(p.timer); pending.delete(msg.id); p.resolve({ ok: true, snapshot: [] }); } } break; } case 'position': { const positionMsg: PositionWithTopic = { deviceId: msg.deviceId, lat: msg.lat, lon: msg.lon, ts: msg.ts, speed: msg.speed, course: msg.course, accuracy: msg.accuracy, attributes: msg.attributes, topic: msg.topic, }; for (const h of positionHandlers) h(positionMsg); break; } case 'error': { if (msg.id) { const p = pending.get(msg.id); if (p) { clearTimeout(p.timer); pending.delete(msg.id); p.resolve({ ok: false, code: msg.code, message: msg.message }); } } useConnectionStore.getState().setStatus(useConnectionStore.getState().status, { error: msg.message ?? msg.code, }); break; } } } function connect(): void { if (state.kind === 'connecting' || state.kind === 'connected') return; if (state.kind === 'closed') return; useConnectionStore.getState().setStatus('connecting', { error: null }); let ws: WebSocket; try { ws = new WebSocket(toAbsoluteWsUrl(opts.url)); } catch (err) { useConnectionStore.getState().setStatus('reconnecting', { attempt: state.kind === 'reconnecting' ? state.attempt + 1 : 1, error: err instanceof Error ? err.message : 'WebSocket constructor failed', }); scheduleReconnect(); return; } state = { kind: 'connecting', ws }; ws.addEventListener('open', () => { state = { kind: 'connected', ws }; lastMessageAt = Date.now(); useConnectionStore.getState().setStatus('connected', { attempt: 0 }); startStaleCheck(); // Replay every active subscription. New correlation IDs because the // old ones (from before the disconnect) are stale. for (const topic of subscriptions) { sendRaw({ type: 'subscribe', topic }); } }); ws.addEventListener('message', (ev) => onMessage(ev.data)); ws.addEventListener('close', () => { stopStaleCheck(); // Reject all pending subscribes so callers don't hang. for (const [id, p] of pending) { clearTimeout(p.timer); p.resolve({ ok: false, code: 'disconnected' }); pending.delete(id); } if (state.kind === 'closed') return; const attempt = state.kind === 'reconnecting' ? state.attempt + 1 : 1; useConnectionStore.getState().setStatus('reconnecting', { attempt }); scheduleReconnect(attempt); }); ws.addEventListener('error', () => { // The 'close' event fires next; reconnect is scheduled there. }); } function scheduleReconnect(attempt = 1): void { if (state.kind === 'closed') return; const delay = RECONNECT_BACKOFF_MS[attempt - 1] ?? RECONNECT_CEILING_MS; const cappedDelay = Math.min(delay, RECONNECT_CEILING_MS); const timer = setTimeout(() => { if (state.kind === 'closed') return; connect(); }, cappedDelay); state = { kind: 'reconnecting', attempt, timer }; } function close(): void { if (state.kind === 'connecting' || state.kind === 'connected') { state.ws.close(1000, 'client closed'); } if (state.kind === 'reconnecting') { clearTimeout(state.timer); } stopStaleCheck(); for (const [, p] of pending) { clearTimeout(p.timer); p.resolve({ ok: false, code: 'closed' }); } pending.clear(); subscriptions.clear(); state = { kind: 'closed' }; useConnectionStore.getState().setStatus('disconnected', { attempt: 0 }); } async function subscribe(topic: string): Promise { subscriptions.add(topic); if (state.kind !== 'connected') { // Topic is remembered; replayed on next 'open'. Caller should retry // or wait for 'connected' status if they want the snapshot. return { ok: false, code: 'not-connected' }; } const id = nextCorrelationId(); return new Promise((resolve) => { const timer = setTimeout(() => { pending.delete(id); resolve({ ok: false, code: 'timeout' }); }, SUBSCRIBE_TIMEOUT_MS); pending.set(id, { resolve, timer }); sendRaw({ type: 'subscribe', topic, id }); }); } async function unsubscribe(topic: string): Promise { subscriptions.delete(topic); if (state.kind !== 'connected') return; const id = nextCorrelationId(); sendRaw({ type: 'unsubscribe', topic, id }); // Best-effort: don't wait for ack. The server cleans up regardless. } function onPosition(handler: (msg: PositionWithTopic) => void): () => void { positionHandlers.add(handler); return () => { positionHandlers.delete(handler); }; } return { connect, close, subscribe, unsubscribe, onPosition }; }