# Task 1.5.3 — Subscription registry & per-event authorization

**Phase:** 1.5 — Live broadcast

**Status:** ⬜ Not started

**Depends on:** 1.5.2

**Wiki refs:** `docs/wiki/synthesis/processor-ws-contract.md` §Subscription model; `docs/wiki/concepts/live-channel-architecture.md` §Authorization flow; `docs/wiki/synthesis/directus-schema-draft.md`

## Goal

Handle `subscribe` / `unsubscribe` messages: validate the topic format, authorize the user against the topic's organization, maintain in-memory bidirectional indexes (`connection → topics`, `topic → connections`), and emit the appropriate `subscribed` / `unsubscribed` / `error` responses. Authorization is a single Directus call per subscription; no per-message auth.

After this task, a connected client can `subscribe` to an event they have permission for, get an immediate `subscribed` response, and the registry knows which connections want updates for which event. The actual fan-out and snapshot land in 1.5.4 and 1.5.5 respectively — this task just owns the bookkeeping.

## Deliverables

- `src/live/registry.ts` exporting:
  - `createSubscriptionRegistry(authzClient, logger, metrics): SubscriptionRegistry` — factory.
  - `SubscriptionRegistry` interface:

    ```ts
    interface SubscriptionRegistry {
      subscribe(conn: LiveConnection, topic: string, correlationId?: string): Promise<void>;
      unsubscribe(conn: LiveConnection, topic: string, correlationId?: string): Promise<void>;
      onConnectionClose(conn: LiveConnection): void; // remove from all topics
      connectionsForTopic(topic: string): Iterable<LiveConnection>; // used by 1.5.4 fan-out
      topicsForConnection(conn: LiveConnection): Iterable<string>;
      stats(): { connections: number; topics: number; subscriptions: number };
    }
    ```

  - Topic format validator: `event:<uuid>` is the only accepted shape in v1; anything else returns `error/unknown-topic`.
- `src/live/authz.ts` exporting:
  - `createAuthzClient(config, logger): AuthzClient` — factory.
  - `AuthzClient.canAccessEvent(user: AuthenticatedUser, eventId: string): Promise<AuthzResult>`, where `AuthzResult` is `{ allowed: true } | { allowed: false; reason: 'forbidden' | 'not-found' | 'error' }`.
- `src/live/server.ts` updated: the `onMessage` placeholder from 1.5.1 is replaced with a real router that dispatches `subscribe` / `unsubscribe` to the registry, and the `'close'` event handler calls `registry.onConnectionClose`.
- New Prometheus metrics:
  - `processor_live_subscriptions{instance_id}` (gauge) — current total subscriptions.
  - `processor_live_subscribe_attempts_total{result}` (counter) — `success` / `forbidden` / `not-found` / `unknown-topic` / `error`.
  - `processor_live_authz_latency_ms` (histogram).
- `test/live-registry.test.ts`:
  - Subscribe to `event:<uuid>` with a permitted user → `subscribed` reply, registry counts go up.
  - Subscribe to `event:<uuid>` with a forbidden user → `error/forbidden` reply, no registry change.
  - Subscribe to `device:<imei>` → `error/unknown-topic`, no registry change.
  - Subscribe twice to the same topic → idempotent (single subscription, single `subscribed` reply on each call).
  - Unsubscribe from a topic the connection isn't subscribed to → `unsubscribed` reply (idempotent), no error, and `processor_live_subscriptions` is unchanged.
  - Connection close removes all subscriptions; gauges decrement correctly.
- `test/live-authz.test.ts`:
  - `canAccessEvent` returns `allowed: true` when `/items/events/<id>` returns 200 (Directus applies the cookie user's permission rules; if Directus says yes, we say yes).
  - Returns `allowed: false, reason: 'forbidden'` on 403.
  - Returns `allowed: false, reason: 'not-found'` on 404.
  - Returns `allowed: false, reason: 'error'` on network failure or 5xx (does not throw).
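A minimal sketch of the `server.ts` router from the deliverables above. It assumes inbound frames are JSON objects of the form `{ type, topic, id? }`, the same fields the replies in this spec echo back. `RegistryLike`, `SocketLike`, `routeMessage`, and `attach` are hypothetical names, and silently ignoring malformed frames is a placeholder: how they should be answered is not specified here.

```typescript
// The slice of SubscriptionRegistry the router needs (hypothetical narrowing).
interface RegistryLike<C> {
  subscribe(conn: C, topic: string, correlationId?: string): Promise<void>;
  unsubscribe(conn: C, topic: string, correlationId?: string): Promise<void>;
  onConnectionClose(conn: C): void;
}

type RouteResult = 'subscribe' | 'unsubscribe' | 'ignored';

// Parse one text frame and dispatch it. Malformed input is reported as 'ignored', never thrown.
async function routeMessage<C>(registry: RegistryLike<C>, conn: C, raw: string): Promise<RouteResult> {
  let msg: unknown;
  try {
    msg = JSON.parse(raw);
  } catch {
    return 'ignored'; // not JSON
  }
  if (typeof msg !== 'object' || msg === null) return 'ignored';
  const { type, topic, id } = msg as Record<string, unknown>;
  if (typeof topic !== 'string') return 'ignored';
  const correlationId = typeof id === 'string' ? id : undefined; // assumption: ids are strings

  if (type === 'subscribe') {
    await registry.subscribe(conn, topic, correlationId);
    return 'subscribe';
  }
  if (type === 'unsubscribe') {
    await registry.unsubscribe(conn, topic, correlationId);
    return 'unsubscribe';
  }
  return 'ignored'; // unknown message type
}

// Stand-in for the `ws` WebSocket; only `on` is used.
interface SocketLike {
  on(event: 'message', cb: (data: unknown) => void): void;
  on(event: 'close', cb: () => void): void;
}

function attach<C>(registry: RegistryLike<C>, conn: C, socket: SocketLike): void {
  socket.on('message', (data) => {
    // ws delivers a Buffer by default; String() decodes it as UTF-8.
    routeMessage(registry, conn, String(data)).catch(() => {
      /* unexpected registry failure: log it in real code */
    });
  });
  socket.on('close', () => registry.onConnectionClose(conn));
}
```

Keeping parsing in one function that never throws means a malformed frame cannot escape the message handler; the registry only ever sees a string topic and an optional string id.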

## Specification

### Topic parsing

```ts
// v1 accepts exactly one shape: `event:` plus an 8-4-4-4-12 hex UUID (case-insensitive).
const EventTopicRegex = /^event:([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})$/i;

function parseTopic(topic: string): { kind: 'event'; eventId: string } | null {
  const m = EventTopicRegex.exec(topic);
  if (m) return { kind: 'event', eventId: m[1] };
  return null; // unknown topic shape
}
```

Future shapes (`device:<imei>`, `entry:<uuid>`, `org:<uuid>`) get added here when they're needed. The unknown-topic path returns a clear error rather than silently failing, so clients always know when they have sent a topic the server doesn't understand.
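One consequence of the `i` flag on `EventTopicRegex`: `event:ABC…` and `event:abc…` both parse, but they are different strings, and the registry keys its indexes by the raw `topic`. If fan-out later looks topics up in lower case, an upper-case subscriber would never match. A possible remedy, sketched here as an assumption rather than part of this spec, is to rebuild a canonical lower-case topic from the parsed id; `canonicalTopic` is a hypothetical helper.

```typescript
const EventTopicRegex = /^event:([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})$/i;

function parseTopic(topic: string): { kind: 'event'; eventId: string } | null {
  const m = EventTopicRegex.exec(topic);
  if (m) return { kind: 'event', eventId: m[1] };
  return null;
}

// Hypothetical helper: one registry key per event, whatever hex case the client used.
function canonicalTopic(topic: string): string | null {
  const parsed = parseTopic(topic);
  return parsed ? `event:${parsed.eventId.toLowerCase()}` : null;
}

canonicalTopic('event:3F2504E0-4F89-11D3-9A0C-0305E82C3301'); // 'event:3f2504e0-4f89-11d3-9a0c-0305e82c3301'
parseTopic('device:490154203237518'); // null: not a v1 shape, so error/unknown-topic
parseTopic('event:not-a-uuid'); // null
```

If adopted, `subscribe`, `unsubscribe`, and the 1.5.4 fan-out would all need to key on the canonical form. Dropping the `i` flag, so upper-case ids are rejected as `unknown-topic`, is the stricter alternative.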

### Authorization model

The simplest correct authorization: **delegate to Directus's REST API with the user's cookie**. If `GET /items/events/<eventId>` returns 200, the user has access (Directus's permission rules already perform the org-membership check). If it returns 403, they don't; 404 means no such event, and any other outcome is treated as an error.

```ts
async function canAccessEvent(user: AuthenticatedUser, eventId: string): Promise<AuthzResult> {
  const start = performance.now();
  try {
    const res = await fetch(`${config.DIRECTUS_BASE_URL}/items/events/${eventId}?fields=id`, {
      method: 'GET',
      headers: { cookie: user.cookieHeader }, // see "Carrying the cookie" below
      signal: AbortSignal.timeout(config.DIRECTUS_AUTHZ_TIMEOUT_MS ?? 5_000),
    });

    if (res.status === 200) return { allowed: true };
    if (res.status === 403) return { allowed: false, reason: 'forbidden' };
    if (res.status === 404) return { allowed: false, reason: 'not-found' };
    return { allowed: false, reason: 'error' }; // 401, 5xx, anything unexpected
  } catch {
    // Network failure or timeout: report it as a verdict, never throw.
    return { allowed: false, reason: 'error' };
  } finally {
    metrics.authzLatency.observe(performance.now() - start);
  }
}
```

**Field projection** (`?fields=id`) keeps the response tiny — we don't need the event details, just the access verdict.
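For `test/live-authz.test.ts` it helps if the factory lets tests substitute `fetch`. The sketch below assumes that: `fetchImpl` and `observeLatencyMs` are hypothetical injection points (the latter standing in for `metrics.authzLatency.observe`), and the cookie is passed as a plain string, the shape the Done section records for the landed code. The status-to-verdict mapping is the one specified above.

```typescript
type AuthzResult =
  | { allowed: true }
  | { allowed: false; reason: 'forbidden' | 'not-found' | 'error' };

// Narrow fetch signature so tests can pass a stub (assumption, not the real module's API).
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; signal: AbortSignal },
) => Promise<{ status: number }>;

interface AuthzConfig {
  DIRECTUS_BASE_URL: string;
  DIRECTUS_AUTHZ_TIMEOUT_MS?: number;
}

function createAuthzClient(
  config: AuthzConfig,
  fetchImpl: FetchLike = fetch,
  observeLatencyMs: (ms: number) => void = () => {},
) {
  return {
    async canAccessEvent(cookieHeader: string, eventId: string): Promise<AuthzResult> {
      const start = performance.now();
      try {
        // eventId is already UUID-validated by parseTopic; encoding is purely defensive.
        const res = await fetchImpl(
          `${config.DIRECTUS_BASE_URL}/items/events/${encodeURIComponent(eventId)}?fields=id`,
          {
            method: 'GET',
            headers: { cookie: cookieHeader },
            signal: AbortSignal.timeout(config.DIRECTUS_AUTHZ_TIMEOUT_MS ?? 5_000),
          },
        );
        if (res.status === 200) return { allowed: true };
        if (res.status === 403) return { allowed: false, reason: 'forbidden' };
        if (res.status === 404) return { allowed: false, reason: 'not-found' };
        return { allowed: false, reason: 'error' }; // 401, 5xx, anything unexpected
      } catch {
        return { allowed: false, reason: 'error' }; // network failure or timeout
      } finally {
        observeLatencyMs(performance.now() - start);
      }
    },
  };
}
```

With a stub `fetchImpl`, each status case in the test list becomes a one-line assertion and no Directus instance is needed.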

### Carrying the cookie

The auth handshake (1.5.2) validated the cookie and discarded it. For per-subscription Directus calls we need the original cookie header. Two options:

**Option A: Stash on the connection.** When 1.5.2 succeeds, save `cookieHeader` on `LiveConnection`. Trade-off: cookie material lives in process memory for the connection's lifetime.

**Option B: Re-fetch via service account.** The Processor has its own credentials; at subscribe time, query as that service account with the user id as a filter. Trade-off: more complex, and it requires the Processor to have a Directus account with read access to all events.

**Pick Option A.** Simpler, more honest (the user's own permissions are the source of truth for authorization), and the cookie is already on this server — we received it at upgrade. Memory cost is negligible (a cookie header is typically 100–500 bytes). Document that `LiveConnection` holds sensitive material and don't log it.

Update `LiveConnection` in `server.ts`:

```ts
export type LiveConnection = {
  id: string;
  ws: WebSocket;
  remoteAddr: string;
  openedAt: Date;
  lastSeenAt: Date;
  user: AuthenticatedUser;
  cookieHeader: string; // ← added; sensitive, never log
};
```

And update 1.5.2's upgrade handler to pass the cookie through.
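A sketch of the two small pieces this section implies: reading the raw `Cookie` header during the upgrade, and a log-safe projection of the connection so the cookie never reaches a log line. `cookieHeaderFrom`, `toLogFields`, and the trimmed types are hypothetical; `user.id` in particular is assumed, since `AuthenticatedUser`'s fields aren't listed here. Node's `IncomingMessage` joins duplicate `Cookie` headers with `'; '`, so `req.headers.cookie` is a single string or `undefined`, and a real upgrade request fits the `UpgradeRequest` shape below.

```typescript
// Minimal stand-ins for the real types (assumption: only these fields matter here).
type UpgradeRequest = { headers: { cookie?: string } };
type LiveConnectionLite = {
  id: string;
  remoteAddr: string;
  openedAt: Date;
  user: { id: string };
  cookieHeader: string; // sensitive: session material
};

// Returns the full Cookie header as received, or null when absent/empty.
function cookieHeaderFrom(req: UpgradeRequest): string | null {
  const raw = req.headers.cookie;
  return raw && raw.length > 0 ? raw : null; // 1.5.2 rejects the upgrade in this case
}

// Explicit allow-list: anything not named here (including cookieHeader) is never logged.
function toLogFields(conn: LiveConnectionLite) {
  return {
    connId: conn.id,
    remoteAddr: conn.remoteAddr,
    userId: conn.user.id,
    openedAt: conn.openedAt.toISOString(),
  };
}
```

An allow-list is the safer direction: a field added to `LiveConnection` later stays out of logs until someone opts it in, whereas a deny-list leaks anything new by default.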
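
### Registry data structures

```ts
const connectionTopics = new WeakMap<LiveConnection, Set<string>>(); // conn → topics
const topicConnections = new Map<string, Set<LiveConnection>>(); // topic → conns
```

`WeakMap` for `connectionTopics` means its entry becomes collectable together with the connection object, even if the explicit `onConnectionClose` call is somehow missed. That safety net is partial: `topicConnections` holds strong references to every connection in its sets, so a connection with any live subscription stays reachable until `onConnectionClose` removes it, and the close hook remains mandatory. A `WeakMap` also has no `size` and cannot be iterated, so `stats().connections` has to be tracked separately. `Set` semantics make repeated `add`/`delete` of the same member a no-op, which gives idempotent subscribe/unsubscribe for free; the subscriptions gauge, however, must only move when a call actually changed a set.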
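The bookkeeping can be isolated from authz and replies to make those points concrete. In this sketch, `createIndexes`, `add`, `remove`, and `removeAll` are hypothetical names, and `stats().connections` is taken to mean connections holding at least one subscription, which is an assumption since the spec doesn't define it. Each mutator returns whether anything changed, which is exactly the signal the gauge needs.

```typescript
function createIndexes<C extends object>() {
  const connectionTopics = new WeakMap<C, Set<string>>(); // conn → topics
  const topicConnections = new Map<string, Set<C>>(); // topic → conns (strong refs)
  let connections = 0; // WeakMap has no .size, so this is tracked by hand
  let subscriptions = 0; // what the processor_live_subscriptions gauge should show

  function add(conn: C, topic: string): boolean {
    let topics = connectionTopics.get(conn);
    if (!topics) {
      topics = new Set();
      connectionTopics.set(conn, topics);
      connections++;
    }
    if (topics.has(topic)) return false; // already subscribed: nothing changed
    topics.add(topic);
    let conns = topicConnections.get(topic);
    if (!conns) {
      conns = new Set();
      topicConnections.set(topic, conns);
    }
    conns.add(conn);
    subscriptions++;
    return true;
  }

  function remove(conn: C, topic: string): boolean {
    const topics = connectionTopics.get(conn);
    if (!topics || !topics.delete(topic)) return false; // not subscribed: nothing changed
    if (topics.size === 0) {
      connectionTopics.delete(conn);
      connections--;
    }
    const conns = topicConnections.get(topic);
    if (conns) {
      conns.delete(conn);
      if (conns.size === 0) topicConnections.delete(topic);
    }
    subscriptions--;
    return true;
  }

  function removeAll(conn: C): number {
    const topics = connectionTopics.get(conn);
    if (!topics) return 0;
    let removed = 0;
    for (const topic of [...topics]) if (remove(conn, topic)) removed++;
    return removed;
  }

  const stats = () => ({ connections, topics: topicConnections.size, subscriptions });
  return { add, remove, removeAll, stats };
}
```

`removeAll` iterates over a copy because `remove` mutates the same `Set`, and a second call is a no-op, so a duplicate close event cannot double-decrement.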

### Subscribe flow

```ts
async function subscribe(conn: LiveConnection, topic: string, correlationId?: string) {
  const parsed = parseTopic(topic);
  if (!parsed) {
    sendOutbound(conn, { type: 'error', topic, id: correlationId, code: 'unknown-topic', message: 'Unknown topic format' });
    metrics.subscribeAttempts.inc({ result: 'unknown-topic' });
    return;
  }

  // Idempotent fast path: already subscribed, so skip the Directus round-trip.
  if (connectionTopics.get(conn)?.has(topic)) {
    // Re-send subscribed (snapshot will be fetched freshly in 1.5.5).
    sendOutbound(conn, { type: 'subscribed', topic, id: correlationId, snapshot: [] });
    return;
  }

  const verdict = await authzClient.canAccessEvent(conn.user, parsed.eventId);
  if (!verdict.allowed) {
    sendOutbound(conn, { type: 'error', topic, id: correlationId, code: verdict.reason });
    metrics.subscribeAttempts.inc({ result: verdict.reason });
    return;
  }

  // The socket may have closed during the await. onConnectionClose has then already
  // run, so inserting now would leave `conn` stranded in topicConnections.
  if (conn.ws.readyState !== conn.ws.OPEN) return;

  // Re-read the index after the await: a concurrent subscribe on this connection may
  // have created its Set, or added this very topic, in the meantime.
  let topics = connectionTopics.get(conn);
  if (!topics) {
    topics = new Set();
    connectionTopics.set(conn, topics);
  }
  if (!topics.has(topic)) {
    topics.add(topic);
    let conns = topicConnections.get(topic);
    if (!conns) {
      conns = new Set();
      topicConnections.set(topic, conns);
    }
    conns.add(conn);
    metrics.subscriptions.inc(); // count only genuinely new subscriptions
  }
  metrics.subscribeAttempts.inc({ result: 'success' });

  // 1.5.5 fills in the snapshot. For now, empty array.
  sendOutbound(conn, { type: 'subscribed', topic, id: correlationId, snapshot: [] });
}
```
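This flow reads `connectionTopics` before `await authzClient.canAccessEvent(...)`, and two `subscribe` frames on one connection can interleave at that await. The sketch below reproduces the interleaving deterministically with hand-released promises standing in for Directus, and contrasts deciding whether to create the `Set` from a value captured before the await with re-reading the map after it. `deferred`, `subscribeStale`, `subscribeFresh`, and `race` are hypothetical names, and a plain `Map` stands in for the `WeakMap` (the behaviour is the same).

```typescript
type Conn = { id: string };

// A promise the test releases by hand, standing in for a slow Directus call.
function deferred(): { promise: Promise<void>; resolve: () => void } {
  let release: () => void = () => {};
  const promise = new Promise<void>((r) => {
    release = () => r();
  });
  return { promise, resolve: () => release() };
}

// Variant 1: decides whether to create the Set from a value read *before* the await.
async function subscribeStale(index: Map<Conn, Set<string>>, conn: Conn, topic: string, authz: Promise<void>) {
  const existing = index.get(conn);
  if (existing?.has(topic)) return;
  await authz;
  if (!existing) index.set(conn, new Set()); // may overwrite a Set created meanwhile
  index.get(conn)!.add(topic);
}

// Variant 2: re-reads the index after the await, so concurrent calls share one Set.
async function subscribeFresh(index: Map<Conn, Set<string>>, conn: Conn, topic: string, authz: Promise<void>) {
  if (index.get(conn)?.has(topic)) return;
  await authz;
  let topics = index.get(conn);
  if (!topics) {
    topics = new Set();
    index.set(conn, topics);
  }
  topics.add(topic);
}

// Two subscribes on a fresh connection; authz for A completes before authz for B.
async function race(impl: typeof subscribeFresh): Promise<string[]> {
  const index = new Map<Conn, Set<string>>();
  const conn = { id: 'c1' };
  const a = deferred();
  const b = deferred();
  const pA = impl(index, conn, 'event:A', a.promise);
  const pB = impl(index, conn, 'event:B', b.promise);
  a.resolve();
  await pA;
  b.resolve();
  await pB;
  return [...(index.get(conn) ?? [])].sort();
}
```

`race(subscribeStale)` ends with only `event:B` recorded for the connection, because the second continuation replaces the `Set` the first one filled; `race(subscribeFresh)` keeps both. The same await is why nothing should be inserted once the socket has closed: by then `onConnectionClose` has already run and will not run again.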

### Unsubscribe flow

```ts
async function unsubscribe(conn: LiveConnection, topic: string, correlationId?: string) {
  // Set.delete returns true only if the topic was actually subscribed.
  const removed = connectionTopics.get(conn)?.delete(topic) ?? false;
  const conns = topicConnections.get(topic);
  if (conns) {
    conns.delete(conn);
    if (conns.size === 0) topicConnections.delete(topic);
  }
  // Decrement only on a real removal; an idempotent no-op must not drive the gauge negative.
  if (removed) metrics.subscriptions.dec();
  // Always reply, even if not subscribed (idempotent).
  sendOutbound(conn, { type: 'unsubscribed', topic, id: correlationId });
}
```

### `onConnectionClose`

```ts
function onConnectionClose(conn: LiveConnection) {
  const topics = connectionTopics.get(conn);
  if (!topics) return; // never subscribed, or already cleaned up
  for (const topic of topics) {
    const conns = topicConnections.get(topic);
    if (conns) {
      conns.delete(conn);
      if (conns.size === 0) topicConnections.delete(topic);
    }
    metrics.subscriptions.dec(); // one per subscription this connection held
  }
  connectionTopics.delete(conn);
}
```

Hooked into the `ws.on('close', ...)` handler in `server.ts`.

## Acceptance criteria

- [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean.
- [ ] `wscat` flow: connect with a valid cookie → `{"type":"subscribe","topic":"event:<existing-event-id>"}` → `{"type":"subscribed","topic":"event:<id>","snapshot":[]}`.
- [ ] Forbidden flow: same client subscribing to an event in a different org → `{"type":"error","code":"forbidden"}`.
- [ ] Unknown topic flow: `{"type":"subscribe","topic":"foo:bar"}` → `{"type":"error","code":"unknown-topic"}`.
- [ ] Unsubscribe flow: client gets `unsubscribed` reply; gauge `processor_live_subscriptions` decrements.
- [ ] Disconnect cleans up: `processor_live_subscriptions` returns to its pre-connection level after the client disconnects.
- [ ] Idempotency: subscribing twice to the same topic doesn't double-count in `processor_live_subscriptions`.

## Risks / open questions

- **Authz latency budget.** Each subscribe is one Directus call. At race-start with hundreds of viewers subscribing simultaneously, that's a thundering herd. Pilot scale (≤20 viewers per event) is fine. If we ever see a herd: cache `(userId, eventId) → verdict` for 60s with manual invalidation hooks. Defer until measured.
- **What if the user is removed from the org mid-subscription?** Their existing subscriptions keep delivering until they disconnect. Phase 4 hardening can add periodic re-checks. For pilot, "trust the session" is fine.
- **Filter subscriptions to the user's own entries vs all in-event?** Race directors want to see everyone; participants might want to see only their own crew. Current spec is "everyone in the event" — Phase 4 permissions can refine. Document that v1 is open within an event.
- **Wildcard topics.** Not in scope. If we ever need it, the topic parser is the place to add `event:*` → "every event in the user's orgs."
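If the herd does show up, the `(userId, eventId) → verdict` cache could look roughly like the sketch below. Assumptions: a 60 s TTL, an injectable clock for tests, and `'error'` verdicts deliberately left uncached so a transient Directus failure is retried on the next subscribe. `VerdictCache`, `invalidateUser`, and `invalidateEvent` are hypothetical names for the manual invalidation hooks.

```typescript
type AuthzResult =
  | { allowed: true }
  | { allowed: false; reason: 'forbidden' | 'not-found' | 'error' };

// Hypothetical TTL cache for (userId, eventId) → verdict. Assumes ids never contain NUL.
class VerdictCache {
  private readonly entries = new Map<string, { verdict: AuthzResult; expiresAt: number }>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs = 60_000, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  get(userId: string, eventId: string): AuthzResult | undefined {
    const key = `${userId}\u0000${eventId}`;
    const hit = this.entries.get(key);
    if (!hit) return undefined;
    if (hit.expiresAt <= this.now()) {
      this.entries.delete(key); // evict lazily on read
      return undefined;
    }
    return hit.verdict;
  }

  set(userId: string, eventId: string, verdict: AuthzResult): void {
    if (!verdict.allowed && verdict.reason === 'error') return; // never pin a transient failure
    this.entries.set(`${userId}\u0000${eventId}`, { verdict, expiresAt: this.now() + this.ttlMs });
  }

  // Manual invalidation hooks, e.g. after an org-membership or event change.
  invalidateUser(userId: string): void {
    for (const key of this.entries.keys()) if (key.startsWith(`${userId}\u0000`)) this.entries.delete(key);
  }

  invalidateEvent(eventId: string): void {
    for (const key of this.entries.keys()) if (key.endsWith(`\u0000${eventId}`)) this.entries.delete(key);
  }
}
```

A cache also extends the mid-subscription revocation risk above to new subscriptions: a user removed from an org can keep passing authz for up to one TTL unless an invalidation hook fires.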

## Done

Landed in `38de4bc`. Key deviations from spec:

- `canAccessEvent` signature is `(cookieHeader: string, eventId: string)` rather than `(user: AuthenticatedUser, eventId: string)` because `AuthenticatedUser` doesn't carry the cookie — the cookie lives on `LiveConnection.cookieHeader`. The call site passes `conn.cookieHeader` directly.
- Added `'error'` to `ErrorCode` in `protocol.ts` to handle transient authz failures; the spec omitted this case.
- `SnapshotProvider` interface is defined in `registry.ts` and defaults to a stub returning `[]`; task 1.5.5 injects the real implementation via the optional parameter.
- Snapshot fetching is integrated into the subscribe flow already (calls `fetchSnapshot` on success), making task 1.5.5 a pure injection of a better provider rather than a structural change to the subscribe flow.