/** * Snapshot provider — returns the latest non-faulty position per device for a * given event at subscribe time. * * Called once per `subscribe` message, inside registry.ts's `subscribe()` after * authorization succeeds. The result is included in the `subscribed` reply so * the SPA map is fully populated immediately rather than waiting for the next * live broadcast batch. * * Query: * DISTINCT ON (p.device_id) ... ORDER BY p.device_id, p.ts DESC * returns the row with the highest `ts` per device in one Postgres pass. * Requires the `positions_device_ts_idx ON positions (device_id, ts DESC)` * index created in migration 0002. * * Spec: processor-ws-contract.md §Server response — subscribed; * task 1.5.5 §The query */ import type pg from 'pg'; import type { Logger } from 'pino'; import type { Metrics } from '../shared/types.js'; import type { PositionSnapshotEntry } from './protocol.js'; import type { SnapshotProvider } from './registry.js'; // --------------------------------------------------------------------------- // Query result row shape // --------------------------------------------------------------------------- type SnapshotRow = { device_id: string; latitude: number; longitude: number; ts: Date; speed: number; angle: number; }; // --------------------------------------------------------------------------- // Factory // --------------------------------------------------------------------------- export function createSnapshotProvider( pool: pg.Pool, logger: Logger, metrics: Metrics, ): SnapshotProvider { /** * Returns the latest non-faulty position for every device registered to the * given event. Returns an empty array when: * - the event has no `entry_devices` rows. * - the registered devices have no positions yet. * - all positions for a device are faulty. * * Never throws — the caller (registry.fetchSnapshot) already wraps in a * try/catch that falls back to an empty snapshot. */ async function forEvent(eventId: string): Promise { const start = performance.now(); const result = await pool.query( `SELECT DISTINCT ON (p.device_id) p.device_id, p.latitude, p.longitude, p.ts, p.speed, p.angle FROM positions p JOIN entry_devices ed ON ed.device_id = p.device_id JOIN entries e ON e.id = ed.entry_id WHERE e.event_id = $1 AND p.faulty = false ORDER BY p.device_id, p.ts DESC`, [eventId], ); const elapsed = performance.now() - start; metrics.observe('processor_live_snapshot_query_latency_ms', elapsed); metrics.observe('processor_live_snapshot_size', result.rows.length); logger.debug( { eventId, count: result.rows.length, elapsedMs: Math.round(elapsed) }, 'snapshot query completed', ); return result.rows.map(rowToSnapshotEntry); } return { forEvent }; } // --------------------------------------------------------------------------- // Row → wire type // --------------------------------------------------------------------------- /** * Maps a Postgres snapshot row to a PositionSnapshotEntry. * * Field omission convention: speed and course (angle) are omitted when zero, * matching the broadcast consumer's `toPositionMessage` convention. Per Teltonika * protocol, 0 speed may indicate an invalid GPS fix; 0 angle is meaningless when * the device is stationary. Emit the field only when it carries information. * * `ts` is stored as a `timestamptz` in Postgres and returned as a JavaScript * `Date` by node-postgres. Convert to epoch ms for the wire format. */ function rowToSnapshotEntry(row: SnapshotRow): PositionSnapshotEntry { const entry: PositionSnapshotEntry = { deviceId: row.device_id, lat: row.latitude, lon: row.longitude, ts: row.ts instanceof Date ? row.ts.getTime() : Number(row.ts), }; // Omit speed when 0 — matches broadcast.ts toPositionMessage convention. if (row.speed > 0) { (entry as Record)['speed'] = row.speed; } // Omit course when 0 — angle of 0 is uninformative when stationary. if (row.angle > 0) { (entry as Record)['course'] = row.angle; } return entry; }