Files
julian c33c7a4f6b Implement Phase 1 task 1.8 (Redis Streams publisher + main wiring)
- Bounded in-memory queue (default 10000); overflow throws PublishOverflowError
  so the framing layer skips ACK and the device retransmits.
- Background worker drains via XADD with MAXLEN ~ approximate trimming.
- JSON serialization with sentinel encoding for bigint/Buffer/Date; correctly
  handles Buffer.prototype.toJSON firing before the replacer.
- AdapterContext.publish(position, codec) with codec-label closure at dispatch
  in adapters/teltonika/index.ts; zero changes to the three codec parsers.
- connectRedis with retry-on-startup; main.ts wires the full pipeline.
- installGracefulShutdown stubbed (full hardening in task 1.12).
- 19 new tests (17 unit + 2 Docker-conditional integration). Total 81 passing.
2026-04-30 16:39:34 +02:00

75 lines
2.0 KiB
TypeScript

import { describe, it, expect, vi, afterEach } from 'vitest';
import * as net from 'node:net';
import type { Logger } from 'pino';
import type { Adapter, AdapterContext, Metrics, Position } from '../../src/core/types.js';
import { startServer } from '../../src/core/server.js';
/**
 * Builds an AdapterContext made entirely of vitest mocks.
 *
 * - `publish` is an async mock that resolves without doing anything.
 * - `logger` exposes mocked `debug`/`info`/`warn`/`error`, and `child()` returns
 *   the same mock logger (via `mockReturnThis`).
 * - `metrics` exposes mocked `inc` and `observe`.
 */
function makeMockContext(): AdapterContext {
  const mockMetrics: Metrics = { inc: vi.fn(), observe: vi.fn() };

  // Only the members listed here are mocked, hence the double cast to the
  // full pino Logger type.
  const mockLogger = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    child: vi.fn().mockReturnThis(),
  } as unknown as Logger;

  return {
    // Kept inline so the callback parameters stay contextually typed.
    publish: vi.fn(async (_p: Position, _codec) => {}),
    logger: mockLogger,
    metrics: mockMetrics,
  };
}
describe('startServer', () => {
  // Every server started by a test is recorded here so the hook below can
  // shut it down again, even when the test body throws.
  const servers: net.Server[] = [];

  afterEach(() => {
    // close() is fire-and-forget: it stops accepting new connections and the
    // hook does not wait for the 'close' event.
    for (const server of servers) {
      server.close();
    }
    servers.length = 0;
  });

  it('invokes handleSession with a real socket when a client connects', async () => {
    const handleSession = vi.fn().mockResolvedValue(undefined);
    const adapter: Adapter = {
      name: 'test-adapter',
      ports: [0], // port 0 = OS-assigned ephemeral port
      handleSession,
    };
    const ctx = makeMockContext();
    const server = startServer(0, adapter, ctx);
    servers.push(server);

    // Resolve the port the OS actually assigned. Handles a server that is
    // already listening, and rejects (instead of hanging until the test
    // timeout) when the server fails to bind or reports no TCP address.
    const port = await new Promise<number>((resolve, reject) => {
      const onListening = (): void => {
        server.off('error', reject);
        const addr = server.address();
        if (typeof addr === 'object' && addr !== null) {
          resolve(addr.port);
        } else {
          reject(new Error('server did not report a TCP address after listening'));
        }
      };
      if (server.listening) {
        onListening();
        return;
      }
      server.once('error', reject);
      server.once('listening', onListening);
    });

    const client = new net.Socket();
    try {
      // Connect a client; a connection failure becomes a test failure rather
      // than an uncaught 'error' event on the socket.
      await new Promise<void>((resolve, reject) => {
        client.once('error', reject);
        client.connect(port, '127.0.0.1', () => {
          client.off('error', reject);
          resolve();
        });
      });

      // Wait for handleSession to be called
      await vi.waitFor(() => expect(handleSession).toHaveBeenCalledOnce(), { timeout: 2000 });

      const [socketArg, ctxArg] = handleSession.mock.calls[0] as [net.Socket, AdapterContext];
      expect(socketArg).toBeInstanceOf(net.Socket);
      expect(ctxArg).toBeDefined();
      expect(typeof ctxArg.publish).toBe('function');
      expect(ctxArg.logger).toBeDefined();
    } finally {
      // Always release the client, including when an assertion above fails,
      // so the connection does not outlive the test.
      client.destroy();
    }
  });
});