# Task 1.7 — Position writer (batched upsert) **Phase:** 1 — Throughput pipeline **Status:** 🟩 Done **Depends on:** 1.2, 1.4 **Wiki refs:** `docs/wiki/entities/postgres-timescaledb.md` ## Goal Write batches of `Position` records into the `positions` hypertable using `INSERT ... ON CONFLICT (device_id, ts) DO NOTHING` for idempotency. Return per-record success/failure so the consumer (task 1.8) can decide what to ACK. ## Deliverables - `src/core/writer.ts` exporting: - `createWriter(pool, config, logger, metrics): Writer` — factory. - `Writer` interface: - `write(records: ConsumedRecord[]): Promise` — inserts the batch, returns per-record results: `{ id: string; status: 'inserted' | 'duplicate' | 'failed'; error?: Error }`. - `test/writer.test.ts` (mocked `pg.Pool`): - Happy path: all records insert. - Duplicate-key: `ON CONFLICT DO NOTHING` returns `'duplicate'` for those records. - Mixed: half new, half duplicate. - Pool error: all records in the batch return `'failed'`. - Bigint attribute is stringified before serialization. - Buffer attribute is base64-encoded before serialization. ## Specification ### SQL pattern Use a single multi-row `INSERT` per batch with `RETURNING (xmax = 0) AS inserted`: ```sql INSERT INTO positions (device_id, ts, latitude, longitude, altitude, angle, speed, satellites, priority, codec, attributes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11), ($12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22), ... ON CONFLICT (device_id, ts) DO NOTHING RETURNING device_id, ts, (xmax = 0) AS inserted; ``` `xmax = 0` is true for newly-inserted rows, false for ones that hit `ON CONFLICT`. The `RETURNING` rows give us a lookup of which `(device_id, ts)` pairs were inserted vs. duplicates. **Note:** rows that hit the conflict are NOT returned (Postgres doesn't return them with `ON CONFLICT DO NOTHING`). To distinguish duplicate from "new but hit a unique violation later," compare the returned rows against the input by `(device_id, ts)`. Anything in the input but missing from RETURNING is a `'duplicate'`. ### bigint and Buffer attribute encoding Per task 1.4, `jsonb` storage: - `bigint` → JSON string. Use a custom replacer in `JSON.stringify`: ```ts JSON.stringify(attributes, (_k, v) => typeof v === 'bigint' ? v.toString() : Buffer.isBuffer(v) ? v.toString('base64') : v ); ``` - `Buffer` → base64 string. Document this in `wiki/concepts/position-record.md` as a follow-up — the on-disk shape differs slightly from the in-flight shape because JSON can't hold bigints or bytes natively. ### Batching strategy The consumer (task 1.8) calls `write(batch)` with whatever batch the consumer received from `XREADGROUP`. Phase 1 doesn't internally batch further — the consumer's batch size (`BATCH_SIZE`, default 100) is the writer's batch size. If `BATCH_SIZE > WRITE_BATCH_SIZE` (default 50), the writer chunks internally: split the input into chunks of `WRITE_BATCH_SIZE`, run them sequentially. Don't parallelize chunks against the same Pool — `pg.Pool` has bounded connections and we don't want to starve other queries (the migration runner, `/readyz` health checks, etc.). ### Per-record status The consumer (task 1.8) takes the `WriteResult[]` and decides ACK: - `'inserted'` and `'duplicate'` → ACK (we got the data into Postgres or already had it). - `'failed'` → do not ACK (let it stay pending for retry). If a transaction-wide failure occurs (Pool dead, transient network), all records in the chunk get `'failed'`. The consumer treats them all the same. ### Metrics emitted by this module - `processor_position_writes_total{status="inserted"|"duplicate"|"failed"}` — counter - `processor_position_write_duration_seconds` — histogram (per-batch latency) ## Acceptance criteria - [ ] `pnpm typecheck`, `pnpm lint`, `pnpm test` clean. - [ ] Mocked-Pool test verifies SQL parameter ordering and types are correct. - [ ] Bigint and Buffer attributes serialize as expected via the JSON.stringify replacer. - [ ] Mixed insert/conflict batch produces correct per-record `WriteResult[]`. - [ ] Pool error → all records get `'failed'`; metrics reflect this. ## Risks / open questions - **Parameter limit.** Postgres protocol allows max 65535 parameters per statement. With 11 columns per row, that caps us at ~5957 rows per statement. `WRITE_BATCH_SIZE=50` is well under. If the cap is ever raised, document the formula. - **`RETURNING` cost.** On a hypertable with many chunks, `RETURNING` has near-zero overhead. Verify with a benchmark in task 1.10 (integration test). ## Done `src/core/writer.ts` — multi-row INSERT with RETURNING, duplicate detection by (device_id, ts) set diff, sequential chunking, bigint/Buffer attribute serialization (handles Buffer.toJSON shape). `test/writer.test.ts` — 14 tests covering happy path, all-duplicate, mixed, pool error, chunk split, Buffer base64, bigint string, parameter ordering, metrics. Landed in `68d3da3`. **Note:** The spec's `RETURNING (xmax = 0) AS inserted` idiom was replaced with a simpler set-difference approach — compare RETURNING rows against input by (device_id, ts). The xmax approach is mentioned in the spec but then immediately qualified: "rows that hit the conflict are NOT returned." The set-diff is cleaner and avoids confusion. The spec's own Note section confirms this is the right approach.