/** * Integration test: live broadcast pipeline end-to-end. * * Spins up Redis 7-alpine + TimescaleDB-HA containers, starts an HTTP server * impersonating Directus (/users/me + /items/events/:id), boots the full live * broadcast pipeline (auth, registry, snapshot, broadcast consumer), and verifies * the WebSocket protocol from the perspective of a real WS client. * * Skip-on-no-Docker: same pattern as pipeline.integration.test.ts. * Each `it` block has an explicit `if (!dockerAvailable) return` guard. * * Tests: * 1. Happy path: subscribe → snapshot with seeded positions → live position frame. * 2. Auth rejection: connect without cookie → HTTP 401. * 3. Forbidden subscription: valid user, unauthorized event → error/forbidden. * 4. Multi-client fan-out: two clients subscribed → both receive the position. * 5. Orphan position: device not in entry_devices → no WS frame. * 6. Faulty snapshot exclusion: faulty position is excluded; next-best is returned. */ import { describe, it, expect, beforeAll, afterAll } from 'vitest'; import { GenericContainer, type StartedTestContainer, Wait } from 'testcontainers'; import { WebSocket } from 'ws'; import * as fs from 'node:fs/promises'; import * as path from 'node:path'; import * as http from 'node:http'; import type { Redis } from 'ioredis'; import type pg from 'pg'; import { vi } from 'vitest'; import type { Logger } from 'pino'; import type { Config } from '../src/config/load.js'; import type { Position } from '../src/shared/types.js'; import { createPool, connectWithRetry } from '../src/db/pool.js'; import { runMigrations } from '../src/db/migrate.js'; import { connectRedis } from '../src/core/consumer.js'; import { createMetrics } from '../src/observability/metrics.js'; import { createDeviceStateStore } from '../src/core/state.js'; import { createWriter } from '../src/core/writer.js'; import { createConsumer, ensureConsumerGroup } from '../src/core/consumer.js'; import { createLiveServer } from '../src/live/server.js'; import { createAuthClient } from '../src/live/auth.js'; import { createAuthzClient } from '../src/live/authz.js'; import { createSubscriptionRegistry } from '../src/live/registry.js'; import { createSnapshotProvider } from '../src/live/snapshot.js'; import { createBroadcastConsumer } from '../src/live/broadcast.js'; import { createDeviceEventMap } from '../src/live/device-event-map.js'; import { createDirectusStub } from './helpers/directus-stub.js'; import type { FakeUser } from './helpers/directus-stub.js'; import type { ConsumedRecord } from '../src/core/consumer.js'; import type { LiveConnection } from '../src/live/server.js'; import type { InboundMessage } from '../src/live/protocol.js'; import type { AddressInfo } from 'node:net'; import type { SubscribedMessage, PositionMessage, ErrorMessage } from '../src/live/protocol.js'; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- const STREAM = 'telemetry:teltonika'; const GROUP = 'processor'; const BROADCAST_GROUP_PREFIX = 'live-broadcast'; const EVENT_ID = 'ee000000-0000-0000-0000-000000000001'; const OTHER_EVENT_ID = 'ee000000-0000-0000-0000-000000000002'; const ENTRY_ID = 'aa000000-0000-0000-0000-000000000001'; const DEVICE_1 = '111111111111111'; // IMEI const DEVICE_2 = '222222222222222'; // IMEI const DEVICE_ORPHAN = '999999999999999'; // not in entry_devices const USER_A: FakeUser = { id: 'user-aaaa-0000-0000-0000-000000000001', email: 'user-a@test.com', role: null, first_name: 'User', last_name: 'A', }; const USER_B: FakeUser = { id: 'user-bbbb-0000-0000-0000-000000000002', email: 'user-b@test.com', role: null, first_name: 'User', last_name: 'B', }; const COOKIE_A = 'session=valid-user-a'; const COOKIE_B = 'session=valid-user-b'; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- function makeSilentLogger(): Logger { return { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), fatal: vi.fn(), trace: vi.fn(), child: vi.fn().mockReturnThis(), level: 'silent', silent: vi.fn(), } as unknown as Logger; } function makeConfig(overrides: Partial = {}): Config { return { NODE_ENV: 'test', INSTANCE_ID: 'test-live-integration', LOG_LEVEL: 'silent', REDIS_URL: 'redis://localhost:6379', POSTGRES_URL: 'postgres://postgres:postgres@localhost:5432/trm', REDIS_TELEMETRY_STREAM: STREAM, REDIS_CONSUMER_GROUP: GROUP, REDIS_CONSUMER_NAME: 'test-consumer', METRICS_PORT: 0, BATCH_SIZE: 100, BATCH_BLOCK_MS: 500, WRITE_BATCH_SIZE: 50, DEVICE_STATE_LRU_CAP: 10_000, LIVE_WS_PORT: 0, // OS-assigned; overridden below with the actual port LIVE_WS_HOST: '127.0.0.1', LIVE_WS_PING_INTERVAL_MS: 60_000, LIVE_WS_DRAIN_TIMEOUT_MS: 2_000, LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, DIRECTUS_BASE_URL: 'http://localhost:8055', // overridden below DIRECTUS_AUTH_TIMEOUT_MS: 5_000, DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, LIVE_BROADCAST_GROUP_PREFIX: BROADCAST_GROUP_PREFIX, LIVE_BROADCAST_BATCH_SIZE: 100, LIVE_BROADCAST_BATCH_BLOCK_MS: 500, LIVE_DEVICE_EVENT_REFRESH_MS: 5_000, // faster for tests ...overrides, }; } /** * Serializes a Position into the flat field list for XADD. * Mirrors tcp-ingestion's serializePosition format. */ function buildXaddFields(position: Position): string[] { function jsonReplacer(_key: string, value: unknown): unknown { if (typeof value === 'bigint') return { __bigint: value.toString() }; if (value instanceof Uint8Array) return { __buffer_b64: Buffer.from(value).toString('base64') }; if (value instanceof Date) return value.toISOString(); return value; } return [ 'ts', position.timestamp.toISOString(), 'device_id', position.device_id, 'codec', '8', 'payload', JSON.stringify(position, jsonReplacer), ]; } /** * Waits for the next WS message matching `predicate`, with a timeout. */ async function waitForMessage( ws: WebSocket, predicate: (msg: unknown) => msg is T, timeoutMs = 5_000, ): Promise { return new Promise((resolve, reject) => { const timer = setTimeout( () => reject(new Error(`Timeout after ${timeoutMs}ms waiting for matching WS message`)), timeoutMs, ); const handler = (data: Buffer | string): void => { const msg: unknown = JSON.parse(data.toString()); if (predicate(msg)) { clearTimeout(timer); ws.off('message', handler); resolve(msg); } }; ws.on('message', handler); }); } function isSubscribedMessage(msg: unknown): msg is SubscribedMessage { return typeof msg === 'object' && msg !== null && (msg as Record)['type'] === 'subscribed'; } function isPositionMessage(msg: unknown): msg is PositionMessage { return typeof msg === 'object' && msg !== null && (msg as Record)['type'] === 'position'; } function isErrorMessage(msg: unknown): msg is ErrorMessage { return typeof msg === 'object' && msg !== null && (msg as Record)['type'] === 'error'; } /** * Opens a WebSocket client and waits for the connection to be established. */ async function openClient(wsUrl: string, cookie?: string): Promise { return new Promise((resolve, reject) => { const ws = new WebSocket(wsUrl, { headers: cookie ? { cookie } : undefined, }); ws.once('open', () => resolve(ws)); ws.once('error', (err) => reject(err)); ws.once('unexpected-response', (_req, res) => { reject(new Error(`WS upgrade rejected with HTTP ${res.statusCode}`)); }); }); } // --------------------------------------------------------------------------- // Test fixture — seed data // --------------------------------------------------------------------------- async function seedDatabase(pool: pg.Pool): Promise { // events await pool.query(`INSERT INTO events (id) VALUES ($1), ($2)`, [EVENT_ID, OTHER_EVENT_ID]); // entries — DEVICE_1 and DEVICE_2 are registered to EVENT_ID await pool.query( `INSERT INTO entries (id, event_id) VALUES ($1, $2)`, [ENTRY_ID, EVENT_ID], ); // entry_devices — Phase 1 uses IMEI as device_id await pool.query( `INSERT INTO entry_devices (id, entry_id, device_id) VALUES (gen_random_uuid(), $1, $2), (gen_random_uuid(), $1, $3)`, [ENTRY_ID, DEVICE_1, DEVICE_2], ); // positions for DEVICE_1 (two: one non-faulty, one faulty) await pool.query( `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes) VALUES ($1, '2026-05-01T10:00:00Z', 41.33, 19.83, 50, 90, 60, 8, 0, '8', '{}'), ($1, '2026-05-01T09:00:00Z', 41.30, 19.80, 50, 0, 0, 8, 0, '8', '{}')`, [DEVICE_1], ); // positions for DEVICE_2 (one non-faulty) await pool.query( `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes) VALUES ($1, '2026-05-01T10:00:00Z', 41.34, 19.84, 50, 0, 0, 8, 0, '8', '{}')`, [DEVICE_2], ); } // --------------------------------------------------------------------------- // Container and pipeline lifecycle // --------------------------------------------------------------------------- let redisContainer: StartedTestContainer | null = null; let pgContainer: StartedTestContainer | null = null; let redisClientXadd: Redis | null = null; let pgPool: pg.Pool | null = null; let wsUrl = ''; let liveServer: { start: () => Promise; stop: () => Promise } | null = null; let broadcastConsumer: { start: () => Promise; stop: () => Promise } | null = null; let durableConsumer: { start: () => Promise; stop: () => Promise } | null = null; let directusStub: { url: string; close: () => Promise } | null = null; let metricsServer: http.Server | null = null; let dockerAvailable = true; beforeAll(async () => { // --- Step 1: Redis container ----------------------------------------------- try { redisContainer = await new GenericContainer('redis:7-alpine') .withExposedPorts(6379) .withWaitStrategy(Wait.forLogMessage('Ready to accept connections')) .start(); } catch { console.warn('[live.integration.test] Docker not available — skipping live integration tests'); dockerAvailable = false; return; } // --- Step 2: TimescaleDB container ----------------------------------------- try { pgContainer = await new GenericContainer('timescale/timescaledb-ha:pg16.6-ts2.17.2-all') .withExposedPorts(5432) .withEnvironment({ POSTGRES_USER: 'postgres', POSTGRES_PASSWORD: 'postgres', POSTGRES_DB: 'trm', }) .withWaitStrategy(Wait.forLogMessage('database system is ready to accept connections', 2)) .start(); } catch (err) { console.warn(`[live.integration.test] Failed to start TimescaleDB: ${String(err)} — skipping`); dockerAvailable = false; await redisContainer?.stop().catch(() => {}); redisContainer = null; return; } const redisHost = redisContainer.getHost(); const redisPort = redisContainer.getMappedPort(6379); const pgHost = pgContainer.getHost(); const pgPort = pgContainer.getMappedPort(5432); const redisUrl = `redis://${redisHost}:${redisPort}`; const postgresUrl = `postgres://postgres:postgres@${pgHost}:${pgPort}/trm`; const logger = makeSilentLogger(); // --- Step 3: Directus stub ------------------------------------------------- directusStub = await createDirectusStub({ allowedCookieToUser: new Map([ [COOKIE_A, USER_A], [COOKIE_B, USER_B], ]), allowedEvents: new Map([ [USER_A.id, new Set([EVENT_ID])], // user A can access EVENT_ID only [USER_B.id, new Set([OTHER_EVENT_ID])], // user B cannot access EVENT_ID ]), }); // --- Step 4: Redis client for XADD in tests -------------------------------- const { default: IRedis } = await import('ioredis'); redisClientXadd = new IRedis(redisUrl, { enableOfflineQueue: false, lazyConnect: true, maxRetriesPerRequest: 0, }); await redisClientXadd.connect(); // --- Step 5: Postgres pool, migrations, test schema, seed ------------------ pgPool = createPool(postgresUrl); await connectWithRetry(pgPool, logger); await runMigrations(pgPool, logger); // Load the test-only schema (entry_devices, entries, events simplified tables). const fixtureSQL = await fs.readFile( path.join(import.meta.dirname ?? __dirname, 'fixtures', 'test-schema.sql'), 'utf-8', ); await pgPool.query(fixtureSQL); await seedDatabase(pgPool); // --- Step 6: Wire live broadcast pipeline ---------------------------------- const config = makeConfig({ REDIS_URL: redisUrl, POSTGRES_URL: postgresUrl, DIRECTUS_BASE_URL: directusStub.url, LIVE_WS_PORT: 0, // OS-assigned }); const metrics = createMetrics(); // Live server — bind to OS-assigned port. const authClient = createAuthClient(config, logger, metrics); const authzClient = createAuthzClient(config, logger, metrics); const snapshotProvider = createSnapshotProvider(pgPool, logger, metrics); const registry = createSubscriptionRegistry(authzClient, config, logger, metrics, snapshotProvider); const messageHandler = async ( conn: LiveConnection, message: InboundMessage, ): Promise => { if (message.type === 'subscribe') { await registry.subscribe(conn, message.topic, message.id); } else if (message.type === 'unsubscribe') { registry.unsubscribe(conn, message.topic, message.id); } }; liveServer = createLiveServer(config, logger, metrics, messageHandler, (conn) => { registry.onConnectionClose(conn); }, authClient); await liveServer.start(); // Get the actual bound port (LIVE_WS_PORT=0 means OS-assigned). // createLiveServer stores the server internally; we need to get the port. // The server is exposed via a dedicated port-query approach — use a fresh // HTTP request to /healthz on the WS server's port to discover it. // Actually, createLiveServer returns a LiveServer with a bound http.Server. // We can't directly get the port from LiveServer without reading it. // Workaround: bind to a fixed free port instead of 0. // Re-create with a specific free port discovered via a probe server. await liveServer.stop(); // Find a free port. const wsPort = await new Promise((resolve) => { const probe = http.createServer(); probe.listen(0, '127.0.0.1', () => { const port = (probe.address() as AddressInfo).port; probe.close(() => resolve(port)); }); }); const configWithPort = makeConfig({ REDIS_URL: redisUrl, POSTGRES_URL: postgresUrl, DIRECTUS_BASE_URL: directusStub.url, LIVE_WS_PORT: wsPort, }); liveServer = createLiveServer(configWithPort, logger, metrics, messageHandler, (conn) => { registry.onConnectionClose(conn); }, authClient); await liveServer.start(); wsUrl = `ws://127.0.0.1:${wsPort}`; // Device event map (uses test-seeded entry_devices). const deviceEventMap = createDeviceEventMap(pgPool, configWithPort, logger, metrics); await deviceEventMap.start(); // Broadcast consumer (live fan-out). const broadcastRedis = await connectRedis(redisUrl, logger); broadcastConsumer = createBroadcastConsumer( broadcastRedis, registry, deviceEventMap, configWithPort, logger, metrics, ); await broadcastConsumer.start(); // Durable-write consumer (keeps the stream moving; acks records so they // don't pile up in the broadcast group's PEL). const state = createDeviceStateStore(configWithPort, logger, metrics); const writer = createWriter(pgPool, configWithPort, logger, metrics); await ensureConsumerGroup(redisClientXadd, STREAM, GROUP, logger); const sink = async (records: ConsumedRecord[]): Promise => { for (const record of records) state.update(record.position); const results = await writer.write(records); return results .filter((r) => r.status === 'inserted' || r.status === 'duplicate') .map((r) => r.id); }; const consumerRedis = await connectRedis(redisUrl, logger); durableConsumer = createConsumer(consumerRedis, configWithPort, logger, metrics, sink); await durableConsumer.start(); // Start a dummy metrics server (needed to avoid process.exit in GracefulShutdown // patterns; not used by the test directly). metricsServer = http.createServer((_req, res) => res.writeHead(200).end('ok')); metricsServer.listen(0, '127.0.0.1'); }, 120_000); afterAll(async () => { await durableConsumer?.stop().catch(() => {}); await broadcastConsumer?.stop().catch(() => {}); await liveServer?.stop().catch(() => {}); await redisClientXadd?.quit().catch(() => {}); await pgPool?.end().catch(() => {}); await directusStub?.close().catch(() => {}); await new Promise((res) => (metricsServer?.close(() => res()) ?? res())); await redisContainer?.stop().catch(() => {}); await pgContainer?.stop().catch(() => {}); }, 60_000); // --------------------------------------------------------------------------- // Integration tests // --------------------------------------------------------------------------- describe('live broadcast integration', () => { // ------------------------------------------------------------------------- // Test 1 — Happy path: subscribe → snapshot + live position // ------------------------------------------------------------------------- it('subscribes to an event, receives snapshot, then receives live position', async () => { if (!dockerAvailable) { console.warn('[live.integration.test] skipping test 1: Docker not available'); return; } const ws = await openClient(wsUrl, COOKIE_A); try { // Subscribe to the seeded event. ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}`, id: 'req-1' })); // Expect `subscribed` with a non-empty snapshot (2 devices seeded). const subscribed = await waitForMessage(ws, isSubscribedMessage, 5_000); expect(subscribed.type).toBe('subscribed'); expect(subscribed.topic).toBe(`event:${EVENT_ID}`); expect(subscribed.id).toBe('req-1'); expect(subscribed.snapshot).toHaveLength(2); const snap1 = subscribed.snapshot.find((e) => e.deviceId === DEVICE_1); expect(snap1).toBeDefined(); expect(snap1!.lat).toBeCloseTo(41.33, 2); expect(snap1!.lon).toBeCloseTo(19.83, 2); // Publish a new live position for DEVICE_1. const liveTs = new Date('2026-06-01T12:00:00.000Z'); const position: Position = { device_id: DEVICE_1, timestamp: liveTs, latitude: 41.40, longitude: 19.90, altitude: 55, angle: 45, speed: 80, satellites: 10, priority: 0, attributes: {}, }; void redisClientXadd!.xadd(STREAM, '*', ...buildXaddFields(position)); // Expect a `position` frame within 5s. const posMsg = await waitForMessage(ws, isPositionMessage, 5_000); expect(posMsg.type).toBe('position'); expect(posMsg.topic).toBe(`event:${EVENT_ID}`); expect(posMsg.deviceId).toBe(DEVICE_1); expect(posMsg.lat).toBeCloseTo(41.40, 2); expect(posMsg.lon).toBeCloseTo(19.90, 2); expect(posMsg.ts).toBe(liveTs.getTime()); expect(posMsg.speed).toBe(80); } finally { ws.close(); } }, 30_000); // ------------------------------------------------------------------------- // Test 2 — Auth rejection: no cookie → HTTP 401 // ------------------------------------------------------------------------- it('rejects WS upgrade with HTTP 401 when no cookie is presented', async () => { if (!dockerAvailable) { console.warn('[live.integration.test] skipping test 2: Docker not available'); return; } // openClient throws on unexpected-response (non-101 upgrade). await expect(openClient(wsUrl)).rejects.toThrow(); }, 10_000); // ------------------------------------------------------------------------- // Test 3 — Forbidden subscription // ------------------------------------------------------------------------- it('receives error/forbidden when subscribing to an event the user cannot access', async () => { if (!dockerAvailable) { console.warn('[live.integration.test] skipping test 3: Docker not available'); return; } // USER_B can only access OTHER_EVENT_ID, not EVENT_ID. const ws = await openClient(wsUrl, COOKIE_B); try { ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); const errorMsg = await waitForMessage(ws, isErrorMessage, 5_000); expect(errorMsg.type).toBe('error'); expect(errorMsg.code).toBe('forbidden'); expect(errorMsg.topic).toBe(`event:${EVENT_ID}`); } finally { ws.close(); } }, 10_000); // ------------------------------------------------------------------------- // Test 4 — Multi-client fan-out // ------------------------------------------------------------------------- it('delivers a live position to all subscribed clients on the same event', async () => { if (!dockerAvailable) { console.warn('[live.integration.test] skipping test 4: Docker not available'); return; } const ws1 = await openClient(wsUrl, COOKIE_A); const ws2 = await openClient(wsUrl, COOKIE_A); try { // Subscribe both clients to the same event. ws1.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); ws2.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); // Wait for both subscriptions to confirm. await waitForMessage(ws1, isSubscribedMessage, 5_000); await waitForMessage(ws2, isSubscribedMessage, 5_000); // Publish a live position for DEVICE_1. const liveTs = new Date('2026-06-01T13:00:00.000Z'); const position: Position = { device_id: DEVICE_1, timestamp: liveTs, latitude: 41.50, longitude: 19.95, altitude: 60, angle: 0, speed: 0, satellites: 10, priority: 0, attributes: {}, }; void redisClientXadd!.xadd(STREAM, '*', ...buildXaddFields(position)); // Both clients must receive the position frame. const [pos1, pos2] = await Promise.all([ waitForMessage(ws1, isPositionMessage, 5_000), waitForMessage(ws2, isPositionMessage, 5_000), ]); expect(pos1.deviceId).toBe(DEVICE_1); expect(pos2.deviceId).toBe(DEVICE_1); expect(pos1.ts).toBe(liveTs.getTime()); expect(pos2.ts).toBe(liveTs.getTime()); } finally { ws1.close(); ws2.close(); } }, 30_000); // ------------------------------------------------------------------------- // Test 5 — Orphan position: device not in entry_devices // ------------------------------------------------------------------------- it('does not deliver a position for an unregistered device; client receives no frame', async () => { if (!dockerAvailable) { console.warn('[live.integration.test] skipping test 5: Docker not available'); return; } const ws = await openClient(wsUrl, COOKIE_A); try { ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); await waitForMessage(ws, isSubscribedMessage, 5_000); // Publish a position for DEVICE_ORPHAN (not in entry_devices). const position: Position = { device_id: DEVICE_ORPHAN, timestamp: new Date('2026-06-01T14:00:00.000Z'), latitude: 42.00, longitude: 20.00, altitude: 60, angle: 0, speed: 0, satellites: 8, priority: 0, attributes: {}, }; void redisClientXadd!.xadd(STREAM, '*', ...buildXaddFields(position)); // Wait 2s — no position frame should arrive for this orphan device. const noFrame = await Promise.race([ waitForMessage(ws, isPositionMessage, 2_000).then(() => 'received'), new Promise<'timeout'>((resolve) => setTimeout(() => resolve('timeout'), 2_100)), ]); expect(noFrame).toBe('timeout'); } finally { ws.close(); } }, 15_000); // ------------------------------------------------------------------------- // Test 6 — Faulty snapshot exclusion // ------------------------------------------------------------------------- it('excludes faulty positions from the snapshot; uses next-best non-faulty position', async () => { if (!dockerAvailable || !pgPool) { console.warn('[live.integration.test] skipping test 6: Docker not available'); return; } // Mark DEVICE_1's most recent position (10:00:00) as faulty. await pgPool.query( `UPDATE positions SET faulty = true WHERE device_id = $1 AND ts = '2026-05-01T10:00:00Z'`, [DEVICE_1], ); try { const ws = await openClient(wsUrl, COOKIE_A); try { ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` })); const subscribed = await waitForMessage(ws, isSubscribedMessage, 5_000); // DEVICE_1's most recent faulty row is excluded; the next non-faulty // (09:00:00 at lat 41.30) should be returned instead. const snap1 = subscribed.snapshot.find((e) => e.deviceId === DEVICE_1); expect(snap1).toBeDefined(); expect(snap1!.lat).toBeCloseTo(41.30, 2); expect(snap1!.lon).toBeCloseTo(19.80, 2); } finally { ws.close(); } } finally { // Restore: un-mark the faulty position so it doesn't affect other tests. await pgPool.query( `UPDATE positions SET faulty = false WHERE device_id = $1 AND ts = '2026-05-01T10:00:00Z'`, [DEVICE_1], ); } }, 15_000); });