Files
julian d758c211ae Fix metric wiring gaps audited against live processor output
Several Phase 1 metrics were registered in observability/metrics.ts but
either unwired at the call sites or wired with wrong counts. Production
output showed 11 records ingested per logs but only 4 in metrics. The
fixes below align metric values with actual hot-path activity.

Wiring gaps closed (consumer.ts):
- processor_consumer_reads_total{result=ok|empty|error} — was registered
  but never inc'd. Now fires on each XREADGROUP outcome.
- processor_consumer_records_total — was registered but never inc'd.
  Now fires once per XREADGROUP, with the entry count.

Counts corrected (writer.ts):
- processor_position_writes_total{status} — was inc'd unconditionally
  by 1 per chunk for each of inserted/duplicate. Now inc'd by the
  actual per-status count, and only when count > 0.
- processor_position_writes_total{status='failed'} — was inc'd by 1
  per failed chunk. Now inc'd by chunk.length so every failed record
  is counted.

Counts corrected (main.ts):
- processor_acks_total — was inc'd by 1 per non-empty batch. Now
  inc'd by ackIds.length so every ACK'd ID is counted.

Wiring gap closed (state.ts):
- processor_device_state_evictions_total — internal `evicted` counter
  existed but was never published to metrics. createDeviceStateStore
  now accepts a Metrics injection and inc's on each eviction.

Metrics interface extended (types.ts, metrics.ts):
- Metrics.inc gained an optional third `value` parameter (defaults to 1)
  for batched increments. dispatchInc passes it through to prom-client's
  Counter.inc(labels, value).

Tests updated to reflect the new third arg and the state.ts factory's
new metrics parameter. Total 134 unit tests passing (no count change —
existing tests adjusted, no new tests added; the real verification is
on stage where the metrics are now meaningful again).
2026-05-01 11:43:06 +02:00

265 lines
9.7 KiB
TypeScript

/**
* Unit tests for src/core/state.ts
*
* Covers:
* - First update creates entry; subsequent updates increment position_count_session
* - LRU eviction: with cap=3, after 4 distinct devices the oldest is evicted
* - Eviction increments evictedTotal()
* - last_seen reflects the position's timestamp (device-reported time)
* - Out-of-order positions: last_seen only advances forward (max semantics)
* - get() returns undefined for unknown devices
* - size() returns the current number of stored devices
* - LRU order: most-recently-updated device is not evicted on overflow
*/
import { describe, it, expect, vi } from 'vitest';
import type { Logger } from 'pino';
import type { Config } from '../src/config/load.js';
import type { Position, Metrics } from '../src/core/types.js';
import { createDeviceStateStore } from '../src/core/state.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeSilentLogger(): Logger {
return {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
fatal: vi.fn(),
child: vi.fn().mockReturnThis(),
trace: vi.fn(),
level: 'silent',
silent: vi.fn(),
} as unknown as Logger;
}
function makeMetrics(): Metrics {
return {
inc: vi.fn(),
observe: vi.fn(),
};
}
function makeConfig(overrides: Partial<Config> = {}): Config {
return {
NODE_ENV: 'test',
INSTANCE_ID: 'test-processor',
LOG_LEVEL: 'silent',
REDIS_URL: 'redis://localhost:6379',
POSTGRES_URL: 'postgres://localhost:5432/test',
REDIS_TELEMETRY_STREAM: 'telemetry:t',
REDIS_CONSUMER_GROUP: 'processor',
REDIS_CONSUMER_NAME: 'test-consumer',
METRICS_PORT: 9090,
BATCH_SIZE: 10,
BATCH_BLOCK_MS: 100,
WRITE_BATCH_SIZE: 50,
DEVICE_STATE_LRU_CAP: 1000,
...overrides,
};
}
function makePosition(deviceId: string, overrides: Partial<Position> = {}): Position {
return {
device_id: deviceId,
timestamp: new Date('2024-05-01T12:00:00.000Z'),
latitude: 54.6872,
longitude: 25.2797,
altitude: 100,
angle: 90,
speed: 50,
satellites: 12,
priority: 1,
attributes: {},
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('createDeviceStateStore — initial state', () => {
it('creates a new entry on first update', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const position = makePosition('DEV001');
const state = store.update(position);
expect(state.device_id).toBe('DEV001');
expect(state.last_position).toBe(position);
expect(state.position_count_session).toBe(1);
expect(state.last_seen).toEqual(position.timestamp);
});
it('increments position_count_session on subsequent updates', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const pos1 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:00.000Z') });
const pos2 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:01.000Z') });
const pos3 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:02.000Z') });
store.update(pos1);
store.update(pos2);
const state = store.update(pos3);
expect(state.position_count_session).toBe(3);
});
it('get() returns undefined for an unknown device', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
expect(store.get('UNKNOWN')).toBeUndefined();
});
it('get() returns the current state for a known device', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const position = makePosition('DEV002');
store.update(position);
const state = store.get('DEV002');
expect(state).toBeDefined();
expect(state?.device_id).toBe('DEV002');
});
it('size() returns 0 before any updates', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
expect(store.size()).toBe(0);
});
it('size() returns the number of distinct devices after updates', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
store.update(makePosition('DEV001'));
store.update(makePosition('DEV002'));
store.update(makePosition('DEV001')); // duplicate device — should not increase size
expect(store.size()).toBe(2);
});
});
describe('createDeviceStateStore — last_seen semantics', () => {
it('last_seen reflects the position timestamp (not wall clock)', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-03-15T08:30:00.000Z');
const position = makePosition('DEV010', { timestamp: ts });
const state = store.update(position);
expect(state.last_seen).toEqual(ts);
expect(state.last_seen).not.toBe(new Date()); // not wall clock
});
it('last_seen advances on newer timestamps', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const ts1 = new Date('2024-05-01T10:00:00.000Z');
const ts2 = new Date('2024-05-01T11:00:00.000Z');
store.update(makePosition('DEV011', { timestamp: ts1 }));
const state = store.update(makePosition('DEV011', { timestamp: ts2 }));
expect(state.last_seen).toEqual(ts2);
});
it('last_seen does NOT regress on out-of-order (older) timestamps', () => {
// Devices buffer offline records and replay them in bursts; within a burst
// consecutive timestamps may decrease. last_seen must mean "highest device
// timestamp seen so far" — it must never go backward.
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const newer = new Date('2024-05-01T12:00:00.000Z');
const older = new Date('2024-05-01T10:00:00.000Z');
store.update(makePosition('DEV012', { timestamp: newer }));
const state = store.update(makePosition('DEV012', { timestamp: older }));
// last_seen must remain at the newer timestamp, not regress to older
expect(state.last_seen).toEqual(newer);
});
it('last_seen stays the same when equal timestamps arrive', () => {
const store = createDeviceStateStore(makeConfig(), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
store.update(makePosition('DEV013', { timestamp: ts }));
const state = store.update(makePosition('DEV013', { timestamp: new Date(ts.getTime()) }));
expect(state.last_seen).toEqual(ts);
});
});
describe('createDeviceStateStore — LRU eviction', () => {
it('evicts the least-recently-updated device when cap is exceeded', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
// Insert 3 devices: DEV001, DEV002, DEV003 (DEV001 is oldest)
store.update(makePosition('DEV001', { timestamp: ts }));
store.update(makePosition('DEV002', { timestamp: ts }));
store.update(makePosition('DEV003', { timestamp: ts }));
expect(store.size()).toBe(3);
// Add a 4th device — DEV001 (the oldest / least-recently-updated) should be evicted
store.update(makePosition('DEV004', { timestamp: ts }));
expect(store.size()).toBe(3);
expect(store.get('DEV001')).toBeUndefined(); // evicted
expect(store.get('DEV002')).toBeDefined();
expect(store.get('DEV003')).toBeDefined();
expect(store.get('DEV004')).toBeDefined();
});
it('re-using an existing device bumps it to most-recent so it is not evicted next', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger(), makeMetrics());
const ts1 = new Date('2024-05-01T12:00:00.000Z');
const ts2 = new Date('2024-05-01T12:00:01.000Z');
store.update(makePosition('DEV001', { timestamp: ts1 }));
store.update(makePosition('DEV002', { timestamp: ts1 }));
store.update(makePosition('DEV003', { timestamp: ts1 }));
// Re-touch DEV001 — it should now be the most-recently-updated
store.update(makePosition('DEV001', { timestamp: ts2 }));
// Add DEV004 — DEV002 should be evicted (it is now the oldest)
store.update(makePosition('DEV004', { timestamp: ts1 }));
expect(store.size()).toBe(3);
expect(store.get('DEV001')).toBeDefined(); // was re-touched
expect(store.get('DEV002')).toBeUndefined(); // evicted (oldest after DEV001 was re-touched)
expect(store.get('DEV003')).toBeDefined();
expect(store.get('DEV004')).toBeDefined();
});
it('evictedTotal() increments on each eviction', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 2 }), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
expect(store.evictedTotal()).toBe(0);
store.update(makePosition('DEV001', { timestamp: ts }));
store.update(makePosition('DEV002', { timestamp: ts }));
expect(store.evictedTotal()).toBe(0);
store.update(makePosition('DEV003', { timestamp: ts })); // evicts DEV001
expect(store.evictedTotal()).toBe(1);
store.update(makePosition('DEV004', { timestamp: ts })); // evicts DEV002
expect(store.evictedTotal()).toBe(2);
});
it('evictedTotal() stays 0 when cap is never reached', () => {
const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 1000 }), makeSilentLogger(), makeMetrics());
const ts = new Date('2024-05-01T12:00:00.000Z');
for (let i = 0; i < 10; i++) {
store.update(makePosition(`DEV${i}`, { timestamp: ts }));
}
expect(store.evictedTotal()).toBe(0);
});
});