/**
 * Unit tests for src/observability/metrics.ts
 *
 * Covers:
 * - createMetrics(): Prometheus exposition format contains all Phase 1 metrics
 * - Counter increments via metrics.inc()
 * - Histogram observation via metrics.observe()
 * - Histogram buckets match the spec-defined breakpoints
 * - Gauge set via metrics.observe() for processor_consumer_lag and processor_device_state_size
 * - Unknown metric name is silently ignored (no throw)
 * - startMetricsServer(): GET /metrics returns 200 with text/plain
 * - startMetricsServer(): GET /metrics returns 500 when serializeMetrics rejects
 * - startMetricsServer(): GET /healthz returns 200 {"status":"ok"}
 * - startMetricsServer(): GET /readyz returns 200 when both deps are ready
 * - startMetricsServer(): GET /readyz returns 503 when Redis is not ready
 * - startMetricsServer(): GET /readyz returns 503 when Postgres is not ready
 * - startMetricsServer(): GET /readyz returns 503 when neither dep is ready
 * - startMetricsServer(): non-GET method returns 405
 * - startMetricsServer(): unknown path returns 404
 * - nodejs_* default metrics are present in the exposition output
 */

import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest';
import * as http from 'node:http';
import { createMetrics, startMetricsServer } from '../src/observability/metrics.js';
import type { ReadyzDeps } from '../src/observability/metrics.js';

// ---------------------------------------------------------------------------
// HTTP helpers — request the test server and manage its lifecycle
// ---------------------------------------------------------------------------

/**
 * Issues a single HTTP request against the test server on 127.0.0.1.
 *
 * @param port - TCP port the server under test is listening on.
 * @param path - Request path, e.g. `/metrics`.
 * @param method - HTTP method; defaults to `GET`.
 * @returns The response status code (0 if Node reports none), the full body
 *          decoded as UTF-8, and the `content-type` header ('' when absent).
 */
function httpGet(
  port: number,
  path: string,
  method = 'GET',
): Promise<{ statusCode: number; body: string; contentType: string }> {
  return new Promise((resolve, reject) => {
    const req = http.request({ hostname: '127.0.0.1', port, path, method }, (res) => {
      // Buffer raw chunks and decode once at the end so a multi-byte UTF-8
      // character split across two chunks is not corrupted.
      const chunks: Buffer[] = [];
      res.on('data', (chunk: Buffer) => {
        chunks.push(chunk);
      });
      res.on('error', reject);
      res.on('end', () => {
        resolve({
          statusCode: res.statusCode ?? 0,
          body: Buffer.concat(chunks).toString('utf8'),
          contentType: res.headers['content-type'] ?? '',
        });
      });
    });
    req.on('error', reject);
    req.end();
  });
}

/** Resolves once `server` is listening (immediately if it already is). */
function waitForListening(server: http.Server): Promise<void> {
  return new Promise<void>((resolve) => {
    if (server.listening) {
      resolve();
    } else {
      server.once('listening', () => resolve());
    }
  });
}

/**
 * Returns the TCP port `server` is bound to.
 *
 * @throws Error when `server.address()` is not an address object, so the
 *         failure is reported here instead of as a connection error on port 0.
 */
function boundPort(server: http.Server): number {
  const addr = server.address();
  if (typeof addr !== 'object' || addr === null) {
    throw new Error('test server is not bound to a TCP port');
  }
  return addr.port;
}

/** Closes `server`, rejecting with the error passed to the close callback, if any. */
function closeServer(server: http.Server): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    server.close((err) => (err ? reject(err) : resolve()));
  });
}

// ---------------------------------------------------------------------------
// createMetrics tests
// ---------------------------------------------------------------------------

describe('createMetrics — exposition format', () => {
  it('returns valid Prometheus text format containing all Phase 1 metrics', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();

    // Every metric from the task 1.9 inventory must appear in the output.
    expect(text).toContain('processor_consumer_reads_total');
    expect(text).toContain('processor_consumer_records_total');
    expect(text).toContain('processor_consumer_lag');
    expect(text).toContain('processor_decode_errors_total');
    expect(text).toContain('processor_position_writes_total');
    expect(text).toContain('processor_position_write_duration_seconds');
    expect(text).toContain('processor_acks_total');
    expect(text).toContain('processor_device_state_size');
    expect(text).toContain('processor_device_state_evictions_total');

    // Default Node.js process metrics must be present.
    expect(text).toContain('nodejs_');
  });

  it('label-less counters appear in the exposition at 0 before any inc() call', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();

    // prom-client emits label-less counters at 0 from the start.
    // Counters with label dims only appear once .inc() is called with a label value.
    expect(text).toMatch(/processor_consumer_records_total\s+0/);
    expect(text).toMatch(/processor_decode_errors_total\s+0/);
    expect(text).toMatch(/processor_acks_total\s+0/);
    expect(text).toMatch(/processor_device_state_evictions_total\s+0/);
  });
});

describe('createMetrics — counter increments', () => {
  it('increments processor_consumer_reads_total with label', async () => {
    const metrics = createMetrics();
    metrics.inc('processor_consumer_reads_total', { result: 'ok' });
    metrics.inc('processor_consumer_reads_total', { result: 'ok' });
    metrics.inc('processor_consumer_reads_total', { result: 'empty' });

    const text = await metrics.serializeMetrics();
    // result="ok" incremented twice
    expect(text).toMatch(/processor_consumer_reads_total\{result="ok"\} 2/);
    // result="empty" incremented once
    expect(text).toMatch(/processor_consumer_reads_total\{result="empty"\} 1/);
  });

  it('increments processor_consumer_records_total', async () => {
    const metrics = createMetrics();
    metrics.inc('processor_consumer_records_total');
    metrics.inc('processor_consumer_records_total');
    metrics.inc('processor_consumer_records_total');

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_consumer_records_total\s+3/);
  });

  it('increments processor_decode_errors_total', async () => {
    const metrics = createMetrics();
    metrics.inc('processor_decode_errors_total');

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_decode_errors_total\s+1/);
  });

  it('increments processor_position_writes_total with status label', async () => {
    const metrics = createMetrics();
    metrics.inc('processor_position_writes_total', { status: 'inserted' });
    metrics.inc('processor_position_writes_total', { status: 'duplicate' });
    metrics.inc('processor_position_writes_total', { status: 'failed' });

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_position_writes_total\{status="inserted"\} 1/);
    expect(text).toMatch(/processor_position_writes_total\{status="duplicate"\} 1/);
    expect(text).toMatch(/processor_position_writes_total\{status="failed"\} 1/);
  });

  it('increments processor_acks_total', async () => {
    const metrics = createMetrics();
    metrics.inc('processor_acks_total');
    metrics.inc('processor_acks_total');

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_acks_total\s+2/);
  });

  it('increments processor_device_state_evictions_total', async () => {
    const metrics = createMetrics();
    metrics.inc('processor_device_state_evictions_total');

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_device_state_evictions_total\s+1/);
  });

  it('silently ignores unknown metric names', () => {
    const metrics = createMetrics();
    // Must not throw
    expect(() => metrics.inc('no_such_metric_total')).not.toThrow();
    expect(() => metrics.observe('no_such_metric', 42)).not.toThrow();
  });
});

describe('createMetrics — gauge and histogram', () => {
  it('sets processor_consumer_lag via observe()', async () => {
    const metrics = createMetrics();
    metrics.observe('processor_consumer_lag', 42);

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_consumer_lag\s+42/);
  });

  it('sets processor_device_state_size via observe()', async () => {
    const metrics = createMetrics();
    metrics.observe('processor_device_state_size', 7);

    const text = await metrics.serializeMetrics();
    expect(text).toMatch(/processor_device_state_size\s+7/);
  });

  it('records processor_position_write_duration_seconds histogram observation', async () => {
    const metrics = createMetrics();
    metrics.observe('processor_position_write_duration_seconds', 0.007);

    const text = await metrics.serializeMetrics();
    // Histogram emits _bucket, _sum, _count lines.
    expect(text).toContain('processor_position_write_duration_seconds_sum');
    expect(text).toContain('processor_position_write_duration_seconds_count 1');
  });

  it('histogram buckets include all spec-defined breakpoints', async () => {
    const metrics = createMetrics();
    const text = await metrics.serializeMetrics();

    // Spec buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]
    const expectedBuckets = ['0.001', '0.005', '0.01', '0.05', '0.1', '0.5', '1', '5'];
    for (const bucket of expectedBuckets) {
      // Anchor on the metric name: other histograms in the registry (e.g. the
      // default Node.js metrics) may emit the same `le` values, which would
      // let a bare `le="…"` check pass for the wrong series.
      expect(text).toContain(
        `processor_position_write_duration_seconds_bucket{le="${bucket}"}`,
      );
    }
  });
});

// ---------------------------------------------------------------------------
// startMetricsServer tests
// ---------------------------------------------------------------------------

describe('startMetricsServer — HTTP endpoints', () => {
  let server: http.Server;
  let port: number;

  // Mutable readiness flags read by the server through readyzDeps; each
  // /readyz test sets both explicitly before issuing its request.
  let isRedisReady = true;
  let isPostgresReady = true;

  const readyzDeps: ReadyzDeps = {
    isRedisReady: () => isRedisReady,
    isPostgresReady: () => isPostgresReady,
  };

  beforeAll(async () => {
    const metrics = createMetrics();
    server = startMetricsServer(0, () => metrics.serializeMetrics(), readyzDeps);
    // Wait for the server to bind a port (port=0 lets OS pick)
    await waitForListening(server);
    port = boundPort(server);
  });

  afterAll(async () => {
    await closeServer(server);
  });

  it('GET /metrics returns 200 with Prometheus content-type', async () => {
    const res = await httpGet(port, '/metrics');
    expect(res.statusCode).toBe(200);
    expect(res.contentType).toMatch('text/plain');
    expect(res.body).toContain('processor_consumer_reads_total');
  });

  it('GET /healthz returns 200 with {"status":"ok"}', async () => {
    const res = await httpGet(port, '/healthz');
    expect(res.statusCode).toBe(200);
    expect(JSON.parse(res.body)).toEqual({ status: 'ok' });
  });

  it('GET /readyz returns 200 when both Redis and Postgres are ready', async () => {
    isRedisReady = true;
    isPostgresReady = true;

    const res = await httpGet(port, '/readyz');
    expect(res.statusCode).toBe(200);
    expect(JSON.parse(res.body)).toEqual({ status: 'ok' });
  });

  it('GET /readyz returns 503 when Redis is not ready', async () => {
    isRedisReady = false;
    isPostgresReady = true;

    const res = await httpGet(port, '/readyz');
    expect(res.statusCode).toBe(503);
    const body = JSON.parse(res.body) as { status: string; redis: boolean; postgres: boolean };
    expect(body.status).toBe('not ready');
    expect(body.redis).toBe(false);
    expect(body.postgres).toBe(true);

    isRedisReady = true;
  });

  it('GET /readyz returns 503 when Postgres is not ready', async () => {
    isRedisReady = true;
    isPostgresReady = false;

    const res = await httpGet(port, '/readyz');
    expect(res.statusCode).toBe(503);
    const body = JSON.parse(res.body) as { status: string; redis: boolean; postgres: boolean };
    expect(body.status).toBe('not ready');
    expect(body.redis).toBe(true);
    expect(body.postgres).toBe(false);

    isPostgresReady = true;
  });

  it('GET /readyz returns 503 when both Redis and Postgres are not ready', async () => {
    isRedisReady = false;
    isPostgresReady = false;

    const res = await httpGet(port, '/readyz');
    expect(res.statusCode).toBe(503);
    const body = JSON.parse(res.body) as { status: string; redis: boolean; postgres: boolean };
    expect(body.redis).toBe(false);
    expect(body.postgres).toBe(false);

    isRedisReady = true;
    isPostgresReady = true;
  });

  it('non-GET request returns 405', async () => {
    const res = await httpGet(port, '/metrics', 'POST');
    expect(res.statusCode).toBe(405);
  });

  it('unknown path returns 404', async () => {
    const res = await httpGet(port, '/not-found');
    expect(res.statusCode).toBe(404);
  });
});

describe('startMetricsServer — /metrics error path', () => {
  it('returns 500 when serializeMetrics rejects', async () => {
    const serializeMetrics = vi.fn().mockRejectedValue(new Error('prom-client exploded'));
    const server = startMetricsServer(
      0,
      serializeMetrics,
      { isRedisReady: () => true, isPostgresReady: () => true },
    );

    // Close the server even when an assertion fails, so a failing test does
    // not leave a listening socket behind.
    try {
      await waitForListening(server);
      const port = boundPort(server);

      const res = await httpGet(port, '/metrics');
      expect(res.statusCode).toBe(500);
      expect(res.body).toContain('prom-client exploded');
    } finally {
      await closeServer(server);
    }
  });
});