From c031b30993f91af005b89a0d1b91b5b90ca95af9 Mon Sep 17 00:00:00 2001 From: KHHH2312 Date: Thu, 4 Jun 2026 17:23:35 +0100 Subject: [PATCH] feat: Concurrent wallet scanning with progress bar (#29) - Replaces sequential token scanning in `rug_check_wallet` with ThreadPoolExecutor. - Adds custom ANSI progress bar with ETA. - Includes 429 rate limit backoff using global semaphore. - Documents WALLET_SCAN_WORKERS env var in README.md. --- README.md | 1 + scripts/rugguard.py | 128 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 102 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 64fe1ad..622f04e 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ Webhook payloads use the same JSON event shape and are sent only when a change/t | `SOLANA_RUG_LIQ_THRESHOLD_LOW` | `100000` | Liquidity below this is scored as low risk (1pt). | | `SOLANA_RUG_LIQ_VOL_RATIO_WARNING` | `15` | Volume/liquidity ratio above this triggers a wash-trading warning (+3pts). | | `SOLANA_RUG_LIQ_VOL_RATIO_MIN` | `0.05` | Volume/liquidity ratio below this flags an inactive/dead pool (+3pts). | +| `WALLET_SCAN_WORKERS` | `4` | Max concurrency limit for wallet token scanning (max 10). | --- diff --git a/scripts/rugguard.py b/scripts/rugguard.py index 5593f3f..e2670b3 100644 --- a/scripts/rugguard.py +++ b/scripts/rugguard.py @@ -13,10 +13,12 @@ from __future__ import annotations +import concurrent.futures import json import os import sqlite3 import sys +import threading import time import urllib.request import uuid @@ -34,6 +36,7 @@ ] RPC_URL = os.environ.get("SOLANA_RPC_URL", PUBLIC_RPCS[0]) +WALLET_SCAN_WORKERS = min(10, max(1, int(os.environ.get("WALLET_SCAN_WORKERS", "4")))) LAMPORTS_PER_SOL = 1_000_000_000 @@ -57,6 +60,20 @@ def _cached(key: str, ttl: int = CACHE_TTL) -> Any | None: return None def _set_cache(key: str, value: Any, ttl: int = CACHE_TTL) -> None: _cache[key] = (time.time(), value) + +# ── Concurrency Controls ─────────────────────────────────────────────────── +_rpc_semaphore = threading.Semaphore(WALLET_SCAN_WORKERS) +_rpc_rate_limit_lock = threading.Lock() +_current_workers = WALLET_SCAN_WORKERS + +def _reduce_concurrency(): + global _current_workers + with _rpc_rate_limit_lock: + if _current_workers > 1: + _current_workers -= 1 + # Permanently consume one semaphore permit to reduce max concurrency + _rpc_semaphore.acquire(blocking=False) + # ── HTTP / RPC Helpers (stdlib-only) ─────────────────────────────────────── def _http_post(url: str, payload: dict, timeout: int = 6) -> dict | None: @@ -74,7 +91,11 @@ def _http_post(url: str, payload: dict, timeout: int = 6) -> dict | None: try: with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) - except (urllib.error.URLError, urllib.error.HTTPError, OSError, json.JSONDecodeError): + except urllib.error.HTTPError as e: + if e.code == 429: + _reduce_concurrency() + return None + except (urllib.error.URLError, OSError, json.JSONDecodeError): return None def _rpc_call( method: str, @@ -97,7 +118,8 @@ def _rpc_call( for attempt in range(retries + 1): resp = _http_post(url, payload) if resp is None: - # Try next RPC (but stay pinned if pin_rpc) + # Maybe a 429 happened, backoff slightly before retrying + time.sleep(2.0) if not pin_rpc: url = _next_rpc() continue @@ -1211,7 +1233,38 @@ def rug_check_token(mint: str) -> RugReport: warnings=list(dict.fromkeys(all_warnings)), dex_data=dex_data, ) -def rug_check_wallet(address: str) -> dict: +class ProgressBar: + def __init__(self, total: int, enabled: bool = True): + self.total = total + self.current = 0 + self.risks = 0 + self.start_time = time.time() + self.enabled = enabled and sys.stdout.isatty() + + def update(self, risks_found: int): + self.current += 1 + self.risks = risks_found + if not self.enabled: + return + + elapsed = time.time() - self.start_time + eta_seconds = (elapsed / self.current) * (self.total - self.current) if self.current > 0 else 0 + eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s" + + pct = int((self.current / self.total) * 100) if self.total > 0 else 100 + bars = int((self.current / self.total) * 10) if self.total > 0 else 10 + bar_str = "█" * bars + "░" * (10 - bars) + + msg = f"\r[{bar_str}] {self.current}/{self.total} tokens ({pct}%) | {self.risks} risks found | ETA: {eta_str}" + sys.stdout.write(msg) + sys.stdout.flush() + + def finish(self): + if self.enabled: + sys.stdout.write("\n") + sys.stdout.flush() + +def rug_check_wallet(address: str, disable_progress: bool = False) -> dict: """Scan a wallet for risky tokens held.""" result = _rpc_call("getTokenAccountsByOwner", [ address, @@ -1229,36 +1282,56 @@ def rug_check_wallet(address: str) -> dict: tokens = result["value"] risky_tokens = [] - + scannable_tokens = [] for token_acct in tokens: acct_data = token_acct.get("account", {}).get("data", {}).get("parsed", {}).get("info", {}) mint = acct_data.get("mint", "") amount = int(acct_data.get("tokenAmount", {}).get("amount", "0")) decimals = acct_data.get("tokenAmount", {}).get("decimals", 0) - if amount == 0: - continue + if amount > 0 and decimals > 0 and amount > 10 ** (decimals - 4): + scannable_tokens.append((mint, amount, decimals)) - # Only scan tokens with meaningful value (more than 0.01 in raw amount) - if decimals > 0 and amount > 10 ** (decimals - 4): - # Quick check: just metadata + authorities (fast, no LP/holders) - token = fetch_token_meta(mint) - if not token: - continue - mint_active, _, a_warnings = check_authorities(token) - # Quick risk: mint not revoked = high risk - quick_safety = 50 if mint_active else 80 - quick_level = "HIGH" if mint_active else "MEDIUM" - if quick_safety < 60: - risky_tokens.append({ - "mint": mint, - "symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}", - "balance_raw": amount, - "decimals": decimals, - "safety_score": quick_safety, - "risk_level": quick_level, - "top_warnings": a_warnings[:3], - }) + progress = ProgressBar(len(scannable_tokens), enabled=not disable_progress) + risks_lock = threading.Lock() + + def scan_token(item): + mint, amount, decimals = item + token = fetch_token_meta(mint) + if not token: + return None + mint_active, _, a_warnings = check_authorities(token) + quick_safety = 50 if mint_active else 80 + quick_level = "HIGH" if mint_active else "MEDIUM" + if quick_safety < 60: + return { + "mint": mint, + "symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}", + "balance_raw": amount, + "decimals": decimals, + "safety_score": quick_safety, + "risk_level": quick_level, + "top_warnings": a_warnings[:3], + } + return None + + def scan_token_safe(item): + _rpc_semaphore.acquire() + try: + return scan_token(item) + finally: + _rpc_semaphore.release() + + with concurrent.futures.ThreadPoolExecutor(max_workers=WALLET_SCAN_WORKERS) as executor: + futures = {executor.submit(scan_token_safe, item): item for item in scannable_tokens} + for future in concurrent.futures.as_completed(futures): + res = future.result() + with risks_lock: + if res: + risky_tokens.append(res) + progress.update(len(risky_tokens)) + + progress.finish() risky_tokens.sort(key=lambda t: t["safety_score"]) total_tokens = len(tokens) @@ -1698,7 +1771,8 @@ def cli_wallet(args: list[str]) -> None: print('Usage: python rugguard.py wallet
', file=sys.stderr) sys.exit(1) - result = rug_check_wallet(address.strip()) + disable_progress = "--json" in args or "--export" in args + result = rug_check_wallet(address.strip(), disable_progress=disable_progress) print(json.dumps(result, indent=2, default=str)) if result.get("risky_count", 0) > 0: