Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/price-history-snapshots.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"lens": minor
---

Add `/prices/history` endpoint backed by 1-minute price snapshots. A new `price_snapshots` table is appended to every minute by a snapshot ingester, queryable over a `[from, to]` window with optional `5m`/`1h` aggregation. A retention job prunes snapshots older than 30 days.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Aggregates price data from Stellar's Classic Order Book (SDEX) and AMM Liquidity
| GET | `/price/:assetA/:assetB` | Current VWAP, 24h volume, best route |
| GET | `/price/:assetA/:assetB/route?amount=1000` | Best execution route for a given amount |
| GET | `/price/:assetA/:assetB/history?window=1h` | OHLCV history (`1m`, `5m`, `1h`, `24h`) |
| GET | `/prices/history?pair=XLM/USDC&from=…&to=…&interval=1m` | Historical 1-minute price snapshots, optionally aggregated (`1m`, `5m`, `1h`); ~30-day retention |
| GET | `/pools` | Active AMM pools being watched |
| GET | `/pairs` | Watched trading pairs |
| GET | `/status` | Indexer health |
Expand Down
75 changes: 75 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,81 @@ paths:
'429':
$ref: '#/components/responses/TooManyRequests'

/prices/history:
get:
operationId: getPriceHistory
summary: Get historical 1-minute price snapshots
description: >-
Returns minute-resolution price snapshots for a pair over [from, to],
optionally aggregated into 5m or 1h buckets. Snapshots are retained for
~30 days. Gated by x402 (matches the /price prefix).
security:
- x402: []
parameters:
- name: pair
in: query
required: true
schema:
type: string
example: XLM/USDC
- name: interval
in: query
required: false
schema:
type: string
enum: [1m, 5m, 1h]
default: 1m
- name: from
in: query
required: false
schema:
type: string
format: date-time
- name: to
in: query
required: false
schema:
type: string
format: date-time
responses:
'200':
description: Historical price snapshots
content:
application/json:
schema:
type: object
properties:
pair:
type: string
interval:
type: string
from:
type: string
format: date-time
to:
type: string
format: date-time
count:
type: integer
points:
type: array
items:
type: object
properties:
ts:
type: string
format: date-time
price:
type: number
volume:
type: number
'400':
description: Invalid parameters
'402':
$ref: '#/components/responses/PaymentRequired'
'429':
$ref: '#/components/responses/TooManyRequests'

/pairs:
get:
operationId: listPairs
Expand Down
11 changes: 11 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ model PriceAggregate {
@@map("price_aggregates")
}

model PriceSnapshot {
pair String
ts DateTime
price Decimal @db.Decimal(36, 18)
volume Decimal @default(0) @db.Decimal(36, 7)

@@id([pair, ts])
@@index([pair, ts])
@@map("price_snapshots")
}

model IndexerState {
id String @id
lastCursor String? @map("last_cursor")
Expand Down
14 changes: 14 additions & 0 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ CREATE TABLE IF NOT EXISTS price_aggregates (
PRIMARY KEY (pair_key, "window", bucket)
);

-- 1-minute price snapshot ring buffer.
-- The ingester appends one row per watched pair per minute; a retention job
-- prunes rows older than 30 days. Powers the /prices/history endpoint (charts,
-- backtests, audit trails) without paying the cost of scanning raw price_points.
CREATE TABLE IF NOT EXISTS price_snapshots (
pair TEXT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
price NUMERIC(36, 18) NOT NULL,
volume NUMERIC(36, 7) NOT NULL DEFAULT 0,
PRIMARY KEY (pair, ts)
);

CREATE INDEX IF NOT EXISTS idx_price_snapshots_pair_ts ON price_snapshots (pair, ts);

-- Indexer cursor state
CREATE TABLE IF NOT EXISTS indexer_state (
id TEXT PRIMARY KEY,
Expand Down
164 changes: 164 additions & 0 deletions src/__tests__/history.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import Fastify from 'fastify'

const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() }))

vi.mock('../db', () => ({
prisma: {},
pgPool: { query: mockQuery },
}))

import { registerHistoryRoutes, HISTORY_INTERVAL_SECONDS, MAX_HISTORY_POINTS } from '../api/history'

async function buildApp() {
const app = Fastify({ logger: false })
await registerHistoryRoutes(app)
await app.ready()
return app
}

function makeRow(overrides: Record<string, unknown> = {}) {
return {
bucket: new Date('2025-01-01T00:00:00Z'),
price: '1.25',
volume: '500',
...overrides,
}
}

describe('GET /prices/history', () => {
beforeEach(() => {
mockQuery.mockReset()
})

it('returns points with ts/price/volume and a count matching the rows', async () => {
const app = await buildApp()
mockQuery.mockResolvedValue({
rows: [
makeRow(),
makeRow({ bucket: new Date('2025-01-01T00:01:00Z'), price: '1.30', volume: '100' }),
],
})

const res = await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=1m' })

expect(res.statusCode).toBe(200)
const body = res.json()
expect(body.pair).toBe('XLM/USDC')
expect(body.interval).toBe('1m')
expect(body.count).toBe(2)
expect(body.points).toHaveLength(2)
expect(body.points[0]).toEqual({
ts: '2025-01-01T00:00:00.000Z',
price: 1.25,
volume: 500,
})
})

it('defaults to 1m interval when none is given', async () => {
const app = await buildApp()
mockQuery.mockResolvedValue({ rows: [] })

await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC' })

expect(mockQuery.mock.calls[0][1][0]).toBe(HISTORY_INTERVAL_SECONDS['1m'])
})

it('passes the correct bucket width for each interval', async () => {
const app = await buildApp()
mockQuery.mockResolvedValue({ rows: [] })

await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=5m' })
expect(mockQuery.mock.calls[0][1][0]).toBe(300)

mockQuery.mockClear()
await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=1h' })
expect(mockQuery.mock.calls[0][1][0]).toBe(3600)
})

it('requires the pair parameter', async () => {
const app = await buildApp()

const res = await app.inject({ method: 'GET', url: '/prices/history?interval=1m' })

expect(res.statusCode).toBe(400)
expect(res.json().error).toMatch(/pair/)
})

it('rejects an unsupported interval', async () => {
const app = await buildApp()

const res = await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&interval=15m' })

expect(res.statusCode).toBe(400)
expect(res.json().error).toMatch(/interval must be one of/)
})

it('rejects an invalid from date', async () => {
const app = await buildApp()

const res = await app.inject({ method: 'GET', url: '/prices/history?pair=XLM/USDC&from=nope' })

expect(res.statusCode).toBe(400)
expect(res.json().error).toMatch(/from/)
})

it('rejects from after to', async () => {
const app = await buildApp()

const res = await app.inject({
method: 'GET',
url: '/prices/history?pair=XLM/USDC&from=2025-01-02T00:00:00Z&to=2025-01-01T00:00:00Z',
})

expect(res.statusCode).toBe(400)
expect(res.json().error).toMatch(/from must be before to/)
})

it('rejects windows that would exceed the max point cap', async () => {
const app = await buildApp()

// 1m interval over ~30 days = ~43200 points, well over MAX_HISTORY_POINTS.
const res = await app.inject({
method: 'GET',
url: '/prices/history?pair=XLM/USDC&interval=1m&from=2025-01-01T00:00:00Z&to=2025-02-01T00:00:00Z',
})

expect(res.statusCode).toBe(400)
expect(res.json().error).toMatch(/too large/)
expect(mockQuery).not.toHaveBeenCalled()
})

it('allows a large window at a coarse interval under the cap', async () => {
const app = await buildApp()
mockQuery.mockResolvedValue({ rows: [] })

// 1h interval over 30 days = ~720 points, under MAX_HISTORY_POINTS.
const res = await app.inject({
method: 'GET',
url: '/prices/history?pair=XLM/USDC&interval=1h&from=2025-01-01T00:00:00Z&to=2025-02-01T00:00:00Z',
})

expect(res.statusCode).toBe(200)
expect(mockQuery).toHaveBeenCalledOnce()
})

it('passes the pair and date range through to the query', async () => {
const app = await buildApp()
mockQuery.mockResolvedValue({ rows: [] })

const from = '2025-01-01T00:00:00.000Z'
const to = '2025-01-01T06:00:00.000Z'
await app.inject({ method: 'GET', url: `/prices/history?pair=XLM/USDC&from=${from}&to=${to}` })

const params = mockQuery.mock.calls[0][1]
expect(params[1]).toBe('XLM/USDC')
expect(new Date(params[2]).toISOString()).toBe(from)
expect(new Date(params[3]).toISOString()).toBe(to)
})

it('MAX_HISTORY_POINTS guard math is interval-aware', () => {
// Sanity: the cap is points, not seconds — coarser intervals allow longer spans.
expect(MAX_HISTORY_POINTS).toBeGreaterThan(0)
})
})
87 changes: 87 additions & 0 deletions src/__tests__/snapshotIngester.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'

const { mockQuery, mockGetActivePairs } = vi.hoisted(() => ({
mockQuery: vi.fn(),
mockGetActivePairs: vi.fn(),
}))

vi.mock('../db', () => ({
prisma: {},
pgPool: { query: mockQuery },
}))

vi.mock('../pairsRegistry', () => ({
getActivePairs: mockGetActivePairs,
}))

import { appendSnapshots, floorToMinute } from '../ingesters/snapshot'

describe('floorToMinute', () => {
it('zeros seconds and milliseconds', () => {
expect(floorToMinute(new Date('2025-01-01T12:34:56.789Z')).toISOString()).toBe('2025-01-01T12:34:00.000Z')
})

it('is a no-op for an already minute-aligned time', () => {
expect(floorToMinute(new Date('2025-01-01T12:34:00.000Z')).toISOString()).toBe('2025-01-01T12:34:00.000Z')
})

it('does not roll over to the next minute at :59.999', () => {
expect(floorToMinute(new Date('2025-01-01T12:34:59.999Z')).toISOString()).toBe('2025-01-01T12:34:00.000Z')
})

it('does not mutate its input', () => {
const input = new Date('2025-01-01T12:34:56.789Z')
floorToMinute(input)
expect(input.toISOString()).toBe('2025-01-01T12:34:56.789Z')
})
})

describe('appendSnapshots', () => {
beforeEach(() => {
mockQuery.mockReset()
mockGetActivePairs.mockReset()
})

it('does nothing and returns 0 when there are no active pairs', async () => {
mockGetActivePairs.mockReturnValue([])

const inserted = await appendSnapshots(new Date('2025-01-01T00:00:30Z'))

expect(inserted).toBe(0)
expect(mockQuery).not.toHaveBeenCalled()
})

it('inserts at the floored minute and reports rows inserted', async () => {
mockGetActivePairs.mockReturnValue([
{ pairKey: 'XLM/USDC', assetA: {}, assetB: {} },
{ pairKey: 'XLM/yBTC', assetA: {}, assetB: {} },
])
mockQuery.mockResolvedValue({ rowCount: 2 })

const inserted = await appendSnapshots(new Date('2025-01-01T00:00:42.500Z'))

expect(inserted).toBe(2)
const params = mockQuery.mock.calls[0][1]
expect(params[0]).toEqual(['XLM/USDC', 'XLM/yBTC'])
// ts param must be floored to the minute boundary.
expect((params[1] as Date).toISOString()).toBe('2025-01-01T00:00:00.000Z')
})

it('returns 0 when the upsert skips all rows (idempotent re-run)', async () => {
mockGetActivePairs.mockReturnValue([{ pairKey: 'XLM/USDC', assetA: {}, assetB: {} }])
mockQuery.mockResolvedValue({ rowCount: 0 })

const inserted = await appendSnapshots(new Date('2025-01-01T00:00:00Z'))

expect(inserted).toBe(0)
})

it('uses ON CONFLICT DO NOTHING so duplicate minutes are skipped', async () => {
mockGetActivePairs.mockReturnValue([{ pairKey: 'XLM/USDC', assetA: {}, assetB: {} }])
mockQuery.mockResolvedValue({ rowCount: 1 })

await appendSnapshots()

expect(mockQuery.mock.calls[0][0]).toMatch(/ON CONFLICT \(pair, ts\) DO NOTHING/)
})
})
Loading