Files
processor/test/live.integration.test.ts
julian 87dec03d3c feat(live): task 1.5.6 — live broadcast integration test
Adds end-to-end integration test for the WebSocket live broadcast pipeline:
Redis + TimescaleDB containers + Directus stub → full pipeline boot → real
WS client assertions. Mirrors pipeline.integration.test.ts pattern with the
skip-on-no-Docker guard.

Key additions:
- test/live.integration.test.ts: 6 test scenarios — happy path (subscribe →
  snapshot → live position), auth rejection (401), forbidden subscription
  (error/forbidden), multi-client fan-out (both receive position), orphan
  position (no WS frame), faulty snapshot exclusion (next-best non-faulty)
- test/helpers/directus-stub.ts: bare http.createServer stub for /users/me
  and /items/events/:id endpoints with cookie-based user lookup
- test/fixtures/test-schema.sql: minimal schema subset (events, entries,
  entry_devices with IMEI-as-device_id for Phase 1 join semantics)

The integration test runs via `pnpm test:integration`, not `pnpm test`.
Docker required; the suite skips cleanly when Docker is unavailable.
2026-05-02 18:38:53 +02:00

691 lines
26 KiB
TypeScript

/**
* Integration test: live broadcast pipeline end-to-end.
*
* Spins up Redis 7-alpine + TimescaleDB-HA containers, starts an HTTP server
* impersonating Directus (/users/me + /items/events/:id), boots the full live
* broadcast pipeline (auth, registry, snapshot, broadcast consumer), and verifies
* the WebSocket protocol from the perspective of a real WS client.
*
* Skip-on-no-Docker: same pattern as pipeline.integration.test.ts.
* Each `it` block has an explicit `if (!dockerAvailable) return` guard.
*
* Tests:
* 1. Happy path: subscribe → snapshot with seeded positions → live position frame.
* 2. Auth rejection: connect without cookie → HTTP 401.
* 3. Forbidden subscription: valid user, unauthorized event → error/forbidden.
* 4. Multi-client fan-out: two clients subscribed → both receive the position.
* 5. Orphan position: device not in entry_devices → no WS frame.
* 6. Faulty snapshot exclusion: faulty position is excluded; next-best is returned.
*/
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { GenericContainer, type StartedTestContainer, Wait } from 'testcontainers';
import { WebSocket } from 'ws';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
import * as http from 'node:http';
import type { Redis } from 'ioredis';
import type pg from 'pg';
import { vi } from 'vitest';
import type { Logger } from 'pino';
import type { Config } from '../src/config/load.js';
import type { Position } from '../src/shared/types.js';
import { createPool, connectWithRetry } from '../src/db/pool.js';
import { runMigrations } from '../src/db/migrate.js';
import { connectRedis } from '../src/core/consumer.js';
import { createMetrics } from '../src/observability/metrics.js';
import { createDeviceStateStore } from '../src/core/state.js';
import { createWriter } from '../src/core/writer.js';
import { createConsumer, ensureConsumerGroup } from '../src/core/consumer.js';
import { createLiveServer } from '../src/live/server.js';
import { createAuthClient } from '../src/live/auth.js';
import { createAuthzClient } from '../src/live/authz.js';
import { createSubscriptionRegistry } from '../src/live/registry.js';
import { createSnapshotProvider } from '../src/live/snapshot.js';
import { createBroadcastConsumer } from '../src/live/broadcast.js';
import { createDeviceEventMap } from '../src/live/device-event-map.js';
import { createDirectusStub } from './helpers/directus-stub.js';
import type { FakeUser } from './helpers/directus-stub.js';
import type { ConsumedRecord } from '../src/core/consumer.js';
import type { LiveConnection } from '../src/live/server.js';
import type { InboundMessage } from '../src/live/protocol.js';
import type { AddressInfo } from 'node:net';
import type { SubscribedMessage, PositionMessage, ErrorMessage } from '../src/live/protocol.js';
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
// Redis stream and consumer-group names handed to the pipeline through makeConfig().
const STREAM = 'telemetry:teltonika';
const GROUP = 'processor';
const BROADCAST_GROUP_PREFIX = 'live-broadcast';

// Seeded row identifiers. EVENT_ID is the event that owns ENTRY_ID and its
// devices; OTHER_EVENT_ID exists only so USER_B has something else to access.
const EVENT_ID = 'ee000000-0000-0000-0000-000000000001';
const OTHER_EVENT_ID = 'ee000000-0000-0000-0000-000000000002';
const ENTRY_ID = 'aa000000-0000-0000-0000-000000000001';

// Device identifiers are IMEIs. The first two are linked to ENTRY_ID by
// seedDatabase(); the orphan is deliberately never linked to any entry.
const DEVICE_1 = '111111111111111';
const DEVICE_2 = '222222222222222';
const DEVICE_ORPHAN = '999999999999999';

// Fake Directus users served by the stub, keyed below by their session cookie.
const USER_A: FakeUser = {
  id: 'user-aaaa-0000-0000-0000-000000000001',
  email: 'user-a@test.com',
  role: null,
  first_name: 'User',
  last_name: 'A',
};
const USER_B: FakeUser = {
  id: 'user-bbbb-0000-0000-0000-000000000002',
  email: 'user-b@test.com',
  role: null,
  first_name: 'User',
  last_name: 'B',
};

// Cookie header values the Directus stub maps to USER_A / USER_B.
const COOKIE_A = 'session=valid-user-a';
const COOKIE_B = 'session=valid-user-b';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Builds a stand-in for a pino `Logger` whose logging methods are vitest
 * spies, so the pipeline can log freely without producing any output.
 *
 * Only the members listed here exist on the object; the double cast is what
 * lets this partial stub be passed where a full `Logger` is expected.
 */
function makeSilentLogger(): Logger {
  const stub = {
    level: 'silent',
    trace: vi.fn(),
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    fatal: vi.fn(),
    silent: vi.fn(),
    // child() hands back the receiver it was called on instead of a new logger.
    child: vi.fn().mockReturnThis(),
  };
  return stub as unknown as Logger;
}
/**
 * Produces a complete `Config` for the test run.
 *
 * @param overrides - Fields that replace the defaults below; callers use this
 *   to inject the container URLs, the Directus stub URL and the WS port.
 * @returns The defaults with `overrides` applied on top.
 */
function makeConfig(overrides: Partial<Config> = {}): Config {
  const defaults: Config = {
    NODE_ENV: 'test',
    INSTANCE_ID: 'test-live-integration',
    LOG_LEVEL: 'silent',

    // Placeholder connection strings.
    REDIS_URL: 'redis://localhost:6379',
    POSTGRES_URL: 'postgres://postgres:postgres@localhost:5432/trm',

    // Durable consumer.
    REDIS_TELEMETRY_STREAM: STREAM,
    REDIS_CONSUMER_GROUP: GROUP,
    REDIS_CONSUMER_NAME: 'test-consumer',
    METRICS_PORT: 0,
    BATCH_SIZE: 100,
    BATCH_BLOCK_MS: 500,
    WRITE_BATCH_SIZE: 50,
    DEVICE_STATE_LRU_CAP: 10_000,

    // Live WebSocket server. Port 0 is a placeholder that callers override.
    LIVE_WS_PORT: 0,
    LIVE_WS_HOST: '127.0.0.1',
    LIVE_WS_PING_INTERVAL_MS: 60_000,
    LIVE_WS_DRAIN_TIMEOUT_MS: 2_000,
    LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576,

    // Directus. The base URL is a placeholder until the stub's URL is known.
    DIRECTUS_BASE_URL: 'http://localhost:8055',
    DIRECTUS_AUTH_TIMEOUT_MS: 5_000,
    DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000,

    // Broadcast consumer and device→event map (short refresh for tests).
    LIVE_BROADCAST_GROUP_PREFIX: BROADCAST_GROUP_PREFIX,
    LIVE_BROADCAST_BATCH_SIZE: 100,
    LIVE_BROADCAST_BATCH_BLOCK_MS: 500,
    LIVE_DEVICE_EVENT_REFRESH_MS: 5_000,
  };
  return { ...defaults, ...overrides };
}
/**
 * Serializes a Position into the flat field list for XADD.
 * Mirrors tcp-ingestion's serializePosition format.
 *
 * The payload is the JSON form of `position` with two tagged encodings:
 * bigint values become `{ __bigint: "<decimal>" }` and byte arrays become
 * `{ __buffer_b64: "<base64>" }`. Dates are written as ISO-8601 strings.
 *
 * @param position - The position to publish.
 * @returns `[name, value, ...]` pairs: `ts`, `device_id`, `codec`, `payload`.
 */
function buildXaddFields(position: Position): string[] {
  function jsonReplacer(this: unknown, key: string, value: unknown): unknown {
    // JSON.stringify calls toJSON() *before* the replacer, so `value` for a
    // Buffer is already `{ type: 'Buffer', data: [...] }` and would never match
    // the Uint8Array check. Inspect the untouched value on the holder instead.
    const raw = (this as Record<string, unknown>)[key];
    if (typeof raw === 'bigint') return { __bigint: raw.toString() };
    if (raw instanceof Uint8Array) return { __buffer_b64: Buffer.from(raw).toString('base64') };
    // Dates need no branch: their toJSON() has already produced the ISO string.
    return value;
  }
  return [
    'ts', position.timestamp.toISOString(),
    'device_id', position.device_id,
    'codec', '8',
    'payload', JSON.stringify(position, jsonReplacer),
  ];
}
/**
 * Waits for the next WS message matching `predicate`, with a timeout.
 *
 * Every incoming frame is parsed as JSON; frames that are not valid JSON or
 * that the predicate rejects are ignored. The listener is detached as soon as
 * the wait settles, whether by a match or by the timeout.
 *
 * @param ws - Socket to listen on.
 * @param predicate - Type guard selecting the message to wait for.
 * @param timeoutMs - How long to wait before rejecting (default 5000).
 * @returns The first parsed message accepted by `predicate`.
 * @throws Error (as a rejection) when no matching message arrives in time.
 */
async function waitForMessage<T>(
  ws: WebSocket,
  predicate: (msg: unknown) => msg is T,
  timeoutMs = 5_000,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onMessage = (data: Buffer | string): void => {
      let msg: unknown;
      try {
        msg = JSON.parse(data.toString());
      } catch {
        // A throw here would escape through the emitter as an uncaught
        // exception; an unparsable frame simply cannot be a match.
        return;
      }
      if (!predicate(msg)) return;
      clearTimeout(timer);
      ws.off('message', onMessage);
      resolve(msg);
    };
    const timer = setTimeout(() => {
      // Detach on timeout too, otherwise the handler keeps parsing every
      // later frame on this socket.
      ws.off('message', onMessage);
      reject(new Error(`Timeout after ${timeoutMs}ms waiting for matching WS message`));
    }, timeoutMs);
    ws.on('message', onMessage);
  });
}
/** Type guard: `msg` is a non-null object whose `type` field equals 'subscribed'. */
function isSubscribedMessage(msg: unknown): msg is SubscribedMessage {
  if (msg === null || typeof msg !== 'object') return false;
  return (msg as Record<string, unknown>)['type'] === 'subscribed';
}
/** Type guard: `msg` is a non-null object whose `type` field equals 'position'. */
function isPositionMessage(msg: unknown): msg is PositionMessage {
  const tag =
    typeof msg === 'object' && msg !== null
      ? (msg as Record<string, unknown>)['type']
      : undefined;
  return tag === 'position';
}
/** Type guard: `msg` is a non-null object whose `type` field equals 'error'. */
function isErrorMessage(msg: unknown): msg is ErrorMessage {
  if (typeof msg !== 'object') return false;
  if (msg === null) return false;
  const { type } = msg as Record<string, unknown>;
  return type === 'error';
}
/**
 * Opens a WebSocket client and waits for the connection to be established.
 *
 * @param wsUrl - `ws://` URL of the live server.
 * @param cookie - Optional value sent as the `cookie` request header.
 * @returns The socket, once its `open` event has fired.
 * @throws (as a rejection) the socket's `error`, or an Error of the form
 *   `WS upgrade rejected with HTTP <status>` when the server answers the
 *   upgrade with a regular HTTP response.
 */
async function openClient(wsUrl: string, cookie?: string): Promise<WebSocket> {
  return new Promise<WebSocket>((resolve, reject) => {
    const ws = new WebSocket(wsUrl, {
      headers: cookie ? { cookie } : undefined,
    });
    ws.once('open', () => resolve(ws));
    ws.once('error', (err) => reject(err));
    ws.once('unexpected-response', (_req, res) => {
      reject(new Error(`WS upgrade rejected with HTTP ${res.statusCode}`));
      // With a listener on this event ws leaves the handshake request open,
      // so abort it here to release the socket. The 'error' that terminate()
      // emits for a not-yet-open socket lands on the handler above, where the
      // extra reject() is a no-op because the promise is already settled.
      ws.terminate();
    });
  });
}
// ---------------------------------------------------------------------------
// Test fixture — seed data
// ---------------------------------------------------------------------------
/**
 * Inserts the fixture rows the tests rely on: two events, one entry under
 * EVENT_ID, two devices linked to that entry, and three stored positions.
 *
 * Statements run one after another, parent rows before the rows that
 * reference them.
 *
 * @param pool - Pool connected to the migrated test database.
 */
async function seedDatabase(pool: pg.Pool): Promise<void> {
  // Two events; only EVENT_ID receives an entry below.
  const insertEvents = `INSERT INTO events (id) VALUES ($1), ($2)`;
  await pool.query(insertEvents, [EVENT_ID, OTHER_EVENT_ID]);

  // The single entry, owned by EVENT_ID.
  const insertEntry = `INSERT INTO entries (id, event_id) VALUES ($1, $2)`;
  await pool.query(insertEntry, [ENTRY_ID, EVENT_ID]);

  // Link DEVICE_1 and DEVICE_2 to that entry. Phase 1 stores the IMEI itself
  // in entry_devices.device_id.
  const linkDevices = `INSERT INTO entry_devices (id, entry_id, device_id) VALUES
(gen_random_uuid(), $1, $2),
(gen_random_uuid(), $1, $3)`;
  await pool.query(linkDevices, [ENTRY_ID, DEVICE_1, DEVICE_2]);

  // DEVICE_1 gets two rows an hour apart (10:00 and 09:00). Neither insert
  // sets `faulty`; the faulty-snapshot test flags the 10:00 row itself.
  const insertDevice1Positions = `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes)
VALUES
($1, '2026-05-01T10:00:00Z', 41.33, 19.83, 50, 90, 60, 8, 0, '8', '{}'),
($1, '2026-05-01T09:00:00Z', 41.30, 19.80, 50, 0, 0, 8, 0, '8', '{}')`;
  await pool.query(insertDevice1Positions, [DEVICE_1]);

  // DEVICE_2 gets a single row at 10:00.
  const insertDevice2Position = `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes)
VALUES ($1, '2026-05-01T10:00:00Z', 41.34, 19.84, 50, 0, 0, 8, 0, '8', '{}')`;
  await pool.query(insertDevice2Position, [DEVICE_2]);
}
// ---------------------------------------------------------------------------
// Container and pipeline lifecycle
// ---------------------------------------------------------------------------
/** Start/stop contract shared by the pipeline components tracked below. */
type Lifecycle = { start: () => Promise<void>; stop: () => Promise<void> };
/** Connection type produced by the pipeline's own `connectRedis` helper. */
type PipelineRedis = Awaited<ReturnType<typeof connectRedis>>;

// Everything started in beforeAll is kept here so afterAll can release it.
let redisContainer: StartedTestContainer | null = null;
let pgContainer: StartedTestContainer | null = null;
let redisClientXadd: Redis | null = null;
let broadcastRedis: PipelineRedis | null = null;
let consumerRedis: PipelineRedis | null = null;
let pgPool: pg.Pool | null = null;
let wsUrl = '';
let liveServer: Lifecycle | null = null;
let broadcastConsumer: Lifecycle | null = null;
let durableConsumer: Lifecycle | null = null;
let directusStub: { url: string; close: () => Promise<void> } | null = null;
let metricsServer: http.Server | null = null;
// Cleared when a container cannot be started; every test checks it and
// returns early so the suite passes on machines without Docker.
let dockerAvailable = true;

/**
 * Boots the whole environment once: Redis and TimescaleDB containers, the
 * Directus stub, a migrated and seeded database, the live WS server, the
 * broadcast consumer and the durable-write consumer.
 */
beforeAll(async () => {
  // --- Step 1: Redis container -----------------------------------------------
  // Any failure here is treated as "Docker is not available".
  try {
    redisContainer = await new GenericContainer('redis:7-alpine')
      .withExposedPorts(6379)
      .withWaitStrategy(Wait.forLogMessage('Ready to accept connections'))
      .start();
  } catch {
    console.warn('[live.integration.test] Docker not available — skipping live integration tests');
    dockerAvailable = false;
    return;
  }

  // --- Step 2: TimescaleDB container -----------------------------------------
  try {
    pgContainer = await new GenericContainer('timescale/timescaledb-ha:pg16.6-ts2.17.2-all')
      .withExposedPorts(5432)
      .withEnvironment({
        POSTGRES_USER: 'postgres',
        POSTGRES_PASSWORD: 'postgres',
        POSTGRES_DB: 'trm',
      })
      // Wait for the second occurrence of the readiness line.
      .withWaitStrategy(Wait.forLogMessage('database system is ready to accept connections', 2))
      .start();
  } catch (err) {
    console.warn(`[live.integration.test] Failed to start TimescaleDB: ${String(err)} — skipping`);
    dockerAvailable = false;
    // Do not leave the already-started Redis container behind.
    await redisContainer?.stop().catch(() => {});
    redisContainer = null;
    return;
  }

  const redisUrl = `redis://${redisContainer.getHost()}:${redisContainer.getMappedPort(6379)}`;
  const postgresUrl =
    `postgres://postgres:postgres@${pgContainer.getHost()}:${pgContainer.getMappedPort(5432)}/trm`;
  const logger = makeSilentLogger();

  // --- Step 3: Directus stub -------------------------------------------------
  const stub = await createDirectusStub({
    allowedCookieToUser: new Map([
      [COOKIE_A, USER_A],
      [COOKIE_B, USER_B],
    ]),
    allowedEvents: new Map([
      [USER_A.id, new Set([EVENT_ID])], // user A can access EVENT_ID only
      [USER_B.id, new Set([OTHER_EVENT_ID])], // user B cannot access EVENT_ID
    ]),
  });
  directusStub = stub;

  // --- Step 4: Redis client for XADD in tests --------------------------------
  // Configured to fail fast: no offline queue, no per-request retries.
  const { default: IRedis } = await import('ioredis');
  const xaddClient = new IRedis(redisUrl, {
    enableOfflineQueue: false,
    lazyConnect: true,
    maxRetriesPerRequest: 0,
  });
  redisClientXadd = xaddClient;
  await xaddClient.connect();

  // --- Step 5: Postgres pool, migrations, test schema, seed ------------------
  const pool = createPool(postgresUrl);
  pgPool = pool;
  await connectWithRetry(pool, logger);
  await runMigrations(pool, logger);
  // Load the test-only schema (entry_devices, entries, events simplified tables).
  const fixtureSQL = await fs.readFile(
    path.join(import.meta.dirname ?? __dirname, 'fixtures', 'test-schema.sql'),
    'utf-8',
  );
  await pool.query(fixtureSQL);
  await seedDatabase(pool);

  // --- Step 6: Wire live broadcast pipeline ----------------------------------
  // The server object returned by createLiveServer is only used through
  // start()/stop() here, so an OS-assigned port could not be read back. Probe
  // for a free port first and bind the server to it explicitly. Another
  // process could take the port between probe and bind; that window is
  // accepted for a test.
  const wsPort = await new Promise<number>((resolve, reject) => {
    const probe = http.createServer();
    probe.once('error', reject);
    probe.listen(0, '127.0.0.1', () => {
      const { port } = probe.address() as AddressInfo;
      probe.close(() => resolve(port));
    });
  });
  const config = makeConfig({
    REDIS_URL: redisUrl,
    POSTGRES_URL: postgresUrl,
    DIRECTUS_BASE_URL: stub.url,
    LIVE_WS_PORT: wsPort,
  });
  const metrics = createMetrics();

  const authClient = createAuthClient(config, logger, metrics);
  const authzClient = createAuthzClient(config, logger, metrics);
  const snapshotProvider = createSnapshotProvider(pool, logger, metrics);
  const registry = createSubscriptionRegistry(authzClient, config, logger, metrics, snapshotProvider);

  // Route subscribe / unsubscribe frames to the registry; other types are ignored.
  const messageHandler = async (
    conn: LiveConnection,
    message: InboundMessage,
  ): Promise<void> => {
    if (message.type === 'subscribe') {
      await registry.subscribe(conn, message.topic, message.id);
    } else if (message.type === 'unsubscribe') {
      registry.unsubscribe(conn, message.topic, message.id);
    }
  };

  liveServer = createLiveServer(config, logger, metrics, messageHandler, (conn) => {
    registry.onConnectionClose(conn);
  }, authClient);
  await liveServer.start();
  wsUrl = `ws://127.0.0.1:${wsPort}`;

  // Device event map (uses test-seeded entry_devices).
  // NOTE(review): nothing ever stops this map. If it exposes a stop()/close(),
  // keep a reference and call it from afterAll — its API is not visible here.
  const deviceEventMap = createDeviceEventMap(pool, config, logger, metrics);
  await deviceEventMap.start();

  // Broadcast consumer (live fan-out) on its own Redis connection.
  broadcastRedis = await connectRedis(redisUrl, logger);
  broadcastConsumer = createBroadcastConsumer(
    broadcastRedis, registry, deviceEventMap, config, logger, metrics,
  );
  await broadcastConsumer.start();

  // Durable-write consumer: updates device state, writes each batch through
  // the writer, and returns the ids whose write status is 'inserted' or
  // 'duplicate'.
  const state = createDeviceStateStore(config, logger, metrics);
  const writer = createWriter(pool, config, logger, metrics);
  await ensureConsumerGroup(xaddClient, STREAM, GROUP, logger);
  const sink = async (records: ConsumedRecord[]): Promise<string[]> => {
    for (const record of records) state.update(record.position);
    const results = await writer.write(records);
    return results
      .filter((r) => r.status === 'inserted' || r.status === 'duplicate')
      .map((r) => r.id);
  };
  consumerRedis = await connectRedis(redisUrl, logger);
  durableConsumer = createConsumer(consumerRedis, config, logger, metrics, sink);
  await durableConsumer.start();

  // Start a dummy metrics server (needed to avoid process.exit in GracefulShutdown
  // patterns; not used by the test directly).
  metricsServer = http.createServer((_req, res) => res.writeHead(200).end('ok'));
  metricsServer.listen(0, '127.0.0.1');
}, 120_000);

/**
 * Releases everything beforeAll created. Each step is best-effort: a failure
 * is swallowed so the remaining resources still get released, and every
 * handle may be null when setup stopped early.
 */
afterAll(async () => {
  // Consumers and server first, then the connections they were given.
  await durableConsumer?.stop().catch(() => {});
  await broadcastConsumer?.stop().catch(() => {});
  await liveServer?.stop().catch(() => {});
  // quit() rejects if the connection is already closed; that rejection is
  // swallowed like every other cleanup failure here.
  await consumerRedis?.quit().catch(() => {});
  await broadcastRedis?.quit().catch(() => {});
  await redisClientXadd?.quit().catch(() => {});
  await pgPool?.end().catch(() => {});
  await directusStub?.close().catch(() => {});
  // Resolves from close()'s callback, or immediately when no server exists.
  await new Promise<void>((res) => (metricsServer?.close(() => res()) ?? res()));
  await redisContainer?.stop().catch(() => {});
  await pgContainer?.stop().catch(() => {});
}, 60_000);
// ---------------------------------------------------------------------------
// Integration tests
// ---------------------------------------------------------------------------
describe('live broadcast integration', () => {
// -------------------------------------------------------------------------
// Test 1 — Happy path: subscribe → snapshot + live position
// -------------------------------------------------------------------------
// Happy path: subscribing returns a snapshot of the seeded positions, and a
// position published to the stream afterwards arrives as a `position` frame.
it('subscribes to an event, receives snapshot, then receives live position', async () => {
  if (!dockerAvailable) {
    console.warn('[live.integration.test] skipping test 1: Docker not available');
    return;
  }
  const redis = redisClientXadd;
  if (redis === null) throw new Error('XADD Redis client was not initialised by beforeAll');

  const ws = await openClient(wsUrl, COOKIE_A);
  try {
    // Subscribe to the seeded event.
    ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}`, id: 'req-1' }));

    // Expect `subscribed` with a non-empty snapshot (2 devices seeded).
    const subscribed = await waitForMessage<SubscribedMessage>(ws, isSubscribedMessage, 5_000);
    expect(subscribed.type).toBe('subscribed');
    expect(subscribed.topic).toBe(`event:${EVENT_ID}`);
    expect(subscribed.id).toBe('req-1');
    expect(subscribed.snapshot).toHaveLength(2);
    const snap1 = subscribed.snapshot.find((e) => e.deviceId === DEVICE_1);
    expect(snap1).toBeDefined();
    expect(snap1?.lat).toBeCloseTo(41.33, 2);
    expect(snap1?.lon).toBeCloseTo(19.83, 2);

    // Publish a new live position for DEVICE_1.
    const liveTs = new Date('2026-06-01T12:00:00.000Z');
    const position: Position = {
      device_id: DEVICE_1,
      timestamp: liveTs,
      latitude: 41.40,
      longitude: 19.90,
      altitude: 55,
      angle: 45,
      speed: 80,
      satellites: 10,
      priority: 0,
      attributes: {},
    };

    // The listener is attached before the XADD is issued, and the XADD is
    // awaited alongside it: a failed publish fails the test immediately
    // instead of surfacing as an unhandled rejection plus a 5s timeout.
    const [posMsg] = await Promise.all([
      waitForMessage<PositionMessage>(ws, isPositionMessage, 5_000),
      redis.xadd(STREAM, '*', ...buildXaddFields(position)),
    ]);
    expect(posMsg.type).toBe('position');
    expect(posMsg.topic).toBe(`event:${EVENT_ID}`);
    expect(posMsg.deviceId).toBe(DEVICE_1);
    expect(posMsg.lat).toBeCloseTo(41.40, 2);
    expect(posMsg.lon).toBeCloseTo(19.90, 2);
    expect(posMsg.ts).toBe(liveTs.getTime());
    expect(posMsg.speed).toBe(80);
  } finally {
    ws.close();
  }
}, 30_000);
// -------------------------------------------------------------------------
// Test 2 — Auth rejection: no cookie → HTTP 401
// -------------------------------------------------------------------------
// Auth rejection: an upgrade request without a cookie must be answered with
// HTTP 401 rather than being upgraded.
it('rejects WS upgrade with HTTP 401 when no cookie is presented', async () => {
  if (!dockerAvailable) {
    console.warn('[live.integration.test] skipping test 2: Docker not available');
    return;
  }
  // openClient rejects with "WS upgrade rejected with HTTP <status>" on a
  // non-101 response. Matching on the status keeps an unrelated failure
  // (connection refused, a different status) from passing this test.
  await expect(openClient(wsUrl)).rejects.toThrow('HTTP 401');
}, 10_000);
// -------------------------------------------------------------------------
// Test 3 — Forbidden subscription
// -------------------------------------------------------------------------
// Forbidden subscription: an authenticated user who asks for an event they
// are not allowed to see gets an `error` frame with code `forbidden`.
it('receives error/forbidden when subscribing to an event the user cannot access', async () => {
  if (!dockerAvailable) {
    console.warn('[live.integration.test] skipping test 3: Docker not available');
    return;
  }
  // The Directus stub grants USER_B access to OTHER_EVENT_ID only.
  const topic = `event:${EVENT_ID}`;
  const ws = await openClient(wsUrl, COOKIE_B);
  try {
    ws.send(JSON.stringify({ type: 'subscribe', topic }));
    const rejection = await waitForMessage<ErrorMessage>(ws, isErrorMessage, 5_000);
    expect(rejection.type).toBe('error');
    expect(rejection.code).toBe('forbidden');
    expect(rejection.topic).toBe(topic);
  } finally {
    ws.close();
  }
}, 10_000);
// -------------------------------------------------------------------------
// Test 4 — Multi-client fan-out
// -------------------------------------------------------------------------
// Multi-client fan-out: two sockets subscribed to the same event must both
// receive the same live position.
it('delivers a live position to all subscribed clients on the same event', async () => {
  if (!dockerAvailable) {
    console.warn('[live.integration.test] skipping test 4: Docker not available');
    return;
  }
  const redis = redisClientXadd;
  if (redis === null) throw new Error('XADD Redis client was not initialised by beforeAll');

  const ws1 = await openClient(wsUrl, COOKIE_A);
  // Do not leak the first socket if the second one cannot be opened.
  const ws2 = await openClient(wsUrl, COOKIE_A).catch((err: unknown) => {
    ws1.close();
    throw err;
  });
  try {
    // Attach both `subscribed` listeners before sending anything. Awaiting
    // them one after the other would leave ws2 without a listener while ws1
    // is awaited, so an early ws2 reply would be lost and time out.
    const bothSubscribed = Promise.all([
      waitForMessage<SubscribedMessage>(ws1, isSubscribedMessage, 5_000),
      waitForMessage<SubscribedMessage>(ws2, isSubscribedMessage, 5_000),
    ]);
    ws1.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` }));
    ws2.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` }));
    await bothSubscribed;

    // Publish a live position for DEVICE_1.
    const liveTs = new Date('2026-06-01T13:00:00.000Z');
    const position: Position = {
      device_id: DEVICE_1,
      timestamp: liveTs,
      latitude: 41.50,
      longitude: 19.95,
      altitude: 60,
      angle: 0,
      speed: 0,
      satellites: 10,
      priority: 0,
      attributes: {},
    };

    // Both clients must receive the position frame. The XADD is awaited with
    // them so a failed publish is reported as such.
    const [pos1, pos2] = await Promise.all([
      waitForMessage<PositionMessage>(ws1, isPositionMessage, 5_000),
      waitForMessage<PositionMessage>(ws2, isPositionMessage, 5_000),
      redis.xadd(STREAM, '*', ...buildXaddFields(position)),
    ]);
    expect(pos1.deviceId).toBe(DEVICE_1);
    expect(pos2.deviceId).toBe(DEVICE_1);
    expect(pos1.ts).toBe(liveTs.getTime());
    expect(pos2.ts).toBe(liveTs.getTime());
  } finally {
    ws1.close();
    ws2.close();
  }
}, 30_000);
// -------------------------------------------------------------------------
// Test 5 — Orphan position: device not in entry_devices
// -------------------------------------------------------------------------
// Orphan position: a position for a device that is not linked to any entry
// must not reach a client subscribed to EVENT_ID.
it('does not deliver a position for an unregistered device; client receives no frame', async () => {
  if (!dockerAvailable) {
    console.warn('[live.integration.test] skipping test 5: Docker not available');
    return;
  }
  const redis = redisClientXadd;
  if (redis === null) throw new Error('XADD Redis client was not initialised by beforeAll');

  const ws = await openClient(wsUrl, COOKIE_A);
  try {
    ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` }));
    await waitForMessage<SubscribedMessage>(ws, isSubscribedMessage, 5_000);

    // Publish a position for DEVICE_ORPHAN (not in entry_devices).
    const position: Position = {
      device_id: DEVICE_ORPHAN,
      timestamp: new Date('2026-06-01T14:00:00.000Z'),
      latitude: 42.00,
      longitude: 20.00,
      altitude: 60,
      angle: 0,
      speed: 0,
      satellites: 8,
      priority: 0,
      attributes: {},
    };

    // Listen for 2s. waitForMessage REJECTS when nothing arrives, so its
    // rejection is the expected outcome and is mapped to 'timeout'. Racing it
    // against a slower timer would reject first and fail the test in exactly
    // the case that should pass.
    const outcome = waitForMessage<PositionMessage>(ws, isPositionMessage, 2_000).then(
      () => 'received' as const,
      () => 'timeout' as const,
    );
    await redis.xadd(STREAM, '*', ...buildXaddFields(position));
    expect(await outcome).toBe('timeout');
  } finally {
    ws.close();
  }
}, 15_000);
// -------------------------------------------------------------------------
// Test 6 — Faulty snapshot exclusion
// -------------------------------------------------------------------------
// Faulty snapshot exclusion: rows flagged `faulty` must be skipped, so the
// snapshot falls back to the newest row that is not flagged.
it('excludes faulty positions from the snapshot; uses next-best non-faulty position', async () => {
  if (!dockerAvailable || !pgPool) {
    console.warn('[live.integration.test] skipping test 6: Docker not available');
    return;
  }
  const pool = pgPool;

  // Flag every DEVICE_1 row newer than the seeded 09:00 row. That covers the
  // seeded 10:00 row and any live positions earlier tests published for
  // DEVICE_1 that the durable consumer has since written to `positions`, so
  // 09:00 is the newest non-faulty row either way.
  // NOTE(review): the restore below assumes no such row was faulty beforehand.
  await pool.query(
    `UPDATE positions SET faulty = true WHERE device_id = $1 AND ts > '2026-05-01T09:00:00Z'`,
    [DEVICE_1],
  );
  try {
    const ws = await openClient(wsUrl, COOKIE_A);
    try {
      ws.send(JSON.stringify({ type: 'subscribe', topic: `event:${EVENT_ID}` }));
      const subscribed = await waitForMessage<SubscribedMessage>(ws, isSubscribedMessage, 5_000);

      // The faulty rows are excluded; the 09:00 row (lat 41.30, lon 19.80)
      // should be returned instead.
      const snap1 = subscribed.snapshot.find((e) => e.deviceId === DEVICE_1);
      expect(snap1).toBeDefined();
      expect(snap1?.lat).toBeCloseTo(41.30, 2);
      expect(snap1?.lon).toBeCloseTo(19.80, 2);
    } finally {
      ws.close();
    }
  } finally {
    // Restore: clear the flag again so it doesn't affect other tests.
    await pool.query(
      `UPDATE positions SET faulty = false WHERE device_id = $1 AND ts > '2026-05-01T09:00:00Z'`,
      [DEVICE_1],
    );
  }
}, 15_000);
});