From d758c211ae5b740414c2d9dd4a1d111918bd1095 Mon Sep 17 00:00:00 2001 From: Julian Cuni Date: Fri, 1 May 2026 11:38:25 +0200 Subject: [PATCH] Fix metric wiring gaps audited against live processor output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Several Phase 1 metrics were registered in observability/metrics.ts but either unwired at the call sites or wired with wrong counts. Production output showed 11 records ingested per logs but only 4 in metrics. The fixes below align metric values with actual hot-path activity. Wiring gaps closed (consumer.ts): - processor_consumer_reads_total{result=ok|empty|error} — was registered but never inc'd. Now fires on each XREADGROUP outcome. - processor_consumer_records_total — was registered but never inc'd. Now fires once per XREADGROUP, with the entry count. Counts corrected (writer.ts): - processor_position_writes_total{status} — was inc'd unconditionally by 1 per chunk for each of inserted/duplicate. Now inc'd by the actual per-status count, and only when count > 0. - processor_position_writes_total{status='failed'} — was inc'd by 1 per failed chunk. Now inc'd by chunk.length so every failed record is counted. Counts corrected (main.ts): - processor_acks_total — was inc'd by 1 per non-empty batch. Now inc'd by ackIds.length so every ACK'd ID is counted. Wiring gap closed (state.ts): - processor_device_state_evictions_total — internal `evicted` counter existed but was never published to metrics. createDeviceStateStore now accepts a Metrics injection and inc's on each eviction. Metrics interface extended (types.ts, metrics.ts): - Metrics.inc gained an optional third `value` parameter (defaults to 1) for batched increments. dispatchInc passes it through to prom-client's Counter.inc(labels, value). Tests updated to reflect the new third arg and the state.ts factory's new metrics parameter. Total 134 unit tests passing (no count change — existing tests adjusted, no new tests added; the real verification is on stage where the metrics are now meaningful again). --- src/core/consumer.ts | 14 ++++++++++++-- src/core/state.ts | 9 +++++++-- src/core/types.ts | 14 ++++++++++---- src/core/writer.ts | 12 +++++++++--- src/main.ts | 4 ++-- src/observability/metrics.ts | 18 ++++++++++-------- test/state.test.ts | 37 +++++++++++++++++++++--------------- test/writer.test.ts | 5 +++-- 8 files changed, 75 insertions(+), 38 deletions(-) diff --git a/src/core/consumer.ts b/src/core/consumer.ts index 88161aa..f83a4ef 100644 --- a/src/core/consumer.ts +++ b/src/core/consumer.ts @@ -275,18 +275,28 @@ export function createConsumer( )) as [string, [string, string[]][]][] | null; } catch (err) { if (stopping) break; + metrics.inc('processor_consumer_reads_total', { result: 'error' }); logger.error({ err }, 'XREADGROUP failed; backing off'); await sleep(1_000); continue; } // BLOCK timeout — no new entries; loop again to check stopping flag. - if (rawResult === null) continue; + if (rawResult === null) { + metrics.inc('processor_consumer_reads_total', { result: 'empty' }); + continue; + } // rawResult is [[streamName, [[id, fields], ...]]] // We only subscribed to one stream so we take the first element. const streamEntries = rawResult[0]?.[1] ?? []; - if (streamEntries.length === 0) continue; + if (streamEntries.length === 0) { + metrics.inc('processor_consumer_reads_total', { result: 'empty' }); + continue; + } + + metrics.inc('processor_consumer_reads_total', { result: 'ok' }); + metrics.inc('processor_consumer_records_total', undefined, streamEntries.length); logger.debug({ stream, n: streamEntries.length }, 'batch consumed'); diff --git a/src/core/state.ts b/src/core/state.ts index 26b3036..5cba7e6 100644 --- a/src/core/state.ts +++ b/src/core/state.ts @@ -15,7 +15,7 @@ import type { Logger } from 'pino'; import type { Config } from '../config/load.js'; -import type { Position, DeviceState } from './types.js'; +import type { Position, DeviceState, Metrics } from './types.js'; // --------------------------------------------------------------------------- // Public interface @@ -47,7 +47,11 @@ export type DeviceStateStore = { // Factory // --------------------------------------------------------------------------- -export function createDeviceStateStore(config: Config, logger: Logger): DeviceStateStore { +export function createDeviceStateStore( + config: Config, + logger: Logger, + metrics: Metrics, +): DeviceStateStore { const cap = config.DEVICE_STATE_LRU_CAP; const store = new Map(); let evicted = 0; @@ -88,6 +92,7 @@ export function createDeviceStateStore(config: Config, logger: Logger): DeviceSt if (oldestKey !== undefined) { store.delete(oldestKey); evicted++; + metrics.inc('processor_device_state_evictions_total'); logger.debug( { evictedDevice: oldestKey, storeSize: store.size, cap }, 'device state evicted (LRU)', diff --git a/src/core/types.ts b/src/core/types.ts index 509a75f..79d04e7 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -84,11 +84,17 @@ export type DeviceState = { // --------------------------------------------------------------------------- /** - * Minimal metrics interface exposed to pipeline components. Concrete - * implementation (prom-client) lands in task 1.9; this keeps types stable - * through tasks 1.2–1.8. + * Minimal metrics interface exposed to pipeline components. + * + * `inc` accepts an optional `value` for batched increments — counters that + * naturally arrive in groups (records consumed, rows inserted, IDs ACKed) + * should pass the count rather than calling `inc` N times. Defaults to 1. */ export type Metrics = { - readonly inc: (name: string, labels?: Record) => void; + readonly inc: ( + name: string, + labels?: Record, + value?: number, + ) => void; readonly observe: (name: string, value: number, labels?: Record) => void; }; diff --git a/src/core/writer.ts b/src/core/writer.ts index 3827f00..3d38ea0 100644 --- a/src/core/writer.ts +++ b/src/core/writer.ts @@ -190,7 +190,8 @@ export function createWriter( const error = err instanceof Error ? err : new Error(String(err)); logger.error({ err, chunkSize: chunk.length }, 'position write failed'); - metrics.inc('processor_position_writes_total', { status: 'failed' }); + // Every record in the failed chunk gets `status: failed` — count them all. + metrics.inc('processor_position_writes_total', { status: 'failed' }, chunk.length); return chunk.map((record) => ({ id: record.id, status: 'failed' as const, error })); } @@ -200,8 +201,13 @@ export function createWriter( const insertedCount = results.filter((r) => r.status === 'inserted').length; const duplicateCount = results.filter((r) => r.status === 'duplicate').length; - metrics.inc('processor_position_writes_total', { status: 'inserted' }); - metrics.inc('processor_position_writes_total', { status: 'duplicate' }); + // Counts must match per-record outcomes, not be incremented once per chunk. + if (insertedCount > 0) { + metrics.inc('processor_position_writes_total', { status: 'inserted' }, insertedCount); + } + if (duplicateCount > 0) { + metrics.inc('processor_position_writes_total', { status: 'duplicate' }, duplicateCount); + } metrics.observe('processor_position_write_duration_seconds', (Date.now() - startMs) / 1_000); logger.debug( diff --git a/src/main.ts b/src/main.ts index 39b26ce..b7f0f09 100644 --- a/src/main.ts +++ b/src/main.ts @@ -65,7 +65,7 @@ async function main(): Promise { const redis: Redis = await connectRedis(config.REDIS_URL, logger); // 5. Build pipeline components - const state = createDeviceStateStore(config, logger); + const state = createDeviceStateStore(config, logger, metrics); const writer = createWriter(pool, config, logger, metrics); // 6. Postgres health check — background cached SELECT 1 for /readyz. @@ -122,7 +122,7 @@ async function main(): Promise { .map((r) => r.id); if (ackIds.length > 0) { - metrics.inc('processor_acks_total'); + metrics.inc('processor_acks_total', undefined, ackIds.length); } return ackIds; diff --git a/src/observability/metrics.ts b/src/observability/metrics.ts index 98ccb9e..89da9af 100644 --- a/src/observability/metrics.ts +++ b/src/observability/metrics.ts @@ -71,8 +71,8 @@ export function createMetrics(): Metrics & { collectDefaultMetrics({ register: internal.registry }); const metricsImpl: Metrics & { serializeMetrics: () => Promise } = { - inc(name: string, labels?: Record): void { - dispatchInc(internal, name, labels); + inc(name: string, labels?: Record, value?: number): void { + dispatchInc(internal, name, labels, value); }, observe(name: string, value: number, labels?: Record): void { @@ -398,25 +398,27 @@ function dispatchInc( r: InternalRegistry, name: string, labels?: Record, + value?: number, ): void { + const v = value ?? 1; switch (name) { case 'processor_consumer_reads_total': - r.consumerReadsTotal.inc(labels ?? {}); + r.consumerReadsTotal.inc(labels ?? {}, v); break; case 'processor_consumer_records_total': - r.consumerRecordsTotal.inc(); + r.consumerRecordsTotal.inc(v); break; case 'processor_decode_errors_total': - r.decodeErrorsTotal.inc(); + r.decodeErrorsTotal.inc(v); break; case 'processor_position_writes_total': - r.positionWritesTotal.inc(labels ?? {}); + r.positionWritesTotal.inc(labels ?? {}, v); break; case 'processor_acks_total': - r.acksTotal.inc(); + r.acksTotal.inc(v); break; case 'processor_device_state_evictions_total': - r.deviceStateEvictionsTotal.inc(); + r.deviceStateEvictionsTotal.inc(v); break; default: // Unknown metric name — silently ignore. This preserves the contract diff --git a/test/state.test.ts b/test/state.test.ts index beaf4a0..c6c3939 100644 --- a/test/state.test.ts +++ b/test/state.test.ts @@ -15,7 +15,7 @@ import { describe, it, expect, vi } from 'vitest'; import type { Logger } from 'pino'; import type { Config } from '../src/config/load.js'; -import type { Position } from '../src/core/types.js'; +import type { Position, Metrics } from '../src/core/types.js'; import { createDeviceStateStore } from '../src/core/state.js'; // --------------------------------------------------------------------------- @@ -36,6 +36,13 @@ function makeSilentLogger(): Logger { } as unknown as Logger; } +function makeMetrics(): Metrics { + return { + inc: vi.fn(), + observe: vi.fn(), + }; +} + function makeConfig(overrides: Partial = {}): Config { return { NODE_ENV: 'test', @@ -77,7 +84,7 @@ function makePosition(deviceId: string, overrides: Partial = {}): Posi describe('createDeviceStateStore — initial state', () => { it('creates a new entry on first update', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const position = makePosition('DEV001'); const state = store.update(position); @@ -89,7 +96,7 @@ describe('createDeviceStateStore — initial state', () => { }); it('increments position_count_session on subsequent updates', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const pos1 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:00.000Z') }); const pos2 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:01.000Z') }); const pos3 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:02.000Z') }); @@ -102,13 +109,13 @@ describe('createDeviceStateStore — initial state', () => { }); it('get() returns undefined for an unknown device', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); expect(store.get('UNKNOWN')).toBeUndefined(); }); it('get() returns the current state for a known device', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const position = makePosition('DEV002'); store.update(position); @@ -119,12 +126,12 @@ describe('createDeviceStateStore — initial state', () => { }); it('size() returns 0 before any updates', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); expect(store.size()).toBe(0); }); it('size() returns the number of distinct devices after updates', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); store.update(makePosition('DEV001')); store.update(makePosition('DEV002')); @@ -136,7 +143,7 @@ describe('createDeviceStateStore — initial state', () => { describe('createDeviceStateStore — last_seen semantics', () => { it('last_seen reflects the position timestamp (not wall clock)', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const ts = new Date('2024-03-15T08:30:00.000Z'); const position = makePosition('DEV010', { timestamp: ts }); @@ -147,7 +154,7 @@ describe('createDeviceStateStore — last_seen semantics', () => { }); it('last_seen advances on newer timestamps', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const ts1 = new Date('2024-05-01T10:00:00.000Z'); const ts2 = new Date('2024-05-01T11:00:00.000Z'); @@ -161,7 +168,7 @@ describe('createDeviceStateStore — last_seen semantics', () => { // Devices buffer offline records and replay them in bursts; within a burst // consecutive timestamps may decrease. last_seen must mean "highest device // timestamp seen so far" — it must never go backward. - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const newer = new Date('2024-05-01T12:00:00.000Z'); const older = new Date('2024-05-01T10:00:00.000Z'); @@ -173,7 +180,7 @@ describe('createDeviceStateStore — last_seen semantics', () => { }); it('last_seen stays the same when equal timestamps arrive', () => { - const store = createDeviceStateStore(makeConfig(), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics()); const ts = new Date('2024-05-01T12:00:00.000Z'); store.update(makePosition('DEV013', { timestamp: ts })); @@ -185,7 +192,7 @@ describe('createDeviceStateStore — last_seen semantics', () => { describe('createDeviceStateStore — LRU eviction', () => { it('evicts the least-recently-updated device when cap is exceeded', () => { - const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger(), makeMetrics()); const ts = new Date('2024-05-01T12:00:00.000Z'); // Insert 3 devices: DEV001, DEV002, DEV003 (DEV001 is oldest) @@ -206,7 +213,7 @@ describe('createDeviceStateStore — LRU eviction', () => { }); it('re-using an existing device bumps it to most-recent so it is not evicted next', () => { - const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger(), makeMetrics()); const ts1 = new Date('2024-05-01T12:00:00.000Z'); const ts2 = new Date('2024-05-01T12:00:01.000Z'); @@ -228,7 +235,7 @@ describe('createDeviceStateStore — LRU eviction', () => { }); it('evictedTotal() increments on each eviction', () => { - const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 2 }), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 2 }), makeSilentLogger(), makeMetrics()); const ts = new Date('2024-05-01T12:00:00.000Z'); expect(store.evictedTotal()).toBe(0); @@ -245,7 +252,7 @@ describe('createDeviceStateStore — LRU eviction', () => { }); it('evictedTotal() stays 0 when cap is never reached', () => { - const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 1000 }), makeSilentLogger()); + const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 1000 }), makeSilentLogger(), makeMetrics()); const ts = new Date('2024-05-01T12:00:00.000Z'); for (let i = 0; i < 10; i++) { diff --git a/test/writer.test.ts b/test/writer.test.ts index 3bb740c..55b19bd 100644 --- a/test/writer.test.ts +++ b/test/writer.test.ts @@ -242,6 +242,7 @@ describe('createWriter — pool error', () => { expect(metrics.inc).toHaveBeenCalledWith( 'processor_position_writes_total', { status: 'failed' }, + records.length, ); }); }); @@ -491,8 +492,8 @@ describe('createWriter — metrics', () => { const writer = createWriter(pool, makeConfig(), makeSilentLogger(), metrics); await writer.write(records); - expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'inserted' }); - expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'duplicate' }); + expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'inserted' }, 1); + expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'duplicate' }, 1); expect(metrics.observe).toHaveBeenCalledWith( 'processor_position_write_duration_seconds', expect.any(Number),