/** * Unit tests for src/core/consumer.ts * * All Redis I/O is mocked — no real Redis required. The integration test * (task 1.10) covers the end-to-end round-trip. * * Covers: * - Decodes a synthetic stream entry into a ConsumedRecord with the right shape * - Calls sink with the decoded batch and ACKs only the IDs the sink returned * - Partial ACK: sink returns subset of IDs; only those are ACKed * - BUSYGROUP error from XGROUP CREATE is swallowed and continues * - Malformed payload: increments metric, logs at error, does NOT ACK the entry * - Missing payload field: logs at error, does NOT ACK the entry * - stop() causes the loop to exit cleanly * - XREADGROUP failure logs error and backs off without crashing */ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { MockedFunction } from 'vitest'; import type { Redis } from 'ioredis'; import type { Logger } from 'pino'; import type { Config } from '../src/config/load.js'; import type { Metrics, Position } from '../src/core/types.js'; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- function makeSilentLogger(): Logger { return { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), fatal: vi.fn(), child: vi.fn().mockReturnThis(), trace: vi.fn(), level: 'silent', silent: vi.fn(), } as unknown as Logger; } function makeMetrics(): Metrics & { incCalls: Array<{ name: string; labels?: Record }>; } { const incCalls: Array<{ name: string; labels?: Record }> = []; return { incCalls, inc: (name: string, labels?: Record) => { incCalls.push({ name, labels }); }, observe: vi.fn(), }; } function makeConfig(overrides: Partial = {}): Config { return { NODE_ENV: 'test', INSTANCE_ID: 'test-processor', LOG_LEVEL: 'silent', REDIS_URL: 'redis://localhost:6379', POSTGRES_URL: 'postgres://localhost:5432/test', REDIS_TELEMETRY_STREAM: 'telemetry:t', REDIS_CONSUMER_GROUP: 'processor', REDIS_CONSUMER_NAME: 'test-consumer', METRICS_PORT: 9090, BATCH_SIZE: 10, BATCH_BLOCK_MS: 100, WRITE_BATCH_SIZE: 50, DEVICE_STATE_LRU_CAP: 1000, ...overrides, }; } /** * Builds the JSON payload for a synthetic Position, mirroring tcp-ingestion's * serialization format (sentinel encoding for bigint/Buffer/Date). */ function buildPayload(overrides: Partial = {}): string { const position: Position = { device_id: 'TESTDEVICE001', timestamp: new Date('2024-05-01T12:00:00.000Z'), latitude: 54.6872, longitude: 25.2797, altitude: 100, angle: 90, speed: 50, satellites: 12, priority: 1, attributes: {}, ...overrides, }; function jsonReplacer(_key: string, value: unknown): unknown { if (typeof value === 'bigint') return { __bigint: value.toString() }; if (value instanceof Uint8Array) return { __buffer_b64: Buffer.from(value).toString('base64') }; if (value instanceof Date) return value.toISOString(); return value; } return JSON.stringify(position, jsonReplacer); } /** * Builds a raw XREADGROUP response for a single entry. * ioredis returns: [[streamName, [[entryId, [field, value, ...]], ...]]] */ function buildXreadgroupResponse( stream: string, entries: Array<{ id: string; fields: Record }>, ): [string, [string, string[]][]][] { return [ [ stream, entries.map(({ id, fields }) => [ id, Object.entries(fields).flat(), ] as [string, string[]]), ], ]; } // --------------------------------------------------------------------------- // Mock ioredis // --------------------------------------------------------------------------- type MockRedis = { xgroup: MockedFunction<(...args: unknown[]) => Promise>; xreadgroup: MockedFunction<(...args: unknown[]) => Promise>; xack: MockedFunction<(...args: unknown[]) => Promise>; }; function makeMockRedis(): MockRedis { return { xgroup: vi.fn().mockResolvedValue('OK'), xreadgroup: vi.fn().mockResolvedValue(null), // default: BLOCK timeout xack: vi.fn().mockResolvedValue(1), }; } // --------------------------------------------------------------------------- // ensureConsumerGroup tests // --------------------------------------------------------------------------- import { ensureConsumerGroup } from '../src/core/consumer.js'; describe('ensureConsumerGroup', () => { it('calls XGROUP CREATE with MKSTREAM and $ start ID', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); await ensureConsumerGroup(redis as unknown as Redis, 'telemetry:t', 'processor', logger); expect(redis.xgroup).toHaveBeenCalledWith('CREATE', 'telemetry:t', 'processor', '$', 'MKSTREAM'); expect(logger.info).toHaveBeenCalledWith( expect.objectContaining({ stream: 'telemetry:t', group: 'processor' }), 'consumer group created', ); }); it('swallows BUSYGROUP error and logs info', async () => { const redis = makeMockRedis(); redis.xgroup.mockRejectedValue(new Error('BUSYGROUP Consumer Group name already exists')); const logger = makeSilentLogger(); await expect( ensureConsumerGroup(redis as unknown as Redis, 'telemetry:t', 'processor', logger), ).resolves.toBeUndefined(); expect(logger.info).toHaveBeenCalledWith( expect.objectContaining({ stream: 'telemetry:t', group: 'processor' }), 'consumer group already exists', ); }); it('rethrows non-BUSYGROUP errors', async () => { const redis = makeMockRedis(); redis.xgroup.mockRejectedValue(new Error('NOPERM no permissions')); const logger = makeSilentLogger(); await expect( ensureConsumerGroup(redis as unknown as Redis, 'telemetry:t', 'processor', logger), ).rejects.toThrow('NOPERM no permissions'); }); }); // --------------------------------------------------------------------------- // createConsumer tests // --------------------------------------------------------------------------- import { createConsumer } from '../src/core/consumer.js'; import type { ConsumedRecord } from '../src/core/consumer.js'; describe('createConsumer — happy path', () => { afterEach(() => { vi.restoreAllMocks(); vi.useRealTimers(); }); it('decodes a stream entry and passes a ConsumedRecord to the sink', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); const payload = buildPayload({ device_id: 'DEV001' }); const stream = 'telemetry:t'; const entryId = '1714488000000-0'; // First call: return one entry. Subsequent calls: return null (BLOCK timeout). redis.xreadgroup .mockResolvedValueOnce( buildXreadgroupResponse(stream, [ { id: entryId, fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV001', codec: '8', payload } }, ]), ) .mockResolvedValue(null); const receivedRecords: ConsumedRecord[][] = []; let consumerRef: ReturnType | undefined; const sink = vi.fn(async (records: ConsumedRecord[]) => { receivedRecords.push(records); // Stop the consumer after processing the first batch so the loop exits. void consumerRef?.stop(); return records.map((r) => r.id); }); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); // Wait for the consumer to process and stop await consumer.stop(); expect(receivedRecords.length).toBeGreaterThanOrEqual(1); const firstBatch = receivedRecords[0]; expect(firstBatch).toBeDefined(); expect(firstBatch!.length).toBe(1); const record = firstBatch![0]!; expect(record.id).toBe(entryId); expect(record.codec).toBe('8'); expect(record.ts).toBe('2024-05-01T12:00:00.000Z'); expect(record.position.device_id).toBe('DEV001'); expect(record.position.latitude).toBe(54.6872); }); it('ACKs only the IDs returned by the sink (partial ACK)', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); const stream = 'telemetry:t'; const ids = ['1000-0', '1000-1', '1000-2']; const entries = ids.map((id) => ({ id, fields: { ts: '2024-05-01T12:00:00.000Z', device_id: `DEV${id}`, codec: '8', payload: buildPayload({ device_id: `DEV${id}` }), }, })); let consumerRef: ReturnType | undefined; redis.xreadgroup .mockResolvedValueOnce(buildXreadgroupResponse(stream, entries)) .mockResolvedValue(null); // Sink returns only the first and third IDs — second stays pending const sink = vi.fn(async (records: ConsumedRecord[]) => { void consumerRef?.stop(); return [records[0]!.id, records[2]!.id]; }); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); await consumer.stop(); expect(redis.xack).toHaveBeenCalledWith(stream, 'processor', ids[0], ids[2]); // id[1] must NOT be in any xack call const xackCalls = redis.xack.mock.calls.flat(); expect(xackCalls).not.toContain(ids[1]); }); it('does not call xack when sink returns an empty array', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); const stream = 'telemetry:t'; let consumerRef: ReturnType | undefined; redis.xreadgroup .mockResolvedValueOnce( buildXreadgroupResponse(stream, [ { id: '2000-0', fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV002', codec: '8', payload: buildPayload({ device_id: 'DEV002' }), }, }, ]), ) .mockResolvedValue(null); const sink = vi.fn(async (_records: ConsumedRecord[]) => { void consumerRef?.stop(); return []; }); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); await consumer.stop(); expect(redis.xack).not.toHaveBeenCalled(); }); }); describe('createConsumer — decode errors', () => { afterEach(() => { vi.restoreAllMocks(); }); it('skips malformed payload: increments metric, logs error, does not ACK', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); const stream = 'telemetry:t'; const badId = '3000-0'; let consumerRef: ReturnType | undefined; redis.xreadgroup .mockResolvedValueOnce( buildXreadgroupResponse(stream, [ { id: badId, fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV003', codec: '8', payload: 'not valid json {{{', }, }, ]), ) .mockResolvedValue(null); const sink = vi.fn(async (_records: ConsumedRecord[]) => { void consumerRef?.stop(); return []; }); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); await consumer.stop(); // Decode error metric incremented expect(metrics.incCalls.some((c) => c.name === 'processor_decode_errors_total')).toBe(true); // Logged at error expect(logger.error).toHaveBeenCalled(); // Sink was called with empty records (bad entry filtered out) expect(sink).toHaveBeenCalledWith([]); // No XACK for the bad entry expect(redis.xack).not.toHaveBeenCalledWith(stream, 'processor', badId); }); it('skips entry with missing payload field: increments metric, logs error, does not ACK', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); const stream = 'telemetry:t'; const badId = '3001-0'; let consumerRef: ReturnType | undefined; redis.xreadgroup .mockResolvedValueOnce( buildXreadgroupResponse(stream, [ { id: badId, // No payload field fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV004', codec: '8' }, }, ]), ) .mockResolvedValue(null); const sink = vi.fn(async (_records: ConsumedRecord[]) => { void consumerRef?.stop(); return []; }); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); await consumer.stop(); expect(metrics.incCalls.some((c) => c.name === 'processor_decode_errors_total')).toBe(true); expect(logger.error).toHaveBeenCalled(); expect(redis.xack).not.toHaveBeenCalled(); }); it('valid and invalid entries in the same batch: ACKs only valid ones', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); const stream = 'telemetry:t'; const goodId = '4000-0'; const badId = '4000-1'; let consumerRef: ReturnType | undefined; redis.xreadgroup .mockResolvedValueOnce( buildXreadgroupResponse(stream, [ { id: goodId, fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV005', codec: '8', payload: buildPayload({ device_id: 'DEV005' }), }, }, { id: badId, fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV005', codec: '8', payload: 'not json', }, }, ]), ) .mockResolvedValue(null); const sink = vi.fn(async (records: ConsumedRecord[]) => { void consumerRef?.stop(); return records.map((r) => r.id); }); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); await consumer.stop(); // Sink received only the good record expect(sink).toHaveBeenCalledWith( expect.arrayContaining([expect.objectContaining({ id: goodId })]), ); expect(sink).toHaveBeenCalledWith( expect.not.arrayContaining([expect.objectContaining({ id: badId })]), ); // ACK called for good entry only expect(redis.xack).toHaveBeenCalledWith(stream, 'processor', goodId); const xackArgs = redis.xack.mock.calls.flat(); expect(xackArgs).not.toContain(badId); }); }); describe('createConsumer — XREADGROUP failure', () => { beforeEach(() => { vi.useFakeTimers(); }); afterEach(() => { vi.useRealTimers(); vi.restoreAllMocks(); }); it('backs off and retries after XREADGROUP error', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig({ BATCH_BLOCK_MS: 10 }); let consumerRef: ReturnType | undefined; let callCount = 0; redis.xreadgroup.mockImplementation(async () => { callCount++; if (callCount === 1) { throw new Error('LOADING Redis is loading the dataset in memory'); } // Stop consumer on second call void consumerRef?.stop(); return null; }); const sink = vi.fn(async () => []); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); consumerRef = consumer; await consumer.start(); // Advance timers past the 1000ms backoff await vi.advanceTimersByTimeAsync(1_100); await consumer.stop(); expect(logger.error).toHaveBeenCalledWith( expect.objectContaining({ err: expect.anything() }), 'XREADGROUP failed; backing off', ); // Should have retried at least once expect(callCount).toBeGreaterThanOrEqual(2); }); }); describe('createConsumer — clean stop', () => { afterEach(() => { vi.restoreAllMocks(); }); it('stop() returns after current batch completes', async () => { const redis = makeMockRedis(); const logger = makeSilentLogger(); const metrics = makeMetrics(); const config = makeConfig(); // Return null immediately (BLOCK timeout) so the loop spins and we can stop it redis.xreadgroup.mockResolvedValue(null); const sink = vi.fn(async () => []); const consumer = createConsumer( redis as unknown as Redis, config, logger, metrics, sink, ); await consumer.start(); // stop() should resolve without hanging await expect(consumer.stop()).resolves.toBeUndefined(); }); });