Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ Webhook payloads use the same JSON event shape and are sent only when a change/t
| `SOLANA_RUG_LIQ_THRESHOLD_LOW` | `100000` | Liquidity below this is scored as low risk (1pt). |
| `SOLANA_RUG_LIQ_VOL_RATIO_WARNING` | `15` | Volume/liquidity ratio above this triggers a wash-trading warning (+3pts). |
| `SOLANA_RUG_LIQ_VOL_RATIO_MIN` | `0.05` | Volume/liquidity ratio below this flags an inactive/dead pool (+3pts). |
| `WALLET_SCAN_WORKERS` | `4` | Max concurrency limit for wallet token scanning (max 10). |

---

Expand Down
128 changes: 101 additions & 27 deletions scripts/rugguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

from __future__ import annotations

import concurrent.futures
import json
import os
import sqlite3
import sys
import threading
import time
import urllib.request
import uuid
Expand All @@ -34,6 +36,7 @@
]

RPC_URL = os.environ.get("SOLANA_RPC_URL", PUBLIC_RPCS[0])
WALLET_SCAN_WORKERS = min(10, max(1, int(os.environ.get("WALLET_SCAN_WORKERS", "4"))))

LAMPORTS_PER_SOL = 1_000_000_000

Expand All @@ -57,6 +60,20 @@ def _cached(key: str, ttl: int = CACHE_TTL) -> Any | None:
return None
def _set_cache(key: str, value: Any, ttl: int = CACHE_TTL) -> None:
_cache[key] = (time.time(), value)

# ── Concurrency Controls ───────────────────────────────────────────────────
_rpc_semaphore = threading.Semaphore(WALLET_SCAN_WORKERS)
_rpc_rate_limit_lock = threading.Lock()
_current_workers = WALLET_SCAN_WORKERS

def _reduce_concurrency():
global _current_workers
with _rpc_rate_limit_lock:
if _current_workers > 1:
_current_workers -= 1
# Permanently consume one semaphore permit to reduce max concurrency
_rpc_semaphore.acquire(blocking=False)

# ── HTTP / RPC Helpers (stdlib-only) ───────────────────────────────────────

def _http_post(url: str, payload: dict, timeout: int = 6) -> dict | None:
Expand All @@ -74,7 +91,11 @@ def _http_post(url: str, payload: dict, timeout: int = 6) -> dict | None:
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
except (urllib.error.URLError, urllib.error.HTTPError, OSError, json.JSONDecodeError):
except urllib.error.HTTPError as e:
if e.code == 429:
_reduce_concurrency()
return None
except (urllib.error.URLError, OSError, json.JSONDecodeError):
return None
def _rpc_call(
method: str,
Expand All @@ -97,7 +118,8 @@ def _rpc_call(
for attempt in range(retries + 1):
resp = _http_post(url, payload)
if resp is None:
# Try next RPC (but stay pinned if pin_rpc)
# Maybe a 429 happened, backoff slightly before retrying
time.sleep(2.0)
if not pin_rpc:
url = _next_rpc()
continue
Expand Down Expand Up @@ -1211,7 +1233,38 @@ def rug_check_token(mint: str) -> RugReport:
warnings=list(dict.fromkeys(all_warnings)),
dex_data=dex_data,
)
def rug_check_wallet(address: str) -> dict:
class ProgressBar:
def __init__(self, total: int, enabled: bool = True):
self.total = total
self.current = 0
self.risks = 0
self.start_time = time.time()
self.enabled = enabled and sys.stdout.isatty()

def update(self, risks_found: int):
self.current += 1
self.risks = risks_found
if not self.enabled:
return

elapsed = time.time() - self.start_time
eta_seconds = (elapsed / self.current) * (self.total - self.current) if self.current > 0 else 0
eta_str = f"{int(eta_seconds // 60)}m{int(eta_seconds % 60)}s"

pct = int((self.current / self.total) * 100) if self.total > 0 else 100
bars = int((self.current / self.total) * 10) if self.total > 0 else 10
bar_str = "█" * bars + "░" * (10 - bars)

msg = f"\r[{bar_str}] {self.current}/{self.total} tokens ({pct}%) | {self.risks} risks found | ETA: {eta_str}"
sys.stdout.write(msg)
sys.stdout.flush()

def finish(self):
if self.enabled:
sys.stdout.write("\n")
sys.stdout.flush()

def rug_check_wallet(address: str, disable_progress: bool = False) -> dict:
"""Scan a wallet for risky tokens held."""
result = _rpc_call("getTokenAccountsByOwner", [
address,
Expand All @@ -1229,36 +1282,56 @@ def rug_check_wallet(address: str) -> dict:

tokens = result["value"]
risky_tokens = []

scannable_tokens = []
for token_acct in tokens:
acct_data = token_acct.get("account", {}).get("data", {}).get("parsed", {}).get("info", {})
mint = acct_data.get("mint", "")
amount = int(acct_data.get("tokenAmount", {}).get("amount", "0"))
decimals = acct_data.get("tokenAmount", {}).get("decimals", 0)

if amount == 0:
continue
if amount > 0 and decimals > 0 and amount > 10 ** (decimals - 4):
scannable_tokens.append((mint, amount, decimals))

# Only scan tokens with meaningful value (more than 0.01 in raw amount)
if decimals > 0 and amount > 10 ** (decimals - 4):
# Quick check: just metadata + authorities (fast, no LP/holders)
token = fetch_token_meta(mint)
if not token:
continue
mint_active, _, a_warnings = check_authorities(token)
# Quick risk: mint not revoked = high risk
quick_safety = 50 if mint_active else 80
quick_level = "HIGH" if mint_active else "MEDIUM"
if quick_safety < 60:
risky_tokens.append({
"mint": mint,
"symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}",
"balance_raw": amount,
"decimals": decimals,
"safety_score": quick_safety,
"risk_level": quick_level,
"top_warnings": a_warnings[:3],
})
progress = ProgressBar(len(scannable_tokens), enabled=not disable_progress)
risks_lock = threading.Lock()

def scan_token(item):
mint, amount, decimals = item
token = fetch_token_meta(mint)
if not token:
return None
mint_active, _, a_warnings = check_authorities(token)
quick_safety = 50 if mint_active else 80
quick_level = "HIGH" if mint_active else "MEDIUM"
if quick_safety < 60:
return {
"mint": mint,
"symbol": token.symbol or f"{mint[:4]}...{mint[-4:]}",
"balance_raw": amount,
"decimals": decimals,
"safety_score": quick_safety,
"risk_level": quick_level,
"top_warnings": a_warnings[:3],
}
return None

def scan_token_safe(item):
_rpc_semaphore.acquire()
try:
return scan_token(item)
finally:
_rpc_semaphore.release()

with concurrent.futures.ThreadPoolExecutor(max_workers=WALLET_SCAN_WORKERS) as executor:
futures = {executor.submit(scan_token_safe, item): item for item in scannable_tokens}
for future in concurrent.futures.as_completed(futures):
res = future.result()
with risks_lock:
if res:
risky_tokens.append(res)
progress.update(len(risky_tokens))

progress.finish()

risky_tokens.sort(key=lambda t: t["safety_score"])
total_tokens = len(tokens)
Expand Down Expand Up @@ -1698,7 +1771,8 @@ def cli_wallet(args: list[str]) -> None:
print('Usage: python rugguard.py wallet <ADDRESS>', file=sys.stderr)
sys.exit(1)

result = rug_check_wallet(address.strip())
disable_progress = "--json" in args or "--export" in args
result = rug_check_wallet(address.strip(), disable_progress=disable_progress)
print(json.dumps(result, indent=2, default=str))

if result.get("risky_count", 0) > 0:
Expand Down