Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ coverage/

.agents/
skills-lock.json
postgres/
postgres/
CLAUDE.md
plan.md
18 changes: 2 additions & 16 deletions src/agent/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { logger } from '../utils/logger';
import { scanAllProtocols } from './scanner';
import { executeRebalanceIfNeeded, getThresholds, logAgentAction } from './router';
import { captureAllUserBalances, cleanupOldSnapshots } from './snapshotter';
import { PrismaClient } from '@prisma/client';
import db from '../db';
import {
updateAgentHeartbeat,
updateAgentStatus,
Expand All @@ -16,8 +16,6 @@ import {
recordDbOperation
} from '../utils/metrics';

const prisma = new PrismaClient();

let isRunning = false;
let lastRebalanceAt: Date | null = null;
let currentProtocol: string | null = null;
Expand Down Expand Up @@ -75,7 +73,7 @@ async function rebalanceCheckJob(): Promise<void> {
updateAgentHeartbeat();

// Get all active positions
const positions = await prisma.position.findMany({
const positions = await db.position.findMany({
where: {
status: 'ACTIVE',
},
Expand Down Expand Up @@ -295,9 +293,6 @@ export async function stopAgentLoop(): Promise<void> {
});
cronJobs.length = 0;

// Close database connection
await prisma.$disconnect();

isRunning = false;
logger.info('✅ Agent loop stopped gracefully');
} catch (error) {
Expand All @@ -311,15 +306,6 @@ export async function stopAgentLoop(): Promise<void> {
* Setup graceful shutdown handlers
*/
function setupGracefulShutdown(): void {
const shutdown = async (signal: string) => {
logger.info(`Received ${signal}, shutting down gracefully...`);
await stopAgentLoop();
process.exit(0);
};

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));

// Handle uncaught exceptions
process.on('uncaughtException', error => {
logger.error('Uncaught exception in agent', {
Expand Down
12 changes: 5 additions & 7 deletions src/agent/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
* Router - Compares APYs and triggers rebalancing when conditions are met
*/

import { PrismaClient } from '@prisma/client';
import { logger } from '../utils/logger';
import { ProtocolComparison, RebalanceDetails, RebalanceThresholds } from './types';
import { scanAllProtocols, getCurrentOnChainApy } from './scanner';
import { triggerRebalance as submitRebalance } from '../stellar/contract';

const prisma = new PrismaClient();
import db from '../db';

const DEFAULT_THRESHOLDS: RebalanceThresholds = {
minimumImprovement: 0.5, // Must improve by at least 0.5%
Expand Down Expand Up @@ -153,7 +151,7 @@ export async function triggerRebalance(
);

if (positionIds.length > 0) {
const representativePosition = await prisma.position.findFirst({
const representativePosition = await db.position.findFirst({
where: {
id: { in: positionIds },
},
Expand All @@ -167,7 +165,7 @@ export async function triggerRebalance(
});

if (representativePosition) {
await prisma.transaction.create({
await db.transaction.create({
data: {
userId: representativePosition.userId,
positionId: representativePosition.id,
Expand Down Expand Up @@ -290,7 +288,7 @@ export async function logAgentAction(
): Promise<void> {
try {
// Log to all users for now - in production, could be per-user
const users = await prisma.user.findMany({
const users = await db.user.findMany({
select: { id: true },
take: 1, // For now, just log to first user
});
Expand All @@ -302,7 +300,7 @@ export async function logAgentAction(

const userId = users[0].id;

await prisma.agentLog.create({
await db.agentLog.create({
data: {
userId,
action: action as any,
Expand Down
8 changes: 3 additions & 5 deletions src/agent/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import { logger } from '../utils/logger';
import { YieldProtocol, ProtocolRate } from './types';
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
import db from '../db';

const PROTOCOLS = ['Blend', 'Stellar DEX', 'Luma'];
const ASSET_SYMBOL = 'USDC';
Expand Down Expand Up @@ -161,7 +159,7 @@ async function saveProtocolRates(protocols: YieldProtocol[]): Promise<void> {
const networkLabel = normalizeNetwork()

for (const protocol of protocols) {
await prisma.protocolRate.create({
await db.protocolRate.create({
data: {
protocolName: protocol.name,
assetSymbol: protocol.assetSymbol,
Expand All @@ -185,7 +183,7 @@ async function saveProtocolRates(protocols: YieldProtocol[]): Promise<void> {
*/
export async function getCurrentOnChainApy(protocolName: string): Promise<number | null> {
try {
const latestRate = await prisma.protocolRate.findFirst({
const latestRate = await db.protocolRate.findFirst({
where: {
protocolName,
assetSymbol: ASSET_SYMBOL,
Expand Down
14 changes: 6 additions & 8 deletions src/agent/snapshotter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@
* Snapshotter - Captures user balance snapshots for historical charting
*/

import { PrismaClient } from '@prisma/client';
import { logger } from '../utils/logger';
import { UserBalance } from './types';

const prisma = new PrismaClient();
import db from '../db';

/**
* Capture all user balance snapshots
* Runs non-blocking to avoid delaying rebalance checks
*/
export async function captureAllUserBalances(): Promise<void> {
try {
const positions = await prisma.position.findMany({
const positions = await db.position.findMany({
where: {
status: 'ACTIVE',
},
Expand Down Expand Up @@ -56,7 +54,7 @@ export async function captureAllUserBalances(): Promise<void> {

// Single batch insert is much faster than individual creates
if (snapshotData.length > 0) {
await prisma.yieldSnapshot.createMany({
await db.yieldSnapshot.createMany({
data: snapshotData,
skipDuplicates: false,
});
Expand Down Expand Up @@ -103,7 +101,7 @@ export async function getPositionHistory(
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - days);

const snapshots = await prisma.yieldSnapshot.findMany({
const snapshots = await db.yieldSnapshot.findMany({
where: {
positionId,
snapshotAt: {
Expand Down Expand Up @@ -151,7 +149,7 @@ export async function cleanupOldSnapshots(retentionDays: number = 90): Promise<v
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

const deleted = await prisma.yieldSnapshot.deleteMany({
const deleted = await db.yieldSnapshot.deleteMany({
where: {
snapshotAt: {
lt: cutoffDate,
Expand All @@ -177,7 +175,7 @@ export async function cleanupOldSnapshots(retentionDays: number = 90): Promise<v
*/
export async function getLatestUserBalance(positionId: string): Promise<UserBalance | null> {
try {
const snapshot = await prisma.yieldSnapshot.findFirst({
const snapshot = await db.yieldSnapshot.findFirst({
where: {
positionId,
},
Expand Down
6 changes: 2 additions & 4 deletions src/jobs/sessionCleanup.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { PrismaClient } from '@prisma/client';
import db from '../db';
import { logger } from '../utils/logger';
import { config } from '../config/env';

const prisma = new PrismaClient();

/**
* Delete all sessions whose expiration timestamp is in the past.
* Safe to call multiple times — it is idempotent.
*/
export async function cleanupExpiredSessions(): Promise<void> {
try {
const result = await prisma.session.deleteMany({
const result = await db.session.deleteMany({
where: { expiresAt: { lt: new Date() } },
});
if (result.count > 0) {
Expand Down
13 changes: 5 additions & 8 deletions src/stellar/__tests__/events.helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ jest.mock('@stellar/stellar-sdk', () => {
};
});

// Avoid spinning a real PrismaClient on import.
jest.mock('@prisma/client', () => {
const enums = jest.requireActual('@prisma/client');
return {
...enums,
PrismaClient: jest.fn().mockImplementation(() => ({})),
};
});
// Keep enum values from @prisma/client; no PrismaClient instantiation needed.
jest.mock('@prisma/client', () => jest.requireActual('@prisma/client'));

// Prevent the db singleton from opening a real connection.
jest.mock('../../db', () => ({ default: {} }));

// Avoid the dlq module pulling in a Prisma connection too.
jest.mock('../dlq', () => ({
Expand Down
23 changes: 11 additions & 12 deletions src/stellar/events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { rpc, scValToNative, xdr } from '@stellar/stellar-sdk';
import { PrismaClient, TransactionType, TransactionStatus, Network } from '@prisma/client';
import { TransactionType, TransactionStatus, Network } from '@prisma/client';
import db from '../db';
import { Decimal } from '@prisma/client/runtime/library';
import { getRpcServer } from './client';
import { ContractEvent, DepositEvent, WithdrawEvent, RebalanceEvent, EventMetrics } from './types';
Expand All @@ -25,8 +26,6 @@ import {
const VAULT_CONTRACT_ID = config.stellar.vaultContractId;
const POLL_INTERVAL_MS = 5000;

const prisma = new PrismaClient();

let lastProcessedLedger = 0;
let isListening = false;

Expand Down Expand Up @@ -201,7 +200,7 @@ function parseRebalanceEvent(event: ContractEvent): RebalanceEvent {
/**
* Handle deposit event - persist to database
*/
async function handleDepositEvent(depositData: DepositEvent, event: ContractEvent, tx: any = prisma): Promise<void> {
async function handleDepositEvent(depositData: DepositEvent, event: ContractEvent, tx: any = db): Promise<void> {
const user = await timedDbOperation(() =>
tx.user.findUnique({ where: { walletAddress: depositData.user } })
) as any;
Expand Down Expand Up @@ -270,7 +269,7 @@ async function handleDepositEvent(depositData: DepositEvent, event: ContractEven
/**
* Handle withdraw event - persist to database
*/
async function handleWithdrawEvent(withdrawData: WithdrawEvent, event: ContractEvent, tx: any = prisma): Promise<void> {
async function handleWithdrawEvent(withdrawData: WithdrawEvent, event: ContractEvent, tx: any = db): Promise<void> {
const user = await timedDbOperation(() =>
tx.user.findUnique({ where: { walletAddress: withdrawData.user } })
) as any;
Expand Down Expand Up @@ -322,7 +321,7 @@ async function handleWithdrawEvent(withdrawData: WithdrawEvent, event: ContractE
/**
* Handle rebalance event - persist to database
*/
async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: ContractEvent, tx: any = prisma): Promise<void> {
async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: ContractEvent, tx: any = db): Promise<void> {
await timedDbOperation(() =>
tx.protocolRate.create({
data: {
Expand All @@ -341,7 +340,7 @@ async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: Contra
/**
* Handle contract event with persistence, idempotency, and validation (Issue #53)
*/
export async function handleEvent(event: ContractEvent, tx: any = prisma): Promise<void> {
export async function handleEvent(event: ContractEvent, tx: any = db): Promise<void> {
const startTime = Date.now();
try {
logger.info(`[Event] ${event.type} detected at ledger ${event.ledger}, tx: ${event.txHash}`);
Expand Down Expand Up @@ -437,7 +436,7 @@ export async function processEventBatch(events: ContractEvent[]): Promise<void>

try {
// Multiple events processed in a single transaction
await prisma.$transaction(async (tx) => {
await db.$transaction(async (tx) => {
for (const event of events) {
await handleEvent(event, tx);
processedCount++;
Expand All @@ -450,7 +449,7 @@ export async function processEventBatch(events: ContractEvent[]): Promise<void>
// Fallback: Process individually so robust events succeed
for (const event of events) {
try {
await handleEvent(event, prisma);
await handleEvent(event, db);
} catch (individualError) {
logger.error(`[Batch Fallback Error] Event processing completely failed for ${event.txHash}`);
}
Expand All @@ -462,7 +461,7 @@ export async function processEventBatch(events: ContractEvent[]): Promise<void>
* Load last processed ledger from database
*/
async function loadLastProcessedLedger(): Promise<number> {
const cursor = await prisma.eventCursor.findUnique({
const cursor = await db.eventCursor.findUnique({
where: { contractId: VAULT_CONTRACT_ID },
});

Expand All @@ -483,7 +482,7 @@ async function loadLastProcessedLedger(): Promise<number> {
* Update last processed ledger in database
*/
async function persistLastProcessedLedger(ledger: number): Promise<void> {
await prisma.eventCursor.upsert({
await db.eventCursor.upsert({
where: { contractId: VAULT_CONTRACT_ID },
update: {
lastProcessedLedger: ledger,
Expand Down Expand Up @@ -605,7 +604,7 @@ export async function backfillEvents(startLedger: number, endLedger?: number): P
export async function retryDeadLetterEvents(): Promise<void> {
logger.info(`[DLQ] Starting manual intervention retry for all DLQ events`);
await DeadLetterQueue.retryAll(async (eventPayload) => {
await handleEvent(eventPayload, prisma);
await handleEvent(eventPayload, db);
});
}

Expand Down
3 changes: 3 additions & 0 deletions tests/unit/stellar/events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ jest.mock('@prisma/client', () => {
};
});

// Point the db singleton at the mock so events.ts uses it instead of a real connection.
jest.mock('../../../src/db', () => ({ default: mockPrisma }));

jest.mock('../../../src/stellar/client');
jest.mock('../../../src/utils/logger');
jest.mock('../../../src/config', () => ({
Expand Down
Loading