diff --git a/scripts/rugguard.py b/scripts/rugguard.py index 4e6ff39..cba15a7 100644 --- a/scripts/rugguard.py +++ b/scripts/rugguard.py @@ -1745,6 +1745,129 @@ def format_jsonl(rows: list[dict]) -> str: return "\n".join(lines) + "\n" if lines else "" +# ── Comparison Table ────────────────────────────────────────────────────── + + +def _format_comparison_table(reports: list[RugReport], + sort_by: str = "score") -> str: + """Render a side-by-side ASCII comparison table for multiple tokens. + + Each column = one token, auto-sized to widest value. + Tokens are sorted by safety_score ascending (riskiest first). + """ + if not reports: + return "" + + # Sort: score asc (riskiest first) + def _sort_key_score(r: RugReport) -> int: + return r.safety_score + + def _sort_key_name(r: RugReport) -> str: + return r.token.name or r.token.symbol or r.token.address + + def _sort_key_age(r: RugReport) -> int: + return -r.score.age_risk + + def _sort_key_liquidity(r: RugReport) -> float: + return -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0) + + if sort_by == "score": + key_fn = _sort_key_score + elif sort_by == "name": + key_fn = _sort_key_name + elif sort_by == "age": + key_fn = _sort_key_age + elif sort_by == "liquidity": + key_fn = _sort_key_liquidity + else: + key_fn = _sort_key_score + sorted_reports = sorted(reports, key=key_fn) + + # Build rows: each is a list of values, one per token + headers = ["Metric"] + for r in sorted_reports: + sym = r.token.symbol or r.token.name or r.token.address[:8] + headers.append(sym) + + rows: list[list[str]] = [headers] + + def val(name: str, vals: list[str]) -> None: + rows.append([name] + vals) + + val("Safety Score", [str(r.safety_score) for r in sorted_reports]) + val("Risk Level", [r.risk_level for r in sorted_reports]) + + prices = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("price_usd"): + p = r.dex_data["price_usd"] + prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}") + else: + prices.append("-") + val("Price", prices) + + changes = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("price_change_24h"): + pct = r.dex_data["price_change_24h"] + changes.append(f"{pct:+.2f}%") + else: + changes.append("-") + val("24h Change", changes) + + liqs = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("liquidity_usd"): + liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}") + else: + liqs.append("-") + val("Liquidity", liqs) + + vols = [] + for r in sorted_reports: + if r.dex_data and r.dex_data.get("volume_24h"): + vols.append(f"${r.dex_data['volume_24h']:,.0f}") + else: + vols.append("-") + val("Volume 24h", vols) + + hldrs = [] + for r in sorted_reports: + if r.holders: + hldrs.append(str(r.holders.total_holders)) + else: + hldrs.append("-") + val("Holders", hldrs) + + top10 = [] + for r in sorted_reports: + if r.holders: + top10.append(f"{r.holders.top_10_pct:.1f}%") + else: + top10.append("-") + val("Top 10%", top10) + + wcounts = [str(len(r.warnings)) for r in sorted_reports] + val("Warnings", wcounts) + + col_widths: list[int] = [] + for ci in range(len(rows[0])): + col_widths.append(max(len(r[ci]) for r in rows)) + col_widths[0] = max(col_widths[0], 12) + + sep = " | " + lines = [] + hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])] + lines.append(sep.join(hdr_parts)) + sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))] + lines.append(sep.join(sep_parts)) + for r in rows[1:]: + parts = [r[i].ljust(col_widths[i]) for i in range(len(r))] + lines.append(sep.join(parts)) + + return "\n".join(lines) + + # ── CLI Entry Point ──────────────────────────────────────────────────────── def cli_token(args: list[str]) -> None: @@ -1831,12 +1954,74 @@ def cli_wallet(args: list[str]) -> None: if result.get("risky_count", 0) > 0: sys.exit(2) + + +def cli_compare(args: list[str]) -> None: + """Compare multiple tokens side-by-side.""" + if not args: + print('Usage: python rugguard.py compare [ ...]', file=sys.stderr) + sys.exit(1) + + # Parse flags + as_json = "--json" in args + sort_by = "score" + mints = [] + i = 0 + while i < len(args): + a = args[i] + if a == "--json": + i += 1 + continue + if a == "--sort" and i + 1 < len(args): + sort_by = args[i + 1] + i += 2 + continue + if a.startswith("--sort="): + sort_by = a.split("=", 1)[1] + i += 1 + continue + mints.append(a) + i += 1 + + if len(mints) < 2: + print("Error: need at least 2 mint addresses to compare", file=sys.stderr) + sys.exit(1) + + reports: list[RugReport] = [] + errors: list[str] = [] + + for mint in mints: + try: + report = rug_check_token(mint.strip()) + reports.append(report) + except Exception as e: + errors.append(f"{mint[:8]}...: {e}") + # Insert a minimal placeholder for failed token + reports.append(RugReport( + token=TokenMeta(address=mint), + safety_score=0, risk_level="ERROR", + score=RugScore(), flags=RugFlags(), + warnings=[], recommendation="Check failed", + )) + + if errors: + for e in errors: + print(f"Warning: {e}", file=sys.stderr) + + if as_json: + import json as _json + print(_json.dumps([r.to_dict() for r in reports], indent=2, default=str)) + else: + print(_format_comparison_table(reports, sort_by=sort_by)) + + def cli_help() -> None: print("""Solana Rug Guard — On-chain rug-pull detection engine USAGE: python rugguard.py token [--json|--markdown] python rugguard.py wallet + python rugguard.py compare [ ...] [--json] python rugguard.py watch [--interval 60] [--iterations 0] [--history PATH] [--webhook URL] [--threshold SCORE] @@ -1844,6 +2029,10 @@ def cli_help() -> None: --json Output as JSON (default for token) --markdown Output as Markdown report --md Alias for --markdown + --export csv Export as CSV (compatible with spreadsheets) + --export jsonl + Export as JSONL (one JSON object per line) + --sort Sort tokens in compare output (default: score) WATCH OPTIONS: --interval Seconds between checks (default: 60) @@ -1856,6 +2045,8 @@ def cli_help() -> None: python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 --markdown python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM + python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM --export jsonl + python rugguard.py compare DezXAZ8z... EPjFWdd5... [--json] python rugguard.py watch --iterations 1 --threshold 70 ENVIRONMENT: @@ -1876,6 +2067,8 @@ def main() -> None: cli_token(args) elif cmd == "wallet": cli_wallet(args) + elif cmd == "compare": + cli_compare(args) elif cmd == "watch": cli_watch(args) else: diff --git a/scripts/rugguard.py.rej b/scripts/rugguard.py.rej new file mode 100644 index 0000000..b9bd6f7 --- /dev/null +++ b/scripts/rugguard.py.rej @@ -0,0 +1,135 @@ +diff a/scripts/rugguard.py b/scripts/rugguard.py (rejected hunks) +@@ -1627,7 +1627,132 @@ def format_markdown(report: RugReport) -> str: + def format_json(report: RugReport) -> str: + """Format report as pretty JSON.""" + return json.dumps(report.to_dict(), indent=2, default=str) +-# ── CLI Entry Point ──────────────────────────────────────────────────────── ++ ++ ++# ── Comparison Table ────────────────────────────────────────────────────── ++ ++ ++def _format_comparison_table(reports: list[RugReport], ++ sort_by: str = "score") -> str: ++ """Render a side-by-side ASCII comparison table for multiple tokens. ++ ++ Each column = one token, auto-sized to widest value. ++ Tokens are sorted by safety_score ascending (riskiest first). ++ """ ++ if not reports: ++ return "" ++ ++ # Sort: score asc (riskiest first) ++ if sort_by == "score": ++ key_fn = lambda r: r.safety_score ++ elif sort_by == "name": ++ key_fn = lambda r: (r.token.name or r.token.symbol or r.token.address) ++ elif sort_by == "age": ++ key_fn = lambda r: -r.score.age_risk # less age risk = older ++ elif sort_by == "liquidity": ++ key_fn = lambda r: -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0) ++ else: ++ key_fn = lambda r: r.safety_score ++ sorted_reports = sorted(reports, key=key_fn) ++ ++ # Build rows: each is a list of values, one per token ++ headers = ["Metric"] ++ for r in sorted_reports: ++ sym = r.token.symbol or r.token.name or r.token.address[:8] ++ headers.append(sym) ++ ++ rows: list[list[str]] = [headers] ++ ++ def val(name: str, vals: list[str]) -> None: ++ rows.append([name] + vals) ++ ++ val("Safety Score", [str(r.safety_score) for r in sorted_reports]) ++ val("Risk Level", [r.risk_level for r in sorted_reports]) ++ ++ # Price ++ prices = [] ++ for r in sorted_reports: ++ if r.dex_data and r.dex_data.get("price_usd"): ++ p = r.dex_data["price_usd"] ++ prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}") ++ else: ++ prices.append("—") ++ val("Price", prices) ++ ++ # 24h Change ++ changes = [] ++ for r in sorted_reports: ++ if r.dex_data and r.dex_data.get("price_change_24h"): ++ pct = r.dex_data["price_change_24h"] ++ changes.append(f"{pct:+.2f}%") ++ else: ++ changes.append("—") ++ val("24h Change", changes) ++ ++ # Liquidity ++ liqs = [] ++ for r in sorted_reports: ++ if r.dex_data and r.dex_data.get("liquidity_usd"): ++ liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}") ++ else: ++ liqs.append("—") ++ val("Liquidity", liqs) ++ ++ # Volume 24h ++ vols = [] ++ for r in sorted_reports: ++ if r.dex_data and r.dex_data.get("volume_24h"): ++ vols.append(f"${r.dex_data['volume_24h']:,.0f}") ++ else: ++ vols.append("—") ++ val("Volume 24h", vols) ++ ++ # Holders ++ hldrs = [] ++ for r in sorted_reports: ++ if r.holders: ++ hldrs.append(str(r.holders.total_holders)) ++ else: ++ hldrs.append("—") ++ val("Holders", hldrs) ++ ++ # Top 10 % ++ top10 = [] ++ for r in sorted_reports: ++ if r.holders: ++ top10.append(f"{r.holders.top_10_pct:.1f}%") ++ else: ++ top10.append("—") ++ val("Top 10%", top10) ++ ++ # Warnings count ++ wcounts = [str(len(r.warnings)) for r in sorted_reports] ++ val("Warnings", wcounts) ++ ++ # Auto-size column widths ++ col_widths: list[int] = [] ++ for ci in range(len(rows[0])): ++ col_widths.append(max(len(r[ci]) for r in rows)) ++ col_widths[0] = max(col_widths[0], 12) # min metric column width ++ ++ # Render table ++ sep = " | " ++ lines = [] ++ # Header ++ hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])] ++ lines.append(sep.join(hdr_parts)) ++ # Separator ++ sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))] ++ lines.append(sep.join(sep_parts)) ++ # Rows ++ for r in rows[1:]: ++ parts = [r[i].ljust(col_widths[i]) for i in range(len(r))] ++ lines.append(sep.join(parts)) ++ ++ return "\n".join(lines) ++ ++ ++# ── Export Helpers (CSV / JSONL) ────────────────────────────────────────── + + def cli_token(args: list[str]) -> None: + mint = args[0] if args else "" diff --git a/tests/test_checks.py b/tests/test_checks.py index c59bf68..8ab29a3 100644 --- a/tests/test_checks.py +++ b/tests/test_checks.py @@ -652,3 +652,45 @@ def test_export_implies_json_mode(self) -> None: # CSV should not contain markdown headers assert "# " not in csv_out assert "|" not in csv_out + + +# ── Comparison Tests ────────────────────────────────────────────────────── + +class TestCompare: + def test_table_two_tokens(self): + from rugguard import RugFlags, RugScore, TokenMeta, _format_comparison_table + flags = RugFlags() + r1 = RugReport( + token=TokenMeta(address="A"), safety_score=80, risk_level="LOW", + score=RugScore(), flags=flags, warnings=[], recommendation="", + ) + r2 = RugReport( + token=TokenMeta(address="B"), safety_score=30, risk_level="HIGH", + score=RugScore(), flags=flags, warnings=["Flag"], recommendation="", + ) + tbl = _format_comparison_table([r1, r2]) + assert "Safety Score" in tbl + assert "80" in tbl + assert "30" in tbl + assert "HIGH" in tbl + assert "---" in tbl + + def test_sort_by_name(self): + from rugguard import RugFlags, RugScore, TokenMeta, _format_comparison_table + flags = RugFlags() + r1 = RugReport( + token=TokenMeta(address="A", symbol="ZZZ"), safety_score=50, + risk_level="MEDIUM", score=RugScore(), flags=flags, warnings=[], recommendation="", + ) + r2 = RugReport( + token=TokenMeta(address="B", symbol="AAA"), safety_score=50, + risk_level="MEDIUM", score=RugScore(), flags=flags, warnings=[], recommendation="", + ) + tbl = _format_comparison_table([r1, r2], sort_by="name") + idx_aaa = tbl.index("AAA") + idx_zzz = tbl.index("ZZZ") + assert idx_aaa < idx_zzz, "AAA should be before ZZZ when sorted by name" + + def test_empty_no_crash(self): + from rugguard import _format_comparison_table + assert _format_comparison_table([]) == "" diff --git a/tests/test_checks.py.rej b/tests/test_checks.py.rej new file mode 100644 index 0000000..184619d --- /dev/null +++ b/tests/test_checks.py.rej @@ -0,0 +1,51 @@ +diff a/tests/test_checks.py b/tests/test_checks.py (rejected hunks) +@@ -450,4 +450,48 @@ def test_wallet_scan() -> None: + assert "address" in result + assert result["address"] == TEST_WALLET + assert "total_tokens" in result +- assert isinstance(result["total_tokens"], int) ++ assert isinstance(result['total_tokens'], int) ++ ++ ++# ── Comparison Tests ────────────────────────────────────────────────────── ++ ++class TestCompare: ++ def test_table_two_tokens(self): ++ from rugguard import _format_comparison_table, RugScore, RugFlags, TokenMeta ++ flags = RugFlags() ++ r1 = RugReport( ++ token=TokenMeta(address='A'), safety_score=80, risk_level='LOW', ++ score=RugScore(), flags=flags, warnings=[], recommendation='', ++ ) ++ r2 = RugReport( ++ token=TokenMeta(address='B'), safety_score=30, risk_level='HIGH', ++ score=RugScore(), flags=flags, warnings=['Flag'], recommendation='', ++ ) ++ tbl = _format_comparison_table([r1, r2]) ++ assert 'Safety Score' in tbl ++ assert '80' in tbl ++ assert '30' in tbl ++ assert 'HIGH' in tbl ++ # Rows should be separated by dashes ++ assert '---' in tbl ++ ++ def test_sort_by_name(self): ++ from rugguard import _format_comparison_table, RugScore, RugFlags, TokenMeta ++ flags = RugFlags() ++ r1 = RugReport( ++ token=TokenMeta(address='A', symbol='ZZZ'), safety_score=50, ++ risk_level='MEDIUM', score=RugScore(), flags=flags, warnings=[], recommendation='', ++ ) ++ r2 = RugReport( ++ token=TokenMeta(address='B', symbol='AAA'), safety_score=50, ++ risk_level='MEDIUM', score=RugScore(), flags=flags, warnings=[], recommendation='', ++ ) ++ tbl = _format_comparison_table([r1, r2], sort_by='name') ++ # AAA should come before ZZZ ++ idx_aaa = tbl.index('AAA') ++ idx_zzz = tbl.index('ZZZ') ++ assert idx_aaa < idx_zzz, 'AAA should be before ZZZ when sorted by name' ++ ++ def test_empty_no_crash(self): ++ from rugguard import _format_comparison_table ++ assert _format_comparison_table([]) == ''