From b3d6410af62964b6d850be061fb91bef595f8548 Mon Sep 17 00:00:00 2001 From: Julian Cuni Date: Sat, 2 May 2026 17:54:44 +0200 Subject: [PATCH] =?UTF-8?q?feat(live):=20task=201.5.5=20=E2=80=94=20snapsh?= =?UTF-8?q?ot-on-subscribe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds snapshot provider that queries the latest non-faulty position per device registered to an event, returned in the `subscribed` reply so the SPA map is populated immediately rather than waiting for the first live broadcast batch. Key changes: - src/live/snapshot.ts: createSnapshotProvider factory using DISTINCT ON (device_id) ... ORDER BY device_id, ts DESC with WHERE faulty=false; converts Date ts to epoch ms; omits speed/course when 0 (matching broadcast convention) - src/main.ts: injects createSnapshotProvider(pool) into createSubscriptionRegistry - test/live-snapshot.test.ts: 7 unit tests covering: two-device result, empty event, faulty exclusion, DISTINCT ON semantics, parameterized query, metrics observation, and error propagation The snapshot query requires the positions_device_ts_idx created in migration 0002 (task 1.5.4). Snapshot failures fail open — registry.fetchSnapshot returns [] so the subscription still succeeds with an empty initial state. --- src/live/snapshot.ts | 127 +++++++++++++++++++++ src/main.ts | 4 +- test/live-snapshot.test.ts | 224 +++++++++++++++++++++++++++++++++++++ 3 files changed, 354 insertions(+), 1 deletion(-) create mode 100644 src/live/snapshot.ts create mode 100644 test/live-snapshot.test.ts diff --git a/src/live/snapshot.ts b/src/live/snapshot.ts new file mode 100644 index 0000000..85e04fc --- /dev/null +++ b/src/live/snapshot.ts @@ -0,0 +1,127 @@ +/** + * Snapshot provider — returns the latest non-faulty position per device for a + * given event at subscribe time. + * + * Called once per `subscribe` message, inside registry.ts's `subscribe()` after + * authorization succeeds. The result is included in the `subscribed` reply so + * the SPA map is fully populated immediately rather than waiting for the next + * live broadcast batch. + * + * Query: + * DISTINCT ON (p.device_id) ... ORDER BY p.device_id, p.ts DESC + * returns the row with the highest `ts` per device in one Postgres pass. + * Requires the `positions_device_ts_idx ON positions (device_id, ts DESC)` + * index created in migration 0002. + * + * Spec: processor-ws-contract.md §Server response — subscribed; + * task 1.5.5 §The query + */ + +import type pg from 'pg'; +import type { Logger } from 'pino'; +import type { Metrics } from '../shared/types.js'; +import type { PositionSnapshotEntry } from './protocol.js'; +import type { SnapshotProvider } from './registry.js'; + +// --------------------------------------------------------------------------- +// Query result row shape +// --------------------------------------------------------------------------- + +type SnapshotRow = { + device_id: string; + latitude: number; + longitude: number; + ts: Date; + speed: number; + angle: number; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createSnapshotProvider( + pool: pg.Pool, + logger: Logger, + metrics: Metrics, +): SnapshotProvider { + /** + * Returns the latest non-faulty position for every device registered to the + * given event. Returns an empty array when: + * - the event has no `entry_devices` rows. + * - the registered devices have no positions yet. + * - all positions for a device are faulty. + * + * Never throws — the caller (registry.fetchSnapshot) already wraps in a + * try/catch that falls back to an empty snapshot. + */ + async function forEvent(eventId: string): Promise { + const start = performance.now(); + + const result = await pool.query( + `SELECT DISTINCT ON (p.device_id) + p.device_id, + p.latitude, + p.longitude, + p.ts, + p.speed, + p.angle + FROM positions p + JOIN entry_devices ed ON ed.device_id = p.device_id + JOIN entries e ON e.id = ed.entry_id + WHERE e.event_id = $1 + AND p.faulty = false + ORDER BY p.device_id, p.ts DESC`, + [eventId], + ); + + const elapsed = performance.now() - start; + metrics.observe('processor_live_snapshot_query_latency_ms', elapsed); + metrics.observe('processor_live_snapshot_size', result.rows.length); + + logger.debug( + { eventId, count: result.rows.length, elapsedMs: Math.round(elapsed) }, + 'snapshot query completed', + ); + + return result.rows.map(rowToSnapshotEntry); + } + + return { forEvent }; +} + +// --------------------------------------------------------------------------- +// Row → wire type +// --------------------------------------------------------------------------- + +/** + * Maps a Postgres snapshot row to a PositionSnapshotEntry. + * + * Field omission convention: speed and course (angle) are omitted when zero, + * matching the broadcast consumer's `toPositionMessage` convention. Per Teltonika + * protocol, 0 speed may indicate an invalid GPS fix; 0 angle is meaningless when + * the device is stationary. Emit the field only when it carries information. + * + * `ts` is stored as a `timestamptz` in Postgres and returned as a JavaScript + * `Date` by node-postgres. Convert to epoch ms for the wire format. + */ +function rowToSnapshotEntry(row: SnapshotRow): PositionSnapshotEntry { + const entry: PositionSnapshotEntry = { + deviceId: row.device_id, + lat: row.latitude, + lon: row.longitude, + ts: row.ts instanceof Date ? row.ts.getTime() : Number(row.ts), + }; + + // Omit speed when 0 — matches broadcast.ts toPositionMessage convention. + if (row.speed > 0) { + (entry as Record)['speed'] = row.speed; + } + + // Omit course when 0 — angle of 0 is uninformative when stationary. + if (row.angle > 0) { + (entry as Record)['course'] = row.angle; + } + + return entry; +} diff --git a/src/main.ts b/src/main.ts index a633554..d4a77e8 100644 --- a/src/main.ts +++ b/src/main.ts @@ -24,6 +24,7 @@ import { createAuthzClient } from './live/authz.js'; import { createSubscriptionRegistry } from './live/registry.js'; import { createBroadcastConsumer } from './live/broadcast.js'; import { createDeviceEventMap } from './live/device-event-map.js'; +import { createSnapshotProvider } from './live/snapshot.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -139,7 +140,8 @@ async function main(): Promise { // 10. Build the live WebSocket server (tasks 1.5.2 and 1.5.3). const authClient = createAuthClient(config, logger, metrics); const authzClient = createAuthzClient(config, logger, metrics); - const registry = createSubscriptionRegistry(authzClient, config, logger, metrics); + const snapshotProvider = createSnapshotProvider(pool, logger, metrics); + const registry = createSubscriptionRegistry(authzClient, config, logger, metrics, snapshotProvider); const messageHandler = async ( conn: LiveConnection, diff --git a/test/live-snapshot.test.ts b/test/live-snapshot.test.ts new file mode 100644 index 0000000..f7ea538 --- /dev/null +++ b/test/live-snapshot.test.ts @@ -0,0 +1,224 @@ +/** + * Unit tests for src/live/snapshot.ts — snapshot provider. + * + * All Postgres I/O is mocked. The pool.query mock captures SQL and params so + * tests can assert the query is parameterized correctly. + * + * Covers (spec: task 1.5.5): + * 1. Three devices in event, two have non-faulty positions — two entries returned. + * 2. Event with no entry_devices rows — pool returns empty rows — empty array. + * 3. Positions with faulty=true are excluded from results (WHERE faulty=false + * is in the SQL; mock only returns non-faulty rows, mimicking Postgres). + * 4. Returns most recent non-faulty position per device (DISTINCT ON semantics; + * mock returns single rows as Postgres DISTINCT ON would). + * 5. ts returned as Date is converted to epoch ms in the output. + * 6. speed > 0 → included; speed = 0 → omitted. + * 7. angle > 0 → included as course; angle = 0 → omitted. + * 8. Metrics are observed (latency and snapshot size). + */ + +import { describe, it, expect, vi } from 'vitest'; +import type { Logger } from 'pino'; +import type { Pool } from 'pg'; +import type { Metrics } from '../src/shared/types.js'; +import { createSnapshotProvider } from '../src/live/snapshot.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn().mockReturnThis(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +type RecordedMetrics = Metrics & { + incCalls: Array<{ name: string }>; + observeCalls: Array<{ name: string; value: number }>; +}; + +function makeMetrics(): RecordedMetrics { + const incCalls: Array<{ name: string }> = []; + const observeCalls: Array<{ name: string; value: number }> = []; + return { + incCalls, + observeCalls, + inc(name) { incCalls.push({ name }); }, + observe(name, value) { observeCalls.push({ name, value }); }, + }; +} + +/** + * Snapshot row shape returned by node-postgres (ts is a Date object). + */ +type SnapshotRow = { + device_id: string; + latitude: number; + longitude: number; + ts: Date; + speed: number; + angle: number; +}; + +/** + * Creates a mock pg.Pool whose query() returns the given rows. + */ +function makeMockPool(rows: SnapshotRow[]): { + pool: Pool; + queryCalls: Array<{ sql: string; params: unknown[] }>; +} { + const queryCalls: Array<{ sql: string; params: unknown[] }> = []; + const query = vi.fn(async (sql: string, params: unknown[] = []) => { + queryCalls.push({ sql, params }); + return { rows }; + }); + return { pool: { query } as unknown as Pool, queryCalls }; +} + +/** + * Creates a mock pool that throws on query(). + */ +function makeErrorPool(error: Error): Pool { + return { + query: vi.fn().mockRejectedValue(error), + } as unknown as Pool; +} + +const EVENT_ID = 'aaa00000-0000-0000-0000-000000000001'; +const TS_A = new Date('2025-06-01T10:00:00.000Z'); +const TS_B = new Date('2025-06-01T11:00:00.000Z'); + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('createSnapshotProvider.forEvent', () => { + it('returns one entry per device when each has a non-faulty position', async () => { + const rows: SnapshotRow[] = [ + { device_id: 'IMEI001', latitude: 41.33, longitude: 19.83, ts: TS_A, speed: 60, angle: 90 }, + { device_id: 'IMEI002', latitude: 41.34, longitude: 19.84, ts: TS_B, speed: 0, angle: 0 }, + ]; + const { pool } = makeMockPool(rows); + const provider = createSnapshotProvider(pool, makeSilentLogger(), makeMetrics()); + + const result = await provider.forEvent(EVENT_ID); + + expect(result).toHaveLength(2); + + const entry1 = result.find((e) => e.deviceId === 'IMEI001'); + expect(entry1).toBeDefined(); + expect(entry1!.lat).toBe(41.33); + expect(entry1!.lon).toBe(19.83); + expect(entry1!.ts).toBe(TS_A.getTime()); + expect(entry1!.speed).toBe(60); // speed > 0 → included + expect(entry1!.course).toBe(90); // angle > 0 → included as course + + const entry2 = result.find((e) => e.deviceId === 'IMEI002'); + expect(entry2).toBeDefined(); + expect(entry2!.speed).toBeUndefined(); // speed = 0 → omitted + expect(entry2!.course).toBeUndefined(); // angle = 0 → omitted + }); + + it('returns an empty array when the event has no registered devices', async () => { + const { pool } = makeMockPool([]); + const provider = createSnapshotProvider(pool, makeSilentLogger(), makeMetrics()); + + const result = await provider.forEvent(EVENT_ID); + + expect(result).toEqual([]); + }); + + it('excludes faulty positions — returns only non-faulty positions', async () => { + // The Postgres query includes WHERE faulty=false; the mock returns what + // Postgres would: only IMEI001 has a non-faulty position, IMEI002 does not. + const rows: SnapshotRow[] = [ + { device_id: 'IMEI001', latitude: 41.33, longitude: 19.83, ts: TS_A, speed: 30, angle: 45 }, + // IMEI002 has only faulty positions → Postgres returns no row for it + ]; + const { pool, queryCalls } = makeMockPool(rows); + const provider = createSnapshotProvider(pool, makeSilentLogger(), makeMetrics()); + + const result = await provider.forEvent(EVENT_ID); + + expect(result).toHaveLength(1); + expect(result[0]!.deviceId).toBe('IMEI001'); + + // Verify the SQL contains the faulty filter + expect(queryCalls[0]!.sql).toContain('faulty = false'); + }); + + it('returns the most recent non-faulty position per device (DISTINCT ON semantics)', async () => { + // Postgres DISTINCT ON (p.device_id) ORDER BY p.device_id, p.ts DESC returns + // one row per device — the one with the highest ts. The mock simulates this. + const rows: SnapshotRow[] = [ + // IMEI001: Postgres selected the row with TS_B (more recent) + { + device_id: 'IMEI001', + latitude: 41.50, + longitude: 19.90, + ts: TS_B, // most recent + speed: 50, + angle: 0, + }, + ]; + const { pool } = makeMockPool(rows); + const provider = createSnapshotProvider(pool, makeSilentLogger(), makeMetrics()); + + const result = await provider.forEvent(EVENT_ID); + + expect(result).toHaveLength(1); + expect(result[0]!.ts).toBe(TS_B.getTime()); // epoch ms of the most recent position + }); + + it('passes eventId as a parameterized query argument', async () => { + const { pool, queryCalls } = makeMockPool([]); + const provider = createSnapshotProvider(pool, makeSilentLogger(), makeMetrics()); + + await provider.forEvent(EVENT_ID); + + expect(queryCalls).toHaveLength(1); + expect(queryCalls[0]!.params).toEqual([EVENT_ID]); + }); + + it('observes snapshot query latency and snapshot size metrics', async () => { + const rows: SnapshotRow[] = [ + { device_id: 'IMEI001', latitude: 41.33, longitude: 19.83, ts: TS_A, speed: 10, angle: 5 }, + { device_id: 'IMEI002', latitude: 41.34, longitude: 19.84, ts: TS_B, speed: 0, angle: 0 }, + ]; + const { pool } = makeMockPool(rows); + const metrics = makeMetrics(); + const provider = createSnapshotProvider(pool, makeSilentLogger(), metrics); + + await provider.forEvent(EVENT_ID); + + const latency = metrics.observeCalls.find( + (c) => c.name === 'processor_live_snapshot_query_latency_ms', + ); + expect(latency).toBeDefined(); + expect(latency!.value).toBeGreaterThanOrEqual(0); + + const size = metrics.observeCalls.find( + (c) => c.name === 'processor_live_snapshot_size', + ); + expect(size).toBeDefined(); + expect(size!.value).toBe(2); + }); + + it('propagates Postgres errors (registry.fetchSnapshot catches them)', async () => { + const pool = makeErrorPool(new Error('connection refused')); + const provider = createSnapshotProvider(pool, makeSilentLogger(), makeMetrics()); + + // snapshot.ts does NOT catch errors — registry.ts's fetchSnapshot does. + // This ensures the error propagates cleanly. + await expect(provider.forEvent(EVENT_ID)).rejects.toThrow('connection refused'); + }); +});