Task 1.5.3 — Subscription registry & per-event authorization
Phase: 1.5 — Live broadcast
Status: ⬜ Not started
Depends on: 1.5.2
Wiki refs: docs/wiki/synthesis/processor-ws-contract.md §Subscription model; docs/wiki/concepts/live-channel-architecture.md §Authorization flow; docs/wiki/synthesis/directus-schema-draft.md
Goal
Handle subscribe / unsubscribe messages: validate the topic format, authorize the user against the topic's organization, maintain in-memory bidirectional indexes (connection → topics, topic → connections), and emit the appropriate subscribed / unsubscribed / error responses. Authorization is a single Directus call per subscription; no per-message auth.
After this task, a connected client can subscribe to an event they have permission for, get an immediate subscribed response, and the registry knows which connections want updates for which event. The actual fan-out and snapshot land in 1.5.4 and 1.5.5 respectively — this task just owns the bookkeeping.
Deliverables
- src/live/registry.ts exporting:
  - createSubscriptionRegistry(authzClient, logger, metrics): SubscriptionRegistry (factory).
  - SubscriptionRegistry interface:
    interface SubscriptionRegistry {
      subscribe(conn: LiveConnection, topic: string, correlationId?: string): Promise<void>;
      unsubscribe(conn: LiveConnection, topic: string, correlationId?: string): Promise<void>;
      onConnectionClose(conn: LiveConnection): void; // remove from all topics
      connectionsForTopic(topic: string): Iterable<LiveConnection>; // used by 1.5.4 fan-out
      topicsForConnection(conn: LiveConnection): Iterable<string>;
      stats(): { connections: number; topics: number; subscriptions: number };
    }
  - Topic format validator: event:<uuid> is the only accepted shape in v1; anything else returns error/unknown-topic.
- src/live/authz.ts exporting:
  - createAuthzClient(config, logger): AuthzClient (factory).
  - AuthzClient.canAccessEvent(user: AuthenticatedUser, eventId: string): Promise<AuthzResult>, where AuthzResult is { allowed: true } | { allowed: false; reason: 'forbidden' | 'not-found' | 'error' }.
- src/live/server.ts updated: the onMessage placeholder from 1.5.1 is replaced with a real router that dispatches subscribe / unsubscribe to the registry, and the 'close' event handler calls registry.onConnectionClose.
- New Prometheus metrics:
  - processor_live_subscriptions{instance_id} (gauge): current total subscriptions.
  - processor_live_subscribe_attempts_total{result}: success / forbidden / not-found / unknown-topic / error.
  - processor_live_authz_latency_ms (histogram).
- test/live-registry.test.ts:
  - Subscribe to event:<uuid> with a permitted user → subscribed reply, registry counts go up.
  - Subscribe to event:<uuid> with a forbidden user → error/forbidden reply, no registry change.
  - Subscribe to device:<imei> → error/unknown-topic, no registry change.
  - Subscribe twice to the same topic → idempotent (single subscription, one subscribed reply per call).
  - Unsubscribe from a topic the connection isn't subscribed to → unsubscribed reply (idempotent), no error.
  - Connection close removes all subscriptions; gauges decrement correctly.
- test/live-authz.test.ts:
  - canAccessEvent returns allowed: true when /items/events/<id> returns 200 (Directus enforces RLS via the cookie; if Directus says yes, we say yes).
  - Returns allowed: false, reason: 'forbidden' on 403.
  - Returns allowed: false, reason: 'not-found' on 404.
  - Returns allowed: false, reason: 'error' on network failure or 5xx (does not throw).
Specification
Topic parsing
const EventTopicRegex = /^event:([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})$/i;

function parseTopic(topic: string): { kind: 'event'; eventId: string } | null {
  const m = EventTopicRegex.exec(topic);
  if (m) return { kind: 'event', eventId: m[1] };
  return null; // unknown topic shape
}
Future shapes (device:<imei>, entry:<uuid>, org:<uuid>) get added here when they're needed. The unknown-topic path returns a clear error rather than silently failing — clients always know if they typed a topic the server doesn't understand.
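As a rough illustration of how a second shape might slot into the parser later, the sketch below uses a table of patterns and a discriminated union. The device shape, its 15-digit IMEI pattern, the name parseTopicSketch, and the lowercasing of the UUID are all assumptions made for the example; none of them are part of the v1 contract, where device:<imei> is still rejected.

```typescript
// Hypothetical extension of the topic parser. Only `event` exists in v1;
// the `device` entry and its IMEI pattern are illustrative assumptions.
type ParsedTopic =
  | { kind: 'event'; eventId: string }
  | { kind: 'device'; imei: string };

const UUID_PATTERN = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}';

// One entry per accepted shape; the first matching pattern wins.
const topicShapes: Array<{ pattern: RegExp; build: (value: string) => ParsedTopic }> = [
  {
    pattern: new RegExp(`^event:(${UUID_PATTERN})$`, 'i'),
    // Assumption: lowercase the id so two spellings of one UUID yield one eventId.
    build: (value) => ({ kind: 'event', eventId: value.toLowerCase() }),
  },
  {
    pattern: /^device:(\d{15})$/,
    build: (value) => ({ kind: 'device', imei: value }),
  },
];

function parseTopicSketch(topic: string): ParsedTopic | null {
  for (const { pattern, build } of topicShapes) {
    const m = pattern.exec(topic);
    if (m && m[1] !== undefined) return build(m[1]);
  }
  return null; // unknown topic shape: the caller replies error/unknown-topic
}
```

Because the result is a discriminated union, a caller that switches on kind gets a compile-time nudge whenever a new shape is added.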
Authorization model
The simplest correct authorization: delegate to Directus's REST API with the user's cookie. If GET /items/events/<eventId> returns 200, the user has access (Directus's RLS already does the org-membership check). If 403, they don't.
async function canAccessEvent(user: AuthenticatedUser, eventId: string): Promise<AuthzResult> {
  const start = performance.now();
  try {
    const res = await fetch(`${config.DIRECTUS_BASE_URL}/items/events/${eventId}?fields=id`, {
      method: 'GET',
      headers: { cookie: user.cookieHeader }, // see "Carrying the cookie" below
      signal: AbortSignal.timeout(config.DIRECTUS_AUTHZ_TIMEOUT_MS ?? 5_000),
    });
    if (res.status === 200) return { allowed: true };
    if (res.status === 403) return { allowed: false, reason: 'forbidden' };
    if (res.status === 404) return { allowed: false, reason: 'not-found' };
    return { allowed: false, reason: 'error' };
  } catch {
    return { allowed: false, reason: 'error' };
  } finally {
    metrics.authzLatency.observe(performance.now() - start);
  }
}
Field projection (?fields=id) keeps the response tiny — we don't need the event details, just the access verdict.
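For test/live-authz.test.ts it may help to separate the status-to-verdict mapping from the network call, so each branch can be exercised without a running Directus. The following is a minimal sketch under that assumption; StatusFetcher, verdictFromStatus and checkAccess are hypothetical names, and the injected fetcher stands in for the real fetch call.

```typescript
type AuthzResult =
  | { allowed: true }
  | { allowed: false; reason: 'forbidden' | 'not-found' | 'error' };

// Hypothetical seam: the minimum the check needs from a fetch-like function.
type StatusFetcher = (url: string, cookieHeader: string) => Promise<{ status: number }>;

// Pure mapping from HTTP status to verdict, mirroring the branches above.
function verdictFromStatus(status: number): AuthzResult {
  if (status === 200) return { allowed: true };
  if (status === 403) return { allowed: false, reason: 'forbidden' };
  if (status === 404) return { allowed: false, reason: 'not-found' };
  return { allowed: false, reason: 'error' }; // 5xx and anything unexpected
}

async function checkAccess(
  fetchStatus: StatusFetcher,
  baseUrl: string,
  cookieHeader: string,
  eventId: string,
): Promise<AuthzResult> {
  try {
    const res = await fetchStatus(`${baseUrl}/items/events/${eventId}?fields=id`, cookieHeader);
    return verdictFromStatus(res.status);
  } catch {
    // Network failure or timeout: report an error verdict instead of throwing.
    return { allowed: false, reason: 'error' };
  }
}
```

A test can then pass a fetcher that resolves with a fixed status, or one that rejects, and assert on the returned verdict.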
Carrying the cookie
The auth handshake (1.5.2) validated the cookie and discarded it. For per-subscription Directus calls we need the original cookie header. Two options:
Option A: Stash on the connection. When 1.5.2 succeeds, save cookieHeader on LiveConnection. Trade-off: cookie material lives in process memory for the connection's lifetime.
Option B: Re-fetch via service account. The Processor has its own credentials; at subscribe time, query as that service account with the user id as a filter. Trade-off: more complex, requires the Processor to have a Directus account with read access to all events.
Pick Option A. Simpler, more honest (the user's own permissions are the source of truth for authorization), and the cookie is already on this server — we received it at upgrade. Memory cost is negligible (a cookie header is typically 100–500 bytes). Document that LiveConnection holds sensitive material and don't log it.
Update LiveConnection in server.ts:
export type LiveConnection = {
  id: string;
  ws: WebSocket;
  remoteAddr: string;
  openedAt: Date;
  lastSeenAt: Date;
  user: AuthenticatedUser;
  cookieHeader: string; // ← added
};
And update 1.5.2's upgrade handler to pass the cookie through.
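Since LiveConnection now carries cookie material, one way to honor the "don't log it" rule is to route every log call through a small redaction helper. This is only a sketch: ConnectionLike is a trimmed-down stand-in for the real type, and toLoggable and the cookieLength field are hypothetical names.

```typescript
// Hypothetical, trimmed-down connection shape; the real LiveConnection
// also carries ws, lastSeenAt and user.
type ConnectionLike = {
  id: string;
  remoteAddr: string;
  openedAt: Date;
  cookieHeader: string;
};

// Returns a copy that is safe to hand to a logger: the cookie value is
// dropped and only its length (in UTF-16 code units) is kept for debugging.
function toLoggable(conn: ConnectionLike): Record<string, unknown> {
  const { cookieHeader, ...rest } = conn;
  return {
    ...rest,
    openedAt: conn.openedAt.toISOString(),
    cookieLength: cookieHeader.length,
  };
}
```

Logging toLoggable(conn) instead of conn keeps the secret out of structured logs even if a logger serializes every field it is given.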
Registry data structures
const connectionTopics = new WeakMap<LiveConnection, Set<string>>(); // conn → topics
const topicConnections = new Map<string, Set<LiveConnection>>(); // topic → conns
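WeakMap for connectionTopics means the registry itself never keeps a connection's topic set alive once the connection is otherwise unreachable. It is not a substitute for onConnectionClose: topicConnections holds strong references to every subscribed connection, so a connection that misses its close hook stays in the topic index until it is removed explicitly. A WeakMap also cannot be enumerated or sized, so the connections figure in stats() needs its own counter. Set semantics give idempotent subscribe/unsubscribe for the indexes; the gauge must still be adjusted only when a Set actually changes.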
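The two indexes can be wrapped in one small class whose mutators report whether anything changed, which makes it straightforward to keep a gauge in step. The sketch below is an assumption-laden illustration, not the registry itself: SubscriptionIndex is a hypothetical name, and it uses a plain Map in both directions (rather than a WeakMap) so that stats() can count connections.

```typescript
class SubscriptionIndex<C extends object> {
  private readonly topicsByConn = new Map<C, Set<string>>();
  private readonly connsByTopic = new Map<string, Set<C>>();
  private subscriptionCount = 0;

  // Returns true only when the (conn, topic) pair was not already present.
  add(conn: C, topic: string): boolean {
    let topics = this.topicsByConn.get(conn);
    if (!topics) {
      topics = new Set<string>();
      this.topicsByConn.set(conn, topics);
    }
    if (topics.has(topic)) return false;
    topics.add(topic);
    let conns = this.connsByTopic.get(topic);
    if (!conns) {
      conns = new Set<C>();
      this.connsByTopic.set(topic, conns);
    }
    conns.add(conn);
    this.subscriptionCount += 1;
    return true;
  }

  // Returns true only when the pair existed and was removed.
  remove(conn: C, topic: string): boolean {
    const topics = this.topicsByConn.get(conn);
    if (!topics || !topics.delete(topic)) return false;
    if (topics.size === 0) this.topicsByConn.delete(conn);
    const conns = this.connsByTopic.get(topic);
    if (conns) {
      conns.delete(conn);
      if (conns.size === 0) this.connsByTopic.delete(topic);
    }
    this.subscriptionCount -= 1;
    return true;
  }

  // Removes every subscription of a connection; returns how many were removed.
  removeAll(conn: C): number {
    const topics = this.topicsByConn.get(conn);
    if (!topics) return 0;
    let removed = 0;
    // Iterate over a copy because remove() mutates the underlying Set.
    for (const topic of Array.from(topics)) {
      if (this.remove(conn, topic)) removed += 1;
    }
    return removed;
  }

  // `connections` counts connections that hold at least one subscription.
  stats(): { connections: number; topics: number; subscriptions: number } {
    return {
      connections: this.topicsByConn.size,
      topics: this.connsByTopic.size,
      subscriptions: this.subscriptionCount,
    };
  }
}
```

A caller would increment the gauge when add returns true and decrement it by the value removeAll returns, so repeated subscribes or stray unsubscribes never move the metric.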
Subscribe flow
async function subscribe(conn: LiveConnection, topic: string, correlationId?: string) {
  const parsed = parseTopic(topic);
  if (!parsed) {
    sendOutbound(conn, { type: 'error', topic, id: correlationId, code: 'unknown-topic', message: 'Unknown topic format' });
    metrics.subscribeAttempts.inc({ result: 'unknown-topic' });
    return;
  }
  // Idempotent: already subscribed?
  if (connectionTopics.get(conn)?.has(topic)) {
    // Re-send subscribed (snapshot will be fetched freshly in 1.5.5).
    sendOutbound(conn, { type: 'subscribed', topic, id: correlationId, snapshot: [] });
    return;
  }
  const verdict = await authzClient.canAccessEvent(conn.user, parsed.eventId);
  if (!verdict.allowed) {
    sendOutbound(conn, { type: 'error', topic, id: correlationId, code: verdict.reason });
    metrics.subscribeAttempts.inc({ result: verdict.reason });
    return;
  }
  // The socket may have closed while authz was in flight; onConnectionClose
  // has already run in that case, so inserting now would leak the connection.
  if (conn.ws.readyState !== conn.ws.OPEN) return;
  // Re-read after the await: a concurrent subscribe on this connection may
  // have created or updated the Set in the meantime.
  let topics = connectionTopics.get(conn);
  if (!topics) {
    topics = new Set();
    connectionTopics.set(conn, topics);
  }
  if (!topics.has(topic)) {
    // Insert into both indexes and count the subscription exactly once.
    topics.add(topic);
    if (!topicConnections.has(topic)) topicConnections.set(topic, new Set());
    topicConnections.get(topic)!.add(conn);
    metrics.subscriptions.inc();
  }
  metrics.subscribeAttempts.inc({ result: 'success' });
  // 1.5.5 fills in the snapshot. For now, empty array.
  sendOutbound(conn, { type: 'subscribed', topic, id: correlationId, snapshot: [] });
}
Unsubscribe flow
async function unsubscribe(conn: LiveConnection, topic: string, correlationId?: string) {
  // Set.delete returns true only if the topic was present, so the gauge
  // moves only for a subscription that actually existed.
  const wasSubscribed = connectionTopics.get(conn)?.delete(topic) ?? false;
  const conns = topicConnections.get(topic);
  if (conns) {
    conns.delete(conn);
    if (conns.size === 0) topicConnections.delete(topic);
  }
  if (wasSubscribed) metrics.subscriptions.dec();
  // Always reply, even if not subscribed (idempotent).
  sendOutbound(conn, { type: 'unsubscribed', topic, id: correlationId });
}
onConnectionClose
function onConnectionClose(conn: LiveConnection) {
  const topics = connectionTopics.get(conn);
  if (!topics) return;
  for (const topic of topics) {
    const conns = topicConnections.get(topic);
    if (conns) {
      conns.delete(conn);
      if (conns.size === 0) topicConnections.delete(topic);
    }
    metrics.subscriptions.dec();
  }
  connectionTopics.delete(conn);
}
Hooked into the ws.on('close', ...) handler in server.ts.
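The router that replaces the onMessage placeholder could look roughly like the sketch below. It assumes inbound messages carry type, topic and an optional string id used as the correlation id; that field name, along with parseInbound, routeMessage and the onInvalid callback, are hypothetical and should be checked against the processor-ws-contract before use.

```typescript
// Assumed inbound message shape; `id` as the correlation field is a guess.
type Inbound = { type: 'subscribe' | 'unsubscribe'; topic: string; id?: string };

// The slice of the registry the router needs.
interface RegistryLike<C> {
  subscribe(conn: C, topic: string, correlationId?: string): Promise<void>;
  unsubscribe(conn: C, topic: string, correlationId?: string): Promise<void>;
}

// Validates untrusted JSON from the socket; returns null for anything malformed.
function parseInbound(raw: string): Inbound | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== 'object' || data === null) return null;
  const fields = data as Record<string, unknown>;
  const type = fields['type'];
  const topic = fields['topic'];
  const id = fields['id'];
  if (type !== 'subscribe' && type !== 'unsubscribe') return null;
  if (typeof topic !== 'string') return null;
  if (id !== undefined && typeof id !== 'string') return null;
  return id === undefined ? { type, topic } : { type, topic, id };
}

// Dispatches one raw frame to the registry, or reports it as invalid.
async function routeMessage<C>(
  registry: RegistryLike<C>,
  conn: C,
  raw: string,
  onInvalid: (conn: C) => void,
): Promise<void> {
  const msg = parseInbound(raw);
  if (!msg) {
    onInvalid(conn);
    return;
  }
  if (msg.type === 'subscribe') {
    await registry.subscribe(conn, msg.topic, msg.id);
  } else {
    await registry.unsubscribe(conn, msg.topic, msg.id);
  }
}
```

In server.ts the message handler would call routeMessage with the raw frame converted to a string, and the close handler would call registry.onConnectionClose(conn).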
Acceptance criteria
- pnpm typecheck, pnpm lint, pnpm test clean.
- wscat flow: connect with a valid cookie → {"type":"subscribe","topic":"event:<existing-event-id>"} → {"type":"subscribed","topic":"event:<id>","snapshot":[]}.
- Forbidden flow: same client subscribing to an event in a different org → {"type":"error","code":"forbidden"}.
- Unknown topic flow: {"type":"subscribe","topic":"foo:bar"} → {"type":"error","code":"unknown-topic"}.
- Unsubscribe flow: client gets unsubscribed reply; gauge processor_live_subscriptions decrements.
- Disconnect cleans up: processor_live_subscriptions returns to its pre-connection level after the client disconnects.
- Idempotency: subscribing twice to the same topic doesn't double-count in processor_live_subscriptions.
Risks / open questions
- Authz latency budget. Each subscribe is one Directus call. At race-start with hundreds of viewers subscribing simultaneously, that's a thundering herd. Pilot scale (≤20 viewers per event) is fine. If we ever see a herd: cache (userId, eventId) → verdict for 60s with manual invalidation hooks. Defer until measured.
- What if the user is removed from the org mid-subscription? Their existing subscriptions keep delivering until they disconnect. Phase 4 hardening can add periodic re-checks. For pilot, "trust the session" is fine.
- Filter subscriptions to the user's own entries vs all in-event? Race directors want to see everyone; participants might want to see only their own crew. Current spec is "everyone in the event"; Phase 4 permissions can refine. Document that v1 is open within an event.
- Wildcard topics. Not in scope. If we ever need it, the topic parser is the place to add event:* → "every event in the user's orgs."
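Should the herd ever materialize, the deferred (userId, eventId) → verdict cache from the first risk above might be as small as the sketch below. It is an illustration only: VerdictCache, its method names, and the injectable clock are hypothetical, and whether error verdicts should be cached at all is left to the caller (caching them would probably be a mistake, since they are transient).

```typescript
// Hypothetical TTL cache keyed by (userId, eventId).
class VerdictCache<V> {
  private readonly entries = new Map<string, { verdict: V; expiresAt: number }>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  // `now` is injectable so tests can control time; defaults to the wall clock.
  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Assumes neither id contains '|', which holds for UUIDs.
  private key(userId: string, eventId: string): string {
    return `${userId}|${eventId}`;
  }

  // Returns the cached verdict, or undefined if missing or expired.
  get(userId: string, eventId: string): V | undefined {
    const k = this.key(userId, eventId);
    const hit = this.entries.get(k);
    if (!hit) return undefined;
    if (hit.expiresAt <= this.now()) {
      this.entries.delete(k); // expired: drop lazily on read
      return undefined;
    }
    return hit.verdict;
  }

  set(userId: string, eventId: string, verdict: V): void {
    this.entries.set(this.key(userId, eventId), { verdict, expiresAt: this.now() + this.ttlMs });
  }

  // Manual invalidation hook, e.g. when a membership change is observed.
  invalidate(userId: string, eventId: string): boolean {
    return this.entries.delete(this.key(userId, eventId));
  }
}
```

With a 60-second TTL, a burst of repeat subscribes from the same user to the same event would cost one Directus call instead of one per subscribe, at the price of verdicts being up to a minute stale.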
Done
Landed in 38de4bc. Key deviations from spec:
- canAccessEvent signature is (cookieHeader: string, eventId: string) rather than (user: AuthenticatedUser, eventId: string) because AuthenticatedUser doesn't carry the cookie; the cookie lives on LiveConnection.cookieHeader. The call site passes conn.cookieHeader directly.
- Added 'error' to ErrorCode in protocol.ts to handle transient authz failures; the spec omitted this case.
- SnapshotProvider interface is defined in registry.ts and defaults to a stub returning []; task 1.5.5 injects the real implementation via the optional parameter.
- Snapshot fetching is integrated into the subscribe flow already (calls fetchSnapshot on success), making task 1.5.5 a pure injection of a better provider rather than a structural change to the subscribe flow.
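The provider injection described in the last two deviations might look roughly like this. The sketch is a guess at the shape, not a copy of registry.ts: the fetchSnapshot signature, the unknown[] element type, and the names stubSnapshotProvider and createSubscribedReplyBuilder are all assumptions.

```typescript
// Assumed shape; the real SnapshotProvider in registry.ts may differ.
interface SnapshotProvider {
  fetchSnapshot(eventId: string): Promise<unknown[]>;
}

// Default stub: always an empty snapshot, matching the pre-1.5.5 behavior.
const stubSnapshotProvider: SnapshotProvider = {
  fetchSnapshot: async () => [],
};

// Hypothetical helper showing where the optional provider plugs in.
function createSubscribedReplyBuilder(provider: SnapshotProvider = stubSnapshotProvider) {
  return async (topic: string, eventId: string, correlationId?: string) => ({
    type: 'subscribed' as const,
    topic,
    id: correlationId,
    snapshot: await provider.fetchSnapshot(eventId),
  });
}
```

Under this arrangement, 1.5.5 would only need to pass a provider that queries real positions; the subscribe flow itself stays untouched.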