---
name: Processor Service
description: "processor service: Phase 1 complete (all 11 tasks), key patterns, conventions, and quirks"
type: project
---
Phase 1 complete. All 11 tasks landed. The throughput pipeline is done: consumer + writer + metrics + integration test + Docker + CI.
Architecture divergence from tcp-ingestion:
- ESLint `import/no-restricted-paths` zone: `src/core/` cannot import `src/domain/` (preemptive for Phase 2).
- `INSTANCE_ID` defaults to `'processor-1'` (fixed string, not random UUID). Deterministic for container environments.
- `REDIS_CONSUMER_NAME` defaults to `INSTANCE_ID` at runtime (after schema parse), not as a zod schema default.
- `connectRedis` lives in `src/core/consumer.ts` (not `src/db/`). Rationale: only the consumer needs a Redis connection in Phase 1.
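A minimal sketch of the runtime default, assuming zod and a schema that contains only these two variables (the real schema, `ProcessorConfig`, and `loadConfig` are hypothetical names). A field-level `.default()` cannot see sibling fields, so a default that depends on `INSTANCE_ID` is applied after `parse()`:

```typescript
import { z } from 'zod';

// Hypothetical subset of the processor's environment schema.
const EnvSchema = z.object({
  // Fixed, deterministic default rather than a random UUID.
  INSTANCE_ID: z.string().min(1).default('processor-1'),
  // No schema default here: a field-level default cannot read INSTANCE_ID.
  REDIS_CONSUMER_NAME: z.string().min(1).optional(),
});

export interface ProcessorConfig {
  INSTANCE_ID: string;
  REDIS_CONSUMER_NAME: string;
}

export function loadConfig(env: Record<string, string | undefined>): ProcessorConfig {
  const parsed = EnvSchema.parse(env);
  return {
    INSTANCE_ID: parsed.INSTANCE_ID,
    // Runtime default, applied after parse so it can reference INSTANCE_ID.
    REDIS_CONSUMER_NAME: parsed.REDIS_CONSUMER_NAME ?? parsed.INSTANCE_ID,
  };
}
```

In the service this would be called as `loadConfig(process.env)`; an explicit `REDIS_CONSUMER_NAME` still wins over the derived value.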
Buffer.toJSON() gotcha in JSON.stringify replacers:
- `Buffer.isBuffer(value)` is false inside a `JSON.stringify` replacer for nested Buffer properties because `Buffer.prototype.toJSON()` fires before the replacer.
- Must handle both `instanceof Uint8Array` (direct reference) AND the `{ type: 'Buffer', data: number[] }` shape. See `serializeAttributes` in `src/core/writer.ts`.
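Both shapes can be handled in one replacer. A minimal sketch, assuming binary attributes are stored as base64 strings (the encoding `serializeAttributes` actually uses is not recorded here, and `binaryReplacer` is a hypothetical name). A plain `Uint8Array` has no `toJSON()`, so it reaches the replacer as itself, while a `Buffer` arrives already converted:

```typescript
import { Buffer } from 'node:buffer';

// The object Buffer.prototype.toJSON() produces.
interface SerializedBuffer {
  type: 'Buffer';
  data: number[];
}

function isSerializedBuffer(value: unknown): value is SerializedBuffer {
  return (
    typeof value === 'object' &&
    value !== null &&
    (value as { type?: unknown }).type === 'Buffer' &&
    Array.isArray((value as { data?: unknown }).data)
  );
}

// Hypothetical replacer: encodes binary values as base64 strings.
export function binaryReplacer(_key: string, value: unknown): unknown {
  // Buffers: toJSON() has already run, so Buffer.isBuffer(value) is false here.
  if (isSerializedBuffer(value)) {
    return Buffer.from(value.data).toString('base64');
  }
  // Plain Uint8Array: no toJSON(), so the typed array itself arrives.
  if (value instanceof Uint8Array) {
    return Buffer.from(value).toString('base64');
  }
  return value;
}
```

One trade-off: a genuine attribute object shaped like `{ type: 'Buffer', data: [...] }` is indistinguishable at this point and would be encoded as bytes too.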
writer.ts — RETURNING duplicate detection:
- `ON CONFLICT DO NOTHING` does not return conflicting rows. Duplicates are detected via a set-diff on `(device_id, ts)` between the input rows and the `RETURNING` rows.
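A minimal sketch of the set-diff, assuming a hypothetical `PositionRow` with a `Date` timestamp (the writer's actual row type is not shown here). Keys are built from values, so the `RETURNING` rows can be fresh objects:

```typescript
// Hypothetical row shape; the real writer's columns may differ.
export interface PositionRow {
  device_id: string;
  ts: Date;
}

// Composite key matching the (device_id, ts) conflict target.
function rowKey(row: PositionRow): string {
  return `${row.device_id}|${row.ts.toISOString()}`;
}

// Input rows whose key is missing from RETURNING were skipped by
// ON CONFLICT DO NOTHING, i.e. they are duplicates.
export function findDuplicates(input: PositionRow[], returned: PositionRow[]): PositionRow[] {
  const inserted = new Set(returned.map(rowKey));
  return input.filter((row) => !inserted.has(rowKey(row)));
}
```

If the same `(device_id, ts)` appears twice within one batch, Postgres inserts one copy and skips the other, but this sketch treats both copies as inserted because one matching key is enough.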
LRU Map trick (state.ts):
- Plain `Map` with `delete` + `set` on every update. Because `Map` iterates in insertion order, `keys().next().value` is always the least recently updated key. O(1), no external library.
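A minimal sketch of the pattern with a size bound, assuming eviction happens on insert (the class name `LruMap` and the capacity parameter are hypothetical; `state.ts` may be structured differently):

```typescript
// Hypothetical bounded map ordered by last update.
export class LruMap<K, V> {
  private readonly map = new Map<K, V>();
  private readonly maxSize: number;

  constructor(maxSize: number) {
    if (maxSize < 1) throw new RangeError('maxSize must be at least 1');
    this.maxSize = maxSize;
  }

  get size(): number {
    return this.map.size;
  }

  get(key: K): V | undefined {
    // Reads do not refresh recency here; only updates do.
    return this.map.get(key);
  }

  set(key: K, value: V): void {
    // delete + set moves the key to the end of the insertion order.
    this.map.delete(key);
    this.map.set(key, value);
    if (this.map.size > this.maxSize) {
      // The first key in iteration order is the least recently updated one.
      const oldest = this.map.keys().next().value as K;
      this.map.delete(oldest);
    }
  }
}
```

For example, with capacity 2, setting `a`, `b`, `a`, `c` evicts `b`: re-setting `a` moved it behind `b`.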
Metrics (task 1.9):
- `src/observability/metrics.ts` exports: `createMetrics()`, `startMetricsServer()`, `createPostgresHealthCheck()`, `createConsumerLagSampler()`.
- `createPostgresHealthCheck`: cached `SELECT 1`, 5 s TTL, background interval, sync `isReady()` accessor for `/readyz`.
- `createConsumerLagSampler`: uses `XINFO GROUPS` → `lag` field (Redis 7.0+); falls back to `XLEN` if absent.
- `processor_device_state_size` is a gauge updated via `metrics.observe()` in the sink (not in `state.ts`, which avoids coupling state to metrics).
- prom-client counters with label dimensions do NOT emit `{label} 0` at init. Test accordingly (check label-less counters for a zero baseline).
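A sketch of the cached-probe pattern behind `createPostgresHealthCheck`, with the database call abstracted as a hypothetical `Probe` function so the example needs no Postgres driver (the real function's signature is not recorded here, and `createCachedHealthCheck` is a hypothetical name). The background interval doubles as the TTL: each result is reused until the next probe replaces it:

```typescript
import { setInterval, clearInterval } from 'node:timers';

// Hypothetical probe: resolves when `SELECT 1` succeeds, rejects otherwise.
export type Probe = () => Promise<void>;

export interface CachedHealthCheck {
  isReady(): boolean;
  stop(): void;
}

export function createCachedHealthCheck(probe: Probe, ttlMs = 5_000): CachedHealthCheck {
  let ready = false; // result of the most recently settled probe
  let inFlight = false;

  const refresh = async (): Promise<void> => {
    if (inFlight) return; // do not stack probes while the database is slow
    inFlight = true;
    try {
      await probe();
      ready = true;
    } catch {
      ready = false;
    } finally {
      inFlight = false;
    }
  };

  void refresh(); // first probe immediately; isReady() is false until it settles
  const timer = setInterval(() => void refresh(), ttlMs);
  timer.unref(); // the health check alone should not keep the process alive

  return {
    // Synchronous: a /readyz handler reads the cached value, never Postgres.
    isReady: () => ready,
    stop: () => clearInterval(timer),
  };
}
```

A probe that hangs leaves the previous result cached, so a production version would likely pair this with a query timeout.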
Integration test (task 1.10):
- `test/pipeline.integration.test.ts`: 4 scenarios.
- Skip-on-no-Docker pattern: try container start in `beforeAll`, set `dockerAvailable = false` on error, each `it` early-returns.
- TimescaleDB image: `timescale/timescaledb:latest-pg16` (not stock postgres).
- Consumer uses a separate `connectRedis` connection; test `XADD` uses a separate `redisClient`. Mirrors production topology.
- Test 4 (retry): stops `pgContainer` then restarts it. `pgContainer.restart()` returns a new `StartedTestContainer`. Must reassign `pgContainer` for the `afterAll` cleanup to reference the right instance.
Test patterns:
- `connectWithRetry` tests: use `maxAttempts=1` for the `process.exit` test to avoid fake-timer noise.
- Consumer tests: sink calls `void consumerRef?.stop()` to exit the loop.
- Inline `import()` type annotations are forbidden by ESLint (`@typescript-eslint/consistent-type-imports`). Always use top-level `import type`.
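A self-contained sketch of the stop-from-sink test pattern, using a hypothetical `createConsumer(read, sink)` in place of the real Redis-backed consumer. Presumably the sink is built before the consumer exists, so it reaches the consumer through `consumerRef`; calling `stop()` from inside the sink ends the read loop after the first batch:

```typescript
// Hypothetical minimal consumer: reads batches until stop() is called.
export interface Consumer {
  start(): Promise<void>;
  stop(): Promise<void>;
}

export function createConsumer(
  read: () => Promise<string[]>,
  sink: (batch: string[]) => Promise<void>,
): Consumer {
  let running = false;
  return {
    async start() {
      running = true;
      while (running) {
        const batch = await read();
        if (batch.length > 0) await sink(batch);
      }
    },
    async stop() {
      running = false;
    },
  };
}

// Test helper: batch must be non-empty, otherwise the sink never runs
// and the loop never stops.
export async function runOneBatch(batch: string[]): Promise<string[]> {
  const seen: string[] = [];
  let consumerRef: Consumer | undefined;
  consumerRef = createConsumer(
    async () => batch,
    async (received) => {
      seen.push(...received);
      // Fire-and-forget: flips the loop flag so start() resolves.
      void consumerRef?.stop();
    },
  );
  await consumerRef.start();
  return seen;
}
```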
Dockerfile:
- `EXPOSE 9090` only (Processor has no TCP listener).
- `HEALTHCHECK` uses `${METRICS_PORT:-9090}` default.
- Label `org.opencontainers.image.source` not included (not in the tcp-ingestion original either).
Why: Phase 1 is the throughput pipeline + operational baseline. Phase 2 (domain logic) and Phase 3 (hardening) build on top.
How to apply:
Phase 2 adds domain logic to `src/domain/` with no changes to `src/core/`. Phase 3 adds graceful shutdown polish, `XAUTOCLAIM`, and state rehydration.