diff --git a/README.md b/README.md index 64fe1ad..622f04e 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ Webhook payloads use the same JSON event shape and are sent only when a change/t | `SOLANA_RUG_LIQ_THRESHOLD_LOW` | `100000` | Liquidity below this is scored as low risk (1pt). | | `SOLANA_RUG_LIQ_VOL_RATIO_WARNING` | `15` | Volume/liquidity ratio above this triggers a wash-trading warning (+3pts). | | `SOLANA_RUG_LIQ_VOL_RATIO_MIN` | `0.05` | Volume/liquidity ratio below this flags an inactive/dead pool (+3pts). | +| `WALLET_SCAN_WORKERS` | `4` | Max concurrency limit for wallet token scanning (max 10). | --- diff --git a/scripts/rugguard.py b/scripts/rugguard.py index cba15a7..4117171 100644 --- a/scripts/rugguard.py +++ b/scripts/rugguard.py @@ -13,10 +13,12 @@ from __future__ import annotations +import concurrent.futures import json import os import sqlite3 import sys +import threading import time import urllib.request import uuid @@ -34,6 +36,7 @@ ] RPC_URL = os.environ.get("SOLANA_RPC_URL", PUBLIC_RPCS[0]) +WALLET_SCAN_WORKERS = min(10, max(1, int(os.environ.get("WALLET_SCAN_WORKERS", "4")))) LAMPORTS_PER_SOL = 1_000_000_000 @@ -57,6 +60,20 @@ def _cached(key: str, ttl: int = CACHE_TTL) -> Any | None: return None def _set_cache(key: str, value: Any, ttl: int = CACHE_TTL) -> None: _cache[key] = (time.time(), value) + +# ── Concurrency Controls ─────────────────────────────────────────────────── +_rpc_semaphore = threading.Semaphore(WALLET_SCAN_WORKERS) +_rpc_rate_limit_lock = threading.Lock() +_current_workers = WALLET_SCAN_WORKERS + +def _reduce_concurrency(): + global _current_workers + with _rpc_rate_limit_lock: + if _current_workers > 1: + _current_workers -= 1 + # Permanently consume one semaphore permit to reduce max concurrency + _rpc_semaphore.acquire(blocking=False) + # ── HTTP / RPC Helpers (stdlib-only) ─────────────────────────────────────── def _http_post(url: str, payload: dict, timeout: int = 6) -> dict | None: @@ -74,7 +91,11 @@ def _http_post(url: str, payload: dict, timeout: int = 6) -> dict | None: try: with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) - except (urllib.error.URLError, urllib.error.HTTPError, OSError, json.JSONDecodeError): + except urllib.error.HTTPError as e: + if e.code == 429: + _reduce_concurrency() + return None + except (urllib.error.URLError, OSError, json.JSONDecodeError): return None def _rpc_call( method: str, @@ -97,7 +118,8 @@ def _rpc_call( for attempt in range(retries + 1): resp = _http_post(url, payload) if resp is None: - # Try next RPC (but stay pinned if pin_rpc) + # Maybe a 429 happened, backoff slightly before retrying + time.sleep(2.0) if not pin_rpc: url = _next_rpc() continue @@ -1211,7 +1233,38 @@ def rug_check_token(mint: str) -> RugReport: warnings=list(dict.fromkeys(all_warnings)), dex_data=dex_data, ) -def rug_check_wallet(address: str) -> dict: +class ProgressBar: + def __init__(self, total: int, enabled: bool = True): + self.total = total + self.current = 0 + self.risks = 0 + self.start_time = time.time() + self.enabled = enabled and sys.stdout.isatty() + + def update(self, risks_found: int): + self.current += 1 + self.risks = risks_found + if not self.enabled: + return + + elapsed = time.time() - self.start_time + eta_seconds = (elapsed / self.current) * (self.total - self.current) if self.current > 0 else 0 + eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s" + + pct = int((self.current / self.total) * 100) if self.total > 0 else 100 + bars = int((self.current / self.total) * 10) if self.total > 0 else 10 + bar_str = "█" * bars + "░" * (10 - bars) + + msg = f"\r[{bar_str}] {self.current}/{self.total} tokens ({pct}%) | {self.risks} risks found | ETA: {eta_str}" + sys.stdout.write(msg) + sys.stdout.flush() + + def finish(self): + if self.enabled: + sys.stdout.write("\n") + sys.stdout.flush() + +def rug_check_wallet(address: str, disable_progress: bool = False) -> dict: """Scan a wallet for risky tokens held.""" result = _rpc_call("getTokenAccountsByOwner", [ address, @@ -1229,36 +1282,56 @@ def rug_check_wallet(address: str) -> dict: tokens = result["value"] risky_tokens = [] - + scannable_tokens = [] for token_acct in tokens: acct_data = token_acct.get("account", {}).get("data", {}).get("parsed", {}).get("info", {}) mint = acct_data.get("mint", "") amount = int(acct_data.get("tokenAmount", {}).get("amount", "0")) decimals = acct_data.get("tokenAmount", {}).get("decimals", 0) - if amount == 0: - continue + if amount > 0 and decimals > 0 and amount > 10 ** (decimals - 4): + scannable_tokens.append((mint, amount, decimals)) - # Only scan tokens with meaningful value (more than 0.01 in raw amount) - if decimals > 0 and amount > 10 ** (decimals - 4): - # Quick check: just metadata + authorities (fast, no LP/holders) - token = fetch_token_meta(mint) - if not token: - continue - mint_active, _, a_warnings = check_authorities(token) - # Quick risk: mint not revoked = high risk - quick_safety = 50 if mint_active else 80 - quick_level = "HIGH" if mint_active else "MEDIUM" - if quick_safety < 60: - risky_tokens.append({ - "mint": mint, - "symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}", - "balance_raw": amount, - "decimals": decimals, - "safety_score": quick_safety, - "risk_level": quick_level, - "top_warnings": a_warnings[:3], - }) + progress = ProgressBar(len(scannable_tokens), enabled=not disable_progress) + risks_lock = threading.Lock() + + def scan_token(item): + mint, amount, decimals = item + token = fetch_token_meta(mint) + if not token: + return None + mint_active, _, a_warnings = check_authorities(token) + quick_safety = 50 if mint_active else 80 + quick_level = "HIGH" if mint_active else "MEDIUM" + if quick_safety < 60: + return { + "mint": mint, + "symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}", + "balance_raw": amount, + "decimals": decimals, + "safety_score": quick_safety, + "risk_level": quick_level, + "top_warnings": a_warnings[:3], + } + return None + + def scan_token_safe(item): + _rpc_semaphore.acquire() + try: + return scan_token(item) + finally: + _rpc_semaphore.release() + + with concurrent.futures.ThreadPoolExecutor(max_workers=WALLET_SCAN_WORKERS) as executor: + futures = {executor.submit(scan_token_safe, item): item for item in scannable_tokens} + for future in concurrent.futures.as_completed(futures): + res = future.result() + with risks_lock: + if res: + risky_tokens.append(res) + progress.update(len(risky_tokens)) + + progress.finish() risky_tokens.sort(key=lambda t: t["safety_score"]) total_tokens = len(tokens) @@ -1942,7 +2015,8 @@ def cli_wallet(args: list[str]) -> None: sys.exit(1) break - result = rug_check_wallet(address.strip()) + disable_progress = "--json" in args or "--export" in args + result = rug_check_wallet(address.strip(), disable_progress=disable_progress) if export_fmt == "csv": rows = _wallet_csv_rows(result) print(format_csv(rows)) diff --git a/scripts/rugguard.py.rej b/scripts/rugguard.py.rej index b9bd6f7..04beaf4 100644 --- a/scripts/rugguard.py.rej +++ b/scripts/rugguard.py.rej @@ -1,135 +1,11 @@ diff a/scripts/rugguard.py b/scripts/rugguard.py (rejected hunks) -@@ -1627,7 +1627,132 @@ def format_markdown(report: RugReport) -> str: - def format_json(report: RugReport) -> str: - """Format report as pretty JSON.""" - return json.dumps(report.to_dict(), indent=2, default=str) --# ── CLI Entry Point ──────────────────────────────────────────────────────── -+ -+ -+# ── Comparison Table ────────────────────────────────────────────────────── -+ -+ -+def _format_comparison_table(reports: list[RugReport], -+ sort_by: str = "score") -> str: -+ """Render a side-by-side ASCII comparison table for multiple tokens. -+ -+ Each column = one token, auto-sized to widest value. -+ Tokens are sorted by safety_score ascending (riskiest first). -+ """ -+ if not reports: -+ return "" -+ -+ # Sort: score asc (riskiest first) -+ if sort_by == "score": -+ key_fn = lambda r: r.safety_score -+ elif sort_by == "name": -+ key_fn = lambda r: (r.token.name or r.token.symbol or r.token.address) -+ elif sort_by == "age": -+ key_fn = lambda r: -r.score.age_risk # less age risk = older -+ elif sort_by == "liquidity": -+ key_fn = lambda r: -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0) -+ else: -+ key_fn = lambda r: r.safety_score -+ sorted_reports = sorted(reports, key=key_fn) -+ -+ # Build rows: each is a list of values, one per token -+ headers = ["Metric"] -+ for r in sorted_reports: -+ sym = r.token.symbol or r.token.name or r.token.address[:8] -+ headers.append(sym) -+ -+ rows: list[list[str]] = [headers] -+ -+ def val(name: str, vals: list[str]) -> None: -+ rows.append([name] + vals) -+ -+ val("Safety Score", [str(r.safety_score) for r in sorted_reports]) -+ val("Risk Level", [r.risk_level for r in sorted_reports]) -+ -+ # Price -+ prices = [] -+ for r in sorted_reports: -+ if r.dex_data and r.dex_data.get("price_usd"): -+ p = r.dex_data["price_usd"] -+ prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}") -+ else: -+ prices.append("—") -+ val("Price", prices) -+ -+ # 24h Change -+ changes = [] -+ for r in sorted_reports: -+ if r.dex_data and r.dex_data.get("price_change_24h"): -+ pct = r.dex_data["price_change_24h"] -+ changes.append(f"{pct:+.2f}%") -+ else: -+ changes.append("—") -+ val("24h Change", changes) -+ -+ # Liquidity -+ liqs = [] -+ for r in sorted_reports: -+ if r.dex_data and r.dex_data.get("liquidity_usd"): -+ liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}") -+ else: -+ liqs.append("—") -+ val("Liquidity", liqs) -+ -+ # Volume 24h -+ vols = [] -+ for r in sorted_reports: -+ if r.dex_data and r.dex_data.get("volume_24h"): -+ vols.append(f"${r.dex_data['volume_24h']:,.0f}") -+ else: -+ vols.append("—") -+ val("Volume 24h", vols) -+ -+ # Holders -+ hldrs = [] -+ for r in sorted_reports: -+ if r.holders: -+ hldrs.append(str(r.holders.total_holders)) -+ else: -+ hldrs.append("—") -+ val("Holders", hldrs) -+ -+ # Top 10 % -+ top10 = [] -+ for r in sorted_reports: -+ if r.holders: -+ top10.append(f"{r.holders.top_10_pct:.1f}%") -+ else: -+ top10.append("—") -+ val("Top 10%", top10) -+ -+ # Warnings count -+ wcounts = [str(len(r.warnings)) for r in sorted_reports] -+ val("Warnings", wcounts) -+ -+ # Auto-size column widths -+ col_widths: list[int] = [] -+ for ci in range(len(rows[0])): -+ col_widths.append(max(len(r[ci]) for r in rows)) -+ col_widths[0] = max(col_widths[0], 12) # min metric column width -+ -+ # Render table -+ sep = " | " -+ lines = [] -+ # Header -+ hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])] -+ lines.append(sep.join(hdr_parts)) -+ # Separator -+ sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))] -+ lines.append(sep.join(sep_parts)) -+ # Rows -+ for r in rows[1:]: -+ parts = [r[i].ljust(col_widths[i]) for i in range(len(r))] -+ lines.append(sep.join(parts)) -+ -+ return "\n".join(lines) -+ -+ -+# ── Export Helpers (CSV / JSONL) ────────────────────────────────────────── +@@ -1698,7 +1771,8 @@ def cli_wallet(args: list[str]) -> None: + print('Usage: python rugguard.py wallet
', file=sys.stderr) + sys.exit(1) - def cli_token(args: list[str]) -> None: - mint = args[0] if args else "" +- result = rug_check_wallet(address.strip()) ++ disable_progress = "--json" in args or "--export" in args ++ result = rug_check_wallet(address.strip(), disable_progress=disable_progress) + print(json.dumps(result, indent=2, default=str)) + + if result.get("risky_count", 0) > 0: