Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import {
cancelStream,
createStream,
getStream,
getOnChainClaimableAmount,
getLatestLedgerTime,
initSoroban,
listStreams,
listStreamsByRecipient,
Expand Down Expand Up @@ -201,6 +203,25 @@ const mutationLimiter = rateLimit({
},
});

const CLAIMABLE_RATE_LIMIT = Number(process.env.CLAIMABLE_RATE_LIMIT ?? 30);

const claimableLimiter = rateLimit({
windowMs: 60 * 1000,
max: CLAIMABLE_RATE_LIMIT,
standardHeaders: true,
legacyHeaders: false,
handler: (req: Request, res: Response) => {
const resetTime = (req as any).rateLimit?.resetTime;
const retryAfter = resetTime
? Math.ceil((resetTime.getTime() - Date.now()) / 1000)
: 60;
res.set("Retry-After", String(Math.max(1, retryAfter)));
sendApiError(req, res, 429, "Too many requests. Please try again later.", {
code: "RATE_LIMIT_EXCEEDED",
});
},
});

app.use(cors());
app.use(requestLogger);
app.use(
Expand Down Expand Up @@ -501,13 +522,7 @@ app.get("/api/streams/:id", readLimiter, (req: Request, res: Response) => {
});
});

app.get(
"/api/recipients/:accountId/streams",
readLimiter,
(req: Request, res: Response) => {
const parsedParams = recipientAccountIdSchema.safeParse({
accountId: req.params.accountId,
});


if (!parsedParams.success) {
sendValidationError(req, res, parsedParams.error.issues);
Expand Down
136 changes: 135 additions & 1 deletion backend/src/integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import request from "supertest";
import { app } from "./index";
import { initDb, getDb } from "./services/db";
import { Keypair } from "@stellar/stellar-sdk";
import path from "path";
import fs from "fs";

const mockSimulateTransaction = vi.fn();
const mockGetLatestLedger = vi.fn();

vi.mock("@stellar/stellar-sdk", async (importOriginal) => {
const actual = await importOriginal<typeof import("@stellar/stellar-sdk")>();
return {
...actual,
rpc: {
...actual.rpc,
Server: vi.fn().mockImplementation(() => ({
getLatestLedger: mockGetLatestLedger,
simulateTransaction: mockSimulateTransaction,
prepareTransaction: vi.fn().mockImplementation((tx) => tx),
})),
Api: {
...actual.rpc.Api,
isSimulationSuccess: (response: any) => response.kind === "success",
}
},
};
});


// Use a separate test database
const TEST_DB_PATH = path.join(__dirname, "..", "data", "test-streams.db");
Expand Down Expand Up @@ -415,6 +437,118 @@ describe("Backend Integration Tests", () => {
});
});

describe("GET /api/streams/:id/claimable", () => {
beforeEach(() => {
vi.clearAllMocks();
});

it("should return 200 and the claimable amount from Soroban simulation", async () => {
mockGetLatestLedger.mockResolvedValue({
sequence: 12345,
closeTime: "1716812160",
});

mockSimulateTransaction.mockResolvedValue({
kind: "success",
result: {
retval: 450,
},
});

const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`);

expect(response.status).toBe(200);
expect(response.body).toEqual({
streamId: mockStream.id,
claimableAmount: 450,
assetCode: mockStream.assetCode,
at: 1716812160,
});

expect(mockGetLatestLedger).toHaveBeenCalled();
expect(mockSimulateTransaction).toHaveBeenCalled();
});

it("should return 0 when stream is paused", async () => {
const db = getDb();
db.prepare("UPDATE streams SET paused_at = ? WHERE id = ?").run(Math.floor(Date.now() / 1000) - 1000, mockStream.id);

mockGetLatestLedger.mockResolvedValue({
sequence: 12345,
closeTime: "1716812160",
});

const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`);

expect(response.status).toBe(200);
expect(response.body).toEqual({
streamId: mockStream.id,
claimableAmount: 0,
assetCode: mockStream.assetCode,
at: 1716812160,
});

expect(mockSimulateTransaction).not.toHaveBeenCalled();
});

it("should return 0 when stream is canceled", async () => {
const db = getDb();
db.prepare("UPDATE streams SET canceled_at = ? WHERE id = ?").run(Math.floor(Date.now() / 1000) - 1000, mockStream.id);

mockGetLatestLedger.mockResolvedValue({
sequence: 12345,
closeTime: "1716812160",
});

const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`);

expect(response.status).toBe(200);
expect(response.body).toEqual({
streamId: mockStream.id,
claimableAmount: 0,
assetCode: mockStream.assetCode,
at: 1716812160,
});

expect(mockSimulateTransaction).not.toHaveBeenCalled();
});

it("should return 404 for non-existent stream", async () => {
const response = await request(app).get("/api/streams/999/claimable");

expect(response.status).toBe(404);
expect(response.body.error).toBe("Stream not found.");
});

it("should return 400 for invalid stream ID", async () => {
const response = await request(app).get("/api/streams/invalid-id/claimable");

expect(response.status).toBe(400);
expect(response.body.error).toContain("Stream ID must be");
});

it("should enforce rate limit of 30 requests per minute", async () => {
mockGetLatestLedger.mockResolvedValue({
sequence: 12345,
closeTime: "1716812160",
});
mockSimulateTransaction.mockResolvedValue({
kind: "success",
result: { retval: 10 },
});

for (let i = 0; i < 31; i++) {
const response = await request(app).get(`/api/streams/${mockStream.id}/claimable`);
if (i < 30) {
expect(response.status).toBe(200);
} else {
expect(response.status).toBe(429);
expect(response.body.code).toBe("RATE_LIMIT_EXCEEDED");
}
}
});
});
Comment on lines +440 to +550
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify claimable limiter definition and whether tests reset limiter state.
rg -n -C3 'claimableLimiter|rateLimit\(|/api/streams/:id/claimable|RATE_LIMIT_EXCEEDED' backend/src/index.ts backend/src/integration.test.ts
rg -n -C2 'beforeEach|afterEach|clearAllMocks|resetAllMocks' backend/src/integration.test.ts

Repository: ritik4ever/stellar-stream

Length of output: 5960


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show claimable limiter config and middleware usage
nl -ba backend/src/index.ts | sed -n '160,240p'
nl -ba backend/src/index.ts | sed -n '420,480p'

# Show test suite top-level beforeEach(s) and any rate-limit cleanup
# around the claimable describe and also earlier common beforeEach
nl -ba backend/src/integration.test.ts | sed -n '1,140p'
nl -ba backend/src/integration.test.ts | sed -n '420,620p'

# Search for any explicit reset/clear of express-rate-limit store
rg -n "claimableLimiter|express-rate-limit|rateLimit\(|reset|clear.*rate|limiter" backend/src/integration.test.ts

Repository: ritik4ever/stellar-stream

Length of output: 112


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Claimable limiter definition and route wiring
echo "### backend/src/index.ts: claimableLimiter config"
sed -n '180,220p' backend/src/index.ts
echo "### backend/src/index.ts: /claimable route wiring"
sed -n '420,465p' backend/src/index.ts

# Test suite initialization/fixtures
echo "### backend/src/integration.test.ts: imports + global hooks"
sed -n '1,120p' backend/src/integration.test.ts
echo "### backend/src/integration.test.ts: around claimable describe"
sed -n '400,590p' backend/src/integration.test.ts

# Search for limiter store reset/clear or re-instantiation
echo "### Search for express-rate-limit / limiter reset in tests"
rg -n "express-rate-limit|rateLimit\(|claimableLimiter|limiter|resetKey|reset|clear.*limiter|clear.*rate" backend/src/integration.test.ts

Repository: ritik4ever/stellar-stream

Length of output: 13170


🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n "vi\.resetModules\(|resetModules|claimableLimiter|rateLimit" backend/src/integration.test.ts

Repository: ritik4ever/stellar-stream

Length of output: 51


Fix flaky /api/streams/:id/claimable rate-limit test caused by shared limiter state.

  • claimableLimiter is created once in backend/src/index.ts and attached as middleware for /api/streams/:id/claimable; the test only does vi.clearAllMocks(), which won’t reset the limiter’s in-memory bucket.
  • The earlier tests in the same GET /api/streams/:id/claimable block already hit the endpoint, consuming quota—so the expectation that the first 30 looped requests are always 200 (and the 31st is always 429) can fail.
  • Reset/clear the claimable limiter bucket before the rate-limit loop (e.g., re-import server/app after vi.resetModules() or expose a way to clear the limiter store), or adjust assertions to account for already-consumed quota.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/src/integration.test.ts` around lines 440 - 550, The rate-limit test
is flaky because claimableLimiter (created once in backend/src/index.ts) retains
consumed tokens across earlier tests; reset the limiter before the 31-request
loop by forcing module reload and re-creating the app so a fresh
claimableLimiter is instantiated (use vi.resetModules() then re-import the app
used by request(app)), or add and call a clear/reset function on
claimableLimiter before the loop; ensure you reference claimableLimiter and the
app instance used in the test so the limiter state is fresh when asserting 30 OK
and 1 429.


describe("GET /api/recipients/:accountId/streams", () => {
it("should get streams for a recipient", async () => {
const response = await request(app)
Expand Down
49 changes: 41 additions & 8 deletions backend/src/services/streamStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ export function calculateProgress(
stream.pausedAt !== undefined ? Math.min(at, stream.pausedAt) : at;



const ratio = Math.min(1, elapsed / stream.durationSeconds);
const vestedAmount = stream.totalAmount * ratio;

Expand All @@ -399,13 +400,46 @@ export function calculateProgress(
};
}

/**
* Syncs all on-chain streams from Soroban contract to local SQLite database.
* Fetches streams in parallel (max 5 concurrent RPC calls) with fallback to sequential.
* Updates existing streams and inserts new ones.
* @async
* @returns {Promise<void>}
*/
export async function getOnChainClaimableAmount(
id: string,
): Promise<{ claimableAmount: number; at: number }> {
const sorobanContext = getSorobanContext();
if (!sorobanContext || !rpcServer) {
throw new Error("Soroban RPC server is not initialized.");
}

const sourceAccount = await sorobanContext.sourceAccountPromise;
const latestLedger = await rpcServer.getLatestLedger();
const at = latestLedger.closeTime ? parseInt(latestLedger.closeTime, 10) : Math.floor(Date.now() / 1000);

const simRes = await simulateContractCall(
sorobanContext.contract,
sourceAccount,
"claimable",
nativeToScVal(parseInt(id), { type: "u64" }),
nativeToScVal(at, { type: "u64" }),
);

if (!rpc.Api.isSimulationSuccess(simRes) || !simRes.result) {
throw new Error("Simulation failed: " + JSON.stringify(simRes));
}

const claimableAmount = Number(scValToNative(simRes.result.retval));
return { claimableAmount, at };
}

export async function getLatestLedgerTime(): Promise<number> {
if (!rpcServer) {
return Math.floor(Date.now() / 1000);
}
try {
const latestLedger = await rpcServer.getLatestLedger();
return latestLedger.closeTime ? parseInt(latestLedger.closeTime, 10) : Math.floor(Date.now() / 1000);
} catch (e) {
return Math.floor(Date.now() / 1000);
}
}

export async function syncStreams() {
const sorobanContext = getSorobanContext();
if (!sorobanContext) return;
Expand Down Expand Up @@ -913,7 +947,6 @@ export async function cancelStream(
}



/**
* Updates the start time of a scheduled stream.
* Only scheduled streams (not yet started) can have their start time updated.
Expand Down
55 changes: 55 additions & 0 deletions backend/src/swagger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,61 @@ export const swaggerDocument = {
},
},
},
"/api/streams/{id}/claimable": {
get: {
summary: "Get real-time claimable amount",
description: "Retrieves the current real-time claimable amount for a stream using Soroban contract simulation. Returns 0 if paused, canceled, or before the cliff.",
parameters: [
{
name: "id",
in: "path",
required: true,
description: "The unique ID of the stream.",
schema: {
type: "string",
},
},
],
responses: {
"200": {
description: "Real-time claimable amount and query context.",
content: {
"application/json": {
schema: {
type: "object",
properties: {
streamId: { type: "string", example: "1" },
claimableAmount: { type: "number", example: 450.123456 },
assetCode: { type: "string", example: "USDC" },
at: { type: "integer", description: "Ledger timestamp at which query was simulated", example: 1716812160 },
},
},
},
},
},
Comment on lines +713 to +728
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Verify the claimable endpoint response structure

# Search for the claimable endpoint handler implementation
rg -nP -A 10 "get.*['\"/]api/streams/:id/claimable" --type=ts

# Also check for response construction patterns
rg -nP -C 3 "streamId.*claimableAmount.*assetCode" --type=ts

Repository: ritik4ever/stellar-stream

Length of output: 731


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the full claimable handler block (approx) around the route definition
sed -n '420,520p' backend/src/index.ts

# Narrow search for how the response is sent within the claimable handler
rg -n "claimableLimiter|/api/streams/:id/claimable|claimable" backend/src/index.ts | head -n 50

# Find where the handler sends the 200 JSON (res.json / res.send / helper calls)
rg -n "app\.get\(\"/api/streams/:id/claimable\"|res\.json\(|res\.send\(|sendSuccess|sendApiSuccess" backend/src/index.ts -n

Repository: ritik4ever/stellar-stream

Length of output: 4438


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the swagger schema around the referenced lines
sed -n '690,760p' backend/src/swagger.ts

# Locate and inspect the helper functions that produce `at` and `claimableAmount`
rg -n "function getLatestLedgerTime|getLatestLedgerTime\s*=" backend/src -S
rg -n "function getOnChainClaimableAmount|getOnChainClaimableAmount\s*=" backend/src -S

Repository: ritik4ever/stellar-stream

Length of output: 2491


🏁 Script executed:

#!/bin/bash
set -euo pipefail

sed -n '380,520p' backend/src/services/streamStore.ts

Repository: ritik4ever/stellar-stream

Length of output: 4326


Update claimable swagger contract: structure matches implementation; add required fields

The /api/streams/{id}/claimable 200 response is correctly defined as an unwrapped root object (streamId, claimableAmount, assetCode, at) and the endpoint implementation returns the same shape (no data wrapper).
The schema still omits required; add it to make the contract explicit.

Option 2: Add required fields
                 schema: {
                   type: "object",
+                  required: ["streamId", "claimableAmount", "assetCode", "at"],
                   properties: {
                     streamId: { type: "string", example: "1" },
                     claimableAmount: { type: "number", example: 450.123456 },
                     assetCode: { type: "string", example: "USDC" },
                     at: { type: "integer", description: "Ledger timestamp at which query was simulated", example: 1716812160 },
                   },
                 },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"200": {
description: "Real-time claimable amount and query context.",
content: {
"application/json": {
schema: {
type: "object",
properties: {
streamId: { type: "string", example: "1" },
claimableAmount: { type: "number", example: 450.123456 },
assetCode: { type: "string", example: "USDC" },
at: { type: "integer", description: "Ledger timestamp at which query was simulated", example: 1716812160 },
},
},
},
},
},
"200": {
description: "Real-time claimable amount and query context.",
content: {
"application/json": {
schema: {
type: "object",
required: ["streamId", "claimableAmount", "assetCode", "at"],
properties: {
streamId: { type: "string", example: "1" },
claimableAmount: { type: "number", example: 450.123456 },
assetCode: { type: "string", example: "USDC" },
at: { type: "integer", description: "Ledger timestamp at which query was simulated", example: 1716812160 },
},
},
},
},
},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/src/swagger.ts` around lines 713 - 728, The 200 response schema for
the /api/streams/{id}/claimable endpoint is missing a required array; update the
object schema (the response block that currently defines properties streamId,
claimableAmount, assetCode, and at) to include a required:
["streamId","claimableAmount","assetCode","at"] entry so the OpenAPI contract
explicitly marks those fields as mandatory.

"404": {
description: "Stream not found.",
content: {
"application/json": {
schema: {
$ref: "#/components/schemas/Error",
},
},
},
},
"500": {
description: "Failed to simulate claimable amount.",
content: {
"application/json": {
schema: {
$ref: "#/components/schemas/Error",
},
},
},
},
},
},
},
"/api/recipients/{accountId}/streams": {
get: {
summary: "Get recipient streams",
Expand Down