Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 81 additions & 42 deletions rugguard/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import concurrent.futures
import os
import sys
import threading
import time
Expand All @@ -29,7 +30,7 @@
fetch_token_meta,
resolve_deployer,
)
from .rpc import WALLET_SCAN_WORKERS, _rpc_call, _rpc_semaphore
from .rpc import _rpc_call, _rpc_semaphore
from .scoring import (
RugFlags,
RugScore,
Expand All @@ -41,6 +42,18 @@
compute_score_components,
)

# ── Configuración de concurrencia ─────────────────────────────────────────────
# Por defecto 4 workers, máximo 10. Se puede configurar con variable de entorno.
DEFAULT_WORKERS = 4
MAX_WORKERS = 10
WALLET_SCAN_WORKERS = min(
int(os.getenv("WALLET_SCAN_WORKERS", DEFAULT_WORKERS)),
MAX_WORKERS
)

# Semáforo global para limitar llamadas RPC concurrentes
_rpc_semaphore = threading.Semaphore(WALLET_SCAN_WORKERS)

# ── Data Model ────────────────────────────────────────────────────────────────


Expand Down Expand Up @@ -217,6 +230,12 @@ def rug_check_token(mint: str) -> RugReport:


class ProgressBar:
"""Barra de progreso ANSI con soporte para concurrencia.

Muestra:
[████████░░] 45/500 tokens (9%) | 3 risks found | ETA: 2m30s
"""

def __init__(self, total: int, enabled: bool = True):
self.total = total
self.current = 0
Expand All @@ -231,12 +250,15 @@ def update(self, risks_found: int):
return

elapsed = time.time() - self.start_time
eta_seconds = (elapsed / self.current) * (self.total - self.current) if self.current > 0 else 0
eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s"
if self.current > 0:
eta_seconds = (elapsed / self.current) * (self.total - self.current)
else:
eta_seconds = 0
eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s" if eta_seconds > 0 else "calculating"

pct = int((self.current / self.total) * 100) if self.total > 0 else 100
bars = int((self.current / self.total) * 10) if self.total > 0 else 10
bar_str = "█" * bars + "░" * (10 - bars)
bars = int((self.current / self.total) * 20) if self.total > 0 else 20 # 20 caracteres de ancho
bar_str = "█" * bars + "░" * (20 - bars)

msg = f"\r[{bar_str}] {self.current}/{self.total} tokens ({pct}%) | {self.risks} risks found | ETA: {eta_str}"
sys.stdout.write(msg)
Expand All @@ -248,11 +270,41 @@ def finish(self):
sys.stdout.flush()


# ── Wallet Scanning ───────────────────────────────────────────────────────────
# ── Wallet Scanning (Concurrente optimizado) ──────────────────────────────


def scan_token_safe(mint: str, amount: int, decimals: int) -> dict | None:
"""Escanea un token de forma segura con manejo de excepciones y rate limiting."""
with _rpc_semaphore:
try:
token = fetch_token_meta(mint)
if not token:
return None
mint_active, _, a_warnings = check_authorities(token)
quick_safety = 50 if mint_active else 80
quick_level = "HIGH" if mint_active else "MEDIUM"
if quick_safety < 60:
return {
"mint": mint,
"symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}",
"balance_raw": amount,
"decimals": decimals,
"safety_score": quick_safety,
"risk_level": quick_level,
"top_warnings": a_warnings[:3],
}
return None
except Exception as e:
# Si hay error, no romper el escaneo completo
return None


def rug_check_wallet(address: str, disable_progress: bool = False) -> dict:
"""Scan a wallet for risky tokens held."""
"""Escanea una wallet en busca de tokens riesgosos con concurrencia.

Usa ThreadPoolExecutor para escanear tokens en paralelo.
El número de workers se configura con WALLET_SCAN_WORKERS (default 4, max 10).
"""
result = _rpc_call(
"getTokenAccountsByOwner",
[
Expand All @@ -273,6 +325,7 @@ def rug_check_wallet(address: str, disable_progress: bool = False) -> dict:
tokens = result["value"]
risky_tokens = []
scannable_tokens = []

for token_acct in tokens:
acct_data = token_acct.get("account", {}).get("data", {}).get("parsed", {}).get("info", {})
mint = acct_data.get("mint", "")
Expand All @@ -285,41 +338,27 @@ def rug_check_wallet(address: str, disable_progress: bool = False) -> dict:
progress = ProgressBar(len(scannable_tokens), enabled=not disable_progress)
risks_lock = threading.Lock()

def scan_token(item):
mint, amount, decimals = item
token = fetch_token_meta(mint)
if not token:
return None
mint_active, _, a_warnings = check_authorities(token)
quick_safety = 50 if mint_active else 80
quick_level = "HIGH" if mint_active else "MEDIUM"
if quick_safety < 60:
return {
"mint": mint,
"symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}",
"balance_raw": amount,
"decimals": decimals,
"safety_score": quick_safety,
"risk_level": quick_level,
"top_warnings": a_warnings[:3],
}
return None

def scan_token_safe(item):
_rpc_semaphore.acquire()
try:
return scan_token(item)
finally:
_rpc_semaphore.release()

# Usar ThreadPoolExecutor para escaneo concurrente
# El número de workers se toma de WALLET_SCAN_WORKERS
with concurrent.futures.ThreadPoolExecutor(max_workers=WALLET_SCAN_WORKERS) as executor:
futures = {executor.submit(scan_token_safe, item): item for item in scannable_tokens}
for future in concurrent.futures.as_completed(futures):
res = future.result()
with risks_lock:
if res:
risky_tokens.append(res)
progress.update(len(risky_tokens))
# Enviar todas las tareas
future_to_token = {
executor.submit(scan_token_safe, mint, amount, decimals): (mint, amount, decimals)
for mint, amount, decimals in scannable_tokens
}

# Procesar resultados a medida que se completan
for future in concurrent.futures.as_completed(future_to_token):
try:
res = future.result()
with risks_lock:
if res:
risky_tokens.append(res)
progress.update(len(risky_tokens))
except Exception as e:
# Si una tarea falla, continuar con las demás
with risks_lock:
progress.update(len(risky_tokens))

progress.finish()

Expand All @@ -341,4 +380,4 @@ def scan_token_safe(item):
"risky_count": risky_count,
"risky_tokens": risky_tokens,
"summary": summary,
}
}