b3d6410af6
Adds snapshot provider that queries the latest non-faulty position per device registered to an event, returned in the `subscribed` reply so the SPA map is populated immediately rather than waiting for the first live broadcast batch. Key changes: - src/live/snapshot.ts: createSnapshotProvider factory using DISTINCT ON (device_id) ... ORDER BY device_id, ts DESC with WHERE faulty=false; converts Date ts to epoch ms; omits speed/course when 0 (matching broadcast convention) - src/main.ts: injects createSnapshotProvider(pool) into createSubscriptionRegistry - test/live-snapshot.test.ts: 7 unit tests covering: two-device result, empty event, faulty exclusion, DISTINCT ON semantics, parameterized query, metrics observation, and error propagation The snapshot query requires the positions_device_ts_idx created in migration 0002 (task 1.5.4). Snapshot failures fail open — registry.fetchSnapshot returns [] so the subscription still succeeds with an empty initial state.
312 lines
11 KiB
TypeScript
import type * as http from 'node:http';
|
|
import type { Redis } from 'ioredis';
|
|
import type pg from 'pg';
|
|
import { loadConfig } from './config/load.js';
|
|
import type { Config } from './config/load.js';
|
|
import { createLogger } from './observability/logger.js';
|
|
import {
|
|
createMetrics,
|
|
startMetricsServer,
|
|
createPostgresHealthCheck,
|
|
createConsumerLagSampler,
|
|
} from './observability/metrics.js';
|
|
import { createPool, connectWithRetry } from './db/pool.js';
|
|
import { runMigrations } from './db/migrate.js';
|
|
import { connectRedis, createConsumer } from './core/consumer.js';
|
|
import type { ConsumedRecord } from './core/consumer.js';
|
|
import { createDeviceStateStore } from './core/state.js';
|
|
import { createWriter } from './core/writer.js';
|
|
import { createLiveServer } from './live/server.js';
|
|
import type { LiveServer, LiveConnection } from './live/server.js';
|
|
import type { InboundMessage } from './live/protocol.js';
|
|
import { createAuthClient } from './live/auth.js';
|
|
import { createAuthzClient } from './live/authz.js';
|
|
import { createSubscriptionRegistry } from './live/registry.js';
|
|
import { createBroadcastConsumer } from './live/broadcast.js';
|
|
import { createDeviceEventMap } from './live/device-event-map.js';
|
|
import { createSnapshotProvider } from './live/snapshot.js';
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Startup: validate config (fail fast on bad env), build logger
|
|
// -------------------------------------------------------------------------
|
|
|
|
let config: Config;
|
|
try {
|
|
config = loadConfig();
|
|
} catch (err) {
|
|
// Config validation failures print a human-readable message and exit 1.
|
|
// Logger is not available yet — process.stderr is the only output channel.
|
|
process.stderr.write(`${err instanceof Error ? err.message : String(err)}\n`);
|
|
process.exit(1);
|
|
}
|
|
|
|
const logger = createLogger({
|
|
level: config.LOG_LEVEL,
|
|
nodeEnv: config.NODE_ENV,
|
|
instanceId: config.INSTANCE_ID,
|
|
});
|
|
|
|
logger.info('processor starting');
|
|
|
|
// -------------------------------------------------------------------------
// Wire up the pipeline
// -------------------------------------------------------------------------

/**
 * Composition root: constructs and starts every pipeline component in
 * dependency order (metrics → Postgres → migrations → Redis → live WS stack
 * → consumers) and installs graceful shutdown last. The numbered comments
 * below explain why each step sits where it does; the ordering is load-bearing.
 */
async function main(): Promise<void> {
  // 1. Build real prom-client metrics (replaces the trace-log shim from
  //    pre-1.9 main.ts). Metrics are wired before any I/O so that counters
  //    start at zero from the moment the process starts.
  const metrics = createMetrics();

  // 2. Connect Postgres with exponential-backoff retry.
  const pool = createPool(config.POSTGRES_URL);
  await connectWithRetry(pool, logger);

  // 3. Run migrations before any consumer activity.
  //    Phase 1 limitation: multiple instances starting simultaneously both try
  //    to migrate. Postgres advisory locks would solve this — deferred to Phase 3
  //    (production hardening), which is acceptable for the Phase 1 single-instance
  //    pilot.
  await runMigrations(pool, logger);
  logger.info('migrations applied');

  // 4. Connect Redis with exponential-backoff retry.
  const redis: Redis = await connectRedis(config.REDIS_URL, logger);

  // 5. Build pipeline components (in-memory device state + batched writer).
  const state = createDeviceStateStore(config, logger, metrics);
  const writer = createWriter(pool, config, logger, metrics);

  // 6. Postgres health check — background cached SELECT 1 for /readyz.
  //    The check starts probing immediately so /readyz is accurate from the
  //    first request after the metrics server starts listening.
  const pgHealth = createPostgresHealthCheck(pool);

  // 7. Start metrics HTTP server.
  //    Bound before the consumer starts so /healthz responds even during the
  //    brief window between metrics-server start and first stream read.
  const metricsServer: http.Server = startMetricsServer(
    config.METRICS_PORT,
    () => metrics.serializeMetrics(),
    {
      isRedisReady: () => redis.status === 'ready',
      isPostgresReady: pgHealth.isReady,
    },
  );
  logger.info({ port: config.METRICS_PORT }, 'metrics server listening');

  // 8. Start consumer lag sampler (background interval, every 10 s).
  const lagSampler = createConsumerLagSampler(
    redis,
    config.REDIS_TELEMETRY_STREAM,
    config.REDIS_CONSUMER_GROUP,
    metrics,
    (msg) => logger.debug(msg),
  );

  // 9. Define the sink: central decision point for state update and Postgres write.
  //    State is updated BEFORE the write so that in-memory state is consistent with
  //    what has been seen, even if the Postgres write subsequently fails. If the write
  //    fails the record stays pending (not ACKed) and will be re-delivered — applying
  //    the same position twice to state is idempotent for last_position and last_seen;
  //    only position_count_session is double-counted, which is a session counter that
  //    resets on restart and is not a correctness concern.
  const sink = async (records: ConsumedRecord[]): Promise<string[]> => {
    // 9a. Update in-memory state for every record (cheap, synchronous-like, cannot
    //     fail meaningfully — Map operations do not throw).
    for (const record of records) {
      state.update(record.position);
    }

    // 9b. Emit device-state gauges (sampled per-batch; cheap).
    metrics.observe('processor_device_state_size', state.size());

    // 9c. Write the whole batch to Postgres; each record gets a per-row status.
    const results = await writer.write(records);

    // 9d. ACK only the IDs that succeeded or were already present.
    //     'failed' records are deliberately left pending for retry.
    const ackIds = results
      .filter((r) => r.status === 'inserted' || r.status === 'duplicate')
      .map((r) => r.id);

    if (ackIds.length > 0) {
      metrics.inc('processor_acks_total', undefined, ackIds.length);
    }

    return ackIds;
  };

  // 10. Build the live WebSocket server (tasks 1.5.2 and 1.5.3).
  //     The snapshot provider backs the `subscribed` reply's initial state;
  //     its failures fail open (registry returns an empty snapshot).
  const authClient = createAuthClient(config, logger, metrics);
  const authzClient = createAuthzClient(config, logger, metrics);
  const snapshotProvider = createSnapshotProvider(pool, logger, metrics);
  const registry = createSubscriptionRegistry(authzClient, config, logger, metrics, snapshotProvider);

  // Routes inbound protocol messages to the registry; unknown message types
  // are ignored here (the protocol layer validated the shape upstream —
  // NOTE(review): confirm unknown types are rejected before this handler).
  const messageHandler = async (
    conn: LiveConnection,
    message: InboundMessage,
  ): Promise<void> => {
    if (message.type === 'subscribe') {
      await registry.subscribe(conn, message.topic, message.id);
    } else if (message.type === 'unsubscribe') {
      registry.unsubscribe(conn, message.topic, message.id);
    }
  };

  const liveServer: LiveServer = createLiveServer(
    config,
    logger,
    metrics,
    messageHandler,
    (conn) => registry.onConnectionClose(conn),
    authClient,
  );

  // 10b. Build the device-event map (Postgres-backed, periodic refresh).
  const deviceEventMap = createDeviceEventMap(pool, config, logger, metrics);

  // 10c. Build the broadcast consumer (per-instance consumer group fan-out).
  const broadcastConsumer = createBroadcastConsumer(
    redis,
    registry,
    deviceEventMap,
    config,
    logger,
    metrics,
  );
  // Start order: accept connections first, then populate the device→event
  // map, then begin fanning out broadcasts that depend on that map.
  await liveServer.start();
  await deviceEventMap.start();
  await broadcastConsumer.start();

  // 11. Build and start the durable-write consumer (feeds the sink above).
  const consumer = createConsumer(redis, config, logger, metrics, sink);
  await consumer.start();

  // 12. Install graceful shutdown.
  //     Shutdown order: live server first (no new connections),
  //     then broadcast consumer, then durable-write consumer last.
  installGracefulShutdown({
    redis,
    pool,
    consumer,
    broadcastConsumer,
    deviceEventMap,
    liveServer,
    metricsServer,
    pgHealth,
    lagSampler,
    logger,
  });

  logger.info(
    {
      stream: config.REDIS_TELEMETRY_STREAM,
      group: config.REDIS_CONSUMER_GROUP,
      consumer: config.REDIS_CONSUMER_NAME,
      metricsPort: config.METRICS_PORT,
      wsPort: config.LIVE_WS_PORT,
    },
    'processor ready',
  );
}
|
|
|
// -------------------------------------------------------------------------
// Graceful shutdown — Phase 3 finalizes this
// -------------------------------------------------------------------------

/**
 * Everything {@link installGracefulShutdown} needs to tear the process down.
 * Structural `{ stop: ... }` shapes are used (rather than the concrete
 * component types) so the shutdown code depends only on the stop contract.
 */
type ShutdownDeps = {
  readonly redis: Redis;
  readonly pool: pg.Pool;
  /** Durable-write consumer — async stop lets the in-flight batch finish. */
  readonly consumer: { stop: () => Promise<void> };
  /** Broadcast fan-out consumer — async stop. */
  readonly broadcastConsumer: { stop: () => Promise<void> };
  /** Device-event map — synchronous stop (no draining to await). */
  readonly deviceEventMap: { stop: () => void };
  readonly liveServer: LiveServer;
  readonly metricsServer: http.Server;
  /** Background /readyz probe — synchronous stop. */
  readonly pgHealth: { stop: () => void };
  /** Background lag-sampling interval — synchronous stop. */
  readonly lagSampler: { stop: () => void };
  readonly logger: ReturnType<typeof createLogger>;
};
|
|
|
function installGracefulShutdown(deps: ShutdownDeps): void {
|
|
const {
|
|
redis, pool, consumer, broadcastConsumer, deviceEventMap,
|
|
liveServer, metricsServer, pgHealth, lagSampler, logger: log,
|
|
} = deps;
|
|
|
|
let shuttingDown = false;
|
|
|
|
function shutdown(signal: string): void {
|
|
if (shuttingDown) return;
|
|
shuttingDown = true;
|
|
|
|
log.info({ signal }, 'shutdown signal received');
|
|
|
|
// Cancel background intervals first — they hold no resources that need
|
|
// draining, and stopping them early prevents spurious log noise during
|
|
// the shutdown sequence.
|
|
lagSampler.stop();
|
|
pgHealth.stop();
|
|
|
|
// Shutdown order:
|
|
// 1. Live server — stop accepting new connections and drain existing ones
|
|
// first, so clients know the server is going away before the consumer
|
|
// stops processing.
|
|
// 2. Durable-write consumer — lets the in-flight batch finish.
|
|
// 3. Metrics server, Redis, Postgres.
|
|
liveServer
|
|
.stop()
|
|
.then(() => {
|
|
log.info('live server stopped');
|
|
deviceEventMap.stop();
|
|
return broadcastConsumer.stop();
|
|
})
|
|
.then(() => {
|
|
log.info('broadcast consumer stopped');
|
|
return consumer.stop();
|
|
})
|
|
.then(() => {
|
|
log.info('consumer stopped');
|
|
return new Promise<void>((resolve, reject) =>
|
|
metricsServer.close((err) => (err ? reject(err) : resolve())),
|
|
);
|
|
})
|
|
.then(() => {
|
|
log.info('metrics server closed');
|
|
return redis.quit();
|
|
})
|
|
.then(() => {
|
|
log.info('Redis disconnected');
|
|
return pool.end();
|
|
})
|
|
.then(() => {
|
|
log.info('graceful shutdown complete');
|
|
process.exit(0);
|
|
})
|
|
.catch((err: unknown) => {
|
|
log.error({ err }, 'error during shutdown');
|
|
process.exit(1);
|
|
});
|
|
|
|
// Force exit after 15s if the graceful path stalls (e.g. a hung Postgres write).
|
|
setTimeout(() => {
|
|
log.warn('forced exit after shutdown timeout');
|
|
process.exit(1);
|
|
}, 15_000).unref();
|
|
}
|
|
|
|
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
|
process.on('SIGINT', () => shutdown('SIGINT'));
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Entry point
|
|
// -------------------------------------------------------------------------
|
|
|
|
main().catch((err: unknown) => {
|
|
process.stderr.write(
|
|
`Fatal startup error: ${err instanceof Error ? err.message : String(err)}\n`,
|
|
);
|
|
process.exit(1);
|
|
});
|