Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/graphql-price-subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"lens": minor
---

Add a `priceUpdated(pair: String!)` GraphQL subscription that streams live prices over the existing `/graphql` endpoint (graphql-transport-ws protocol). Every ingester (SDEX, Horizon AMM, Soroswap) now publishes `{ pair, price, ts }` on each new price; subscribers receive only the pair they request.
57 changes: 56 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Aggregates price data from Stellar's Classic Order Book (SDEX) and AMM Liquidity
| GET | `/status` | Indexer health |

### GraphQL
Available at `/graphql` with GraphiQL IDE at `/graphiql`.
Available at `/graphql` with GraphiQL IDE at `/graphiql`. Real-time price
streaming is available via the `priceUpdated` [subscription](#graphql-subscriptions-live-prices).

```graphql
query {
Expand All @@ -44,6 +45,60 @@ query {
}
```

### GraphQL Subscriptions (live prices)

Lens exposes a `priceUpdated(pair)` subscription that pushes a message every time
an ingester (SDEX, Horizon AMM, or Soroswap) records a new price for the pair.
It runs over the same `/graphql` endpoint using the `graphql-transport-ws`
protocol, so any [`graphql-ws`](https://github.com/enisdenjo/graphql-ws) client works.

```graphql
subscription {
priceUpdated(pair: "XLM/USDC") {
pair
price
ts
}
}
```

```bash
npm install graphql-ws ws
```

```typescript
import { createClient } from "graphql-ws";
import WebSocket from "ws"; // browsers already have WebSocket globally

const client = createClient({
url: "ws://localhost:3002/graphql",
webSocketImpl: WebSocket, // omit in the browser
});

// `subscribe` returns an unsubscribe function — call it to close the channel.
const unsubscribe = client.subscribe(
{
query: `subscription ($pair: String!) {
priceUpdated(pair: $pair) { pair price ts }
}`,
variables: { pair: "XLM/USDC" },
},
{
next: ({ data }) => console.log("price:", data.priceUpdated),
error: (err) => console.error("subscription error:", err),
complete: () => console.log("subscription closed"),
},
);

// Later — stop receiving updates and close the socket cleanly:
// unsubscribe();
```

> **Note:** the `pair` argument is the canonical `pairKey` (alphabetically
> sorted, e.g. `XLM:native/USDC:GA5...`). Use the `listPairs` query to discover
> the exact keys being indexed. Only the pair you subscribe to is delivered;
> updates for other pairs are filtered out server-side.

## Usage Examples

Lens gates `/price`, `/pools`, and `/candles` behind x402 micropayments on Stellar (testnet by default). The `/status` endpoint is free.
Expand Down
144 changes: 144 additions & 0 deletions src/__tests__/graphqlSubscription.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'
import Fastify, { type FastifyInstance } from 'fastify'
import WebSocket from 'ws'
import type { AddressInfo } from 'net'

// graphql.ts pulls in redis / db / aggregator / config at import time — stub the
// ones the Query resolvers touch so importing the module is side-effect free.
// The subscription path under test does not use any of them.
vi.mock('../redis', () => ({ getCachedPrice: vi.fn() }))
vi.mock('../db', () => ({ pgPool: { query: vi.fn() } }))
vi.mock('../aggregator/vwap', () => ({ getAggregatedPrice: vi.fn() }))
vi.mock('../aggregator/bestRoute', () => ({ getBestRoute: vi.fn() }))
vi.mock('../config', () => ({ config: { pairs: [] } }))

import { registerGraphQL } from '../api/graphql'
import { publishPriceUpdate, priceEmitter } from '../events'

const SUBPROTOCOL = 'graphql-transport-ws'

async function buildServer(): Promise<{ app: FastifyInstance; url: string }> {
const app = Fastify({ logger: false })
await registerGraphQL(app)
await app.listen({ port: 0, host: '127.0.0.1' })
const { port } = app.server.address() as AddressInfo
return { app, url: `ws://127.0.0.1:${port}/graphql` }
}

/** Open a graphql-transport-ws connection and complete the connection_init handshake. */
function connect(url: string): Promise<WebSocket> {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url, SUBPROTOCOL)
ws.on('error', reject)
ws.on('open', () => ws.send(JSON.stringify({ type: 'connection_init' })))
ws.on('message', function onAck(raw) {
const msg = JSON.parse(raw.toString())
if (msg.type === 'connection_ack') {
ws.off('message', onAck)
resolve(ws)
}
})
})
}

/** Wait for the next message of a given type, with a timeout. */
function waitFor(ws: WebSocket, type: string, timeoutMs = 2000): Promise<any> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
ws.off('message', onMsg)
reject(new Error(`timed out waiting for "${type}"`))
}, timeoutMs)
function onMsg(raw: WebSocket.RawData) {
const msg = JSON.parse(raw.toString())
if (msg.type === type) {
clearTimeout(timer)
ws.off('message', onMsg)
resolve(msg)
}
}
ws.on('message', onMsg)
})
}

const SUBSCRIBE = (pair: string) => ({
id: '1',
type: 'subscribe',
payload: {
query: `subscription($pair: String!) {
priceUpdated(pair: $pair) { pair price ts }
}`,
variables: { pair },
},
})

describe('GraphQL priceUpdated subscription', () => {
let app: FastifyInstance
let url: string

beforeEach(async () => {
;({ app, url } = await buildServer())
})

afterEach(async () => {
await app.close()
// app.close() fires the onClose hook that detaches the bridge listener.
expect(priceEmitter.listenerCount('price:published')).toBe(0)
})

it('streams updates for the subscribed pair', async () => {
const ws = await connect(url)
ws.send(JSON.stringify(SUBSCRIBE('XLM/USDC')))

// Give the server a tick to register the subscription before publishing.
await new Promise(r => setTimeout(r, 100))
publishPriceUpdate({ pair: 'XLM/USDC', price: 0.1234, ts: '2026-06-02T00:00:00.000Z' })

const next = await waitFor(ws, 'next')
expect(next.id).toBe('1')
expect(next.payload.data.priceUpdated).toEqual({
pair: 'XLM/USDC',
price: 0.1234,
ts: '2026-06-02T00:00:00.000Z',
})

ws.close()
})

it('does not deliver updates for other pairs', async () => {
const ws = await connect(url)
ws.send(JSON.stringify(SUBSCRIBE('XLM/USDC')))
await new Promise(r => setTimeout(r, 100))

// A different pair must be filtered out…
publishPriceUpdate({ pair: 'BTC/USDC', price: 99, ts: '2026-06-02T00:00:00.000Z' })
// …while the subscribed pair still comes through.
publishPriceUpdate({ pair: 'XLM/USDC', price: 0.5, ts: '2026-06-02T00:00:01.000Z' })

const next = await waitFor(ws, 'next')
expect(next.payload.data.priceUpdated.pair).toBe('XLM/USDC')
expect(next.payload.data.priceUpdated.price).toBe(0.5)

ws.close()
})

it('closes the channel cleanly on complete', async () => {
const ws = await connect(url)
ws.send(JSON.stringify(SUBSCRIBE('XLM/USDC')))
await new Promise(r => setTimeout(r, 100))

// Client-initiated unsubscribe.
ws.send(JSON.stringify({ id: '1', type: 'complete' }))
await new Promise(r => setTimeout(r, 100))

// After completing, further publishes for that pair must not arrive.
let received = false
ws.on('message', raw => {
if (JSON.parse(raw.toString()).type === 'next') received = true
})
publishPriceUpdate({ pair: 'XLM/USDC', price: 1, ts: '2026-06-02T00:00:02.000Z' })
await new Promise(r => setTimeout(r, 200))

expect(received).toBe(false)
ws.close()
})
})
121 changes: 121 additions & 0 deletions src/__tests__/schemaValidation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import Fastify from 'fastify'
import Ajv from 'ajv'

const { mockQuery, mockGetCachedPrice, mockGetBestRoute, mockGetAggregatedPrice } = vi.hoisted(() => ({
mockQuery: vi.fn(),
mockGetCachedPrice: vi.fn(),
mockGetBestRoute: vi.fn(),
mockGetAggregatedPrice: vi.fn(),
}))

vi.mock('../db', () => ({
pgPool: { query: mockQuery },
}))

vi.mock('../redis', () => ({
getCachedPrice: mockGetCachedPrice,
setCachedPrice: vi.fn(),
}))

vi.mock('../aggregator/bestRoute', () => ({
getBestRoute: mockGetBestRoute,
}))

vi.mock('../aggregator/vwap', () => ({
getAggregatedPrice: mockGetAggregatedPrice,
}))

vi.mock('../config', () => ({
config: {
pairs: [
{
pairKey: 'USDC/XLM',
assetA: { code: 'XLM', issuer: null },
assetB: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' },
},
],
cache: { priceTtl: 10 },
},
}))

import { registerRESTRoutes } from '../api/rest'
import { priceResponseSchema } from '../api/schemas'

async function buildApp() {
const app = Fastify({ logger: false })
await registerRESTRoutes(app)
await app.ready()
return app
}

/** A valid aggregated-price payload matching getAggregatedPrice's return type. */
function validAggregate() {
return {
price: 0.1,
sdexPrice: 0.1,
ammPrice: 0.1,
volume24h: 150,
sdexVolume24h: 100,
ammVolume24h: 50,
vwap1m: 0.1,
vwap5m: 0.1,
vwap1h: 0.1,
vwap24h: 0.1,
priceChange24h: 1.1,
sources: 2,
confidence: 'high' as const,
lastTradeAgeSeconds: 10,
}
}

describe('REST response schema validation', () => {
beforeEach(() => {
vi.clearAllMocks()
mockGetCachedPrice.mockResolvedValue(null)
mockGetBestRoute.mockResolvedValue({ route: 'SDEX' })
})

it('returns 200 and a body that matches the declared /price schema', async () => {
mockGetAggregatedPrice.mockResolvedValue(validAggregate())

const app = await buildApp()
const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' })

expect(res.statusCode).toBe(200)

// The serialized body must independently satisfy the published schema.
const ajv = new Ajv({ allErrors: true })
const validate = ajv.compile(priceResponseSchema as object)
const ok = validate(res.json())
// Surface a readable diff if the body ever drifts from the schema.
expect(ok, ajv.errorsText(validate.errors)).toBe(true)
})

it('fails (500) when the response grows an undeclared field', async () => {
// Simulate an accidental shape change: an extra property leaks into the body.
mockGetAggregatedPrice.mockResolvedValue({
...validAggregate(),
surpriseField: 'should not be here',
})

const app = await buildApp()
const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' })

// additionalProperties: false → serializer validation throws → 500.
expect(res.statusCode).toBe(500)
})

it('fails (500) when a field changes type', async () => {
// confidence must be one of the enum strings; a number is a shape break.
mockGetAggregatedPrice.mockResolvedValue({
...validAggregate(),
confidence: 42 as unknown as 'high',
})

const app = await buildApp()
const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' })

expect(res.statusCode).toBe(500)
})
})
Loading