Fix metric wiring gaps audited against live processor output

Several Phase 1 metrics were registered in observability/metrics.ts but
either unwired at the call sites or wired with wrong counts. Production
output showed 11 records ingested per logs but only 4 in metrics. The
fixes below align metric values with actual hot-path activity.

Wiring gaps closed (consumer.ts):
- processor_consumer_reads_total{result=ok|empty|error} — was registered
  but never inc'd. Now fires on each XREADGROUP outcome.
- processor_consumer_records_total — was registered but never inc'd.
  Now fires once per XREADGROUP, with the entry count.

Counts corrected (writer.ts):
- processor_position_writes_total{status} — was inc'd unconditionally
  by 1 per chunk for each of inserted/duplicate. Now inc'd by the
  actual per-status count, and only when count > 0.
- processor_position_writes_total{status='failed'} — was inc'd by 1
  per failed chunk. Now inc'd by chunk.length so every failed record
  is counted.

Counts corrected (main.ts):
- processor_acks_total — was inc'd by 1 per non-empty batch. Now
  inc'd by ackIds.length so every ACK'd ID is counted.

Wiring gap closed (state.ts):
- processor_device_state_evictions_total — internal `evicted` counter
  existed but was never published to metrics. createDeviceStateStore
  now accepts a Metrics injection and inc's on each eviction.

Metrics interface extended (types.ts, metrics.ts):
- Metrics.inc gained an optional third `value` parameter (defaults to 1)
  for batched increments. dispatchInc passes it through to prom-client's
  Counter.inc(labels, value).

Tests updated to reflect the new third arg and the state.ts factory's
new metrics parameter. Total 134 unit tests passing (no count change —
existing tests adjusted, no new tests added; the real verification is
on stage where the metrics are now meaningful again).
This commit is contained in:
2026-05-01 11:38:25 +02:00
parent 4a9f55cdf0
commit d758c211ae
8 changed files with 75 additions and 38 deletions
+12 -2
View File
@@ -275,18 +275,28 @@ export function createConsumer(
)) as [string, [string, string[]][]][] | null;
} catch (err) {
if (stopping) break;
metrics.inc('processor_consumer_reads_total', { result: 'error' });
logger.error({ err }, 'XREADGROUP failed; backing off');
await sleep(1_000);
continue;
}
// BLOCK timeout — no new entries; loop again to check stopping flag.
if (rawResult === null) continue;
if (rawResult === null) {
metrics.inc('processor_consumer_reads_total', { result: 'empty' });
continue;
}
// rawResult is [[streamName, [[id, fields], ...]]]
// We only subscribed to one stream so we take the first element.
const streamEntries = rawResult[0]?.[1] ?? [];
if (streamEntries.length === 0) continue;
if (streamEntries.length === 0) {
metrics.inc('processor_consumer_reads_total', { result: 'empty' });
continue;
}
metrics.inc('processor_consumer_reads_total', { result: 'ok' });
metrics.inc('processor_consumer_records_total', undefined, streamEntries.length);
logger.debug({ stream, n: streamEntries.length }, 'batch consumed');
+7 -2
View File
@@ -15,7 +15,7 @@
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Position, DeviceState } from './types.js';
import type { Position, DeviceState, Metrics } from './types.js';
// ---------------------------------------------------------------------------
// Public interface
@@ -47,7 +47,11 @@ export type DeviceStateStore = {
// Factory
// ---------------------------------------------------------------------------
export function createDeviceStateStore(config: Config, logger: Logger): DeviceStateStore {
export function createDeviceStateStore(
config: Config,
logger: Logger,
metrics: Metrics,
): DeviceStateStore {
const cap = config.DEVICE_STATE_LRU_CAP;
const store = new Map<string, DeviceState>();
let evicted = 0;
@@ -88,6 +92,7 @@ export function createDeviceStateStore(config: Config, logger: Logger): DeviceSt
if (oldestKey !== undefined) {
store.delete(oldestKey);
evicted++;
metrics.inc('processor_device_state_evictions_total');
logger.debug(
{ evictedDevice: oldestKey, storeSize: store.size, cap },
'device state evicted (LRU)',
+10 -4
View File
@@ -84,11 +84,17 @@ export type DeviceState = {
// ---------------------------------------------------------------------------
/**
* Minimal metrics interface exposed to pipeline components. Concrete
* implementation (prom-client) lands in task 1.9; this keeps types stable
* through tasks 1.21.8.
* Minimal metrics interface exposed to pipeline components.
*
* `inc` accepts an optional `value` for batched increments — counters that
* naturally arrive in groups (records consumed, rows inserted, IDs ACKed)
* should pass the count rather than calling `inc` N times. Defaults to 1.
*/
export type Metrics = {
readonly inc: (name: string, labels?: Record<string, string>) => void;
readonly inc: (
name: string,
labels?: Record<string, string>,
value?: number,
) => void;
readonly observe: (name: string, value: number, labels?: Record<string, string>) => void;
};
+9 -3
View File
@@ -190,7 +190,8 @@ export function createWriter(
const error = err instanceof Error ? err : new Error(String(err));
logger.error({ err, chunkSize: chunk.length }, 'position write failed');
metrics.inc('processor_position_writes_total', { status: 'failed' });
// Every record in the failed chunk gets `status: failed` — count them all.
metrics.inc('processor_position_writes_total', { status: 'failed' }, chunk.length);
return chunk.map((record) => ({ id: record.id, status: 'failed' as const, error }));
}
@@ -200,8 +201,13 @@ export function createWriter(
const insertedCount = results.filter((r) => r.status === 'inserted').length;
const duplicateCount = results.filter((r) => r.status === 'duplicate').length;
metrics.inc('processor_position_writes_total', { status: 'inserted' });
metrics.inc('processor_position_writes_total', { status: 'duplicate' });
// Counts must match per-record outcomes, not be incremented once per chunk.
if (insertedCount > 0) {
metrics.inc('processor_position_writes_total', { status: 'inserted' }, insertedCount);
}
if (duplicateCount > 0) {
metrics.inc('processor_position_writes_total', { status: 'duplicate' }, duplicateCount);
}
metrics.observe('processor_position_write_duration_seconds', (Date.now() - startMs) / 1_000);
logger.debug(
+2 -2
View File
@@ -65,7 +65,7 @@ async function main(): Promise<void> {
const redis: Redis = await connectRedis(config.REDIS_URL, logger);
// 5. Build pipeline components
const state = createDeviceStateStore(config, logger);
const state = createDeviceStateStore(config, logger, metrics);
const writer = createWriter(pool, config, logger, metrics);
// 6. Postgres health check — background cached SELECT 1 for /readyz.
@@ -122,7 +122,7 @@ async function main(): Promise<void> {
.map((r) => r.id);
if (ackIds.length > 0) {
metrics.inc('processor_acks_total');
metrics.inc('processor_acks_total', undefined, ackIds.length);
}
return ackIds;
+10 -8
View File
@@ -71,8 +71,8 @@ export function createMetrics(): Metrics & {
collectDefaultMetrics({ register: internal.registry });
const metricsImpl: Metrics & { serializeMetrics: () => Promise<string> } = {
inc(name: string, labels?: Record<string, string>): void {
dispatchInc(internal, name, labels);
inc(name: string, labels?: Record<string, string>, value?: number): void {
dispatchInc(internal, name, labels, value);
},
observe(name: string, value: number, labels?: Record<string, string>): void {
@@ -398,25 +398,27 @@ function dispatchInc(
r: InternalRegistry,
name: string,
labels?: Record<string, string>,
value?: number,
): void {
const v = value ?? 1;
switch (name) {
case 'processor_consumer_reads_total':
r.consumerReadsTotal.inc(labels ?? {});
r.consumerReadsTotal.inc(labels ?? {}, v);
break;
case 'processor_consumer_records_total':
r.consumerRecordsTotal.inc();
r.consumerRecordsTotal.inc(v);
break;
case 'processor_decode_errors_total':
r.decodeErrorsTotal.inc();
r.decodeErrorsTotal.inc(v);
break;
case 'processor_position_writes_total':
r.positionWritesTotal.inc(labels ?? {});
r.positionWritesTotal.inc(labels ?? {}, v);
break;
case 'processor_acks_total':
r.acksTotal.inc();
r.acksTotal.inc(v);
break;
case 'processor_device_state_evictions_total':
r.deviceStateEvictionsTotal.inc();
r.deviceStateEvictionsTotal.inc(v);
break;
default:
// Unknown metric name — silently ignore. This preserves the contract
+22 -15
View File
@@ -15,7 +15,7 @@
import { describe, it, expect, vi } from 'vitest';
import type { Logger } from 'pino';
import type { Config } from '../src/config/load.js';
import type { Position } from '../src/core/types.js';
import type { Position, Metrics } from '../src/core/types.js';
import { createDeviceStateStore } from '../src/core/state.js';
// ---------------------------------------------------------------------------
@@ -36,6 +36,13 @@ function makeSilentLogger(): Logger {
} as unknown as Logger;
}
function makeMetrics(): Metrics {
return {
inc: vi.fn(),
observe: vi.fn(),
};
}
function makeConfig(overrides: Partial<Config> = {}): Config {
return {
NODE_ENV: 'test',
@@ -77,7 +84,7 @@ function makePosition(deviceId: string, overrides: Partial<Position> = {}): Posi
describe('createDeviceStateStore — initial state', () => {
it('creates a new entry on first update', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const position = makePosition('DEV001');
const state = store.update(position);
@@ -89,7 +96,7 @@ describe('createDeviceStateStore — initial state', () => {
});
it('increments position_count_session on subsequent updates', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const pos1 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:00.000Z') });
const pos2 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:01.000Z') });
const pos3 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:02.000Z') });
@@ -102,13 +109,13 @@ describe('createDeviceStateStore — initial state', () => {
});
it('get() returns undefined for an unknown device', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
expect(store.get('UNKNOWN')).toBeUndefined();
});
it('get() returns the current state for a known device', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const position = makePosition('DEV002');
store.update(position);
@@ -119,12 +126,12 @@ describe('createDeviceStateStore — initial state', () => {
});
it('size() returns 0 before any updates', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
expect(store.size()).toBe(0);
});
it('size() returns the number of distinct devices after updates', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
store.update(makePosition('DEV001'));
store.update(makePosition('DEV002'));
@@ -136,7 +143,7 @@ describe('createDeviceStateStore — initial state', () => {
describe('createDeviceStateStore — last_seen semantics', () => {
it('last_seen reflects the position timestamp (not wall clock)', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-03-15T08:30:00.000Z');
const position = makePosition('DEV010', { timestamp: ts });
@@ -147,7 +154,7 @@ describe('createDeviceStateStore — last_seen semantics', () => {
});
it('last_seen advances on newer timestamps', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const ts1 = new Date('2024-05-01T10:00:00.000Z');
const ts2 = new Date('2024-05-01T11:00:00.000Z');
@@ -161,7 +168,7 @@ describe('createDeviceStateStore — last_seen semantics', () => {
// Devices buffer offline records and replay them in bursts; within a burst
// consecutive timestamps may decrease. last_seen must mean "highest device
// timestamp seen so far" — it must never go backward.
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const newer = new Date('2024-05-01T12:00:00.000Z');
const older = new Date('2024-05-01T10:00:00.000Z');
@@ -173,7 +180,7 @@ describe('createDeviceStateStore — last_seen semantics', () => {
});
it('last_seen stays the same when equal timestamps arrive', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
store.update(makePosition('DEV013', { timestamp: ts }));
@@ -185,7 +192,7 @@ describe('createDeviceStateStore — last_seen semantics', () => {
describe('createDeviceStateStore — LRU eviction', () => {
it('evicts the least-recently-updated device when cap is exceeded', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger());
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
// Insert 3 devices: DEV001, DEV002, DEV003 (DEV001 is oldest)
@@ -206,7 +213,7 @@ describe('createDeviceStateStore — LRU eviction', () => {
});
it('re-using an existing device bumps it to most-recent so it is not evicted next', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger());
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger(), makeMetrics());
const ts1 = new Date('2024-05-01T12:00:00.000Z');
const ts2 = new Date('2024-05-01T12:00:01.000Z');
@@ -228,7 +235,7 @@ describe('createDeviceStateStore — LRU eviction', () => {
});
it('evictedTotal() increments on each eviction', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 2 }), makeSilentLogger());
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 2 }), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
expect(store.evictedTotal()).toBe(0);
@@ -245,7 +252,7 @@ describe('createDeviceStateStore — LRU eviction', () => {
});
it('evictedTotal() stays 0 when cap is never reached', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 1000 }), makeSilentLogger());
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 1000 }), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
for (let i = 0; i < 10; i++) {
+3 -2
View File
@@ -242,6 +242,7 @@ describe('createWriter — pool error', () => {
expect(metrics.inc).toHaveBeenCalledWith(
'processor_position_writes_total',
{ status: 'failed' },
records.length,
);
});
});
@@ -491,8 +492,8 @@ describe('createWriter — metrics', () => {
const writer = createWriter(pool, makeConfig(), makeSilentLogger(), metrics);
await writer.write(records);
expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'inserted' });
expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'duplicate' });
expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'inserted' }, 1);
expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'duplicate' }, 1);
expect(metrics.observe).toHaveBeenCalledWith(
'processor_position_write_duration_seconds',
expect.any(Number),