/**
 * Unit tests for src/live/registry.ts — subscription registry.
 *
 * The registry is instantiated with a mocked authz client, and every
 * LiveConnection is a synthetic stub whose `ws.send` records the outbound
 * messages so the tests can inspect them.
 *
 * Covers:
 *  - Subscribe to event: with permitted user → `subscribed` reply,
 *    registry counts go up.
 *  - Subscribe with forbidden user → `error/forbidden` reply, no registry change.
 *  - Subscribe to `device:` → `error/unknown-topic`, authz is never consulted.
 *  - Subscribe twice to the same topic → idempotent (single subscription,
 *    subscribed reply each call, gauge does not double-count).
 *  - Unsubscribe from a topic → `unsubscribed` reply, gauge decrements.
 *  - Unsubscribe from a topic not subscribed → `unsubscribed` reply (idempotent),
 *    gauge never goes negative.
 *  - Connection close removes all subscriptions; gauge returns to pre-connection level.
 *  - connectionsForTopic returns the correct set.
 *  - stats() returns the correct subscription and topic counts.
 *
 * TODO: topicsForConnection has no test in this suite yet.
 */

import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { Logger } from 'pino';
import type { Config } from '../src/config/load.js';
import type { Metrics } from '../src/core/types.js';
import { createSubscriptionRegistry } from '../src/live/registry.js';
import type { AuthzClient } from '../src/live/authz.js';
import type { LiveConnection } from '../src/live/server.js';
import WebSocket from 'ws';

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Builds a Logger stub in which every log method is a vitest spy.
 * `child()` returns the stub itself, so child loggers are silent as well.
 */
function makeSilentLogger(): Logger {
  return {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    fatal: vi.fn(),
    trace: vi.fn(),
    child: vi.fn().mockReturnThis(),
    level: 'silent',
    silent: vi.fn(),
  } as unknown as Logger;
}

/**
 * Metrics implementation that additionally exposes every recorded call.
 *
 * NOTE(review): the label value type is presumed to be `string`; confirm
 * against the `Metrics` interface in src/core/types.ts.
 */
type TestMetrics = Metrics & {
  readonly incCalls: Array<{ name: string; labels?: Record<string, string> }>;
  readonly observeCalls: Array<{ name: string; value: number }>;
};

/** Creates a recording Metrics stub: `inc` and `observe` only append to arrays. */
function makeMetrics(): TestMetrics {
  const incCalls: Array<{ name: string; labels?: Record<string, string> }> = [];
  const observeCalls: Array<{ name: string; value: number }> = [];
  return {
    incCalls,
    observeCalls,
    inc(name, labels) {
      incCalls.push({ name, labels });
    },
    observe(name, value) {
      observeCalls.push({ name, value });
    },
  };
}

/** Returns a fixed, fully populated Config literal for the tests. */
function makeConfig(): Config {
  return {
    NODE_ENV: 'test',
    INSTANCE_ID: 'test-1',
    LOG_LEVEL: 'silent',
    REDIS_URL: 'redis://localhost:6379',
    POSTGRES_URL: 'postgres://localhost:5432/test',
    REDIS_TELEMETRY_STREAM: 'telemetry:t',
    REDIS_CONSUMER_GROUP: 'processor',
    REDIS_CONSUMER_NAME: 'test-consumer',
    METRICS_PORT: 0,
    BATCH_SIZE: 100,
    BATCH_BLOCK_MS: 500,
    WRITE_BATCH_SIZE: 50,
    DEVICE_STATE_LRU_CAP: 10_000,
    LIVE_WS_PORT: 8081,
    LIVE_WS_HOST: '0.0.0.0',
    LIVE_WS_PING_INTERVAL_MS: 30_000,
    LIVE_WS_DRAIN_TIMEOUT_MS: 5_000,
    LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576,
    DIRECTUS_BASE_URL: 'http://directus.test',
    DIRECTUS_AUTH_TIMEOUT_MS: 5_000,
    DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000,
    LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast',
    LIVE_BROADCAST_BATCH_SIZE: 100,
    LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000,
    LIVE_DEVICE_EVENT_REFRESH_MS: 30_000,
  };
}

const EVENT_ID = 'ada60b3d-b29f-4017-b702-cd6b700f9f6c';
const EVENT_TOPIC = `event:${EVENT_ID}`;

/** A second, distinct event topic for the multi-topic scenarios. */
const OTHER_EVENT_TOPIC = 'event:f6114c7e-1e94-488a-93c3-41060fcb06bc';

/** Name under which the tests look up the subscription gauge observations. */
const SUBSCRIPTIONS_GAUGE = 'processor_live_subscriptions';

/** Shape the tests assume for a JSON message captured by a connection stub. */
type SentMessage = Record<string, unknown>;

/** Every value observed for the subscription gauge, in call order. */
function gaugeValues(metrics: TestMetrics): number[] {
  return metrics.observeCalls
    .filter((call) => call.name === SUBSCRIPTIONS_GAUGE)
    .map((call) => call.value);
}

/**
 * Most recent value observed for the subscription gauge, or `undefined`
 * when the gauge was never observed (which makes `toBe(n)` fail cleanly
 * instead of throwing on an out-of-range index).
 */
function lastGaugeValue(metrics: TestMetrics): number | undefined {
  const values = gaugeValues(metrics);
  return values[values.length - 1];
}

/**
 * Creates a synthetic LiveConnection stub that captures sent messages.
 *
 * The fake socket reports `readyState` OPEN and zero `bufferedAmount`;
 * its `send` parses the JSON payload and appends it to `sentMessages`.
 *
 * @param id - Connection id to assign to the stub.
 */
function makeConn(id = 'conn-1'): LiveConnection & { sentMessages: unknown[] } {
  const sentMessages: unknown[] = [];
  const ws = {
    readyState: WebSocket.OPEN,
    bufferedAmount: 0,
    send: vi.fn((data: string) => {
      const parsed: unknown = JSON.parse(data);
      sentMessages.push(parsed);
    }),
    close: vi.fn(),
  } as unknown as WebSocket;

  return {
    id,
    ws,
    remoteAddr: '127.0.0.1',
    openedAt: new Date(),
    lastSeenAt: new Date(),
    user: {
      id: 'user-ada60b3d',
      email: 'test@example.com',
      role: null,
      first_name: 'Test',
      last_name: 'User',
    },
    cookieHeader: 'session=valid',
    sentMessages,
  };
}

/** Authz stub whose `canAccessEvent` always resolves to `{ allowed: true }`. */
function makeAllowedAuthzClient(): AuthzClient {
  return {
    canAccessEvent: vi.fn().mockResolvedValue({ allowed: true }),
  };
}

/** Authz stub whose `canAccessEvent` always resolves to a `forbidden` denial. */
function makeForbiddenAuthzClient(): AuthzClient {
  return {
    canAccessEvent: vi.fn().mockResolvedValue({ allowed: false, reason: 'forbidden' }),
  };
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

describe('createSubscriptionRegistry', () => {
  let metrics: TestMetrics;

  beforeEach(() => {
    metrics = makeMetrics();
  });

  /**
   * Builds a registry wired to the current test's metrics stub.
   * `metrics` is read at call time, so each test sees its own fresh recorder.
   */
  const makeRegistry = (authz: AuthzClient = makeAllowedAuthzClient()) =>
    createSubscriptionRegistry(authz, makeConfig(), makeSilentLogger(), metrics);

  it('subscribe to a valid event topic with permitted user → subscribed reply and gauge increment', async () => {
    const conn = makeConn();
    const registry = makeRegistry();

    await registry.subscribe(conn, EVENT_TOPIC, 'corr-1');

    // Should have sent a `subscribed` message.
    expect(conn.sentMessages).toHaveLength(1);
    const msg = conn.sentMessages[0] as SentMessage;
    expect(msg['type']).toBe('subscribed');
    expect(msg['topic']).toBe(EVENT_TOPIC);
    expect(msg['id']).toBe('corr-1');
    expect(Array.isArray(msg['snapshot'])).toBe(true);

    // Gauge should have been updated, ending at one subscription.
    expect(gaugeValues(metrics).length).toBeGreaterThanOrEqual(1);
    expect(lastGaugeValue(metrics)).toBe(1);

    // Success counter.
    const successCalls = metrics.incCalls.filter(
      (c) =>
        c.name === 'processor_live_subscribe_attempts_total' &&
        c.labels?.['result'] === 'success',
    );
    expect(successCalls).toHaveLength(1);
  });

  it('subscribe with forbidden user → error/forbidden reply, no registry change', async () => {
    const conn = makeConn();
    const registry = makeRegistry(makeForbiddenAuthzClient());

    await registry.subscribe(conn, EVENT_TOPIC, 'corr-2');

    const msg = conn.sentMessages[0] as SentMessage;
    expect(msg['type']).toBe('error');
    expect(msg['code']).toBe('forbidden');
    expect(msg['topic']).toBe(EVENT_TOPIC);
    expect(msg['id']).toBe('corr-2');

    // Gauge should NOT have changed: it may have been observed with 0,
    // but never with a positive value.
    const positiveGauge = gaugeValues(metrics).filter((value) => value > 0);
    expect(positiveGauge).toHaveLength(0);

    // connectionsForTopic should return empty.
    expect([...registry.connectionsForTopic(EVENT_TOPIC)]).toHaveLength(0);
  });

  it('subscribe to device: → error/unknown-topic, no registry change', async () => {
    const conn = makeConn();
    const authz = makeAllowedAuthzClient();
    const registry = makeRegistry(authz);

    await registry.subscribe(conn, 'device:356307042441013', 'corr-3');

    const msg = conn.sentMessages[0] as SentMessage;
    expect(msg['type']).toBe('error');
    expect(msg['code']).toBe('unknown-topic');

    // Authz client should NOT have been called.
    expect(vi.mocked(authz.canAccessEvent)).not.toHaveBeenCalled();
  });

  it('subscribe twice to the same topic → idempotent (single subscription, subscribed each call)', async () => {
    const conn = makeConn();
    const registry = makeRegistry();

    await registry.subscribe(conn, EVENT_TOPIC);
    await registry.subscribe(conn, EVENT_TOPIC); // second call

    // Both calls send `subscribed`.
    expect(conn.sentMessages).toHaveLength(2);
    const msgs = conn.sentMessages as SentMessage[];
    expect(msgs[0]?.['type']).toBe('subscribed');
    expect(msgs[1]?.['type']).toBe('subscribed');

    // Gauge should only count once: last value is 1, not 2.
    expect(lastGaugeValue(metrics)).toBe(1);

    // connectionsForTopic should have exactly one connection.
    expect([...registry.connectionsForTopic(EVENT_TOPIC)]).toHaveLength(1);
  });

  it('unsubscribe from a subscribed topic → unsubscribed reply and gauge decrement', async () => {
    const conn = makeConn();
    const registry = makeRegistry();

    await registry.subscribe(conn, EVENT_TOPIC);
    registry.unsubscribe(conn, EVENT_TOPIC, 'corr-4');

    // Index 0 is the `subscribed` reply; index 1 is the unsubscribe reply.
    const msgs = conn.sentMessages as SentMessage[];
    expect(msgs[1]?.['type']).toBe('unsubscribed');
    expect(msgs[1]?.['topic']).toBe(EVENT_TOPIC);
    expect(msgs[1]?.['id']).toBe('corr-4');

    // Gauge should be back at 0.
    expect(lastGaugeValue(metrics)).toBe(0);
  });

  it('unsubscribe from a topic not subscribed to → unsubscribed reply (idempotent), gauge unchanged', async () => {
    const conn = makeConn();
    const registry = makeRegistry();

    // Unsubscribe without ever subscribing.
    registry.unsubscribe(conn, EVENT_TOPIC, 'corr-5');

    const msg = conn.sentMessages[0] as SentMessage;
    expect(msg['type']).toBe('unsubscribed');

    // Gauge should still be 0 (not go negative).
    expect(gaugeValues(metrics).every((value) => value >= 0)).toBe(true);
  });

  it('onConnectionClose removes all subscriptions; gauge returns to 0', async () => {
    const conn = makeConn();
    const registry = makeRegistry();

    await registry.subscribe(conn, EVENT_TOPIC);
    await registry.subscribe(conn, OTHER_EVENT_TOPIC);

    registry.onConnectionClose(conn);

    // Gauge should be 0.
    expect(lastGaugeValue(metrics)).toBe(0);

    // connectionsForTopic should be empty for both topics.
    expect([...registry.connectionsForTopic(EVENT_TOPIC)]).toHaveLength(0);
    expect([...registry.connectionsForTopic(OTHER_EVENT_TOPIC)]).toHaveLength(0);
  });

  it('connectionsForTopic returns only connections subscribed to that topic', async () => {
    const conn1 = makeConn('conn-1');
    const conn2 = makeConn('conn-2');
    const registry = makeRegistry();

    await registry.subscribe(conn1, EVENT_TOPIC);
    await registry.subscribe(conn2, EVENT_TOPIC);
    await registry.subscribe(conn1, OTHER_EVENT_TOPIC);

    // Sort the ids: the iteration order of the returned set is not asserted.
    const connsForEvent = [...registry.connectionsForTopic(EVENT_TOPIC)];
    expect(connsForEvent).toHaveLength(2);
    expect(connsForEvent.map((c) => c.id).sort()).toEqual(['conn-1', 'conn-2']);

    const connsForOther = [...registry.connectionsForTopic(OTHER_EVENT_TOPIC)];
    expect(connsForOther).toHaveLength(1);
    expect(connsForOther[0]?.id).toBe('conn-1');
  });

  it('stats() returns correct counts', async () => {
    const conn1 = makeConn('conn-1');
    const conn2 = makeConn('conn-2');
    const registry = makeRegistry();

    await registry.subscribe(conn1, EVENT_TOPIC);
    await registry.subscribe(conn2, EVENT_TOPIC);
    await registry.subscribe(conn1, OTHER_EVENT_TOPIC);

    // Three (connection, topic) pairs spread across two distinct topics.
    const s = registry.stats();
    expect(s.subscriptions).toBe(3);
    expect(s.topics).toBe(2);
  });
});