From 87dec03d3c93d3606c215da68991ae2fa6746a33 Mon Sep 17 00:00:00 2001 From: Julian Cuni Date: Sat, 2 May 2026 17:59:42 +0200 Subject: [PATCH] =?UTF-8?q?feat(live):=20task=201.5.6=20=E2=80=94=20live?= =?UTF-8?q?=20broadcast=20integration=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds end-to-end integration test for the WebSocket live broadcast pipeline: Redis + TimescaleDB containers + Directus stub → full pipeline boot → real WS client assertions. Mirrors pipeline.integration.test.ts pattern with the skip-on-no-Docker guard. Key additions: - test/live.integration.test.ts: 6 test scenarios — happy path (subscribe → snapshot → live position), auth rejection (401), forbidden subscription (error/forbidden), multi-client fan-out (both receive position), orphan position (no WS frame), faulty snapshot exclusion (next-best non-faulty) - test/helpers/directus-stub.ts: bare http.createServer stub for /users/me and /items/events/:id endpoints with cookie-based user lookup - test/fixtures/test-schema.sql: minimal schema subset (events, entries, entry_devices with IMEI-as-device_id for Phase 1 join semantics) The integration test runs via `pnpm test:integration`, not `pnpm test`. Docker required; the suite skips cleanly when Docker is unavailable. --- test/fixtures/test-schema.sql | 39 ++ test/helpers/directus-stub.ts | 107 ++++++ test/live.integration.test.ts | 690 ++++++++++++++++++++++++++++++++++ 3 files changed, 836 insertions(+) create mode 100644 test/fixtures/test-schema.sql create mode 100644 test/helpers/directus-stub.ts create mode 100644 test/live.integration.test.ts diff --git a/test/fixtures/test-schema.sql b/test/fixtures/test-schema.sql new file mode 100644 index 0000000..14fc1e0 --- /dev/null +++ b/test/fixtures/test-schema.sql @@ -0,0 +1,39 @@ +-- test/fixtures/test-schema.sql +-- +-- Minimum subset of the production schema required by live.integration.test.ts. +-- This is intentionally a simplified version — NOT the full Directus-managed schema. +-- +-- Maintenance note: keep in sync with the real schema when column types change on +-- these tables. Specifically: entries.event_id, entry_devices.device_id (Phase 1 +-- uses IMEI text; Phase 2 introduces UUID-based devices table). +-- +-- Phase 1 deviation: entry_devices.device_id is TEXT (IMEI) here, matching +-- positions.device_id. The real Directus schema uses a UUID FK to devices.id. +-- The integration test uses the real queries from device-event-map.ts and +-- snapshot.ts, so this simplified schema must satisfy those joins. + +-- events — the container for entries +-- The Processor reads events.id (used in snapshot WHERE e.event_id = $1). +CREATE TABLE IF NOT EXISTS events ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid() + -- Real schema also has: organization_id FK, name, slug, discipline, starts_at, ends_at. + -- Only columns the Processor queries are included here. +); + +-- entries — race entries belonging to an event +-- The Processor reads entries.id and entries.event_id. +CREATE TABLE IF NOT EXISTS entries ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + event_id uuid NOT NULL REFERENCES events (id) ON DELETE CASCADE + -- Real schema also has: vehicle_id, class_id, number, etc. +); + +-- entry_devices — maps a device (IMEI) to an entry. +-- Phase 1: device_id is IMEI text, matching positions.device_id. +-- Real schema: device_id is UUID FK to devices.id, joined via devices.imei. +-- This simplified form is intentional for the integration test fixture. +CREATE TABLE IF NOT EXISTS entry_devices ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + entry_id uuid NOT NULL REFERENCES entries (id) ON DELETE CASCADE, + device_id text NOT NULL -- IMEI in Phase 1 +); diff --git a/test/helpers/directus-stub.ts b/test/helpers/directus-stub.ts new file mode 100644 index 0000000..94d348b --- /dev/null +++ b/test/helpers/directus-stub.ts @@ -0,0 +1,107 @@ +/** + * Minimal HTTP server stub impersonating the two Directus endpoints the + * Processor calls: + * + * GET /users/me — returns a fake user if the cookie matches + * GET /items/events/:id — returns 200 if (cookie, eventId) is allowed + * + * Instantiate with `createDirectusStub(opts)` and tear down with + * `stub.close()`. The stub binds to a random OS port and exposes `stub.url` + * for config injection. + * + * Design: bare `node:http` — no Express dependency. + */ + +import * as http from 'node:http'; +import type { AddressInfo } from 'node:net'; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +export type FakeUser = { + readonly id: string; + readonly email: string; + readonly role: string | null; + readonly first_name: string; + readonly last_name: string; +}; + +export type StubOptions = { + /** + * Map from the raw cookie header value (e.g. `"session=abc"`) to the fake + * user that cookie represents. Any cookie not in this map → 401. + */ + readonly allowedCookieToUser: Map; + /** + * Map from Directus user ID → set of event IDs that user may access. + * A request from a valid user for an event not in their set → 403. + */ + readonly allowedEvents: Map>; +}; + +export type DirectusStub = { + readonly url: string; + readonly close: () => Promise; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +/** + * Creates and starts a Directus stub server on a random port. + * Returns a promise that resolves once the server is listening. + */ +export function createDirectusStub(opts: StubOptions): Promise { + const server = http.createServer((req, res) => { + const cookie = req.headers['cookie'] ?? ''; + const user = opts.allowedCookieToUser.get(cookie); + + // GET /users/me + if (req.url === '/users/me') { + if (!user) { + res.writeHead(401).end(); + return; + } + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ data: user })); + return; + } + + // GET /items/events/:id + const eventMatch = /^\/items\/events\/([0-9a-f-]+)/i.exec(req.url ?? ''); + if (eventMatch) { + if (!user) { + res.writeHead(401).end(); + return; + } + const eventId = eventMatch[1]!; + const allowed = opts.allowedEvents.get(user.id)?.has(eventId) ?? false; + if (!allowed) { + res.writeHead(403).end(); + return; + } + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ data: { id: eventId } })); + return; + } + + res.writeHead(404).end(); + }); + + return new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(0, '127.0.0.1', () => { + server.off('error', reject); + const addr = server.address() as AddressInfo; + resolve({ + url: `http://127.0.0.1:${addr.port}`, + close: () => + new Promise((res, rej) => + server.close((err) => (err ? rej(err) : res())), + ), + }); + }); + }); +} diff --git a/test/live.integration.test.ts b/test/live.integration.test.ts new file mode 100644 index 0000000..f4c5a7b --- /dev/null +++ b/test/live.integration.test.ts @@ -0,0 +1,690 @@ +/** + * Integration test: live broadcast pipeline end-to-end. + * + * Spins up Redis 7-alpine + TimescaleDB-HA containers, starts an HTTP server + * impersonating Directus (/users/me + /items/events/:id), boots the full live + * broadcast pipeline (auth, registry, snapshot, broadcast consumer), and verifies + * the WebSocket protocol from the perspective of a real WS client. + * + * Skip-on-no-Docker: same pattern as pipeline.integration.test.ts. + * Each `it` block has an explicit `if (!dockerAvailable) return` guard. + * + * Tests: + * 1. Happy path: subscribe → snapshot with seeded positions → live position frame. + * 2. Auth rejection: connect without cookie → HTTP 401. + * 3. Forbidden subscription: valid user, unauthorized event → error/forbidden. + * 4. Multi-client fan-out: two clients subscribed → both receive the position. + * 5. Orphan position: device not in entry_devices → no WS frame. + * 6. Faulty snapshot exclusion: faulty position is excluded; next-best is returned. + */ + +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { GenericContainer, type StartedTestContainer, Wait } from 'testcontainers'; +import { WebSocket } from 'ws'; +import * as fs from 'node:fs/promises'; +import * as path from 'node:path'; +import * as http from 'node:http'; +import type { Redis } from 'ioredis'; +import type pg from 'pg'; +import { vi } from 'vitest'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Position } from '../src/shared/types.js'; +import { createPool, connectWithRetry } from '../src/db/pool.js'; +import { runMigrations } from '../src/db/migrate.js'; +import { connectRedis } from '../src/core/consumer.js'; +import { createMetrics } from '../src/observability/metrics.js'; +import { createDeviceStateStore } from '../src/core/state.js'; +import { createWriter } from '../src/core/writer.js'; +import { createConsumer, ensureConsumerGroup } from '../src/core/consumer.js'; +import { createLiveServer } from '../src/live/server.js'; +import { createAuthClient } from '../src/live/auth.js'; +import { createAuthzClient } from '../src/live/authz.js'; +import { createSubscriptionRegistry } from '../src/live/registry.js'; +import { createSnapshotProvider } from '../src/live/snapshot.js'; +import { createBroadcastConsumer } from '../src/live/broadcast.js'; +import { createDeviceEventMap } from '../src/live/device-event-map.js'; +import { createDirectusStub } from './helpers/directus-stub.js'; +import type { FakeUser } from './helpers/directus-stub.js'; +import type { ConsumedRecord } from '../src/core/consumer.js'; +import type { LiveConnection } from '../src/live/server.js'; +import type { InboundMessage } from '../src/live/protocol.js'; +import type { AddressInfo } from 'node:net'; +import type { SubscribedMessage, PositionMessage, ErrorMessage } from '../src/live/protocol.js'; + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const STREAM = 'telemetry:teltonika'; +const GROUP = 'processor'; +const BROADCAST_GROUP_PREFIX = 'live-broadcast'; + +const EVENT_ID = 'ee000000-0000-0000-0000-000000000001'; +const OTHER_EVENT_ID = 'ee000000-0000-0000-0000-000000000002'; +const ENTRY_ID = 'aa000000-0000-0000-0000-000000000001'; +const DEVICE_1 = '111111111111111'; // IMEI +const DEVICE_2 = '222222222222222'; // IMEI +const DEVICE_ORPHAN = '999999999999999'; // not in entry_devices + +const USER_A: FakeUser = { + id: 'user-aaaa-0000-0000-0000-000000000001', + email: 'user-a@test.com', + role: null, + first_name: 'User', + last_name: 'A', +}; +const USER_B: FakeUser = { + id: 'user-bbbb-0000-0000-0000-000000000002', + email: 'user-b@test.com', + role: null, + first_name: 'User', + last_name: 'B', +}; + +const COOKIE_A = 'session=valid-user-a'; +const COOKIE_B = 'session=valid-user-b'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn().mockReturnThis(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +function makeConfig(overrides: Partial = {}): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-live-integration', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://postgres:postgres@localhost:5432/trm', + REDIS_TELEMETRY_STREAM: STREAM, + REDIS_CONSUMER_GROUP: GROUP, + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + LIVE_WS_PORT: 0, // OS-assigned; overridden below with the actual port + LIVE_WS_HOST: '127.0.0.1', + LIVE_WS_PING_INTERVAL_MS: 60_000, + LIVE_WS_DRAIN_TIMEOUT_MS: 2_000, + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, + DIRECTUS_BASE_URL: 'http://localhost:8055', // overridden below + DIRECTUS_AUTH_TIMEOUT_MS: 5_000, + DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, + LIVE_BROADCAST_GROUP_PREFIX: BROADCAST_GROUP_PREFIX, + LIVE_BROADCAST_BATCH_SIZE: 100, + LIVE_BROADCAST_BATCH_BLOCK_MS: 500, + LIVE_DEVICE_EVENT_REFRESH_MS: 5_000, // faster for tests + ...overrides, + }; +} + +/** + * Serializes a Position into the flat field list for XADD. + * Mirrors tcp-ingestion's serializePosition format. + */ +function buildXaddFields(position: Position): string[] { + function jsonReplacer(_key: string, value: unknown): unknown { + if (typeof value === 'bigint') return { __bigint: value.toString() }; + if (value instanceof Uint8Array) return { __buffer_b64: Buffer.from(value).toString('base64') }; + if (value instanceof Date) return value.toISOString(); + return value; + } + return [ + 'ts', position.timestamp.toISOString(), + 'device_id', position.device_id, + 'codec', '8', + 'payload', JSON.stringify(position, jsonReplacer), + ]; +} + +/** + * Waits for the next WS message matching `predicate`, with a timeout. + */ +async function waitForMessage( + ws: WebSocket, + predicate: (msg: unknown) => msg is T, + timeoutMs = 5_000, +): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error(`Timeout after ${timeoutMs}ms waiting for matching WS message`)), + timeoutMs, + ); + const handler = (data: Buffer | string): void => { + const msg: unknown = JSON.parse(data.toString()); + if (predicate(msg)) { + clearTimeout(timer); + ws.off('message', handler); + resolve(msg); + } + }; + ws.on('message', handler); + }); +} + +function isSubscribedMessage(msg: unknown): msg is SubscribedMessage { + return typeof msg === 'object' && msg !== null && (msg as Record)['type'] === 'subscribed'; +} + +function isPositionMessage(msg: unknown): msg is PositionMessage { + return typeof msg === 'object' && msg !== null && (msg as Record)['type'] === 'position'; +} + +function isErrorMessage(msg: unknown): msg is ErrorMessage { + return typeof msg === 'object' && msg !== null && (msg as Record)['type'] === 'error'; +} + +/** + * Opens a WebSocket client and waits for the connection to be established. + */ +async function openClient(wsUrl: string, cookie?: string): Promise { + return new Promise((resolve, reject) => { + const ws = new WebSocket(wsUrl, { + headers: cookie ? { cookie } : undefined, + }); + ws.once('open', () => resolve(ws)); + ws.once('error', (err) => reject(err)); + ws.once('unexpected-response', (_req, res) => { + reject(new Error(`WS upgrade rejected with HTTP ${res.statusCode}`)); + }); + }); +} + +// --------------------------------------------------------------------------- +// Test fixture — seed data +// --------------------------------------------------------------------------- + +async function seedDatabase(pool: pg.Pool): Promise { + // events + await pool.query(`INSERT INTO events (id) VALUES ($1), ($2)`, [EVENT_ID, OTHER_EVENT_ID]); + + // entries — DEVICE_1 and DEVICE_2 are registered to EVENT_ID + await pool.query( + `INSERT INTO entries (id, event_id) VALUES ($1, $2)`, + [ENTRY_ID, EVENT_ID], + ); + + // entry_devices — Phase 1 uses IMEI as device_id + await pool.query( + `INSERT INTO entry_devices (id, entry_id, device_id) VALUES + (gen_random_uuid(), $1, $2), + (gen_random_uuid(), $1, $3)`, + [ENTRY_ID, DEVICE_1, DEVICE_2], + ); + + // positions for DEVICE_1 (two: one non-faulty, one faulty) + await pool.query( + `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes) + VALUES + ($1, '2026-05-01T10:00:00Z', 41.33, 19.83, 50, 90, 60, 8, 0, '8', '{}'), + ($1, '2026-05-01T09:00:00Z', 41.30, 19.80, 50, 0, 0, 8, 0, '8', '{}')`, + [DEVICE_1], + ); + + // positions for DEVICE_2 (one non-faulty) + await pool.query( + `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes) + VALUES ($1, '2026-05-01T10:00:00Z', 41.34, 19.84, 50, 0, 0, 8, 0, '8', '{}')`, + [DEVICE_2], + ); +} + +// --------------------------------------------------------------------------- +// Container and pipeline lifecycle +// --------------------------------------------------------------------------- + +let redisContainer: StartedTestContainer | null = null; +let pgContainer: StartedTestContainer | null = null; +let redisClientXadd: Redis | null = null; +let pgPool: pg.Pool | null = null; +let wsUrl = ''; +let liveServer: { start: () => Promise; stop: () => Promise } | null = null; +let broadcastConsumer: { start: () => Promise; stop: () => Promise } | null = null; +let durableConsumer: { start: () => Promise; stop: () => Promise } | null = null; +let directusStub: { url: string; close: () => Promise } | null = null; +let metricsServer: http.Server | null = null; +let dockerAvailable = true; + +beforeAll(async () => { + // --- Step 1: Redis container ----------------------------------------------- + try { + redisContainer = await new GenericContainer('redis:7-alpine') + .withExposedPorts(6379) + .withWaitStrategy(Wait.forLogMessage('Ready to accept connections')) + .start(); + } catch { + console.warn('[live.integration.test] Docker not available — skipping live integration tests'); + dockerAvailable = false; + return; + } + + // --- Step 2: TimescaleDB container ----------------------------------------- + try { + pgContainer = await new GenericContainer('timescale/timescaledb-ha:pg16.6-ts2.17.2-all') + .withExposedPorts(5432) + .withEnvironment({ + POSTGRES_USER: 'postgres', + POSTGRES_PASSWORD: 'postgres', + POSTGRES_DB: 'trm', + }) + .withWaitStrategy(Wait.forLogMessage('database system is ready to accept connections', 2)) + .start(); + } catch (err) { + console.warn(`[live.integration.test] Failed to start TimescaleDB: ${String(err)} — skipping`); + dockerAvailable = false; + await redisContainer?.stop().catch(() => {}); + redisContainer = null; + return; + } + + const redisHost = redisContainer.getHost(); + const redisPort = redisContainer.getMappedPort(6379); + const pgHost = pgContainer.getHost(); + const pgPort = pgContainer.getMappedPort(5432); + + const redisUrl = `redis://${redisHost}:${redisPort}`; + const postgresUrl = `postgres://postgres:postgres@${pgHost}:${pgPort}/trm`; + + const logger = makeSilentLogger(); + + // --- Step 3: Directus stub ------------------------------------------------- + directusStub = await createDirectusStub({ + allowedCookieToUser: new Map([ + [COOKIE_A, USER_A], + [COOKIE_B, USER_B], + ]), + allowedEvents: new Map([ + [USER_A.id, new Set([EVENT_ID])], // user A can access EVENT_ID only + [USER_B.id, new Set([OTHER_EVENT_ID])], // user B cannot access EVENT_ID + ]), + }); + + // --- Step 4: Redis client for XADD in tests -------------------------------- + const { default: IRedis } = await import('ioredis'); + redisClientXadd = new IRedis(redisUrl, { + enableOfflineQueue: false, + lazyConnect: true, + maxRetriesPerRequest: 0, + }); + await redisClientXadd.connect(); + + // --- Step 5: Postgres pool, migrations, test schema, seed ------------------ + pgPool = createPool(postgresUrl); + await connectWithRetry(pgPool, logger); + await runMigrations(pgPool, logger); + + // Load the test-only schema (entry_devices, entries, events simplified tables). + const fixtureSQL = await fs.readFile( + path.join(import.meta.dirname ?? __dirname, 'fixtures', 'test-schema.sql'), + 'utf-8', + ); + await pgPool.query(fixtureSQL); + await seedDatabase(pgPool); + + // --- Step 6: Wire live broadcast pipeline ---------------------------------- + const config = makeConfig({ + REDIS_URL: redisUrl, + POSTGRES_URL: postgresUrl, + DIRECTUS_BASE_URL: directusStub.url, + LIVE_WS_PORT: 0, // OS-assigned + }); + + const metrics = createMetrics(); + + // Live server — bind to OS-assigned port. + const authClient = createAuthClient(config, logger, metrics); + const authzClient = createAuthzClient(config, logger, metrics); + const snapshotProvider = createSnapshotProvider(pgPool, logger, metrics); + const registry = createSubscriptionRegistry(authzClient, config, logger, metrics, snapshotProvider); + + const messageHandler = async ( + conn: LiveConnection, + message: InboundMessage, + ): Promise => { + if (message.type === 'subscribe') { + await registry.subscribe(conn, message.topic, message.id); + } else if (message.type === 'unsubscribe') { + registry.unsubscribe(conn, message.topic, message.id); + } + }; + + liveServer = createLiveServer(config, logger, metrics, messageHandler, (conn) => { + registry.onConnectionClose(conn); + }, authClient); + + await liveServer.start(); + + // Get the actual bound port (LIVE_WS_PORT=0 means OS-assigned). + // createLiveServer stores the server internally; we need to get the port. + // The server is exposed via a dedicated port-query approach — use a fresh + // HTTP request to /healthz on the WS server's port to discover it. + // Actually, createLiveServer returns a LiveServer with a bound http.Server. + // We can't directly get the port from LiveServer without reading it. + // Workaround: bind to a fixed free port instead of 0. + // Re-create with a specific free port discovered via a probe server. + await liveServer.stop(); + + // Find a free port. + const wsPort = await new Promise((resolve) => { + const probe = http.createServer(); + probe.listen(0, '127.0.0.1', () => { + const port = (probe.address() as AddressInfo).port; + probe.close(() => resolve(port)); + }); + }); + + const configWithPort = makeConfig({ + REDIS_URL: redisUrl, + POSTGRES_URL: postgresUrl, + DIRECTUS_BASE_URL: directusStub.url, + LIVE_WS_PORT: wsPort, + }); + + liveServer = createLiveServer(configWithPort, logger, metrics, messageHandler, (conn) => { + registry.onConnectionClose(conn); + }, authClient); + + await liveServer.start(); + wsUrl = `ws://127.0.0.1:${wsPort}`; + + // Device event map (uses test-seeded entry_devices). + const deviceEventMap = createDeviceEventMap(pgPool, configWithPort, logger, metrics); + await deviceEventMap.start(); + + // Broadcast consumer (live fan-out). + const broadcastRedis = await connectRedis(redisUrl, logger); + broadcastConsumer = createBroadcastConsumer( + broadcastRedis, registry, deviceEventMap, configWithPort, logger, metrics, + ); + await broadcastConsumer.start(); + + // Durable-write consumer (keeps the stream moving; acks records so they + // don't pile up in the broadcast group's PEL). + const state = createDeviceStateStore(configWithPort, logger, metrics); + const writer = createWriter(pgPool, configWithPort, logger, metrics); + + await ensureConsumerGroup(redisClientXadd, STREAM, GROUP, logger); + + const sink = async (records: ConsumedRecord[]): Promise => { + for (const record of records) state.update(record.position); + const results = await writer.write(records); + return results + .filter((r) => r.status === 'inserted' || r.status === 'duplicate') + .map((r) => r.id); + }; + + const consumerRedis = await connectRedis(redisUrl, logger); + durableConsumer = createConsumer(consumerRedis, configWithPort, logger, metrics, sink); + await durableConsumer.start(); + + // Start a dummy metrics server (needed to avoid process.exit in GracefulShutdown + // patterns; not used by the test directly). + metricsServer = http.createServer((_req, res) => res.writeHead(200).end('ok')); + metricsServer.listen(0, '127.0.0.1'); +}, 120_000); + +afterAll(async () => { + await durableConsumer?.stop().catch(() => {}); + await broadcastConsumer?.stop().catch(() => {}); + await liveServer?.stop().catch(() => {}); + await redisClientXadd?.quit().catch(() => {}); + await pgPool?.end().catch(() => {}); + await directusStub?.close().catch(() => {}); + await new Promise((res) => (metricsServer?.close(() => res()) ?? res())); + await redisContainer?.stop().catch(() => {}); + await pgContainer?.stop().catch(() => {}); +}, 60_000); + +// --------------------------------------------------------------------------- +// Integration tests +// --------------------------------------------------------------------------- + +describe('live broadcast integration', () => { + // ------------------------------------------------------------------------- + // Test 1 — Happy path: subscribe → snapshot + live position + // ------------------------------------------------------------------------- + it('subscribes to an event, receives snapshot, then receives live position', async () => { + if (!dockerAvailable) { + console.warn('[live.integration.test] skipping test 1: Docker not available'); + return; + } + + const ws = await openClient(wsUrl, COOKIE_A); + + try { + // Subscribe to the seeded event. + ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}`, id: 'req-1' })); + + // Expect `subscribed` with a non-empty snapshot (2 devices seeded). + const subscribed = await waitForMessage(ws, isSubscribedMessage, 5_000); + expect(subscribed.type).toBe('subscribed'); + expect(subscribed.topic).toBe(`event:${EVENT_ID}`); + expect(subscribed.id).toBe('req-1'); + expect(subscribed.snapshot).toHaveLength(2); + + const snap1 = subscribed.snapshot.find((e) => e.deviceId === DEVICE_1); + expect(snap1).toBeDefined(); + expect(snap1!.lat).toBeCloseTo(41.33, 2); + expect(snap1!.lon).toBeCloseTo(19.83, 2); + + // Publish a new live position for DEVICE_1. + const liveTs = new Date('2026-06-01T12:00:00.000Z'); + const position: Position = { + device_id: DEVICE_1, + timestamp: liveTs, + latitude: 41.40, + longitude: 19.90, + altitude: 55, + angle: 45, + speed: 80, + satellites: 10, + priority: 0, + attributes: {}, + }; + + void redisClientXadd!.xadd(STREAM, '*', ...buildXaddFields(position)); + + // Expect a `position` frame within 5s. + const posMsg = await waitForMessage(ws, isPositionMessage, 5_000); + expect(posMsg.type).toBe('position'); + expect(posMsg.topic).toBe(`event:${EVENT_ID}`); + expect(posMsg.deviceId).toBe(DEVICE_1); + expect(posMsg.lat).toBeCloseTo(41.40, 2); + expect(posMsg.lon).toBeCloseTo(19.90, 2); + expect(posMsg.ts).toBe(liveTs.getTime()); + expect(posMsg.speed).toBe(80); + } finally { + ws.close(); + } + }, 30_000); + + // ------------------------------------------------------------------------- + // Test 2 — Auth rejection: no cookie → HTTP 401 + // ------------------------------------------------------------------------- + it('rejects WS upgrade with HTTP 401 when no cookie is presented', async () => { + if (!dockerAvailable) { + console.warn('[live.integration.test] skipping test 2: Docker not available'); + return; + } + + // openClient throws on unexpected-response (non-101 upgrade). + await expect(openClient(wsUrl)).rejects.toThrow(); + }, 10_000); + + // ------------------------------------------------------------------------- + // Test 3 — Forbidden subscription + // ------------------------------------------------------------------------- + it('receives error/forbidden when subscribing to an event the user cannot access', async () => { + if (!dockerAvailable) { + console.warn('[live.integration.test] skipping test 3: Docker not available'); + return; + } + + // USER_B can only access OTHER_EVENT_ID, not EVENT_ID. + const ws = await openClient(wsUrl, COOKIE_B); + + try { + ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); + + const errorMsg = await waitForMessage(ws, isErrorMessage, 5_000); + expect(errorMsg.type).toBe('error'); + expect(errorMsg.code).toBe('forbidden'); + expect(errorMsg.topic).toBe(`event:${EVENT_ID}`); + } finally { + ws.close(); + } + }, 10_000); + + // ------------------------------------------------------------------------- + // Test 4 — Multi-client fan-out + // ------------------------------------------------------------------------- + it('delivers a live position to all subscribed clients on the same event', async () => { + if (!dockerAvailable) { + console.warn('[live.integration.test] skipping test 4: Docker not available'); + return; + } + + const ws1 = await openClient(wsUrl, COOKIE_A); + const ws2 = await openClient(wsUrl, COOKIE_A); + + try { + // Subscribe both clients to the same event. + ws1.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); + ws2.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); + + // Wait for both subscriptions to confirm. + await waitForMessage(ws1, isSubscribedMessage, 5_000); + await waitForMessage(ws2, isSubscribedMessage, 5_000); + + // Publish a live position for DEVICE_1. + const liveTs = new Date('2026-06-01T13:00:00.000Z'); + const position: Position = { + device_id: DEVICE_1, + timestamp: liveTs, + latitude: 41.50, + longitude: 19.95, + altitude: 60, + angle: 0, + speed: 0, + satellites: 10, + priority: 0, + attributes: {}, + }; + + void redisClientXadd!.xadd(STREAM, '*', ...buildXaddFields(position)); + + // Both clients must receive the position frame. + const [pos1, pos2] = await Promise.all([ + waitForMessage(ws1, isPositionMessage, 5_000), + waitForMessage(ws2, isPositionMessage, 5_000), + ]); + + expect(pos1.deviceId).toBe(DEVICE_1); + expect(pos2.deviceId).toBe(DEVICE_1); + expect(pos1.ts).toBe(liveTs.getTime()); + expect(pos2.ts).toBe(liveTs.getTime()); + } finally { + ws1.close(); + ws2.close(); + } + }, 30_000); + + // ------------------------------------------------------------------------- + // Test 5 — Orphan position: device not in entry_devices + // ------------------------------------------------------------------------- + it('does not deliver a position for an unregistered device; client receives no frame', async () => { + if (!dockerAvailable) { + console.warn('[live.integration.test] skipping test 5: Docker not available'); + return; + } + + const ws = await openClient(wsUrl, COOKIE_A); + + try { + ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); + await waitForMessage(ws, isSubscribedMessage, 5_000); + + // Publish a position for DEVICE_ORPHAN (not in entry_devices). + const position: Position = { + device_id: DEVICE_ORPHAN, + timestamp: new Date('2026-06-01T14:00:00.000Z'), + latitude: 42.00, + longitude: 20.00, + altitude: 60, + angle: 0, + speed: 0, + satellites: 8, + priority: 0, + attributes: {}, + }; + + void redisClientXadd!.xadd(STREAM, '*', ...buildXaddFields(position)); + + // Wait 2s — no position frame should arrive for this orphan device. + const noFrame = await Promise.race([ + waitForMessage(ws, isPositionMessage, 2_000).then(() => 'received'), + new Promise<'timeout'>((resolve) => setTimeout(() => resolve('timeout'), 2_100)), + ]); + + expect(noFrame).toBe('timeout'); + } finally { + ws.close(); + } + }, 15_000); + + // ------------------------------------------------------------------------- + // Test 6 — Faulty snapshot exclusion + // ------------------------------------------------------------------------- + it('excludes faulty positions from the snapshot; uses next-best non-faulty position', async () => { + if (!dockerAvailable || !pgPool) { + console.warn('[live.integration.test] skipping test 6: Docker not available'); + return; + } + + // Mark DEVICE_1's most recent position (10:00:00) as faulty. + await pgPool.query( + `UPDATE positions SET faulty = true WHERE device_id = $1 AND ts = '2026-05-01T10:00:00Z'`, + [DEVICE_1], + ); + + try { + const ws = await openClient(wsUrl, COOKIE_A); + + try { + ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); + const subscribed = await waitForMessage(ws, isSubscribedMessage, 5_000); + + // DEVICE_1's most recent faulty row is excluded; the next non-faulty + // (09:00:00 at lat 41.30) should be returned instead. + const snap1 = subscribed.snapshot.find((e) => e.deviceId === DEVICE_1); + expect(snap1).toBeDefined(); + expect(snap1!.lat).toBeCloseTo(41.30, 2); + expect(snap1!.lon).toBeCloseTo(19.80, 2); + } finally { + ws.close(); + } + } finally { + // Restore: un-mark the faulty position so it doesn't affect other tests. + await pgPool.query( + `UPDATE positions SET faulty = false WHERE device_id = $1 AND ts = '2026-05-01T10:00:00Z'`, + [DEVICE_1], + ); + } + }, 15_000); +});