julian fa50df3e27 docs(planning): mark Phase 1.5 live broadcast as Done
Tasks 1.5.4, 1.5.5, 1.5.6 marked 🟩 with commit hashes and implementation
notes. Phase 1.5 status updated to Done in ROADMAP.md.
2026-05-02 18:39:22 +02:00


Task 1.5.6 — Integration test (testcontainers Redis + Postgres + Directus stub)

Phase: 1.5 - Live broadcast
Status: 🟩 Done
Depends on: 1.5.4, 1.5.5
Wiki refs:

Goal

End-to-end pipeline test: spin up Redis 7 + TimescaleDB + a stub HTTP server impersonating Directus's /users/me and /items/events/<id> endpoints, boot the Processor against them, connect a real WebSocket client with a cookie, subscribe to an event, publish a synthetic position to telemetry:teltonika, verify the client receives both the snapshot and the streamed position.

This is the test that proves the live channel composes correctly end-to-end: auth handshake, subscription registry, snapshot, and broadcast fan-out, all integrated. Mirror Phase 1's pipeline.integration.test.ts for structure and for the skip-on-no-Docker pattern.

Deliverables

  • test/live.integration.test.ts:
    • beforeAll: start Redis + TimescaleDB containers, start a tiny HTTP server impersonating Directus on a random port (acts as the auth + authz endpoint), seed entry_devices + entries + a few positions rows, boot a Processor instance pointed at all three. Skip cleanly if Docker is unavailable.
    • afterAll: stop the Processor, the Directus stub, and both containers.
    • Test 1 — Happy path: WS client connects with a valid cookie → subscribes to a seeded event → receives subscribed with a non-empty snapshot containing the seeded positions → publishes a synthetic position to Redis → receives the corresponding position frame within 1s.
    • Test 2 — Auth rejection: WS client connects without a cookie → upgrade fails with HTTP 401.
    • Test 3 — Forbidden subscription: Client with a cookie scoped to user A → subscribes to an event in an organization user A doesn't belong to → receives error/forbidden (Directus stub returns 403 for that user-event pair).
    • Test 4 — Multi-client fan-out: Two clients subscribed to the same event → publishing one position results in both clients receiving the position frame.
    • Test 5 — Orphan position: Publishing a position for a device that's not on entry_devices increments processor_live_broadcast_orphan_records_total and reaches no client.
    • Test 6 — Faulty-flagged snapshot exclusion: Mark a seeded position faulty=true directly in Postgres, subscribe, verify the snapshot uses the next-most-recent non-faulty position (or omits the device if none exists).
  • test/helpers/directus-stub.ts:
    • A minimal stub built on bare http.createServer (no Express) that responds to:
      • GET /users/me — returns 200 + a fake user payload if a configured cookie is present, 401 otherwise.
      • GET /items/events/:id — returns 200 if the (cookie, eventId) pair is in an allow-list, 403 otherwise.
    • Exposed as createDirectusStub({ allowedCookieToUser: Map<string, FakeUser>, allowedEvents: Map<userId, Set<eventId>> }): Promise<{ url: string; close: () => Promise<void> }>. The promise resolves once the stub is listening on its random port.
  • vitest.integration.config.ts — the Phase 1 config already exists; extend the testTimeout if needed (the live test may need ~30s for the first-position round-trip on a cold cache).
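
The expectation behind Test 6 can be written down as a small pure function, which may help when phrasing the assertion. This is only a sketch: SeedPosition and expectedSnapshot are hypothetical names that do not exist in the Processor, and it assumes the snapshot carries one position per device, namely the latest non-faulty one.

```typescript
// Hypothetical row shape mirroring the columns seeded into `positions`.
interface SeedPosition {
  deviceId: string;
  ts: string; // ISO-8601 UTC timestamp
  latitude: number;
  longitude: number;
  faulty: boolean;
}

// For each device, keep the most recent position that is not flagged faulty.
// A device whose positions are all faulty is omitted from the result.
function expectedSnapshot(rows: SeedPosition[]): Map<string, SeedPosition> {
  const latest = new Map<string, SeedPosition>();
  for (const row of rows) {
    if (row.faulty) continue;
    const current = latest.get(row.deviceId);
    if (!current || Date.parse(row.ts) > Date.parse(current.ts)) {
      latest.set(row.deviceId, row);
    }
  }
  return latest;
}

// Seed-like rows: device-1's newest fix is faulty, device-2 has only a faulty fix.
const seedRows: SeedPosition[] = [
  { deviceId: 'device-1', ts: '2026-05-02T11:00:00Z', latitude: 41.327, longitude: 19.819, faulty: false },
  { deviceId: 'device-1', ts: '2026-05-02T11:01:00Z', latitude: 41.328, longitude: 19.82, faulty: true },
  { deviceId: 'device-2', ts: '2026-05-02T11:00:30Z', latitude: 41.33, longitude: 19.825, faulty: true },
];
const expected = expectedSnapshot(seedRows);
```

With these rows the expected snapshot holds the 11:00:00 position for device-1 and has no entry for device-2, which covers both branches named in Test 6.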

Specification

Skip-on-no-Docker pattern

Same as Phase 1's pipeline.integration.test.ts:

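import { GenericContainer, type StartedTestContainer } from 'testcontainers';
import { beforeAll, it } from 'vitest';

let dockerAvailable = true;
let redisContainer: StartedTestContainer;

beforeAll(async () => {
  try {
    // Starting the first container doubles as the Docker availability probe.
    redisContainer = await new GenericContainer('redis:7').withExposedPorts(6379).start();
  } catch (err) {
    dockerAvailable = false;
    console.warn('docker unavailable; skipping live integration tests');
    return;
  }
  // ... rest of setup
}, 120_000);

it('happy path', async () => {
  // Early return: the test counts as passed, so the suite still exits 0.
  if (!dockerAvailable) return;
  // ... real test
});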
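
With six scenarios, the guard line repeats in every test. One way to factor it out is a small wrapper, sketched below. whenAvailable is a hypothetical helper, not something Phase 1 is known to contain; it takes a getter rather than a boolean so that it reads the flag when the test runs, after beforeAll has set it.

```typescript
type TestBody = () => Promise<void>;

// Wrap a test body so it becomes a no-op when the dependency is unavailable.
// `isAvailable` is evaluated at call time, not when the test is registered.
function whenAvailable(isAvailable: () => boolean, body: TestBody): TestBody {
  return async () => {
    if (!isAvailable()) return;
    await body();
  };
}
```

Usage would look like it('happy path', whenAvailable(() => dockerAvailable, async () => { /* real test */ })). As with the inline guard, a test that returns early is reported as passed rather than skipped.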

Directus stub shape

The stub is intentionally tiny — we're not testing Directus, we're testing the Processor's interaction with whatever Directus returns. Two endpoints, hardcoded responses:

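import * as http from 'node:http';
import type { AddressInfo } from 'node:net';

// Resolves once the server is listening, hence the Promise return type.
function createDirectusStub(opts: StubOpts): Promise<{ url: string; close: () => Promise<void> }> {
  const server = http.createServer((req, res) => {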
    const cookie = req.headers.cookie ?? '';
    const user = opts.allowedCookieToUser.get(cookie);

    if (req.url === '/users/me') {
      if (!user) {
        res.writeHead(401).end();
        return;
      }
      res.writeHead(200, { 'content-type': 'application/json' });
      res.end(JSON.stringify({ data: user }));
      return;
    }

    const eventMatch = /^\/items\/events\/([0-9a-f-]+)/.exec(req.url ?? '');
    if (eventMatch) {
      if (!user) { res.writeHead(401).end(); return; }
      const eventId = eventMatch[1];
      const allowed = opts.allowedEvents.get(user.id)?.has(eventId);
      if (!allowed) { res.writeHead(403).end(); return; }
      res.writeHead(200, { 'content-type': 'application/json' });
      res.end(JSON.stringify({ data: { id: eventId } }));
      return;
    }

    res.writeHead(404).end();
  });

  return new Promise((resolve) => {
    server.listen(0, () => {
      const addr = server.address() as AddressInfo;
      resolve({
        url: `http://localhost:${addr.port}`,
        close: () => new Promise((res) => server.close(() => res())),
      });
    });
  });
}

This is ~40 lines of test infra. Don't pull in Express; bare http is enough.
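
The stub's routing rules reduce to a small decision table, which the sketch below restates as a pure function so the 200/401/403/404 cases can be checked without opening a socket. StubRules and expectedStatus are illustrative names only; FakeUser is assumed to carry at least an id, as the stub's use of user.id implies.

```typescript
interface FakeUser {
  id: string;
}

interface StubRules {
  allowedCookieToUser: Map<string, FakeUser>;
  allowedEvents: Map<string, Set<string>>; // userId -> permitted eventIds
}

// Returns the HTTP status the stub would answer with for a cookie and path.
function expectedStatus(rules: StubRules, cookie: string, path: string): number {
  const user = rules.allowedCookieToUser.get(cookie);
  if (path === '/users/me') return user ? 200 : 401;

  const match = /^\/items\/events\/([0-9a-f-]+)/.exec(path);
  if (!match) return 404;
  if (!user) return 401;
  return rules.allowedEvents.get(user.id)?.has(match[1]) ? 200 : 403;
}

// Example rules: one known cookie, one event that user may read.
const EVENT_OK = '11111111-1111-4111-8111-111111111111';
const EVENT_OTHER = '22222222-2222-4222-8222-222222222222';
const rules: StubRules = {
  allowedCookieToUser: new Map([['session=user-a', { id: 'user-a' }]]),
  allowedEvents: new Map([['user-a', new Set([EVENT_OK])]]),
};
```

Note that the cookie lookup is an exact match on the whole Cookie header, so the test client must send exactly the string used as the map key.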

Seeding data

The integration test needs realistic-ish seed data: at least one organization, one event, two entries, four entry_devices (so at least one device-to-event mapping per entry), and some positions for some of the devices.

Use a seed helper:

async function seed(pool: Pool) {
  await pool.query(`INSERT INTO organizations (id, name, slug) VALUES ($1, 'Test Org', 'test-org')`, [TEST_ORG_ID]);
  await pool.query(`INSERT INTO events (id, organization_id, name, slug, discipline, starts_at, ends_at)
    VALUES ($1, $2, 'Test Event', 'test-event', 'rally', '2026-01-01', '2026-12-31')`, [TEST_EVENT_ID, TEST_ORG_ID]);
  // ... etc.

  await pool.query(`INSERT INTO positions (device_id, ts, latitude, longitude, faulty)
    VALUES ($1, '2026-05-02T11:00:00Z', 41.327, 19.819, false),
           ($1, '2026-05-02T11:01:00Z', 41.328, 19.820, false),
           ($2, '2026-05-02T11:00:30Z', 41.330, 19.825, false)`,
    [TEST_DEVICE_1, TEST_DEVICE_2]);
}

Schema creation in the integration test reuses the same migration runner that production uses. Don't reach into db-init/ or Directus's snapshot YAML from this test: the test is for the Processor, not the schema. For the Directus-managed tables there are two options: stub the minimum subset in a setup migration that runs only in the test environment, or (cleaner) point the test's pool at a Postgres instance that already has the schema loaded from a fixture SQL file.

The preferred option is a single test/fixtures/test-schema.sql file that creates the minimum subset the integration test needs (organizations, events, entries, entry_devices, devices, positions). Run it once in beforeAll. The duplication with the real schema is bounded because these collections are stable.

WebSocket client

Use ws's client mode (new WebSocket(url, { headers: { cookie: '...' } })). Set up an on('message') listener that pushes to an array; tests read from the array with a waitForMessage(predicate, timeout) helper:

import WebSocket from 'ws';

async function waitForMessage<T>(
  ws: WebSocket,
  predicate: (msg: any) => msg is T,
  timeoutMs = 5_000,
): Promise<T> {
  return new Promise((resolve, reject) => {
    const handler = (data: WebSocket.Data) => {
      const msg = JSON.parse(data.toString());
      if (predicate(msg)) {
        clearTimeout(timer);
        ws.off('message', handler);
        resolve(msg);
      }
    };
    const timer = setTimeout(() => {
      // Detach the listener on timeout too, so it cannot leak into later waits.
      ws.off('message', handler);
      reject(new Error('timeout waiting for message'));
    }, timeoutMs);
    ws.on('message', handler);
  });
}

This pattern is robust across the test suite — every test waits for a specific message shape, with a clear timeout error if the protocol breaks.
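
One caveat: the prose above describes a listener that pushes frames to an array, while the helper as written attaches a fresh listener per call, so a frame that arrives before the call is missed. A buffered variant might look like the sketch below. Inbox and MessageSource are hypothetical names; MessageSource is a deliberately minimal interface assumed for this sketch, not the ws typings.

```typescript
// Minimal surface of the socket that this sketch relies on (an assumption).
interface MessageSource {
  on(event: 'message', listener: (data: { toString(): string }) => void): unknown;
}

class Inbox {
  private buffered: unknown[] = [];
  private waiters: Array<(msg: unknown) => boolean> = [];

  constructor(source: MessageSource) {
    source.on('message', (data) => {
      const msg: unknown = JSON.parse(data.toString());
      // Offer the frame to pending waiters first; buffer it if nobody takes it.
      const index = this.waiters.findIndex((tryTake) => tryTake(msg));
      if (index >= 0) this.waiters.splice(index, 1);
      else this.buffered.push(msg);
    });
  }

  waitFor<T>(predicate: (msg: unknown) => msg is T, timeoutMs = 5_000): Promise<T> {
    // Serve from the buffer when a matching frame has already arrived.
    const index = this.buffered.findIndex((msg) => predicate(msg));
    if (index >= 0) {
      return Promise.resolve(this.buffered.splice(index, 1)[0] as T);
    }
    return new Promise<T>((resolve, reject) => {
      const tryTake = (msg: unknown): boolean => {
        if (!predicate(msg)) return false;
        clearTimeout(timer);
        resolve(msg);
        return true;
      };
      const timer = setTimeout(() => {
        // Drop the waiter so a late frame is buffered instead of lost.
        this.waiters = this.waiters.filter((w) => w !== tryTake);
        reject(new Error('timeout waiting for message'));
      }, timeoutMs);
      this.waiters.push(tryTake);
    });
  }
}
```

The inbox is created once, right after the socket is constructed, so frames such as subscribed cannot slip past between the send and the wait.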

Synthetic position publishing

Reuse the XADD shape from Phase 1's pipeline.integration.test.ts. Helper:

async function publishPosition(redis: Redis, position: Position) {
  await redis.xadd(
    config.REDIS_TELEMETRY_STREAM,
    '*',
    'ts', position.ts.toISOString(),
    'device_id', position.deviceId,
    'codec', '8E',
    'payload', JSON.stringify(serializeForStream(position)),
  );
}

The serializeForStream helper handles the bigint/Buffer sentinel encoding (already exists in Phase 1; reuse it).
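
For readers unfamiliar with why such a helper is needed: JSON.stringify throws on bigint values and has no native representation for raw bytes, so those values are replaced by tagged objects before serialization. The sketch below illustrates the general idea only. The __bigint and __bytes tags and the encodeValue/decodeValue names are invented for this example; the real Phase 1 helper may use a different encoding.

```typescript
// Replace values JSON cannot carry with tagged placeholder objects.
// The tag names are hypothetical, chosen for this illustration.
function encodeValue(value: unknown): unknown {
  if (typeof value === 'bigint') return { __bigint: value.toString() };
  if (value instanceof Uint8Array) {
    // Node's Buffer is a Uint8Array subclass, so it takes this branch too.
    const hex = Array.from(value, (b) => b.toString(16).padStart(2, '0')).join('');
    return { __bytes: hex };
  }
  if (value instanceof Date) return value.toISOString();
  if (Array.isArray(value)) return value.map(encodeValue);
  if (value !== null && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value).map(([key, inner]) => [key, encodeValue(inner)]),
    );
  }
  return value;
}

// Reverse of encodeValue for the two tagged forms; dates stay ISO strings.
function decodeValue(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(decodeValue);
  if (value !== null && typeof value === 'object') {
    const record = value as Record<string, unknown>;
    const big = record['__bigint'];
    if (typeof big === 'string') return BigInt(big);
    const hex = record['__bytes'];
    if (typeof hex === 'string') {
      const bytes = new Uint8Array(hex.length / 2);
      for (let i = 0; i < bytes.length; i++) {
        bytes[i] = parseInt(hex.slice(i * 2, i * 2 + 2), 16);
      }
      return bytes;
    }
    return Object.fromEntries(
      Object.entries(record).map(([key, inner]) => [key, decodeValue(inner)]),
    );
  }
  return value;
}
```

The test should call the existing serializeForStream rather than a copy like this one, so that the published payload matches exactly what the Processor's consumer expects to decode.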

Acceptance criteria

  • pnpm test:integration -- live runs all six scenarios green when Docker is available.
  • Without Docker, the suite logs skip messages and exits 0 (does not fail).
  • First-run total time < 60s including container pulls; subsequent runs < 20s.
  • Each test cleans up after itself — no shared state between tests.
  • Tests don't depend on each other's order.

Risks / open questions

  • Schema duplication. test/fixtures/test-schema.sql will drift from the real schema unless it is kept in sync deliberately. Mitigation: a comment at the top of the fixture says "this is a subset of the production schema for testing only; sync when production schema changes." Worth documenting in OPERATIONS.md (Phase 3) as a maintenance task.
  • Test flakiness from polling. Same caveat as Phase 1: prefer waitForMessage over await sleep(N). A fixed sleep is either too short (the test flakes) or too long (the suite is slow).
  • Image pull times in CI. The TimescaleDB image is large (~700MB). If integration tests ever run in CI, pre-pull the image. Phase 1's CI doesn't run integration tests and this phase doesn't change that; local runs plus a manual smoke test on stage are the gate.

Done

Landed in 2f2cf5c. Key divergence from spec: test/fixtures/test-schema.sql uses entry_devices.device_id TEXT (IMEI) instead of UUID FK to devices, matching Phase 1's IMEI-as-device_id convention. The live server uses a two-step startup (probe server → fixed port) because LIVE_WS_PORT=0 doesn't expose the bound port via the LiveServer public interface. The metricsServer dummy prevents afterAll hanging on unclosed handles.
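
The probe step of that two-step startup can be pictured as follows. This is a sketch of the general technique, not the landed code: findFreePort is a hypothetical name, and how the resulting port is handed to the live server (for example through LIVE_WS_PORT) is assumed rather than shown.

```typescript
import { createServer } from 'node:net';

// Ask the OS for a free TCP port by binding a throwaway server to port 0,
// reading the assigned port, and closing the server again.
function findFreePort(): Promise<number> {
  return new Promise((resolve, reject) => {
    const probe = createServer();
    probe.once('error', reject);
    probe.listen(0, '127.0.0.1', () => {
      const address = probe.address();
      if (address === null || typeof address === 'string') {
        probe.close(() => reject(new Error('unexpected address type')));
        return;
      }
      const { port } = address;
      // Resolve only after the probe has released the port.
      probe.close(() => resolve(port));
    });
  });
}
```

There is a small window between closing the probe and re-binding the port in which another process could claim it. For a local integration suite that risk is usually acceptable.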