---
name: Processor Service
description: processor service: Phase 1 and Phase 1.5 complete; key patterns, conventions, and quirks
type: project
---

Phase 1 complete (11 tasks). Phase 1.5 complete (6 tasks, commits b8ebbd0–54f1684). Live broadcast WebSocket endpoint is fully wired and tested.

**Architecture divergence from tcp-ingestion:**
- ESLint `import/no-restricted-paths` zone: `src/core/` cannot import `src/domain/` (preemptive for Phase 2).
- `INSTANCE_ID` defaults to `'processor-1'` (fixed string, not random UUID). Deterministic for container environments.
- `REDIS_CONSUMER_NAME` defaults to `INSTANCE_ID` at runtime (after schema parse), not as a zod schema default.
- `connectRedis` lives in `src/core/consumer.ts` (not `src/db/`). Rationale: only the consumer needs a Redis connection in Phase 1.

**Buffer.toJSON() gotcha in JSON.stringify replacers:**
- `Buffer.isBuffer(value)` is false inside a `JSON.stringify` replacer for nested Buffer properties because `Buffer.prototype.toJSON()` fires before the replacer.
- Must handle both `instanceof Uint8Array` (direct reference) AND the `{ type: 'Buffer', data: number[] }` shape. See `serializeAttributes` in `src/core/writer.ts`.

**RETURNING duplicate detection (writer.ts):**
- `ON CONFLICT DO NOTHING` does not return conflicting rows. Duplicates are detected via set-diff on (device_id, ts) between input and RETURNING rows.

**LRU Map trick (state.ts):**
- Plain `Map` with `delete` + `set` on every update. `keys().next().value` is always the oldest. O(1), no external library.

**Metrics (task 1.9):**
- `src/observability/metrics.ts` exports: `createMetrics()`, `startMetricsServer()`, `createPostgresHealthCheck()`, `createConsumerLagSampler()`.
- `createPostgresHealthCheck`: cached `SELECT 1`, 5 s TTL, background interval, sync `isReady()` accessor for `/readyz`.
- `createConsumerLagSampler`: uses `XINFO GROUPS` → `lag` field (Redis 7.2+); falls back to `XLEN` if absent.
- `processor_device_state_size` is a gauge updated via `metrics.observe()` in the sink (not in state.ts, which avoids coupling state to metrics).
- prom-client counters with label dims do NOT emit `{label} 0` at init. Test accordingly (check label-less counters for zero baseline).

**Integration test (task 1.10):**
- `test/pipeline.integration.test.ts`: 4 scenarios. Skip-on-no-Docker pattern: try container start in `beforeAll`, set `dockerAvailable = false` on error, each `it` early-returns.
- TimescaleDB image: `timescale/timescaledb:latest-pg16` (not stock postgres).
- Consumer uses a separate `connectRedis` connection; test XADD uses a separate `redisClient`. Mirrors production topology.
- Test 4 (retry): stops pgContainer then restarts it. `pgContainer.restart()` returns a new `StartedTestContainer`. Must reassign `pgContainer` for the `afterAll` cleanup to reference the right instance.

**Test patterns:**
- `connectWithRetry` tests: use `maxAttempts=1` for process.exit test to avoid fake-timer noise.
- Consumer tests: sink calls `void consumerRef?.stop()` to exit the loop.
- Inline `import()` type annotations forbidden by ESLint (`@typescript-eslint/consistent-type-imports`). Always use top-level `import type`.

**Dockerfile:**
- `EXPOSE 9090` only (no TCP listener; Processor has none).
- `HEALTHCHECK` uses `${METRICS_PORT:-9090}` default.
- Label `org.opencontainers.image.source` not included (not in tcp-ingestion original either).

**Why:** Phase 1 is the throughput pipeline + operational baseline. Phase 2 (domain logic) and Phase 3 (hardening) build on top.

**How to apply:** Phase 2 adds domain logic to `src/domain/`, with no changes to `src/core/`. Phase 3 adds graceful shutdown polish, XAUTOCLAIM, and state rehydration.

---

## Phase 1.5: Live broadcast (Done)

**ESLint boundary: `src/core/ ↔ src/live/` mutual exclusion.** Enforced by `import/no-restricted-paths`.
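
For orientation, the import boundaries noted so far (core may not import domain; core and live may not import each other) could be expressed as `import/no-restricted-paths` zones roughly as follows. This is a sketch, not the project's actual ESLint config: the relative paths, the `Zone` interface, and the names `restrictedZones` and `boundaryRule` are assumptions made for illustration. In the rule's zone format, `target` is the directory being linted and `from` is the directory it may not import.

```typescript
// Hypothetical shape of one zone entry for eslint-plugin-import's
// `import/no-restricted-paths` rule.
interface Zone {
  target: string; // files the restriction applies to
  from: string; // files that `target` must not import
  message?: string;
}

// Assumed paths, relative to the project root.
const restrictedZones: Zone[] = [
  // Phase 1: core must not reach into domain logic.
  { target: "./src/core", from: "./src/domain", message: "core must not import domain" },
  // Phase 1.5: core and live are mutually exclusive.
  { target: "./src/core", from: "./src/live", message: "core must not import live" },
  { target: "./src/live", from: "./src/core", message: "live must not import core" },
];

// Rule entry as it might appear under `rules` in an ESLint config.
const boundaryRule = {
  "import/no-restricted-paths": ["error", { zones: restrictedZones }],
};
```

With zones like these, anything both sides need has to move to a third location that neither zone forbids.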
Shared code lives in `src/shared/`:
- `src/shared/types.ts`: `Metrics`, `Position`, `AttributeValue`
- `src/shared/codec.ts`: `CodecError`, `decodePosition`

Both `src/core/` modules re-export from shared to preserve existing import paths.

**Live server (src/live/server.ts):**
- `createLiveServer(config, logger, metrics, onMessage, onClose?, authClient?)` factory.
- `LIVE_WS_PORT=0` assigns an OS port but there's no public API to read it back. Integration tests use a two-step approach: probe a free port, then start with that fixed port.
- Auth runs in the `upgrade` handler before completing the WebSocket handshake.

**Auth/Authz (src/live/auth.ts, src/live/authz.ts):**
- `validate(cookieHeader)` → Directus `/users/me?fields=...`; missing `data` key = error (not unauthorized); `data: null` = unauthorized.
- `canAccessEvent(cookieHeader, eventId)` → Directus `/items/events/:id`; never throws.

**Subscription registry (src/live/registry.ts):**
- `WeakMap` for conn→topics (GC-safe); `Map` for topic→conns.
- `SnapshotProvider` injected at construction; default is a stub returning `[]`.
- `fetchSnapshot` in registry wraps in try/catch (fail open = send `subscribed` with empty snapshot on failure).

**Device-event map (src/live/device-event-map.ts):**
- In-memory `Map` refreshed every `LIVE_DEVICE_EVENT_REFRESH_MS`.
- Phase 1 deviation: `entry_devices.device_id` is IMEI text in the test fixture (`test/fixtures/test-schema.sql`) but UUID FK to devices.id in the real Directus schema.

**Broadcast consumer (src/live/broadcast.ts):**
- Per-instance consumer group `live-broadcast-{INSTANCE_ID}`; ACK immediately on consume.
- Test strategy: `makeRedis` stub blocks the second `xreadgroup` call until xack fires (via `stopSignal` Promise), preventing tight-loop OOM in test workers.

**Snapshot provider (src/live/snapshot.ts):**
- `DISTINCT ON (p.device_id) ... WHERE p.faulty = false ORDER BY p.device_id, p.ts DESC`
- Requires `positions_device_ts_idx ON positions (device_id, ts DESC)` (created in migration 0002).

**Integration test (test/live.integration.test.ts):**
- Directus stub: `test/helpers/directus-stub.ts`: bare `http.createServer`, no Express.
- Test fixture: `test/fixtures/test-schema.sql`: simplified schema with `entry_devices.device_id TEXT`.
- `waitForMessage(ws, predicate, timeoutMs)` helper for typed WS message assertions.
- Orphan test: `Promise.race([waitForMessage(...), timeout])` → asserts `'timeout'` result.
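
The `waitForMessage` helper and the orphan-test race could look roughly like the sketch below. It is not the code in `test/live.integration.test.ts`: the `MessageSource` interface stands in for the real WebSocket type so the example runs without the `ws` package, `raceAgainstQuietPeriod` is a hypothetical name, and it assumes messages arrive as JSON text.

```typescript
// Hypothetical minimal socket shape; the real helper presumably takes a WebSocket.
interface MessageSource {
  on(event: "message", listener: (data: unknown) => void): unknown;
  off(event: "message", listener: (data: unknown) => void): unknown;
}

// Resolves with the first JSON message that satisfies `predicate`;
// rejects if none arrives within `timeoutMs`.
function waitForMessage<T>(
  ws: MessageSource,
  predicate: (msg: unknown) => msg is T,
  timeoutMs: number,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    function onMessage(data: unknown): void {
      let parsed: unknown;
      try {
        parsed = JSON.parse(String(data));
      } catch {
        return; // ignore frames that are not JSON
      }
      if (!predicate(parsed)) return; // keep waiting for a match
      clearTimeout(timer);
      ws.off("message", onMessage);
      resolve(parsed);
    }
    const timer = setTimeout(() => {
      ws.off("message", onMessage);
      reject(new Error(`no matching message within ${timeoutMs} ms`));
    }, timeoutMs);
    ws.on("message", onMessage);
  });
}

// Orphan-style check: resolves to 'timeout' when `pending` stays unsettled
// for `quietMs`, otherwise settles with the outcome of `pending`.
async function raceAgainstQuietPeriod<T>(
  pending: Promise<T>,
  quietMs: number,
): Promise<T | "timeout"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), quietMs);
  });
  try {
    return await Promise.race([pending, timeout]);
  } finally {
    clearTimeout(timer); // do not leave the quiet-period timer running
  }
}
```

A type-guard predicate lets the caller assert on the resolved message without casts. In the orphan case the quiet period should be shorter than the timeout passed to `waitForMessage`, so that the race resolves to `'timeout'` rather than rejecting.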