diff --git a/src/tq_oracle/adapters/asset_adapters/stakewise.py b/src/tq_oracle/adapters/asset_adapters/stakewise.py index 9bae315..572d607 100644 --- a/src/tq_oracle/adapters/asset_adapters/stakewise.py +++ b/src/tq_oracle/adapters/asset_adapters/stakewise.py @@ -13,6 +13,7 @@ from web3.types import EventData from ...abi import fetch_subvault_addresses, load_stakewise_vault_abi +from ...clients.etherscan_logs import EtherscanLogsClient from ...constants import ( STAKEWISE_ADDRESSES, STAKEWISE_EXIT_LOG_CHUNK, @@ -128,6 +129,16 @@ def __init__( self._rpc_jitter = getattr(config, "rpc_jitter", 0.10) self._block_timestamp_cache: dict[int, int] = {} + # Optional Etherscan client for faster log queries + self._etherscan_client: EtherscanLogsClient | None = None + if adapter_config.etherscan_api_key: + self._etherscan_client = EtherscanLogsClient( + api_key=adapter_config.etherscan_api_key, + chain_id=config.chain_id, + page_size=adapter_config.etherscan_page_size, + ) + logger.debug("StakeWise Etherscan client enabled for log queries") + extra_address_candidates = [ self.w3.to_checksum_address(addr) for addr in adapter_config.extra_addresses @@ -352,49 +363,61 @@ async def _scan_exit_queue_tickets( return [] tickets: dict[int, ExitQueueTicket] = {} + + # Try Etherscan first (one-shot from block 0, no chunking needed) + if self._etherscan_client: + logger.debug( + "StakeWise exit queue scan via Etherscan — vault=%s user=%s", + context.address, + user, + ) + for event in context.exit_events: + logs = await self._get_exit_logs_etherscan( + event, user, 0, self.block_identifier, vault_address=context.address + ) + if logs is not None: + await self._process_exit_logs(logs, tickets) + else: + # Etherscan failed, fall back to RPC for everything + tickets.clear() + return await self._scan_exit_queue_tickets_rpc(context, user) + else: + return await self._scan_exit_queue_tickets_rpc(context, user) + + ordered = sorted(tickets.values(), key=lambda t: (t.block_number, t.log_index)) + logger.info( + "StakeWise exit queue scan completed (Etherscan) — vault=%s user=%s tickets=%d", + context.address, + user, + len(ordered), + ) + return ordered + + async def _scan_exit_queue_tickets_rpc( + self, context: StakewiseVaultContext, user: str + ) -> list[ExitQueueTicket]: + """Chunked RPC fallback for exit queue scanning.""" + tickets: dict[int, ExitQueueTicket] = {} min_block = self._resolve_min_block() iterations = 0 logger.warning( - f"StakeWise exit queue scan start for address:{user} StakewiseVault: {context.address} from block {min_block}, this might take some time..." + "StakeWise exit queue scan via RPC — vault=%s user=%s from_block=%d (this may take time)", + context.address, + user, + min_block, ) for from_block, to_block in self._block_ranges( self.block_identifier, min_block ): iterations += 1 for event in context.exit_events: - logs = await self._get_exit_logs(event, user, from_block, to_block) - for log in logs: - args = log["args"] - ticket_id = int(args["positionTicket"]) - block_number = int(log["blockNumber"]) - log_index = int(log["logIndex"]) - - existing = tickets.get(ticket_id) - if existing and not ( - block_number > existing.block_number - or ( - block_number == existing.block_number - and log_index > existing.log_index - ) - ): - continue - - timestamp = await self._resolve_block_timestamp(block_number) - assets_value = args.get("assets") - tickets[ticket_id] = ExitQueueTicket( - ticket=ticket_id, - shares=int(args["shares"]), - receiver=self.w3.to_checksum_address(args["receiver"]), - block_number=block_number, - log_index=log_index, - timestamp=timestamp, - assets_hint=None if assets_value is None else int(assets_value), - ) + logs = await self._get_exit_logs_rpc(event, user, from_block, to_block) + await self._process_exit_logs(logs, tickets) ordered = sorted(tickets.values(), key=lambda t: (t.block_number, t.log_index)) logger.info( - "StakeWise exit queue scan completed — vault=%s user=%s tickets=%d iterations=%d", + "StakeWise exit queue scan completed (RPC) — vault=%s user=%s tickets=%d iterations=%d", context.address, user, len(ordered), @@ -402,7 +425,77 @@ async def _scan_exit_queue_tickets( ) return ordered - async def _get_exit_logs( + async def _process_exit_logs( + self, logs: list[EventData], tickets: dict[int, ExitQueueTicket] + ) -> None: + """Process exit logs and update tickets dict.""" + for log in logs: + args = log["args"] + ticket_id = int(args["positionTicket"]) + block_number = int(log["blockNumber"]) + log_index = int(log["logIndex"]) + + existing = tickets.get(ticket_id) + if existing and not ( + block_number > existing.block_number + or ( + block_number == existing.block_number + and log_index > existing.log_index + ) + ): + continue + + timestamp = await self._resolve_block_timestamp(block_number) + assets_value = args.get("assets") + tickets[ticket_id] = ExitQueueTicket( + ticket=ticket_id, + shares=int(args["shares"]), + receiver=self.w3.to_checksum_address(args["receiver"]), + block_number=block_number, + log_index=log_index, + timestamp=timestamp, + assets_hint=None if assets_value is None else int(assets_value), + ) + + async def _get_exit_logs_etherscan( + self, + event: ContractEvent, + user: str, + from_block: int, + to_block: int, + *, + vault_address: str | None = None, + ) -> list[EventData] | None: + # Try Etherscan first if available + if self._etherscan_client is None: + raise ValueError("Etherscan client not configured") + + if vault_address is None: + raise ValueError("Vault address required for Etherscan") + + try: + return await asyncio.to_thread( + self._etherscan_client.fetch_logs, + event, + vault_address, + {"owner": user}, + from_block, + to_block, + ) + + except ValueError as exc: # pragma: no cover - provider variance + event_name = getattr(event, "abi", {}).get("name", "unknown") + logger.warning( + "StakeWise exit log query failed — event=%s user=%s chunk=[%d,%d] err=%s", + event_name, + user, + from_block, + to_block, + exc, + ) + return None + + async def _get_exit_logs_rpc( self, event: ContractEvent, user: str, diff --git a/src/tq_oracle/clients/__init__.py b/src/tq_oracle/clients/__init__.py new file mode 100644 index 0000000..062ae28 --- /dev/null +++ b/src/tq_oracle/clients/__init__.py @@ -0,0 +1,3 @@ +from .etherscan_logs import EtherscanLogsClient + +__all__ = ["EtherscanLogsClient"] diff --git a/src/tq_oracle/clients/etherscan_logs.py b/src/tq_oracle/clients/etherscan_logs.py new file mode 100644 index 0000000..e04f33f --- /dev/null +++ b/src/tq_oracle/clients/etherscan_logs.py @@ -0,0 +1,203 @@ +"""Etherscan Logs API client for fetching event logs.""" + +from typing import Any, cast + +import backoff +import requests +from eth_abi.codec import ABICodec +from web3 import Web3 +from web3._utils.events import get_event_data +from web3._utils.filters import construct_event_filter_params +from web3.contract.contract import ContractEvent +from web3.types import EventData + +from ..logger import get_logger + +logger = get_logger(__name__) + +ETHERSCAN_API_V2_URL = "https://api.etherscan.io/v2/api" + + +class EtherscanRateLimitError(Exception): + """Raised when Etherscan returns a rate limit error.""" + + +class EtherscanLogsClient: + """Client for fetching event logs from Etherscan API v2.""" + + def __init__( + self, + api_key: str, + chain_id: int, + *, + page_size: int = 1000, + request_timeout: int = 15, + ): + self._api_key = api_key + self._chain_id = chain_id + self._page_size = min(page_size, 1000) + self._timeout = request_timeout + self._session = requests.Session() + + def fetch_logs( + self, + event: ContractEvent, + contract_address: str, + argument_filters: dict[str, Any], + from_block: int, + to_block: int, + ) -> list[EventData] | None: + """Fetch and decode event logs from Etherscan. + + Args: + event: Web3 ContractEvent to fetch logs for + contract_address: Contract address to query + argument_filters: Dict mapping indexed field names to filter values + from_block: Starting block number + to_block: Ending block number + + Returns: + List of decoded EventData, or None if the request fails + """ + abi = event._get_event_abi() + codec: ABICodec = event.w3.codec + event_name = abi.get("name", "unknown") + + # Use documented web3 internal API for filter params + try: + _, filter_params = construct_event_filter_params( + abi, + codec, + address=Web3.to_checksum_address(contract_address), + argument_filters=argument_filters, + from_block=from_block, + to_block=to_block, + ) + except Exception as exc: + logger.warning( + "Etherscan filter params failed — event=%s error=%s", + event_name, + exc, + ) + return None + + topics: list[Any] = list(filter_params.get("topics") or []) + if not topics: + logger.warning( + "Etherscan query skipped — event=%s has no topics", + event_name, + ) + return None + + logs: list[EventData] = [] + page = 1 + + while True: + result = self._call(contract_address, topics, from_block, to_block, page) + if result is None: + return None if page == 1 else logs + + for raw in result: + try: + log_entry = self._to_log_entry(raw) + logs.append(cast(EventData, get_event_data(codec, abi, log_entry))) + except Exception: + continue + + if len(result) < self._page_size: + break + page += 1 + + logger.debug( + "Etherscan query — event=%s filters=%s found=%d", + abi.get("name", "unknown"), + argument_filters, + len(logs), + ) + return logs + + @backoff.on_exception( + backoff.expo, + (requests.RequestException, EtherscanRateLimitError), + max_time=30, + jitter=backoff.full_jitter, + ) + def _call( + self, + address: str, + topics: list[Any], + from_block: int, + to_block: int, + page: int, + ) -> list[dict[str, Any]] | None: + """Make a paginated request to Etherscan logs API.""" + params: dict[str, Any] = { + "chainid": self._chain_id, + "module": "logs", + "action": "getLogs", + "address": address, + "fromBlock": from_block, + "toBlock": to_block, + "page": page, + "offset": self._page_size, + "apikey": self._api_key, + } + + # Add topics - convert bytes to hex string for Etherscan API + for i, topic in enumerate(topics[:3]): + if topic is not None: + params[f"topic{i}"] = topic.hex() if isinstance(topic, bytes) else topic + if i > 0: + params[f"topic0_{i}_opr"] = "and" + + resp = self._session.get( + ETHERSCAN_API_V2_URL, params=params, timeout=self._timeout + ) + resp.raise_for_status() + data = resp.json() + + result = data.get("result", "") + message = data.get("message", "").lower() + + # Handle string results (errors) + if isinstance(result, str): + if "rate limit" in result.lower(): + raise EtherscanRateLimitError(result) + if "no records" in result.lower(): + return [] + logger.debug("Etherscan returned error — result=%s", result) + return None + + # Handle list results + if isinstance(result, list): + # Empty list with "No records found" message is valid (no events) + if not result and "no records" in message: + return [] + # Success - return the results + if data.get("status") == "1": + return result + # Empty list with status=0 but no error message - treat as no records + if not result: + return [] + + logger.debug( + "Etherscan unexpected response — status=%s message=%s", + data.get("status"), + message, + ) + return None + + @staticmethod + def _to_log_entry(raw: dict[str, Any]) -> dict[str, Any]: + """Convert Etherscan log to web3 log entry format.""" + return { + "address": raw.get("address"), + "blockHash": bytes.fromhex(raw.get("blockHash", "0" * 66)[2:]), + "blockNumber": int(raw.get("blockNumber", "0x0"), 16), + "data": bytes.fromhex((raw.get("data") or "0x")[2:] or "00"), + "logIndex": int(raw.get("logIndex", "0x0"), 16), + "topics": [bytes.fromhex(t[2:]) for t in raw.get("topics", []) if t], + "transactionHash": bytes.fromhex(raw.get("transactionHash", "0" * 66)[2:]), + "transactionIndex": int(raw.get("transactionIndex", "0x0"), 16), + "removed": False, + } diff --git a/src/tq_oracle/settings.py b/src/tq_oracle/settings.py index 9fca335..68032cd 100644 --- a/src/tq_oracle/settings.py +++ b/src/tq_oracle/settings.py @@ -57,6 +57,8 @@ class StakewiseAdapterSettings(BaseModel): stakewise_exit_max_lookback_blocks: int = STAKEWISE_EXIT_MAX_LOOKBACK_BLOCKS extra_addresses: list[str] = Field(default_factory=list) skip_exit_queue_scan: bool = False + etherscan_api_key: str | None = None + etherscan_page_size: int = 1000 model_config = ConfigDict(extra="ignore")