diff --git a/rugguard/analysis.py b/rugguard/analysis.py index f073b21..eb44b69 100644 --- a/rugguard/analysis.py +++ b/rugguard/analysis.py @@ -9,6 +9,7 @@ from __future__ import annotations import concurrent.futures +import os import sys import threading import time @@ -29,7 +30,7 @@ fetch_token_meta, resolve_deployer, ) -from .rpc import WALLET_SCAN_WORKERS, _rpc_call, _rpc_semaphore +from .rpc import _rpc_call, _rpc_semaphore from .scoring import ( RugFlags, RugScore, @@ -41,6 +42,18 @@ compute_score_components, ) +# ── Configuración de concurrencia ───────────────────────────────────────────── +# Por defecto 4 workers, máximo 10. Se puede configurar con variable de entorno. +DEFAULT_WORKERS = 4 +MAX_WORKERS = 10 +WALLET_SCAN_WORKERS = min( + int(os.getenv("WALLET_SCAN_WORKERS", DEFAULT_WORKERS)), + MAX_WORKERS +) + +# Semáforo global para limitar llamadas RPC concurrentes +_rpc_semaphore = threading.Semaphore(WALLET_SCAN_WORKERS) + # ── Data Model ──────────────────────────────────────────────────────────────── @@ -217,6 +230,12 @@ def rug_check_token(mint: str) -> RugReport: class ProgressBar: + """Barra de progreso ANSI con soporte para concurrencia. + + Muestra: + [████████░░] 45/500 tokens (9%) | 3 risks found | ETA: 2m30s + """ + def __init__(self, total: int, enabled: bool = True): self.total = total self.current = 0 @@ -231,12 +250,15 @@ def update(self, risks_found: int): return elapsed = time.time() - self.start_time - eta_seconds = (elapsed / self.current) * (self.total - self.current) if self.current > 0 else 0 - eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s" + if self.current > 0: + eta_seconds = (elapsed / self.current) * (self.total - self.current) + else: + eta_seconds = 0 + eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s" if eta_seconds > 0 else "calculating" pct = int((self.current / self.total) * 100) if self.total > 0 else 100 - bars = int((self.current / self.total) * 10) if self.total > 0 else 10 - bar_str = "█" * bars + "░" * (10 - bars) + bars = int((self.current / self.total) * 20) if self.total > 0 else 20 # 20 caracteres de ancho + bar_str = "█" * bars + "░" * (20 - bars) msg = f"\r[{bar_str}] {self.current}/{self.total} tokens ({pct}%) | {self.risks} risks found | ETA: {eta_str}" sys.stdout.write(msg) @@ -248,11 +270,41 @@ def finish(self): sys.stdout.flush() -# ── Wallet Scanning ─────────────────────────────────────────────────────────── +# ── Wallet Scanning (Concurrente optimizado) ────────────────────────────── + + +def scan_token_safe(mint: str, amount: int, decimals: int) -> dict | None: + """Escanea un token de forma segura con manejo de excepciones y rate limiting.""" + with _rpc_semaphore: + try: + token = fetch_token_meta(mint) + if not token: + return None + mint_active, _, a_warnings = check_authorities(token) + quick_safety = 50 if mint_active else 80 + quick_level = "HIGH" if mint_active else "MEDIUM" + if quick_safety < 60: + return { + "mint": mint, + "symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}", + "balance_raw": amount, + "decimals": decimals, + "safety_score": quick_safety, + "risk_level": quick_level, + "top_warnings": a_warnings[:3], + } + return None + except Exception as e: + # Si hay error, no romper el escaneo completo + return None def rug_check_wallet(address: str, disable_progress: bool = False) -> dict: - """Scan a wallet for risky tokens held.""" + """Escanea una wallet en busca de tokens riesgosos con concurrencia. + + Usa ThreadPoolExecutor para escanear tokens en paralelo. + El número de workers se configura con WALLET_SCAN_WORKERS (default 4, max 10). + """ result = _rpc_call( "getTokenAccountsByOwner", [ @@ -273,6 +325,7 @@ def rug_check_wallet(address: str, disable_progress: bool = False) -> dict: tokens = result["value"] risky_tokens = [] scannable_tokens = [] + for token_acct in tokens: acct_data = token_acct.get("account", {}).get("data", {}).get("parsed", {}).get("info", {}) mint = acct_data.get("mint", "") @@ -285,41 +338,27 @@ def rug_check_wallet(address: str, disable_progress: bool = False) -> dict: progress = ProgressBar(len(scannable_tokens), enabled=not disable_progress) risks_lock = threading.Lock() - def scan_token(item): - mint, amount, decimals = item - token = fetch_token_meta(mint) - if not token: - return None - mint_active, _, a_warnings = check_authorities(token) - quick_safety = 50 if mint_active else 80 - quick_level = "HIGH" if mint_active else "MEDIUM" - if quick_safety < 60: - return { - "mint": mint, - "symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}", - "balance_raw": amount, - "decimals": decimals, - "safety_score": quick_safety, - "risk_level": quick_level, - "top_warnings": a_warnings[:3], - } - return None - - def scan_token_safe(item): - _rpc_semaphore.acquire() - try: - return scan_token(item) - finally: - _rpc_semaphore.release() - + # Usar ThreadPoolExecutor para escaneo concurrente + # El número de workers se toma de WALLET_SCAN_WORKERS with concurrent.futures.ThreadPoolExecutor(max_workers=WALLET_SCAN_WORKERS) as executor: - futures = {executor.submit(scan_token_safe, item): item for item in scannable_tokens} - for future in concurrent.futures.as_completed(futures): - res = future.result() - with risks_lock: - if res: - risky_tokens.append(res) - progress.update(len(risky_tokens)) + # Enviar todas las tareas + future_to_token = { + executor.submit(scan_token_safe, mint, amount, decimals): (mint, amount, decimals) + for mint, amount, decimals in scannable_tokens + } + + # Procesar resultados a medida que se completan + for future in concurrent.futures.as_completed(future_to_token): + try: + res = future.result() + with risks_lock: + if res: + risky_tokens.append(res) + progress.update(len(risky_tokens)) + except Exception as e: + # Si una tarea falla, continuar con las demás + with risks_lock: + progress.update(len(risky_tokens)) progress.finish() @@ -341,4 +380,4 @@ def scan_token_safe(item): "risky_count": risky_count, "risky_tokens": risky_tokens, "summary": summary, - } + } \ No newline at end of file