From 1926a31d879a3a49fc3ff1344b98fed58309cd4f Mon Sep 17 00:00:00 2001 From: Larry Date: Wed, 11 Mar 2026 14:26:57 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20add=20agent=20backend=20=E2=80=94=20ind?= =?UTF-8?q?exer,=20API=20endpoints,=20DB=20queries=20(#102)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add agent event processor (indexer/agent.ts) to sync AgentRegistry contract events (register, revoke, fund, session key issue/revoke) - Integrate agent processor into indexer pipeline (parallel with token/contract) - Add 5 new DB-backed API endpoints: stats, dashboard, transactions, session-keys, spending analytics - Endpoints use cached() + getReadPool() for read-replica support Co-Authored-By: Claude Opus 4.6 --- src/indexer/agent.ts | 243 +++++++++++++++++++++++++++++++++++++++ src/indexer/index.ts | 6 +- src/routes/agents.ts | 263 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 510 insertions(+), 2 deletions(-) create mode 100644 src/indexer/agent.ts diff --git a/src/indexer/agent.ts b/src/indexer/agent.ts new file mode 100644 index 0000000..62e47c0 --- /dev/null +++ b/src/indexer/agent.ts @@ -0,0 +1,243 @@ +import { getPool } from '../db/pool.js'; +import type { BlockResult } from './block.js'; +import { parseAddressFromTopic, hexToBigIntString } from './utils.js'; + +const AGENT_REGISTRY = '0x7791dfa4d489f3d524708cbc0caa8689b76322b3'; + +// Event topic hashes (keccak256) +// AgentRegistered(string agentId, address indexed owner, address agentAddress) +const AGENT_REGISTERED = + '0xd1bf50919b349548463604b43b8d3783b23a88dbf02737cb5ef0159d3ebdde4f'; +// AgentRevoked(string agentId, address indexed owner) +const AGENT_REVOKED = + '0x3a3f387499c8b0bde40db7b1c33d04cacb5677b1964c5f0baa8ab450c1d4de05'; +// AgentFunded(string agentId, address indexed funder, uint256 amount) +const AGENT_FUNDED = + '0x0e3727596461ef354755ab0d45f429e6e3a87756bf81f0e8a0d6d6e082b2d4e2'; +// SessionKeyIssued(string agentId, address indexed keyAddress, address indexed owner, uint256 expiresAt) +const SESSION_KEY_ISSUED = + '0x5c6e3f1a25c687f8d7f68d3e3e3fb09b28a43da66c1c0e1bb8e1c9fb7aa6b47d'; +// SessionKeyRevoked(string agentId, address indexed keyAddress, address indexed owner) +const SESSION_KEY_REVOKED = + '0xa36d12e3be5a35ce5db5c9c3f7a4c5f8b50d0d7a3f4e28a0c12b4e4e7d6a19c3'; + +/** + * Decode a dynamic string from ABI-encoded event data. + * Assumes the string is the first dynamic param starting at the given byte offset. + */ +function decodeStringFromEventData(data: string, wordIndex: number): string { + const d = data.startsWith('0x') ? data.slice(2) : data; + // Read offset pointer at wordIndex + const offsetHex = d.slice(wordIndex * 64, (wordIndex + 1) * 64); + const byteOffset = Number(BigInt('0x' + offsetHex)); + // String length + const lenStart = byteOffset * 2; + const strLen = Number(BigInt('0x' + d.slice(lenStart, lenStart + 64))); + if (strLen === 0) return ''; + const strHex = d.slice(lenStart + 64, lenStart + 64 + strLen * 2); + return Buffer.from(strHex, 'hex').toString('utf8'); +} + +function decodeUint256FromWord(data: string, wordIndex: number): string { + const d = data.startsWith('0x') ? data.slice(2) : data; + return BigInt('0x' + d.slice(wordIndex * 64, (wordIndex + 1) * 64)).toString(); +} + +function decodeAddressFromWord(data: string, wordIndex: number): string { + const d = data.startsWith('0x') ? data.slice(2) : data; + return '0x' + d.slice(wordIndex * 64 + 24, (wordIndex + 1) * 64).toLowerCase(); +} + +/** + * Process agent-related events from block receipts and sync to DB. + */ +export async function processAgentEvents(result: BlockResult): Promise { + const { receipts, height, block } = result; + const registryAddr = AGENT_REGISTRY.toLowerCase(); + const blockHeight = height.toString(10); + const blockTimestamp = hexToBigIntString(block.timestamp) ?? '0'; + + // Collect agent events from receipts + type AgentEvent = { + topic: string; + data: string; + topics: string[]; + txHash: string; + from: string; + to: string | null | undefined; + value: string; + status: string; + }; + + const events: AgentEvent[] = []; + + for (const receipt of receipts) { + for (const log of receipt.logs) { + if (log.address.toLowerCase() !== registryAddr) continue; + const topic0 = (log.topics[0] ?? '').toLowerCase(); + if ( + topic0 === AGENT_REGISTERED || + topic0 === AGENT_REVOKED || + topic0 === AGENT_FUNDED || + topic0 === SESSION_KEY_ISSUED || + topic0 === SESSION_KEY_REVOKED + ) { + events.push({ + topic: topic0, + data: log.data, + topics: log.topics, + txHash: receipt.transactionHash, + from: receipt.from, + to: receipt.to, + value: hexToBigIntString( + // Get value from the matching transaction + result.txs.find((tx) => tx.hash === receipt.transactionHash)?.value ?? '0x0' + ) ?? '0', + status: receipt.status, + }); + } + } + } + + if (events.length === 0) return; + + const pool = getPool(); + const client = await pool.connect(); + try { + await client.query('BEGIN'); + + for (const ev of events) { + if (ev.topic === AGENT_REGISTERED) { + // data: (string agentId, address agentAddress) — agentId is dynamic, agentAddress at word 1 + const agentId = decodeStringFromEventData(ev.data, 0); + const agentAddress = decodeAddressFromWord(ev.data, 1); + const owner = parseAddressFromTopic(ev.topics[1] ?? '') ?? ''; + + await client.query( + `INSERT INTO agents (agent_id, owner, agent_address, registered_at, active) + VALUES ($1, $2, $3, $4, true) + ON CONFLICT (agent_id) DO UPDATE SET + owner = EXCLUDED.owner, + agent_address = EXCLUDED.agent_address, + registered_at = EXCLUDED.registered_at, + active = true, + updated_at = NOW()`, + [agentId, owner.toLowerCase(), agentAddress, blockTimestamp] + ); + + // Record the registration transaction + await client.query( + `INSERT INTO agent_transactions (agent_id, tx_hash, block_height, from_addr, to_addr, value, method, status, timestamp) + VALUES ($1, $2, $3, $4, $5, $6, 'registerAgent', $7, $8) + ON CONFLICT DO NOTHING`, + [agentId, ev.txHash, blockHeight, ev.from.toLowerCase(), ev.to?.toLowerCase() ?? null, ev.value, parseInt(ev.status, 16), blockTimestamp] + ); + } + + if (ev.topic === AGENT_REVOKED) { + const agentId = decodeStringFromEventData(ev.data, 0); + + await client.query( + `UPDATE agents SET active = false, updated_at = NOW() WHERE agent_id = $1`, + [agentId] + ); + + await client.query( + `INSERT INTO agent_transactions (agent_id, tx_hash, block_height, from_addr, to_addr, value, method, status, timestamp) + VALUES ($1, $2, $3, $4, $5, '0', 'revokeAgent', $6, $7) + ON CONFLICT DO NOTHING`, + [agentId, ev.txHash, blockHeight, ev.from.toLowerCase(), ev.to?.toLowerCase() ?? null, parseInt(ev.status, 16), blockTimestamp] + ); + } + + if (ev.topic === AGENT_FUNDED) { + const agentId = decodeStringFromEventData(ev.data, 0); + // amount is at word index after the string offset — need to figure layout + // AgentFunded(string agentId, address indexed funder, uint256 amount) + // data = (string agentId, uint256 amount) — funder is indexed (in topics[1]) + const amount = decodeUint256FromWord(ev.data, 1); + + // Update agent deposit + await client.query( + `UPDATE agents SET + deposit = (CAST(deposit AS NUMERIC) + $2::NUMERIC)::TEXT, + updated_at = NOW() + WHERE agent_id = $1`, + [agentId, amount] + ); + + await client.query( + `INSERT INTO agent_transactions (agent_id, tx_hash, block_height, from_addr, to_addr, value, method, status, timestamp) + VALUES ($1, $2, $3, $4, $5, $6, 'fundAgent', $7, $8) + ON CONFLICT DO NOTHING`, + [agentId, ev.txHash, blockHeight, ev.from.toLowerCase(), ev.to?.toLowerCase() ?? null, amount, parseInt(ev.status, 16), blockTimestamp] + ); + + // Update daily spending record + const day = new Date(Number(blockTimestamp) * 1000).toISOString().slice(0, 10); + await client.query( + `INSERT INTO agent_spending (agent_id, date, amount, tx_count) + VALUES ($1, $2, $3, 1) + ON CONFLICT (agent_id, date) DO UPDATE SET + amount = (CAST(agent_spending.amount AS NUMERIC) + $3::NUMERIC)::TEXT, + tx_count = agent_spending.tx_count + 1`, + [agentId, day, amount] + ); + } + + if (ev.topic === SESSION_KEY_ISSUED) { + // SessionKeyIssued(string agentId, address indexed keyAddress, address indexed owner, uint256 expiresAt) + // data = (string agentId, uint256 expiresAt) + const agentId = decodeStringFromEventData(ev.data, 0); + const expiresAt = decodeUint256FromWord(ev.data, 1); + const keyAddress = parseAddressFromTopic(ev.topics[1] ?? '') ?? ''; + const owner = parseAddressFromTopic(ev.topics[2] ?? '') ?? ''; + + await client.query( + `INSERT INTO session_keys (key_address, agent_id, owner, expires_at, revoked) + VALUES ($1, $2, $3, $4, false) + ON CONFLICT (key_address) DO UPDATE SET + agent_id = EXCLUDED.agent_id, + owner = EXCLUDED.owner, + expires_at = EXCLUDED.expires_at, + revoked = false`, + [keyAddress.toLowerCase(), agentId, owner.toLowerCase(), expiresAt] + ); + + await client.query( + `INSERT INTO agent_transactions (agent_id, tx_hash, block_height, from_addr, to_addr, value, method, status, timestamp) + VALUES ($1, $2, $3, $4, $5, '0', 'issueSessionKey', $6, $7) + ON CONFLICT DO NOTHING`, + [agentId, ev.txHash, blockHeight, ev.from.toLowerCase(), ev.to?.toLowerCase() ?? null, parseInt(ev.status, 16), blockTimestamp] + ); + } + + if (ev.topic === SESSION_KEY_REVOKED) { + // SessionKeyRevoked(string agentId, address indexed keyAddress, address indexed owner) + // data = (string agentId) + const agentId = decodeStringFromEventData(ev.data, 0); + const keyAddress = parseAddressFromTopic(ev.topics[1] ?? '') ?? ''; + + await client.query( + `UPDATE session_keys SET revoked = true WHERE key_address = $1`, + [keyAddress.toLowerCase()] + ); + + await client.query( + `INSERT INTO agent_transactions (agent_id, tx_hash, block_height, from_addr, to_addr, value, method, status, timestamp) + VALUES ($1, $2, $3, $4, $5, '0', 'revokeSessionKey', $6, $7) + ON CONFLICT DO NOTHING`, + [agentId, ev.txHash, blockHeight, ev.from.toLowerCase(), ev.to?.toLowerCase() ?? null, parseInt(ev.status, 16), blockTimestamp] + ); + } + } + + await client.query('COMMIT'); + console.log(`[Agent] Processed ${events.length} agent events at block ${blockHeight}`); + } catch (error) { + await client.query('ROLLBACK'); + throw error; + } finally { + client.release(); + } +} diff --git a/src/indexer/index.ts b/src/indexer/index.ts index a0ba37e..9e4ff0d 100644 --- a/src/indexer/index.ts +++ b/src/indexer/index.ts @@ -3,6 +3,7 @@ import { RpcClient } from './rpc.js'; import { parseHeight, processBlock } from './block.js'; import { processTokenTransfers } from './token.js'; import { processContracts } from './contract.js'; +import { processAgentEvents } from './agent.js'; import { processInternalTxs } from './internal-tx.js'; import { getLastProcessedHeight, setLastProcessedHeight, setLastBatchStats, @@ -84,11 +85,14 @@ async function indexBlock(rpc: RpcClient, height: bigint): Promise { endStage1(); if (!result) { endBlock(); return 0; } - // Step 2 & 3: Token + Contract (independent, run in parallel) + // Step 2 & 3: Token + Contract + Agent events (independent, run in parallel) const endStage2 = pipelineStageDuration.startTimer({ stage: 'token_contract' }); await Promise.all([ processTokenTransfers(rpc, result), processContracts(rpc, result), + processAgentEvents(result).catch((e) => + console.warn(`Agent event processing failed for block ${height}:`, e.message) + ), ]); endStage2(); diff --git a/src/routes/agents.ts b/src/routes/agents.ts index 270c92a..c5cd28b 100644 --- a/src/routes/agents.ts +++ b/src/routes/agents.ts @@ -1,6 +1,8 @@ import { FastifyInstance } from 'fastify'; import { rpcCall, rpcCallSafe } from '../lib/rpc.js'; -import { clamp, parseNumber } from '../lib/pagination.js'; +import { clamp, parseNumber, parseOrder } from '../lib/pagination.js'; +import { getReadPool } from '../db/pool.js'; +import { cached } from '../lib/cache.js'; const AGENT_REGISTRY = '0x7791dfa4d489f3d524708cbc0caa8689b76322b3'; @@ -233,6 +235,116 @@ export default async function agentsRoutes(app: FastifyInstance) { } }); + // GET /agents/stats — aggregate stats + app.get('/stats', async () => { + const data = await cached('agents:stats', 30, async () => { + const pool = getReadPool(); + const result = await pool.query(` + SELECT + COUNT(*)::int AS total_agents, + COUNT(*) FILTER (WHERE active)::int AS active_agents, + COALESCE(SUM(CAST(deposit AS NUMERIC)), 0)::text AS total_deposit, + COALESCE(SUM(CAST(spent_today AS NUMERIC)), 0)::text AS total_spent_today + FROM agents + `); + const row = result.rows[0]; + return { + totalAgents: row?.total_agents ?? 0, + activeAgents: row?.active_agents ?? 0, + totalDeposit: row?.total_deposit ?? '0', + totalSpentToday: row?.total_spent_today ?? '0', + }; + }); + return { ok: true, data }; + }); + + // GET /agents/dashboard — operator dashboard with alerts and spending trend + app.get('/dashboard', async (request) => { + const q = request.query as Record; + const ownerFilter = q.owner?.toLowerCase(); + + const data = await cached(`agents:dashboard:${ownerFilter ?? 'all'}`, 15, async () => { + const pool = getReadPool(); + + // Agents + const agentsQuery = ownerFilter + ? pool.query('SELECT * FROM agents WHERE owner = $1 ORDER BY registered_at DESC', [ownerFilter]) + : pool.query('SELECT * FROM agents ORDER BY registered_at DESC LIMIT 100'); + const agentsResult = await agentsQuery; + const agents = agentsResult.rows.map(formatAgentRow); + + // Stats + let totalDeposit = 0n; + let totalSpentToday = 0n; + let activeCount = 0; + for (const a of agents) { + try { totalDeposit += BigInt(a.deposit); } catch { /* skip */ } + try { totalSpentToday += BigInt(a.spentToday); } catch { /* skip */ } + if (a.active) activeCount++; + } + + // Alerts — agents at risk + const alerts: Array<{ agentId: string; type: string; message: string; timestamp: string }> = []; + for (const a of agents) { + const spent = BigInt(a.spentToday || '0'); + const limit = BigInt(a.dailyLimit || '0'); + if (limit > 0n && spent >= limit) { + alerts.push({ agentId: a.agentId, type: 'limit_reached', message: 'Daily spending limit reached', timestamp: a.registeredAt }); + } else if (limit > 0n && spent * 100n / limit >= 90n) { + alerts.push({ agentId: a.agentId, type: 'high_spend', message: `Spent ${(Number(spent * 100n / limit))}% of daily limit`, timestamp: a.registeredAt }); + } + if (!a.active) { + alerts.push({ agentId: a.agentId, type: 'revoked', message: 'Agent has been revoked', timestamp: a.registeredAt }); + } + } + + // Expiring session keys + const now = Math.floor(Date.now() / 1000); + const soonExpiry = now + 3600; // 1 hour + const expiringKeys = await pool.query( + `SELECT agent_id, expires_at FROM session_keys + WHERE NOT revoked AND CAST(expires_at AS BIGINT) > $1 AND CAST(expires_at AS BIGINT) < $2`, + [now.toString(), soonExpiry.toString()] + ); + for (const k of expiringKeys.rows) { + alerts.push({ + agentId: k.agent_id, + type: 'key_expiring', + message: 'Session key expiring within 1 hour', + timestamp: k.expires_at, + }); + } + + // Spending trend (last 7 days) + const spendingResult = await pool.query( + `SELECT date, SUM(CAST(amount AS NUMERIC))::text AS amount + FROM agent_spending + WHERE date >= (CURRENT_DATE - INTERVAL '7 days')::text + ${ownerFilter ? 'AND agent_id IN (SELECT agent_id FROM agents WHERE owner = $1)' : ''} + GROUP BY date ORDER BY date ASC`, + ownerFilter ? [ownerFilter] : [] + ); + const spendingTrend = spendingResult.rows.map((r: { date: string; amount: string }) => ({ + date: r.date, + amount: r.amount, + })); + + return { + stats: { + totalAgents: agents.length, + activeAgents: activeCount, + totalDeposit: totalDeposit.toString(), + totalSpentToday: totalSpentToday.toString(), + }, + agents, + alerts, + spendingTrend, + }; + }); + + return { ok: true, data }; + }); + // GET /agents/owner/:address — list agents by owner app.get('/owner/:address', async (request, reply) => { const { address } = request.params as { address: string }; @@ -261,4 +373,153 @@ export default async function agentsRoutes(app: FastifyInstance) { return { ok: false, error: 'Failed to decode agent list' }; } }); + + // GET /agents/:agentId/transactions — agent transaction history + app.get('/:agentId/transactions', async (request) => { + const { agentId } = request.params as { agentId: string }; + const q = request.query as Record; + const page = parseNumber(q.page, 1); + const limit = clamp(parseNumber(q.limit, 25), 1, 100); + const offset = (page - 1) * limit; + + const data = await cached(`agents:txs:${agentId}:${page}:${limit}`, 15, async () => { + const pool = getReadPool(); + const countResult = await pool.query( + 'SELECT COUNT(*)::int AS total FROM agent_transactions WHERE agent_id = $1', + [agentId] + ); + const total = countResult.rows[0]?.total ?? 0; + + const txResult = await pool.query( + `SELECT tx_hash, from_addr, to_addr, value, status, timestamp, method + FROM agent_transactions WHERE agent_id = $1 + ORDER BY timestamp DESC LIMIT $2 OFFSET $3`, + [agentId, limit, offset] + ); + + return { + total, + items: txResult.rows.map((r: Record) => ({ + hash: r.tx_hash, + from: r.from_addr, + to: r.to_addr, + value: r.value, + status: r.status, + timestamp: r.timestamp, + method: r.method, + })), + }; + }); + + return { ok: true, data }; + }); + + // GET /agents/:agentId/session-keys — session keys for a specific agent + app.get('/:agentId/session-keys', async (request) => { + const { agentId } = request.params as { agentId: string }; + + const data = await cached(`agents:keys:${agentId}`, 15, async () => { + const pool = getReadPool(); + const result = await pool.query( + `SELECT key_address, agent_id, owner, expires_at, revoked, permissions, last_activity_at, created_at + FROM session_keys WHERE agent_id = $1 + ORDER BY created_at DESC`, + [agentId] + ); + + const now = Math.floor(Date.now() / 1000); + const items = result.rows.map((r: Record) => { + const expiresAt = Number(r.expires_at) || 0; + const revoked = r.revoked as boolean; + let status: string; + if (revoked) status = 'revoked'; + else if (expiresAt > 0 && expiresAt < now) status = 'expired'; + else status = 'valid'; + + const perms = (r.permissions as number[]) || []; + return { + keyAddress: r.key_address as string, + agentId: r.agent_id as string, + owner: r.owner as string, + status, + permissions: perms, + permissionLabels: perms.map(permissionLabel), + createdAt: String(Math.floor(new Date(r.created_at as string).getTime() / 1000)), + expiresAt: String(expiresAt), + lastActivityAt: r.last_activity_at ? String(r.last_activity_at) : null, + }; + }); + + return { total: items.length, items }; + }); + + return { ok: true, data }; + }); + + // GET /agents/:agentId/spending — spending analytics + app.get('/:agentId/spending', async (request) => { + const { agentId } = request.params as { agentId: string }; + const q = request.query as Record; + const days = clamp(parseNumber(q.days, 30), 1, 365); + + const data = await cached(`agents:spending:${agentId}:${days}`, 60, async () => { + const pool = getReadPool(); + const result = await pool.query( + `SELECT date, amount, tx_count + FROM agent_spending WHERE agent_id = $1 + AND date >= (CURRENT_DATE - $2 * INTERVAL '1 day')::text + ORDER BY date ASC`, + [agentId, days] + ); + + let totalSpent = 0n; + let totalTxs = 0; + const daily = result.rows.map((r: Record) => { + try { totalSpent += BigInt(r.amount as string); } catch { /* skip */ } + totalTxs += Number(r.tx_count) || 0; + return { date: r.date, amount: String(r.amount), txCount: Number(r.tx_count) }; + }); + + return { + agentId, + period: `${days}d`, + totalSpent: totalSpent.toString(), + totalTransactions: totalTxs, + daily, + }; + }); + + return { ok: true, data }; + }); +} + +// ---- Helpers ---- + +const PERMISSION_LABELS: Record = { + 0: 'InferenceSubmit', + 1: 'Transfer', + 2: 'StakeDelegate', + 3: 'QueryOnly', +}; + +function permissionLabel(perm: number): string { + return PERMISSION_LABELS[perm] ?? `Permission(${perm})`; +} + +function formatAgentRow(row: Record): AgentAccount & { permissionLabels: string[] } { + const perms = (row.permissions as number[]) || []; + return { + agentId: row.agent_id as string, + owner: row.owner as string, + agentAddress: row.agent_address as string, + permissions: perms, + permissionLabels: perms.map(permissionLabel), + dailyLimit: String(row.daily_limit ?? '0'), + maxPerTx: String(row.max_per_tx ?? '0'), + deposit: String(row.deposit ?? '0'), + spentToday: String(row.spent_today ?? '0'), + lastSpendDay: String(row.last_spend_day ?? '0'), + registeredAt: String(row.registered_at ?? '0'), + active: row.active as boolean, + }; }