docs/.claude/agent-memory/ts-node-backend-engineer/project_processor.md

name: Processor Service
description: processor service: Phase 1 and Phase 1.5 complete; key patterns, conventions, and quirks
type: project

Phase 1 complete (11 tasks). Phase 1.5 complete (6 tasks, commits b8ebbd054f1684). Live broadcast WebSocket endpoint is fully wired and tested.

Architecture divergence from tcp-ingestion:

  • ESLint import/no-restricted-paths zone: src/core/ cannot import src/domain/ (preemptive for Phase 2).
  • INSTANCE_ID defaults to 'processor-1' (fixed string, not random UUID). Deterministic for container environments.
  • REDIS_CONSUMER_NAME defaults to INSTANCE_ID at runtime (after schema parse), not as a zod schema default.
  • connectRedis lives in src/core/consumer.ts (not src/db/). Rationale: only the consumer needs a Redis connection in Phase 1.
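The runtime default for REDIS_CONSUMER_NAME can be sketched as below. This is a minimal illustration, assuming a hand-rolled parseEnv in place of the real zod schema; parseEnv, loadConfig, and the ProcessorConfig shape are hypothetical names, and only the two variable names and the 'processor-1' default come from the notes above.

```typescript
// Hypothetical config shape; only the two field names come from the notes.
interface ProcessorConfig {
  INSTANCE_ID: string;
  REDIS_CONSUMER_NAME: string;
}

type Env = Record<string, string | undefined>;

interface ParsedEnv {
  INSTANCE_ID: string;
  REDIS_CONSUMER_NAME: string | undefined;
}

// Stand-in for the schema parse step: static defaults only, no cross-field logic.
function parseEnv(env: Env): ParsedEnv {
  return {
    INSTANCE_ID: env.INSTANCE_ID ?? "processor-1",
    REDIS_CONSUMER_NAME: env.REDIS_CONSUMER_NAME,
  };
}

// The cross-field default is resolved after parsing, so it follows an
// overridden INSTANCE_ID instead of a value frozen into the schema.
function loadConfig(env: Env): ProcessorConfig {
  const parsed = parseEnv(env);
  return {
    INSTANCE_ID: parsed.INSTANCE_ID,
    REDIS_CONSUMER_NAME: parsed.REDIS_CONSUMER_NAME ?? parsed.INSTANCE_ID,
  };
}
```

With this shape, loadConfig({ INSTANCE_ID: 'processor-7' }) yields a consumer name of 'processor-7' without the schema needing to know about the dependency.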

Buffer.toJSON() gotcha in JSON.stringify replacers:

  • Buffer.isBuffer(value) is false inside a JSON.stringify replacer for nested Buffer properties because Buffer.prototype.toJSON() fires before the replacer.
  • Must handle both instanceof Uint8Array (direct reference) AND the { type: 'Buffer', data: number[] } shape. See serializeAttributes in src/core/writer.ts.
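A minimal sketch of a replacer that covers both cases. It is not the actual serializeAttributes from src/core/writer.ts; the function name, the base64 output encoding, and the isBufferJson guard are assumptions made for illustration.

```typescript
import { Buffer } from "node:buffer";

// Shape produced by Buffer.prototype.toJSON().
interface BufferJson {
  type: "Buffer";
  data: number[];
}

function isBufferJson(value: unknown): value is BufferJson {
  if (typeof value !== "object" || value === null) return false;
  const v = value as { type?: unknown; data?: unknown };
  return v.type === "Buffer" && Array.isArray(v.data);
}

// Hypothetical helper: encodes binary attribute values as base64 strings.
function serializeAttributesSketch(attributes: Record<string, unknown>): string {
  return JSON.stringify(attributes, (_key: string, value: unknown) => {
    // A plain Uint8Array has no toJSON(), so it reaches the replacer unchanged.
    if (value instanceof Uint8Array) {
      return Buffer.from(value).toString("base64");
    }
    // A nested Buffer has already been through toJSON(), so only the
    // plain-object shape is visible here and Buffer.isBuffer() would be false.
    if (isBufferJson(value)) {
      return Buffer.from(value.data).toString("base64");
    }
    return value;
  });
}
```

Dropping either branch would let one of the two binary representations leak into the output as a raw byte listing.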

writer.ts — RETURNING duplicate detection:

  • With ON CONFLICT DO NOTHING, RETURNING yields only the rows that were actually inserted; conflicting rows are omitted. Duplicates are detected via a set-diff on (device_id, ts) between the input rows and the RETURNING rows.
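The set-diff can be sketched as follows. PositionKey, keyOf, and findDuplicates are hypothetical names, and the sketch assumes ts is available as a Date on both sides.

```typescript
// Hypothetical row key; the notes only say the diff is on (device_id, ts).
interface PositionKey {
  device_id: string;
  ts: Date;
}

function keyOf(row: PositionKey): string {
  return `${row.device_id}|${row.ts.toISOString()}`;
}

// Input rows whose key is absent from the RETURNING rows were skipped by
// ON CONFLICT DO NOTHING, i.e. they already existed in the table.
function findDuplicates<T extends PositionKey>(
  input: readonly T[],
  returned: readonly PositionKey[],
): T[] {
  const inserted = new Set(returned.map(keyOf));
  return input.filter((row) => !inserted.has(keyOf(row)));
}
```

One limitation of this sketch: rows repeated within the same input batch share a key with the inserted row, so the diff does not flag them.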

LRU Map trick (state.ts):

  • Plain Map with delete + set on every update. keys().next().value is always the oldest. O(1), no external library.
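A sketch of the trick, relying on the fact that a Map iterates in insertion order. The class name and the capacity parameter are assumptions; the real state.ts may size or evict differently.

```typescript
// Least-recently-updated cache built on Map insertion order.
class LruMapSketch<K, V> {
  private readonly entries = new Map<K, V>();
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  set(key: K, value: V): void {
    // delete + set moves the key to the end of the iteration order.
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      // The first key in iteration order is the least recently updated one.
      const oldest = this.entries.keys().next();
      if (!oldest.done) this.entries.delete(oldest.value);
    }
  }

  get(key: K): V | undefined {
    // Reads do not refresh recency; only updates do.
    return this.entries.get(key);
  }

  keys(): K[] {
    return Array.from(this.entries.keys());
  }
}
```

Because only set() reorders entries, this evicts by last update rather than last access, which matches the "on every update" wording above.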

Metrics (task 1.9):

  • src/observability/metrics.ts exports: createMetrics(), startMetricsServer(), createPostgresHealthCheck(), createConsumerLagSampler().
  • createPostgresHealthCheck: cached SELECT 1, 5 s TTL, background interval, sync isReady() accessor for /readyz.
  • createConsumerLagSampler: uses the lag field of XINFO GROUPS (Redis 7.0+); falls back to XLEN if absent.
  • processor_device_state_size is a gauge updated via metrics.observe() in the sink (not in state.ts — avoids coupling state to metrics).
  • prom-client counters with label dims do NOT emit {label} 0 at init. Test accordingly (check label-less counters for zero baseline).
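The cached health check pattern can be sketched as below. This is an illustration under assumptions, not the real createPostgresHealthCheck: the probe is injected as a plain async function (in the service it would run SELECT 1), and the HealthCheck interface with refresh/start/stop is hypothetical.

```typescript
interface HealthCheck {
  isReady(): boolean; // sync accessor, cheap enough for a /readyz handler
  refresh(): Promise<void>;
  start(): void;
  stop(): void;
}

// probe resolves when the dependency is healthy and rejects otherwise.
function createCachedHealthCheck(
  probe: () => Promise<void>,
  ttlMs: number = 5_000,
): HealthCheck {
  let ready = false;
  let timer: ReturnType<typeof setInterval> | undefined;

  const refresh = async (): Promise<void> => {
    try {
      await probe();
      ready = true;
    } catch {
      ready = false;
    }
  };

  return {
    isReady: () => ready,
    refresh,
    start() {
      if (timer !== undefined) return;
      void refresh(); // prime the cache without waiting for the first tick
      timer = setInterval(() => void refresh(), ttlMs);
    },
    stop() {
      if (timer !== undefined) clearInterval(timer);
      timer = undefined;
    },
  };
}
```

The readiness endpoint never waits on the database; it only reads the last cached result, which is at most one interval old.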

Integration test (task 1.10):

  • test/pipeline.integration.test.ts: 4 scenarios. Skip-on-no-Docker pattern: try container start in beforeAll, set dockerAvailable = false on error, each it early-returns.
  • TimescaleDB image: timescale/timescaledb:latest-pg16 (not stock postgres).
  • Consumer uses a separate connectRedis connection; test XADD uses a separate redisClient. Mirrors production topology.
  • Test 4 (retry): stops pgContainer then restarts it. pgContainer.restart() returns a new StartedTestContainer. Must reassign pgContainer for the afterAll cleanup to reference the right instance.

Test patterns:

  • connectWithRetry tests: use maxAttempts=1 for process.exit test to avoid fake-timer noise.
  • Consumer tests: sink calls void consumerRef?.stop() to exit the loop.
  • Inline import() type annotations forbidden by ESLint (@typescript-eslint/consistent-type-imports). Always use top-level import type.

Dockerfile:

  • EXPOSE 9090 only (no TCP listener — Processor has none).
  • HEALTHCHECK uses ${METRICS_PORT:-9090} default.
  • Label org.opencontainers.image.source not included (not in tcp-ingestion original either).

Why: Phase 1 is the throughput pipeline + operational baseline. Phase 2 (domain logic) and Phase 3 (hardening) build on top.

How to apply: Phase 2 adds domain logic to src/domain/ — no changes to src/core/. Phase 3 adds graceful shutdown polish, XAUTOCLAIM, and state rehydration.


Phase 1.5 — Live broadcast (Done)

ESLint boundary: src/core/ ↔ src/live/ mutual exclusion. Enforced by import/no-restricted-paths. Shared code lives in src/shared/:

  • src/shared/types.ts: Metrics, Position, AttributeValue
  • src/shared/codec.ts: CodecError, decodePosition

Both src/core/ modules re-export from shared to preserve existing import paths.
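For reference, the boundaries described in these notes could be expressed with eslint-plugin-import zones roughly as follows. This is a sketch of the rule options only, assuming paths relative to the project root; the constant name boundaryRules is hypothetical and the project's actual config may differ.

```typescript
// Fragment of an ESLint rules object. In each zone, files under `target`
// may not import anything from `from`.
const boundaryRules = {
  "import/no-restricted-paths": [
    "error",
    {
      zones: [
        // src/core/ and src/live/ exclude each other.
        { target: "./src/core", from: "./src/live" },
        { target: "./src/live", from: "./src/core" },
        // Preemptive Phase 2 boundary: core cannot import domain.
        { target: "./src/core", from: "./src/domain" },
      ],
    },
  ],
};
```

Neither direction lists src/shared/, which is why code needed on both sides lives there.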

Live server (src/live/server.ts):

  • createLiveServer(config, logger, metrics, onMessage, onClose?, authClient?) factory
  • LIVE_WS_PORT=0 assigns an OS port but there's no public API to read it back. Integration tests use a two-step approach: probe a free port, then start with that fixed port.
  • Auth runs in the upgrade handler before completing the WebSocket handshake.
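The first step of the two-step approach, probing a free port, can be sketched with node:net. probeFreePort is a hypothetical helper name, not necessarily what the integration tests call it.

```typescript
import { createServer } from "node:net";
import type { AddressInfo } from "node:net";

// Bind a throwaway TCP server to port 0, read the port the OS assigned,
// then close it so the caller can start the real server on that port.
function probeFreePort(): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const probe = createServer();
    probe.once("error", reject);
    probe.listen(0, "127.0.0.1", () => {
      const { port } = probe.address() as AddressInfo;
      probe.close((err) => (err ? reject(err) : resolve(port)));
    });
  });
}
```

There is a small race between closing the probe and starting the real server, since another process could claim the port in between. For local integration tests that risk is usually acceptable.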

Auth/Authz (src/live/auth.ts, src/live/authz.ts):

  • validate(cookieHeader) → Directus /users/me?fields=...; missing data key = error (not unauthorized); data: null = unauthorized.
  • canAccessEvent(cookieHeader, eventId) → Directus /items/events/:id; never throws.
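The distinction between a missing data key and data: null can be sketched as a pure classifier over the parsed response body. AuthResult and classifyUsersMeBody are hypothetical names, and treating a non-object data value as an error is an assumption not stated in the notes.

```typescript
type AuthResult =
  | { kind: "ok"; user: Record<string, unknown> }
  | { kind: "unauthorized" }
  | { kind: "error"; reason: string };

function classifyUsersMeBody(body: unknown): AuthResult {
  // No data key at all: the response is malformed, which is an upstream
  // error rather than a statement about the user.
  if (typeof body !== "object" || body === null || !("data" in body)) {
    return { kind: "error", reason: "response has no data key" };
  }
  const data = (body as { data: unknown }).data;
  // data: null means the request was understood but no user is attached.
  if (data === null) return { kind: "unauthorized" };
  // Assumption: anything other than an object is treated as malformed.
  if (typeof data !== "object") {
    return { kind: "error", reason: "unexpected data type" };
  }
  return { kind: "ok", user: data as Record<string, unknown> };
}
```

Keeping the two failure modes separate lets the caller tell a rejected user apart from a misbehaving Directus instance.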

Subscription registry (src/live/registry.ts):

  • WeakMap<LiveConnection, Set> for conn→topics (GC-safe); Map<string, Set> for topic→conns.
  • SnapshotProvider injected at construction; default is a stub returning [].
  • fetchSnapshot in registry wraps in try/catch (fail open = send subscribed with empty snapshot on failure).
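A condensed sketch of the registry shape described above. LiveConnection is reduced to a stand-in type, and the method names and the subscribed message shape are assumptions; only the WeakMap/Map pairing, the injected provider with a stub default, and the fail-open behaviour come from the notes.

```typescript
type LiveConnection = { id: string }; // stand-in for the real connection type
type Snapshot = unknown[];
type SnapshotProvider = (topic: string) => Promise<Snapshot>;

interface SubscribedMessage {
  type: "subscribed";
  topic: string;
  snapshot: Snapshot;
}

class RegistrySketch {
  // WeakMap: a dropped connection object can be collected even if
  // removeConnection() was never called for it.
  private readonly topicsByConn = new WeakMap<LiveConnection, Set<string>>();
  private readonly connsByTopic = new Map<string, Set<LiveConnection>>();
  private readonly snapshots: SnapshotProvider;

  constructor(snapshots: SnapshotProvider = async () => []) {
    this.snapshots = snapshots;
  }

  async subscribe(conn: LiveConnection, topic: string): Promise<SubscribedMessage> {
    let topics = this.topicsByConn.get(conn);
    if (topics === undefined) {
      topics = new Set<string>();
      this.topicsByConn.set(conn, topics);
    }
    topics.add(topic);

    let conns = this.connsByTopic.get(topic);
    if (conns === undefined) {
      conns = new Set<LiveConnection>();
      this.connsByTopic.set(topic, conns);
    }
    conns.add(conn);

    return { type: "subscribed", topic, snapshot: await this.fetchSnapshot(topic) };
  }

  removeConnection(conn: LiveConnection): void {
    const topics = this.topicsByConn.get(conn);
    if (topics === undefined) return;
    topics.forEach((topic) => {
      const conns = this.connsByTopic.get(topic);
      if (conns === undefined) return;
      conns.delete(conn);
      if (conns.size === 0) this.connsByTopic.delete(topic);
    });
    this.topicsByConn.delete(conn);
  }

  subscribers(topic: string): LiveConnection[] {
    return Array.from(this.connsByTopic.get(topic) ?? []);
  }

  // Fail open: a snapshot failure degrades to an empty snapshot instead of
  // rejecting the subscription.
  private async fetchSnapshot(topic: string): Promise<Snapshot> {
    try {
      return await this.snapshots(topic);
    } catch {
      return [];
    }
  }
}
```

The topic→conns Map holds strong references, so explicit cleanup on close is still needed for that side; the WeakMap only protects the reverse index.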

Device-event map (src/live/device-event-map.ts):

  • In-memory Map<deviceId, Set<eventId>> refreshed every LIVE_DEVICE_EVENT_REFRESH_MS.
  • Phase 1 deviation: entry_devices.device_id is IMEI text in the test fixture (test/fixtures/test-schema.sql) but UUID FK to devices.id in the real Directus schema.
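The refresh step can be sketched as building a replacement map and swapping it in. The row shape, the injected loader, and the class name are hypothetical, and scheduling on LIVE_DEVICE_EVENT_REFRESH_MS is left out.

```typescript
// Hypothetical row shape for one device-to-event association.
interface EntryDeviceRow {
  deviceId: string;
  eventId: string;
}

function buildDeviceEventMap(rows: readonly EntryDeviceRow[]): Map<string, Set<string>> {
  const map = new Map<string, Set<string>>();
  for (const row of rows) {
    let events = map.get(row.deviceId);
    if (events === undefined) {
      events = new Set<string>();
      map.set(row.deviceId, events);
    }
    events.add(row.eventId);
  }
  return map;
}

class DeviceEventMapSketch {
  private current = new Map<string, Set<string>>();
  private readonly load: () => Promise<EntryDeviceRow[]>;

  constructor(load: () => Promise<EntryDeviceRow[]>) {
    this.load = load;
  }

  // Build the replacement off to the side and swap it in with a single
  // assignment, so readers never observe a half-built map.
  async refresh(): Promise<void> {
    this.current = buildDeviceEventMap(await this.load());
  }

  eventsFor(deviceId: string): string[] {
    return Array.from(this.current.get(deviceId) ?? []);
  }
}
```

If load() rejects, the assignment never happens and the previous map stays in place. Whether the real module keeps stale data on failure is not stated in these notes.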

Broadcast consumer (src/live/broadcast.ts):

  • Per-instance consumer group live-broadcast-{INSTANCE_ID}; ACK immediately on consume.
  • Test strategy: makeRedis stub blocks the second xreadgroup call until xack fires (via stopSignal Promise), preventing tight-loop OOM in test workers.

Snapshot provider (src/live/snapshot.ts):

  • Query shape: DISTINCT ON (p.device_id) ... WHERE p.faulty = false ORDER BY p.device_id, p.ts DESC (latest non-faulty position per device).
  • Requires positions_device_ts_idx ON positions (device_id, ts DESC) (created in migration 0002).

Integration test (test/live.integration.test.ts):

  • Directus stub: test/helpers/directus-stub.ts — bare http.createServer, no Express.
  • Test fixture: test/fixtures/test-schema.sql — simplified schema with entry_devices.device_id TEXT.
  • waitForMessage<T>(ws, predicate, timeoutMs) helper for typed WS message assertions.
  • Orphan test: Promise.race([waitForMessage(...), timeout]) → asserts 'timeout' result.
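The helper and the orphan assertion can be sketched as below. To stay self-contained, the sketch uses an EventEmitter as a stand-in for the WebSocket client and assumes JSON text frames; expectNoMessage is a hypothetical name for the Promise.race pattern.

```typescript
import { EventEmitter } from "node:events";

// Resolves with the first message matching the predicate, rejects on timeout.
function waitForMessage<T>(
  ws: EventEmitter,
  predicate: (msg: unknown) => msg is T,
  timeoutMs: number,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    function cleanup(): void {
      clearTimeout(timer);
      ws.off("message", onMessage);
    }
    function onMessage(raw: { toString(): string }): void {
      let parsed: unknown;
      try {
        parsed = JSON.parse(raw.toString());
      } catch {
        return; // ignore frames that are not JSON
      }
      if (!predicate(parsed)) return;
      cleanup();
      resolve(parsed);
    }
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error(`no matching message within ${timeoutMs} ms`));
    }, timeoutMs);
    ws.on("message", onMessage);
  });
}

// Orphan-style check: succeed only if nothing matching arrives in the window.
function expectNoMessage<T>(
  ws: EventEmitter,
  predicate: (msg: unknown) => msg is T,
  windowMs: number,
): Promise<"message" | "timeout"> {
  const timeout = new Promise<"timeout">((resolve) => {
    setTimeout(() => resolve("timeout"), windowMs);
  });
  // The inner wait uses a longer timeout so the race is decided by the window.
  const message = waitForMessage(ws, predicate, windowMs * 2).then(() => "message" as const);
  return Promise.race([message, timeout]);
}
```

Because Promise.race subscribes to both promises, the late rejection from the inner wait is handled and does not surface as an unhandled rejection.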