2a50aaf175
src/core/consumer.ts — XREADGROUP loop with consumer-group resumption, ensureConsumerGroup (BUSYGROUP-tolerant), decodeBatch (CodecError → log + skip + leave pending; never speculative ACK), partial-ACK semantics, connectRedis (mirroring tcp-ingestion's retry pattern), clean stop. src/core/state.ts — LRU Map<device_id, DeviceState> using delete+set bump trick (no third-party LRU dep); last_seen = max(prev, ts) so out-of-order replays don't regress the high-water mark; evictedTotal() counter. src/core/writer.ts — multi-row INSERT ON CONFLICT (device_id, ts) DO NOTHING with RETURNING. Duplicate detection by set-difference between input and RETURNING rows (xmax=0 doesn't work for skipped-conflict rows, only returned ones — confirmed in the task spec's own Note). Sequential chunking to WRITE_BATCH_SIZE; bigint→string and Buffer→base64 attribute serialization that handles Buffer.toJSON shape. src/main.ts — full pipeline: pool → migrate → redis → state → writer → sink → consumer → graceful-shutdown stub. Sink ordering is state.update BEFORE writer.write per spec rationale (state stays consistent with what's been seen even if not yet persisted; redelivery is idempotent on state). Metrics is still the trace-logging shim from tcp-ingestion's pre-1.10 pattern; real prom-client lands in task 1.9. Verification: typecheck, lint clean; 112 unit tests passing across 7 test files (+39 from this batch).
609 lines
17 KiB
TypeScript
609 lines
17 KiB
TypeScript
/**
|
|
* Unit tests for src/core/consumer.ts
|
|
*
|
|
* All Redis I/O is mocked — no real Redis required. The integration test
|
|
* (task 1.10) covers the end-to-end round-trip.
|
|
*
|
|
* Covers:
|
|
* - Decodes a synthetic stream entry into a ConsumedRecord with the right shape
|
|
* - Calls sink with the decoded batch and ACKs only the IDs the sink returned
|
|
* - Partial ACK: sink returns subset of IDs; only those are ACKed
|
|
* - BUSYGROUP error from XGROUP CREATE is swallowed and continues
|
|
* - Malformed payload: increments metric, logs at error, does NOT ACK the entry
|
|
* - Missing payload field: logs at error, does NOT ACK the entry
|
|
* - stop() causes the loop to exit cleanly
|
|
* - XREADGROUP failure logs error and backs off without crashing
|
|
*/
|
|
|
|
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
|
import type { MockedFunction } from 'vitest';
|
|
import type { Redis } from 'ioredis';
|
|
import type { Logger } from 'pino';
|
|
import type { Config } from '../src/config/load.js';
|
|
import type { Metrics, Position } from '../src/core/types.js';
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Creates a logger test double whose level methods are all vitest spies.
 *
 * `child()` is stubbed with `mockReturnThis()`, so calling it on the double
 * hands back the same object and the same spies. `level` is fixed to
 * `'silent'`.
 */
function makeSilentLogger(): Logger {
  const spyNames = ['debug', 'info', 'warn', 'error', 'fatal', 'trace', 'silent'] as const;
  const spies = Object.fromEntries(spyNames.map((name) => [name, vi.fn()]));

  const double = {
    ...spies,
    child: vi.fn().mockReturnThis(),
    level: 'silent',
  };

  // Only the members listed above exist on the double, hence the double cast.
  return double as unknown as Logger;
}
|
|
|
|
/**
 * Creates a Metrics test double.
 *
 * Every `inc()` call is appended to the returned `incCalls` array (name plus
 * the labels argument exactly as passed) so tests can inspect which counters
 * were bumped. `observe` is a plain vitest spy.
 */
function makeMetrics(): Metrics & {
  incCalls: Array<{ name: string; labels?: Record<string, string> }>;
} {
  const recorded: Array<{ name: string; labels?: Record<string, string> }> = [];

  function inc(name: string, labels?: Record<string, string>): void {
    recorded.push({ name, labels });
  }

  return { incCalls: recorded, inc, observe: vi.fn() };
}
|
|
|
|
/**
 * Returns a Config populated with test defaults.
 *
 * @param overrides - Fields that replace the defaults; applied last, so any
 *   key present here wins.
 */
function makeConfig(overrides: Partial<Config> = {}): Config {
  const defaults: Config = {
    NODE_ENV: 'test',
    INSTANCE_ID: 'test-processor',
    LOG_LEVEL: 'silent',
    REDIS_URL: 'redis://localhost:6379',
    POSTGRES_URL: 'postgres://localhost:5432/test',
    REDIS_TELEMETRY_STREAM: 'telemetry:t',
    REDIS_CONSUMER_GROUP: 'processor',
    REDIS_CONSUMER_NAME: 'test-consumer',
    METRICS_PORT: 9090,
    BATCH_SIZE: 10,
    BATCH_BLOCK_MS: 100,
    WRITE_BATCH_SIZE: 50,
    DEVICE_STATE_LRU_CAP: 1000,
  };

  return { ...defaults, ...overrides };
}
|
|
|
|
/**
 * Builds the JSON payload for a synthetic Position.
 *
 * Values that JSON cannot represent natively are encoded as follows:
 * - `bigint`               → `{ __bigint: "<decimal string>" }`
 * - `Buffer`/`Uint8Array`  → `{ __buffer_b64: "<base64>" }`
 * - `Date`                 → ISO-8601 string
 *
 * NOTE(review): the sentinel format is meant to mirror tcp-ingestion's
 * serializer; that serializer is not visible from this file — confirm the
 * two stay in sync.
 *
 * @param overrides - Position fields that replace the built-in defaults.
 * @returns The serialized payload string.
 */
function buildPayload(overrides: Partial<Position> = {}): string {
  const position: Position = {
    device_id: 'TESTDEVICE001',
    timestamp: new Date('2024-05-01T12:00:00.000Z'),
    latitude: 54.6872,
    longitude: 25.2797,
    altitude: 100,
    angle: 90,
    speed: 50,
    satellites: 12,
    priority: 1,
    attributes: {},
    ...overrides,
  };

  // JSON.stringify invokes value.toJSON() *before* calling the replacer, so
  // `value` is already transformed for types that define toJSON: a Buffer
  // arrives as `{ type: 'Buffer', data: [...] }` and a Date as its ISO string.
  // The untouched original is still reachable through the holder object that
  // JSON.stringify binds to `this`, so binary detection must look there.
  function jsonReplacer(this: unknown, key: string, value: unknown): unknown {
    const raw = (this as Record<string, unknown>)[key];

    if (typeof raw === 'bigint') return { __bigint: raw.toString() };
    if (raw instanceof Uint8Array) return { __buffer_b64: Buffer.from(raw).toString('base64') };

    // Dates need no special case: Date.prototype.toJSON has already produced
    // the ISO string held in `value`.
    return value;
  }

  return JSON.stringify(position, jsonReplacer);
}
|
|
|
|
/**
 * Assembles a raw XREADGROUP reply for one stream.
 *
 * The result has the nested-array shape
 * `[[streamName, [[entryId, [field, value, field, value, ...]], ...]]]`,
 * with each entry's fields flattened in the object's own key order.
 *
 * @param stream - Stream name placed at the head of the reply.
 * @param entries - Entries to include, each with an id and a field map.
 */
function buildXreadgroupResponse(
  stream: string,
  entries: Array<{ id: string; fields: Record<string, string> }>,
): [string, [string, string[]][]][] {
  const rows: [string, string[]][] = [];

  for (const entry of entries) {
    const flattened: string[] = [];
    for (const [field, value] of Object.entries(entry.fields)) {
      flattened.push(field, value);
    }
    rows.push([entry.id, flattened]);
  }

  return [[stream, rows]];
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Mock ioredis
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/** A vitest mock of a variadic Redis command that resolves to `TReply`. */
type RedisCommandMock<TReply> = MockedFunction<(...args: unknown[]) => Promise<TReply>>;

/**
 * The subset of the Redis client that these tests mock: consumer-group
 * creation, group reads, and acknowledgements.
 */
type MockRedis = {
  xgroup: RedisCommandMock<string>;
  xreadgroup: RedisCommandMock<unknown>;
  xack: RedisCommandMock<number>;
};
|
|
|
|
/**
 * Creates a MockRedis with default replies: `xgroup` resolves `'OK'`,
 * `xreadgroup` resolves `null` (the BLOCK-timeout reply), and `xack`
 * resolves `1`. Tests override individual commands as needed.
 */
function makeMockRedis(): MockRedis {
  const xgroup = vi.fn().mockResolvedValue('OK');
  const xreadgroup = vi.fn().mockResolvedValue(null); // default: BLOCK timeout
  const xack = vi.fn().mockResolvedValue(1);

  return { xgroup, xreadgroup, xack };
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// ensureConsumerGroup tests
|
|
// ---------------------------------------------------------------------------
|
|
|
|
import { ensureConsumerGroup } from '../src/core/consumer.js';
|
|
|
|
describe('ensureConsumerGroup', () => {
  const STREAM = 'telemetry:t';
  const GROUP = 'processor';

  /**
   * Prepares fresh Redis and logger doubles. When `xgroupFailure` is given,
   * every XGROUP call rejects with it. `run` invokes ensureConsumerGroup
   * against the doubles.
   */
  function arrange(xgroupFailure?: Error) {
    const redis = makeMockRedis();
    if (xgroupFailure !== undefined) {
      redis.xgroup.mockRejectedValue(xgroupFailure);
    }
    const logger = makeSilentLogger();
    const run = () => ensureConsumerGroup(redis as unknown as Redis, STREAM, GROUP, logger);
    return { redis, logger, run };
  }

  it('calls XGROUP CREATE with MKSTREAM and $ start ID', async () => {
    const { redis, logger, run } = arrange();

    await run();

    expect(redis.xgroup).toHaveBeenCalledWith('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
    expect(logger.info).toHaveBeenCalledWith(
      expect.objectContaining({ stream: STREAM, group: GROUP }),
      'consumer group created',
    );
  });

  it('swallows BUSYGROUP error and logs info', async () => {
    const { logger, run } = arrange(new Error('BUSYGROUP Consumer Group name already exists'));

    await expect(run()).resolves.toBeUndefined();

    expect(logger.info).toHaveBeenCalledWith(
      expect.objectContaining({ stream: STREAM, group: GROUP }),
      'consumer group already exists',
    );
  });

  it('rethrows non-BUSYGROUP errors', async () => {
    const { run } = arrange(new Error('NOPERM no permissions'));

    await expect(run()).rejects.toThrow('NOPERM no permissions');
  });
});
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// createConsumer tests
|
|
// ---------------------------------------------------------------------------
|
|
|
|
import { createConsumer } from '../src/core/consumer.js';
|
|
import type { ConsumedRecord } from '../src/core/consumer.js';
|
|
|
|
describe('createConsumer — happy path', () => {
  const STREAM = 'telemetry:t';
  const GROUP = 'processor';
  const ENTRY_TS = '2024-05-01T12:00:00.000Z';

  type StreamEntry = { id: string; fields: Record<string, string> };
  type Consumer = ReturnType<typeof createConsumer>;
  type Sink = Parameters<typeof createConsumer>[4];

  /** Builds a well-formed stream entry (ts, device_id, codec, payload) for `deviceId`. */
  function entryFor(id: string, deviceId: string): StreamEntry {
    return {
      id,
      fields: {
        ts: ENTRY_TS,
        device_id: deviceId,
        codec: '8',
        payload: buildPayload({ device_id: deviceId }),
      },
    };
  }

  /**
   * Creates the doubles and scripts XREADGROUP to deliver `entries` on the
   * first call and null (BLOCK timeout) on every later call. `launch` builds
   * a consumer over those doubles with the given sink.
   */
  function arrange(entries: StreamEntry[]) {
    const redis = makeMockRedis();
    const logger = makeSilentLogger();
    const metrics = makeMetrics();
    const config = makeConfig();

    redis.xreadgroup
      .mockResolvedValueOnce(buildXreadgroupResponse(STREAM, entries))
      .mockResolvedValue(null);

    const launch = (sink: Sink): Consumer =>
      createConsumer(redis as unknown as Redis, config, logger, metrics, sink);

    return { redis, launch };
  }

  afterEach(() => {
    vi.restoreAllMocks();
    vi.useRealTimers();
  });

  it('decodes a stream entry and passes a ConsumedRecord to the sink', async () => {
    const entryId = '1714488000000-0';
    const { launch } = arrange([entryFor(entryId, 'DEV001')]);

    const seenBatches: ConsumedRecord[][] = [];
    let active: Consumer | undefined;

    const sink = vi.fn(async (batch: ConsumedRecord[]) => {
      seenBatches.push(batch);
      // Request shutdown from inside the sink so the read loop ends after this batch.
      void active?.stop();
      return batch.map((rec) => rec.id);
    });

    active = launch(sink);

    await active.start();
    // Resolves once the consumer has finished shutting down.
    await active.stop();

    expect(seenBatches.length).toBeGreaterThanOrEqual(1);
    const [firstBatch] = seenBatches;
    expect(firstBatch).toBeDefined();
    expect(firstBatch!.length).toBe(1);

    const record = firstBatch![0]!;
    expect(record.id).toBe(entryId);
    expect(record.codec).toBe('8');
    expect(record.ts).toBe(ENTRY_TS);
    expect(record.position.device_id).toBe('DEV001');
    expect(record.position.latitude).toBe(54.6872);
  });

  it('ACKs only the IDs returned by the sink (partial ACK)', async () => {
    const ids = ['1000-0', '1000-1', '1000-2'];
    const { redis, launch } = arrange(ids.map((id) => entryFor(id, `DEV${id}`)));

    let active: Consumer | undefined;

    // The sink acknowledges the first and third entries; the second stays pending.
    const sink = vi.fn(async (batch: ConsumedRecord[]) => {
      void active?.stop();
      return [batch[0]!.id, batch[2]!.id];
    });

    active = launch(sink);

    await active.start();
    await active.stop();

    expect(redis.xack).toHaveBeenCalledWith(STREAM, GROUP, ids[0], ids[2]);
    // The middle ID must not appear in any XACK call.
    const ackArgs = redis.xack.mock.calls.flat();
    expect(ackArgs).not.toContain(ids[1]);
  });

  it('does not call xack when sink returns an empty array', async () => {
    const { redis, launch } = arrange([entryFor('2000-0', 'DEV002')]);

    let active: Consumer | undefined;

    const sink = vi.fn(async (_batch: ConsumedRecord[]) => {
      void active?.stop();
      return [];
    });

    active = launch(sink);

    await active.start();
    await active.stop();

    expect(redis.xack).not.toHaveBeenCalled();
  });
});
|
|
|
|
describe('createConsumer — decode errors', () => {
  const STREAM = 'telemetry:t';
  const GROUP = 'processor';
  const ENTRY_TS = '2024-05-01T12:00:00.000Z';
  const DECODE_ERROR_METRIC = 'processor_decode_errors_total';

  type StreamEntry = { id: string; fields: Record<string, string> };
  type Consumer = ReturnType<typeof createConsumer>;
  type Sink = Parameters<typeof createConsumer>[4];

  /**
   * Creates the doubles and scripts XREADGROUP to deliver `entries` once,
   * then null (BLOCK timeout) forever. `launch` builds a consumer over them.
   */
  function arrange(entries: StreamEntry[]) {
    const redis = makeMockRedis();
    const logger = makeSilentLogger();
    const metrics = makeMetrics();
    const config = makeConfig();

    redis.xreadgroup
      .mockResolvedValueOnce(buildXreadgroupResponse(STREAM, entries))
      .mockResolvedValue(null);

    const launch = (sink: Sink): Consumer =>
      createConsumer(redis as unknown as Redis, config, logger, metrics, sink);

    return { redis, logger, metrics, launch };
  }

  /**
   * Builds a sink spy that first asks the consumer held in `handle` to stop
   * and then returns whatever `pick` selects from the batch as the IDs to ACK.
   */
  function stoppingSink(pick: (batch: ConsumedRecord[]) => string[]) {
    const handle: { consumer?: Consumer } = {};
    const sink = vi.fn(async (batch: ConsumedRecord[]) => {
      void handle.consumer?.stop();
      return pick(batch);
    });
    return { sink, handle };
  }

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it('skips malformed payload: increments metric, logs error, does not ACK', async () => {
    const badId = '3000-0';
    const { redis, logger, metrics, launch } = arrange([
      {
        id: badId,
        fields: {
          ts: ENTRY_TS,
          device_id: 'DEV003',
          codec: '8',
          payload: 'not valid json {{{',
        },
      },
    ]);

    const { sink, handle } = stoppingSink(() => []);
    const consumer = launch(sink);
    handle.consumer = consumer;

    await consumer.start();
    await consumer.stop();

    // The decode-error counter was bumped.
    expect(metrics.incCalls.some((call) => call.name === DECODE_ERROR_METRIC)).toBe(true);

    // The failure was logged at error level.
    expect(logger.error).toHaveBeenCalled();

    // The bad entry was filtered out, leaving the sink an empty batch.
    expect(sink).toHaveBeenCalledWith([]);

    // The bad entry was never acknowledged.
    expect(redis.xack).not.toHaveBeenCalledWith(STREAM, GROUP, badId);
  });

  it('skips entry with missing payload field: increments metric, logs error, does not ACK', async () => {
    const badId = '3001-0';
    const { redis, logger, metrics, launch } = arrange([
      {
        id: badId,
        // Deliberately has no payload field.
        fields: { ts: ENTRY_TS, device_id: 'DEV004', codec: '8' },
      },
    ]);

    const { sink, handle } = stoppingSink(() => []);
    const consumer = launch(sink);
    handle.consumer = consumer;

    await consumer.start();
    await consumer.stop();

    expect(metrics.incCalls.some((call) => call.name === DECODE_ERROR_METRIC)).toBe(true);
    expect(logger.error).toHaveBeenCalled();
    expect(redis.xack).not.toHaveBeenCalled();
  });

  it('valid and invalid entries in the same batch: ACKs only valid ones', async () => {
    const goodId = '4000-0';
    const badId = '4000-1';
    const { redis, launch } = arrange([
      {
        id: goodId,
        fields: {
          ts: ENTRY_TS,
          device_id: 'DEV005',
          codec: '8',
          payload: buildPayload({ device_id: 'DEV005' }),
        },
      },
      {
        id: badId,
        fields: {
          ts: ENTRY_TS,
          device_id: 'DEV005',
          codec: '8',
          payload: 'not json',
        },
      },
    ]);

    const { sink, handle } = stoppingSink((batch) => batch.map((rec) => rec.id));
    const consumer = launch(sink);
    handle.consumer = consumer;

    await consumer.start();
    await consumer.stop();

    // The sink saw the good record and not the bad one.
    expect(sink).toHaveBeenCalledWith(
      expect.arrayContaining([expect.objectContaining({ id: goodId })]),
    );
    expect(sink).toHaveBeenCalledWith(
      expect.not.arrayContaining([expect.objectContaining({ id: badId })]),
    );

    // Only the good entry was acknowledged.
    expect(redis.xack).toHaveBeenCalledWith(STREAM, GROUP, goodId);
    const ackArgs = redis.xack.mock.calls.flat();
    expect(ackArgs).not.toContain(badId);
  });
});
|
|
|
|
describe('createConsumer — XREADGROUP failure', () => {
  type Consumer = ReturnType<typeof createConsumer>;

  // Fake timers let the test skip over the consumer's back-off delay.
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
    vi.restoreAllMocks();
  });

  it('backs off and retries after XREADGROUP error', async () => {
    const redis = makeMockRedis();
    const logger = makeSilentLogger();
    const metrics = makeMetrics();
    const config = makeConfig({ BATCH_BLOCK_MS: 10 });

    let active: Consumer | undefined;
    let attempts = 0;

    // First read fails; every later read stops the consumer and reports a BLOCK timeout.
    redis.xreadgroup.mockImplementation(async () => {
      attempts += 1;
      if (attempts > 1) {
        void active?.stop();
        return null;
      }
      throw new Error('LOADING Redis is loading the dataset in memory');
    });

    const sink = vi.fn(async () => []);

    active = createConsumer(redis as unknown as Redis, config, logger, metrics, sink);

    await active.start();

    // Move the fake clock beyond the 1000ms back-off so the retry can happen.
    await vi.advanceTimersByTimeAsync(1_100);
    await active.stop();

    expect(logger.error).toHaveBeenCalledWith(
      expect.objectContaining({ err: expect.anything() }),
      'XREADGROUP failed; backing off',
    );

    // At least one retry followed the failed read.
    expect(attempts).toBeGreaterThanOrEqual(2);
  });
});
|
|
|
|
describe('createConsumer — clean stop', () => {
  afterEach(() => {
    vi.restoreAllMocks();
  });

  it('stop() returns after current batch completes', async () => {
    const redis = makeMockRedis();
    const logger = makeSilentLogger();
    const metrics = makeMetrics();
    const config = makeConfig();

    // Every read reports a BLOCK timeout, so the loop idles until stopped.
    redis.xreadgroup.mockResolvedValue(null);

    const idleSink = vi.fn(async () => []);
    const consumer = createConsumer(redis as unknown as Redis, config, logger, metrics, idleSink);

    await consumer.start();

    // stop() must settle (with undefined) rather than hang.
    const stopped = consumer.stop();
    await expect(stopped).resolves.toBeUndefined();
  });
});
|