diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md index a2a3a59..2a7dab0 100644 --- a/.planning/ROADMAP.md +++ b/.planning/ROADMAP.md @@ -40,7 +40,7 @@ These rules govern every task. Any deviation must be discussed and documented as ### Phase 1 — Throughput pipeline -**Status:** 🟨 In progress (1.1–1.4 done; 1.5–1.11 ahead) +**Status:** 🟨 In progress (1.1–1.8 done; 1.9–1.11 ahead) **Outcome:** A Node.js Processor that joins a Redis Streams consumer group on `telemetry:t`, decodes each `Position` (including `__bigint`/`__buffer_b64` sentinel reversal), upserts it into a TimescaleDB `positions` hypertable, updates per-device in-memory state (last position, last seen), `XACK`s on successful write, and exposes Prometheus metrics + health/readiness HTTP endpoints. End-to-end pilot-quality service; no domain logic yet. [**See `phase-1-throughput/README.md`**](./phase-1-throughput/README.md) @@ -51,10 +51,10 @@ These rules govern every task. Any deviation must be discussed and documented as | 1.2 | [Core types & contracts](./phase-1-throughput/02-core-types.md) | 🟩 | `290a08e` | | 1.3 | [Configuration & logging](./phase-1-throughput/03-config-and-logging.md) | 🟩 | `290a08e` | | 1.4 | [Postgres connection & `positions` hypertable](./phase-1-throughput/04-postgres-schema.md) | 🟩 | `290a08e` | -| 1.5 | [Redis Stream consumer (XREADGROUP)](./phase-1-throughput/05-stream-consumer.md) | ⬜ | — | -| 1.6 | [Per-device in-memory state](./phase-1-throughput/06-device-state.md) | ⬜ | — | -| 1.7 | [Position writer (batched upsert)](./phase-1-throughput/07-position-writer.md) | ⬜ | — | -| 1.8 | [Main wiring & ACK semantics](./phase-1-throughput/08-main-wiring.md) | ⬜ | — | +| 1.5 | [Redis Stream consumer (XREADGROUP)](./phase-1-throughput/05-stream-consumer.md) | 🟩 | *(pending commit SHA)* | +| 1.6 | [Per-device in-memory state](./phase-1-throughput/06-device-state.md) | 🟩 | *(pending commit SHA)* | +| 1.7 | [Position writer (batched upsert)](./phase-1-throughput/07-position-writer.md) | 🟩 | *(pending commit SHA)* | +| 1.8 | [Main wiring & ACK semantics](./phase-1-throughput/08-main-wiring.md) | 🟩 | *(pending commit SHA)* | | 1.9 | [Observability (Prometheus metrics + /healthz + /readyz)](./phase-1-throughput/09-observability.md) | ⬜ | — | | 1.10 | [Integration test (testcontainers Redis + Postgres)](./phase-1-throughput/10-integration-test.md) | ⬜ | — | | 1.11 | [Dockerfile & Gitea workflow](./phase-1-throughput/11-dockerfile-and-ci.md) | ⬜ | — | diff --git a/.planning/phase-1-throughput/05-stream-consumer.md b/.planning/phase-1-throughput/05-stream-consumer.md index 3fc2cea..0be74b9 100644 --- a/.planning/phase-1-throughput/05-stream-consumer.md +++ b/.planning/phase-1-throughput/05-stream-consumer.md @@ -1,7 +1,7 @@ # Task 1.5 — Redis Stream consumer (XREADGROUP) **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.2, 1.3 **Wiki refs:** `docs/wiki/entities/redis-streams.md`, `docs/wiki/entities/processor.md` @@ -90,4 +90,4 @@ On `start()`: ## Done -(Fill in once complete: commit SHA, brief notes.) +`src/core/consumer.ts` — XREADGROUP loop with `ensureConsumerGroup`, `decodeBatch`, partial-ACK semantics, `connectRedis` (co-located, not in `src/db/`), and clean stop. `test/consumer.test.ts` — 11 tests covering happy path, partial ACK, BUSYGROUP swallow, decode error skip, missing payload skip, XREADGROUP backoff, clean stop. 
*(pending commit SHA)* diff --git a/.planning/phase-1-throughput/06-device-state.md b/.planning/phase-1-throughput/06-device-state.md index 0d6f718..38523e6 100644 --- a/.planning/phase-1-throughput/06-device-state.md +++ b/.planning/phase-1-throughput/06-device-state.md @@ -1,7 +1,7 @@ # Task 1.6 — Per-device in-memory state **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.2 **Wiki refs:** `docs/wiki/entities/processor.md` (§ State management) @@ -78,4 +78,4 @@ The interface is built to extend: Phase 2 may add fields, but the existing field ## Done -(Fill in once complete: commit SHA, brief notes.) +`src/core/state.ts` — LRU Map using the delete+set bump trick, `last_seen = max(prev, position.timestamp)` semantics, `evictedTotal()` counter. `test/state.test.ts` — 14 tests covering new-device creation, session counter increment, LRU eviction at cap, LRU re-touch, evictedTotal, out-of-order timestamp handling (last_seen never regresses), get/size. *(pending commit SHA)* diff --git a/.planning/phase-1-throughput/07-position-writer.md b/.planning/phase-1-throughput/07-position-writer.md index 0a9e9da..f9a648e 100644 --- a/.planning/phase-1-throughput/07-position-writer.md +++ b/.planning/phase-1-throughput/07-position-writer.md @@ -1,7 +1,7 @@ # Task 1.7 — Position writer (batched upsert) **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.2, 1.4 **Wiki refs:** `docs/wiki/entities/postgres-timescaledb.md` @@ -91,4 +91,6 @@ If a transaction-wide failure occurs (Pool dead, transient network), all records ## Done -(Fill in once complete: commit SHA, brief notes.) +`src/core/writer.ts` — multi-row INSERT with RETURNING, duplicate detection by (device_id, ts) set diff, sequential chunking, bigint/Buffer attribute serialization (handles Buffer.toJSON shape). `test/writer.test.ts` — 14 tests covering happy path, all-duplicate, mixed, pool error, chunk split, Buffer base64, bigint string, parameter ordering, metrics. *(pending commit SHA)* + +**Note:** The spec's `RETURNING (xmax = 0) AS inserted` idiom was replaced with a simpler set-difference approach — compare RETURNING rows against input by (device_id, ts). The xmax approach is mentioned in the spec but then immediately qualified: "rows that hit the conflict are NOT returned." The set-diff is cleaner and avoids confusion. The spec's own Note section confirms this is the right approach. diff --git a/.planning/phase-1-throughput/08-main-wiring.md b/.planning/phase-1-throughput/08-main-wiring.md index 71c7fda..79a7f58 100644 --- a/.planning/phase-1-throughput/08-main-wiring.md +++ b/.planning/phase-1-throughput/08-main-wiring.md @@ -1,7 +1,7 @@ # Task 1.8 — Main wiring & ACK semantics **Phase:** 1 — Throughput pipeline -**Status:** ⬜ Not started +**Status:** 🟩 Done **Depends on:** 1.5, 1.6, 1.7 **Wiki refs:** `docs/wiki/entities/processor.md` @@ -97,4 +97,4 @@ After this task lands you should be able to run `pnpm dev` against a local Redis ## Done -(Fill in once complete: commit SHA, brief notes.) +`src/main.ts` — full pipeline wiring: Postgres pool → migrations → Redis → state store → writer → sink → consumer → graceful shutdown stub. Metrics shim uses `logger.trace`. Sink ordering: state.update before writer.write per spec. 
*(pending commit SHA)* diff --git a/src/core/consumer.ts b/src/core/consumer.ts new file mode 100644 index 0000000..88161aa --- /dev/null +++ b/src/core/consumer.ts @@ -0,0 +1,333 @@ +/** + * Redis Stream consumer โ€” XREADGROUP loop. + * + * Joins the consumer group on startup, fetches batches via XREADGROUP, decodes + * each entry to a Position, hands off to a sink callback, and ACKs only the IDs + * the sink confirms were handled. Failures stay pending in the consumer's PEL + * for re-delivery or XAUTOCLAIM (Phase 3). + * + * Design notes: + * - `connectRedis` is co-located here (rather than in src/db/) because it is + * tightly coupled to consumer startup: the consumer is the only component that + * needs a live Redis connection in Phase 1. Putting it in src/db/ would imply + * it is a general shared utility alongside pool.ts, but the Postgres pool has + * its own connectWithRetry there โ€” symmetry is better than indirection. + */ + +import type { Redis } from 'ioredis'; +import type { Logger } from 'pino'; +import type { Config } from '../config/load.js'; +import type { Metrics, Position } from './types.js'; +import { decodePosition, CodecError } from './codec.js'; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +/** + * A decoded stream entry ready for the sink to process. + * The `id` field is the Redis Stream entry ID (e.g. "1714488000000-0") used for + * XACK. `codec` and `ts` come from the top-level stream fields that tcp-ingestion + * writes alongside the payload โ€” consumers can filter on them without JSON parsing. + */ +export type ConsumedRecord = { + readonly id: string; + readonly position: Position; + readonly codec: string; + readonly ts: string; +}; + +/** + * Sink callback invoked once per batch. Returns the subset of IDs that were + * handled successfully; the consumer ACKs only those. Partial returns are valid: + * unacknowledged IDs stay in the PEL and are re-delivered on reconnect or claimed + * by another instance. + */ +export type Sink = (records: ConsumedRecord[]) => Promise; + +/** + * Handle returned by createConsumer. `start()` kicks off the read loop. + * `stop()` signals the loop to exit after the current batch finishes. + */ +export type Consumer = { + readonly start: () => Promise; + readonly stop: () => Promise; +}; + +// --------------------------------------------------------------------------- +// Redis connection (mirrors tcp-ingestion's connectRedis) +// --------------------------------------------------------------------------- + +/** + * Connects to Redis with exponential-backoff retry on startup. + * Fails fast (process.exit) after `maxAttempts` consecutive failures so the + * orchestrator can restart rather than running with a broken connection. + * + * Placed in consumer.ts (not src/db/redis.ts) because the consumer is the only + * Phase 1 component that owns a Redis connection. Creating a separate db/redis.ts + * would be premature generalization โ€” if Phase 2 needs a second connection the + * function can be moved then. + */ +export async function connectRedis( + redisUrl: string, + logger: Logger, + maxAttempts = 3, +): Promise { + // Dynamic import keeps ioredis out of the module graph for test files that + // don't import this function directly. 
+ const { default: Redis } = await import('ioredis'); + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + const redis = new Redis(redisUrl, { + // Disable ioredis's built-in reconnect โ€” we manage retries ourselves so + // startup failure is deterministic rather than silently retrying forever. + enableOfflineQueue: false, + lazyConnect: true, + maxRetriesPerRequest: 0, + connectTimeout: 5_000, + }); + + try { + await redis.connect(); + logger.info({ attempt }, 'Redis connected'); + return redis; + } catch (err) { + // Best-effort quit: ignore errors โ€” the connection may already be in a bad + // state and we are about to retry anyway. + await redis.quit().catch(() => undefined); + + if (attempt === maxAttempts) { + logger.fatal({ err, url: redisUrl }, 'Redis connection failed after all retries; exiting'); + process.exit(1); + } + + const backoffMs = Math.min(200 * 2 ** (attempt - 1), 5_000); + logger.warn( + { err, attempt, maxAttempts, backoffMs }, + 'Redis connection failed; retrying', + ); + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + } + } + + // TypeScript: unreachable after process.exit above, but needed for type safety. + /* c8 ignore next */ + throw new Error('unreachable'); +} + +// --------------------------------------------------------------------------- +// Consumer group setup +// --------------------------------------------------------------------------- + +/** + * Creates the consumer group if it does not exist, using MKSTREAM so the stream + * itself is also created if absent. `$` as the start ID means "only new entries + * from now on" โ€” we do not replay history on first startup. + * + * BUSYGROUP means the group already exists; that is fine and expected on every + * restart after the first. + */ +export async function ensureConsumerGroup( + redis: Redis, + stream: string, + group: string, + logger: Logger, +): Promise { + try { + await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM'); + logger.info({ stream, group }, 'consumer group created'); + } catch (err: unknown) { + // ioredis surfaces Redis errors as Error instances with the Redis error code + // in the message string. + if (err instanceof Error && err.message.startsWith('BUSYGROUP')) { + logger.info({ stream, group }, 'consumer group already exists'); + return; + } + throw err; + } +} + +// --------------------------------------------------------------------------- +// Batch decoder +// --------------------------------------------------------------------------- + +/** + * Raw entry shape returned by ioredis XREADGROUP. + * ioredis returns: [[streamName, [[entryId, [field, value, ...]], ...]]] + */ +type RawStreamEntry = [id: string, fields: string[]]; + +/** + * Decodes a batch of raw stream entries into ConsumedRecord objects. + * Entries that fail to decode are logged at error and excluded from the result; + * they stay in the PEL (pending entries list) and will be re-attempted later. + */ +function decodeBatch( + entries: RawStreamEntry[], + stream: string, + logger: Logger, + metrics: Metrics, +): ConsumedRecord[] { + const records: ConsumedRecord[] = []; + + for (const [id, fields] of entries) { + // ioredis returns flat [field, value, field, value, ...] arrays. + const fieldMap: Record = {}; + for (let i = 0; i + 1 < fields.length; i += 2) { + const key = fields[i]; + const val = fields[i + 1]; + if (key !== undefined && val !== undefined) { + fieldMap[key] = val; + } + } + + const payload = fieldMap['payload']; + const codec = fieldMap['codec'] ?? 
''; + const ts = fieldMap['ts'] ?? ''; + + if (payload === undefined) { + logger.error({ id, stream }, 'stream entry missing payload field; leaving pending'); + metrics.inc('processor_decode_errors_total', { stream }); + continue; + } + + let position: Position; + try { + position = decodePosition(payload); + } catch (err) { + const isCodecError = err instanceof CodecError; + logger.error( + { + id, + stream, + err, + // Truncate the raw payload to avoid flooding logs with large records. + rawPayloadHead: payload.slice(0, 256), + }, + isCodecError ? 'decode error; leaving entry pending' : 'unexpected error decoding entry', + ); + metrics.inc('processor_decode_errors_total', { stream }); + continue; + } + + records.push({ id, position, codec, ts }); + } + + return records; +} + +// --------------------------------------------------------------------------- +// Consumer factory +// --------------------------------------------------------------------------- + +/** + * Creates a Redis Stream consumer that reads from the given stream+group in a + * loop, decodes each entry, passes the batch to the sink, and ACKs only what + * the sink confirms. + */ +export function createConsumer( + redis: Redis, + config: Config, + logger: Logger, + metrics: Metrics, + sink: Sink, +): Consumer { + const stream = config.REDIS_TELEMETRY_STREAM; + const group = config.REDIS_CONSUMER_GROUP; + const consumerName = config.REDIS_CONSUMER_NAME; + const batchSize = config.BATCH_SIZE; + const batchBlockMs = config.BATCH_BLOCK_MS; + + let stopping = false; + + // Resolves when the currently in-flight batch (if any) has completed. + // Initialized to a resolved promise so stop() works cleanly if called before + // any batch starts. + let inFlightBatch: Promise = Promise.resolve(); + + async function sleep(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); + } + + async function runLoop(): Promise { + logger.info( + { stream, group, consumer: consumerName }, + 'consumer started on stream', + ); + + while (!stopping) { + let rawResult: [string, [string, string[]][]][] | null; + + try { + // ioredis types: xreadgroup returns null on BLOCK timeout, otherwise an + // array of [streamName, entries[]] pairs. + rawResult = (await redis.xreadgroup( + 'GROUP', + group, + consumerName, + 'COUNT', + String(batchSize), + 'BLOCK', + String(batchBlockMs), + 'STREAMS', + stream, + '>', + )) as [string, [string, string[]][]][] | null; + } catch (err) { + if (stopping) break; + logger.error({ err }, 'XREADGROUP failed; backing off'); + await sleep(1_000); + continue; + } + + // BLOCK timeout โ€” no new entries; loop again to check stopping flag. + if (rawResult === null) continue; + + // rawResult is [[streamName, [[id, fields], ...]]] + // We only subscribed to one stream so we take the first element. + const streamEntries = rawResult[0]?.[1] ?? []; + if (streamEntries.length === 0) continue; + + logger.debug({ stream, n: streamEntries.length }, 'batch consumed'); + + const records = decodeBatch(streamEntries, stream, logger, metrics); + + const ackIds = await sink(records); + + if (ackIds.length > 0) { + await redis.xack(stream, group, ...ackIds); + } + + logger.debug( + { stream, consumed: streamEntries.length, acked: ackIds.length }, + 'batch acked', + ); + } + + logger.info({ stream, group }, 'consumer loop exited'); + } + + async function start(): Promise { + await ensureConsumerGroup(redis, stream, group, logger); + + // Assign the running loop to inFlightBatch so stop() can await it. 
+ // We deliberately do not await runLoop() here โ€” start() returns once the + // loop has been kicked off (the group has been ensured), not when it ends. + inFlightBatch = runLoop(); + + // Propagate unhandled loop errors to the caller's process-level handler. + inFlightBatch.catch((err: unknown) => { + logger.fatal({ err }, 'consumer loop crashed unexpectedly; exiting'); + process.exit(1); + }); + } + + async function stop(): Promise { + stopping = true; + // Wait for the currently in-flight batch to complete. The loop will exit + // on the next iteration after the BLOCK timeout at most. + await inFlightBatch; + } + + return { start, stop }; +} diff --git a/src/core/state.ts b/src/core/state.ts new file mode 100644 index 0000000..26b3036 --- /dev/null +++ b/src/core/state.ts @@ -0,0 +1,114 @@ +/** + * Per-device in-memory state store with LRU eviction. + * + * Maintains a bounded Map updated on every accepted + * Position. When the map exceeds the LRU cap, the least-recently-updated entry + * is evicted. The LRU property is maintained without a third-party library by + * exploiting JavaScript's Map insertion-order guarantee: delete + set on update + * bumps the entry to the most-recent position in iteration order, and + * keys().next().value is always the oldest. + * + * Phase 3 adds rehydration: on first packet for an unknown device after a + * restart, query Postgres to seed last_position. Phase 1 accepts state loss on + * restart as a known limitation. + */ + +import type { Logger } from 'pino'; +import type { Config } from '../config/load.js'; +import type { Position, DeviceState } from './types.js'; + +// --------------------------------------------------------------------------- +// Public interface +// --------------------------------------------------------------------------- + +export type DeviceStateStore = { + /** + * Applies the position to the device's state, touches LRU order, and returns + * the new state. Creates the entry if this is the first position for the device. + * Evicts the least-recently-updated entry if the cap is exceeded. + */ + readonly update: (position: Position) => DeviceState; + + /** + * Returns the current state for a device without touching LRU order. + * Returns undefined for unknown devices. + * Used for diagnostics; the hot path always goes through update(). + */ + readonly get: (deviceId: string) => DeviceState | undefined; + + /** Current number of devices in the store. */ + readonly size: () => number; + + /** Total evictions since startup. Used for metrics. */ + readonly evictedTotal: () => number; +}; + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +export function createDeviceStateStore(config: Config, logger: Logger): DeviceStateStore { + const cap = config.DEVICE_STATE_LRU_CAP; + const store = new Map(); + let evicted = 0; + + function update(position: Position): DeviceState { + const existing = store.get(position.device_id); + + const newState: DeviceState = existing === undefined + ? { + device_id: position.device_id, + last_position: position, + // last_seen is initialized from the position's device-reported timestamp, + // not the wall clock โ€” we want to track device time, not ingestion time. 
+ last_seen: position.timestamp, + position_count_session: 1, + } + : { + device_id: position.device_id, + last_position: position, + // last_seen only advances forward: devices can buffer offline records and + // replay them out of order. We observed 55-record bursts on stage where + // consecutive timestamps could decrease. last_seen must mean "highest + // device timestamp seen so far" to be useful for downstream logic. + last_seen: position.timestamp > existing.last_seen + ? position.timestamp + : existing.last_seen, + position_count_session: existing.position_count_session + 1, + }; + + // Delete then set to bump this entry to the most-recent position in Map + // iteration order โ€” O(1), no external LRU library needed. + store.delete(position.device_id); + store.set(position.device_id, newState); + + // Evict the oldest entry (first in iteration order) if over cap. + if (store.size > cap) { + const oldestKey = store.keys().next().value; + if (oldestKey !== undefined) { + store.delete(oldestKey); + evicted++; + logger.debug( + { evictedDevice: oldestKey, storeSize: store.size, cap }, + 'device state evicted (LRU)', + ); + } + } + + return newState; + } + + function get(deviceId: string): DeviceState | undefined { + return store.get(deviceId); + } + + function size(): number { + return store.size; + } + + function evictedTotal(): number { + return evicted; + } + + return { update, get, size, evictedTotal }; +} diff --git a/src/core/writer.ts b/src/core/writer.ts new file mode 100644 index 0000000..3827f00 --- /dev/null +++ b/src/core/writer.ts @@ -0,0 +1,234 @@ +/** + * Position writer โ€” batched upsert into the positions hypertable. + * + * Uses INSERT ... ON CONFLICT (device_id, ts) DO NOTHING for idempotency. + * Rows that were already present are identified by comparing the RETURNING rows + * against the input: anything absent from RETURNING was a duplicate (Postgres + * does not return conflicting rows with DO NOTHING). + * + * Internally chunks large batches into WRITE_BATCH_SIZE groups and runs them + * sequentially. Parallelism against the same pg.Pool is deliberately avoided to + * prevent starving the migration runner and health-check queries. + */ + +import type pg from 'pg'; +import type { Logger } from 'pino'; +import type { Config } from '../config/load.js'; +import type { Metrics, AttributeValue } from './types.js'; +import type { ConsumedRecord } from './consumer.js'; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +export type WriteResult = { + readonly id: string; + readonly status: 'inserted' | 'duplicate' | 'failed'; + readonly error?: Error; +}; + +export type Writer = { + readonly write: (records: ConsumedRecord[]) => Promise; +}; + +// --------------------------------------------------------------------------- +// Attribute serialization +// --------------------------------------------------------------------------- + +/** + * Serializes the attributes map to a JSONB-safe string. + * + * JSON cannot represent bigint or Buffer natively, so we apply a custom replacer: + * - bigint โ†’ decimal string (lossless; readable in SQL) + * - Buffer โ†’ base64 string + * + * IMPORTANT: Buffer.prototype.toJSON() fires before JSON.stringify passes the + * value to the replacer, converting nested Buffer instances to + * { type: 'Buffer', data: [...] }. 
We handle both the direct instance case + * (for top-level or already-converted values) and the toJSON shape. This mirrors + * the pattern in tcp-ingestion's jsonReplacer. + * + * On-disk JSONB shape: bigints are stored as plain decimal strings, Buffers as + * plain base64 strings โ€” without the sentinel wrappers (__bigint / __buffer_b64) + * used in the in-flight Redis stream format. Document this distinction in + * wiki/concepts/position-record.md as a follow-up. + */ +function serializeAttributes(attributes: Readonly>): string { + return JSON.stringify(attributes, (_key, value: unknown) => { + if (typeof value === 'bigint') { + return value.toString(); + } + // Direct Buffer / Uint8Array instance (e.g. top-level attribute values passed + // directly to the replacer before toJSON fires). + if (value instanceof Uint8Array) { + return Buffer.from(value).toString('base64'); + } + // Buffer.toJSON() shape โ€” what JSON.stringify passes to the replacer for + // Buffer instances nested inside objects, because toJSON fires first. + if (isBufferToJsonShape(value)) { + return Buffer.from(value.data).toString('base64'); + } + return value; + }); +} + +type BufferToJsonShape = { type: 'Buffer'; data: number[] }; + +function isBufferToJsonShape(value: unknown): value is BufferToJsonShape { + return ( + typeof value === 'object' && + value !== null && + (value as Record)['type'] === 'Buffer' && + Array.isArray((value as Record)['data']) + ); +} + +// --------------------------------------------------------------------------- +// SQL builder +// --------------------------------------------------------------------------- + +/** + * Builds the multi-row INSERT statement for a chunk of records. + * Returns the SQL string and the flat params array. + * + * Column order (11 per row): + * device_id, ts, latitude, longitude, altitude, angle, speed, + * satellites, priority, codec, attributes + * + * Postgres parameter limit: 65535 / 11 = ~5957 rows max. WRITE_BATCH_SIZE=50 + * is well under this cap. 
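+ * As a rough sanity check using the defaults mentioned in write() below: a
+ * BATCH_SIZE=100 consumer batch splits into two chunks of 50, and each chunk
+ * binds 50 x 11 = 550 parameters per query, far below the 65535 ceiling.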
+ */ +function buildInsertSql(records: ConsumedRecord[]): { sql: string; params: unknown[] } { + const params: unknown[] = []; + const valueClauses: string[] = []; + const COLS_PER_ROW = 11; + + for (let i = 0; i < records.length; i++) { + const record = records[i]; + if (record === undefined) continue; + + const base = i * COLS_PER_ROW + 1; + valueClauses.push( + `($${base}, $${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10})`, + ); + + params.push( + record.position.device_id, // $N+0 device_id + record.position.timestamp, // $N+1 ts + record.position.latitude, // $N+2 latitude + record.position.longitude, // $N+3 longitude + record.position.altitude, // $N+4 altitude + record.position.angle, // $N+5 angle + record.position.speed, // $N+6 speed + record.position.satellites, // $N+7 satellites + record.position.priority, // $N+8 priority + record.codec, // $N+9 codec + serializeAttributes(record.position.attributes), // $N+10 attributes (JSONB) + ); + } + + const sql = ` + INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes) + VALUES ${valueClauses.join(',\n ')} + ON CONFLICT (device_id, ts) DO NOTHING + RETURNING device_id, ts + `; + + return { sql, params }; +} + +// --------------------------------------------------------------------------- +// Result mapper +// --------------------------------------------------------------------------- + +type ReturningRow = { + device_id: string; + ts: Date; +}; + +/** + * Maps RETURNING rows back to per-record WriteResult entries. + * + * RETURNING only includes successfully inserted rows โ€” conflicting rows are + * silently omitted by ON CONFLICT DO NOTHING. We identify duplicates by + * comparing (device_id, ts) pairs from the input against the returned rows. + */ +function mapResults(records: ConsumedRecord[], returnedRows: ReturningRow[]): WriteResult[] { + // Build a set of (device_id, ts-ms) pairs from the RETURNING rows for O(1) lookup. + const insertedSet = new Set( + returnedRows.map((row) => `${row.device_id}|${row.ts.getTime()}`), + ); + + return records.map((record) => { + const key = `${record.position.device_id}|${record.position.timestamp.getTime()}`; + const status: 'inserted' | 'duplicate' = insertedSet.has(key) ? 'inserted' : 'duplicate'; + return { id: record.id, status }; + }); +} + +// --------------------------------------------------------------------------- +// Writer factory +// --------------------------------------------------------------------------- + +export function createWriter( + pool: pg.Pool, + config: Config, + logger: Logger, + metrics: Metrics, +): Writer { + const writeBatchSize = config.WRITE_BATCH_SIZE; + + async function writeChunk(chunk: ConsumedRecord[]): Promise { + const startMs = Date.now(); + const { sql, params } = buildInsertSql(chunk); + + let rows: ReturningRow[]; + try { + const result = await pool.query(sql, params); + rows = result.rows; + } catch (err) { + const error = err instanceof Error ? 
err : new Error(String(err)); + logger.error({ err, chunkSize: chunk.length }, 'position write failed'); + + metrics.inc('processor_position_writes_total', { status: 'failed' }); + + return chunk.map((record) => ({ id: record.id, status: 'failed' as const, error })); + } + + const results = mapResults(chunk, rows); + + const insertedCount = results.filter((r) => r.status === 'inserted').length; + const duplicateCount = results.filter((r) => r.status === 'duplicate').length; + + metrics.inc('processor_position_writes_total', { status: 'inserted' }); + metrics.inc('processor_position_writes_total', { status: 'duplicate' }); + metrics.observe('processor_position_write_duration_seconds', (Date.now() - startMs) / 1_000); + + logger.debug( + { inserted: insertedCount, duplicates: duplicateCount, chunkSize: chunk.length }, + 'batch written', + ); + + return results; + } + + async function write(records: ConsumedRecord[]): Promise { + if (records.length === 0) return []; + + // Split into chunks of writeBatchSize and run sequentially. + // Sequential execution avoids starving other pg.Pool users (migration runner, + // /readyz health check). The default BATCH_SIZE=100 with WRITE_BATCH_SIZE=50 + // produces at most 2 sequential queries per consumer tick. + const allResults: WriteResult[] = []; + + for (let offset = 0; offset < records.length; offset += writeBatchSize) { + const chunk = records.slice(offset, offset + writeBatchSize); + const chunkResults = await writeChunk(chunk); + allResults.push(...chunkResults); + } + + return allResults; + } + + return { write }; +} diff --git a/src/main.ts b/src/main.ts index 8c090d4..7fe0ebf 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,6 +1,15 @@ +import type { Redis } from 'ioredis'; +import type pg from 'pg'; import { loadConfig } from './config/load.js'; import type { Config } from './config/load.js'; import { createLogger } from './observability/logger.js'; +import { createPool, connectWithRetry } from './db/pool.js'; +import { runMigrations } from './db/migrate.js'; +import { connectRedis, createConsumer } from './core/consumer.js'; +import type { ConsumedRecord } from './core/consumer.js'; +import { createDeviceStateStore } from './core/state.js'; +import { createWriter } from './core/writer.js'; +import type { Metrics } from './core/types.js'; // ------------------------------------------------------------------------- // Startup: validate config (fail fast on bad env), build logger @@ -24,5 +33,150 @@ const logger = createLogger({ logger.info('processor starting'); -// Consumer, writer, and state wiring land in tasks 1.5โ€“1.8. -process.exit(0); +// ------------------------------------------------------------------------- +// Metrics placeholder shim (task 1.9 replaces this with prom-client) +// +// Uses trace-level logging so the calls are observable in development but +// are silent in production builds where the log level is info or higher. +// This mirrors tcp-ingestion's approach before task 1.10 landed there. 
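+// As a rough, non-authoritative sketch (assumed wiring, not the final 1.9 design),
+// the shim is expected to map onto prom-client roughly like this, reusing the
+// metric names already emitted by writer.ts:
+//   const writes = new Counter({ name: 'processor_position_writes_total', help: 'position writes by status', labelNames: ['status'] });
+//   writes.inc({ status: 'inserted' });
+//   const writeSeconds = new Histogram({ name: 'processor_position_write_duration_seconds', help: 'batch write latency in seconds' });
+//   writeSeconds.observe(elapsedSeconds); // elapsedSeconds is a hypothetical local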
+// ------------------------------------------------------------------------- + +const metrics: Metrics = { + inc: (name: string, labels?: Record) => { + logger.trace({ metric: name, labels }, 'metrics.inc'); + }, + observe: (name: string, value: number, labels?: Record) => { + logger.trace({ metric: name, value, labels }, 'metrics.observe'); + }, +}; + +// ------------------------------------------------------------------------- +// Wire up the pipeline +// ------------------------------------------------------------------------- + +async function main(): Promise { + // 1. Connect Postgres with exponential-backoff retry + const pool = createPool(config.POSTGRES_URL); + await connectWithRetry(pool, logger); + + // 2. Run migrations before any consumer activity. + // Phase 1 limitation: multiple instances starting simultaneously both try + // to migrate. Postgres advisory locks would solve this โ€” deferred to Phase 3 + // (production hardening), which is acceptable for the Phase 1 single-instance + // pilot. + await runMigrations(pool, logger); + logger.info('migrations applied'); + + // 3. Connect Redis with exponential-backoff retry + const redis: Redis = await connectRedis(config.REDIS_URL, logger); + + // 4. Build pipeline components + const state = createDeviceStateStore(config, logger); + const writer = createWriter(pool, config, logger, metrics); + + // 5. Define the sink: central decision point for state update and Postgres write. + // State is updated BEFORE the write so that in-memory state is consistent with + // what has been seen, even if the Postgres write subsequently fails. If the write + // fails the record stays pending (not ACKed) and will be re-delivered โ€” applying + // the same position twice to state is idempotent for last_position and last_seen; + // only position_count_session is double-counted, which is a session counter that + // resets on restart and is not a correctness concern. + const sink = async (records: ConsumedRecord[]): Promise => { + // 5a. Update in-memory state for every record (cheap, synchronous-like, cannot + // fail meaningfully โ€” Map operations do not throw). + for (const record of records) { + state.update(record.position); + } + + // 5b. Write to Postgres + const results = await writer.write(records); + + // 5c. ACK only the IDs that succeeded or were already present. + // 'failed' records are deliberately left pending for retry. + return results + .filter((r) => r.status === 'inserted' || r.status === 'duplicate') + .map((r) => r.id); + }; + + // 6. Build and start the consumer + const consumer = createConsumer(redis, config, logger, metrics, sink); + await consumer.start(); + + // 7. Install graceful shutdown stub. + // Full Phase 3 hardening: explicit consumer-group commit on SIGTERM, + // uncaught-exception handler, multi-instance drain mode. 
+ installGracefulShutdown({ redis, pool, consumer, logger }); + + logger.info( + { + stream: config.REDIS_TELEMETRY_STREAM, + group: config.REDIS_CONSUMER_GROUP, + consumer: config.REDIS_CONSUMER_NAME, + }, + 'processor ready', + ); +} + +// ------------------------------------------------------------------------- +// Graceful shutdown stub โ€” Phase 3 finalizes this +// ------------------------------------------------------------------------- + +type ShutdownDeps = { + readonly redis: Redis; + readonly pool: pg.Pool; + readonly consumer: { stop: () => Promise }; + readonly logger: ReturnType; +}; + +function installGracefulShutdown(deps: ShutdownDeps): void { + const { redis, pool, consumer, logger: log } = deps; + + let shuttingDown = false; + + function shutdown(signal: string): void { + if (shuttingDown) return; + shuttingDown = true; + + log.info({ signal }, 'shutdown signal received'); + + // Stop consumer loop โ€” exits after the current batch finishes. + consumer + .stop() + .then(() => { + log.info('consumer stopped'); + return redis.quit(); + }) + .then(() => { + log.info('Redis disconnected'); + return pool.end(); + }) + .then(() => { + log.info('graceful shutdown complete'); + process.exit(0); + }) + .catch((err: unknown) => { + log.error({ err }, 'error during shutdown'); + process.exit(1); + }); + + // Force exit after 15s if the graceful path stalls (e.g. a hung Postgres write). + setTimeout(() => { + log.warn('forced exit after shutdown timeout'); + process.exit(1); + }, 15_000).unref(); + } + + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); +} + +// ------------------------------------------------------------------------- +// Entry point +// ------------------------------------------------------------------------- + +main().catch((err: unknown) => { + process.stderr.write( + `Fatal startup error: ${err instanceof Error ? err.message : String(err)}\n`, + ); + process.exit(1); +}); diff --git a/test/consumer.test.ts b/test/consumer.test.ts new file mode 100644 index 0000000..e456c82 --- /dev/null +++ b/test/consumer.test.ts @@ -0,0 +1,608 @@ +/** + * Unit tests for src/core/consumer.ts + * + * All Redis I/O is mocked โ€” no real Redis required. The integration test + * (task 1.10) covers the end-to-end round-trip. 
+ * + * Covers: + * - Decodes a synthetic stream entry into a ConsumedRecord with the right shape + * - Calls sink with the decoded batch and ACKs only the IDs the sink returned + * - Partial ACK: sink returns subset of IDs; only those are ACKed + * - BUSYGROUP error from XGROUP CREATE is swallowed and continues + * - Malformed payload: increments metric, logs at error, does NOT ACK the entry + * - Missing payload field: logs at error, does NOT ACK the entry + * - stop() causes the loop to exit cleanly + * - XREADGROUP failure logs error and backs off without crashing + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { MockedFunction } from 'vitest'; +import type { Redis } from 'ioredis'; +import type { Logger } from 'pino'; +import type { Config } from '../src/config/load.js'; +import type { Metrics, Position } from '../src/core/types.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeSilentLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + child: vi.fn().mockReturnThis(), + trace: vi.fn(), + level: 'silent', + silent: vi.fn(), + } as unknown as Logger; +} + +function makeMetrics(): Metrics & { + incCalls: Array<{ name: string; labels?: Record }>; +} { + const incCalls: Array<{ name: string; labels?: Record }> = []; + return { + incCalls, + inc: (name: string, labels?: Record) => { + incCalls.push({ name, labels }); + }, + observe: vi.fn(), + }; +} + +function makeConfig(overrides: Partial = {}): Config { + return { + NODE_ENV: 'test', + INSTANCE_ID: 'test-processor', + LOG_LEVEL: 'silent', + REDIS_URL: 'redis://localhost:6379', + POSTGRES_URL: 'postgres://localhost:5432/test', + REDIS_TELEMETRY_STREAM: 'telemetry:t', + REDIS_CONSUMER_GROUP: 'processor', + REDIS_CONSUMER_NAME: 'test-consumer', + METRICS_PORT: 9090, + BATCH_SIZE: 10, + BATCH_BLOCK_MS: 100, + WRITE_BATCH_SIZE: 50, + DEVICE_STATE_LRU_CAP: 1000, + ...overrides, + }; +} + +/** + * Builds the JSON payload for a synthetic Position, mirroring tcp-ingestion's + * serialization format (sentinel encoding for bigint/Buffer/Date). + */ +function buildPayload(overrides: Partial = {}): string { + const position: Position = { + device_id: 'TESTDEVICE001', + timestamp: new Date('2024-05-01T12:00:00.000Z'), + latitude: 54.6872, + longitude: 25.2797, + altitude: 100, + angle: 90, + speed: 50, + satellites: 12, + priority: 1, + attributes: {}, + ...overrides, + }; + + function jsonReplacer(_key: string, value: unknown): unknown { + if (typeof value === 'bigint') return { __bigint: value.toString() }; + if (value instanceof Uint8Array) return { __buffer_b64: Buffer.from(value).toString('base64') }; + if (value instanceof Date) return value.toISOString(); + return value; + } + + return JSON.stringify(position, jsonReplacer); +} + +/** + * Builds a raw XREADGROUP response for a single entry. 
+ * ioredis returns: [[streamName, [[entryId, [field, value, ...]], ...]]] + */ +function buildXreadgroupResponse( + stream: string, + entries: Array<{ id: string; fields: Record }>, +): [string, [string, string[]][]][] { + return [ + [ + stream, + entries.map(({ id, fields }) => [ + id, + Object.entries(fields).flat(), + ] as [string, string[]]), + ], + ]; +} + +// --------------------------------------------------------------------------- +// Mock ioredis +// --------------------------------------------------------------------------- + +type MockRedis = { + xgroup: MockedFunction<(...args: unknown[]) => Promise>; + xreadgroup: MockedFunction<(...args: unknown[]) => Promise>; + xack: MockedFunction<(...args: unknown[]) => Promise>; +}; + +function makeMockRedis(): MockRedis { + return { + xgroup: vi.fn().mockResolvedValue('OK'), + xreadgroup: vi.fn().mockResolvedValue(null), // default: BLOCK timeout + xack: vi.fn().mockResolvedValue(1), + }; +} + +// --------------------------------------------------------------------------- +// ensureConsumerGroup tests +// --------------------------------------------------------------------------- + +import { ensureConsumerGroup } from '../src/core/consumer.js'; + +describe('ensureConsumerGroup', () => { + it('calls XGROUP CREATE with MKSTREAM and $ start ID', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + + await ensureConsumerGroup(redis as unknown as Redis, 'telemetry:t', 'processor', logger); + + expect(redis.xgroup).toHaveBeenCalledWith('CREATE', 'telemetry:t', 'processor', '$', 'MKSTREAM'); + expect(logger.info).toHaveBeenCalledWith( + expect.objectContaining({ stream: 'telemetry:t', group: 'processor' }), + 'consumer group created', + ); + }); + + it('swallows BUSYGROUP error and logs info', async () => { + const redis = makeMockRedis(); + redis.xgroup.mockRejectedValue(new Error('BUSYGROUP Consumer Group name already exists')); + const logger = makeSilentLogger(); + + await expect( + ensureConsumerGroup(redis as unknown as Redis, 'telemetry:t', 'processor', logger), + ).resolves.toBeUndefined(); + + expect(logger.info).toHaveBeenCalledWith( + expect.objectContaining({ stream: 'telemetry:t', group: 'processor' }), + 'consumer group already exists', + ); + }); + + it('rethrows non-BUSYGROUP errors', async () => { + const redis = makeMockRedis(); + redis.xgroup.mockRejectedValue(new Error('NOPERM no permissions')); + const logger = makeSilentLogger(); + + await expect( + ensureConsumerGroup(redis as unknown as Redis, 'telemetry:t', 'processor', logger), + ).rejects.toThrow('NOPERM no permissions'); + }); +}); + +// --------------------------------------------------------------------------- +// createConsumer tests +// --------------------------------------------------------------------------- + +import { createConsumer } from '../src/core/consumer.js'; +import type { ConsumedRecord } from '../src/core/consumer.js'; + +describe('createConsumer โ€” happy path', () => { + afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); + }); + + it('decodes a stream entry and passes a ConsumedRecord to the sink', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + const metrics = makeMetrics(); + const config = makeConfig(); + + const payload = buildPayload({ device_id: 'DEV001' }); + const stream = 'telemetry:t'; + const entryId = '1714488000000-0'; + + // First call: return one entry. Subsequent calls: return null (BLOCK timeout). 
+ redis.xreadgroup + .mockResolvedValueOnce( + buildXreadgroupResponse(stream, [ + { id: entryId, fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV001', codec: '8', payload } }, + ]), + ) + .mockResolvedValue(null); + + const receivedRecords: ConsumedRecord[][] = []; + let consumerRef: ReturnType | undefined; + + const sink = vi.fn(async (records: ConsumedRecord[]) => { + receivedRecords.push(records); + // Stop the consumer after processing the first batch so the loop exits. + void consumerRef?.stop(); + return records.map((r) => r.id); + }); + + const consumer = createConsumer( + redis as unknown as Redis, + config, + logger, + metrics, + sink, + ); + consumerRef = consumer; + + await consumer.start(); + // Wait for the consumer to process and stop + await consumer.stop(); + + expect(receivedRecords.length).toBeGreaterThanOrEqual(1); + const firstBatch = receivedRecords[0]; + expect(firstBatch).toBeDefined(); + expect(firstBatch!.length).toBe(1); + + const record = firstBatch![0]!; + expect(record.id).toBe(entryId); + expect(record.codec).toBe('8'); + expect(record.ts).toBe('2024-05-01T12:00:00.000Z'); + expect(record.position.device_id).toBe('DEV001'); + expect(record.position.latitude).toBe(54.6872); + }); + + it('ACKs only the IDs returned by the sink (partial ACK)', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + const metrics = makeMetrics(); + const config = makeConfig(); + + const stream = 'telemetry:t'; + const ids = ['1000-0', '1000-1', '1000-2']; + + const entries = ids.map((id) => ({ + id, + fields: { + ts: '2024-05-01T12:00:00.000Z', + device_id: `DEV${id}`, + codec: '8', + payload: buildPayload({ device_id: `DEV${id}` }), + }, + })); + + let consumerRef: ReturnType | undefined; + + redis.xreadgroup + .mockResolvedValueOnce(buildXreadgroupResponse(stream, entries)) + .mockResolvedValue(null); + + // Sink returns only the first and third IDs โ€” second stays pending + const sink = vi.fn(async (records: ConsumedRecord[]) => { + void consumerRef?.stop(); + return [records[0]!.id, records[2]!.id]; + }); + + const consumer = createConsumer( + redis as unknown as Redis, + config, + logger, + metrics, + sink, + ); + consumerRef = consumer; + + await consumer.start(); + await consumer.stop(); + + expect(redis.xack).toHaveBeenCalledWith(stream, 'processor', ids[0], ids[2]); + // id[1] must NOT be in any xack call + const xackCalls = redis.xack.mock.calls.flat(); + expect(xackCalls).not.toContain(ids[1]); + }); + + it('does not call xack when sink returns an empty array', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + const metrics = makeMetrics(); + const config = makeConfig(); + + const stream = 'telemetry:t'; + let consumerRef: ReturnType | undefined; + + redis.xreadgroup + .mockResolvedValueOnce( + buildXreadgroupResponse(stream, [ + { + id: '2000-0', + fields: { + ts: '2024-05-01T12:00:00.000Z', + device_id: 'DEV002', + codec: '8', + payload: buildPayload({ device_id: 'DEV002' }), + }, + }, + ]), + ) + .mockResolvedValue(null); + + const sink = vi.fn(async (_records: ConsumedRecord[]) => { + void consumerRef?.stop(); + return []; + }); + + const consumer = createConsumer( + redis as unknown as Redis, + config, + logger, + metrics, + sink, + ); + consumerRef = consumer; + + await consumer.start(); + await consumer.stop(); + + expect(redis.xack).not.toHaveBeenCalled(); + }); +}); + +describe('createConsumer โ€” decode errors', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('skips 
malformed payload: increments metric, logs error, does not ACK', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + const metrics = makeMetrics(); + const config = makeConfig(); + + const stream = 'telemetry:t'; + const badId = '3000-0'; + let consumerRef: ReturnType | undefined; + + redis.xreadgroup + .mockResolvedValueOnce( + buildXreadgroupResponse(stream, [ + { + id: badId, + fields: { + ts: '2024-05-01T12:00:00.000Z', + device_id: 'DEV003', + codec: '8', + payload: 'not valid json {{{', + }, + }, + ]), + ) + .mockResolvedValue(null); + + const sink = vi.fn(async (_records: ConsumedRecord[]) => { + void consumerRef?.stop(); + return []; + }); + + const consumer = createConsumer( + redis as unknown as Redis, + config, + logger, + metrics, + sink, + ); + consumerRef = consumer; + + await consumer.start(); + await consumer.stop(); + + // Decode error metric incremented + expect(metrics.incCalls.some((c) => c.name === 'processor_decode_errors_total')).toBe(true); + + // Logged at error + expect(logger.error).toHaveBeenCalled(); + + // Sink was called with empty records (bad entry filtered out) + expect(sink).toHaveBeenCalledWith([]); + + // No XACK for the bad entry + expect(redis.xack).not.toHaveBeenCalledWith(stream, 'processor', badId); + }); + + it('skips entry with missing payload field: increments metric, logs error, does not ACK', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + const metrics = makeMetrics(); + const config = makeConfig(); + + const stream = 'telemetry:t'; + const badId = '3001-0'; + let consumerRef: ReturnType | undefined; + + redis.xreadgroup + .mockResolvedValueOnce( + buildXreadgroupResponse(stream, [ + { + id: badId, + // No payload field + fields: { ts: '2024-05-01T12:00:00.000Z', device_id: 'DEV004', codec: '8' }, + }, + ]), + ) + .mockResolvedValue(null); + + const sink = vi.fn(async (_records: ConsumedRecord[]) => { + void consumerRef?.stop(); + return []; + }); + + const consumer = createConsumer( + redis as unknown as Redis, + config, + logger, + metrics, + sink, + ); + consumerRef = consumer; + + await consumer.start(); + await consumer.stop(); + + expect(metrics.incCalls.some((c) => c.name === 'processor_decode_errors_total')).toBe(true); + expect(logger.error).toHaveBeenCalled(); + expect(redis.xack).not.toHaveBeenCalled(); + }); + + it('valid and invalid entries in the same batch: ACKs only valid ones', async () => { + const redis = makeMockRedis(); + const logger = makeSilentLogger(); + const metrics = makeMetrics(); + const config = makeConfig(); + + const stream = 'telemetry:t'; + const goodId = '4000-0'; + const badId = '4000-1'; + let consumerRef: ReturnType | undefined; + + redis.xreadgroup + .mockResolvedValueOnce( + buildXreadgroupResponse(stream, [ + { + id: goodId, + fields: { + ts: '2024-05-01T12:00:00.000Z', + device_id: 'DEV005', + codec: '8', + payload: buildPayload({ device_id: 'DEV005' }), + }, + }, + { + id: badId, + fields: { + ts: '2024-05-01T12:00:00.000Z', + device_id: 'DEV005', + codec: '8', + payload: 'not json', + }, + }, + ]), + ) + .mockResolvedValue(null); + + const sink = vi.fn(async (records: ConsumedRecord[]) => { + void consumerRef?.stop(); + return records.map((r) => r.id); + }); + + const consumer = createConsumer( + redis as unknown as Redis, + config, + logger, + metrics, + sink, + ); + consumerRef = consumer; + + await consumer.start(); + await consumer.stop(); + + // Sink received only the good record + expect(sink).toHaveBeenCalledWith( + 
expect.arrayContaining([expect.objectContaining({ id: goodId })]),
+    );
+    expect(sink).toHaveBeenCalledWith(
+      expect.not.arrayContaining([expect.objectContaining({ id: badId })]),
+    );
+
+    // ACK called for good entry only
+    expect(redis.xack).toHaveBeenCalledWith(stream, 'processor', goodId);
+    const xackArgs = redis.xack.mock.calls.flat();
+    expect(xackArgs).not.toContain(badId);
+  });
+});
+
+describe('createConsumer — XREADGROUP failure', () => {
+  beforeEach(() => {
+    vi.useFakeTimers();
+  });
+
+  afterEach(() => {
+    vi.useRealTimers();
+    vi.restoreAllMocks();
+  });
+
+  it('backs off and retries after XREADGROUP error', async () => {
+    const redis = makeMockRedis();
+    const logger = makeSilentLogger();
+    const metrics = makeMetrics();
+    const config = makeConfig({ BATCH_BLOCK_MS: 10 });
+
+    let consumerRef: ReturnType<typeof createConsumer> | undefined;
+
+    let callCount = 0;
+    redis.xreadgroup.mockImplementation(async () => {
+      callCount++;
+      if (callCount === 1) {
+        throw new Error('LOADING Redis is loading the dataset in memory');
+      }
+      // Stop consumer on second call
+      void consumerRef?.stop();
+      return null;
+    });
+
+    const sink = vi.fn(async () => []);
+
+    const consumer = createConsumer(
+      redis as unknown as Redis,
+      config,
+      logger,
+      metrics,
+      sink,
+    );
+    consumerRef = consumer;
+
+    await consumer.start();
+
+    // Advance timers past the 1000ms backoff
+    await vi.advanceTimersByTimeAsync(1_100);
+    await consumer.stop();
+
+    expect(logger.error).toHaveBeenCalledWith(
+      expect.objectContaining({ err: expect.anything() }),
+      'XREADGROUP failed; backing off',
+    );
+
+    // Should have retried at least once
+    expect(callCount).toBeGreaterThanOrEqual(2);
+  });
+});
+
+describe('createConsumer — clean stop', () => {
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  it('stop() returns after current batch completes', async () => {
+    const redis = makeMockRedis();
+    const logger = makeSilentLogger();
+    const metrics = makeMetrics();
+    const config = makeConfig();
+
+    // Return null immediately (BLOCK timeout) so the loop spins and we can stop it
+    redis.xreadgroup.mockResolvedValue(null);
+
+    const sink = vi.fn(async () => []);
+
+    const consumer = createConsumer(
+      redis as unknown as Redis,
+      config,
+      logger,
+      metrics,
+      sink,
+    );
+
+    await consumer.start();
+
+    // stop() should resolve without hanging
+    await expect(consumer.stop()).resolves.toBeUndefined();
+  });
+});
diff --git a/test/state.test.ts b/test/state.test.ts
new file mode 100644
index 0000000..beaf4a0
--- /dev/null
+++ b/test/state.test.ts
@@ -0,0 +1,257 @@
+/**
+ * Unit tests for src/core/state.ts
+ *
+ * Covers:
+ * - First update creates entry; subsequent updates increment position_count_session
+ * - LRU eviction: with cap=3, after 4 distinct devices the oldest is evicted
+ * - Eviction increments evictedTotal()
+ * - last_seen reflects the position's timestamp (device-reported time)
+ * - Out-of-order positions: last_seen only advances forward (max semantics)
+ * - get() returns undefined for unknown devices
+ * - size() returns the current number of stored devices
+ * - LRU order: most-recently-updated device is not evicted on overflow
+ */
+
+import { describe, it, expect, vi } from 'vitest';
+import type { Logger } from 'pino';
+import type { Config } from '../src/config/load.js';
+import type { Position } from '../src/core/types.js';
+import { createDeviceStateStore } from '../src/core/state.js';
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+function makeSilentLogger(): Logger {
+  return {
+    debug: vi.fn(),
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    fatal: vi.fn(),
+    child: vi.fn().mockReturnThis(),
+    trace: vi.fn(),
+    level: 'silent',
+    silent: vi.fn(),
+  } as unknown as Logger;
+}
+
+function makeConfig(overrides: Partial<Config> = {}): Config {
+  return {
+    NODE_ENV: 'test',
+    INSTANCE_ID: 'test-processor',
+    LOG_LEVEL: 'silent',
+    REDIS_URL: 'redis://localhost:6379',
+    POSTGRES_URL: 'postgres://localhost:5432/test',
+    REDIS_TELEMETRY_STREAM: 'telemetry:t',
+    REDIS_CONSUMER_GROUP: 'processor',
+    REDIS_CONSUMER_NAME: 'test-consumer',
+    METRICS_PORT: 9090,
+    BATCH_SIZE: 10,
+    BATCH_BLOCK_MS: 100,
+    WRITE_BATCH_SIZE: 50,
+    DEVICE_STATE_LRU_CAP: 1000,
+    ...overrides,
+  };
+}
+
+function makePosition(deviceId: string, overrides: Partial<Position> = {}): Position {
+  return {
+    device_id: deviceId,
+    timestamp: new Date('2024-05-01T12:00:00.000Z'),
+    latitude: 54.6872,
+    longitude: 25.2797,
+    altitude: 100,
+    angle: 90,
+    speed: 50,
+    satellites: 12,
+    priority: 1,
+    attributes: {},
+    ...overrides,
+  };
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+describe('createDeviceStateStore — initial state', () => {
+  it('creates a new entry on first update', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const position = makePosition('DEV001');
+
+    const state = store.update(position);
+
+    expect(state.device_id).toBe('DEV001');
+    expect(state.last_position).toBe(position);
+    expect(state.position_count_session).toBe(1);
+    expect(state.last_seen).toEqual(position.timestamp);
+  });
+
+  it('increments position_count_session on subsequent updates', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const pos1 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:00.000Z') });
+    const pos2 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:01.000Z') });
+    const pos3 = makePosition('DEV001', { timestamp: new Date('2024-05-01T12:00:02.000Z') });
+
+    store.update(pos1);
+    store.update(pos2);
+    const state = store.update(pos3);
+
+    expect(state.position_count_session).toBe(3);
+  });
+
+  it('get() returns undefined for an unknown device', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+
+    expect(store.get('UNKNOWN')).toBeUndefined();
+  });
+
+  it('get() returns the current state for a known device', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const position = makePosition('DEV002');
+
+    store.update(position);
+    const state = store.get('DEV002');
+
+    expect(state).toBeDefined();
+    expect(state?.device_id).toBe('DEV002');
+  });
+
+  it('size() returns 0 before any updates', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    expect(store.size()).toBe(0);
+  });
+
+  it('size() returns the number of distinct devices after updates', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+
+    store.update(makePosition('DEV001'));
+    store.update(makePosition('DEV002'));
+    store.update(makePosition('DEV001')); // duplicate device — should not increase size
+
+    expect(store.size()).toBe(2);
+  });
+});
+
+describe('createDeviceStateStore — last_seen semantics', () => {
+  it('last_seen reflects the position timestamp (not wall clock)', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const ts = new Date('2024-03-15T08:30:00.000Z');
+    const position = makePosition('DEV010', { timestamp: ts });
+
+    const state = store.update(position);
+
+    expect(state.last_seen).toEqual(ts);
+    expect(state.last_seen).not.toBe(new Date()); // not wall clock
+  });
+
+  it('last_seen advances on newer timestamps', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const ts1 = new Date('2024-05-01T10:00:00.000Z');
+    const ts2 = new Date('2024-05-01T11:00:00.000Z');
+
+    store.update(makePosition('DEV011', { timestamp: ts1 }));
+    const state = store.update(makePosition('DEV011', { timestamp: ts2 }));
+
+    expect(state.last_seen).toEqual(ts2);
+  });
+
+  it('last_seen does NOT regress on out-of-order (older) timestamps', () => {
+    // Devices buffer offline records and replay them in bursts; within a burst
+    // consecutive timestamps may decrease. last_seen must mean "highest device
+    // timestamp seen so far" — it must never go backward.
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const newer = new Date('2024-05-01T12:00:00.000Z');
+    const older = new Date('2024-05-01T10:00:00.000Z');
+
+    store.update(makePosition('DEV012', { timestamp: newer }));
+    const state = store.update(makePosition('DEV012', { timestamp: older }));
+
+    // last_seen must remain at the newer timestamp, not regress to older
+    expect(state.last_seen).toEqual(newer);
+  });
+
+  it('last_seen stays the same when equal timestamps arrive', () => {
+    const store = createDeviceStateStore(makeConfig(), makeSilentLogger());
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+
+    store.update(makePosition('DEV013', { timestamp: ts }));
+    const state = store.update(makePosition('DEV013', { timestamp: new Date(ts.getTime()) }));
+
+    expect(state.last_seen).toEqual(ts);
+  });
+});
+
+describe('createDeviceStateStore — LRU eviction', () => {
+  it('evicts the least-recently-updated device when cap is exceeded', () => {
+    const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger());
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+
+    // Insert 3 devices: DEV001, DEV002, DEV003 (DEV001 is oldest)
+    store.update(makePosition('DEV001', { timestamp: ts }));
+    store.update(makePosition('DEV002', { timestamp: ts }));
+    store.update(makePosition('DEV003', { timestamp: ts }));
+
+    expect(store.size()).toBe(3);
+
+    // Add a 4th device — DEV001 (the oldest / least-recently-updated) should be evicted
+    store.update(makePosition('DEV004', { timestamp: ts }));
+
+    expect(store.size()).toBe(3);
+    expect(store.get('DEV001')).toBeUndefined(); // evicted
+    expect(store.get('DEV002')).toBeDefined();
+    expect(store.get('DEV003')).toBeDefined();
+    expect(store.get('DEV004')).toBeDefined();
+  });
+
+  it('re-using an existing device bumps it to most-recent so it is not evicted next', () => {
+    const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 3 }), makeSilentLogger());
+    const ts1 = new Date('2024-05-01T12:00:00.000Z');
+    const ts2 = new Date('2024-05-01T12:00:01.000Z');
+
+    store.update(makePosition('DEV001', { timestamp: ts1 }));
+    store.update(makePosition('DEV002', { timestamp: ts1 }));
+    store.update(makePosition('DEV003', { timestamp: ts1 }));
+
+    // Re-touch DEV001 — it should now be the most-recently-updated
+    store.update(makePosition('DEV001', { timestamp: ts2 }));
+
+    // Add DEV004 — DEV002 should be evicted (it is now the oldest)
+    store.update(makePosition('DEV004', { timestamp: ts1 }));
+
+    expect(store.size()).toBe(3);
+    expect(store.get('DEV001')).toBeDefined(); // was re-touched
+    expect(store.get('DEV002')).toBeUndefined(); // evicted (oldest after DEV001 was re-touched)
+    expect(store.get('DEV003')).toBeDefined();
+    expect(store.get('DEV004')).toBeDefined();
+  });
+
+  it('evictedTotal() increments on each eviction', () => {
+    const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 2 }), makeSilentLogger());
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+
+    expect(store.evictedTotal()).toBe(0);
+
+    store.update(makePosition('DEV001', { timestamp: ts }));
+    store.update(makePosition('DEV002', { timestamp: ts }));
+    expect(store.evictedTotal()).toBe(0);
+
+    store.update(makePosition('DEV003', { timestamp: ts })); // evicts DEV001
+    expect(store.evictedTotal()).toBe(1);
+
+    store.update(makePosition('DEV004', { timestamp: ts })); // evicts DEV002
+    expect(store.evictedTotal()).toBe(2);
+  });
+
+  it('evictedTotal() stays 0 when cap is never reached', () => {
+    const store = createDeviceStateStore(makeConfig({ DEVICE_STATE_LRU_CAP: 1000 }), makeSilentLogger());
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+
+    for (let i = 0; i < 10; i++) {
+      store.update(makePosition(`DEV${i}`, { timestamp: ts }));
+    }
+
+    expect(store.evictedTotal()).toBe(0);
+  });
+});
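**Note (sketch):** The state tests above pin down the behaviour recorded in the task 1.6 Done notes: a `Map` used as an LRU via the delete+set bump trick, `last_seen = max(prev, position.timestamp)`, and an eviction counter. The following is a minimal sketch of a store that would satisfy these tests, not the actual `src/core/state.ts`; the `DeviceState` field names are taken from the test assertions, and the import specifiers are assumed from the module paths referenced elsewhere in this plan.

```ts
// Sketch only: mirrors the behaviour the tests assert, not the real module.
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';   // assumed path
import type { Position } from './types.js';        // assumed path

interface DeviceState {
  device_id: string;
  last_position: Position;
  last_seen: Date;
  position_count_session: number;
}

export function createDeviceStateStore(config: Config, logger: Logger) {
  // Map iteration order is insertion order, so the first key is always the LRU entry.
  const states = new Map<string, DeviceState>();
  let evicted = 0;

  return {
    update(position: Position): DeviceState {
      const prev = states.get(position.device_id);
      const state: DeviceState = prev
        ? {
            ...prev,
            last_position: position,
            // max(prev, incoming): never regress on out-of-order replays
            last_seen: position.timestamp > prev.last_seen ? position.timestamp : prev.last_seen,
            position_count_session: prev.position_count_session + 1,
          }
        : {
            device_id: position.device_id,
            last_position: position,
            last_seen: position.timestamp,
            position_count_session: 1,
          };

      // delete+set bump: re-inserting moves the key to the most-recent end
      states.delete(position.device_id);
      states.set(position.device_id, state);

      if (states.size > config.DEVICE_STATE_LRU_CAP) {
        const oldestKey = states.keys().next().value as string;
        states.delete(oldestKey);
        evicted++;
        logger.debug({ device_id: oldestKey }, 'evicted device state (LRU cap reached)');
      }
      return state;
    },
    get: (deviceId: string) => states.get(deviceId),
    size: () => states.size,
    evictedTotal: () => evicted,
  };
}
```

The delete+set bump relies on `Map` preserving insertion order, so `states.keys().next()` always yields the least-recently-updated device without any extra bookkeeping.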
diff --git a/test/writer.test.ts b/test/writer.test.ts
new file mode 100644
index 0000000..3bb740c
--- /dev/null
+++ b/test/writer.test.ts
@@ -0,0 +1,501 @@
+/**
+ * Unit tests for src/core/writer.ts
+ *
+ * All Postgres I/O is mocked — no real database required. The integration test
+ * (task 1.10) covers byte-level round-trip including TimescaleDB hypertable.
+ *
+ * Covers:
+ * - Happy path: all records inserted (all appear in RETURNING rows)
+ * - Duplicate-key: ON CONFLICT DO NOTHING → records absent from RETURNING → 'duplicate'
+ * - Mixed: half new, half duplicate
+ * - Pool error: all records in the batch get 'failed'; error is attached
+ * - Chunking: batch larger than WRITE_BATCH_SIZE results in multiple queries
+ * - Bigint attribute is stringified before serialization
+ * - Buffer attribute is base64-encoded before serialization
+ * - Empty batch returns empty results
+ * - SQL parameter order: device_id, ts, latitude, longitude, altitude, angle,
+ *   speed, satellites, priority, codec, attributes
+ */
+
+import { describe, it, expect, vi } from 'vitest';
+import type { Logger } from 'pino';
+import type { Pool } from 'pg';
+import type { Config } from '../src/config/load.js';
+import type { Metrics, Position } from '../src/core/types.js';
+import type { ConsumedRecord } from '../src/core/consumer.js';
+import { createWriter } from '../src/core/writer.js';
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+function makeSilentLogger(): Logger {
+  return {
+    debug: vi.fn(),
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    fatal: vi.fn(),
+    child: vi.fn().mockReturnThis(),
+    trace: vi.fn(),
+    level: 'silent',
+    silent: vi.fn(),
+  } as unknown as Logger;
+}
+
+function makeMetrics(): Metrics {
+  return {
+    inc: vi.fn(),
+    observe: vi.fn(),
+  };
+}
+
+function makeConfig(overrides: Partial<Config> = {}): Config {
+  return {
+    NODE_ENV: 'test',
+    INSTANCE_ID: 'test-processor',
+    LOG_LEVEL: 'silent',
+    REDIS_URL: 'redis://localhost:6379',
+    POSTGRES_URL: 'postgres://localhost:5432/test',
+    REDIS_TELEMETRY_STREAM: 'telemetry:t',
+    REDIS_CONSUMER_GROUP: 'processor',
+    REDIS_CONSUMER_NAME: 'test-consumer',
+    METRICS_PORT: 9090,
+    BATCH_SIZE: 10,
+    BATCH_BLOCK_MS: 100,
+    WRITE_BATCH_SIZE: 50,
+    DEVICE_STATE_LRU_CAP: 1000,
+    ...overrides,
+  };
+}
+
+function makePosition(deviceId: string, overrides: Partial<Position> = {}): Position {
+  return {
+    device_id: deviceId,
+    timestamp: new Date('2024-05-01T12:00:00.000Z'),
+    latitude: 54.6872,
+    longitude: 25.2797,
+    altitude: 100,
+    angle: 90,
+    speed: 50,
+    satellites: 12,
+    priority: 1,
+    attributes: {},
+    ...overrides,
+  };
+}
+
+function makeRecord(id: string, deviceId: string, overrides: Partial<Position> = {}): ConsumedRecord {
+  return {
+    id,
+    position: makePosition(deviceId, overrides),
+    codec: '8',
+    ts: '2024-05-01T12:00:00.000Z',
+  };
+}
+
+/**
+ * Creates a mock pg.Pool whose query() returns the given rows.
+ * Captures all SQL and params for assertion.
+ */
+function makeMockPool(
+  queryResponses: Array<{ rows: unknown[] } | Error>,
+): {
+  pool: Pool;
+  queryCalls: Array<{ sql: string; params: unknown[] }>;
+} {
+  const queryCalls: Array<{ sql: string; params: unknown[] }> = [];
+  let callIndex = 0;
+
+  const query = vi.fn(async (sql: string, params: unknown[] = []) => {
+    queryCalls.push({ sql, params });
+    const response = queryResponses[callIndex++];
+    if (response instanceof Error) throw response;
+    return response ?? { rows: [] };
+  });
+
+  return {
+    pool: { query } as unknown as Pool,
+    queryCalls,
+  };
+}
+
+// ---------------------------------------------------------------------------
+// Tests — happy path
+// ---------------------------------------------------------------------------
+
+describe('createWriter — happy path', () => {
+  it('returns inserted for all records when all appear in RETURNING', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const records = [
+      makeRecord('1-0', 'DEV001', { timestamp: ts }),
+      makeRecord('1-1', 'DEV002', { timestamp: ts }),
+    ];
+
+    const { pool } = makeMockPool([
+      {
+        rows: [
+          { device_id: 'DEV001', ts },
+          { device_id: 'DEV002', ts },
+        ],
+      },
+    ]);
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    const results = await writer.write(records);
+
+    expect(results).toHaveLength(2);
+    expect(results[0]).toEqual({ id: '1-0', status: 'inserted' });
+    expect(results[1]).toEqual({ id: '1-1', status: 'inserted' });
+  });
+
+  it('returns duplicate for records absent from RETURNING', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const records = [
+      makeRecord('2-0', 'DEV003', { timestamp: ts }),
+      makeRecord('2-1', 'DEV004', { timestamp: ts }),
+    ];
+
+    // Only DEV003 returned — DEV004 was a conflict
+    const { pool } = makeMockPool([{ rows: [{ device_id: 'DEV003', ts }] }]);
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    const results = await writer.write(records);
+
+    expect(results).toHaveLength(2);
+    expect(results.find((r) => r.id === '2-0')?.status).toBe('inserted');
+    expect(results.find((r) => r.id === '2-1')?.status).toBe('duplicate');
+  });
+
+  it('handles mixed batch: some inserted, some duplicate', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const records = [
+      makeRecord('3-0', 'DEV005', { timestamp: ts }),
+      makeRecord('3-1', 'DEV006', { timestamp: ts }),
+      makeRecord('3-2', 'DEV007', { timestamp: ts }),
+      makeRecord('3-3', 'DEV008', { timestamp: ts }),
+    ];
+
+    // DEV005 and DEV007 are new; DEV006 and DEV008 are duplicates
+    const { pool } = makeMockPool([
+      {
+        rows: [
+          { device_id: 'DEV005', ts },
+          { device_id: 'DEV007', ts },
+        ],
+      },
+    ]);
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    const results = await writer.write(records);
+
+    expect(results.find((r) => r.id === '3-0')?.status).toBe('inserted');
+    expect(results.find((r) => r.id === '3-1')?.status).toBe('duplicate');
+    expect(results.find((r) => r.id === '3-2')?.status).toBe('inserted');
+    expect(results.find((r) => r.id === '3-3')?.status).toBe('duplicate');
+  });
+
+  it('returns empty array for empty input', async () => {
+    const { pool } = makeMockPool([]);
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    const results = await writer.write([]);
+    expect(results).toEqual([]);
+  });
+});
+
+// ---------------------------------------------------------------------------
+// Tests — failure handling
+// ---------------------------------------------------------------------------
+
+describe('createWriter — pool error', () => {
+  it('returns failed for all records in the chunk when pool throws', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const records = [
+      makeRecord('4-0', 'DEV009', { timestamp: ts }),
+      makeRecord('4-1', 'DEV010', { timestamp: ts }),
+    ];
+
+    const dbError = new Error('connection terminated unexpectedly');
+    const { pool } = makeMockPool([dbError]);
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    const results = await writer.write(records);
+
+    expect(results).toHaveLength(2);
+    for (const result of results) {
+      expect(result.status).toBe('failed');
+      expect(result.error).toBeDefined();
+      expect(result.error?.message).toBe('connection terminated unexpectedly');
+    }
+  });
+
+  it('logs error and increments failed metric on pool error', async () => {
+    const records = [makeRecord('5-0', 'DEV011')];
+    const { pool } = makeMockPool([new Error('timeout')]);
+    const logger = makeSilentLogger();
+    const metrics = makeMetrics();
+
+    const writer = createWriter(pool, makeConfig(), logger, metrics);
+    await writer.write(records);
+
+    expect(logger.error).toHaveBeenCalled();
+    expect(metrics.inc).toHaveBeenCalledWith(
+      'processor_position_writes_total',
+      { status: 'failed' },
+    );
+  });
+});
+
+// ---------------------------------------------------------------------------
+// Tests — chunking
+// ---------------------------------------------------------------------------
+
+describe('createWriter — chunking', () => {
+  it('splits a batch larger than WRITE_BATCH_SIZE into multiple sequential queries', async () => {
+    const writeBatchSize = 3;
+    const config = makeConfig({ WRITE_BATCH_SIZE: writeBatchSize });
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+
+    // 7 records → ceil(7/3) = 3 chunks: [3, 3, 1]
+    const records = Array.from({ length: 7 }, (_, i) =>
+      makeRecord(`6-${i}`, `DEV${100 + i}`, { timestamp: ts }),
+    );
+
+    const { pool, queryCalls } = makeMockPool([
+      { rows: records.slice(0, 3).map((r) => ({ device_id: r.position.device_id, ts })) },
+      { rows: records.slice(3, 6).map((r) => ({ device_id: r.position.device_id, ts })) },
+      { rows: records.slice(6).map((r) => ({ device_id: r.position.device_id, ts })) },
+    ]);
+
+    const writer = createWriter(pool, config, makeSilentLogger(), makeMetrics());
+    const results = await writer.write(records);
+
+    // 3 separate queries issued
+    expect(queryCalls).toHaveLength(3);
+    // All 7 records should be returned
+    expect(results).toHaveLength(7);
+    for (const result of results) {
+      expect(result.status).toBe('inserted');
+    }
+  });
+
+  it('first chunk fails, second chunk succeeds — correct per-record status', async () => {
+    const writeBatchSize = 2;
+    const config = makeConfig({ WRITE_BATCH_SIZE: writeBatchSize });
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+
+    const records = Array.from({ length: 4 }, (_, i) =>
+      makeRecord(`7-${i}`, `DEV${200 + i}`, { timestamp: ts }),
+    );
+
+    const { pool } = makeMockPool([
+      new Error('chunk 1 failed'),
+      {
+        rows: records.slice(2).map((r) => ({ device_id: r.position.device_id, ts })),
+      },
+    ]);
+
+    const writer = createWriter(pool, config, makeSilentLogger(), makeMetrics());
+    const results = await writer.write(records);
+
+    expect(results[0]?.status).toBe('failed');
+    expect(results[1]?.status).toBe('failed');
+    expect(results[2]?.status).toBe('inserted');
+    expect(results[3]?.status).toBe('inserted');
+  });
+});
+
+// ---------------------------------------------------------------------------
+// Tests — attribute serialization
+// ---------------------------------------------------------------------------
+
+describe('createWriter — attribute serialization', () => {
+  it('serializes bigint attributes as decimal strings in the attributes JSON', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const u64Max = BigInt('18446744073709551615');
+    const records = [
+      makeRecord('8-0', 'DEV020', {
+        timestamp: ts,
+        attributes: { io_240: u64Max },
+      }),
+    ];
+
+    let capturedParams: unknown[] = [];
+    const query = vi.fn(async (sql: string, params: unknown[]) => {
+      capturedParams = params;
+      return { rows: [{ device_id: 'DEV020', ts }] };
+    });
+    const pool = { query } as unknown as Pool;
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    await writer.write(records);
+
+    // The attributes param is the 11th per row (index 10)
+    const attributesParam = capturedParams[10] as string;
+    expect(typeof attributesParam).toBe('string');
+    const parsed = JSON.parse(attributesParam) as Record<string, unknown>;
+    expect(parsed['io_240']).toBe('18446744073709551615');
+    // Must be a string, not a bigint (JSON can't hold bigints)
+    expect(typeof parsed['io_240']).toBe('string');
+  });
+
+  it('serializes Buffer attributes as base64 strings in the attributes JSON', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const rawBytes = Buffer.from([0xde, 0xad, 0xbe, 0xef]);
+    const records = [
+      makeRecord('9-0', 'DEV021', {
+        timestamp: ts,
+        attributes: { io_nx: rawBytes },
+      }),
+    ];
+
+    let capturedParams: unknown[] = [];
+    const query = vi.fn(async (sql: string, params: unknown[]) => {
+      capturedParams = params;
+      return { rows: [{ device_id: 'DEV021', ts }] };
+    });
+    const pool = { query } as unknown as Pool;
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    await writer.write(records);
+
+    const attributesParam = capturedParams[10] as string;
+    expect(typeof attributesParam).toBe('string');
+    const parsed = JSON.parse(attributesParam) as Record<string, unknown>;
+    const b64 = parsed['io_nx'];
+    expect(typeof b64).toBe('string');
+    // Decode and verify byte equality
+    const decoded = Buffer.from(b64 as string, 'base64');
+    expect(decoded).toEqual(rawBytes);
+  });
+
+  it('serializes numeric attributes as-is in the attributes JSON', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const records = [
+      makeRecord('10-0', 'DEV022', {
+        timestamp: ts,
+        attributes: { io_21: 42, io_1: 0 },
+      }),
+    ];
+
+    let capturedParams: unknown[] = [];
+    const query = vi.fn(async (sql: string, params: unknown[]) => {
+      capturedParams = params;
+      return { rows: [{ device_id: 'DEV022', ts }] };
+    });
+    const pool = { query } as unknown as Pool;
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    await writer.write(records);
+
+    const attributesParam = capturedParams[10] as string;
+    const parsed = JSON.parse(attributesParam) as Record<string, unknown>;
+    expect(parsed['io_21']).toBe(42);
+    expect(parsed['io_1']).toBe(0);
+  });
+});
+
+// ---------------------------------------------------------------------------
+// Tests — SQL parameter ordering
+// ---------------------------------------------------------------------------
+
+describe('createWriter — SQL parameter ordering', () => {
+  it('passes parameters in the correct column order', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const position: Position = {
+      device_id: 'PARAMTEST001',
+      timestamp: ts,
+      latitude: 10.0,
+      longitude: 20.0,
+      altitude: 300,
+      angle: 180,
+      speed: 75,
+      satellites: 9,
+      priority: 2,
+      attributes: { io_21: 99 },
+    };
+    const records: ConsumedRecord[] = [{ id: 'p-0', position, codec: '8E', ts: ts.toISOString() }];
+
+    let capturedParams: unknown[] = [];
+    const query = vi.fn(async (sql: string, params: unknown[]) => {
+      capturedParams = params;
+      return { rows: [{ device_id: 'PARAMTEST001', ts }] };
+    });
+    const pool = { query } as unknown as Pool;
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    await writer.write(records);
+
+    // Expected column order (11 params for row 0, 1-indexed in SQL, 0-indexed here):
+    //  $1 device_id
+    //  $2 ts
+    //  $3 latitude
+    //  $4 longitude
+    //  $5 altitude
+    //  $6 angle
+    //  $7 speed
+    //  $8 satellites
+    //  $9 priority
+    // $10 codec
+    // $11 attributes
+    expect(capturedParams[0]).toBe('PARAMTEST001');
+    expect(capturedParams[1]).toEqual(ts);
+    expect(capturedParams[2]).toBe(10.0);
+    expect(capturedParams[3]).toBe(20.0);
+    expect(capturedParams[4]).toBe(300);
+    expect(capturedParams[5]).toBe(180);
+    expect(capturedParams[6]).toBe(75);
+    expect(capturedParams[7]).toBe(9);
+    expect(capturedParams[8]).toBe(2);
+    expect(capturedParams[9]).toBe('8E');
+    expect(typeof capturedParams[10]).toBe('string');
+    const attrs = JSON.parse(capturedParams[10] as string) as Record<string, unknown>;
+    expect(attrs['io_21']).toBe(99);
+  });
+
+  it('SQL contains ON CONFLICT DO NOTHING and RETURNING clause', async () => {
+    const records = [makeRecord('q-0', 'DEV030')];
+    let capturedSql = '';
+    const query = vi.fn(async (sql: string, _params: unknown[]) => {
+      capturedSql = sql;
+      return { rows: [{ device_id: 'DEV030', ts: new Date() }] };
+    });
+    const pool = { query } as unknown as Pool;
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), makeMetrics());
+    await writer.write(records);
+
+    expect(capturedSql).toMatch(/ON CONFLICT.*DO NOTHING/i);
+    expect(capturedSql).toMatch(/RETURNING/i);
+    expect(capturedSql).toMatch(/device_id/);
+    expect(capturedSql).toMatch(/ts/);
+  });
+});
+
+// ---------------------------------------------------------------------------
+// Tests — metrics
+// ---------------------------------------------------------------------------
+
+describe('createWriter — metrics', () => {
+  it('emits inserted and duplicate counters after a successful write', async () => {
+    const ts = new Date('2024-05-01T12:00:00.000Z');
+    const records = [
+      makeRecord('m-0', 'DEV040', { timestamp: ts }),
+      makeRecord('m-1', 'DEV041', { timestamp: ts }),
+    ];
+
+    // Only DEV040 returned — DEV041 is duplicate
+    const { pool } = makeMockPool([{ rows: [{ device_id: 'DEV040', ts }] }]);
+    const metrics = makeMetrics();
+
+    const writer = createWriter(pool, makeConfig(), makeSilentLogger(), metrics);
+    await writer.write(records);
+
+    expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'inserted' });
+    expect(metrics.inc).toHaveBeenCalledWith('processor_position_writes_total', { status: 'duplicate' });
+    expect(metrics.observe).toHaveBeenCalledWith(
+      'processor_position_write_duration_seconds',
+      expect.any(Number),
+    );
+  });
+});
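**Note (sketch):** `writer.test.ts` encodes the Task 1.7 decisions: one multi-row `INSERT ... ON CONFLICT DO NOTHING RETURNING device_id, ts` per chunk of `WRITE_BATCH_SIZE`, duplicates detected by the set difference between the input batch and the `RETURNING` rows, and attributes serialized with bigints as decimal strings and Buffers as base64. Below is a condensed sketch of that write path, assuming the column list and a `(device_id, ts)` conflict target implied by the tests; the import paths follow the test files, and the real `src/core/writer.ts` may differ in detail.

```ts
// Sketch only: illustrates set-difference duplicate detection and attribute serialization.
import type { Pool } from 'pg';
import type { ConsumedRecord } from '../src/core/consumer.js'; // assumed path

type WriteStatus = 'inserted' | 'duplicate' | 'failed';
interface WriteResult { id: string; status: WriteStatus; error?: Error }

// Convert attributes up front (bigint -> decimal string, Buffer -> base64) and then
// stringify; converting before JSON.stringify avoids the Buffer.toJSON shape that a
// replacer function would otherwise see.
function serializeAttributes(attrs: Record<string, unknown>): string {
  const plain: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(attrs)) {
    if (typeof value === 'bigint') plain[key] = value.toString();
    else if (Buffer.isBuffer(value)) plain[key] = value.toString('base64');
    else plain[key] = value;
  }
  return JSON.stringify(plain);
}

async function writeChunk(pool: Pool, records: ConsumedRecord[]): Promise<WriteResult[]> {
  const params: unknown[] = [];
  const tuples = records.map((r, i) => {
    const p = r.position;
    // Parameter order matches the test: device_id, ts, latitude, longitude, altitude,
    // angle, speed, satellites, priority, codec, attributes (11 params per row).
    params.push(
      p.device_id, p.timestamp, p.latitude, p.longitude, p.altitude, p.angle,
      p.speed, p.satellites, p.priority, r.codec,
      serializeAttributes(p.attributes as Record<string, unknown>),
    );
    const base = i * 11;
    return `(${Array.from({ length: 11 }, (_, j) => `$${base + j + 1}`).join(', ')})`;
  });

  const sql =
    `INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, ` +
    `speed, satellites, priority, codec, attributes) VALUES ${tuples.join(', ')} ` +
    `ON CONFLICT (device_id, ts) DO NOTHING RETURNING device_id, ts`;

  try {
    const { rows } = await pool.query(sql, params);
    // Set difference: a record whose (device_id, ts) is not echoed back hit the conflict.
    const returned = new Set(
      (rows as Array<{ device_id: string; ts: Date }>).map(
        (row) => `${row.device_id}|${new Date(row.ts).getTime()}`,
      ),
    );
    return records.map((r) => ({
      id: r.id,
      status: returned.has(`${r.position.device_id}|${r.position.timestamp.getTime()}`)
        ? ('inserted' as const)
        : ('duplicate' as const),
    }));
  } catch (error) {
    // A chunk-wide failure marks every record in that chunk as failed.
    return records.map((r) => ({ id: r.id, status: 'failed' as const, error: error as Error }));
  }
}

async function write(pool: Pool, records: ConsumedRecord[], writeBatchSize: number): Promise<WriteResult[]> {
  const results: WriteResult[] = [];
  for (let i = 0; i < records.length; i += writeBatchSize) {
    results.push(...(await writeChunk(pool, records.slice(i, i + writeBatchSize))));
  }
  return results;
}
```

Because `ON CONFLICT DO NOTHING` suppresses conflicting rows from `RETURNING`, membership in the returned set is enough to distinguish `inserted` from `duplicate` without the `xmax = 0` idiom discussed in the task 1.7 notes.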
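**Note (sketch):** The `XREADGROUP failure` suite in `consumer.test.ts` expects the read loop to log `'XREADGROUP failed; backing off'` with the error attached and then retry; the fake-timer advance of 1,100 ms suggests a backoff of about one second, which is treated as an assumption here. A stripped-down sketch of that loop, with decoding, sink dispatch, and partial ACK elided:

```ts
// Sketch only: the error/backoff skeleton the test asserts, not the real src/core/consumer.ts.
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import type { Config } from '../src/config/load.js'; // assumed path

async function runReadLoop(redis: Redis, config: Config, logger: Logger, stopped: () => boolean): Promise<void> {
  while (!stopped()) {
    try {
      const reply = await redis.xreadgroup(
        'GROUP', config.REDIS_CONSUMER_GROUP, config.REDIS_CONSUMER_NAME,
        'COUNT', config.BATCH_SIZE,
        'BLOCK', config.BATCH_BLOCK_MS,
        'STREAMS', config.REDIS_TELEMETRY_STREAM, '>',
      );
      if (!reply) continue; // BLOCK timed out with nothing to read; loop again
      // ... decode entries, hand them to the sink, XACK only the ones that succeed ...
    } catch (err) {
      // Transient Redis errors (e.g. LOADING) are logged and retried after a pause.
      logger.error({ err }, 'XREADGROUP failed; backing off');
      await new Promise((resolve) => setTimeout(resolve, 1_000)); // assumed 1 s backoff
    }
  }
}
```

Returning `null` from a timed-out `BLOCK` read simply re-enters the loop, which is what the clean-stop test relies on to let `stop()` resolve between batches.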