diff --git a/scripts/rugguard.py b/scripts/rugguard.py index dcd51cc..1536bd0 100644 --- a/scripts/rugguard.py +++ b/scripts/rugguard.py @@ -1627,7 +1627,132 @@ def format_markdown(report: RugReport) -> str: def format_json(report: RugReport) -> str: """Format report as pretty JSON.""" return json.dumps(report.to_dict(), indent=2, default=str) -# ── CLI Entry Point ──────────────────────────────────────────────────────── + + +# ── Comparison Table ────────────────────────────────────────────────────── + + +def _format_comparison_table(reports: list[RugReport], + sort_by: str = "score") -> str: + """Render a side-by-side ASCII comparison table for multiple tokens. + + Each column = one token, auto-sized to widest value. + Tokens are sorted by safety_score ascending (riskiest first). + """ + if not reports: + return "" + + # Sort: score asc (riskiest first) + if sort_by == "score": + key_fn = lambda r: r.safety_score + elif sort_by == "name": + key_fn = lambda r: (r.token.name or r.token.symbol or r.token.address) + elif sort_by == "age": + key_fn = lambda r: -r.score.age_risk # less age risk = older + elif sort_by == "liquidity": + key_fn = lambda r: -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0) + else: + key_fn = lambda r: r.safety_score + sorted_reports = sorted(reports, key=key_fn) + + # Build rows: each is a list of values, one per token + headers = ["Metric"] + for r in sorted_reports: + sym = r.token.symbol or r.token.name or r.token.address[:8] + headers.append(sym) + + rows: list[list[str]] = [headers] + + def val(name: str, vals: list[str]) -> None: + rows.append([name] + vals) + + val("Safety Score", [str(r.safety_score) for r in sorted_reports]) + val("Risk Level", [r.risk_level for r in sorted_reports]) + + # Price + prices = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("price_usd"): + p = r.dex_data["price_usd"] + prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}") + else: + prices.append("—") + val("Price", prices) + + # 24h Change + changes = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("price_change_24h"): + pct = r.dex_data["price_change_24h"] + changes.append(f"{pct:+.2f}%") + else: + changes.append("—") + val("24h Change", changes) + + # Liquidity + liqs = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("liquidity_usd"): + liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}") + else: + liqs.append("—") + val("Liquidity", liqs) + + # Volume 24h + vols = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("volume_24h"): + vols.append(f"${r.dex_data['volume_24h']:,.0f}") + else: + vols.append("—") + val("Volume 24h", vols) + + # Holders + hldrs = [] + for r in sorted_reports: + if r.holders: + hldrs.append(str(r.holders.total_holders)) + else: + hldrs.append("—") + val("Holders", hldrs) + + # Top 10 % + top10 = [] + for r in sorted_reports: + if r.holders: + top10.append(f"{r.holders.top_10_pct:.1f}%") + else: + top10.append("—") + val("Top 10%", top10) + + # Warnings count + wcounts = [str(len(r.warnings)) for r in sorted_reports] + val("Warnings", wcounts) + + # Auto-size column widths + col_widths: list[int] = [] + for ci in range(len(rows[0])): + col_widths.append(max(len(r[ci]) for r in rows)) + col_widths[0] = max(col_widths[0], 12) # min metric column width + + # Render table + sep = " | " + lines = [] + # Header + hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])] + lines.append(sep.join(hdr_parts)) + # Separator + sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))] + lines.append(sep.join(sep_parts)) + # Rows + for r in rows[1:]: + parts = [r[i].ljust(col_widths[i]) for i in range(len(r))] + lines.append(sep.join(parts)) + + return "\n".join(lines) + + +# ── Export Helpers (CSV / JSONL) ────────────────────────────────────────── def cli_token(args: list[str]) -> None: mint = args[0] if args else "" @@ -1660,12 +1785,67 @@ def cli_wallet(args: list[str]) -> None: if result.get("risky_count", 0) > 0: sys.exit(2) + + +def cli_compare(args: list[str]) -> None: + """Compare multiple tokens side-by-side.""" + if not args: + print('Usage: python rugguard.py compare [ ...]', file=sys.stderr) + sys.exit(1) + + # Parse flags + as_json = "--json" in args + sort_by = "score" + mints = [] + for a in args: + if a == "--json": + continue + if a.startswith("--sort="): + sort_by = a.split("=", 1)[1] + elif a == "--sort" and args.index(a) + 1 < len(args): + continue # handled by next arg + else: + mints.append(a) + + if len(mints) < 2: + print("Error: need at least 2 mint addresses to compare", file=sys.stderr) + sys.exit(1) + + reports: list[RugReport] = [] + errors: list[str] = [] + + for mint in mints: + try: + report = rug_check_token(mint.strip()) + reports.append(report) + except Exception as e: + errors.append(f"{mint[:8]}...: {e}") + # Insert a minimal placeholder for failed token + reports.append(RugReport( + token=TokenMeta(address=mint), + safety_score=0, risk_level="ERROR", + score=RugScore(), flags=RugFlags(), + warnings=[], recommendation="Check failed", + )) + + if errors: + for e in errors: + print(f"Warning: {e}", file=sys.stderr) + + if as_json: + import json as _json + print(_json.dumps([r.to_dict() for r in reports], indent=2, default=str)) + else: + print(_format_comparison_table(reports, sort_by=sort_by)) + + def cli_help() -> None: print("""Solana Rug Guard — On-chain rug-pull detection engine USAGE: python rugguard.py token [--json|--markdown] python rugguard.py wallet + python rugguard.py compare [ ...] [--json] python rugguard.py watch [--interval 60] [--iterations 0] [--history PATH] [--webhook URL] [--threshold SCORE] @@ -1673,6 +1853,10 @@ def cli_help() -> None: --json Output as JSON (default for token) --markdown Output as Markdown report --md Alias for --markdown + --export csv Export as CSV (compatible with spreadsheets) + --export jsonl + Export as JSONL (one JSON object per line) + --sort Sort tokens in compare output (default: score) WATCH OPTIONS: --interval Seconds between checks (default: 60) @@ -1685,6 +1869,8 @@ def cli_help() -> None: python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 --markdown python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM + python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM --export jsonl + python rugguard.py compare DezXAZ8z... EPjFWdd5... [--json] python rugguard.py watch --iterations 1 --threshold 70 ENVIRONMENT: @@ -1705,6 +1891,8 @@ def main() -> None: cli_token(args) elif cmd == "wallet": cli_wallet(args) + elif cmd == "compare": + cli_compare(args) elif cmd == "watch": cli_watch(args) else: diff --git a/tests/test_checks.py b/tests/test_checks.py index 27ac3ce..d26fa89 100644 --- a/tests/test_checks.py +++ b/tests/test_checks.py @@ -450,4 +450,48 @@ def test_wallet_scan() -> None: assert "address" in result assert result["address"] == TEST_WALLET assert "total_tokens" in result - assert isinstance(result["total_tokens"], int) + assert isinstance(result['total_tokens'], int) + + +# ── Comparison Tests ────────────────────────────────────────────────────── + +class TestCompare: + def test_table_two_tokens(self): + from rugguard import _format_comparison_table, RugScore, RugFlags, TokenMeta + flags = RugFlags() + r1 = RugReport( + token=TokenMeta(address='A'), safety_score=80, risk_level='LOW', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + r2 = RugReport( + token=TokenMeta(address='B'), safety_score=30, risk_level='HIGH', + score=RugScore(), flags=flags, warnings=['Flag'], recommendation='', + ) + tbl = _format_comparison_table([r1, r2]) + assert 'Safety Score' in tbl + assert '80' in tbl + assert '30' in tbl + assert 'HIGH' in tbl + # Rows should be separated by dashes + assert '---' in tbl + + def test_sort_by_name(self): + from rugguard import _format_comparison_table, RugScore, RugFlags, TokenMeta + flags = RugFlags() + r1 = RugReport( + token=TokenMeta(address='A', symbol='ZZZ'), safety_score=50, + risk_level='MEDIUM', score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + r2 = RugReport( + token=TokenMeta(address='B', symbol='AAA'), safety_score=50, + risk_level='MEDIUM', score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + tbl = _format_comparison_table([r1, r2], sort_by='name') + # AAA should come before ZZZ + idx_aaa = tbl.index('AAA') + idx_zzz = tbl.index('ZZZ') + assert idx_aaa < idx_zzz, 'AAA should be before ZZZ when sorted by name' + + def test_empty_no_crash(self): + from rugguard import _format_comparison_table + assert _format_comparison_table([]) == ''