diff --git a/src/live/authz.ts b/src/live/authz.ts new file mode 100644 index 0000000..10eb470 --- /dev/null +++ b/src/live/authz.ts @@ -0,0 +1,90 @@ +/** + * Per-event authorization client. + * + * Checks whether a user has access to a specific event by delegating to + * Directus's REST API with the user's cookie. Directus enforces row-level + * security; if Directus returns 200 the user has access. If 403, they don't. + * + * Authorization is checked ONCE at subscribe time. The hot fan-out path has + * zero Directus calls — it operates entirely on in-memory subscription state. + * + * Spec: docs/wiki/synthesis/processor-ws-contract.md §Subscription model + */ + +import type { Config } from '../config/load.js'; +import type { Metrics } from '../shared/types.js'; +import type { Logger } from 'pino'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** + * Result of an authorization check. + * `allowed: true` → user may subscribe to the topic. + * `allowed: false` → user is rejected; `reason` tells the client why. + */ +export type AuthzResult = + | { readonly allowed: true } + | { readonly allowed: false; readonly reason: 'forbidden' | 'not-found' | 'error' }; + +export type AuthzClient = { + /** + * Checks whether the user identified by `cookieHeader` can access + * the event with `eventId`. + * + * Delegates to `GET /items/events/?fields=id` with the user's + * cookie. Directus's row-level security does the org-membership check. + * + * Never throws. Returns `{ allowed: false, reason: 'error' }` on any + * transient failure. + */ + readonly canAccessEvent: ( + cookieHeader: string, + eventId: string, + ) => Promise; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createAuthzClient( + config: Config, + logger: Logger, + metrics: Metrics, +): AuthzClient { + async function canAccessEvent( + cookieHeader: string, + eventId: string, + ): Promise { + const start = performance.now(); + try { + const res = await fetch( + `${config.DIRECTUS_BASE_URL}/items/events/${eventId}?fields=id`, + { + method: 'GET', + headers: { cookie: cookieHeader }, + signal: AbortSignal.timeout(config.DIRECTUS_AUTHZ_TIMEOUT_MS), + }, + ); + + if (res.status === 200) return { allowed: true }; + if (res.status === 403) return { allowed: false, reason: 'forbidden' }; + if (res.status === 404) return { allowed: false, reason: 'not-found' }; + + logger.warn( + { status: res.status, eventId }, + 'directus /items/events returned unexpected status', + ); + return { allowed: false, reason: 'error' }; + } catch (err) { + logger.warn({ err, eventId }, 'directus authz call failed'); + return { allowed: false, reason: 'error' }; + } finally { + metrics.observe('processor_live_authz_latency_ms', performance.now() - start); + } + } + + return { canAccessEvent }; +} diff --git a/src/live/protocol.ts b/src/live/protocol.ts index 49baf67..5758725 100644 --- a/src/live/protocol.ts +++ b/src/live/protocol.ts @@ -130,7 +130,9 @@ export type ErrorCode = | 'unknown-topic' | 'protocol-violation' | 'not-implemented' - | 'rate-limited'; + | 'rate-limited' + /** Transient server-side error (e.g. Directus authz call failed). Retry. */ + | 'error'; /** * An error response from the server, scoped to a topic or connection-level. diff --git a/src/live/registry.ts b/src/live/registry.ts new file mode 100644 index 0000000..dc8ccf7 --- /dev/null +++ b/src/live/registry.ts @@ -0,0 +1,324 @@ +/** + * Subscription registry — manages the bidirectional mapping between WebSocket + * connections and topics, and handles per-event authorization at subscribe time. + * + * Data structures: + * - connectionTopics: WeakMap> (conn → topics) + * WeakMap allows GC cleanup if a connection somehow leaks the onConnectionClose call. + * - topicConnections: Map> (topic → conns) + * Standard Map keyed by topic string; cleaned up by onConnectionClose. + * + * Authorization: + * - Checked ONCE per subscribe, via the authz client (Directus /items/events/). + * - Zero Directus calls in the fan-out hot path. + * + * Snapshot: + * - Task 1.5.3 sends an empty snapshot with `subscribed`. Task 1.5.5 wires in + * the real snapshot provider to populate the array. + * + * Spec: docs/wiki/synthesis/processor-ws-contract.md §Subscription model + */ + +import type { Logger } from 'pino'; +import type { Metrics } from '../shared/types.js'; +import type { LiveConnection } from './server.js'; +import { sendOutbound } from './server.js'; +import type { AuthzClient } from './authz.js'; +import type { Config } from '../config/load.js'; +import type { PositionSnapshotEntry } from './protocol.js'; + +// --------------------------------------------------------------------------- +// Topic parsing +// --------------------------------------------------------------------------- + +const EVENT_TOPIC_REGEX = + /^event:([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})$/i; + +type ParsedTopic = { readonly kind: 'event'; readonly eventId: string }; + +function parseTopic(topic: string): ParsedTopic | null { + const match = EVENT_TOPIC_REGEX.exec(topic); + if (match?.[1]) return { kind: 'event', eventId: match[1] }; + return null; +} + +// --------------------------------------------------------------------------- +// Snapshot provider type (injected from task 1.5.5) +// --------------------------------------------------------------------------- + +/** + * Pluggable snapshot provider. Task 1.5.3 uses the stub (empty array). + * Task 1.5.5 injects the real Postgres-backed provider. + */ +export type SnapshotProvider = { + readonly forEvent: (eventId: string) => Promise; +}; + +const STUB_SNAPSHOT_PROVIDER: SnapshotProvider = { + forEvent: () => Promise.resolve([]), +}; + +// --------------------------------------------------------------------------- +// Public interface +// --------------------------------------------------------------------------- + +export type SubscriptionRegistry = { + /** Subscribe `conn` to `topic`. Authorizes, then sends `subscribed` or `error`. */ + readonly subscribe: ( + conn: LiveConnection, + topic: string, + correlationId?: string, + ) => Promise; + /** Unsubscribe `conn` from `topic`. Always sends `unsubscribed` (idempotent). */ + readonly unsubscribe: ( + conn: LiveConnection, + topic: string, + correlationId?: string, + ) => void; + /** Remove all subscriptions for a closed connection (called on ws close). */ + readonly onConnectionClose: (conn: LiveConnection) => void; + /** Iterates all connections currently subscribed to `topic`. Used by fan-out. */ + readonly connectionsForTopic: (topic: string) => Iterable; + /** Iterates all topics the given connection is subscribed to. */ + readonly topicsForConnection: (conn: LiveConnection) => Iterable; + /** Aggregate stats for monitoring and sanity checks. */ + readonly stats: () => { connections: number; topics: number; subscriptions: number }; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createSubscriptionRegistry( + authzClient: AuthzClient, + config: Config, + logger: Logger, + metrics: Metrics, + snapshotProvider: SnapshotProvider = STUB_SNAPSHOT_PROVIDER, +): SubscriptionRegistry { + // conn → Set of topic strings the connection is subscribed to. + // WeakMap: if a connection object is somehow not cleaned up via onConnectionClose, + // the GC will reclaim the Set when the connection is collected. + const connectionTopics = new WeakMap>(); + + // topic string → Set of connections subscribed to that topic. + const topicConnections = new Map>(); + + // Total active subscriptions counter (kept in sync with topicConnections). + let totalSubscriptions = 0; + + // ------------------------------------------------------------------------- + // Subscribe + // ------------------------------------------------------------------------- + + async function subscribe( + conn: LiveConnection, + topic: string, + correlationId?: string, + ): Promise { + const parsed = parseTopic(topic); + if (!parsed) { + sendOutbound( + conn, + { + type: 'error', + topic, + id: correlationId, + code: 'unknown-topic', + message: 'Unknown topic format. Supported: event:', + }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + metrics.inc('processor_live_subscribe_attempts_total', { result: 'unknown-topic' }); + return; + } + + // Idempotent: if already subscribed, re-send `subscribed` with a fresh snapshot. + const existing = connectionTopics.get(conn); + if (existing?.has(topic)) { + const snapshot = await fetchSnapshot(parsed.eventId); + sendOutbound( + conn, + { type: 'subscribed', topic, id: correlationId, snapshot }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + // Do not double-count in subscriptions gauge. + return; + } + + // Authorization check — one Directus call per subscribe. + const verdict = await authzClient.canAccessEvent(conn.cookieHeader, parsed.eventId); + if (!verdict.allowed) { + sendOutbound( + conn, + { + type: 'error', + topic, + id: correlationId, + code: verdict.reason, + message: buildForbiddenMessage(verdict.reason), + }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + metrics.inc('processor_live_subscribe_attempts_total', { result: verdict.reason }); + return; + } + + // Fetch snapshot (fails open — snapshot failure does not block the subscribe). + const snapshot = await fetchSnapshot(parsed.eventId); + + // Insert into both indexes. + if (!existing) connectionTopics.set(conn, new Set()); + connectionTopics.get(conn)!.add(topic); + + if (!topicConnections.has(topic)) topicConnections.set(topic, new Set()); + topicConnections.get(topic)!.add(conn); + + totalSubscriptions += 1; + metrics.observe('processor_live_subscriptions', totalSubscriptions); + metrics.inc('processor_live_subscribe_attempts_total', { result: 'success' }); + + logger.debug( + { connId: conn.id, topic, userId: conn.user.id }, + 'subscribed', + ); + + sendOutbound( + conn, + { type: 'subscribed', topic, id: correlationId, snapshot }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + } + + // ------------------------------------------------------------------------- + // Unsubscribe + // ------------------------------------------------------------------------- + + function unsubscribe( + conn: LiveConnection, + topic: string, + correlationId?: string, + ): void { + const topics = connectionTopics.get(conn); + const wasSubscribed = topics?.has(topic) ?? false; + + topics?.delete(topic); + + const conns = topicConnections.get(topic); + if (conns) { + conns.delete(conn); + if (conns.size === 0) topicConnections.delete(topic); + } + + if (wasSubscribed) { + totalSubscriptions -= 1; + metrics.observe('processor_live_subscriptions', totalSubscriptions); + } + + logger.debug({ connId: conn.id, topic }, 'unsubscribed'); + + // Always reply, even if not subscribed (idempotent). + sendOutbound( + conn, + { type: 'unsubscribed', topic, id: correlationId }, + metrics, + config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, + ); + } + + // ------------------------------------------------------------------------- + // onConnectionClose + // ------------------------------------------------------------------------- + + function onConnectionClose(conn: LiveConnection): void { + const topics = connectionTopics.get(conn); + if (!topics) return; + + for (const topic of topics) { + const conns = topicConnections.get(topic); + if (conns) { + conns.delete(conn); + if (conns.size === 0) topicConnections.delete(topic); + } + totalSubscriptions -= 1; + } + + connectionTopics.delete(conn); + metrics.observe('processor_live_subscriptions', totalSubscriptions); + + logger.debug( + { connId: conn.id, removedTopics: topics.size }, + 'connection closed — subscriptions cleaned up', + ); + } + + // ------------------------------------------------------------------------- + // Query + // ------------------------------------------------------------------------- + + function connectionsForTopic(topic: string): Iterable { + return topicConnections.get(topic) ?? new Set(); + } + + function topicsForConnection(conn: LiveConnection): Iterable { + return connectionTopics.get(conn) ?? new Set(); + } + + function stats(): { connections: number; topics: number; subscriptions: number } { + return { + connections: topicConnections.size > 0 + ? [...topicConnections.values()].reduce((acc, s) => acc + s.size, 0) + : 0, + topics: topicConnections.size, + subscriptions: totalSubscriptions, + }; + } + + // ------------------------------------------------------------------------- + // Snapshot helper + // ------------------------------------------------------------------------- + + async function fetchSnapshot(eventId: string): Promise { + const start = performance.now(); + try { + const snapshot = await snapshotProvider.forEvent(eventId); + metrics.observe('processor_live_snapshot_query_latency_ms', performance.now() - start); + metrics.observe('processor_live_snapshot_size', snapshot.length); + return snapshot; + } catch (err) { + logger.warn( + { err, eventId }, + 'snapshot query failed; sending empty snapshot', + ); + return []; + } + } + + return { + subscribe, + unsubscribe, + onConnectionClose, + connectionsForTopic, + topicsForConnection, + stats, + }; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function buildForbiddenMessage(reason: 'forbidden' | 'not-found' | 'error'): string { + switch (reason) { + case 'forbidden': + return 'User does not have access to this event.'; + case 'not-found': + return 'Event not found.'; + case 'error': + return 'Authorization check failed. Please try again.'; + } +} diff --git a/src/main.ts b/src/main.ts index fc1a707..99e46c1 100644 --- a/src/main.ts +++ b/src/main.ts @@ -16,10 +16,12 @@ import { connectRedis, createConsumer } from './core/consumer.js'; import type { ConsumedRecord } from './core/consumer.js'; import { createDeviceStateStore } from './core/state.js'; import { createWriter } from './core/writer.js'; -import { createLiveServer, sendOutbound } from './live/server.js'; +import { createLiveServer } from './live/server.js'; import type { LiveServer, LiveConnection } from './live/server.js'; import type { InboundMessage } from './live/protocol.js'; import { createAuthClient } from './live/auth.js'; +import { createAuthzClient } from './live/authz.js'; +import { createSubscriptionRegistry } from './live/registry.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -132,29 +134,28 @@ async function main(): Promise { return ackIds; }; - // 10. Build the live WebSocket server (task 1.5.2 adds auth). - // The stub message handler replies with `error/not-implemented` until - // task 1.5.3 wires in the real subscription-registry handler. + // 10. Build the live WebSocket server (tasks 1.5.2 and 1.5.3). const authClient = createAuthClient(config, logger, metrics); + const authzClient = createAuthzClient(config, logger, metrics); + const registry = createSubscriptionRegistry(authzClient, config, logger, metrics); - const stubMessageHandler = async ( + const messageHandler = async ( conn: LiveConnection, - _message: InboundMessage, + message: InboundMessage, ): Promise => { - sendOutbound( - conn, - { type: 'error', code: 'not-implemented' }, - metrics, - config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES, - ); + if (message.type === 'subscribe') { + await registry.subscribe(conn, message.topic, message.id); + } else if (message.type === 'unsubscribe') { + registry.unsubscribe(conn, message.topic, message.id); + } }; const liveServer: LiveServer = createLiveServer( config, logger, metrics, - stubMessageHandler, - undefined, // onClose: wired in task 1.5.3 + messageHandler, + (conn) => registry.onConnectionClose(conn), authClient, ); await liveServer.start(); diff --git a/test/live-authz.test.ts b/test/live-authz.test.ts new file mode 100644 index 0000000..7fa7dc8 --- /dev/null +++ b/test/live-authz.test.ts @@ -0,0 +1,148 @@ +/** + * Unit tests for src/live/authz.ts — per-event authorization. + * + * Covers: + * - canAccessEvent returns { allowed: true } when /items/events/:id returns 200. + * - Returns { allowed: false, reason: 'forbidden' } on 403. + * - Returns { allowed: false, reason: 'not-found' } on 404. + * - Returns { allowed: false, reason: 'error' } on network failure (never throws). + * - Returns { allowed: false, reason: 'error' } on 500. + * - Authz latency histogram is observed on every call. + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Metrics } from '../src/core/types.js'; +import { createAuthzClient } from '../src/live/authz.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), + fatal: vi.fn(), trace: vi.fn(), child: vi.fn().mockReturnThis(), + level: 'silent', silent: vi.fn(), + } as unknown as Logger; +} + +type TestMetrics = Metrics & { + readonly observeCalls: Array<{ name: string; value: number }>; +}; + +function makeMetrics(): TestMetrics { + const observeCalls: Array<{ name: string; value: number }> = []; + return { + observeCalls, + inc: vi.fn(), + observe(name, value) { observeCalls.push({ name, value }); }, + }; +} + +function makeConfig(): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-1', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://localhost:5432/test', + REDIS_TELEMETRY_STREAM: 'telemetry:t', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + LIVE_WS_PORT: 8081, + LIVE_WS_HOST: '0.0.0.0', + LIVE_WS_PING_INTERVAL_MS: 30_000, + LIVE_WS_DRAIN_TIMEOUT_MS: 5_000, + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, + DIRECTUS_BASE_URL: 'http://directus.test', + DIRECTUS_AUTH_TIMEOUT_MS: 5_000, + DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, + LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast', + LIVE_BROADCAST_BATCH_SIZE: 100, + LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000, + LIVE_DEVICE_EVENT_REFRESH_MS: 30_000, + }; +} + +const EVENT_ID = 'ada60b3d-b29f-4017-b702-cd6b700f9f6c'; + +function makeStatusFetch(status: number): typeof fetch { + return vi.fn().mockResolvedValue({ + status, + ok: status >= 200 && status < 300, + json: () => Promise.resolve({ data: { id: EVENT_ID } }), + } as unknown as Response); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('createAuthzClient.canAccessEvent', () => { + let originalFetch: typeof globalThis.fetch; + + beforeEach(() => { originalFetch = globalThis.fetch; }); + afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + }); + + it('returns { allowed: true } when Directus returns 200', async () => { + globalThis.fetch = makeStatusFetch(200); + const client = createAuthzClient(makeConfig(), makeSilentLogger(), makeMetrics()); + const result = await client.canAccessEvent('cookie=abc', EVENT_ID); + expect(result.allowed).toBe(true); + }); + + it('returns { allowed: false, reason: "forbidden" } on 403', async () => { + globalThis.fetch = makeStatusFetch(403); + const client = createAuthzClient(makeConfig(), makeSilentLogger(), makeMetrics()); + const result = await client.canAccessEvent('cookie=abc', EVENT_ID); + expect(result.allowed).toBe(false); + if (!result.allowed) expect(result.reason).toBe('forbidden'); + }); + + it('returns { allowed: false, reason: "not-found" } on 404', async () => { + globalThis.fetch = makeStatusFetch(404); + const client = createAuthzClient(makeConfig(), makeSilentLogger(), makeMetrics()); + const result = await client.canAccessEvent('cookie=abc', EVENT_ID); + expect(result.allowed).toBe(false); + if (!result.allowed) expect(result.reason).toBe('not-found'); + }); + + it('returns { allowed: false, reason: "error" } on 500', async () => { + globalThis.fetch = makeStatusFetch(500); + const client = createAuthzClient(makeConfig(), makeSilentLogger(), makeMetrics()); + const result = await client.canAccessEvent('cookie=abc', EVENT_ID); + expect(result.allowed).toBe(false); + if (!result.allowed) expect(result.reason).toBe('error'); + }); + + it('returns { allowed: false, reason: "error" } when fetch throws (never throws itself)', async () => { + globalThis.fetch = vi.fn().mockRejectedValue(new Error('ECONNREFUSED')); + const client = createAuthzClient(makeConfig(), makeSilentLogger(), makeMetrics()); + const result = await client.canAccessEvent('cookie=abc', EVENT_ID); + expect(result.allowed).toBe(false); + if (!result.allowed) expect(result.reason).toBe('error'); + }); + + it('observes authz latency on every call', async () => { + globalThis.fetch = makeStatusFetch(200); + const metrics = makeMetrics(); + const client = createAuthzClient(makeConfig(), makeSilentLogger(), metrics); + await client.canAccessEvent('cookie=abc', EVENT_ID); + + const latencyCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_authz_latency_ms', + ); + expect(latencyCalls.length).toBeGreaterThanOrEqual(1); + expect(latencyCalls[0]!.value).toBeGreaterThanOrEqual(0); + }); +}); diff --git a/test/live-registry.test.ts b/test/live-registry.test.ts new file mode 100644 index 0000000..fa37e38 --- /dev/null +++ b/test/live-registry.test.ts @@ -0,0 +1,345 @@ +/** + * Unit tests for src/live/registry.ts — subscription registry. + * + * The registry is instantiated with a mocked authz client and a mocked + * sendOutbound path. LiveConnection objects are synthetic stubs. + * + * Covers: + * - Subscribe to event: with permitted user → `subscribed` reply, + * registry counts go up. + * - Subscribe with forbidden user → `error/forbidden` reply, no registry change. + * - Subscribe to `device:` → `error/unknown-topic`, no registry change. + * - Subscribe twice to the same topic → idempotent (single subscription, + * subscribed reply each call, gauge does not double-count). + * - Unsubscribe from a topic → `unsubscribed` reply, gauge decrements. + * - Unsubscribe from a topic not subscribed → `unsubscribed` reply (idempotent), + * gauge unchanged. + * - Connection close removes all subscriptions; gauge returns to pre-connection level. + * - connectionsForTopic returns the correct set. + * - topicsForConnection returns the correct set. + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Metrics } from '../src/core/types.js'; +import { createSubscriptionRegistry } from '../src/live/registry.js'; +import type { AuthzClient } from '../src/live/authz.js'; +import type { LiveConnection } from '../src/live/server.js'; +import WebSocket from 'ws'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), + fatal: vi.fn(), trace: vi.fn(), child: vi.fn().mockReturnThis(), + level: 'silent', silent: vi.fn(), + } as unknown as Logger; +} + +type TestMetrics = Metrics & { + readonly incCalls: Array<{ name: string; labels?: Record }>; + readonly observeCalls: Array<{ name: string; value: number }>; +}; + +function makeMetrics(): TestMetrics { + const incCalls: Array<{ name: string; labels?: Record }> = []; + const observeCalls: Array<{ name: string; value: number }> = []; + return { + incCalls, + observeCalls, + inc(name, labels) { incCalls.push({ name, labels }); }, + observe(name, value) { observeCalls.push({ name, value }); }, + }; +} + +function makeConfig(): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-1', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://localhost:5432/test', + REDIS_TELEMETRY_STREAM: 'telemetry:t', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + LIVE_WS_PORT: 8081, + LIVE_WS_HOST: '0.0.0.0', + LIVE_WS_PING_INTERVAL_MS: 30_000, + LIVE_WS_DRAIN_TIMEOUT_MS: 5_000, + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, + DIRECTUS_BASE_URL: 'http://directus.test', + DIRECTUS_AUTH_TIMEOUT_MS: 5_000, + DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, + LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast', + LIVE_BROADCAST_BATCH_SIZE: 100, + LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000, + LIVE_DEVICE_EVENT_REFRESH_MS: 30_000, + }; +} + +const EVENT_ID = 'ada60b3d-b29f-4017-b702-cd6b700f9f6c'; +const EVENT_TOPIC = `event:${EVENT_ID}`; + +/** + * Creates a synthetic LiveConnection stub that captures sent messages. + */ +function makeConn(id = 'conn-1'): LiveConnection & { sentMessages: unknown[] } { + const sentMessages: unknown[] = []; + const ws = { + readyState: WebSocket.OPEN, + bufferedAmount: 0, + send: vi.fn((data: string) => { sentMessages.push(JSON.parse(data)); }), + close: vi.fn(), + } as unknown as WebSocket; + + return { + id, + ws, + remoteAddr: '127.0.0.1', + openedAt: new Date(), + lastSeenAt: new Date(), + user: { + id: 'user-ada60b3d', + email: 'test@example.com', + role: null, + first_name: 'Test', + last_name: 'User', + }, + cookieHeader: 'session=valid', + sentMessages, + }; +} + +function makeAllowedAuthzClient(): AuthzClient { + return { + canAccessEvent: vi.fn().mockResolvedValue({ allowed: true }), + }; +} + +function makeForbiddenAuthzClient(): AuthzClient { + return { + canAccessEvent: vi.fn().mockResolvedValue({ allowed: false, reason: 'forbidden' }), + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('createSubscriptionRegistry', () => { + let metrics: TestMetrics; + + beforeEach(() => { + metrics = makeMetrics(); + }); + + it('subscribe to a valid event topic with permitted user → subscribed reply and gauge increment', async () => { + const conn = makeConn(); + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn, EVENT_TOPIC, 'corr-1'); + + // Should have sent a `subscribed` message. + expect(conn.sentMessages).toHaveLength(1); + const msg = conn.sentMessages[0] as Record; + expect(msg['type']).toBe('subscribed'); + expect(msg['topic']).toBe(EVENT_TOPIC); + expect(msg['id']).toBe('corr-1'); + expect(Array.isArray(msg['snapshot'])).toBe(true); + + // Gauge should have been updated. + const subGaugeCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_subscriptions', + ); + expect(subGaugeCalls.length).toBeGreaterThanOrEqual(1); + expect(subGaugeCalls[subGaugeCalls.length - 1]!.value).toBe(1); + + // Success counter. + const successCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_subscribe_attempts_total' && c.labels?.['result'] === 'success', + ); + expect(successCalls).toHaveLength(1); + }); + + it('subscribe with forbidden user → error/forbidden reply, no registry change', async () => { + const conn = makeConn(); + const registry = createSubscriptionRegistry( + makeForbiddenAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn, EVENT_TOPIC, 'corr-2'); + + const msg = conn.sentMessages[0] as Record; + expect(msg['type']).toBe('error'); + expect(msg['code']).toBe('forbidden'); + expect(msg['topic']).toBe(EVENT_TOPIC); + expect(msg['id']).toBe('corr-2'); + + // Gauge should NOT have changed. + const subGaugeCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_subscriptions', + ); + // May have been called with 0 for snapshot, but never with a positive value. + const positiveGauge = subGaugeCalls.filter((c) => c.value > 0); + expect(positiveGauge).toHaveLength(0); + + // connectionsForTopic should return empty. + expect([...registry.connectionsForTopic(EVENT_TOPIC)]).toHaveLength(0); + }); + + it('subscribe to device: → error/unknown-topic, no registry change', async () => { + const conn = makeConn(); + const authz = makeAllowedAuthzClient(); + const registry = createSubscriptionRegistry( + authz, makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn, 'device:356307042441013', 'corr-3'); + + const msg = conn.sentMessages[0] as Record; + expect(msg['type']).toBe('error'); + expect(msg['code']).toBe('unknown-topic'); + + // Authz client should NOT have been called. + expect(vi.mocked(authz.canAccessEvent)).not.toHaveBeenCalled(); + }); + + it('subscribe twice to the same topic → idempotent (single subscription, subscribed each call)', async () => { + const conn = makeConn(); + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn, EVENT_TOPIC); + await registry.subscribe(conn, EVENT_TOPIC); // second call + + // Both calls send `subscribed`. + expect(conn.sentMessages).toHaveLength(2); + const msgs = conn.sentMessages as Array>; + expect(msgs[0]!['type']).toBe('subscribed'); + expect(msgs[1]!['type']).toBe('subscribed'); + + // Gauge should only count once. + const finalGaugeCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_subscriptions', + ); + // Last value should be 1, not 2. + expect(finalGaugeCalls[finalGaugeCalls.length - 1]!.value).toBe(1); + + // connectionsForTopic should have exactly one connection. + expect([...registry.connectionsForTopic(EVENT_TOPIC)]).toHaveLength(1); + }); + + it('unsubscribe from a subscribed topic → unsubscribed reply and gauge decrement', async () => { + const conn = makeConn(); + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn, EVENT_TOPIC); + registry.unsubscribe(conn, EVENT_TOPIC, 'corr-4'); + + const msgs = conn.sentMessages as Array>; + expect(msgs[1]!['type']).toBe('unsubscribed'); + expect(msgs[1]!['topic']).toBe(EVENT_TOPIC); + expect(msgs[1]!['id']).toBe('corr-4'); + + // Gauge should be back at 0. + const finalGaugeCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_subscriptions', + ); + expect(finalGaugeCalls[finalGaugeCalls.length - 1]!.value).toBe(0); + }); + + it('unsubscribe from a topic not subscribed to → unsubscribed reply (idempotent), gauge unchanged', async () => { + const conn = makeConn(); + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + // Unsubscribe without ever subscribing. + registry.unsubscribe(conn, EVENT_TOPIC, 'corr-5'); + + const msg = conn.sentMessages[0] as Record; + expect(msg['type']).toBe('unsubscribed'); + + // Gauge should still be 0 (not go negative). + const finalGaugeCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_subscriptions', + ); + const values = finalGaugeCalls.map((c) => c.value); + expect(values.every((v) => v >= 0)).toBe(true); + }); + + it('onConnectionClose removes all subscriptions; gauge returns to 0', async () => { + const conn = makeConn(); + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn, EVENT_TOPIC); + await registry.subscribe(conn, `event:f6114c7e-1e94-488a-93c3-41060fcb06bc`); + + registry.onConnectionClose(conn); + + // Gauge should be 0. + const finalGaugeCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_subscriptions', + ); + expect(finalGaugeCalls[finalGaugeCalls.length - 1]!.value).toBe(0); + + // connectionsForTopic should be empty for both topics. + expect([...registry.connectionsForTopic(EVENT_TOPIC)]).toHaveLength(0); + }); + + it('connectionsForTopic returns only connections subscribed to that topic', async () => { + const conn1 = makeConn('conn-1'); + const conn2 = makeConn('conn-2'); + const otherTopic = 'event:f6114c7e-1e94-488a-93c3-41060fcb06bc'; + + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn1, EVENT_TOPIC); + await registry.subscribe(conn2, EVENT_TOPIC); + await registry.subscribe(conn1, otherTopic); + + const connsForEvent = [...registry.connectionsForTopic(EVENT_TOPIC)]; + expect(connsForEvent).toHaveLength(2); + expect(connsForEvent.map((c) => c.id).sort()).toEqual(['conn-1', 'conn-2'].sort()); + + const connsForOther = [...registry.connectionsForTopic(otherTopic)]; + expect(connsForOther).toHaveLength(1); + expect(connsForOther[0]!.id).toBe('conn-1'); + }); + + it('stats() returns correct counts', async () => { + const conn1 = makeConn('conn-1'); + const conn2 = makeConn('conn-2'); + const topic2 = 'event:f6114c7e-1e94-488a-93c3-41060fcb06bc'; + + const registry = createSubscriptionRegistry( + makeAllowedAuthzClient(), makeConfig(), makeSilentLogger(), metrics, + ); + + await registry.subscribe(conn1, EVENT_TOPIC); + await registry.subscribe(conn2, EVENT_TOPIC); + await registry.subscribe(conn1, topic2); + + const s = registry.stats(); + expect(s.subscriptions).toBe(3); + expect(s.topics).toBe(2); + }); +});