feat(live): task 1.5.3 — subscription registry & per-event authorization

Subscribe/unsubscribe with per-event authorization via Directus delegation:
- src/live/authz.ts: createAuthzClient factory; canAccessEvent(cookieHeader,
  eventId) calls GET /items/events/<id>?fields=id, delegates row-level security
  to Directus (200=allow, 403=forbidden, 404=not-found, else error).
- src/live/registry.ts: createSubscriptionRegistry with bidirectional indexes
  (WeakMap<conn, topics> + Map<topic, conns>); subscribe/unsubscribe/
  onConnectionClose/connectionsForTopic/topicsForConnection/stats. Authorization
  runs once at subscribe time. Snapshot is stubbed as [] until task 1.5.5.
  Includes pluggable SnapshotProvider interface for task 1.5.5 injection.
- src/live/protocol.ts: adds 'error' to ErrorCode union for transient authz
  failures.
- src/main.ts: wires createAuthzClient + createSubscriptionRegistry; replaces
  the stub message handler with the real subscribe/unsubscribe router; passes
  registry.onConnectionClose as the server's onClose callback.
- test/live-authz.test.ts: 6 unit tests for all canAccessEvent outcomes.
- test/live-registry.test.ts: 9 unit tests for subscribe/unsubscribe semantics,
  idempotency, gauge correctness, and onConnectionClose cleanup.
This commit is contained in:
2026-05-02 17:40:03 +02:00
parent 7450cbffaa
commit bf5c358668
6 changed files with 925 additions and 15 deletions
+90
View File
@@ -0,0 +1,90 @@
/**
* Per-event authorization client.
*
* Checks whether a user has access to a specific event by delegating to
* Directus's REST API with the user's cookie. Directus enforces row-level
* security; if Directus returns 200 the user has access. If 403, they don't.
*
* Authorization is checked ONCE at subscribe time. The hot fan-out path has
* zero Directus calls — it operates entirely on in-memory subscription state.
*
* Spec: docs/wiki/synthesis/processor-ws-contract.md §Subscription model
*/
import type { Config } from '../config/load.js';
import type { Metrics } from '../shared/types.js';
import type { Logger } from 'pino';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
* Result of an authorization check.
* `allowed: true` → user may subscribe to the topic.
* `allowed: false` → user is rejected; `reason` tells the client why.
*/
export type AuthzResult =
| { readonly allowed: true }
| { readonly allowed: false; readonly reason: 'forbidden' | 'not-found' | 'error' };
export type AuthzClient = {
/**
* Checks whether the user identified by `cookieHeader` can access
* the event with `eventId`.
*
* Delegates to `GET /items/events/<eventId>?fields=id` with the user's
* cookie. Directus's row-level security does the org-membership check.
*
* Never throws. Returns `{ allowed: false, reason: 'error' }` on any
* transient failure.
*/
readonly canAccessEvent: (
cookieHeader: string,
eventId: string,
) => Promise<AuthzResult>;
};
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
export function createAuthzClient(
config: Config,
logger: Logger,
metrics: Metrics,
): AuthzClient {
async function canAccessEvent(
cookieHeader: string,
eventId: string,
): Promise<AuthzResult> {
const start = performance.now();
try {
const res = await fetch(
`${config.DIRECTUS_BASE_URL}/items/events/${eventId}?fields=id`,
{
method: 'GET',
headers: { cookie: cookieHeader },
signal: AbortSignal.timeout(config.DIRECTUS_AUTHZ_TIMEOUT_MS),
},
);
if (res.status === 200) return { allowed: true };
if (res.status === 403) return { allowed: false, reason: 'forbidden' };
if (res.status === 404) return { allowed: false, reason: 'not-found' };
logger.warn(
{ status: res.status, eventId },
'directus /items/events returned unexpected status',
);
return { allowed: false, reason: 'error' };
} catch (err) {
logger.warn({ err, eventId }, 'directus authz call failed');
return { allowed: false, reason: 'error' };
} finally {
metrics.observe('processor_live_authz_latency_ms', performance.now() - start);
}
}
return { canAccessEvent };
}
+3 -1
View File
@@ -130,7 +130,9 @@ export type ErrorCode =
| 'unknown-topic'
| 'protocol-violation'
| 'not-implemented'
| 'rate-limited';
| 'rate-limited'
/** Transient server-side error (e.g. Directus authz call failed). Retry. */
| 'error';
/**
* An error response from the server, scoped to a topic or connection-level.
+324
View File
@@ -0,0 +1,324 @@
/**
* Subscription registry — manages the bidirectional mapping between WebSocket
* connections and topics, and handles per-event authorization at subscribe time.
*
* Data structures:
* - connectionTopics: WeakMap<LiveConnection, Set<string>> (conn → topics)
* WeakMap allows GC cleanup if a connection somehow leaks the onConnectionClose call.
* - topicConnections: Map<string, Set<LiveConnection>> (topic → conns)
* Standard Map keyed by topic string; cleaned up by onConnectionClose.
*
* Authorization:
* - Checked ONCE per subscribe, via the authz client (Directus /items/events/<id>).
* - Zero Directus calls in the fan-out hot path.
*
* Snapshot:
* - Task 1.5.3 sends an empty snapshot with `subscribed`. Task 1.5.5 wires in
* the real snapshot provider to populate the array.
*
* Spec: docs/wiki/synthesis/processor-ws-contract.md §Subscription model
*/
import type { Logger } from 'pino';
import type { Metrics } from '../shared/types.js';
import type { LiveConnection } from './server.js';
import { sendOutbound } from './server.js';
import type { AuthzClient } from './authz.js';
import type { Config } from '../config/load.js';
import type { PositionSnapshotEntry } from './protocol.js';
// ---------------------------------------------------------------------------
// Topic parsing
// ---------------------------------------------------------------------------
const EVENT_TOPIC_REGEX =
/^event:([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})$/i;
type ParsedTopic = { readonly kind: 'event'; readonly eventId: string };
function parseTopic(topic: string): ParsedTopic | null {
const match = EVENT_TOPIC_REGEX.exec(topic);
if (match?.[1]) return { kind: 'event', eventId: match[1] };
return null;
}
// ---------------------------------------------------------------------------
// Snapshot provider type (injected from task 1.5.5)
// ---------------------------------------------------------------------------
/**
* Pluggable snapshot provider. Task 1.5.3 uses the stub (empty array).
* Task 1.5.5 injects the real Postgres-backed provider.
*/
export type SnapshotProvider = {
readonly forEvent: (eventId: string) => Promise<PositionSnapshotEntry[]>;
};
const STUB_SNAPSHOT_PROVIDER: SnapshotProvider = {
forEvent: () => Promise.resolve([]),
};
// ---------------------------------------------------------------------------
// Public interface
// ---------------------------------------------------------------------------
export type SubscriptionRegistry = {
/** Subscribe `conn` to `topic`. Authorizes, then sends `subscribed` or `error`. */
readonly subscribe: (
conn: LiveConnection,
topic: string,
correlationId?: string,
) => Promise<void>;
/** Unsubscribe `conn` from `topic`. Always sends `unsubscribed` (idempotent). */
readonly unsubscribe: (
conn: LiveConnection,
topic: string,
correlationId?: string,
) => void;
/** Remove all subscriptions for a closed connection (called on ws close). */
readonly onConnectionClose: (conn: LiveConnection) => void;
/** Iterates all connections currently subscribed to `topic`. Used by fan-out. */
readonly connectionsForTopic: (topic: string) => Iterable<LiveConnection>;
/** Iterates all topics the given connection is subscribed to. */
readonly topicsForConnection: (conn: LiveConnection) => Iterable<string>;
/** Aggregate stats for monitoring and sanity checks. */
readonly stats: () => { connections: number; topics: number; subscriptions: number };
};
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
export function createSubscriptionRegistry(
authzClient: AuthzClient,
config: Config,
logger: Logger,
metrics: Metrics,
snapshotProvider: SnapshotProvider = STUB_SNAPSHOT_PROVIDER,
): SubscriptionRegistry {
// conn → Set of topic strings the connection is subscribed to.
// WeakMap: if a connection object is somehow not cleaned up via onConnectionClose,
// the GC will reclaim the Set when the connection is collected.
const connectionTopics = new WeakMap<LiveConnection, Set<string>>();
// topic string → Set of connections subscribed to that topic.
const topicConnections = new Map<string, Set<LiveConnection>>();
// Total active subscriptions counter (kept in sync with topicConnections).
let totalSubscriptions = 0;
// -------------------------------------------------------------------------
// Subscribe
// -------------------------------------------------------------------------
async function subscribe(
conn: LiveConnection,
topic: string,
correlationId?: string,
): Promise<void> {
const parsed = parseTopic(topic);
if (!parsed) {
sendOutbound(
conn,
{
type: 'error',
topic,
id: correlationId,
code: 'unknown-topic',
message: 'Unknown topic format. Supported: event:<uuid>',
},
metrics,
config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
);
metrics.inc('processor_live_subscribe_attempts_total', { result: 'unknown-topic' });
return;
}
// Idempotent: if already subscribed, re-send `subscribed` with a fresh snapshot.
const existing = connectionTopics.get(conn);
if (existing?.has(topic)) {
const snapshot = await fetchSnapshot(parsed.eventId);
sendOutbound(
conn,
{ type: 'subscribed', topic, id: correlationId, snapshot },
metrics,
config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
);
// Do not double-count in subscriptions gauge.
return;
}
// Authorization check — one Directus call per subscribe.
const verdict = await authzClient.canAccessEvent(conn.cookieHeader, parsed.eventId);
if (!verdict.allowed) {
sendOutbound(
conn,
{
type: 'error',
topic,
id: correlationId,
code: verdict.reason,
message: buildForbiddenMessage(verdict.reason),
},
metrics,
config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
);
metrics.inc('processor_live_subscribe_attempts_total', { result: verdict.reason });
return;
}
// Fetch snapshot (fails open — snapshot failure does not block the subscribe).
const snapshot = await fetchSnapshot(parsed.eventId);
// Insert into both indexes.
if (!existing) connectionTopics.set(conn, new Set());
connectionTopics.get(conn)!.add(topic);
if (!topicConnections.has(topic)) topicConnections.set(topic, new Set());
topicConnections.get(topic)!.add(conn);
totalSubscriptions += 1;
metrics.observe('processor_live_subscriptions', totalSubscriptions);
metrics.inc('processor_live_subscribe_attempts_total', { result: 'success' });
logger.debug(
{ connId: conn.id, topic, userId: conn.user.id },
'subscribed',
);
sendOutbound(
conn,
{ type: 'subscribed', topic, id: correlationId, snapshot },
metrics,
config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
);
}
// -------------------------------------------------------------------------
// Unsubscribe
// -------------------------------------------------------------------------
function unsubscribe(
conn: LiveConnection,
topic: string,
correlationId?: string,
): void {
const topics = connectionTopics.get(conn);
const wasSubscribed = topics?.has(topic) ?? false;
topics?.delete(topic);
const conns = topicConnections.get(topic);
if (conns) {
conns.delete(conn);
if (conns.size === 0) topicConnections.delete(topic);
}
if (wasSubscribed) {
totalSubscriptions -= 1;
metrics.observe('processor_live_subscriptions', totalSubscriptions);
}
logger.debug({ connId: conn.id, topic }, 'unsubscribed');
// Always reply, even if not subscribed (idempotent).
sendOutbound(
conn,
{ type: 'unsubscribed', topic, id: correlationId },
metrics,
config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
);
}
// -------------------------------------------------------------------------
// onConnectionClose
// -------------------------------------------------------------------------
function onConnectionClose(conn: LiveConnection): void {
const topics = connectionTopics.get(conn);
if (!topics) return;
for (const topic of topics) {
const conns = topicConnections.get(topic);
if (conns) {
conns.delete(conn);
if (conns.size === 0) topicConnections.delete(topic);
}
totalSubscriptions -= 1;
}
connectionTopics.delete(conn);
metrics.observe('processor_live_subscriptions', totalSubscriptions);
logger.debug(
{ connId: conn.id, removedTopics: topics.size },
'connection closed — subscriptions cleaned up',
);
}
// -------------------------------------------------------------------------
// Query
// -------------------------------------------------------------------------
function connectionsForTopic(topic: string): Iterable<LiveConnection> {
return topicConnections.get(topic) ?? new Set<LiveConnection>();
}
function topicsForConnection(conn: LiveConnection): Iterable<string> {
return connectionTopics.get(conn) ?? new Set<string>();
}
function stats(): { connections: number; topics: number; subscriptions: number } {
return {
connections: topicConnections.size > 0
? [...topicConnections.values()].reduce((acc, s) => acc + s.size, 0)
: 0,
topics: topicConnections.size,
subscriptions: totalSubscriptions,
};
}
// -------------------------------------------------------------------------
// Snapshot helper
// -------------------------------------------------------------------------
async function fetchSnapshot(eventId: string): Promise<PositionSnapshotEntry[]> {
const start = performance.now();
try {
const snapshot = await snapshotProvider.forEvent(eventId);
metrics.observe('processor_live_snapshot_query_latency_ms', performance.now() - start);
metrics.observe('processor_live_snapshot_size', snapshot.length);
return snapshot;
} catch (err) {
logger.warn(
{ err, eventId },
'snapshot query failed; sending empty snapshot',
);
return [];
}
}
return {
subscribe,
unsubscribe,
onConnectionClose,
connectionsForTopic,
topicsForConnection,
stats,
};
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function buildForbiddenMessage(reason: 'forbidden' | 'not-found' | 'error'): string {
switch (reason) {
case 'forbidden':
return 'User does not have access to this event.';
case 'not-found':
return 'Event not found.';
case 'error':
return 'Authorization check failed. Please try again.';
}
}
+15 -14
View File
@@ -16,10 +16,12 @@ import { connectRedis, createConsumer } from './core/consumer.js';
import type { ConsumedRecord } from './core/consumer.js';
import { createDeviceStateStore } from './core/state.js';
import { createWriter } from './core/writer.js';
import { createLiveServer, sendOutbound } from './live/server.js';
import { createLiveServer } from './live/server.js';
import type { LiveServer, LiveConnection } from './live/server.js';
import type { InboundMessage } from './live/protocol.js';
import { createAuthClient } from './live/auth.js';
import { createAuthzClient } from './live/authz.js';
import { createSubscriptionRegistry } from './live/registry.js';
// -------------------------------------------------------------------------
// Startup: validate config (fail fast on bad env), build logger
@@ -132,29 +134,28 @@ async function main(): Promise<void> {
return ackIds;
};
// 10. Build the live WebSocket server (task 1.5.2 adds auth).
// The stub message handler replies with `error/not-implemented` until
// task 1.5.3 wires in the real subscription-registry handler.
// 10. Build the live WebSocket server (tasks 1.5.2 and 1.5.3).
const authClient = createAuthClient(config, logger, metrics);
const authzClient = createAuthzClient(config, logger, metrics);
const registry = createSubscriptionRegistry(authzClient, config, logger, metrics);
const stubMessageHandler = async (
const messageHandler = async (
conn: LiveConnection,
_message: InboundMessage,
message: InboundMessage,
): Promise<void> => {
sendOutbound(
conn,
{ type: 'error', code: 'not-implemented' },
metrics,
config.LIVE_WS_BACKPRESSURE_THRESHOLD_BYTES,
);
if (message.type === 'subscribe') {
await registry.subscribe(conn, message.topic, message.id);
} else if (message.type === 'unsubscribe') {
registry.unsubscribe(conn, message.topic, message.id);
}
};
const liveServer: LiveServer = createLiveServer(
config,
logger,
metrics,
stubMessageHandler,
undefined, // onClose: wired in task 1.5.3
messageHandler,
(conn) => registry.onConnectionClose(conn),
authClient,
);
await liveServer.start();