diff --git a/.changeset/graphql-price-subscriptions.md b/.changeset/graphql-price-subscriptions.md new file mode 100644 index 0000000..09ddf06 --- /dev/null +++ b/.changeset/graphql-price-subscriptions.md @@ -0,0 +1,5 @@ +--- +"lens": minor +--- + +Add a `priceUpdated(pair: String!)` GraphQL subscription that streams live prices over the existing `/graphql` endpoint (graphql-transport-ws protocol). Every ingester (SDEX, Horizon AMM, Soroswap) now publishes `{ pair, price, ts }` on each new price; subscribers receive only the pair they request. diff --git a/README.md b/README.md index 051fe8f..aa050f8 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,8 @@ Aggregates price data from Stellar's Classic Order Book (SDEX) and AMM Liquidity | GET | `/status` | Indexer health | ### GraphQL -Available at `/graphql` with GraphiQL IDE at `/graphiql`. +Available at `/graphql` with GraphiQL IDE at `/graphiql`. Real-time price +streaming is available via the `priceUpdated` [subscription](#graphql-subscriptions-live-prices). ```graphql query { @@ -44,6 +45,60 @@ query { } ``` +### GraphQL Subscriptions (live prices) + +Lens exposes a `priceUpdated(pair)` subscription that pushes a message every time +an ingester (SDEX, Horizon AMM, or Soroswap) records a new price for the pair. +It runs over the same `/graphql` endpoint using the `graphql-transport-ws` +protocol, so any [`graphql-ws`](https://github.com/enisdenjo/graphql-ws) client works. + +```graphql +subscription { + priceUpdated(pair: "XLM/USDC") { + pair + price + ts + } +} +``` + +```bash +npm install graphql-ws ws +``` + +```typescript +import { createClient } from "graphql-ws"; +import WebSocket from "ws"; // browsers already have WebSocket globally + +const client = createClient({ + url: "ws://localhost:3002/graphql", + webSocketImpl: WebSocket, // omit in the browser +}); + +// `subscribe` returns an unsubscribe function — call it to close the channel. +const unsubscribe = client.subscribe( + { + query: `subscription ($pair: String!) { + priceUpdated(pair: $pair) { pair price ts } + }`, + variables: { pair: "XLM/USDC" }, + }, + { + next: ({ data }) => console.log("price:", data.priceUpdated), + error: (err) => console.error("subscription error:", err), + complete: () => console.log("subscription closed"), + }, +); + +// Later — stop receiving updates and close the socket cleanly: +// unsubscribe(); +``` + +> **Note:** the `pair` argument is the canonical `pairKey` (alphabetically +> sorted, e.g. `XLM:native/USDC:GA5...`). Use the `listPairs` query to discover +> the exact keys being indexed. Only the pair you subscribe to is delivered; +> updates for other pairs are filtered out server-side. + ## Usage Examples Lens gates `/price`, `/pools`, and `/candles` behind x402 micropayments on Stellar (testnet by default). The `/status` endpoint is free. diff --git a/src/__tests__/graphqlSubscription.test.ts b/src/__tests__/graphqlSubscription.test.ts new file mode 100644 index 0000000..9066433 --- /dev/null +++ b/src/__tests__/graphqlSubscription.test.ts @@ -0,0 +1,144 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import Fastify, { type FastifyInstance } from 'fastify' +import WebSocket from 'ws' +import type { AddressInfo } from 'net' + +// graphql.ts pulls in redis / db / aggregator / config at import time — stub the +// ones the Query resolvers touch so importing the module is side-effect free. +// The subscription path under test does not use any of them. +vi.mock('../redis', () => ({ getCachedPrice: vi.fn() })) +vi.mock('../db', () => ({ pgPool: { query: vi.fn() } })) +vi.mock('../aggregator/vwap', () => ({ getAggregatedPrice: vi.fn() })) +vi.mock('../aggregator/bestRoute', () => ({ getBestRoute: vi.fn() })) +vi.mock('../config', () => ({ config: { pairs: [] } })) + +import { registerGraphQL } from '../api/graphql' +import { publishPriceUpdate, priceEmitter } from '../events' + +const SUBPROTOCOL = 'graphql-transport-ws' + +async function buildServer(): Promise<{ app: FastifyInstance; url: string }> { + const app = Fastify({ logger: false }) + await registerGraphQL(app) + await app.listen({ port: 0, host: '127.0.0.1' }) + const { port } = app.server.address() as AddressInfo + return { app, url: `ws://127.0.0.1:${port}/graphql` } +} + +/** Open a graphql-transport-ws connection and complete the connection_init handshake. */ +function connect(url: string): Promise { + return new Promise((resolve, reject) => { + const ws = new WebSocket(url, SUBPROTOCOL) + ws.on('error', reject) + ws.on('open', () => ws.send(JSON.stringify({ type: 'connection_init' }))) + ws.on('message', function onAck(raw) { + const msg = JSON.parse(raw.toString()) + if (msg.type === 'connection_ack') { + ws.off('message', onAck) + resolve(ws) + } + }) + }) +} + +/** Wait for the next message of a given type, with a timeout. */ +function waitFor(ws: WebSocket, type: string, timeoutMs = 2000): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + ws.off('message', onMsg) + reject(new Error(`timed out waiting for "${type}"`)) + }, timeoutMs) + function onMsg(raw: WebSocket.RawData) { + const msg = JSON.parse(raw.toString()) + if (msg.type === type) { + clearTimeout(timer) + ws.off('message', onMsg) + resolve(msg) + } + } + ws.on('message', onMsg) + }) +} + +const SUBSCRIBE = (pair: string) => ({ + id: '1', + type: 'subscribe', + payload: { + query: `subscription($pair: String!) { + priceUpdated(pair: $pair) { pair price ts } + }`, + variables: { pair }, + }, +}) + +describe('GraphQL priceUpdated subscription', () => { + let app: FastifyInstance + let url: string + + beforeEach(async () => { + ;({ app, url } = await buildServer()) + }) + + afterEach(async () => { + await app.close() + // app.close() fires the onClose hook that detaches the bridge listener. + expect(priceEmitter.listenerCount('price:published')).toBe(0) + }) + + it('streams updates for the subscribed pair', async () => { + const ws = await connect(url) + ws.send(JSON.stringify(SUBSCRIBE('XLM/USDC'))) + + // Give the server a tick to register the subscription before publishing. + await new Promise(r => setTimeout(r, 100)) + publishPriceUpdate({ pair: 'XLM/USDC', price: 0.1234, ts: '2026-06-02T00:00:00.000Z' }) + + const next = await waitFor(ws, 'next') + expect(next.id).toBe('1') + expect(next.payload.data.priceUpdated).toEqual({ + pair: 'XLM/USDC', + price: 0.1234, + ts: '2026-06-02T00:00:00.000Z', + }) + + ws.close() + }) + + it('does not deliver updates for other pairs', async () => { + const ws = await connect(url) + ws.send(JSON.stringify(SUBSCRIBE('XLM/USDC'))) + await new Promise(r => setTimeout(r, 100)) + + // A different pair must be filtered out… + publishPriceUpdate({ pair: 'BTC/USDC', price: 99, ts: '2026-06-02T00:00:00.000Z' }) + // …while the subscribed pair still comes through. + publishPriceUpdate({ pair: 'XLM/USDC', price: 0.5, ts: '2026-06-02T00:00:01.000Z' }) + + const next = await waitFor(ws, 'next') + expect(next.payload.data.priceUpdated.pair).toBe('XLM/USDC') + expect(next.payload.data.priceUpdated.price).toBe(0.5) + + ws.close() + }) + + it('closes the channel cleanly on complete', async () => { + const ws = await connect(url) + ws.send(JSON.stringify(SUBSCRIBE('XLM/USDC'))) + await new Promise(r => setTimeout(r, 100)) + + // Client-initiated unsubscribe. + ws.send(JSON.stringify({ id: '1', type: 'complete' })) + await new Promise(r => setTimeout(r, 100)) + + // After completing, further publishes for that pair must not arrive. + let received = false + ws.on('message', raw => { + if (JSON.parse(raw.toString()).type === 'next') received = true + }) + publishPriceUpdate({ pair: 'XLM/USDC', price: 1, ts: '2026-06-02T00:00:02.000Z' }) + await new Promise(r => setTimeout(r, 200)) + + expect(received).toBe(false) + ws.close() + }) +}) diff --git a/src/__tests__/schemaValidation.test.ts b/src/__tests__/schemaValidation.test.ts new file mode 100644 index 0000000..9747365 --- /dev/null +++ b/src/__tests__/schemaValidation.test.ts @@ -0,0 +1,121 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import Fastify from 'fastify' +import Ajv from 'ajv' + +const { mockQuery, mockGetCachedPrice, mockGetBestRoute, mockGetAggregatedPrice } = vi.hoisted(() => ({ + mockQuery: vi.fn(), + mockGetCachedPrice: vi.fn(), + mockGetBestRoute: vi.fn(), + mockGetAggregatedPrice: vi.fn(), +})) + +vi.mock('../db', () => ({ + pgPool: { query: mockQuery }, +})) + +vi.mock('../redis', () => ({ + getCachedPrice: mockGetCachedPrice, + setCachedPrice: vi.fn(), +})) + +vi.mock('../aggregator/bestRoute', () => ({ + getBestRoute: mockGetBestRoute, +})) + +vi.mock('../aggregator/vwap', () => ({ + getAggregatedPrice: mockGetAggregatedPrice, +})) + +vi.mock('../config', () => ({ + config: { + pairs: [ + { + pairKey: 'USDC/XLM', + assetA: { code: 'XLM', issuer: null }, + assetB: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + }, + ], + cache: { priceTtl: 10 }, + }, +})) + +import { registerRESTRoutes } from '../api/rest' +import { priceResponseSchema } from '../api/schemas' + +async function buildApp() { + const app = Fastify({ logger: false }) + await registerRESTRoutes(app) + await app.ready() + return app +} + +/** A valid aggregated-price payload matching getAggregatedPrice's return type. */ +function validAggregate() { + return { + price: 0.1, + sdexPrice: 0.1, + ammPrice: 0.1, + volume24h: 150, + sdexVolume24h: 100, + ammVolume24h: 50, + vwap1m: 0.1, + vwap5m: 0.1, + vwap1h: 0.1, + vwap24h: 0.1, + priceChange24h: 1.1, + sources: 2, + confidence: 'high' as const, + lastTradeAgeSeconds: 10, + } +} + +describe('REST response schema validation', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetCachedPrice.mockResolvedValue(null) + mockGetBestRoute.mockResolvedValue({ route: 'SDEX' }) + }) + + it('returns 200 and a body that matches the declared /price schema', async () => { + mockGetAggregatedPrice.mockResolvedValue(validAggregate()) + + const app = await buildApp() + const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' }) + + expect(res.statusCode).toBe(200) + + // The serialized body must independently satisfy the published schema. + const ajv = new Ajv({ allErrors: true }) + const validate = ajv.compile(priceResponseSchema as object) + const ok = validate(res.json()) + // Surface a readable diff if the body ever drifts from the schema. + expect(ok, ajv.errorsText(validate.errors)).toBe(true) + }) + + it('fails (500) when the response grows an undeclared field', async () => { + // Simulate an accidental shape change: an extra property leaks into the body. + mockGetAggregatedPrice.mockResolvedValue({ + ...validAggregate(), + surpriseField: 'should not be here', + }) + + const app = await buildApp() + const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' }) + + // additionalProperties: false → serializer validation throws → 500. + expect(res.statusCode).toBe(500) + }) + + it('fails (500) when a field changes type', async () => { + // confidence must be one of the enum strings; a number is a shape break. + mockGetAggregatedPrice.mockResolvedValue({ + ...validAggregate(), + confidence: 42 as unknown as 'high', + }) + + const app = await buildApp() + const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' }) + + expect(res.statusCode).toBe(500) + }) +}) diff --git a/src/api/graphql.ts b/src/api/graphql.ts index 24ba53e..5e7f13a 100644 --- a/src/api/graphql.ts +++ b/src/api/graphql.ts @@ -1,11 +1,17 @@ import type { FastifyInstance } from 'fastify' import { price_requests_total } from '../metrics' -import mercurius from 'mercurius' +import mercurius, { withFilter, type MercuriusContext } from 'mercurius' import { getCachedPrice } from '../redis' import { getAggregatedPrice } from '../aggregator/vwap' import { getBestRoute } from '../aggregator/bestRoute' import { pgPool } from '../db' import { config } from '../config' +import { priceEmitter, PRICE_PUBLISHED, type PricePublishedEvent } from '../events' + +// Mercurius pubsub topic that carries every new price. A single app-level +// listener bridges the ingesters' in-process `priceEmitter` onto this topic; +// each subscriber then filters it down to the pair they asked for. +const PRICE_TOPIC = 'PRICE_UPDATED' const schema = ` type AggregatedPrice { @@ -53,12 +59,23 @@ const schema = ` low: Float } + type PriceUpdate { + pair: String! + price: Float! + ts: String! + } + type Query { getPrice(assetA: String!, assetB: String!): AggregatedPrice getBestRoute(assetA: String!, assetB: String!, amount: Float!): RouteInfo getPriceHistory(assetA: String!, assetB: String!, window: String!, limit: Int): [PriceBucket] listPairs: [String]! } + + type Subscription { + """Streams a PriceUpdate every time the ingester records a new price for the given pair.""" + priceUpdated(pair: String!): PriceUpdate! + } ` function makePairKey(a: string, b: string): string { @@ -142,6 +159,15 @@ const resolvers = { return config.pairs.map(p => p.pairKey) }, }, + + Subscription: { + priceUpdated: { + subscribe: withFilter<{ priceUpdated: PricePublishedEvent }, unknown, MercuriusContext, { pair: string }>( + (_root, _args, { pubsub }) => pubsub.subscribe(PRICE_TOPIC), + (payload, { pair }) => payload.priceUpdated.pair === pair + ), + }, + }, } export async function registerGraphQL(app: FastifyInstance) { @@ -150,5 +176,28 @@ export async function registerGraphQL(app: FastifyInstance) { resolvers, graphiql: true, path: '/graphql', + subscription: { + // Speak the `graphql-transport-ws` subprotocol (the modern `graphql-ws` + // library) rather than the legacy `subscriptions-transport-ws`. + fullWsTransport: true, + wsDefaultSubprotocol: 'graphql-transport-ws', + }, + }) + + // Bridge: forward every price the ingesters emit onto the GraphQL pubsub + // topic. Mercurius wraps the payload under the subscription field name so + // `withFilter` and the resolver receive `{ priceUpdated: }`. + const onPricePublished = (event: PricePublishedEvent) => { + app.graphql.pubsub.publish({ + topic: PRICE_TOPIC, + payload: { priceUpdated: event }, + }) + } + priceEmitter.on(PRICE_PUBLISHED, onPricePublished) + + // Detach the listener when the server shuts down so repeated + // register/close cycles (e.g. in tests) don't leak listeners. + app.addHook('onClose', async () => { + priceEmitter.off(PRICE_PUBLISHED, onPricePublished) }) } diff --git a/src/api/rest.ts b/src/api/rest.ts index 30737d1..89c84e6 100644 --- a/src/api/rest.ts +++ b/src/api/rest.ts @@ -5,6 +5,14 @@ import { getAggregatedPrice } from '../aggregator/vwap' import { getBestRoute } from '../aggregator/bestRoute' import { pgPool } from '../db' import { config } from '../config' +import { + statusResponseSchema, + priceResponseSchema, + routeResponseSchema, + historyResponseSchema, + poolsResponseSchema, + installResponseValidation, +} from './schemas' function makePairKey(a: string, b: string): string { return [a, b].sort().join('/') @@ -22,8 +30,13 @@ function findPair(assetA: string, assetB: string) { } export async function registerRESTRoutes(app: FastifyInstance) { + // Validate every response against its declared schema in dev/test (no-op in + // production). Must run before the routes below are registered so they pick + // up the validating serializer. + installResponseValidation(app) + // GET /status — public health/monitoring endpoint (no API key required) - app.get('/status', { config: { public: true } }, async () => { + app.get('/status', { config: { public: true }, schema: { response: { 200: statusResponseSchema } } }, async () => { const result = await pgPool.query( `SELECT last_ledger, last_processed_at FROM indexer_state ORDER BY updated_at DESC LIMIT 1` ) @@ -39,6 +52,7 @@ export async function registerRESTRoutes(app: FastifyInstance) { // GET /price/:assetA/:assetB app.get<{ Params: { assetA: string; assetB: string } }>( '/price/:assetA/:assetB', + { schema: { response: { 200: priceResponseSchema } } }, async (req, reply) => { price_requests_total.inc() const { assetA, assetB } = req.params @@ -76,6 +90,7 @@ export async function registerRESTRoutes(app: FastifyInstance) { Querystring: { amount?: string } }>( '/price/:assetA/:assetB/route', + { schema: { response: { 200: routeResponseSchema } } }, async (req, reply) => { const { assetA, assetB } = req.params const amount = parseFloat(req.query.amount ?? '1000') @@ -93,6 +108,7 @@ export async function registerRESTRoutes(app: FastifyInstance) { Querystring: { window?: string; limit?: string } }>( '/price/:assetA/:assetB/history', + { schema: { response: { 200: historyResponseSchema } } }, async (req, reply) => { const { assetA, assetB } = req.params const window = req.query.window ?? '1h' @@ -134,7 +150,7 @@ export async function registerRESTRoutes(app: FastifyInstance) { ) // GET /pools - app.get('/pools', async () => { + app.get('/pools', { schema: { response: { 200: poolsResponseSchema } } }, async () => { const result = await pgPool.query( `SELECT DISTINCT ON (pool_id) pool_id, asset_a, asset_b, reserve_a::float, reserve_b::float, spot_price::float, fee_bp, timestamp diff --git a/src/api/schemas.ts b/src/api/schemas.ts new file mode 100644 index 0000000..096ee9e --- /dev/null +++ b/src/api/schemas.ts @@ -0,0 +1,204 @@ +/** + * Shared JSON Schema objects for REST response validation. + * + * These schemas serve two purposes: + * 1. They are attached to routes via `{ schema: { response: { 200: ... } } }`, + * which lets Fastify auto-generate a correct OpenAPI/Swagger spec and use + * fast, schema-aware serialization. + * 2. In non-production environments they additionally *validate* the outgoing + * payload (see `installResponseValidation`), so an accidental change to a + * response shape fails loudly in tests instead of shipping silently. + * + * NOTE: response validation has no runtime impact in production — it is only + * installed when NODE_ENV !== 'production'. In production Fastify falls back to + * its default (fast) serializer for these same schemas. + */ +import Ajv from 'ajv' +import type { FastifyInstance, FastifySchema } from 'fastify' + +/** GET /status */ +export const statusResponseSchema = { + type: 'object', + required: ['ok', 'watchedPairs', 'lastIndexedLedger', 'lastProcessedAt'], + additionalProperties: false, + properties: { + ok: { type: 'boolean' }, + watchedPairs: { type: 'array', items: { type: 'string' } }, + lastIndexedLedger: { type: ['integer', 'null'] }, + // The pg driver returns a timestamp column as a JS Date; the validating + // serializer sees that pre-serialization object, so accept Date | string | + // null here (a Date stringifies to an ISO string in the response body). + lastProcessedAt: {}, + }, +} as const + +/** GET /price/:assetA/:assetB */ +export const priceResponseSchema = { + type: 'object', + required: [ + 'assetA', + 'assetB', + 'pairKey', + 'price', + 'sdexPrice', + 'ammPrice', + 'volume24h', + 'sdexVolume24h', + 'ammVolume24h', + 'vwap1m', + 'vwap5m', + 'vwap1h', + 'vwap24h', + 'priceChange24h', + 'sources', + 'confidence', + 'lastTradeAgeSeconds', + 'bestRoute', + 'lastUpdated', + ], + additionalProperties: false, + properties: { + assetA: { type: 'string' }, + assetB: { type: 'string' }, + pairKey: { type: 'string' }, + price: { type: 'number' }, + sdexPrice: { type: 'number' }, + ammPrice: { type: 'number' }, + volume24h: { type: 'number' }, + sdexVolume24h: { type: 'number' }, + ammVolume24h: { type: 'number' }, + vwap1m: { type: 'number' }, + vwap5m: { type: 'number' }, + vwap1h: { type: 'number' }, + vwap24h: { type: 'number' }, + priceChange24h: { type: 'number' }, + sources: { type: 'integer' }, + confidence: { type: 'string', enum: ['high', 'medium', 'low', 'unknown'] }, + lastTradeAgeSeconds: { type: ['integer', 'null'] }, + bestRoute: { type: 'string', enum: ['SDEX', 'AMM', 'SPLIT', 'UNKNOWN'] }, + lastUpdated: { type: 'string' }, + }, +} as const + +/** GET /price/:assetA/:assetB/route — matches RouteInfo */ +export const routeResponseSchema = { + type: 'object', + required: [ + 'route', + 'sdexPrice', + 'ammPrice', + 'estimatedOutput', + 'slippagePct', + 'recommendation', + ], + additionalProperties: false, + properties: { + route: { type: 'string', enum: ['SDEX', 'AMM', 'SPLIT', 'UNKNOWN'] }, + sdexPrice: { type: 'number' }, + ammPrice: { type: 'number' }, + estimatedOutput: { type: 'number' }, + slippagePct: { type: 'number' }, + recommendation: { type: 'string' }, + }, +} as const + +/** GET /price/:assetA/:assetB/history */ +export const historyResponseSchema = { + type: 'object', + required: ['pairKey', 'window', 'buckets'], + additionalProperties: false, + properties: { + pairKey: { type: 'string' }, + window: { type: 'string', enum: ['1m', '5m', '1h', '24h'] }, + buckets: { + type: 'array', + items: { + type: 'object', + required: [ + 'bucket', + 'vwap', + 'sdexVwap', + 'ammVwap', + 'volume', + 'tradeCount', + 'open', + 'close', + 'high', + 'low', + ], + additionalProperties: false, + properties: { + bucket: {}, + vwap: { type: ['number', 'null'] }, + sdexVwap: { type: ['number', 'null'] }, + ammVwap: { type: ['number', 'null'] }, + volume: { type: ['number', 'null'] }, + tradeCount: { type: ['integer', 'null'] }, + open: { type: ['number', 'null'] }, + close: { type: ['number', 'null'] }, + high: { type: ['number', 'null'] }, + low: { type: ['number', 'null'] }, + }, + }, + }, + }, +} as const + +/** + * GET /pools + * + * Rows come straight from a raw SQL query whose column set may evolve, so the + * row shape is intentionally permissive (additionalProperties allowed). The + * envelope, however, is validated. + */ +export const poolsResponseSchema = { + type: 'object', + required: ['pools'], + additionalProperties: false, + properties: { + pools: { + type: 'array', + items: { + type: 'object', + additionalProperties: true, + properties: { + pool_id: { type: 'string' }, + asset_a: { type: 'string' }, + asset_b: { type: 'string' }, + reserve_a: { type: ['number', 'null'] }, + reserve_b: { type: ['number', 'null'] }, + spot_price: { type: ['number', 'null'] }, + fee_bp: { type: ['integer', 'null'] }, + }, + }, + }, + }, +} as const + +/** + * Install response-shape validation on a Fastify instance. + * + * Default Fastify behaviour is to *serialize* against the response schema, + * silently dropping unknown keys — which means a drifted shape would still + * return 200. To make accidental shape changes fail loudly we replace the + * serializer with one that first validates the payload against the schema and + * throws a 500 (FST_ERR_RESPONSE_SERIALIZATION) when it does not match. + * + * Skipped entirely in production so there is no added runtime cost there. + */ +export function installResponseValidation(app: FastifyInstance): void { + if (process.env.NODE_ENV === 'production') return + + const ajv = new Ajv({ allErrors: true, coerceTypes: false }) + + app.setSerializerCompiler(({ schema }: { schema: FastifySchema }) => { + const validate = ajv.compile(schema as object) + return (data: unknown) => { + if (!validate(data)) { + const detail = ajv.errorsText(validate.errors, { dataVar: 'response' }) + throw new Error(`Response does not match schema: ${detail}`) + } + return JSON.stringify(data) + } + }) +} diff --git a/src/events.ts b/src/events.ts index 5e46e1e..0ba5719 100644 --- a/src/events.ts +++ b/src/events.ts @@ -11,3 +11,28 @@ export interface PriceUpdateEvent { currentPrice: number timestamp: Date } + +/** + * Emitted by every ingester after it records a new price for a pair. This is + * the feed that backs the GraphQL `priceUpdated` subscription. It is kept + * deliberately minimal — `{ pair, price, ts }` — so it is cheap to publish on + * the ingester hot path and the GraphQL layer can filter purely on `pair`. + */ +export const PRICE_PUBLISHED = 'price:published' + +export interface PricePublishedEvent { + /** pairKey, e.g. "XLM:native/USDC:GA5..." */ + pair: string + price: number + /** ISO-8601 timestamp of when the price was recorded */ + ts: string +} + +/** + * Publish a new price to all live subscribers (GraphQL `priceUpdated`). + * Fire-and-forget: emitting is synchronous and never throws, so ingesters can + * call it without a try/catch on their hot path. + */ +export function publishPriceUpdate(event: PricePublishedEvent): void { + priceEmitter.emit(PRICE_PUBLISHED, event) +} diff --git a/src/ingesters/amm.ts b/src/ingesters/amm.ts index f2aae24..534a4f8 100644 --- a/src/ingesters/amm.ts +++ b/src/ingesters/amm.ts @@ -4,6 +4,7 @@ import { config } from '../config' import { getActivePairs } from '../pairsRegistry' import { upsertPricePoints, getIndexerCursor, setIndexerCursor, prisma } from '../db' import { dispatchPriceUpdate } from '../webhookDispatcher' +import { publishPriceUpdate } from '../events' import type { WatchedPair } from '../types' const horizonServer = new Horizon.Server(config.horizon.url) @@ -85,6 +86,13 @@ export async function snapshotPool(pool: any, pair: WatchedPair): Promise const previousPrice = lastPrice.get(pair.pairKey) ?? spotPrice lastPrice.set(pair.pairKey, spotPrice) + + publishPriceUpdate({ + pair: pair.pairKey, + price: spotPrice, + ts: new Date().toISOString(), + }) + dispatchPriceUpdate({ assetA: pair.assetA.code, assetB: pair.assetB.code, @@ -146,6 +154,12 @@ export async function ingestPoolTrades(pool: any, pair: WatchedPair): Promise { await setIndexerCursor(stateId, lastCursor) console.log(`[sdex] ${pair.pairKey}: ingested ${points.length} trades`) + publishPriceUpdate({ + pair: pair.pairKey, + price: currentPrice, + ts: points[points.length - 1].timestamp.toISOString(), + }) + dispatchPriceUpdate({ assetA: pair.assetA.code, assetB: pair.assetB.code, diff --git a/src/ingesters/soroswap.ts b/src/ingesters/soroswap.ts index b1accc2..20b816c 100644 --- a/src/ingesters/soroswap.ts +++ b/src/ingesters/soroswap.ts @@ -23,6 +23,7 @@ import { config } from '../config' import { getActivePairs } from '../pairsRegistry' import { upsertPricePoints } from '../db' import { dispatchPriceUpdate } from '../webhookDispatcher' +import { publishPriceUpdate } from '../events' import type { WatchedPair } from '../types' // ── Constants ───────────────────────────────────────────────────────────────── @@ -247,6 +248,12 @@ export async function ingestPool( const previousPrice = lastPrice.get(pair.pairKey) ?? spotPrice lastPrice.set(pair.pairKey, spotPrice) + publishPriceUpdate({ + pair: pair.pairKey, + price: spotPrice, + ts: new Date().toISOString(), + }) + dispatchPriceUpdate({ assetA: pair.assetA.code, assetB: pair.assetB.code,