# Task 1.5.6 — Integration test (testcontainers Redis + Postgres + Directus stub)

**Phase:** 1.5 — Live broadcast
**Status:** 🟩 Done
**Depends on:** 1.5.4, 1.5.5
**Wiki refs:** —

## Goal

End-to-end pipeline test: spin up Redis 7 + TimescaleDB + a stub HTTP server impersonating Directus's `/users/me` and `/items/events/` endpoints, boot the Processor against them, connect a real WebSocket client with a cookie, subscribe to an event, publish a synthetic position to `telemetry:teltonika`, and verify the client receives both the snapshot and the streamed position.

This is the test that proves the live channel composes correctly end-to-end — auth handshake, subscription registry, snapshot, and broadcast fan-out all integrated. Mirror Phase 1's `pipeline.integration.test.ts` for structure and the skip-on-no-Docker pattern.

## Deliverables

- `test/live.integration.test.ts`:
  - `beforeAll`: start Redis + TimescaleDB containers, start a tiny HTTP server impersonating Directus on a random port (acts as the auth + authz endpoint), seed `entry_devices` + `entries` + a few `positions` rows, and boot a Processor instance pointed at all three. Skip cleanly if Docker is unavailable.
  - `afterAll`: stop the Processor, the Directus stub, and both containers.
  - **Test 1 — Happy path:** WS client connects with a valid cookie → subscribes to a seeded event → receives `subscribed` with a non-empty snapshot containing the seeded positions → publishes a synthetic position to Redis → receives the corresponding `position` frame within 1s.
  - **Test 2 — Auth rejection:** WS client connects without a cookie → upgrade fails with HTTP 401.
  - **Test 3 — Forbidden subscription:** Client with a cookie scoped to user A → subscribes to an event in an organization user A doesn't belong to → receives `error/forbidden` (Directus stub returns 403 for that user-event pair).
  - **Test 4 — Multi-client fan-out:** Two clients subscribed to the same event → publishing one position results in both clients receiving the `position` frame.
  - **Test 5 — Orphan position:** Publishing a position for a device that's not in `entry_devices` increments `processor_live_broadcast_orphan_records_total` and reaches no client.
  - **Test 6 — Faulty-flagged snapshot exclusion:** Mark a seeded position `faulty=true` directly in Postgres, subscribe, and verify the snapshot uses the next-most-recent non-faulty position (or omits the device if none exists).
- `test/helpers/directus-stub.ts`:
  - A minimal Express-or-bare-`http.createServer` stub that responds to:
    - `GET /users/me` — returns 200 + a fake user payload if a configured cookie is present, 401 otherwise.
    - `GET /items/events/:id` — returns 200 if the (cookie, eventId) pair is in an allow-list, 403 otherwise.
  - Exposed as `createDirectusStub({ allowedCookieToUser: Map<string, { id: string }>, allowedEvents: Map<string, Set<string>> }): Promise<{ url: string; close: () => Promise<void> }>`.
- `vitest.integration.config.ts` — the Phase 1 config already exists; extend the `testTimeout` if needed (the live test may need ~30s for the first-position round-trip on a cold cache).

## Specification

### Skip-on-no-Docker pattern

Same as Phase 1's `pipeline.integration.test.ts`:

```ts
let dockerAvailable = true;

beforeAll(async () => {
  try {
    redisContainer = await new GenericContainer('redis:7').withExposedPorts(6379).start();
  } catch (err) {
    dockerAvailable = false;
    console.warn('docker unavailable; skipping live integration tests');
    return;
  }
  // ... rest of setup
}, 120_000);

it('happy path', async () => {
  if (!dockerAvailable) return;
  // ... real test
});
```
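To make the rest of `beforeAll` concrete, here is a minimal sketch under the same pattern. The `startProcessor` factory, the config key names, the cookie string, and the TimescaleDB image tag are illustrative assumptions, not the real Processor API; only the testcontainers calls (`withExposedPorts`, `withEnvironment`, `getHost`, `getMappedPort`, assuming a recent testcontainers version) are library-accurate:

```ts
import { GenericContainer, type StartedTestContainer } from 'testcontainers';

let redisContainer: StartedTestContainer;
let pgContainer: StartedTestContainer;

// Inside the try block above, after Redis starts. Image tag is an assumption.
pgContainer = await new GenericContainer('timescale/timescaledb:latest-pg16')
  .withEnvironment({ POSTGRES_PASSWORD: 'test', POSTGRES_DB: 'processor_test' })
  .withExposedPorts(5432)
  .start();

// Directus stub with one allowed (cookie, event) pair; cookie name is illustrative.
const stub = await createDirectusStub({
  allowedCookieToUser: new Map([['directus_session_token=user-a', { id: USER_A_ID }]]),
  allowedEvents: new Map([[USER_A_ID, new Set([TEST_EVENT_ID])]]),
});

// Hypothetical boot factory; the real Processor entry point may differ.
const processor = await startProcessor({
  REDIS_URL: `redis://${redisContainer.getHost()}:${redisContainer.getMappedPort(6379)}`,
  DATABASE_URL: `postgres://postgres:test@${pgContainer.getHost()}:${pgContainer.getMappedPort(5432)}/processor_test`,
  DIRECTUS_URL: stub.url,
});
```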
### Directus stub shape

The stub is intentionally tiny — we're not testing Directus, we're testing the Processor's interaction with whatever Directus returns. Two endpoints, hardcoded responses:

```ts
import http from 'node:http';
import type { AddressInfo } from 'node:net';

interface StubOpts {
  allowedCookieToUser: Map<string, { id: string }>;
  allowedEvents: Map<string, Set<string>>; // user id → allowed event ids
}

function createDirectusStub(opts: StubOpts): Promise<{ url: string; close: () => Promise<void> }> {
  const server = http.createServer((req, res) => {
    const cookie = req.headers.cookie ?? '';
    const user = opts.allowedCookieToUser.get(cookie);

    if (req.url === '/users/me') {
      if (!user) {
        res.writeHead(401).end();
        return;
      }
      res.writeHead(200, { 'content-type': 'application/json' });
      res.end(JSON.stringify({ data: user }));
      return;
    }

    const eventMatch = /^\/items\/events\/([0-9a-f-]+)/.exec(req.url ?? '');
    if (eventMatch) {
      if (!user) {
        res.writeHead(401).end();
        return;
      }
      const eventId = eventMatch[1];
      const allowed = opts.allowedEvents.get(user.id)?.has(eventId);
      if (!allowed) {
        res.writeHead(403).end();
        return;
      }
      res.writeHead(200, { 'content-type': 'application/json' });
      res.end(JSON.stringify({ data: { id: eventId } }));
      return;
    }

    res.writeHead(404).end();
  });

  return new Promise((resolve) => {
    server.listen(0, () => {
      const addr = server.address() as AddressInfo;
      resolve({
        url: `http://localhost:${addr.port}`,
        close: () => new Promise<void>((res) => server.close(() => res())),
      });
    });
  });
}
```

This is ~40 lines of test infra. Don't pull in Express; bare `http` is enough.

### Seeding data

The integration test needs realistic-ish seed data: at least one organization, one event, two `entries`, four `entry_devices` (so at least one device-to-event mapping per entry), and some positions for some of the devices. Use a seed helper:

```ts
async function seed(pool: Pool) {
  await pool.query(
    `INSERT INTO organizations (id, name, slug) VALUES ($1, 'Test Org', 'test-org')`,
    [TEST_ORG_ID],
  );
  await pool.query(
    `INSERT INTO events (id, organization_id, name, slug, discipline, starts_at, ends_at)
     VALUES ($1, $2, 'Test Event', 'test-event', 'rally', '2026-01-01', '2026-12-31')`,
    [TEST_EVENT_ID, TEST_ORG_ID],
  );
  // ... etc.
  await pool.query(
    `INSERT INTO positions (device_id, ts, latitude, longitude, faulty) VALUES
       ($1, '2026-05-02T11:00:00Z', 41.327, 19.819, false),
       ($1, '2026-05-02T11:01:00Z', 41.328, 19.820, false),
       ($2, '2026-05-02T11:00:30Z', 41.330, 19.825, false)`,
    [TEST_DEVICE_1, TEST_DEVICE_2],
  );
}
```

Schema creation in the integration test reuses the same migration runner that production uses. **Don't reach into `db-init/` or Directus's snapshot YAML** from this test — the test is for the Processor, not the schema. Stub the minimum subset of Directus-managed tables in a setup migration that runs only in the test environment, OR (cleaner) point the test's `pool` at a Postgres that already has the schema loaded via a fixture SQL file.

The cleanest option: a single `test/fixtures/test-schema.sql` file that creates the minimum subset (organizations, events, entries, entry_devices, devices, positions) the integration test needs. Run it once in `beforeAll`. The duplication with the real schema is bounded — these collections are stable.
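A minimal sketch of that `beforeAll` step, assuming the fixture is plain DDL with no psql meta-commands and that this helper lives in the test file, so the relative URL resolves against it:

```ts
import { readFile } from 'node:fs/promises';
import type { Pool } from 'pg';

// Hypothetical helper: loads test/fixtures/test-schema.sql and runs it as one
// multi-statement query. pg uses the simple query protocol when no parameters
// are passed, so a string containing several DDL statements is allowed.
async function applyFixtureSchema(pool: Pool): Promise<void> {
  const ddl = await readFile(new URL('./fixtures/test-schema.sql', import.meta.url), 'utf8');
  await pool.query(ddl);
}
```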
### WebSocket client

Use `ws`'s client mode (`new WebSocket(url, { headers: { cookie: '...' } })`). Set up a `waitForMessage(ws, predicate, timeout)` helper that attaches an `on('message')` listener and resolves once a frame matching the predicate arrives:

```ts
import WebSocket from 'ws';

async function waitForMessage<T>(
  ws: WebSocket,
  predicate: (msg: any) => msg is T,
  timeoutMs = 5_000,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      ws.off('message', handler); // don't leak the listener when the wait times out
      reject(new Error('timeout waiting for message'));
    }, timeoutMs);
    const handler = (data: WebSocket.Data) => {
      const msg = JSON.parse(data.toString());
      if (predicate(msg)) {
        clearTimeout(timer);
        ws.off('message', handler);
        resolve(msg);
      }
    };
    ws.on('message', handler);
  });
}
```

This pattern is robust across the test suite — every test waits for a specific message shape, with a clear timeout error if the protocol breaks.

### Synthetic position publishing

Reuse the `XADD` shape from Phase 1's `pipeline.integration.test.ts`. Helper:

```ts
async function publishPosition(redis: Redis, position: Position) {
  await redis.xadd(
    config.REDIS_TELEMETRY_STREAM,
    '*',
    'ts', position.ts.toISOString(),
    'device_id', position.deviceId,
    'codec', '8E',
    'payload', JSON.stringify(serializeForStream(position)),
  );
}
```

The `serializeForStream` helper handles the bigint/Buffer sentinel encoding (already exists in Phase 1; reuse it).

## Acceptance criteria

- [ ] `pnpm test:integration -- live` runs all six scenarios green when Docker is available.
- [ ] Without Docker, the suite logs skip messages and exits 0 (does not fail).
- [ ] First-run total time < 60s including container pulls; subsequent runs < 20s.
- [ ] Each test cleans up after itself — no shared state between tests.
- [ ] Tests don't depend on each other's order.

## Risks / open questions

- **Schema duplication.** `test/fixtures/test-schema.sql` will drift from the real schema unless we keep it in sync deliberately. Mitigation: a comment at the top of the fixture says "this is a subset of the production schema for testing only; sync when the production schema changes." Worth documenting in `OPERATIONS.md` (Phase 3) as a maintenance task.
- **Test flakiness from polling.** Same caveat as Phase 1: prefer `waitForMessage` over `await sleep(N)`. The latter is reliably wrong.
- **Image pull times in CI.** The TimescaleDB image is large (~700MB). If integration tests ever run in CI, pre-pull it. Phase 1's CI doesn't run integration tests; this phase doesn't change that — local runs plus a manual stage smoke test are the gate.

## Done

Landed in `2f2cf5c`. Key divergences from spec:

- `test/fixtures/test-schema.sql` uses `entry_devices.device_id TEXT` (IMEI) instead of a UUID FK to `devices`, matching Phase 1's IMEI-as-device_id convention.
- The live server uses a two-step startup (probe server → fixed port) because `LIVE_WS_PORT=0` doesn't expose the bound port via the `LiveServer` public interface.
- The `metricsServer` dummy prevents `afterAll` from hanging on unclosed handles.