From 9c838daf53e7c2197dad26761642f5d28bc1cdeb Mon Sep 17 00:00:00 2001 From: Mikevill20 Date: Wed, 27 May 2026 14:00:47 +0100 Subject: [PATCH] Add GET /api/streams/:id/claimable --- backend/package-lock.json | 38 ++++++++ backend/package.json | 6 +- backend/src/index.ts | 62 +++++++++++++ backend/src/integration.test.ts | 136 +++++++++++++++++++++++++++- backend/src/services/streamStore.ts | 102 +++++++++------------ backend/src/swagger.ts | 55 +++++++++++ 6 files changed, 334 insertions(+), 65 deletions(-) diff --git a/backend/package-lock.json b/backend/package-lock.json index 8145302..00e6858 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -17,6 +17,7 @@ "express-rate-limit": "^7.5.0", "jsonwebtoken": "^9.0.2", "p-limit": "^4.0.0", + "prom-client": "^15.1.3", "swagger-ui-express": "^5.0.1", "ws": "^8.20.0", "zod": "^4.3.6" @@ -624,6 +625,15 @@ "url": "https://paulmillr.com/funding/" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.1.tgz", + "integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==", + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/@paralleldrive/cuid2": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/@paralleldrive/cuid2/-/cuid2-2.3.1.tgz", @@ -1651,6 +1661,12 @@ "file-uri-to-path": "1.0.0" } }, + "node_modules/bintrees": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.2.tgz", + "integrity": "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==", + "license": "MIT" + }, "node_modules/bl": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", @@ -3783,6 +3799,19 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/prom-client": { + "version": "15.1.3", + "resolved": "https://registry.npmjs.org/prom-client/-/prom-client-15.1.3.tgz", + "integrity": "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==", + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/api": "^1.4.0", + "tdigest": "^0.1.1" + }, + "engines": { + "node": "^16 || ^18 || >=20" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -4545,6 +4574,15 @@ "node": ">=6" } }, + "node_modules/tdigest": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.2.tgz", + "integrity": "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==", + "license": "MIT", + "dependencies": { + "bintrees": "1.0.2" + } + }, "node_modules/test-exclude": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/test-exclude/-/test-exclude-6.0.0.tgz", diff --git a/backend/package.json b/backend/package.json index eb58bc8..cc8e5b1 100644 --- a/backend/package.json +++ b/backend/package.json @@ -19,7 +19,7 @@ "express-rate-limit": "^7.5.0", "jsonwebtoken": "^9.0.2", "p-limit": "^4.0.0", - + "prom-client": "^15.1.3", "swagger-ui-express": "^5.0.1", "ws": "^8.20.0", "zod": "^4.3.6" @@ -34,10 +34,10 @@ "@types/supertest": "^6.0.2", "@types/swagger-ui-express": "^4.1.8", "@types/ws": "^8.5.10", + "@vitest/coverage-v8": "^1.0.0", "supertest": "^7.0.0", "ts-node-dev": "^2.0.0", "typescript": "^5.2.2", - "vitest": "^1.0.0", - "@vitest/coverage-v8": "^1.0.0" + "vitest": "^1.0.0" } } diff --git a/backend/src/index.ts b/backend/src/index.ts index 683b18d..0500559 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -34,6 +34,8 @@ import { cancelStream, createStream, getStream, + getOnChainClaimableAmount, + getLatestLedgerTime, initSoroban, listStreams, listStreamsByRecipient, @@ -185,6 +187,25 @@ const mutationLimiter = rateLimit({ }, }); +const CLAIMABLE_RATE_LIMIT = Number(process.env.CLAIMABLE_RATE_LIMIT ?? 30); + +const claimableLimiter = rateLimit({ + windowMs: 60 * 1000, + max: CLAIMABLE_RATE_LIMIT, + standardHeaders: true, + legacyHeaders: false, + handler: (req: Request, res: Response) => { + const resetTime = (req as any).rateLimit?.resetTime; + const retryAfter = resetTime + ? Math.ceil((resetTime.getTime() - Date.now()) / 1000) + : 60; + res.set("Retry-After", String(Math.max(1, retryAfter))); + sendApiError(req, res, 429, "Too many requests. Please try again later.", { + code: "RATE_LIMIT_EXCEEDED", + }); + }, +}); + app.use(cors()); app.use(requestLogger); app.use(express.json()); @@ -416,6 +437,47 @@ app.get("/api/streams/:id", readLimiter, (req: Request, res: Response) => { }); }); +app.get("/api/streams/:id/claimable", claimableLimiter, async (req: Request, res: Response) => { + const parsedId = parseStreamId(req.params.id); + if (!parsedId.ok) { + sendValidationError(req, res, parsedId.issues); + return; + } + + const stream = getStream(parsedId.value); + if (!stream) { + sendApiError(req, res, 404, "Stream not found.", { code: "NOT_FOUND" }); + return; + } + + try { + if (stream.pausedAt !== undefined || stream.canceledAt !== undefined) { + const at = await getLatestLedgerTime(); + res.json({ + streamId: stream.id, + claimableAmount: 0, + assetCode: stream.assetCode, + at, + }); + return; + } + + const { claimableAmount, at } = await getOnChainClaimableAmount(stream.id); + res.json({ + streamId: stream.id, + claimableAmount: Number(claimableAmount), + assetCode: stream.assetCode, + at, + }); + } catch (error: any) { + console.error("Failed to query claimable amount:", error); + const normalizedError = normalizeUnknownApiError(error, "Failed to query claimable amount."); + sendApiError(req, res, normalizedError.statusCode, normalizedError.message, { + code: normalizedError.code ?? "INTERNAL_ERROR", + }); + } +}); + app.get("/api/recipients/:accountId/streams", readLimiter, (req: Request, res: Response) => { const parsedParams = recipientAccountIdSchema.safeParse({ accountId: req.params.accountId, diff --git a/backend/src/integration.test.ts b/backend/src/integration.test.ts index 13b4059..d7ec8b1 100644 --- a/backend/src/integration.test.ts +++ b/backend/src/integration.test.ts @@ -1,4 +1,4 @@ -import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import request from "supertest"; import { app } from "./index"; import { initDb, getDb } from "./services/db"; @@ -6,6 +6,28 @@ import { Keypair } from "@stellar/stellar-sdk"; import path from "path"; import fs from "fs"; +const mockSimulateTransaction = vi.fn(); +const mockGetLatestLedger = vi.fn(); + +vi.mock("@stellar/stellar-sdk", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + rpc: { + ...actual.rpc, + Server: vi.fn().mockImplementation(() => ({ + getLatestLedger: mockGetLatestLedger, + simulateTransaction: mockSimulateTransaction, + prepareTransaction: vi.fn().mockImplementation((tx) => tx), + })), + Api: { + ...actual.rpc.Api, + isSimulationSuccess: (response: any) => response.kind === "success", + } + }, + }; +}); + // Use a separate test database const TEST_DB_PATH = path.join(__dirname, "..", "data", "test-streams.db"); @@ -415,6 +437,118 @@ describe("Backend Integration Tests", () => { }); }); + describe("GET /api/streams/:id/claimable", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("should return 200 and the claimable amount from Soroban simulation", async () => { + mockGetLatestLedger.mockResolvedValue({ + sequence: 12345, + closeTime: "1716812160", + }); + + mockSimulateTransaction.mockResolvedValue({ + kind: "success", + result: { + retval: 450, + }, + }); + + const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + streamId: mockStream.id, + claimableAmount: 450, + assetCode: mockStream.assetCode, + at: 1716812160, + }); + + expect(mockGetLatestLedger).toHaveBeenCalled(); + expect(mockSimulateTransaction).toHaveBeenCalled(); + }); + + it("should return 0 when stream is paused", async () => { + const db = getDb(); + db.prepare("UPDATE streams SET paused_at = ? WHERE id = ?").run(Math.floor(Date.now() / 1000) - 1000, mockStream.id); + + mockGetLatestLedger.mockResolvedValue({ + sequence: 12345, + closeTime: "1716812160", + }); + + const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + streamId: mockStream.id, + claimableAmount: 0, + assetCode: mockStream.assetCode, + at: 1716812160, + }); + + expect(mockSimulateTransaction).not.toHaveBeenCalled(); + }); + + it("should return 0 when stream is canceled", async () => { + const db = getDb(); + db.prepare("UPDATE streams SET canceled_at = ? WHERE id = ?").run(Math.floor(Date.now() / 1000) - 1000, mockStream.id); + + mockGetLatestLedger.mockResolvedValue({ + sequence: 12345, + closeTime: "1716812160", + }); + + const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + streamId: mockStream.id, + claimableAmount: 0, + assetCode: mockStream.assetCode, + at: 1716812160, + }); + + expect(mockSimulateTransaction).not.toHaveBeenCalled(); + }); + + it("should return 404 for non-existent stream", async () => { + const response = await request(app).get("/api/streams/999/claimable"); + + expect(response.status).toBe(404); + expect(response.body.error).toBe("Stream not found."); + }); + + it("should return 400 for invalid stream ID", async () => { + const response = await request(app).get("/api/streams/invalid-id/claimable"); + + expect(response.status).toBe(400); + expect(response.body.error).toContain("Stream ID must be"); + }); + + it("should enforce rate limit of 30 requests per minute", async () => { + mockGetLatestLedger.mockResolvedValue({ + sequence: 12345, + closeTime: "1716812160", + }); + mockSimulateTransaction.mockResolvedValue({ + kind: "success", + result: { retval: 10 }, + }); + + for (let i = 0; i < 31; i++) { + const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`); + if (i < 30) { + expect(response.status).toBe(200); + } else { + expect(response.status).toBe(429); + expect(response.body.code).toBe("RATE_LIMIT_EXCEEDED"); + } + } + }); + }); + describe("GET /api/recipients/:accountId/streams", () => { it("should get streams for a recipient", async () => { const response = await request(app) diff --git a/backend/src/services/streamStore.ts b/backend/src/services/streamStore.ts index c9030e7..88009bc 100644 --- a/backend/src/services/streamStore.ts +++ b/backend/src/services/streamStore.ts @@ -398,10 +398,7 @@ export function calculateProgress( stream.pausedAt !== undefined ? Math.min(at, stream.pausedAt) : at; const elapsed = Math.max(0, Math.min(effectiveAt, effectiveEnd) - stream.startAt); - ?Math.min(stream.canceledAt, streamEnd + pausedDuration) - : streamEnd + pausedDuration; - const elapsed = Math.max(0, Math.min(at, effectiveEnd) - stream.startAt - pausedDuration); const ratio = Math.min(1, elapsed / stream.durationSeconds); const vestedAmount = stream.totalAmount * ratio; @@ -415,6 +412,46 @@ export function calculateProgress( }; } +export async function getOnChainClaimableAmount( + id: string, +): Promise<{ claimableAmount: number; at: number }> { + const sorobanContext = getSorobanContext(); + if (!sorobanContext || !rpcServer) { + throw new Error("Soroban RPC server is not initialized."); + } + + const sourceAccount = await sorobanContext.sourceAccountPromise; + const latestLedger = await rpcServer.getLatestLedger(); + const at = latestLedger.closeTime ? parseInt(latestLedger.closeTime, 10) : Math.floor(Date.now() / 1000); + + const simRes = await simulateContractCall( + sorobanContext.contract, + sourceAccount, + "claimable", + nativeToScVal(parseInt(id), { type: "u64" }), + nativeToScVal(at, { type: "u64" }), + ); + + if (!rpc.Api.isSimulationSuccess(simRes) || !simRes.result) { + throw new Error("Simulation failed: " + JSON.stringify(simRes)); + } + + const claimableAmount = Number(scValToNative(simRes.result.retval)); + return { claimableAmount, at }; +} + +export async function getLatestLedgerTime(): Promise { + if (!rpcServer) { + return Math.floor(Date.now() / 1000); + } + try { + const latestLedger = await rpcServer.getLatestLedger(); + return latestLedger.closeTime ? parseInt(latestLedger.closeTime, 10) : Math.floor(Date.now() / 1000); + } catch (e) { + return Math.floor(Date.now() / 1000); + } +} + export async function syncStreams() { const sorobanContext = getSorobanContext(); if (!sorobanContext) return; @@ -968,64 +1005,7 @@ export async function cancelStream( return stream; } -export function pauseStream(id: string): StreamRecord { - const stream = getStream(id); - if (!stream) { - const err: any = new Error("Stream not found."); - err.statusCode = 404; - throw err; - } - - const status = computeStatus(stream, nowInSeconds()); - if (status !== "active") { - const err: any = new Error("Only active streams can be paused."); - err.statusCode = 400; - throw err; - } - - stream.pausedAt = nowInSeconds(); - const db = getDb(); - db.transaction(() => { - upsertStream(stream); - recordEventWithDb(db, stream.id, "paused", stream.pausedAt!, stream.sender); - })(); - - triggerWebhook("paused", stream); - return stream; -} - -export function resumeStream(id: string): StreamRecord { - const stream = getStream(id); - if (!stream) { - const err: any = new Error("Stream not found."); - err.statusCode = 404; - throw err; - } - - if (stream.pausedAt === undefined) { - const err: any = new Error("Stream is not paused."); - err.statusCode = 400; - throw err; - } - - const now = nowInSeconds(); - const elapsed = now - stream.pausedAt; - stream.pausedDuration = (stream.pausedDuration ?? 0) + elapsed; - // Extend the effective duration so the recipient doesn't lose vesting time. - stream.durationSeconds += elapsed; - stream.pausedAt = undefined; - - const db = getDb(); - db.transaction(() => { - upsertStream(stream); - recordEventWithDb(db, stream.id, "resumed", now, stream.sender, undefined, { - pausedDuration: stream.pausedDuration, - }); - })(); - - triggerWebhook("resumed", stream); - return stream; -} +// Duplicate pauseStream and resumeStream functions removed export function updateStreamStartAt(id: string, newStartAt: number, diff --git a/backend/src/swagger.ts b/backend/src/swagger.ts index 9ed0547..1392d4c 100644 --- a/backend/src/swagger.ts +++ b/backend/src/swagger.ts @@ -694,6 +694,61 @@ export const swaggerDocument = { }, }, }, + "/api/streams/{id}/claimable": { + get: { + summary: "Get real-time claimable amount", + description: "Retrieves the current real-time claimable amount for a stream using Soroban contract simulation. Returns 0 if paused, canceled, or before the cliff.", + parameters: [ + { + name: "id", + in: "path", + required: true, + description: "The unique ID of the stream.", + schema: { + type: "string", + }, + }, + ], + responses: { + "200": { + description: "Real-time claimable amount and query context.", + content: { + "application/json": { + schema: { + type: "object", + properties: { + streamId: { type: "string", example: "1" }, + claimableAmount: { type: "number", example: 450.123456 }, + assetCode: { type: "string", example: "USDC" }, + at: { type: "integer", description: "Ledger timestamp at which query was simulated", example: 1716812160 }, + }, + }, + }, + }, + }, + "404": { + description: "Stream not found.", + content: { + "application/json": { + schema: { + $ref: "#/components/schemas/Error", + }, + }, + }, + }, + "500": { + description: "Failed to simulate claimable amount.", + content: { + "application/json": { + schema: { + $ref: "#/components/schemas/Error", + }, + }, + }, + }, + }, + }, + }, "/api/recipients/{accountId}/streams": { get: { summary: "Get recipient streams",