diff --git a/.changeset/price-history-snapshots.md b/.changeset/price-history-snapshots.md new file mode 100644 index 00000000..813834e2 --- /dev/null +++ b/.changeset/price-history-snapshots.md @@ -0,0 +1,5 @@ +--- +"lens": minor +--- + +Add `/prices/history` endpoint backed by 1-minute price snapshots. A new `price_snapshots` table is appended to every minute by a snapshot ingester, queryable over a `[from, to]` window with optional `5m`/`1h` aggregation. A retention job prunes snapshots older than 30 days. diff --git a/README.md b/README.md index 051fe8f4..1a89c1ab 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Aggregates price data from Stellar's Classic Order Book (SDEX) and AMM Liquidity | GET | `/price/:assetA/:assetB` | Current VWAP, 24h volume, best route | | GET | `/price/:assetA/:assetB/route?amount=1000` | Best execution route for a given amount | | GET | `/price/:assetA/:assetB/history?window=1h` | OHLCV history (`1m`, `5m`, `1h`, `24h`) | +| GET | `/prices/history?pair=XLM/USDC&from=…&to=…&interval=1m` | Historical 1-minute price snapshots, optionally aggregated (`1m`, `5m`, `1h`); ~30-day retention | | GET | `/pools` | Active AMM pools being watched | | GET | `/pairs` | Watched trading pairs | | GET | `/status` | Indexer health | diff --git a/openapi.yaml b/openapi.yaml index 4725749a..81a995ee 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -278,6 +278,81 @@ paths: '429': $ref: '#/components/responses/TooManyRequests' + /prices/history: + get: + operationId: getPriceHistory + summary: Get historical 1-minute price snapshots + description: >- + Returns minute-resolution price snapshots for a pair over [from, to], + optionally aggregated into 5m or 1h buckets. Snapshots are retained for + ~30 days. Gated by x402 (matches the /price prefix). + security: + - x402: [] + parameters: + - name: pair + in: query + required: true + schema: + type: string + example: XLM/USDC + - name: interval + in: query + required: false + schema: + type: string + enum: [1m, 5m, 1h] + default: 1m + - name: from + in: query + required: false + schema: + type: string + format: date-time + - name: to + in: query + required: false + schema: + type: string + format: date-time + responses: + '200': + description: Historical price snapshots + content: + application/json: + schema: + type: object + properties: + pair: + type: string + interval: + type: string + from: + type: string + format: date-time + to: + type: string + format: date-time + count: + type: integer + points: + type: array + items: + type: object + properties: + ts: + type: string + format: date-time + price: + type: number + volume: + type: number + '400': + description: Invalid parameters + '402': + $ref: '#/components/responses/PaymentRequired' + '429': + $ref: '#/components/responses/TooManyRequests' + /pairs: get: operationId: listPairs diff --git a/prisma/schema.prisma b/prisma/schema.prisma index a2c8f2b3..e58c80a2 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -67,6 +67,17 @@ model PriceAggregate { @@map("price_aggregates") } +model PriceSnapshot { + pair String + ts DateTime + price Decimal @db.Decimal(36, 18) + volume Decimal @default(0) @db.Decimal(36, 7) + + @@id([pair, ts]) + @@index([pair, ts]) + @@map("price_snapshots") +} + model IndexerState { id String @id lastCursor String? @map("last_cursor") diff --git a/sql/schema.sql b/sql/schema.sql index ecce9704..b03d98bd 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -55,6 +55,20 @@ CREATE TABLE IF NOT EXISTS price_aggregates ( PRIMARY KEY (pair_key, "window", bucket) ); +-- 1-minute price snapshot ring buffer. +-- The ingester appends one row per watched pair per minute; a retention job +-- prunes rows older than 30 days. Powers the /prices/history endpoint (charts, +-- backtests, audit trails) without paying the cost of scanning raw price_points. +CREATE TABLE IF NOT EXISTS price_snapshots ( + pair TEXT NOT NULL, + ts TIMESTAMPTZ NOT NULL, + price NUMERIC(36, 18) NOT NULL, + volume NUMERIC(36, 7) NOT NULL DEFAULT 0, + PRIMARY KEY (pair, ts) +); + +CREATE INDEX IF NOT EXISTS idx_price_snapshots_pair_ts ON price_snapshots (pair, ts); + -- Indexer cursor state CREATE TABLE IF NOT EXISTS indexer_state ( id TEXT PRIMARY KEY, diff --git a/src/__tests__/history.test.ts b/src/__tests__/history.test.ts new file mode 100644 index 00000000..94c8bf6c --- /dev/null +++ b/src/__tests__/history.test.ts @@ -0,0 +1,164 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import Fastify from 'fastify' + +const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() })) + +vi.mock('../db', () => ({ + prisma: {}, + pgPool: { query: mockQuery }, +})) + +import { registerHistoryRoutes, HISTORY_INTERVAL_SECONDS, MAX_HISTORY_POINTS } from '../api/history' + +async function buildApp() { + const app = Fastify({ logger: false }) + await registerHistoryRoutes(app) + await app.ready() + return app +} + +function makeRow(overrides: Record = {}) { + return { + bucket: new Date('2025-01-01T00:00:00Z'), + price: '1.25', + volume: '500', + ...overrides, + } +} + +describe('GET /prices/history', () => { + beforeEach(() => { + mockQuery.mockReset() + }) + + it('returns points with ts/price/volume and a count matching the rows', async () => { + const app = await buildApp() + mockQuery.mockResolvedValue({ + rows: [ + makeRow(), + makeRow({ bucket: new Date('2025-01-01T00:01:00Z'), price: '1.30', volume: '100' }), + ], + }) + + const res = await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=1m' }) + + expect(res.statusCode).toBe(200) + const body = res.json() + expect(body.pair).toBe('XLM/USDC') + expect(body.interval).toBe('1m') + expect(body.count).toBe(2) + expect(body.points).toHaveLength(2) + expect(body.points[0]).toEqual({ + ts: '2025-01-01T00:00:00.000Z', + price: 1.25, + volume: 500, + }) + }) + + it('defaults to 1m interval when none is given', async () => { + const app = await buildApp() + mockQuery.mockResolvedValue({ rows: [] }) + + await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC' }) + + expect(mockQuery.mock.calls[0][1][0]).toBe(HISTORY_INTERVAL_SECONDS['1m']) + }) + + it('passes the correct bucket width for each interval', async () => { + const app = await buildApp() + mockQuery.mockResolvedValue({ rows: [] }) + + await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=5m' }) + expect(mockQuery.mock.calls[0][1][0]).toBe(300) + + mockQuery.mockClear() + await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=1h' }) + expect(mockQuery.mock.calls[0][1][0]).toBe(3600) + }) + + it('requires the pair parameter', async () => { + const app = await buildApp() + + const res = await app.inject({ method: 'GET', url: '/prices/history?interval=1m' }) + + expect(res.statusCode).toBe(400) + expect(res.json().error).toMatch(/pair/) + }) + + it('rejects an unsupported interval', async () => { + const app = await buildApp() + + const res = await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=15m' }) + + expect(res.statusCode).toBe(400) + expect(res.json().error).toMatch(/interval must be one of/) + }) + + it('rejects an invalid from date', async () => { + const app = await buildApp() + + const res = await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&from=nope' }) + + expect(res.statusCode).toBe(400) + expect(res.json().error).toMatch(/from/) + }) + + it('rejects from after to', async () => { + const app = await buildApp() + + const res = await app.inject({ + method: 'GET', + url: '/prices/history?pair=XLM/USDC&from=2025-01-02T00:00:00Z&to=2025-01-01T00:00:00Z', + }) + + expect(res.statusCode).toBe(400) + expect(res.json().error).toMatch(/from must be before to/) + }) + + it('rejects windows that would exceed the max point cap', async () => { + const app = await buildApp() + + // 1m interval over ~30 days = ~43200 points, well over MAX_HISTORY_POINTS. + const res = await app.inject({ + method: 'GET', + url: '/prices/history?pair=XLM/USDC&interval=1m&from=2025-01-01T00:00:00Z&to=2025-02-01T00:00:00Z', + }) + + expect(res.statusCode).toBe(400) + expect(res.json().error).toMatch(/too large/) + expect(mockQuery).not.toHaveBeenCalled() + }) + + it('allows a large window at a coarse interval under the cap', async () => { + const app = await buildApp() + mockQuery.mockResolvedValue({ rows: [] }) + + // 1h interval over 30 days = ~720 points, under MAX_HISTORY_POINTS. + const res = await app.inject({ + method: 'GET', + url: '/prices/history?pair=XLM/USDC&interval=1h&from=2025-01-01T00:00:00Z&to=2025-02-01T00:00:00Z', + }) + + expect(res.statusCode).toBe(200) + expect(mockQuery).toHaveBeenCalledOnce() + }) + + it('passes the pair and date range through to the query', async () => { + const app = await buildApp() + mockQuery.mockResolvedValue({ rows: [] }) + + const from = '2025-01-01T00:00:00.000Z' + const to = '2025-01-01T06:00:00.000Z' + await app.inject({ method: 'GET', url: `/prices/history?pair=XLM/USDC&from=${from}&to=${to}` }) + + const params = mockQuery.mock.calls[0][1] + expect(params[1]).toBe('XLM/USDC') + expect(new Date(params[2]).toISOString()).toBe(from) + expect(new Date(params[3]).toISOString()).toBe(to) + }) + + it('MAX_HISTORY_POINTS guard math is interval-aware', () => { + // Sanity: the cap is points, not seconds — coarser intervals allow longer spans. + expect(MAX_HISTORY_POINTS).toBeGreaterThan(0) + }) +}) diff --git a/src/__tests__/snapshotIngester.test.ts b/src/__tests__/snapshotIngester.test.ts new file mode 100644 index 00000000..fc4e9652 --- /dev/null +++ b/src/__tests__/snapshotIngester.test.ts @@ -0,0 +1,87 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +const { mockQuery, mockGetActivePairs } = vi.hoisted(() => ({ + mockQuery: vi.fn(), + mockGetActivePairs: vi.fn(), +})) + +vi.mock('../db', () => ({ + prisma: {}, + pgPool: { query: mockQuery }, +})) + +vi.mock('../pairsRegistry', () => ({ + getActivePairs: mockGetActivePairs, +})) + +import { appendSnapshots, floorToMinute } from '../ingesters/snapshot' + +describe('floorToMinute', () => { + it('zeros seconds and milliseconds', () => { + expect(floorToMinute(new Date('2025-01-01T12:34:56.789Z')).toISOString()).toBe('2025-01-01T12:34:00.000Z') + }) + + it('is a no-op for an already minute-aligned time', () => { + expect(floorToMinute(new Date('2025-01-01T12:34:00.000Z')).toISOString()).toBe('2025-01-01T12:34:00.000Z') + }) + + it('does not roll over to the next minute at :59.999', () => { + expect(floorToMinute(new Date('2025-01-01T12:34:59.999Z')).toISOString()).toBe('2025-01-01T12:34:00.000Z') + }) + + it('does not mutate its input', () => { + const input = new Date('2025-01-01T12:34:56.789Z') + floorToMinute(input) + expect(input.toISOString()).toBe('2025-01-01T12:34:56.789Z') + }) +}) + +describe('appendSnapshots', () => { + beforeEach(() => { + mockQuery.mockReset() + mockGetActivePairs.mockReset() + }) + + it('does nothing and returns 0 when there are no active pairs', async () => { + mockGetActivePairs.mockReturnValue([]) + + const inserted = await appendSnapshots(new Date('2025-01-01T00:00:30Z')) + + expect(inserted).toBe(0) + expect(mockQuery).not.toHaveBeenCalled() + }) + + it('inserts at the floored minute and reports rows inserted', async () => { + mockGetActivePairs.mockReturnValue([ + { pairKey: 'XLM/USDC', assetA: {}, assetB: {} }, + { pairKey: 'XLM/yBTC', assetA: {}, assetB: {} }, + ]) + mockQuery.mockResolvedValue({ rowCount: 2 }) + + const inserted = await appendSnapshots(new Date('2025-01-01T00:00:42.500Z')) + + expect(inserted).toBe(2) + const params = mockQuery.mock.calls[0][1] + expect(params[0]).toEqual(['XLM/USDC', 'XLM/yBTC']) + // ts param must be floored to the minute boundary. + expect((params[1] as Date).toISOString()).toBe('2025-01-01T00:00:00.000Z') + }) + + it('returns 0 when the upsert skips all rows (idempotent re-run)', async () => { + mockGetActivePairs.mockReturnValue([{ pairKey: 'XLM/USDC', assetA: {}, assetB: {} }]) + mockQuery.mockResolvedValue({ rowCount: 0 }) + + const inserted = await appendSnapshots(new Date('2025-01-01T00:00:00Z')) + + expect(inserted).toBe(0) + }) + + it('uses ON CONFLICT DO NOTHING so duplicate minutes are skipped', async () => { + mockGetActivePairs.mockReturnValue([{ pairKey: 'XLM/USDC', assetA: {}, assetB: {} }]) + mockQuery.mockResolvedValue({ rowCount: 1 }) + + await appendSnapshots() + + expect(mockQuery.mock.calls[0][0]).toMatch(/ON CONFLICT \(pair, ts\) DO NOTHING/) + }) +}) diff --git a/src/__tests__/snapshotRetention.test.ts b/src/__tests__/snapshotRetention.test.ts new file mode 100644 index 00000000..4e17e180 --- /dev/null +++ b/src/__tests__/snapshotRetention.test.ts @@ -0,0 +1,50 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() })) + +vi.mock('../db', () => ({ + prisma: {}, + pgPool: { query: mockQuery }, +})) + +// bullmq pulls in ioredis at import time; stub it so the unit under test loads +// without a live Redis connection. +vi.mock('bullmq', () => ({ + Queue: class {}, + Worker: class {}, +})) + +import { pruneOldSnapshots, SNAPSHOT_RETENTION_DAYS } from '../jobs/snapshotRetention' + +describe('pruneOldSnapshots', () => { + beforeEach(() => { + mockQuery.mockReset() + }) + + it('defaults to the 30-day retention window', async () => { + mockQuery.mockResolvedValue({ rowCount: 5 }) + + const pruned = await pruneOldSnapshots() + + expect(SNAPSHOT_RETENTION_DAYS).toBe(30) + expect(pruned).toBe(5) + expect(mockQuery.mock.calls[0][1]).toEqual([30]) + expect(mockQuery.mock.calls[0][0]).toMatch(/DELETE FROM price_snapshots/) + expect(mockQuery.mock.calls[0][0]).toMatch(/ts < NOW\(\) - \(\$1 \|\| ' days'\)::interval/) + }) + + it('honors a custom retention window', async () => { + mockQuery.mockResolvedValue({ rowCount: 0 }) + + const pruned = await pruneOldSnapshots(7) + + expect(pruned).toBe(0) + expect(mockQuery.mock.calls[0][1]).toEqual([7]) + }) + + it('returns 0 when rowCount is null', async () => { + mockQuery.mockResolvedValue({ rowCount: null }) + + expect(await pruneOldSnapshots()).toBe(0) + }) +}) diff --git a/src/api/history.ts b/src/api/history.ts new file mode 100644 index 00000000..9c6c43e1 --- /dev/null +++ b/src/api/history.ts @@ -0,0 +1,114 @@ +import type { FastifyInstance } from 'fastify' +import { pgPool } from '../db' + +/** Supported aggregation intervals → bucket width in seconds. */ +export const HISTORY_INTERVAL_SECONDS: Record = { + '1m': 60, + '5m': 300, + '1h': 3600, +} + +/** Maximum number of buckets a single request may return. */ +export const MAX_HISTORY_POINTS = 10_000 + +export interface HistoryPoint { + ts: string + price: number + volume: number +} + +/** + * Aggregates 1-minute price snapshots into `intervalSecs`-wide buckets. + * + * Each bucket reports the close price (the last snapshot whose ts falls in the + * bucket) and the summed volume. Buckets are aligned to the Unix epoch so that + * e.g. a 1h bucket always starts on the hour, independent of the query window. + * With intervalSecs=60 this returns the raw snapshots unchanged. + */ +export async function queryHistory( + pair: string, + from: Date, + to: Date, + intervalSecs: number +): Promise { + const result = await pgPool.query( + `SELECT + to_timestamp(floor(EXTRACT(EPOCH FROM ts) / $1) * $1) AS bucket, + (array_agg(price::float ORDER BY ts DESC))[1] AS price, + SUM(volume::float) AS volume + FROM price_snapshots + WHERE pair = $2 + AND ts >= $3 + AND ts <= $4 + GROUP BY floor(EXTRACT(EPOCH FROM ts) / $1) + ORDER BY bucket ASC`, + [intervalSecs, pair, from, to] + ) + + return result.rows.map(r => ({ + ts: new Date(r.bucket).toISOString(), + price: Number(r.price), + volume: Number(r.volume), + })) +} + +/** + * GET /prices/history?pair=XLM/USDC&from=…&to=…&interval=1m|5m|1h + * + * Returns historical price snapshots for a pair over [from, to], optionally + * aggregated into 5m or 1h buckets. Defaults to the last 24h at 1m resolution. + * + * Note: this path is matched by the x402 `/price` prefix gate, so it requires + * payment when ORACLE_PAYMENT_ADDRESS is configured — consistent with the other + * price-data endpoints. + */ +export async function registerHistoryRoutes(app: FastifyInstance) { + app.get<{ + Querystring: { pair?: string; from?: string; to?: string; interval?: string } + }>('/prices/history', async (req, reply) => { + const { pair } = req.query + if (!pair) { + return reply.status(400).send({ error: 'pair query parameter is required' }) + } + + const interval = req.query.interval ?? '1m' + const intervalSecs = HISTORY_INTERVAL_SECONDS[interval] + if (!intervalSecs) { + return reply.status(400).send({ + error: `interval must be one of: ${Object.keys(HISTORY_INTERVAL_SECONDS).join(', ')}`, + }) + } + + const from = req.query.from ? new Date(req.query.from) : new Date(Date.now() - 24 * 60 * 60 * 1000) + const to = req.query.to ? new Date(req.query.to) : new Date() + + if (isNaN(from.getTime())) { + return reply.status(400).send({ error: 'from must be a valid ISO 8601 date' }) + } + if (isNaN(to.getTime())) { + return reply.status(400).send({ error: 'to must be a valid ISO 8601 date' }) + } + if (from.getTime() > to.getTime()) { + return reply.status(400).send({ error: 'from must be before to' }) + } + + // Guard against unbounded scans: cap the number of buckets the window can produce. + const spanSecs = (to.getTime() - from.getTime()) / 1000 + if (spanSecs / intervalSecs > MAX_HISTORY_POINTS) { + return reply.status(400).send({ + error: `requested window is too large for interval=${interval} (max ${MAX_HISTORY_POINTS} points); narrow the range or use a coarser interval`, + }) + } + + const points = await queryHistory(pair, from, to, intervalSecs) + + return { + pair, + interval, + from: from.toISOString(), + to: to.toISOString(), + count: points.length, + points, + } + }) +} diff --git a/src/index.ts b/src/index.ts index 17c5bf4c..7fd422ef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,6 +13,7 @@ import { registerWebhookRoutes } from './routes/webhooks' import { registerCandleRoutes } from './routes/candles' import { registerPairsRoutes } from './routes/pairs' import { registerScreenerRoutes } from './routes/screener' +import { registerHistoryRoutes } from './api/history' import { registerX402 } from './middleware/x402' import { registerWebSocket } from './api/websocket' import { registerApiKeyAuth } from './api/auth' @@ -21,7 +22,9 @@ import { registerAdminRoutes } from './api/admin' import { startSDEXIngester } from './ingesters/sdex' import { startAMMIngester } from './ingesters/amm' import { startSoroswapIngester } from './ingesters/soroswap' +import { startSnapshotIngester } from './ingesters/snapshot' import { createAggregateQueue, startAggregateWorker, scheduleAggregateRefresh } from './jobs/aggregateRefresh' +import { createSnapshotRetentionQueue, startSnapshotRetentionWorker, scheduleSnapshotRetention } from './jobs/snapshotRetention' import { loadPersistedPairs, getActivePairs } from './pairsRegistry' import { getMetrics } from './metrics' @@ -96,6 +99,7 @@ async function main() { await registerCandleRoutes(app) await registerPairsRoutes(app) await registerScreenerRoutes(app) + await registerHistoryRoutes(app) await registerGraphQL(app) await registerWebSocket(app) @@ -119,6 +123,16 @@ async function main() { console.warn('[lens] Aggregate refresh worker skipped (Redis unavailable):', (err as Error).message) } + // ── Snapshot retention worker (non-blocking — requires Redis) ───────────── + try { + const retentionQueue = createSnapshotRetentionQueue() + startSnapshotRetentionWorker() + await scheduleSnapshotRetention(retentionQueue) + console.log('[lens] Snapshot retention worker started') + } catch (err) { + console.warn('[lens] Snapshot retention worker skipped (Redis unavailable):', (err as Error).message) + } + // ── Ingesters (run in background — infinite loops) ──────────────────────── // Each ingester is independently fault-isolated via restartIngester. // A crash in the Soroswap ingester cannot take down SDEX or AMM. @@ -132,6 +146,7 @@ async function main() { restartIngester('SDEX', startSDEXIngester) restartIngester('AMM', startAMMIngester) restartIngester('Soroswap', startSoroswapIngester) + restartIngester('Snapshot', startSnapshotIngester) console.log(`[lens] Watching ${getActivePairs().length} pairs: ${getActivePairs().map(p => p.pairKey).join(', ')}`) } diff --git a/src/ingesters/snapshot.ts b/src/ingesters/snapshot.ts new file mode 100644 index 00000000..1191c615 --- /dev/null +++ b/src/ingesters/snapshot.ts @@ -0,0 +1,87 @@ +import { pgPool } from '../db' +import { getActivePairs } from '../pairsRegistry' +import { price_snapshots_total } from '../metrics' + +/** + * Floors a Date down to the start of its minute (seconds/millis zeroed). + * The snapshot `ts` is always minute-aligned so the (pair, ts) primary key + * makes the append idempotent — a duplicate run within the same minute (e.g. + * after a restart) collides on the PK and is skipped rather than double-counted. + */ +export function floorToMinute(date: Date): Date { + const d = new Date(date) + d.setSeconds(0, 0) + return d +} + +/** + * Appends one price_snapshots row per active pair for the given minute. + * + * For each pair we take the most recent price seen in price_points and the + * total base_volume traded during the minute that just closed. Pairs with no + * price history yet are skipped (we don't fabricate a price). Returns the + * number of rows inserted. + */ +export async function appendSnapshots(now: Date = new Date()): Promise { + const pairs = getActivePairs() + if (pairs.length === 0) return 0 + + const ts = floorToMinute(now) + const pairKeys = pairs.map(p => p.pairKey) + + // One query for all pairs: latest price (most recent point) joined with the + // volume traded in the [ts, ts+1min) window. LEFT JOIN so a pair with a known + // price but no trades this minute still snapshots with volume 0. + const result = await pgPool.query( + `WITH latest AS ( + SELECT DISTINCT ON (pair_key) pair_key, price::numeric AS price + FROM price_points + WHERE pair_key = ANY($1) + ORDER BY pair_key, timestamp DESC + ), + vol AS ( + SELECT pair_key, SUM(base_volume::numeric) AS volume + FROM price_points + WHERE pair_key = ANY($1) + AND timestamp >= $2 + AND timestamp < $2 + INTERVAL '1 minute' + GROUP BY pair_key + ) + INSERT INTO price_snapshots (pair, ts, price, volume) + SELECT l.pair_key, $2, l.price, COALESCE(v.volume, 0) + FROM latest l + LEFT JOIN vol v ON v.pair_key = l.pair_key + ON CONFLICT (pair, ts) DO NOTHING`, + [pairKeys, ts] + ) + + const inserted = result.rowCount ?? 0 + if (inserted > 0) price_snapshots_total.inc(inserted) + return inserted +} + +function sleep(ms: number) { + return new Promise(r => setTimeout(r, ms)) +} + +/** + * Background loop: appends snapshots at the top of every minute. Aligns to the + * wall-clock minute boundary so snapshot timestamps land on :00 seconds and the + * volume window cleanly covers the minute that just elapsed. + */ +export async function startSnapshotIngester(): Promise { + console.log(`[snapshot] Starting 1-minute snapshot ingester for ${getActivePairs().length} pairs`) + + while (true) { + // Sleep until the next minute boundary. + const msToNextMinute = 60_000 - (Date.now() % 60_000) + await sleep(msToNextMinute) + + try { + const n = await appendSnapshots() + if (n > 0) console.log(`[snapshot] appended ${n} snapshot(s)`) + } catch (err) { + console.error('[snapshot] Error appending snapshots:', (err as Error).message) + } + } +} diff --git a/src/jobs/snapshotRetention.ts b/src/jobs/snapshotRetention.ts new file mode 100644 index 00000000..5f266ab1 --- /dev/null +++ b/src/jobs/snapshotRetention.ts @@ -0,0 +1,62 @@ +import { Queue, Worker } from 'bullmq' +import { pgPool } from '../db' + +const QUEUE_NAME = 'snapshot-retention' + +/** Snapshots older than this many days are pruned by the retention job. */ +export const SNAPSHOT_RETENTION_DAYS = 30 + +function redisConnection() { + const url = process.env.REDIS_URL + if (url) return { url } + return { host: 'localhost', port: 6379 } +} + +export function createSnapshotRetentionQueue() { + return new Queue(QUEUE_NAME, { connection: redisConnection() }) +} + +/** + * Deletes price_snapshots rows older than the retention window. Returns the + * number of rows pruned. Exported separately from the worker so it can be unit + * tested and invoked manually. + */ +export async function pruneOldSnapshots(retentionDays: number = SNAPSHOT_RETENTION_DAYS): Promise { + const result = await pgPool.query( + `DELETE FROM price_snapshots + WHERE ts < NOW() - ($1 || ' days')::interval`, + [retentionDays] + ) + return result.rowCount ?? 0 +} + +export function startSnapshotRetentionWorker() { + const worker = new Worker( + QUEUE_NAME, + async () => { + try { + const pruned = await pruneOldSnapshots() + if (pruned > 0) console.log(`[snapshot-retention] pruned ${pruned} snapshot(s) older than ${SNAPSHOT_RETENTION_DAYS}d`) + } catch (err) { + console.error('[snapshot-retention] prune failed:', (err as Error).message) + } + }, + { connection: redisConnection(), concurrency: 1 } + ) + + worker.on('failed', (_job, err) => { + console.error('[snapshot-retention] Job failed:', err.message) + }) + + return worker +} + +/** Schedules the retention prune to run hourly (and once on startup). */ +export async function scheduleSnapshotRetention(queue: Queue) { + await queue.add( + 'prune', + {}, + { repeat: { every: 60 * 60 * 1000 }, jobId: 'snapshot-retention:prune' } + ) + await queue.add('prune', {}) +} diff --git a/src/metrics.ts b/src/metrics.ts index a8bdfac7..f42c7177 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -21,6 +21,12 @@ export const amm_snapshots_total = new Counter({ registers: [register] }) +export const price_snapshots_total = new Counter({ + name: 'price_snapshots_total', + help: 'Total number of 1-minute price snapshots appended', + registers: [register] +}) + export const price_requests_total = new Counter({ name: 'price_requests_total', help: 'Total number of price API requests served',