---
title: Processor WebSocket contract
type: synthesis
created: 2026-05-02
updated: 2026-05-03
sources: [gps-tracking-architecture, traccar-maps-architecture]
tags: [websocket, protocol, contract, telemetry-plane, decision]
---
# Processor WebSocket contract
The wire-level specification of the WebSocket endpoint that fans live position updates from [[processor]] (or its eventual replacement gateway — see Implementation status) to [[react-spa]] clients. Both sides build against this contract; changes require a coordinated update on both sides.
This page is the protocol spec. The architectural rationale lives in [[live-channel-architecture]]; the consumer-side rendering pattern in [[maps-architecture]]; the inheritance from a working production reference in [[traccar-maps-architecture]].
## Implementation status
**Shipped as `processor` Phase 1.5 — Live broadcast** (landed 2026-05-02). All six tasks merged: 1.5.1 WS server scaffold + heartbeat, 1.5.2 cookie auth handshake, 1.5.3 subscription registry & per-event authorization, 1.5.4 broadcast consumer group & fan-out, 1.5.5 snapshot-on-subscribe, 1.5.6 integration test. 178/178 unit tests + 6 integration scenarios green.
The endpoint is hosted *inside* the Processor process (as [[processor]] and [[live-channel-architecture]] specify). Lifting it into a separate `live-gateway` service remains the documented escape hatch in [[live-channel-architecture]] §"Scale considerations" if sustained > 10k WS messages/sec ever demands it — not currently planned.
This contract is implementation-agnostic in the sense that the wire format wouldn't change if we ever did lift the endpoint out — only the host process would.
## Endpoint
```
wss://<env>.dev.trmtracking.org/ws-live
```
Path **`/ws-live`** (locked 2026-05-03). The companion business-plane channel hosted by [[directus]] is at **`/ws-business`** on the same origin (proxy-rewritten to Directus's native `/websocket`). Both names are read by the SPA from `/config.json` (`liveWsUrl` and `businessWsUrl`).
Served behind the same Traefik instance that fronts [[directus]] and the [[react-spa]] static bundle on the Komodo host (per `NEW-HOST-KOMODO-TRAEFIK.md`). **Single origin is non-negotiable** — same-origin is what allows the auth cookie to flow with the WebSocket upgrade request (see Auth handshake below). The SPA reaches the endpoint as a relative URL, never a cross-origin URL.
## Transport
- **Protocol:** WebSocket (RFC 6455) over TLS at the edge. Internal hop from Traefik to the producer is plain WS on the deploy stack's default Compose network plus the external `proxy` network shared with Traefik.
- **Subprotocol:** none required. Future versions may add a `Sec-WebSocket-Protocol` of `trm.live.v1` if we need to negotiate versions; for now the path is the version.
- **Frame format:** text frames, JSON-encoded. No binary frames. (If we ever need to ship raw position bytes for a high-frequency optimisation, that's a v2 concern.)
- **Heartbeat:** the producer sends a ping every 30 s; the consumer responds. Consumer-side liveness is enforced by `setInterval` checking time-since-last-message > 60s ⇒ reconnect.
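The consumer-side liveness rule can be sketched as a small watchdog. This is an illustrative sketch, not the SPA's actual implementation; the names (`LivenessWatchdog`, `STALE_AFTER_MS`, the 5 s check interval) are assumptions. Only the "no frame for more than 60 s means reconnect" policy comes from the contract.

```typescript
// Hypothetical consumer-side liveness watchdog for the rule above:
// if nothing has arrived for > 60 s, tear down the socket and reconnect.

const STALE_AFTER_MS = 60_000;

/** Pure staleness check so the policy is testable without real timers. */
function isStale(lastMessageAt: number, now: number): boolean {
  return now - lastMessageAt > STALE_AFTER_MS;
}

class LivenessWatchdog {
  private lastMessageAt = Date.now();
  private timer?: ReturnType<typeof setInterval>;

  constructor(private onStale: () => void, checkEveryMs = 5_000) {
    this.timer = setInterval(() => {
      if (isStale(this.lastMessageAt, Date.now())) {
        this.stop();
        this.onStale(); // caller closes the socket and schedules a reconnect
      }
    }, checkEveryMs);
  }

  /** Call on every incoming frame (positions and control messages alike). */
  touch(): void {
    this.lastMessageAt = Date.now();
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer);
  }
}
```

Keeping the staleness predicate pure makes the 60 s threshold trivially unit-testable, while the timer plumbing stays thin.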
## Auth handshake
Cookie-based, same-origin, validated against [[directus]] once at connection time. The SPA uses the Directus SDK in session mode (see [[react-spa]] §"Auth pattern"); the producer is cookie-name-agnostic and just forwards whatever cookie header the upgrade carries.
```
1. Browser opens WebSocket to wss://<origin>/ws-live.
Same-origin → browser automatically attaches the httpOnly session cookie
issued by Directus's /auth/login (session mode).
2. Producer reads the entire Cookie header from the upgrade request.
GET /users/me to Directus, forwarding the header verbatim.
200 → user identity (id, role, etc.) is bound to the connection.
401/403 → close the WebSocket with code 4401 (unauthorized).
3. Connection is now authenticated. The producer holds (connectionId → user)
in memory. No further per-message auth.
```
Implementation notes:
- **Cookie validation cache.** `/users/me` round-trip per connection is fine at pilot scale (≤500 viewers). At higher scale, cache the validation result for the connection's lifetime; on logout / session expiry the SPA reconnects, which re-validates.
- **No JWT in URL.** Don't pass tokens in query strings — they end up in proxy logs. Cookie is the only credential.
- **Why cookie not Authorization header.** Browsers don't let you set Authorization on a WebSocket upgrade. Cookies flow automatically. Same-origin is what makes this work.
- **Cookie-name-agnostic.** The producer never parses individual cookies; it forwards the whole header to `/users/me` and lets Directus identify the session. This keeps the producer working unchanged if Directus's cookie name or auth-mode default ever changes.
## Subscription model
After authentication, the SPA subscribes to event-scoped topics. One connection can hold multiple subscriptions; per-event authorization is checked once at subscribe time.
### Topic format
```
event:<eventId>
```
`<eventId>` is the UUID of an `events` row. Authorization: the user must have a record in `organization_users` for the event's organization (any role). Phase 4 of [[directus]] (permissions) will tighten this; for now membership is enough.
Future topic shapes (not in v1):
- `device:<deviceId>` — single-device follow.
- `entry:<entryId>` — follow a specific competitor across stages.
- `org:<orgId>` — broad org-wide watch (admin-only).
The protocol is forward-compatible: any string-typed topic is valid; producer rejects unknown shapes with `error/unknown-topic`.
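A sketch of topic validation under that rule: only `event:<uuid>` is accepted in v1, and everything else, including the reserved future shapes, maps to the `unknown-topic` error code. The function and type names here are illustrative, not taken from the producer codebase.

```typescript
// Hypothetical v1 topic parser: accepts event:<uuid>, rejects all else.

const UUID_RE =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

type ParsedTopic =
  | { kind: "event"; eventId: string }
  | { kind: "unknown" };

function parseTopic(topic: string): ParsedTopic {
  const sep = topic.indexOf(":");
  if (sep < 0) return { kind: "unknown" };
  const shape = topic.slice(0, sep);
  const rest = topic.slice(sep + 1);
  if (shape === "event" && UUID_RE.test(rest)) {
    return { kind: "event", eventId: rest };
  }
  // device:/entry:/org: are reserved for future versions; v1 answers them
  // with the same error/unknown-topic as malformed strings.
  return { kind: "unknown" };
}
```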
### Subscribe
```json
// Client → Server
{
  "type": "subscribe",
  "topic": "event:ada60b3d-b29f-4017-b702-cd6b700f9f6c",
  "id": "client-correlation-id-1"
}
```
`id` is optional; if present, the server echoes it on the response so the client can correlate.
### Server response — subscribed
```json
// Server → Client
{
  "type": "subscribed",
  "topic": "event:ada60b3d-b29f-4017-b702-cd6b700f9f6c",
  "id": "client-correlation-id-1",
  "snapshot": [
    { "deviceId": "cbed320e...", "lat": 41.327, "lon": 19.819, "ts": 1714654800000, "speed": 42.3, "course": 187, "accuracy": 5.0, "attributes": {} },
    { "deviceId": "f6114c7e...", "lat": 41.328, "lon": 19.820, "ts": 1714654799000, "speed": 38.1, "course": 184, "accuracy": 4.5, "attributes": {} }
  ]
}
```
The snapshot is the **latest known position per device** registered to the event (via `entry_devices` → `entries` → `events`). Without it, the SPA opens to an empty map until devices report — feels broken.
### Server response — error
```json
// Server → Client
{
  "type": "error",
  "topic": "event:ada60b3d-b29f-4017-b702-cd6b700f9f6c",
  "id": "client-correlation-id-1",
  "code": "forbidden",
  "message": "User does not belong to the event's organization."
}
```
Error codes (initial set; extensible):
| Code | Meaning |
|---|---|
| `forbidden` | User authenticated but not authorized for this topic. |
| `not-found` | Topic refers to a non-existent entity (event id has no row). |
| `unknown-topic` | Topic format not recognised. |
| `rate-limited` | Subscribe rate exceeded (Phase 3 hardening; reserved). |
### Streaming updates
After `subscribed`, the server pushes one message per position-of-interest:
```json
// Server → Client
{
  "type": "position",
  "topic": "event:ada60b3d-b29f-4017-b702-cd6b700f9f6c",
  "deviceId": "cbed320e-1e94-488a-93c3-41060fcb06bc",
  "lat": 41.32791,
  "lon": 19.81947,
  "ts": 1714654801000,
  "speed": 42.5,
  "course": 188,
  "accuracy": 5.0,
  "attributes": {}
}
```
Field semantics:
| Field | Type | Required | Notes |
|---|---|---|---|
| `type` | `"position"` | yes | Discriminator. |
| `topic` | string | yes | Echoes the subscription. Allows multiplexing on one connection. |
| `deviceId` | uuid | yes | The `devices.id` (not the IMEI). SPA looks up device → entry → vehicle/crew via TanStack Query against [[directus]]. |
| `lat` / `lon` | number (degrees, WGS84) | yes | GPS coordinates. **Coordinate order in JSON is `lat`/`lon`** (not `[lon,lat]` GeoJSON ordering — that conversion happens in the SPA). |
| `ts` | number (epoch milliseconds, UTC) | yes | Authoritative timestamp from the device's GPS fix. **Always use this, never `Date.now()` on the client.** |
| `speed` | number (km/h) | optional | Omitted if device reports speed=0 with invalid GPS fix (per [[teltonika]] convention). |
| `course` | number (degrees, 0=N, clockwise) | optional | Heading. Omitted if unknown. |
| `accuracy` | number (metres) | optional | Position accuracy radius for the [[react-spa]]'s accuracy-circle layer. |
| `attributes` | object | optional, default `{}` | The decoded IO bag. Phase 1 ships the raw IO map; Phase 2 of [[processor]] adds named attributes per [[io-element-bag]]. SPA must tolerate empty / unknown shapes. |
The producer should **omit fields rather than send `null`** for absent values. Reduces JSON size and removes ambiguity (null = "we don't know" vs missing = "device didn't report").
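The table and the omit-don't-null rule can be captured in a TypeScript shape plus a serializer. The interface and helper names are illustrative; only the field names, types, and optionality come from the contract above.

```typescript
// Hypothetical TypeScript shape for the position message, with a
// serializer that drops absent fields instead of sending null.

interface PositionMessage {
  type: "position";
  topic: string;
  deviceId: string;                      // devices.id UUID, not the IMEI
  lat: number;                           // WGS84 degrees; lat/lon order,
  lon: number;                           // not GeoJSON [lon, lat]
  ts: number;                            // epoch ms from the GPS fix
  speed?: number;                        // km/h; omitted on invalid fix
  course?: number;                       // degrees, 0 = N, clockwise
  accuracy?: number;                     // metres
  attributes?: Record<string, unknown>;  // decoded IO bag
}

/** Omit null/undefined keys so "missing" unambiguously means "not reported". */
function serializePosition(msg: PositionMessage): string {
  const clean = Object.fromEntries(
    Object.entries(msg).filter(([, v]) => v !== null && v !== undefined),
  );
  return JSON.stringify(clean);
}
```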
### Unsubscribe
```json
// Client → Server
{
  "type": "unsubscribe",
  "topic": "event:ada60b3d-b29f-4017-b702-cd6b700f9f6c",
  "id": "client-correlation-id-2"
}
```
Server response:
```json
// Server → Client
{
  "type": "unsubscribed",
  "topic": "event:ada60b3d-b29f-4017-b702-cd6b700f9f6c",
  "id": "client-correlation-id-2"
}
```
The connection stays open with whatever other subscriptions are active. Closing the WebSocket is the cleanup-everything path.
## Reconnect semantics
The client reconnects on close (other than code 4401). Backoff: 1s, 2s, 4s, 8s, 16s, then 30s steady. Cap at 30s.
On reconnect, the client **must re-subscribe to all previously-active topics**. The server treats reconnect as a fresh connection; subscription state lives in memory only.
The server should accept reconnects from the same user without rate-limiting at pilot scale. Phase 3 may add a per-user concurrent-connection cap.
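The backoff schedule and the re-subscribe requirement can be sketched as two pure functions. Names are illustrative; the 1-2-4-8-16-30 schedule, the 30 s cap, and the replay-all-topics rule are from the contract.

```typescript
// Hypothetical reconnect helpers for the semantics above.

const BACKOFF_CAP_MS = 30_000;

/** attempt 0 → 1 s, 1 → 2 s, 2 → 4 s, 3 → 8 s, 4 → 16 s, 5+ → 30 s steady. */
function backoffMs(attempt: number): number {
  return Math.min(1_000 * 2 ** attempt, BACKOFF_CAP_MS);
}

/**
 * On reconnect the server sees a fresh connection, so the client must
 * replay one subscribe frame per previously-active topic.
 */
function resubscribeFrames(topics: ReadonlySet<string>): string[] {
  return [...topics].map((topic) =>
    JSON.stringify({ type: "subscribe", topic }),
  );
}
```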
## Multi-instance behaviour
When [[processor]] (or the gateway service) runs more than one replica:
- Each instance reads the [[redis-streams]] telemetry stream on **two consumer groups**:
- `processor` — the durable-write group (work-split: only one instance handles each record for the DB write).
- `live-broadcast-{instance_id}` — a per-instance fan-out group (every instance reads every record for fan-out).
- Connected clients are bound to one instance via the load balancer; that instance fans out to its own clients only. No cross-instance broadcasting needed.
- The reconnect is what handles instance failure — client reconnects, gets re-load-balanced to a healthy instance, re-subscribes.
This design is documented in [[live-channel-architecture]] §"Multi-instance Processor".
## Connection limits and back-pressure
Pilot-scale targets (subject to revision after first dogfood):
| Metric | Target |
|---|---|
| Concurrent connections per instance | 100 |
| Subscriptions per connection | 4 (one event + room for future per-device follow) |
| Position messages per second per connection | ≤ 500 (race start with 500 devices reporting at 1Hz) |
| End-to-end latency (Redis stream → client) | p95 < 500ms |
| Reconnect storm tolerance | 200 reconnects/sec for 5 seconds (race start surge) |
If a slow consumer can't drain its queue, the server **drops oldest position messages** for that connection (per-device; latest position is always preserved). Position data is always-fresh — backlog isn't valuable. Only `subscribed`/`unsubscribed`/`error` control messages are guaranteed delivery.
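One way to realise the drop-oldest policy is a per-connection buffer keyed by `deviceId`: a slow consumer then holds at most one (the latest) undrained position per device, while control messages queue separately and are never dropped. This is a sketch under that assumption, not the shipped implementation.

```typescript
// Hypothetical per-connection outbound buffer for the back-pressure
// policy above: latest-wins per device, guaranteed control messages.

interface Outbound {
  deviceId?: string; // set for position messages, absent for control
  payload: string;   // serialized JSON frame
}

class ConnectionBuffer {
  private latestByDevice = new Map<string, string>();
  private control: string[] = []; // subscribed/unsubscribed/error: guaranteed

  push(msg: Outbound): void {
    if (msg.deviceId) {
      // Overwrites any undrained older position for the same device.
      this.latestByDevice.set(msg.deviceId, msg.payload);
    } else {
      this.control.push(msg.payload);
    }
  }

  /** Drain control messages first, then one latest position per device. */
  drain(): string[] {
    const out = [...this.control, ...this.latestByDevice.values()];
    this.control = [];
    this.latestByDevice.clear();
    return out;
  }
}
```

A `Map` gives the latest-wins overwrite for free, which is exactly why backlog never accumulates for position data.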
## Deployment
The endpoint terminates inside the [[processor]] container. Public routing is handled by Traefik on the Komodo host via Docker container labels — no nginx, no openresty, no NPM in the deploy repo. See `NEW-HOST-KOMODO-TRAEFIK.md` for the platform-wide infra contract and the per-host path map.
Concrete shape (placeholder host; replace with the per-environment hostname):
```yaml
processor:
networks: [default, proxy]
labels:
- "traefik.enable=true"
- "traefik.docker.network=proxy"
- "traefik.http.routers.processor-live.rule=Host(`<env>.dev.trmtracking.org`) && PathPrefix(`/ws-live`)"
- "traefik.http.routers.processor-live.entrypoints=websecure"
- "traefik.http.routers.processor-live.tls=true"
- "traefik.http.routers.processor-live.priority=100"
- "traefik.http.services.processor-live.loadbalancer.server.port=<PROCESSOR_WS_PORT>"
```
Three things this depends on:
- **Same origin as Directus and the SPA.** All three answer on the same hostname; Traefik routes by path. The cookie auth handshake described above requires this — different origins block the cookie flow on the WebSocket upgrade.
- **Traefik handles WS upgrade transparently.** No `proxy_http_version` / `Upgrade` / `Connection` header gymnastics required (those were artifacts of the legacy nginx-proxy-manager + openresty setup). Traefik v3 negotiates the upgrade based on the request headers alone.
- **Cookie header forwarding.** The default Traefik forward strategy preserves cookies across the upgrade. Don't introduce middlewares that strip headers between the SPA and the processor — the producer needs the entire `Cookie` header to forward to Directus's `/users/me`.
`<PROCESSOR_WS_PORT>` is the port the Phase 1.5 WS server binds; pin it in the processor service's compose definition and reference it consistently.
## Versioning
This is `v1`. Breaking changes (renaming fields, changing semantics) require:
1. New endpoint path (`/ws-live/v2`).
2. Update this synthesis page to document both versions.
3. Deprecation window: v1 stays online for ≥ one full event cycle after v2 lands.
Non-breaking additions (new optional fields, new message types, new error codes) ship in v1 without ceremony — both sides should ignore unknown fields and unknown `type` values.
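The consumer side of that tolerance rule can be sketched as a dispatch function: unknown `type` values are ignored rather than treated as errors, unknown fields pass through untouched, and malformed frames are dropped without crashing the stream. Handler wiring here is illustrative.

```typescript
// Hypothetical forward-compatible message dispatch for the v1 contract.

type Handlers = Partial<Record<string, (msg: Record<string, unknown>) => void>>;

/** Returns true when a known message type was handled, false otherwise. */
function dispatch(raw: string, handlers: Handlers): boolean {
  let msg: Record<string, unknown>;
  try {
    msg = JSON.parse(raw);
  } catch {
    return false; // malformed frame: drop it, don't kill the connection
  }
  const handler =
    typeof msg.type === "string" ? handlers[msg.type] : undefined;
  if (!handler) return false; // unknown type: silently ignore per v1 rules
  handler(msg); // unknown extra fields ride along inside msg untouched
  return true;
}
```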
## Open questions
- **Session expiry while connected.** Directus session cookies have a finite lifetime. The WebSocket connection's already-validated identity is unaffected for as long as the connection stays open — the producer authorised once at upgrade and doesn't re-check. If the session expires server-side, the SPA's next REST call (or its periodic `/users/me` ping, if added) will fail with 401, the SPA will redirect to login, and on re-login the SPA reconnects the WebSocket — which re-validates. Pilot answer: producer never re-validates mid-connection. Phase 3 hardening can revisit if real-world session durations make this feel wrong.
- **Device-to-event resolution snapshot freshness.** The snapshot includes "every device registered to the event"; that registration set may change while a client is subscribed. Initial answer: subscription holds the registration set captured at subscribe time; new entries added mid-event don't appear until the client reconnects. Acceptable for pilot.
- **Faulty-flag visibility.** When an operator flips a position's `faulty=true` flag in [[directus]], should the live channel emit a correction? Current answer: no — faulty flagging is post-hoc operator review, not a live concern. Live map shows whatever was streamed at the time. The recompute pipeline ([[processor]] faulty position handling) corrects derived data, not the live history.
- **Replay-mode endpoint.** Out of v1 scope. A future `event:<id>:replay` topic could stream historical positions at a chosen speed. Defer.
## Cross-references
- [[live-channel-architecture]] — architectural rationale and dual-channel design.
- [[processor]] — the entity nominally hosting this endpoint (subject to the Implementation status note above).
- [[react-spa]] — the consumer.
- [[maps-architecture]] — consumer-side throughput discipline (rAF coalescer) that this contract is consumed through.
- [[traccar-maps-architecture]] — the working production reference whose WS contract shape this draws from (with refinements for our needs).
- [[directus]] — auth source (cookie validator) and the data source for event/device/org metadata the SPA looks up alongside the live stream.