From 1f23674822408a7bfa1ab8d457c8e498a81a45e0 Mon Sep 17 00:00:00 2001 From: timbrinded <79199034+timbrinded@users.noreply.github.com> Date: Fri, 5 Dec 2025 14:29:10 +0000 Subject: [PATCH 1/2] etherscan option --- .../adapters/asset_adapters/stakewise.py | 155 +++++++-- src/tq_oracle/clients/__init__.py | 3 + src/tq_oracle/clients/etherscan_logs.py | 303 ++++++++++++++++++ src/tq_oracle/settings.py | 2 + 4 files changed, 432 insertions(+), 31 deletions(-) create mode 100644 src/tq_oracle/clients/__init__.py create mode 100644 src/tq_oracle/clients/etherscan_logs.py diff --git a/src/tq_oracle/adapters/asset_adapters/stakewise.py b/src/tq_oracle/adapters/asset_adapters/stakewise.py index 9bae315..572d607 100644 --- a/src/tq_oracle/adapters/asset_adapters/stakewise.py +++ b/src/tq_oracle/adapters/asset_adapters/stakewise.py @@ -13,6 +13,7 @@ from web3.types import EventData from ...abi import fetch_subvault_addresses, load_stakewise_vault_abi +from ...clients.etherscan_logs import EtherscanLogsClient from ...constants import ( STAKEWISE_ADDRESSES, STAKEWISE_EXIT_LOG_CHUNK, @@ -128,6 +129,16 @@ def __init__( self._rpc_jitter = getattr(config, "rpc_jitter", 0.10) self._block_timestamp_cache: dict[int, int] = {} + # Optional Etherscan client for faster log queries + self._etherscan_client: EtherscanLogsClient | None = None + if adapter_config.etherscan_api_key: + self._etherscan_client = EtherscanLogsClient( + api_key=adapter_config.etherscan_api_key, + chain_id=config.chain_id, + page_size=adapter_config.etherscan_page_size, + ) + logger.debug("StakeWise Etherscan client enabled for log queries") + extra_address_candidates = [ self.w3.to_checksum_address(addr) for addr in adapter_config.extra_addresses @@ -352,49 +363,61 @@ async def _scan_exit_queue_tickets( return [] tickets: dict[int, ExitQueueTicket] = {} + + # Try Etherscan first (one-shot from block 0, no chunking needed) + if self._etherscan_client: + logger.debug( + "StakeWise exit queue scan via Etherscan — vault=%s user=%s", + context.address, + user, + ) + for event in context.exit_events: + logs = await self._get_exit_logs_etherscan( + event, user, 0, self.block_identifier, vault_address=context.address + ) + if logs is not None: + await self._process_exit_logs(logs, tickets) + else: + # Etherscan failed, fall back to RPC for everything + tickets.clear() + return await self._scan_exit_queue_tickets_rpc(context, user) + else: + return await self._scan_exit_queue_tickets_rpc(context, user) + + ordered = sorted(tickets.values(), key=lambda t: (t.block_number, t.log_index)) + logger.info( + "StakeWise exit queue scan completed (Etherscan) — vault=%s user=%s tickets=%d", + context.address, + user, + len(ordered), + ) + return ordered + + async def _scan_exit_queue_tickets_rpc( + self, context: StakewiseVaultContext, user: str + ) -> list[ExitQueueTicket]: + """Chunked RPC fallback for exit queue scanning.""" + tickets: dict[int, ExitQueueTicket] = {} min_block = self._resolve_min_block() iterations = 0 logger.warning( - f"StakeWise exit queue scan start for address:{user} StakewiseVault: {context.address} from block {min_block}, this might take some time..." + "StakeWise exit queue scan via RPC — vault=%s user=%s from_block=%d (this may take time)", + context.address, + user, + min_block, ) for from_block, to_block in self._block_ranges( self.block_identifier, min_block ): iterations += 1 for event in context.exit_events: - logs = await self._get_exit_logs(event, user, from_block, to_block) - for log in logs: - args = log["args"] - ticket_id = int(args["positionTicket"]) - block_number = int(log["blockNumber"]) - log_index = int(log["logIndex"]) - - existing = tickets.get(ticket_id) - if existing and not ( - block_number > existing.block_number - or ( - block_number == existing.block_number - and log_index > existing.log_index - ) - ): - continue - - timestamp = await self._resolve_block_timestamp(block_number) - assets_value = args.get("assets") - tickets[ticket_id] = ExitQueueTicket( - ticket=ticket_id, - shares=int(args["shares"]), - receiver=self.w3.to_checksum_address(args["receiver"]), - block_number=block_number, - log_index=log_index, - timestamp=timestamp, - assets_hint=None if assets_value is None else int(assets_value), - ) + logs = await self._get_exit_logs_rpc(event, user, from_block, to_block) + await self._process_exit_logs(logs, tickets) ordered = sorted(tickets.values(), key=lambda t: (t.block_number, t.log_index)) logger.info( - "StakeWise exit queue scan completed — vault=%s user=%s tickets=%d iterations=%d", + "StakeWise exit queue scan completed (RPC) — vault=%s user=%s tickets=%d iterations=%d", context.address, user, len(ordered), @@ -402,7 +425,77 @@ async def _scan_exit_queue_tickets( ) return ordered - async def _get_exit_logs( + async def _process_exit_logs( + self, logs: list[EventData], tickets: dict[int, ExitQueueTicket] + ) -> None: + """Process exit logs and update tickets dict.""" + for log in logs: + args = log["args"] + ticket_id = int(args["positionTicket"]) + block_number = int(log["blockNumber"]) + log_index = int(log["logIndex"]) + + existing = tickets.get(ticket_id) + if existing and not ( + block_number > existing.block_number + or ( + block_number == existing.block_number + and log_index > existing.log_index + ) + ): + continue + + timestamp = await self._resolve_block_timestamp(block_number) + assets_value = args.get("assets") + tickets[ticket_id] = ExitQueueTicket( + ticket=ticket_id, + shares=int(args["shares"]), + receiver=self.w3.to_checksum_address(args["receiver"]), + block_number=block_number, + log_index=log_index, + timestamp=timestamp, + assets_hint=None if assets_value is None else int(assets_value), + ) + + async def _get_exit_logs_etherscan( + self, + event: ContractEvent, + user: str, + from_block: int, + to_block: int, + *, + vault_address: str | None = None, + ) -> list[EventData] | None: + # Try Etherscan first if available + if self._etherscan_client is None: + raise ValueError("Etherscan client not configured") + + if vault_address is None: + raise ValueError("Vault address required for Etherscan") + + try: + return await asyncio.to_thread( + self._etherscan_client.fetch_logs, + event, + vault_address, + {"owner": user}, + from_block, + to_block, + ) + + except ValueError as exc: # pragma: no cover - provider variance + event_name = getattr(event, "abi", {}).get("name", "unknown") + logger.warning( + "StakeWise exit log query failed — event=%s user=%s chunk=[%d,%d] err=%s", + event_name, + user, + from_block, + to_block, + exc, + ) + return None + + async def _get_exit_logs_rpc( self, event: ContractEvent, user: str, diff --git a/src/tq_oracle/clients/__init__.py b/src/tq_oracle/clients/__init__.py new file mode 100644 index 0000000..062ae28 --- /dev/null +++ b/src/tq_oracle/clients/__init__.py @@ -0,0 +1,3 @@ +from .etherscan_logs import EtherscanLogsClient + +__all__ = ["EtherscanLogsClient"] diff --git a/src/tq_oracle/clients/etherscan_logs.py b/src/tq_oracle/clients/etherscan_logs.py new file mode 100644 index 0000000..6a1ddea --- /dev/null +++ b/src/tq_oracle/clients/etherscan_logs.py @@ -0,0 +1,303 @@ +"""Etherscan Logs API client for fetching event logs. + +This module provides a reusable client for the Etherscan logs/getLogs API endpoint +""" + +from typing import Any, Sequence, TypedDict + +import backoff +import requests +from web3 import Web3 +from web3.contract.contract import ContractEvent +from web3.types import EventData + +from eth_typing import HexStr + +from ..logger import get_logger + +logger = get_logger(__name__) + +ETHERSCAN_API_V2_URL = "https://api.etherscan.io/v2/api" + + +class EtherscanRateLimitError(Exception): + """Raised when Etherscan returns a rate limit error.""" + + pass + + +class EtherscanLogsResult(TypedDict, total=False): + """Raw Etherscan API response for a single log entry.""" + + address: str + topics: list[str] + data: str + blockNumber: str + blockHash: str + timeStamp: str + gasPrice: str + gasUsed: str + logIndex: str + transactionHash: str + transactionIndex: str + + +class EtherscanLogsResponse(TypedDict): + """Complete Etherscan logs API response.""" + + status: str + message: str + result: list[EtherscanLogsResult] | str + + +class EtherscanLogsClient: + """Client for fetching event logs from Etherscan API v2. + + Provides async-compatible log fetching with: + - Automatic pagination + - Exponential backoff retry logic + - Topic filtering support + - Conversion to Web3.py EventData format + """ + + def __init__( + self, + api_key: str, + chain_id: int, + *, + page_size: int = 1000, + request_timeout: int = 15, + ): + """Initialize the Etherscan logs client. + + Args: + api_key: Etherscan API key + chain_id: Chain ID for the network (1 for mainnet, etc.) + api_url: Base API URL (defaults to Etherscan v2 API) + page_size: Number of results per page (max 1000) + request_timeout: HTTP request timeout in seconds + """ + self._api_key = api_key + self._chain_id = chain_id + self._page_size = max(1, min(page_size, 1000)) + self._request_timeout = request_timeout + self._session = requests.Session() + + def fetch_logs( + self, + event: ContractEvent, + contract_address: str, + argument_filters: dict[str, str], + from_block: int, + to_block: int, + ) -> list[EventData] | None: + """Fetch and decode event logs from Etherscan. + + Args: + event: Web3 ContractEvent to fetch logs for + contract_address: Contract address to query + argument_filters: Dict mapping indexed field names to filter values + from_block: Starting block number + to_block: Ending block number + + Returns: + List of decoded EventData, or None if the request fails + """ + abi = getattr(event, "abi", None) + if not abi: + return None + + topics = self._resolve_topics(event, argument_filters) + if not topics or not topics[0]: + return None + + logs: list[EventData] = [] + page = 1 + filter_desc = ", ".join(f"{k}={v}" for k, v in argument_filters.items()) + + while True: + payload = self._call( + contract_address, + topics, + from_block, + to_block, + page, + ) + if payload is None: + return None + + status = payload.get("status", "").strip() + message = payload.get("message", "").strip().lower() + result = payload.get("result") + + if status != "1": + if isinstance(result, str) and result.lower() == "no records found": + break + if message == "no records found": + break + logger.warning( + "Etherscan log query failed — event=%s filters={%s} blocks=[%d,%d] error=%s", + event.event_name, + filter_desc, + from_block, + to_block, + payload.get("result") or payload.get("message"), + ) + return None + + if not isinstance(result, list): + logger.warning("Unexpected Etherscan logs result: %s", result) + return None + + for raw_log in result: + decoded = self._process_log(event, raw_log) + if decoded is not None: + logs.append(decoded) + + if len(result) < self._page_size: + break + page += 1 + + logger.debug( + "Etherscan query — event=%s filter={%s} found=%d", + event.event_name, + filter_desc, + len(logs), + ) + return logs + + @backoff.on_exception( + backoff.expo, + (requests.RequestException, ValueError, EtherscanRateLimitError), + max_time=30, + jitter=backoff.full_jitter, + ) + def _call( + self, + contract_address: str, + topics: Sequence[str | None], + from_block: int, + to_block: int, + page: int, + ) -> EtherscanLogsResponse | None: + """Make a paginated request to the Etherscan logs API.""" + params: dict[str, Any] = { + "chainid": str(self._chain_id), + "module": "logs", + "action": "getLogs", + "address": contract_address, + "fromBlock": str(from_block), + "toBlock": str(to_block), + "page": page, + "offset": self._page_size, + "sort": "asc", + "apikey": self._api_key, + } + + # Add topic filters + topic0 = topics[0] if len(topics) > 0 else None + topic1 = topics[1] if len(topics) > 1 else None + topic2 = topics[2] if len(topics) > 2 else None + + if topic0: + params["topic0"] = topic0 + if topic1: + params["topic1"] = topic1 + params["topic0_1_opr"] = "and" + if topic2: + params["topic2"] = topic2 + params["topic0_2_opr"] = "and" + + response = self._session.get( + ETHERSCAN_API_V2_URL, + params=params, + timeout=self._request_timeout, + ) + response.raise_for_status() + + payload = response.json() + if not isinstance(payload, dict): + raise ValueError("Unexpected Etherscan payload format") + + result = payload.get("result", "") + if isinstance(result, str) and "rate limit" in result.lower(): + raise EtherscanRateLimitError(result) + + return payload # type: ignore[return-value] + + def _resolve_topics( + self, + event: ContractEvent, + argument_filters: dict[str, str], + ) -> list[str | None]: + """Extract Ethereum log topics from a contract event with filters.""" + try: + params = event._get_event_filter_params(event.abi, argument_filters) + except Exception: + return [] + + topics = params.get("topics") or [] + resolved: list[str | None] = [] + + for index in range(len(topics)): + resolved.append(self._extract_topic(topics, index)) + + return resolved + + @staticmethod + def _extract_topic(topics: Sequence[Any], index: int) -> str | None: + """Safely extract and convert a single topic to hex string.""" + if len(topics) <= index: + return None + + value = topics[index] + if isinstance(value, list): + value = value[0] if value else None + if value is None: + return None + if isinstance(value, (bytes, bytearray)): + if isinstance(value, bytearray): + value = bytes(value) + return Web3.to_hex(value) + if isinstance(value, str): + return value + return Web3.to_hex(value) + + def _process_log( + self, + event: ContractEvent, + raw_log: EtherscanLogsResult, + ) -> EventData | None: + """Decode a raw Etherscan log using the contract event.""" + try: + formatted = self._format_log(raw_log) + return event.process_log(formatted) + except Exception: + return None + + @staticmethod + def _format_log(raw_log: EtherscanLogsResult) -> dict[str, Any]: + """Convert Etherscan API log format to Web3.py EventLog format.""" + + def to_int(value: str | None) -> int: + if not value: + return 0 + return Web3.to_int(hexstr=HexStr(value)) + + data_hex = raw_log.get("data") or "0x" + block_hash_hex = raw_log.get("blockHash") or "0x0" + tx_hash_hex = raw_log.get("transactionHash") or "0x0" + topics_hex = raw_log.get("topics", []) or [] + + return { + "address": raw_log.get("address"), + "blockHash": block_hash_hex, + "blockNumber": to_int(raw_log.get("blockNumber")), + "data": data_hex, + "logIndex": to_int(raw_log.get("logIndex")), + "topics": [ + Web3.to_bytes(hexstr=HexStr(topic)) for topic in topics_hex if topic + ], + "transactionHash": tx_hash_hex, + "transactionIndex": to_int(raw_log.get("transactionIndex")), + } diff --git a/src/tq_oracle/settings.py b/src/tq_oracle/settings.py index 9fca335..68032cd 100644 --- a/src/tq_oracle/settings.py +++ b/src/tq_oracle/settings.py @@ -57,6 +57,8 @@ class StakewiseAdapterSettings(BaseModel): stakewise_exit_max_lookback_blocks: int = STAKEWISE_EXIT_MAX_LOOKBACK_BLOCKS extra_addresses: list[str] = Field(default_factory=list) skip_exit_queue_scan: bool = False + etherscan_api_key: str | None = None + etherscan_page_size: int = 1000 model_config = ConfigDict(extra="ignore") From 80b5d1e09db18143b998e991c89edf95eb886234 Mon Sep 17 00:00:00 2001 From: timbrinded <79199034+timbrinded@users.noreply.github.com> Date: Fri, 5 Dec 2025 15:28:07 +0000 Subject: [PATCH 2/2] refactor --- src/tq_oracle/clients/etherscan_logs.py | 310 ++++++++---------------- 1 file changed, 105 insertions(+), 205 deletions(-) diff --git a/src/tq_oracle/clients/etherscan_logs.py b/src/tq_oracle/clients/etherscan_logs.py index 6a1ddea..e04f33f 100644 --- a/src/tq_oracle/clients/etherscan_logs.py +++ b/src/tq_oracle/clients/etherscan_logs.py @@ -1,18 +1,16 @@ -"""Etherscan Logs API client for fetching event logs. +"""Etherscan Logs API client for fetching event logs.""" -This module provides a reusable client for the Etherscan logs/getLogs API endpoint -""" - -from typing import Any, Sequence, TypedDict +from typing import Any, cast import backoff import requests +from eth_abi.codec import ABICodec from web3 import Web3 +from web3._utils.events import get_event_data +from web3._utils.filters import construct_event_filter_params from web3.contract.contract import ContractEvent from web3.types import EventData -from eth_typing import HexStr - from ..logger import get_logger logger = get_logger(__name__) @@ -23,42 +21,9 @@ class EtherscanRateLimitError(Exception): """Raised when Etherscan returns a rate limit error.""" - pass - - -class EtherscanLogsResult(TypedDict, total=False): - """Raw Etherscan API response for a single log entry.""" - - address: str - topics: list[str] - data: str - blockNumber: str - blockHash: str - timeStamp: str - gasPrice: str - gasUsed: str - logIndex: str - transactionHash: str - transactionIndex: str - - -class EtherscanLogsResponse(TypedDict): - """Complete Etherscan logs API response.""" - - status: str - message: str - result: list[EtherscanLogsResult] | str - class EtherscanLogsClient: - """Client for fetching event logs from Etherscan API v2. - - Provides async-compatible log fetching with: - - Automatic pagination - - Exponential backoff retry logic - - Topic filtering support - - Conversion to Web3.py EventData format - """ + """Client for fetching event logs from Etherscan API v2.""" def __init__( self, @@ -68,26 +33,17 @@ def __init__( page_size: int = 1000, request_timeout: int = 15, ): - """Initialize the Etherscan logs client. - - Args: - api_key: Etherscan API key - chain_id: Chain ID for the network (1 for mainnet, etc.) - api_url: Base API URL (defaults to Etherscan v2 API) - page_size: Number of results per page (max 1000) - request_timeout: HTTP request timeout in seconds - """ self._api_key = api_key self._chain_id = chain_id - self._page_size = max(1, min(page_size, 1000)) - self._request_timeout = request_timeout + self._page_size = min(page_size, 1000) + self._timeout = request_timeout self._session = requests.Session() def fetch_logs( self, event: ContractEvent, contract_address: str, - argument_filters: dict[str, str], + argument_filters: dict[str, Any], from_block: int, to_block: int, ) -> list[EventData] | None: @@ -103,201 +59,145 @@ def fetch_logs( Returns: List of decoded EventData, or None if the request fails """ - abi = getattr(event, "abi", None) - if not abi: + abi = event._get_event_abi() + codec: ABICodec = event.w3.codec + event_name = abi.get("name", "unknown") + + # Use documented web3 internal API for filter params + try: + _, filter_params = construct_event_filter_params( + abi, + codec, + address=Web3.to_checksum_address(contract_address), + argument_filters=argument_filters, + from_block=from_block, + to_block=to_block, + ) + except Exception as exc: + logger.warning( + "Etherscan filter params failed — event=%s error=%s", + event_name, + exc, + ) return None - topics = self._resolve_topics(event, argument_filters) - if not topics or not topics[0]: + topics: list[Any] = list(filter_params.get("topics") or []) + if not topics: + logger.warning( + "Etherscan query skipped — event=%s has no topics", + event_name, + ) return None logs: list[EventData] = [] page = 1 - filter_desc = ", ".join(f"{k}={v}" for k, v in argument_filters.items()) while True: - payload = self._call( - contract_address, - topics, - from_block, - to_block, - page, - ) - if payload is None: - return None - - status = payload.get("status", "").strip() - message = payload.get("message", "").strip().lower() - result = payload.get("result") - - if status != "1": - if isinstance(result, str) and result.lower() == "no records found": - break - if message == "no records found": - break - logger.warning( - "Etherscan log query failed — event=%s filters={%s} blocks=[%d,%d] error=%s", - event.event_name, - filter_desc, - from_block, - to_block, - payload.get("result") or payload.get("message"), - ) - return None + result = self._call(contract_address, topics, from_block, to_block, page) + if result is None: + return None if page == 1 else logs - if not isinstance(result, list): - logger.warning("Unexpected Etherscan logs result: %s", result) - return None - - for raw_log in result: - decoded = self._process_log(event, raw_log) - if decoded is not None: - logs.append(decoded) + for raw in result: + try: + log_entry = self._to_log_entry(raw) + logs.append(cast(EventData, get_event_data(codec, abi, log_entry))) + except Exception: + continue if len(result) < self._page_size: break page += 1 logger.debug( - "Etherscan query — event=%s filter={%s} found=%d", - event.event_name, - filter_desc, + "Etherscan query — event=%s filters=%s found=%d", + abi.get("name", "unknown"), + argument_filters, len(logs), ) return logs @backoff.on_exception( backoff.expo, - (requests.RequestException, ValueError, EtherscanRateLimitError), + (requests.RequestException, EtherscanRateLimitError), max_time=30, jitter=backoff.full_jitter, ) def _call( self, - contract_address: str, - topics: Sequence[str | None], + address: str, + topics: list[Any], from_block: int, to_block: int, page: int, - ) -> EtherscanLogsResponse | None: - """Make a paginated request to the Etherscan logs API.""" + ) -> list[dict[str, Any]] | None: + """Make a paginated request to Etherscan logs API.""" params: dict[str, Any] = { - "chainid": str(self._chain_id), + "chainid": self._chain_id, "module": "logs", "action": "getLogs", - "address": contract_address, - "fromBlock": str(from_block), - "toBlock": str(to_block), + "address": address, + "fromBlock": from_block, + "toBlock": to_block, "page": page, "offset": self._page_size, - "sort": "asc", "apikey": self._api_key, } - # Add topic filters - topic0 = topics[0] if len(topics) > 0 else None - topic1 = topics[1] if len(topics) > 1 else None - topic2 = topics[2] if len(topics) > 2 else None + # Add topics - convert bytes to hex string for Etherscan API + for i, topic in enumerate(topics[:3]): + if topic is not None: + params[f"topic{i}"] = topic.hex() if isinstance(topic, bytes) else topic + if i > 0: + params[f"topic0_{i}_opr"] = "and" - if topic0: - params["topic0"] = topic0 - if topic1: - params["topic1"] = topic1 - params["topic0_1_opr"] = "and" - if topic2: - params["topic2"] = topic2 - params["topic0_2_opr"] = "and" - - response = self._session.get( - ETHERSCAN_API_V2_URL, - params=params, - timeout=self._request_timeout, + resp = self._session.get( + ETHERSCAN_API_V2_URL, params=params, timeout=self._timeout ) - response.raise_for_status() - - payload = response.json() - if not isinstance(payload, dict): - raise ValueError("Unexpected Etherscan payload format") - - result = payload.get("result", "") - if isinstance(result, str) and "rate limit" in result.lower(): - raise EtherscanRateLimitError(result) - - return payload # type: ignore[return-value] - - def _resolve_topics( - self, - event: ContractEvent, - argument_filters: dict[str, str], - ) -> list[str | None]: - """Extract Ethereum log topics from a contract event with filters.""" - try: - params = event._get_event_filter_params(event.abi, argument_filters) - except Exception: - return [] - - topics = params.get("topics") or [] - resolved: list[str | None] = [] - - for index in range(len(topics)): - resolved.append(self._extract_topic(topics, index)) - - return resolved - - @staticmethod - def _extract_topic(topics: Sequence[Any], index: int) -> str | None: - """Safely extract and convert a single topic to hex string.""" - if len(topics) <= index: + resp.raise_for_status() + data = resp.json() + + result = data.get("result", "") + message = data.get("message", "").lower() + + # Handle string results (errors) + if isinstance(result, str): + if "rate limit" in result.lower(): + raise EtherscanRateLimitError(result) + if "no records" in result.lower(): + return [] + logger.debug("Etherscan returned error — result=%s", result) return None - value = topics[index] - if isinstance(value, list): - value = value[0] if value else None - if value is None: - return None - if isinstance(value, (bytes, bytearray)): - if isinstance(value, bytearray): - value = bytes(value) - return Web3.to_hex(value) - if isinstance(value, str): - return value - return Web3.to_hex(value) + # Handle list results + if isinstance(result, list): + # Empty list with "No records found" message is valid (no events) + if not result and "no records" in message: + return [] + # Success - return the results + if data.get("status") == "1": + return result + # Empty list with status=0 but no error message - treat as no records + if not result: + return [] - def _process_log( - self, - event: ContractEvent, - raw_log: EtherscanLogsResult, - ) -> EventData | None: - """Decode a raw Etherscan log using the contract event.""" - try: - formatted = self._format_log(raw_log) - return event.process_log(formatted) - except Exception: - return None + logger.debug( + "Etherscan unexpected response — status=%s message=%s", + data.get("status"), + message, + ) + return None @staticmethod - def _format_log(raw_log: EtherscanLogsResult) -> dict[str, Any]: - """Convert Etherscan API log format to Web3.py EventLog format.""" - - def to_int(value: str | None) -> int: - if not value: - return 0 - return Web3.to_int(hexstr=HexStr(value)) - - data_hex = raw_log.get("data") or "0x" - block_hash_hex = raw_log.get("blockHash") or "0x0" - tx_hash_hex = raw_log.get("transactionHash") or "0x0" - topics_hex = raw_log.get("topics", []) or [] - + def _to_log_entry(raw: dict[str, Any]) -> dict[str, Any]: + """Convert Etherscan log to web3 log entry format.""" return { - "address": raw_log.get("address"), - "blockHash": block_hash_hex, - "blockNumber": to_int(raw_log.get("blockNumber")), - "data": data_hex, - "logIndex": to_int(raw_log.get("logIndex")), - "topics": [ - Web3.to_bytes(hexstr=HexStr(topic)) for topic in topics_hex if topic - ], - "transactionHash": tx_hash_hex, - "transactionIndex": to_int(raw_log.get("transactionIndex")), + "address": raw.get("address"), + "blockHash": bytes.fromhex(raw.get("blockHash", "0" * 66)[2:]), + "blockNumber": int(raw.get("blockNumber", "0x0"), 16), + "data": bytes.fromhex((raw.get("data") or "0x")[2:] or "00"), + "logIndex": int(raw.get("logIndex", "0x0"), 16), + "topics": [bytes.fromhex(t[2:]) for t in raw.get("topics", []) if t], + "transactionHash": bytes.fromhex(raw.get("transactionHash", "0" * 66)[2:]), + "transactionIndex": int(raw.get("transactionIndex", "0x0"), 16), + "removed": False, }