---
name: Processor Service
description: "processor service: Phase 1 and Phase 1.5 complete; key patterns, conventions, and quirks"
type: project
---
Phase 1 complete (11 tasks). Phase 1.5 complete (6 tasks, commits b8ebbd0–54f1684). Live broadcast WebSocket endpoint is fully wired and tested.
Architecture divergence from tcp-ingestion:
- ESLint `import/no-restricted-paths` zone: `src/core/` cannot import `src/domain/` (preemptive for Phase 2).
- `INSTANCE_ID` defaults to `'processor-1'` (a fixed string, not a random UUID), which keeps it deterministic in container environments.
- `REDIS_CONSUMER_NAME` defaults to `INSTANCE_ID` at runtime (after the schema parse), not as a zod schema default.
- `connectRedis` lives in `src/core/consumer.ts` (not `src/db/`). Rationale: only the consumer needs a Redis connection in Phase 1.
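A minimal sketch of the two defaulting rules, assuming environment variables arrive as a plain object. Zod is left out so the block runs on its own, and step 1 stands in for the zod parse. `loadConfigSketch` and `Config` are hypothetical names.

```typescript
// Hypothetical config shape covering only the two fields discussed above.
interface Config {
  INSTANCE_ID: string;
  REDIS_CONSUMER_NAME: string;
}

type Env = Record<string, string | undefined>;

function loadConfigSketch(env: Env): Config {
  // Step 1 (stands in for the zod parse): a fixed, deterministic default.
  const instanceId = env.INSTANCE_ID ?? 'processor-1';

  // Step 2 (after parsing): the consumer name falls back to whatever
  // INSTANCE_ID resolved to, so overriding INSTANCE_ID alone also
  // renames the Redis consumer.
  const consumerName = env.REDIS_CONSUMER_NAME ?? instanceId;

  return { INSTANCE_ID: instanceId, REDIS_CONSUMER_NAME: consumerName };
}
```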
Buffer.toJSON() gotcha in JSON.stringify replacers:
- `Buffer.isBuffer(value)` is false inside a `JSON.stringify` replacer for nested Buffer properties, because `Buffer.prototype.toJSON()` fires before the replacer is called.
- Must handle both `instanceof Uint8Array` (direct reference) AND the `{ type: 'Buffer', data: number[] }` shape. See `serializeAttributes` in `src/core/writer.ts`.
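A replacer that handles both cases can look roughly like this. It is a sketch, not the actual `serializeAttributes`. Encoding bytes as base64 is an assumption, and `bytesReplacer` and `serializeAttributesSketch` are hypothetical names.

```typescript
import { Buffer } from 'node:buffer';

type BufferJson = { type: 'Buffer'; data: number[] };

// Recognises the plain object produced by Buffer.prototype.toJSON().
function isBufferJson(value: unknown): value is BufferJson {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as { type?: unknown; data?: unknown };
  return v.type === 'Buffer' && Array.isArray(v.data);
}

// Assumption: bytes are emitted as base64; the real writer may differ.
function encodeBytes(bytes: Uint8Array): string {
  return Buffer.from(bytes).toString('base64');
}

function bytesReplacer(_key: string, value: unknown): unknown {
  // A plain Uint8Array has no toJSON, so it reaches the replacer as-is.
  if (value instanceof Uint8Array) return encodeBytes(value);
  // A Buffer has already been converted by Buffer.prototype.toJSON().
  if (isBufferJson(value)) return encodeBytes(Uint8Array.from(value.data));
  return value;
}

function serializeAttributesSketch(attrs: Record<string, unknown>): string {
  return JSON.stringify(attrs, bytesReplacer);
}
```

A side effect of the shape check is that a plain object that happens to look like `{ type: 'Buffer', data: [...] }` is also encoded as bytes.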
writer.ts — RETURNING duplicate detection:
`ON CONFLICT DO NOTHING` does not return conflicting rows, so duplicates are detected via a set-diff on `(device_id, ts)` between the input rows and the `RETURNING` rows.
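A sketch of the set-diff. It assumes the insert has roughly the form `INSERT ... ON CONFLICT (device_id, ts) DO NOTHING RETURNING device_id, ts` and that rows carry `ts` as a JS `Date`. `PositionKey`, `keyOf` and `findDuplicates` are hypothetical names.

```typescript
interface PositionKey {
  device_id: string;
  ts: Date;
}

// Collision-free composite key for (device_id, ts).
function keyOf(row: PositionKey): string {
  return JSON.stringify([row.device_id, row.ts.getTime()]);
}

// Input rows whose key is absent from the RETURNING rows were skipped by
// ON CONFLICT DO NOTHING, i.e. they were already stored.
function findDuplicates<T extends PositionKey>(
  input: readonly T[],
  returned: readonly PositionKey[],
): T[] {
  const inserted = new Set(returned.map(keyOf));
  return input.filter((row) => !inserted.has(keyOf(row)));
}
```

Keying on `getTime()` compares at millisecond precision, which is the most a JS `Date` can hold anyway.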
LRU Map trick (state.ts):
- Plain `Map` with `delete` + `set` on every update, so insertion order tracks recency and `keys().next().value` is always the oldest entry. O(1), no external library.
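The trick relies on `Map` iterating in insertion order. Here is a minimal sketch, assuming a fixed capacity with eviction of the oldest entry (the notes do not say how `state.ts` bounds its size). `LruState` is a hypothetical name.

```typescript
class LruState<K, V> {
  private readonly entries = new Map<K, V>();
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // delete + set moves the key to the end of the Map's insertion order.
  update(key: K, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      // The first key in iteration order is the least recently updated one.
      const oldest = this.entries.keys().next();
      if (!oldest.done) this.entries.delete(oldest.value);
    }
  }

  // Reads do not refresh recency in this sketch; only updates do.
  get(key: K): V | undefined {
    return this.entries.get(key);
  }

  keys(): K[] {
    return Array.from(this.entries.keys());
  }
}
```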
Metrics (task 1.9):
- `src/observability/metrics.ts` exports: `createMetrics()`, `startMetricsServer()`, `createPostgresHealthCheck()`, `createConsumerLagSampler()`.
- `createPostgresHealthCheck`: cached `SELECT 1`, 5 s TTL, background interval, sync `isReady()` accessor for `/readyz`.
- `createConsumerLagSampler`: uses `XINFO GROUPS` → `lag` field (Redis 7.0+); falls back to `XLEN` if absent.
- `processor_device_state_size` is a gauge updated via `metrics.observe()` in the sink (not in `state.ts`, which avoids coupling state to metrics).
- prom-client counters with label dimensions do NOT emit `{label} 0` at init. Test accordingly (check label-less counters for the zero baseline).
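A sketch of the cached probe. It assumes the query is injected as a function (for example a wrapper around a pool's `SELECT 1`) so no Postgres driver is needed. `runSelect1`, `HealthCheck` and `createPostgresHealthCheckSketch` are hypothetical names. The sketch shows how `/readyz` reads a boolean synchronously while a background interval does the I/O once per TTL.

```typescript
type Probe = () => Promise<unknown>;

interface HealthCheck {
  isReady(): boolean; // synchronous: /readyz never waits on Postgres
  refresh(): Promise<void>; // one probe round-trip, updates the cache
  stop(): void;
}

function createPostgresHealthCheckSketch(runSelect1: Probe, ttlMs = 5_000): HealthCheck {
  let healthy = false;

  const refresh = async (): Promise<void> => {
    try {
      await runSelect1();
      healthy = true;
    } catch {
      healthy = false;
    }
  };

  // Background interval re-probes once per TTL; the first probe runs immediately.
  const timer = setInterval(() => void refresh(), ttlMs);
  void refresh();

  return {
    isReady: () => healthy,
    refresh,
    stop: () => clearInterval(timer),
  };
}
```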
Integration test (task 1.10):
- `test/pipeline.integration.test.ts`: 4 scenarios.
- Skip-on-no-Docker pattern: try the container start in `beforeAll`, set `dockerAvailable = false` on error, and have each `it` return early.
- TimescaleDB image: `timescale/timescaledb:latest-pg16` (not stock postgres).
- The consumer uses a separate `connectRedis` connection; the test's `XADD` uses a separate `redisClient`. This mirrors the production topology.
- Test 4 (retry): stops `pgContainer`, then restarts it. `pgContainer.restart()` returns a new `StartedTestContainer`; `pgContainer` must be reassigned so the `afterAll` cleanup references the right instance.
Test patterns:
- `connectWithRetry` tests: use `maxAttempts=1` for the `process.exit` test to avoid fake-timer noise.
- Consumer tests: the sink calls `void consumerRef?.stop()` to exit the loop.
- Inline `import()` type annotations are forbidden by ESLint (`@typescript-eslint/consistent-type-imports`). Always use a top-level `import type`.
Dockerfile:
- `EXPOSE 9090` only (no TCP listener; the Processor has none).
- `HEALTHCHECK` uses the `${METRICS_PORT:-9090}` default.
- Label `org.opencontainers.image.source` is not included (it was not in the tcp-ingestion original either).
Why: Phase 1 is the throughput pipeline + operational baseline. Phase 2 (domain logic) and Phase 3 (hardening) build on top.
How to apply:
Phase 2 adds domain logic to `src/domain/` with no changes to `src/core/`. Phase 3 adds graceful shutdown polish, `XAUTOCLAIM`, and state rehydration.
Phase 1.5 — Live broadcast (Done)
ESLint boundary: `src/core/` ↔ `src/live/` mutual exclusion.
Enforced by `import/no-restricted-paths`. Shared code lives in `src/shared/`:
- `src/shared/types.ts`: `Metrics`, `Position`, `AttributeValue`
- `src/shared/codec.ts`: `CodecError`, `decodePosition`
- Both `src/core/` modules re-export from shared to preserve existing import paths.
Live server (src/live/server.ts):
- `createLiveServer(config, logger, metrics, onMessage, onClose?, authClient?)` factory.
- `LIVE_WS_PORT=0` assigns an OS port, but there is no public API to read it back. Integration tests use a two-step approach: probe a free port, then start with that fixed port.
- Auth runs in the `upgrade` handler before the WebSocket handshake completes.
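Step one of the two-step approach can use Node's own `net` module: listen on port 0, read the assigned port from `server.address()`, then close. This is a sketch; `findFreePort` is a hypothetical helper name.

```typescript
import { createServer } from 'node:net';
import type { AddressInfo } from 'node:net';

// Ask the OS for an ephemeral port, read it back, and release it.
function findFreePort(host = '127.0.0.1'): Promise<number> {
  return new Promise((resolve, reject) => {
    const probe = createServer();
    probe.once('error', reject);
    probe.listen(0, host, () => {
      const { port } = probe.address() as AddressInfo;
      probe.close((err) => (err ? reject(err) : resolve(port)));
    });
  });
}
```

The returned port is only guaranteed free at the moment of the probe. Another process could take it before `createLiveServer` binds, which is usually an acceptable race in tests.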
Auth/Authz (src/live/auth.ts, src/live/authz.ts):
- `validate(cookieHeader)` → Directus `/users/me?fields=...`; a missing `data` key = error (not unauthorized); `data: null` = unauthorized.
- `canAccessEvent(cookieHeader, eventId)` → Directus `/items/events/:id`; never throws.
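Here is a sketch of the `validate` decision rules applied to the parsed `/users/me` JSON body. `AuthResult` and `classifyUsersMe` are hypothetical names, and HTTP status handling is left out.

```typescript
type AuthResult =
  | { kind: 'authorized'; user: Record<string, unknown> }
  | { kind: 'unauthorized' }
  | { kind: 'error'; reason: string };

function classifyUsersMe(body: unknown): AuthResult {
  // No `data` key at all: an unexpected response, reported as an error
  // rather than as a rejected session.
  if (typeof body !== 'object' || body === null || !('data' in body)) {
    return { kind: 'error', reason: 'response has no data key' };
  }
  const data = (body as { data: unknown }).data;
  // An explicit `data: null` means no user is attached to this cookie.
  if (data === null) return { kind: 'unauthorized' };
  if (typeof data === 'object') {
    return { kind: 'authorized', user: data as Record<string, unknown> };
  }
  return { kind: 'error', reason: 'unexpected data type' };
}
```

Keeping a separate `error` outcome presumably stops a Directus outage or a malformed response from being reported to the client as a bad session.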
Subscription registry (src/live/registry.ts):
- `WeakMap<LiveConnection, Set>` for conn→topics (GC-safe); `Map<string, Set>` for topic→conns.
- `SnapshotProvider` is injected at construction; the default is a stub returning `[]`.
- `fetchSnapshot` in the registry wraps the call in try/catch (fail open: send `subscribed` with an empty snapshot on failure).
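Here is a sketch of the two indexes and the fail-open snapshot path. The `LiveConnection` interface, the `subscribed` message shape and the method names are assumptions; only the WeakMap/Map split and the try/catch follow the notes. The WeakMap covers only the conn→topics side. The topic→conns `Map` still holds strong references, so a connection must be removed explicitly on close.

```typescript
interface LiveConnection {
  send(message: string): void;
}

type SnapshotProvider = (topic: string) => Promise<unknown[]>;

class SubscriptionRegistrySketch {
  // conn → topics: a WeakMap does not itself keep a dropped connection alive.
  private readonly topicsByConn = new WeakMap<LiveConnection, Set<string>>();
  // topic → conns: fan-out needs iteration, so a regular Map.
  private readonly connsByTopic = new Map<string, Set<LiveConnection>>();
  private readonly snapshotProvider: SnapshotProvider;

  constructor(snapshotProvider: SnapshotProvider = async () => []) {
    this.snapshotProvider = snapshotProvider;
  }

  async subscribe(conn: LiveConnection, topic: string): Promise<void> {
    const topics = this.topicsByConn.get(conn) ?? new Set<string>();
    topics.add(topic);
    this.topicsByConn.set(conn, topics);

    const conns = this.connsByTopic.get(topic) ?? new Set<LiveConnection>();
    conns.add(conn);
    this.connsByTopic.set(topic, conns);

    const snapshot = await this.fetchSnapshot(topic);
    conn.send(JSON.stringify({ type: 'subscribed', topic, snapshot }));
  }

  // Fail open: a provider error still yields a `subscribed` reply, with no snapshot.
  private async fetchSnapshot(topic: string): Promise<unknown[]> {
    try {
      return await this.snapshotProvider(topic);
    } catch {
      return [];
    }
  }

  // Drop the connection from every topic it joined (e.g. on socket close).
  remove(conn: LiveConnection): void {
    for (const topic of this.topicsByConn.get(conn) ?? []) {
      const conns = this.connsByTopic.get(topic);
      conns?.delete(conn);
      if (conns?.size === 0) this.connsByTopic.delete(topic);
    }
    this.topicsByConn.delete(conn);
  }

  subscriberCount(topic: string): number {
    return this.connsByTopic.get(topic)?.size ?? 0;
  }
}
```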
Device-event map (src/live/device-event-map.ts):
- In-memory `Map<deviceId, Set<eventId>>` refreshed every `LIVE_DEVICE_EVENT_REFRESH_MS`.
- Phase 1 deviation: `entry_devices.device_id` is IMEI text in the test fixture (`test/fixtures/test-schema.sql`) but a UUID FK to `devices.id` in the real Directus schema.
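Here is a sketch of the refreshable index. It assumes `loadRows` wraps the `entry_devices` query; all names are hypothetical. Building a new `Map` and then swapping the reference means readers never see a half-built index, and a failed load leaves the previous map in place.

```typescript
interface DeviceEventRow {
  device_id: string;
  event_id: string;
}

function buildDeviceEventMap(rows: readonly DeviceEventRow[]): Map<string, Set<string>> {
  const map = new Map<string, Set<string>>();
  for (const { device_id, event_id } of rows) {
    const events = map.get(device_id) ?? new Set<string>();
    events.add(event_id);
    map.set(device_id, events);
  }
  return map;
}

class DeviceEventMapSketch {
  private current = new Map<string, Set<string>>();
  private readonly loadRows: () => Promise<DeviceEventRow[]>;

  constructor(loadRows: () => Promise<DeviceEventRow[]>) {
    this.loadRows = loadRows;
  }

  // If loadRows rejects, the assignment never runs and the old map stays.
  async refresh(): Promise<void> {
    this.current = buildDeviceEventMap(await this.loadRows());
  }

  eventsFor(deviceId: string): ReadonlySet<string> {
    return this.current.get(deviceId) ?? new Set<string>();
  }
}
```

In the service this would run on a timer, for example `setInterval(() => void index.refresh().catch(logError), LIVE_DEVICE_EVENT_REFRESH_MS)`, where `logError` is a placeholder.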
Broadcast consumer (src/live/broadcast.ts):
- Per-instance consumer group `live-broadcast-{INSTANCE_ID}`; ACK immediately on consume.
- Test strategy: the `makeRedis` stub blocks the second `xreadgroup` call until `xack` fires (via `stopSignalPromise`), preventing a tight-loop OOM in test workers.
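Here is a sketch of the consumer loop and the test stub together. `RedisLike`, `runBroadcastLoop`, `makeRedisStub` and the simplified `xreadgroup`/`xack` signatures are assumptions (a real client takes more arguments and returns a nested reply). The group name and the ack-before-broadcast order follow the notes.

```typescript
type Entry = [id: string, fields: string[]];

interface RedisLike {
  xreadgroup(group: string, consumer: string, stream: string): Promise<Entry[] | null>;
  xack(stream: string, group: string, id: string): Promise<number>;
}

async function runBroadcastLoop(
  redis: RedisLike,
  stream: string,
  instanceId: string,
  onEntry: (entry: Entry) => void,
  isRunning: () => boolean,
): Promise<void> {
  // One group per instance, so every instance receives every entry.
  const group = `live-broadcast-${instanceId}`;
  while (isRunning()) {
    const entries = await redis.xreadgroup(group, instanceId, stream);
    if (!entries) continue;
    for (const entry of entries) {
      await redis.xack(stream, group, entry[0]); // ACK first: live fan-out is best effort
      onEntry(entry);
    }
  }
}

// Test stub: the first read returns the fixture; later reads wait until xack
// has fired, giving the test time to stop the loop instead of letting it spin.
function makeRedisStub(fixture: Entry[]) {
  let signalStop: () => void = () => {};
  const stopSignalPromise = new Promise<void>((resolve) => {
    signalStop = resolve;
  });
  const acked: string[] = [];
  let reads = 0;
  const redis: RedisLike = {
    async xreadgroup() {
      reads += 1;
      if (reads === 1) return fixture;
      await stopSignalPromise;
      return null;
    },
    async xack(_stream, _group, id) {
      acked.push(id);
      signalStop();
      return 1;
    },
  };
  return { redis, acked, stopSignalPromise };
}
```

Acking before the broadcast means an entry in flight during a crash is simply not re-sent to live clients, a trade-off that is acceptable for a live view.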
Snapshot provider (src/live/snapshot.ts):
- `SELECT DISTINCT ON (p.device_id) ... WHERE p.faulty = false ORDER BY p.device_id, p.ts DESC`
- Requires `positions_device_ts_idx ON positions (device_id, ts DESC)` (created in migration 0002).
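For reference, this in-memory function models what the query returns. It is an illustration, not code from `src/live/snapshot.ts`, and `PositionRow` is a hypothetical subset of the table. Because `WHERE` filters before `DISTINCT ON` picks a row, a device whose newest row is faulty falls back to its newest non-faulty row instead of disappearing.

```typescript
interface PositionRow {
  device_id: string;
  ts: Date;
  faulty: boolean;
}

function latestPerDevice(rows: readonly PositionRow[]): PositionRow[] {
  const latest = new Map<string, PositionRow>();
  for (const row of rows) {
    if (row.faulty) continue; // WHERE p.faulty = false
    const current = latest.get(row.device_id);
    // DISTINCT ON keeps the first row per device under ORDER BY ts DESC, i.e. the newest.
    if (!current || row.ts.getTime() > current.ts.getTime()) {
      latest.set(row.device_id, row);
    }
  }
  return Array.from(latest.values());
}
```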
Integration test (test/live.integration.test.ts):
- Directus stub: `test/helpers/directus-stub.ts`, a bare `http.createServer` with no Express.
- Test fixture: `test/fixtures/test-schema.sql`, a simplified schema with `entry_devices.device_id TEXT`.
- `waitForMessage<T>(ws, predicate, timeoutMs)` helper for typed WS message assertions.
- Orphan test: `Promise.race([waitForMessage(...), timeout])` → asserts a `'timeout'` result.
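Here is a sketch of `waitForMessage` plus the orphan-test race. To run without a WebSocket library, it accepts any `EventEmitter` that emits `'message'` (the `ws` package's `WebSocket` is one). `raceAgainstTimeout` is a hypothetical helper name.

```typescript
import { EventEmitter } from 'node:events';

function waitForMessage<T>(
  ws: EventEmitter,
  predicate: (msg: unknown) => msg is T,
  timeoutMs: number,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    // Fail the wait if nothing matching arrives in time.
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error(`no matching message within ${timeoutMs} ms`));
    }, timeoutMs);

    function onMessage(data: unknown): void {
      let parsed: unknown;
      try {
        parsed = JSON.parse(String(data));
      } catch {
        return; // skip frames that are not JSON
      }
      if (predicate(parsed)) {
        cleanup();
        resolve(parsed);
      }
    }

    function cleanup(): void {
      clearTimeout(timer);
      ws.off('message', onMessage);
    }

    ws.on('message', onMessage);
  });
}

// Orphan-test pattern: settles with 'timeout' if `pending` has not settled within graceMs.
function raceAgainstTimeout<T>(pending: Promise<T>, graceMs: number) {
  const timeout = new Promise<'timeout'>((resolve) => {
    setTimeout(() => resolve('timeout'), graceMs);
  });
  return Promise.race([pending, timeout]);
}
```

For the orphan case, give `waitForMessage` a longer `timeoutMs` than the race's grace period. Otherwise its own rejection can win the race.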