From 20ebd9b4735c9d8a4fc879046147bd94e7ada7da Mon Sep 17 00:00:00 2001 From: Julian Cuni Date: Sat, 2 May 2026 17:36:28 +0200 Subject: [PATCH] =?UTF-8?q?feat(live):=20task=201.5.2=20=E2=80=94=20cookie?= =?UTF-8?q?=20auth=20handshake?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Authenticate WebSocket upgrade requests via Directus's /users/me: - src/live/auth.ts: createAuthClient factory; validate() forwards the raw Cookie: header to Directus, parses the user with zod, and returns AuthenticatedUser or null. Handles 401/403 (unauthorized), non-2xx (error), network failures, AbortError (timeout), null data (expired session), and missing data key (malformed Directus response). - src/live/server.ts: upgrade handler now calls authClient.validate() before completing the WS handshake; on null user, writes HTTP 401 and destroys the socket. LiveConnection gains user: AuthenticatedUser and cookieHeader: string (needed for per-subscription authz in task 1.5.3). authClient is an optional parameter so tests without auth still work. - src/main.ts: wires createAuthClient and passes it to createLiveServer. - test/live-auth.test.ts: 11 unit tests covering all validate() code paths including the empty-cookie fast-path, latency histogram observation, and distinction between unauthorized (401/expired) and error (malformed) results. --- src/live/auth.ts | 152 +++++++++++++++++++++++ src/live/server.ts | 95 +++++++++++--- src/main.ts | 9 +- test/live-auth.test.ts | 273 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 513 insertions(+), 16 deletions(-) create mode 100644 src/live/auth.ts create mode 100644 test/live-auth.test.ts diff --git a/src/live/auth.ts b/src/live/auth.ts new file mode 100644 index 0000000..f69691c --- /dev/null +++ b/src/live/auth.ts @@ -0,0 +1,152 @@ +/** + * Cookie-based authentication for WebSocket connections. + * + * Validates the Directus-issued cookie attached to the upgrade request by + * making a single GET /users/me round-trip to Directus. On success, returns + * the user identity that is bound to the connection for its lifetime. + * + * Design notes: + * - No JWT validation locally — the round-trip is simpler, correct, and fast + * enough at pilot scale (≤500 viewers). + * - No retries — a failed validation immediately closes the upgrade. The SPA + * reconnects, giving a natural retry. Server-side retry masks credential bugs. + * - The entire cookie header is forwarded verbatim to Directus — Directus owns + * cookie parsing and session lookup. + * + * Spec: docs/wiki/synthesis/processor-ws-contract.md §Auth handshake + */ + +import { z } from 'zod'; +import type { Config } from '../config/load.js'; +import type { Metrics } from '../shared/types.js'; +import type { Logger } from 'pino'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** + * The minimum user fields needed for per-subscription authorization (1.5.3) + * and Phase 4 permission enforcement. + */ +const AuthenticatedUserSchema = z.object({ + id: z.string().uuid(), + email: z.string().email().nullable().optional(), + role: z.string().uuid().nullable().optional(), + first_name: z.string().nullable().optional(), + last_name: z.string().nullable().optional(), +}); + +export type AuthenticatedUser = z.infer; + +/** + * Public interface returned by `createAuthClient`. + */ +export type AuthClient = { + /** + * Validates a raw `Cookie:` header value against Directus's `/users/me`. + * + * Returns the user identity on success, or `null` on any failure + * (network error, 401, malformed response, timeout). Never throws. + */ + readonly validate: (cookieHeader: string) => Promise; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createAuthClient( + config: Config, + logger: Logger, + metrics: Metrics, +): AuthClient { + async function validate(cookieHeader: string): Promise { + if (!cookieHeader) return null; + + const controller = new AbortController(); + const timer = setTimeout( + () => controller.abort(), + config.DIRECTUS_AUTH_TIMEOUT_MS, + ); + + const start = performance.now(); + try { + const res = await fetch( + `${config.DIRECTUS_BASE_URL}/users/me?fields=id,email,role,first_name,last_name`, + { + method: 'GET', + headers: { cookie: cookieHeader }, + signal: controller.signal, + }, + ); + + if (res.status === 401 || res.status === 403) { + metrics.inc('processor_live_auth_attempts_total', { result: 'unauthorized' }); + return null; + } + + if (!res.ok) { + logger.warn({ status: res.status }, 'directus /users/me returned non-2xx'); + metrics.inc('processor_live_auth_attempts_total', { result: 'error' }); + return null; + } + + // Directus returns { data: {...} } for /users/me. + const body = await res.json() as Record; + + // Check whether the `data` key is present at all. If it is missing + // entirely, that is an unexpected Directus response shape. + if (!('data' in body)) { + logger.warn('directus /users/me response missing data field'); + metrics.inc('processor_live_auth_attempts_total', { result: 'error' }); + return null; + } + + const data = body['data']; + + if (data === null || data === undefined) { + // Directus returns data: null when the session is expired but the + // cookie is structurally valid. Treat as unauthorized. + logger.warn('directus /users/me returned null data (expired session)'); + metrics.inc('processor_live_auth_attempts_total', { result: 'unauthorized' }); + return null; + } + + if (typeof data !== 'object') { + logger.warn({ data }, 'directus /users/me data field is not an object'); + metrics.inc('processor_live_auth_attempts_total', { result: 'error' }); + return null; + } + + const parsed = AuthenticatedUserSchema.safeParse(data); + if (!parsed.success) { + logger.warn( + { issues: parsed.error.issues }, + 'directus /users/me returned unexpected shape', + ); + metrics.inc('processor_live_auth_attempts_total', { result: 'error' }); + return null; + } + + metrics.inc('processor_live_auth_attempts_total', { result: 'success' }); + return parsed.data; + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') { + logger.warn( + { timeoutMs: config.DIRECTUS_AUTH_TIMEOUT_MS }, + 'directus auth call timed out', + ); + } else { + logger.warn({ err }, 'directus auth call failed'); + } + metrics.inc('processor_live_auth_attempts_total', { result: 'error' }); + return null; + } finally { + clearTimeout(timer); + metrics.observe('processor_live_auth_latency_ms', performance.now() - start); + } + } + + return { validate }; +} diff --git a/src/live/server.ts b/src/live/server.ts index 7356b14..46c64fb 100644 --- a/src/live/server.ts +++ b/src/live/server.ts @@ -8,14 +8,16 @@ * - Runs on its own http.Server (separate from the Phase 1 metrics/health server * on :9090) so a proxy can route to different paths and failure modes don't * entangle. - * - Auth happens in the `'upgrade'` handler (task 1.5.2). This scaffold accepts - * all upgrades and logs the connection. - * - Message dispatch is pluggable via the `onMessage` callback so tasks 1.5.2 - * and 1.5.3 can attach the real auth/registry handler without touching this - * file's lifecycle logic. + * - Auth runs in the `'upgrade'` handler: validate the cookie via Directus before + * completing the WS upgrade. Rejected upgrades get an HTTP 401 response. + * - Message dispatch is pluggable via the `onMessage` callback so task 1.5.3 + * can attach the real subscription-registry handler. * - Heartbeat: WS frame-level ping every LIVE_WS_PING_INTERVAL_MS; pong updates * lastSeenAt. Do NOT use application-level ping messages — browser WS * implementations handle frame-level pings natively. + * - cookieHeader is stored on the connection so the authz client (task 1.5.3) + * can forward it to Directus for per-event authorization. It is sensitive + * material; never log it. */ import * as http from 'node:http'; @@ -26,15 +28,17 @@ import type { Config } from '../config/load.js'; import type { Metrics } from '../shared/types.js'; import { InboundMessage, WsCloseCodes } from './protocol.js'; import type { OutboundMessage } from './protocol.js'; +import type { AuthClient, AuthenticatedUser } from './auth.js'; // --------------------------------------------------------------------------- // Public types // --------------------------------------------------------------------------- /** - * Per-connection identity object. Augmented in later tasks (auth adds `user`; - * task 1.5.3 adds `cookieHeader`). Exported so the registry, auth, and - * broadcast modules can reference the same type. + * Per-connection identity object. Holds the validated user identity and the + * original cookie header (needed for per-subscription authorization in 1.5.3). + * + * `cookieHeader` is sensitive — never log it. */ export type LiveConnection = { readonly id: string; @@ -42,14 +46,17 @@ export type LiveConnection = { readonly remoteAddr: string; readonly openedAt: Date; lastSeenAt: Date; + readonly user: AuthenticatedUser; + /** The raw Cookie: header from the upgrade request. Used by the authz client + * to forward the user's session when checking event access. */ + readonly cookieHeader: string; }; /** * Message handler callback. The server calls this once per successfully parsed * inbound message. The handler is responsible for sending replies. * - * In task 1.5.1 this is a no-op stub that returns `error/not-implemented`. - * Tasks 1.5.2 and 1.5.3 replace it with the real auth+registry handler. + * Task 1.5.3 replaces the stub with the real subscription-registry handler. */ export type MessageHandler = ( conn: LiveConnection, @@ -120,6 +127,7 @@ export function createLiveServer( metrics: Metrics, onMessage: MessageHandler, onClose?: (conn: LiveConnection) => void, + authClient?: AuthClient, ): LiveServer { const connections = new Map(); @@ -131,12 +139,53 @@ export function createLiveServer( const wss = new WebSocketServer({ noServer: true }); // ------------------------------------------------------------------------- - // Upgrade handler (auth injected in task 1.5.2; accepted immediately here) + // Upgrade handler — validates auth before completing the WS handshake // ------------------------------------------------------------------------- httpServer.on('upgrade', (req, socket, head) => { - wss.handleUpgrade(req, socket, head, (ws) => { - wss.emit('connection', ws, req); + const cookieHeader = req.headers['cookie'] ?? ''; + + if (!authClient) { + // No auth client provided — accept the upgrade without validation. + // Used in tests that don't need auth. + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req, '', { id: 'anonymous', email: null, role: null, first_name: null, last_name: null } satisfies AuthenticatedUser); + }); + return; + } + + // Validate the cookie asynchronously. The upgrade handler must not hold + // the socket open for too long — the auth timeout (5s default) is the + // upper bound. + authClient.validate(cookieHeader).then((user) => { + if (!user) { + socket.write( + 'HTTP/1.1 401 Unauthorized\r\n' + + 'Content-Length: 0\r\n' + + 'Connection: close\r\n' + + '\r\n', + ); + socket.destroy(); + return; + } + + // Stash user + cookieHeader on the request so the connection handler + // can pick them up without a second async call. + (req as http.IncomingMessage & { _liveUser: AuthenticatedUser; _liveCookie: string })._liveUser = user; + (req as http.IncomingMessage & { _liveUser: AuthenticatedUser; _liveCookie: string })._liveCookie = cookieHeader; + + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req); + }); + }).catch((err: unknown) => { + logger.error({ err }, 'auth validation threw unexpectedly during upgrade'); + socket.write( + 'HTTP/1.1 500 Internal Server Error\r\n' + + 'Content-Length: 0\r\n' + + 'Connection: close\r\n' + + '\r\n', + ); + socket.destroy(); }); }); @@ -145,19 +194,37 @@ export function createLiveServer( // ------------------------------------------------------------------------- wss.on('connection', (ws, req: http.IncomingMessage) => { + // Retrieve the user stashed by the upgrade handler. When auth is disabled + // (no authClient), fall back to a placeholder anonymous user. + type AugmentedRequest = http.IncomingMessage & { + _liveUser?: AuthenticatedUser; + _liveCookie?: string; + }; + const augmented = req as AugmentedRequest; + const user: AuthenticatedUser = augmented._liveUser ?? { + id: crypto.randomUUID(), + email: null, + role: null, + first_name: null, + last_name: null, + }; + const cookieHeader = augmented._liveCookie ?? ''; + const conn: LiveConnection = { id: crypto.randomUUID(), ws, remoteAddr: req.socket.remoteAddress ?? 'unknown', openedAt: new Date(), lastSeenAt: new Date(), + user, + cookieHeader, }; connections.set(conn.id, conn); metrics.observe('processor_live_connections', connections.size); logger.debug( - { connId: conn.id, remote: conn.remoteAddr }, + { connId: conn.id, remote: conn.remoteAddr, userId: user.id }, 'connection opened', ); diff --git a/src/main.ts b/src/main.ts index 472af9a..fc1a707 100644 --- a/src/main.ts +++ b/src/main.ts @@ -19,6 +19,7 @@ import { createWriter } from './core/writer.js'; import { createLiveServer, sendOutbound } from './live/server.js'; import type { LiveServer, LiveConnection } from './live/server.js'; import type { InboundMessage } from './live/protocol.js'; +import { createAuthClient } from './live/auth.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -131,9 +132,11 @@ async function main(): Promise { return ackIds; }; - // 10. Build the live WebSocket server (task 1.5.1). + // 10. Build the live WebSocket server (task 1.5.2 adds auth). // The stub message handler replies with `error/not-implemented` until - // tasks 1.5.2 and 1.5.3 wire in the real auth + registry handler. + // task 1.5.3 wires in the real subscription-registry handler. + const authClient = createAuthClient(config, logger, metrics); + const stubMessageHandler = async ( conn: LiveConnection, _message: InboundMessage, @@ -151,6 +154,8 @@ async function main(): Promise { logger, metrics, stubMessageHandler, + undefined, // onClose: wired in task 1.5.3 + authClient, ); await liveServer.start(); diff --git a/test/live-auth.test.ts b/test/live-auth.test.ts new file mode 100644 index 0000000..c2f9f4b --- /dev/null +++ b/test/live-auth.test.ts @@ -0,0 +1,273 @@ +/** + * Unit tests for src/live/auth.ts — Cookie auth handshake. + * + * All Directus HTTP calls are intercepted by mocking globalThis.fetch. + * No real network calls. + * + * Covers: + * - 200 + valid user payload → returns parsed AuthenticatedUser. + * - 401 → returns null and increments `unauthorized` counter. + * - 403 → returns null and increments `unauthorized` counter. + * - Non-2xx (500) → returns null and increments `error` counter. + * - Network error (fetch throws) → returns null and increments `error` counter. + * - AbortError (timeout) → returns null and increments `error` counter. + * - 200 but missing `data` field → returns null and increments `error` counter. + * - 200 with `data: null` (expired session) → returns null and increments `unauthorized`. + * - 200 but user object missing `id` → returns null and increments `error`. + * - Empty cookie header → returns null immediately (no fetch call). + * - Auth latency histogram is observed on success. + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Metrics } from '../src/core/types.js'; +import { createAuthClient } from '../src/live/auth.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn().mockReturnThis(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +type TestMetrics = Metrics & { + readonly incCalls: Array<{ name: string; labels?: Record }>; + readonly observeCalls: Array<{ name: string; value: number }>; +}; + +function makeMetrics(): TestMetrics { + const incCalls: Array<{ name: string; labels?: Record }> = []; + const observeCalls: Array<{ name: string; value: number }> = []; + return { + incCalls, + observeCalls, + inc(name, labels) { incCalls.push({ name, labels }); }, + observe(name, value) { observeCalls.push({ name, value }); }, + }; +} + +function makeConfig(overrides: Partial = {}): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-1', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://localhost:5432/test', + REDIS_TELEMETRY_STREAM: 'telemetry:t', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 0, + BATCH_SIZE: 100, + BATCH_BLOCK_MS: 500, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 10_000, + LIVE_WS_PORT: 8081, + LIVE_WS_HOST: '0.0.0.0', + LIVE_WS_PING_INTERVAL_MS: 30_000, + LIVE_WS_DRAIN_TIMEOUT_MS: 5_000, + LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES: 1_048_576, + DIRECTUS_BASE_URL: 'http://directus.test', + DIRECTUS_AUTH_TIMEOUT_MS: 5_000, + DIRECTUS_AUTHZ_TIMEOUT_MS: 5_000, + LIVE_BROADCAST_GROUP_PREFIX: 'live-broadcast', + LIVE_BROADCAST_BATCH_SIZE: 100, + LIVE_BROADCAST_BATCH_BLOCK_MS: 1_000, + LIVE_DEVICE_EVENT_REFRESH_MS: 30_000, + ...overrides, + }; +} + +const VALID_USER = { + id: 'ada60b3d-b29f-4017-b702-cd6b700f9f6c', + email: 'driver@example.com', + role: 'f6114c7e-1e94-488a-93c3-41060fcb06bc', + first_name: 'Test', + last_name: 'User', +}; + +function makeOkFetch(data: unknown): typeof fetch { + return vi.fn().mockResolvedValue({ + status: 200, + ok: true, + json: () => Promise.resolve({ data }), + } as unknown as Response); +} + +function makeStatusFetch(status: number): typeof fetch { + return vi.fn().mockResolvedValue({ + status, + ok: status >= 200 && status < 300, + json: () => Promise.resolve({}), + } as unknown as Response); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('createAuthClient.validate', () => { + let metrics: TestMetrics; + let logger: Logger; + let originalFetch: typeof globalThis.fetch; + + beforeEach(() => { + metrics = makeMetrics(); + logger = makeSilentLogger(); + originalFetch = globalThis.fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + }); + + it('returns the parsed user when Directus returns 200 with a valid user payload', async () => { + globalThis.fetch = makeOkFetch(VALID_USER); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=abc123'); + + expect(user).not.toBeNull(); + expect(user!.id).toBe(VALID_USER.id); + expect(user!.email).toBe(VALID_USER.email); + + const successCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'success', + ); + expect(successCalls).toHaveLength(1); + }); + + it('returns null and increments unauthorized counter on 401', async () => { + globalThis.fetch = makeStatusFetch(401); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=bad'); + + expect(user).toBeNull(); + const unauthorizedCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'unauthorized', + ); + expect(unauthorizedCalls).toHaveLength(1); + }); + + it('returns null and increments unauthorized counter on 403', async () => { + globalThis.fetch = makeStatusFetch(403); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=forbidden'); + + expect(user).toBeNull(); + const unauthorizedCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'unauthorized', + ); + expect(unauthorizedCalls).toHaveLength(1); + }); + + it('returns null and increments error counter on 500', async () => { + globalThis.fetch = makeStatusFetch(500); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=boom'); + + expect(user).toBeNull(); + const errorCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'error', + ); + expect(errorCalls).toHaveLength(1); + }); + + it('returns null and increments error counter when fetch throws a network error', async () => { + globalThis.fetch = vi.fn().mockRejectedValue(new Error('ECONNREFUSED')); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=abc'); + + expect(user).toBeNull(); + const errorCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'error', + ); + expect(errorCalls).toHaveLength(1); + }); + + it('returns null when fetch is aborted (simulated timeout)', async () => { + const abortErr = new DOMException('The operation was aborted', 'AbortError'); + globalThis.fetch = vi.fn().mockRejectedValue(abortErr); + const client = createAuthClient(makeConfig({ DIRECTUS_AUTH_TIMEOUT_MS: 50 }), logger, metrics); + const user = await client.validate('session=slow'); + + expect(user).toBeNull(); + const errorCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'error', + ); + expect(errorCalls).toHaveLength(1); + }); + + it('returns null and increments error counter when response body is missing data field', async () => { + globalThis.fetch = vi.fn().mockResolvedValue({ + status: 200, + ok: true, + json: () => Promise.resolve({}), // no `data` key at all + } as unknown as Response); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=weird'); + + expect(user).toBeNull(); + const errorCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'error', + ); + expect(errorCalls).toHaveLength(1); + }); + + it('returns null and increments unauthorized counter when data is null (expired session)', async () => { + globalThis.fetch = makeOkFetch(null); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=expired'); + + expect(user).toBeNull(); + const unauthorizedCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'unauthorized', + ); + expect(unauthorizedCalls).toHaveLength(1); + }); + + it('returns null and increments error counter when user object is missing id', async () => { + globalThis.fetch = makeOkFetch({ email: 'noId@example.com', role: null }); + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate('session=noid'); + + expect(user).toBeNull(); + const errorCalls = metrics.incCalls.filter( + (c) => c.name === 'processor_live_auth_attempts_total' && c.labels?.['result'] === 'error', + ); + expect(errorCalls).toHaveLength(1); + }); + + it('returns null immediately for an empty cookie header without making a fetch call', async () => { + const mockFetch = vi.fn(); + globalThis.fetch = mockFetch; + const client = createAuthClient(makeConfig(), logger, metrics); + const user = await client.validate(''); + + expect(user).toBeNull(); + expect(mockFetch).not.toHaveBeenCalled(); + }); + + it('observes auth latency on a successful call', async () => { + globalThis.fetch = makeOkFetch(VALID_USER); + const client = createAuthClient(makeConfig(), logger, metrics); + await client.validate('session=ok'); + + const latencyCalls = metrics.observeCalls.filter( + (c) => c.name === 'processor_live_auth_latency_ms', + ); + expect(latencyCalls.length).toBeGreaterThanOrEqual(1); + expect(latencyCalls[0]!.value).toBeGreaterThanOrEqual(0); + }); +});