Implement Phase 1 tasks 1.5-1.8 (consumer + state + writer + main wiring)

src/core/consumer.ts — XREADGROUP loop with consumer-group resumption,
ensureConsumerGroup (BUSYGROUP-tolerant), decodeBatch (CodecError → log
+ skip + leave pending; never speculative ACK), partial-ACK semantics,
connectRedis (mirroring tcp-ingestion's retry pattern), clean stop.

src/core/state.ts — LRU Map<device_id, DeviceState> using delete+set
bump trick (no third-party LRU dep); last_seen = max(prev, ts) so
out-of-order replays don't regress the high-water mark; evictedTotal()
counter.

src/core/writer.ts — multi-row INSERT ON CONFLICT (device_id, ts) DO
NOTHING with RETURNING. Duplicate detection by set-difference between
input and RETURNING rows (xmax=0 doesn't work for skipped-conflict
rows, only returned ones — confirmed in the task spec's own Note).
Sequential chunking to WRITE_BATCH_SIZE; bigint→string and Buffer→base64
attribute serialization that handles Buffer.toJSON shape.

src/main.ts — full pipeline: pool → migrate → redis → state → writer →
sink → consumer → graceful-shutdown stub. Sink ordering is
state.update BEFORE writer.write per spec rationale (state stays
consistent with what's been seen even if not yet persisted; redelivery
is idempotent on state). Metrics is still the trace-logging shim from
tcp-ingestion's pre-1.10 pattern; real prom-client lands in task 1.9.

Verification: typecheck, lint clean; 112 unit tests passing across 7
test files (+39 from this batch).
This commit is contained in:
2026-04-30 21:47:43 +02:00
parent 6a14eb1d01
commit 2a50aaf175
12 changed files with 2218 additions and 15 deletions
+333
View File
@@ -0,0 +1,333 @@
/**
* Redis Stream consumer — XREADGROUP loop.
*
* Joins the consumer group on startup, fetches batches via XREADGROUP, decodes
* each entry to a Position, hands off to a sink callback, and ACKs only the IDs
* the sink confirms were handled. Failures stay pending in the consumer's PEL
* for re-delivery or XAUTOCLAIM (Phase 3).
*
* Design notes:
* - `connectRedis` is co-located here (rather than in src/db/) because it is
* tightly coupled to consumer startup: the consumer is the only component that
* needs a live Redis connection in Phase 1. Putting it in src/db/ would imply
* it is a general shared utility alongside pool.ts, but the Postgres pool has
* its own connectWithRetry there — symmetry is better than indirection.
*/
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Metrics, Position } from './types.js';
import { decodePosition, CodecError } from './codec.js';
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/**
 * A decoded stream entry ready for the sink to process.
 * The `id` field is the Redis Stream entry ID (e.g. "1714488000000-0") used for
 * XACK. `codec` and `ts` come from the top-level stream fields that tcp-ingestion
 * writes alongside the payload — consumers can filter on them without JSON parsing.
 */
export type ConsumedRecord = {
  /** Redis Stream entry ID; passed back to XACK once the sink confirms handling. */
  readonly id: string;
  /** Decoded payload (see decodePosition in codec.ts for the wire format). */
  readonly position: Position;
  /** Top-level `codec` stream field; '' when the producer omitted it. */
  readonly codec: string;
  /** Top-level `ts` stream field; '' when the producer omitted it. */
  readonly ts: string;
};
/**
 * Sink callback invoked once per batch. Returns the subset of IDs that were
 * handled successfully; the consumer ACKs only those. Partial returns are valid:
 * unacknowledged IDs stay in the PEL and are re-delivered on reconnect or claimed
 * by another instance.
 */
export type Sink = (records: ConsumedRecord[]) => Promise<string[]>;
/**
 * Handle returned by createConsumer. `start()` kicks off the read loop.
 * `stop()` signals the loop to exit after the current batch finishes.
 */
export type Consumer = {
  readonly start: () => Promise<void>;
  readonly stop: () => Promise<void>;
};
// ---------------------------------------------------------------------------
// Redis connection (mirrors tcp-ingestion's connectRedis)
// ---------------------------------------------------------------------------
/**
 * Connects to Redis with exponential-backoff retry on startup.
 * Fails fast (process.exit) after `maxAttempts` consecutive failures so the
 * orchestrator can restart rather than running with a broken connection.
 *
 * Placed in consumer.ts (not src/db/redis.ts) because the consumer is the only
 * Phase 1 component that owns a Redis connection. Creating a separate db/redis.ts
 * would be premature generalization — if Phase 2 needs a second connection the
 * function can be moved then.
 *
 * @param redisUrl - Redis connection URL, passed straight through to ioredis.
 * @param logger - Structured logger; attempts and failures are logged with counters.
 * @param maxAttempts - Consecutive connect failures tolerated before process.exit(1).
 * @returns A connected (ready) ioredis client.
 */
export async function connectRedis(
  redisUrl: string,
  logger: Logger,
  maxAttempts = 3,
): Promise<Redis> {
  // Dynamic import keeps ioredis out of the module graph for test files that
  // don't import this function directly.
  const { default: Redis } = await import('ioredis');
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const redis = new Redis(redisUrl, {
      // Startup-focused settings: lazyConnect lets the retry loop drive
      // connect() explicitly; enableOfflineQueue: false and
      // maxRetriesPerRequest: 0 make individual commands fail fast rather than
      // queueing while disconnected.
      // NOTE(review): these options do NOT disable ioredis's built-in reconnect
      // (that would require retryStrategy: () => null). After a successful
      // startup, a dropped connection will auto-reconnect, which the consumer's
      // XREADGROUP error/backoff path relies on to recover — confirm this is
      // the intended post-startup behavior before "fixing" it.
      enableOfflineQueue: false,
      lazyConnect: true,
      maxRetriesPerRequest: 0,
      connectTimeout: 5_000,
    });
    try {
      await redis.connect();
      logger.info({ attempt }, 'Redis connected');
      return redis;
    } catch (err) {
      // Best-effort quit: ignore errors — the connection may already be in a bad
      // state and we are about to retry anyway.
      await redis.quit().catch(() => undefined);
      if (attempt === maxAttempts) {
        logger.fatal({ err, url: redisUrl }, 'Redis connection failed after all retries; exiting');
        process.exit(1);
      }
      // Backoff schedule: 200ms, 400ms, 800ms, ... capped at 5s.
      const backoffMs = Math.min(200 * 2 ** (attempt - 1), 5_000);
      logger.warn(
        { err, attempt, maxAttempts, backoffMs },
        'Redis connection failed; retrying',
      );
      await new Promise<void>((resolve) => setTimeout(resolve, backoffMs));
    }
  }
  // TypeScript: unreachable after process.exit above, but needed for type safety.
  /* c8 ignore next */
  throw new Error('unreachable');
}
// ---------------------------------------------------------------------------
// Consumer group setup
// ---------------------------------------------------------------------------
/**
 * Idempotently creates the consumer group. MKSTREAM also creates the stream
 * itself when it does not exist yet, and `$` as the start ID means the group
 * only sees entries added after creation — history is never replayed on the
 * very first startup.
 *
 * Redis answers BUSYGROUP when the group already exists; that is the normal
 * case on every restart after the first and is treated as success. Any other
 * error is re-thrown to the caller.
 */
export async function ensureConsumerGroup(
  redis: Redis,
  stream: string,
  group: string,
  logger: Logger,
): Promise<void> {
  try {
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
    logger.info({ stream, group }, 'consumer group created');
    return;
  } catch (err: unknown) {
    // ioredis wraps Redis replies in Error instances whose message begins with
    // the Redis error code, so a prefix check is sufficient here.
    const alreadyExists = err instanceof Error && err.message.startsWith('BUSYGROUP');
    if (!alreadyExists) {
      throw err;
    }
    logger.info({ stream, group }, 'consumer group already exists');
  }
}
// ---------------------------------------------------------------------------
// Batch decoder
// ---------------------------------------------------------------------------
/**
* Raw entry shape returned by ioredis XREADGROUP.
* ioredis returns: [[streamName, [[entryId, [field, value, ...]], ...]]]
*/
type RawStreamEntry = [id: string, fields: string[]];
/**
* Decodes a batch of raw stream entries into ConsumedRecord objects.
* Entries that fail to decode are logged at error and excluded from the result;
* they stay in the PEL (pending entries list) and will be re-attempted later.
*/
function decodeBatch(
entries: RawStreamEntry[],
stream: string,
logger: Logger,
metrics: Metrics,
): ConsumedRecord[] {
const records: ConsumedRecord[] = [];
for (const [id, fields] of entries) {
// ioredis returns flat [field, value, field, value, ...] arrays.
const fieldMap: Record<string, string> = {};
for (let i = 0; i + 1 < fields.length; i += 2) {
const key = fields[i];
const val = fields[i + 1];
if (key !== undefined && val !== undefined) {
fieldMap[key] = val;
}
}
const payload = fieldMap['payload'];
const codec = fieldMap['codec'] ?? '';
const ts = fieldMap['ts'] ?? '';
if (payload === undefined) {
logger.error({ id, stream }, 'stream entry missing payload field; leaving pending');
metrics.inc('processor_decode_errors_total', { stream });
continue;
}
let position: Position;
try {
position = decodePosition(payload);
} catch (err) {
const isCodecError = err instanceof CodecError;
logger.error(
{
id,
stream,
err,
// Truncate the raw payload to avoid flooding logs with large records.
rawPayloadHead: payload.slice(0, 256),
},
isCodecError ? 'decode error; leaving entry pending' : 'unexpected error decoding entry',
);
metrics.inc('processor_decode_errors_total', { stream });
continue;
}
records.push({ id, position, codec, ts });
}
return records;
}
// ---------------------------------------------------------------------------
// Consumer factory
// ---------------------------------------------------------------------------
/**
 * Creates a Redis Stream consumer that reads from the given stream+group in a
 * loop, decodes each entry, passes the batch to the sink, and ACKs only what
 * the sink confirms.
 *
 * Fix: a thrown error from the sink or from XACK previously escaped runLoop and
 * hit the fatal handler installed in start(), exiting the process — so a single
 * transient Postgres/Redis hiccup killed the consumer. Such failures are now
 * caught per batch: the affected entries simply stay in the PEL for re-delivery
 * (the same contract partial ACKs already rely on) and the loop continues after
 * a short backoff.
 */
export function createConsumer(
  redis: Redis,
  config: Config,
  logger: Logger,
  metrics: Metrics,
  sink: Sink,
): Consumer {
  const stream = config.REDIS_TELEMETRY_STREAM;
  const group = config.REDIS_CONSUMER_GROUP;
  const consumerName = config.REDIS_CONSUMER_NAME;
  const batchSize = config.BATCH_SIZE;
  const batchBlockMs = config.BATCH_BLOCK_MS;
  let stopping = false;
  // Resolves when the currently in-flight batch (if any) has completed.
  // Initialized to a resolved promise so stop() works cleanly if called before
  // any batch starts.
  let inFlightBatch: Promise<void> = Promise.resolve();
  async function sleep(ms: number): Promise<void> {
    await new Promise<void>((resolve) => setTimeout(resolve, ms));
  }
  async function runLoop(): Promise<void> {
    logger.info(
      { stream, group, consumer: consumerName },
      'consumer started on stream',
    );
    while (!stopping) {
      let rawResult: [string, [string, string[]][]][] | null;
      try {
        // ioredis types: xreadgroup returns null on BLOCK timeout, otherwise an
        // array of [streamName, entries[]] pairs.
        rawResult = (await redis.xreadgroup(
          'GROUP',
          group,
          consumerName,
          'COUNT',
          String(batchSize),
          'BLOCK',
          String(batchBlockMs),
          'STREAMS',
          stream,
          '>',
        )) as [string, [string, string[]][]][] | null;
      } catch (err) {
        if (stopping) break;
        logger.error({ err }, 'XREADGROUP failed; backing off');
        await sleep(1_000);
        continue;
      }
      // BLOCK timeout — no new entries; loop again to check stopping flag.
      if (rawResult === null) continue;
      // rawResult is [[streamName, [[id, fields], ...]]]
      // We only subscribed to one stream so we take the first element.
      const streamEntries = rawResult[0]?.[1] ?? [];
      if (streamEntries.length === 0) continue;
      logger.debug({ stream, n: streamEntries.length }, 'batch consumed');
      const records = decodeBatch(streamEntries, stream, logger, metrics);
      try {
        const ackIds = await sink(records);
        if (ackIds.length > 0) {
          await redis.xack(stream, group, ...ackIds);
        }
        logger.debug(
          { stream, consumed: streamEntries.length, acked: ackIds.length },
          'batch acked',
        );
      } catch (err) {
        // Transient sink/ACK failure: the batch stays pending in the PEL and
        // will be re-delivered, so back off briefly and keep consuming instead
        // of letting the error crash the whole process via start()'s handler.
        logger.error(
          { err, stream, n: records.length },
          'sink or XACK failed; batch left pending',
        );
        await sleep(1_000);
      }
    }
    logger.info({ stream, group }, 'consumer loop exited');
  }
  async function start(): Promise<void> {
    await ensureConsumerGroup(redis, stream, group, logger);
    // Assign the running loop to inFlightBatch so stop() can await it.
    // We deliberately do not await runLoop() here — start() returns once the
    // loop has been kicked off (the group has been ensured), not when it ends.
    inFlightBatch = runLoop();
    // Propagate unhandled loop errors to the caller's process-level handler.
    inFlightBatch.catch((err: unknown) => {
      logger.fatal({ err }, 'consumer loop crashed unexpectedly; exiting');
      process.exit(1);
    });
  }
  async function stop(): Promise<void> {
    stopping = true;
    // Wait for the currently in-flight batch to complete. The loop will exit
    // on the next iteration after the BLOCK timeout at most.
    await inFlightBatch;
  }
  return { start, stop };
}
+114
View File
@@ -0,0 +1,114 @@
/**
* Per-device in-memory state store with LRU eviction.
*
* Maintains a bounded Map<device_id, DeviceState> updated on every accepted
* Position. When the map exceeds the LRU cap, the least-recently-updated entry
* is evicted. The LRU property is maintained without a third-party library by
* exploiting JavaScript's Map insertion-order guarantee: delete + set on update
* bumps the entry to the most-recent position in iteration order, and
* keys().next().value is always the oldest.
*
* Phase 3 adds rehydration: on first packet for an unknown device after a
* restart, query Postgres to seed last_position. Phase 1 accepts state loss on
* restart as a known limitation.
*/
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Position, DeviceState } from './types.js';
// ---------------------------------------------------------------------------
// Public interface
// ---------------------------------------------------------------------------
/**
 * Bounded per-device state store. See the module header for the LRU mechanics:
 * `update` is the hot path and the only operation that touches LRU order;
 * everything else is read-only introspection.
 */
export type DeviceStateStore = {
  /**
   * Applies the position to the device's state, touches LRU order, and returns
   * the new state. Creates the entry if this is the first position for the device.
   * Evicts the least-recently-updated entry if the cap is exceeded.
   */
  readonly update: (position: Position) => DeviceState;
  /**
   * Returns the current state for a device without touching LRU order.
   * Returns undefined for unknown devices.
   * Used for diagnostics; the hot path always goes through update().
   */
  readonly get: (deviceId: string) => DeviceState | undefined;
  /** Current number of devices in the store. */
  readonly size: () => number;
  /** Total evictions since startup (monotonically increasing). Used for metrics. */
  readonly evictedTotal: () => number;
};
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
/**
 * Builds the Phase 1 device state store: a plain Map whose insertion order is
 * exploited for LRU. Updating an entry deletes and re-inserts it (moving it to
 * the back of iteration order), so the front of the map is always the
 * least-recently-updated device and eviction is O(1).
 */
export function createDeviceStateStore(config: Config, logger: Logger): DeviceStateStore {
  const maxDevices = config.DEVICE_STATE_LRU_CAP;
  const devices = new Map<string, DeviceState>();
  let evictionCount = 0;
  /** Drops the least-recently-updated device if the cap was just exceeded. */
  function evictIfOverCap(): void {
    if (devices.size <= maxDevices) return;
    const oldestKey = devices.keys().next().value;
    if (oldestKey === undefined) return;
    devices.delete(oldestKey);
    evictionCount += 1;
    logger.debug(
      { evictedDevice: oldestKey, storeSize: devices.size, cap: maxDevices },
      'device state evicted (LRU)',
    );
  }
  function update(position: Position): DeviceState {
    const previous = devices.get(position.device_id);
    let next: DeviceState;
    if (previous === undefined) {
      next = {
        device_id: position.device_id,
        last_position: position,
        // Seeded from the device-reported timestamp, not the wall clock — the
        // store tracks device time, not ingestion time.
        last_seen: position.timestamp,
        position_count_session: 1,
      };
    } else {
      // Devices buffer offline and replay out of order (55-record bursts were
      // observed on stage with decreasing consecutive timestamps), so last_seen
      // is a high-water mark: it only ever moves forward.
      const highWater =
        position.timestamp > previous.last_seen ? position.timestamp : previous.last_seen;
      next = {
        device_id: position.device_id,
        last_position: position,
        last_seen: highWater,
        position_count_session: previous.position_count_session + 1,
      };
    }
    // delete + set bumps the key to the back of Map iteration order — the O(1)
    // LRU "touch" that makes an external LRU library unnecessary.
    devices.delete(position.device_id);
    devices.set(position.device_id, next);
    evictIfOverCap();
    return next;
  }
  return {
    update,
    get: (deviceId) => devices.get(deviceId),
    size: () => devices.size,
    evictedTotal: () => evictionCount,
  };
}
+234
View File
@@ -0,0 +1,234 @@
/**
* Position writer — batched upsert into the positions hypertable.
*
* Uses INSERT ... ON CONFLICT (device_id, ts) DO NOTHING for idempotency.
* Rows that were already present are identified by comparing the RETURNING rows
* against the input: anything absent from RETURNING was a duplicate (Postgres
* does not return conflicting rows with DO NOTHING).
*
* Internally chunks large batches into WRITE_BATCH_SIZE groups and runs them
* sequentially. Parallelism against the same pg.Pool is deliberately avoided to
* prevent starving the migration runner and health-check queries.
*/
import type pg from 'pg';
import type { Logger } from 'pino';
import type { Config } from '../config/load.js';
import type { Metrics, AttributeValue } from './types.js';
import type { ConsumedRecord } from './consumer.js';
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/**
 * Per-record outcome of a write attempt. `error` is populated only for
 * status 'failed' (the whole chunk's query error); 'duplicate' means the
 * (device_id, ts) pair was already present and the row was skipped.
 */
export type WriteResult = {
  readonly id: string;
  readonly status: 'inserted' | 'duplicate' | 'failed';
  readonly error?: Error;
};
/** Batched position writer; construct via createWriter. */
export type Writer = {
  readonly write: (records: ConsumedRecord[]) => Promise<WriteResult[]>;
};
// ---------------------------------------------------------------------------
// Attribute serialization
// ---------------------------------------------------------------------------
/**
 * Serializes the attributes map to a JSONB-safe string.
 *
 * JSON cannot represent bigint or Buffer natively, so a custom replacer maps:
 * - bigint → decimal string (lossless; readable in SQL)
 * - Buffer / Uint8Array → base64 string
 *
 * Two binary cases must be handled because JSON.stringify invokes a value's
 * toJSON() *before* handing the result to the replacer: a Buffer therefore
 * always reaches the replacer already converted to its toJSON shape
 * ({ type: 'Buffer', data: [...] }), while a plain Uint8Array (which has no
 * toJSON) reaches the replacer as-is. This mirrors tcp-ingestion's jsonReplacer.
 *
 * On-disk JSONB shape: bigints are stored as plain decimal strings, Buffers as
 * plain base64 strings — without the sentinel wrappers (__bigint / __buffer_b64)
 * used in the in-flight Redis stream format. Document this distinction in
 * wiki/concepts/position-record.md as a follow-up.
 */
function serializeAttributes(attributes: Readonly<Record<string, AttributeValue>>): string {
  const replacer = (_key: string, value: unknown): unknown => {
    if (typeof value === 'bigint') {
      return value.toString();
    }
    // Raw Uint8Array (no toJSON) — reaches the replacer untouched.
    if (value instanceof Uint8Array) {
      return Buffer.from(value).toString('base64');
    }
    // Buffer after its toJSON() fired — what the replacer actually sees for
    // Buffer instances anywhere in the object graph.
    if (isBufferToJsonShape(value)) {
      return Buffer.from(value.data).toString('base64');
    }
    return value;
  };
  return JSON.stringify(attributes, replacer);
}
type BufferToJsonShape = { type: 'Buffer'; data: number[] };
function isBufferToJsonShape(value: unknown): value is BufferToJsonShape {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return candidate['type'] === 'Buffer' && Array.isArray(candidate['data']);
}
// ---------------------------------------------------------------------------
// SQL builder
// ---------------------------------------------------------------------------
/**
 * Builds the multi-row INSERT statement for a chunk of records.
 * Returns the SQL string and the flat params array.
 *
 * Column order (11 per row):
 *   device_id, ts, latitude, longitude, altitude, angle, speed,
 *   satellites, priority, codec, attributes
 *
 * Postgres parameter limit: 65535 / 11 = ~5957 rows max. WRITE_BATCH_SIZE=50
 * is well under this cap.
 *
 * Fix: the placeholder base is derived from params.length rather than the loop
 * index. The previous `i * COLS_PER_ROW + 1` formula combined with the
 * `record === undefined` skip (a noUncheckedIndexedAccess guard) meant any
 * skipped entry would leave a gap in the $N numbering while params stayed
 * contiguous — the clause would then reference parameters that were never
 * pushed. for..of eliminates the undefined case entirely and keeps numbering
 * and params in lockstep by construction.
 */
function buildInsertSql(records: ConsumedRecord[]): { sql: string; params: unknown[] } {
  const params: unknown[] = [];
  const valueClauses: string[] = [];
  for (const record of records) {
    // 1-based index of this row's first placeholder — always contiguous with
    // what has actually been pushed so far.
    const base = params.length + 1;
    valueClauses.push(
      `($${base}, $${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10})`,
    );
    params.push(
      record.position.device_id, // $N+0  device_id
      record.position.timestamp, // $N+1  ts
      record.position.latitude, // $N+2  latitude
      record.position.longitude, // $N+3  longitude
      record.position.altitude, // $N+4  altitude
      record.position.angle, // $N+5  angle
      record.position.speed, // $N+6  speed
      record.position.satellites, // $N+7  satellites
      record.position.priority, // $N+8  priority
      record.codec, // $N+9  codec
      serializeAttributes(record.position.attributes), // $N+10 attributes (JSONB)
    );
  }
  const sql = `
    INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes)
    VALUES ${valueClauses.join(',\n    ')}
    ON CONFLICT (device_id, ts) DO NOTHING
    RETURNING device_id, ts
  `;
  return { sql, params };
}
// ---------------------------------------------------------------------------
// Result mapper
// ---------------------------------------------------------------------------
/** Shape of the rows produced by the statement's RETURNING clause. */
type ReturningRow = {
  device_id: string;
  ts: Date;
};
/**
 * Maps RETURNING rows back to per-record WriteResult entries.
 *
 * ON CONFLICT DO NOTHING silently omits conflicting rows from RETURNING, so a
 * record is a duplicate exactly when its (device_id, ts) pair is absent from
 * the returned rows. Keys are compared as `device_id|epoch-ms` strings for
 * O(1) set membership.
 */
function mapResults(records: ConsumedRecord[], returnedRows: ReturningRow[]): WriteResult[] {
  const keyOf = (deviceId: string, ts: Date): string => `${deviceId}|${ts.getTime()}`;
  const insertedKeys = new Set<string>(
    returnedRows.map((row) => keyOf(row.device_id, row.ts)),
  );
  const results: WriteResult[] = [];
  for (const record of records) {
    const wasInserted = insertedKeys.has(
      keyOf(record.position.device_id, record.position.timestamp),
    );
    results.push({ id: record.id, status: wasInserted ? 'inserted' : 'duplicate' });
  }
  return results;
}
// ---------------------------------------------------------------------------
// Writer factory
// ---------------------------------------------------------------------------
/**
 * Builds the batched position writer over the shared pg.Pool.
 *
 * Fix: the write counters were incremented once per chunk for BOTH statuses
 * unconditionally — an all-new batch still counted a 'duplicate' and an
 * all-duplicate batch still counted an 'inserted'. Each status counter is now
 * bumped only when that status actually occurred in the chunk. Counts remain
 * per-chunk (not per-row) because the Metrics.inc shim has no
 * increment-by-value parameter; revisit when prom-client lands in task 1.9.
 */
export function createWriter(
  pool: pg.Pool,
  config: Config,
  logger: Logger,
  metrics: Metrics,
): Writer {
  const writeBatchSize = config.WRITE_BATCH_SIZE;
  /**
   * Writes one chunk (≤ writeBatchSize records) with a single multi-row INSERT.
   * On query failure the whole chunk is reported 'failed' so the consumer's
   * partial-ACK logic leaves those entries pending for re-delivery.
   */
  async function writeChunk(chunk: ConsumedRecord[]): Promise<WriteResult[]> {
    const startMs = Date.now();
    const { sql, params } = buildInsertSql(chunk);
    let rows: ReturningRow[];
    try {
      const result = await pool.query<ReturningRow>(sql, params);
      rows = result.rows;
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      logger.error({ err, chunkSize: chunk.length }, 'position write failed');
      // One increment per failed chunk — consistent with the per-chunk counting
      // used for the success statuses below.
      metrics.inc('processor_position_writes_total', { status: 'failed' });
      return chunk.map((record) => ({ id: record.id, status: 'failed' as const, error }));
    }
    const results = mapResults(chunk, rows);
    const insertedCount = results.filter((r) => r.status === 'inserted').length;
    const duplicateCount = results.filter((r) => r.status === 'duplicate').length;
    // Only bump a status counter when that status actually occurred (see the
    // factory doc comment for the previous overcounting behavior).
    if (insertedCount > 0) {
      metrics.inc('processor_position_writes_total', { status: 'inserted' });
    }
    if (duplicateCount > 0) {
      metrics.inc('processor_position_writes_total', { status: 'duplicate' });
    }
    metrics.observe('processor_position_write_duration_seconds', (Date.now() - startMs) / 1_000);
    logger.debug(
      { inserted: insertedCount, duplicates: duplicateCount, chunkSize: chunk.length },
      'batch written',
    );
    return results;
  }
  async function write(records: ConsumedRecord[]): Promise<WriteResult[]> {
    if (records.length === 0) return [];
    // Split into chunks of writeBatchSize and run sequentially.
    // Sequential execution avoids starving other pg.Pool users (migration runner,
    // /readyz health check). The default BATCH_SIZE=100 with WRITE_BATCH_SIZE=50
    // produces at most 2 sequential queries per consumer tick.
    const allResults: WriteResult[] = [];
    for (let offset = 0; offset < records.length; offset += writeBatchSize) {
      const chunk = records.slice(offset, offset + writeBatchSize);
      const chunkResults = await writeChunk(chunk);
      allResults.push(...chunkResults);
    }
    return allResults;
  }
  return { write };
}
+156 -2
View File
@@ -1,6 +1,15 @@
import type { Redis } from 'ioredis';
import type pg from 'pg';
import { loadConfig } from './config/load.js';
import type { Config } from './config/load.js';
import { createLogger } from './observability/logger.js';
import { createPool, connectWithRetry } from './db/pool.js';
import { runMigrations } from './db/migrate.js';
import { connectRedis, createConsumer } from './core/consumer.js';
import type { ConsumedRecord } from './core/consumer.js';
import { createDeviceStateStore } from './core/state.js';
import { createWriter } from './core/writer.js';
import type { Metrics } from './core/types.js';
// -------------------------------------------------------------------------
// Startup: validate config (fail fast on bad env), build logger
@@ -24,5 +33,150 @@ const logger = createLogger({
logger.info('processor starting');
// Consumer, writer, and state wiring land in tasks 1.5–1.8.
process.exit(0);
// -------------------------------------------------------------------------
// Metrics placeholder shim (task 1.9 replaces this with prom-client)
//
// Uses trace-level logging so the calls are observable in development but
// are silent in production builds where the log level is info or higher.
// This mirrors tcp-ingestion's approach before task 1.10 landed there.
// -------------------------------------------------------------------------
const metrics: Metrics = {
  // Counter increment. NOTE(review): no increment-by-value parameter, so
  // callers needing per-row counts (see writer.ts) are limited to per-call
  // increments until the prom-client implementation lands in task 1.9.
  inc: (name: string, labels?: Record<string, string>) => {
    logger.trace({ metric: name, labels }, 'metrics.inc');
  },
  // Histogram-style observation; writer.ts passes durations in seconds for
  // *_seconds metric names.
  observe: (name: string, value: number, labels?: Record<string, string>) => {
    logger.trace({ metric: name, value, labels }, 'metrics.observe');
  },
};
// -------------------------------------------------------------------------
// Wire up the pipeline
// -------------------------------------------------------------------------
/**
 * Wires the full Phase 1 pipeline in dependency order:
 * Postgres pool → migrations → Redis → state store + writer → sink → consumer.
 * Throws on any startup failure; the module-level .catch handles the exit.
 */
async function main(): Promise<void> {
  // 1. Connect Postgres with exponential-backoff retry
  const pool = createPool(config.POSTGRES_URL);
  await connectWithRetry(pool, logger);
  // 2. Run migrations before any consumer activity.
  //    Phase 1 limitation: multiple instances starting simultaneously both try
  //    to migrate. Postgres advisory locks would solve this — deferred to Phase 3
  //    (production hardening), which is acceptable for the Phase 1 single-instance
  //    pilot.
  await runMigrations(pool, logger);
  logger.info('migrations applied');
  // 3. Connect Redis with exponential-backoff retry
  const redis: Redis = await connectRedis(config.REDIS_URL, logger);
  // 4. Build pipeline components
  const state = createDeviceStateStore(config, logger);
  const writer = createWriter(pool, config, logger, metrics);
  // 5. Define the sink: central decision point for state update and Postgres write.
  // State is updated BEFORE the write so that in-memory state is consistent with
  // what has been seen, even if the Postgres write subsequently fails. If the write
  // fails the record stays pending (not ACKed) and will be re-delivered — applying
  // the same position twice to state is idempotent for last_position and last_seen;
  // only position_count_session is double-counted, which is a session counter that
  // resets on restart and is not a correctness concern.
  const sink = async (records: ConsumedRecord[]): Promise<string[]> => {
    // 5a. Update in-memory state for every record (cheap, synchronous-like, cannot
    // fail meaningfully — Map operations do not throw).
    for (const record of records) {
      state.update(record.position);
    }
    // 5b. Write to Postgres
    const results = await writer.write(records);
    // 5c. ACK only the IDs that succeeded or were already present.
    // 'failed' records are deliberately left pending for retry.
    return results
      .filter((r) => r.status === 'inserted' || r.status === 'duplicate')
      .map((r) => r.id);
  };
  // 6. Build and start the consumer
  const consumer = createConsumer(redis, config, logger, metrics, sink);
  await consumer.start();
  // 7. Install graceful shutdown stub.
  // Full Phase 3 hardening: explicit consumer-group commit on SIGTERM,
  // uncaught-exception handler, multi-instance drain mode.
  // NOTE(review): handlers are installed AFTER consumer.start(), so a signal
  // arriving in that brief window gets default handling (immediate exit) —
  // presumably acceptable for Phase 1; confirm before Phase 3 hardening.
  installGracefulShutdown({ redis, pool, consumer, logger });
  logger.info(
    {
      stream: config.REDIS_TELEMETRY_STREAM,
      group: config.REDIS_CONSUMER_GROUP,
      consumer: config.REDIS_CONSUMER_NAME,
    },
    'processor ready',
  );
}
// -------------------------------------------------------------------------
// Graceful shutdown stub — Phase 3 finalizes this
// -------------------------------------------------------------------------
/** Resources the shutdown path must drain, in order: consumer, Redis, pool. */
type ShutdownDeps = {
  readonly redis: Redis;
  readonly pool: pg.Pool;
  readonly consumer: { stop: () => Promise<void> };
  readonly logger: ReturnType<typeof createLogger>;
};
/**
 * Registers SIGTERM/SIGINT handlers that drain the pipeline in dependency
 * order (consumer loop first, then Redis, then the Postgres pool) and exit 0,
 * or exit 1 if any step fails. A second signal during shutdown is ignored, and
 * an unref'd 15s timer force-exits if the graceful path stalls.
 */
function installGracefulShutdown(deps: ShutdownDeps): void {
  const { redis, pool, consumer, logger: log } = deps;
  let shuttingDown = false;
  // Drains each resource in order; any rejection aborts the remaining steps.
  async function drain(): Promise<void> {
    await consumer.stop();
    log.info('consumer stopped');
    await redis.quit();
    log.info('Redis disconnected');
    await pool.end();
    log.info('graceful shutdown complete');
  }
  function shutdown(signal: string): void {
    if (shuttingDown) return;
    shuttingDown = true;
    log.info({ signal }, 'shutdown signal received');
    drain()
      .then(() => {
        process.exit(0);
      })
      .catch((err: unknown) => {
        log.error({ err }, 'error during shutdown');
        process.exit(1);
      });
    // Force exit after 15s if the graceful path stalls (e.g. a hung Postgres write).
    setTimeout(() => {
      log.warn('forced exit after shutdown timeout');
      process.exit(1);
    }, 15_000).unref();
  }
  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));
}
// -------------------------------------------------------------------------
// Entry point
// -------------------------------------------------------------------------
// Kick off the pipeline; any startup rejection is fatal. stderr is used
// directly (not the logger) so the message survives even if logger setup is
// implicated in the failure.
main().catch((err: unknown) => {
  const detail = err instanceof Error ? err.message : String(err);
  process.stderr.write(`Fatal startup error: ${detail}\n`);
  process.exit(1);
});