From d9d5206f655a71c33a04f95163906f8e285d9bfa Mon Sep 17 00:00:00 2001 From: GraphTheory Date: Thu, 4 Jun 2026 12:31:55 -0400 Subject: [PATCH 1/2] feat: add CSV and JSONL export with --export flag (Fixes #24) Adds --export csv/jsonl to token and wallet commands. Both format flags and --export=value shorthand supported. 32 tests pass, ruff clean. Co-authored-by: copernicusjones Co-authored-by: lb1192176991-lab --- scripts/rugguard.py | 136 +++++++++++++++++++++++++++++++- tests/test_checks.py | 182 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 311 insertions(+), 7 deletions(-) diff --git a/scripts/rugguard.py b/scripts/rugguard.py index 5593f3f..4e6ff39 100644 --- a/scripts/rugguard.py +++ b/scripts/rugguard.py @@ -1670,22 +1670,126 @@ def format_markdown(report: RugReport) -> str: def format_json(report: RugReport) -> str: """Format report as pretty JSON.""" return json.dumps(report.to_dict(), indent=2, default=str) + + +def _report_csv_rows(report: RugReport) -> list[dict]: + """Build a list of flat dicts (one per token) for CSV/JSONL export from a token report.""" + d = report.to_dict() + flat: dict[str, Any] = {} + flat["token_address"] = report.token.address + flat["token_symbol"] = report.token.symbol + flat["token_name"] = report.token.name + flat["token_decimals"] = report.token.decimals + flat["safety_score"] = report.safety_score + flat["risk_level"] = report.risk_level + flat["recommendation"] = report.recommendation + flat["warnings"] = "; ".join(report.warnings) + # Score breakdown + score = d.get("score", {}) + for k, v in score.items(): + flat[f"score_{k}"] = v + # Flags + flags = d.get("flags", {}) + for k, v in flags.items(): + flat[f"flag_{k}"] = int(bool(v)) + # Market data + market = d.get("market_data", {}) + for k, v in market.items(): + flat[f"market_{k}"] = v if v is not None else "" + return [flat] + + +def _wallet_csv_rows(wallet_result: dict) -> list[dict]: + """Build a list of flat dicts (one per risky token) for CSV/JSONL export from a wallet scan.""" + rows: list[dict] = [] + base: dict[str, Any] = { + "wallet_address": wallet_result.get("address", ""), + "total_tokens": wallet_result.get("total_tokens", 0), + "risky_count": wallet_result.get("risky_count", 0), + "summary": wallet_result.get("summary", ""), + } + risky = wallet_result.get("risky_tokens", []) + if not risky: + rows.append(base) + else: + for t in risky: + row = dict(base) + row["token_mint"] = t.get("mint", "") + row["token_symbol"] = t.get("symbol", "") + row["balance_raw"] = t.get("balance_raw", 0) + row["token_decimals"] = t.get("decimals", 0) + row["safety_score"] = t.get("safety_score", 0) + row["risk_level"] = t.get("risk_level", "") + row["top_warnings"] = "; ".join(t.get("top_warnings", [])) + rows.append(row) + return rows + + +def format_csv(rows: list[dict]) -> str: + """Format a list of flat dicts as CSV string.""" + import csv as _csv + import io + if not rows: + return "" + buf = io.StringIO() + writer = _csv.DictWriter(buf, fieldnames=list(rows[0].keys()), quoting=_csv.QUOTE_MINIMAL) + writer.writeheader() + for row in rows: + writer.writerow({k: v if v is not None else "" for k, v in row.items()}) + return buf.getvalue() + + +def format_jsonl(rows: list[dict]) -> str: + """Format a list of flat dicts as JSONL (one JSON object per line).""" + lines = [json.dumps(row, default=str) for row in rows] + return "\n".join(lines) + "\n" if lines else "" + + # ── CLI Entry Point ──────────────────────────────────────────────────────── def cli_token(args: list[str]) -> None: mint = args[0] if args else "" if not mint: - print('Usage: python rugguard.py token ', file=sys.stderr) + print('Usage: python rugguard.py token [--json|--markdown|--export csv|jsonl]', file=sys.stderr) + sys.stderr.write('\n') sys.exit(1) mode = "json" + export_fmt = None if "--json" in args: mode = "json" if "--markdown" in args or "--md" in args: mode = "markdown" + # Handle --export csv, --export=jsonl, etc. + for a in args: + if a == "--export": + idx = args.index("--export") + if idx + 1 < len(args): + export_fmt = args[idx + 1].lower() + else: + print("--export requires a value: csv or jsonl", file=sys.stderr) + sys.exit(1) + mode = "export" + break + elif a.startswith("--export="): + export_fmt = a.split("=", 1)[1].lower() + if export_fmt not in ("csv", "jsonl"): + print(f"Unknown --export format: {export_fmt} (use csv or jsonl)", file=sys.stderr) + sys.exit(1) + mode = "export" + break report = rug_check_token(mint.strip()) - if mode == "markdown": + if mode == "export": + rows = _report_csv_rows(report) + if export_fmt == "csv": + print(format_csv(rows)) + elif export_fmt == "jsonl": + print(format_jsonl(rows)) + else: + print(f"Unknown --export format: {export_fmt} (use csv or jsonl)", file=sys.stderr) + sys.exit(1) + elif mode == "markdown": print(format_markdown(report)) else: print(format_json(report)) @@ -1695,11 +1799,35 @@ def cli_token(args: list[str]) -> None: def cli_wallet(args: list[str]) -> None: address = args[0] if args else "" if not address: - print('Usage: python rugguard.py wallet
', file=sys.stderr) + print('Usage: python rugguard.py wallet
[--export csv|jsonl]', file=sys.stderr) sys.exit(1) + export_fmt = None + for a in args: + if a == "--export": + idx = args.index("--export") + if idx + 1 < len(args): + export_fmt = args[idx + 1].lower() + else: + print("--export requires a value: csv or jsonl", file=sys.stderr) + sys.exit(1) + break + elif a.startswith("--export="): + export_fmt = a.split("=", 1)[1].lower() + if export_fmt not in ("csv", "jsonl"): + print(f"Unknown --export format: {export_fmt} (use csv or jsonl)", file=sys.stderr) + sys.exit(1) + break + result = rug_check_wallet(address.strip()) - print(json.dumps(result, indent=2, default=str)) + if export_fmt == "csv": + rows = _wallet_csv_rows(result) + print(format_csv(rows)) + elif export_fmt == "jsonl": + rows = _wallet_csv_rows(result) + print(format_jsonl(rows)) + else: + print(json.dumps(result, indent=2, default=str)) if result.get("risky_count", 0) > 0: sys.exit(2) diff --git a/tests/test_checks.py b/tests/test_checks.py index 428321f..c59bf68 100644 --- a/tests/test_checks.py +++ b/tests/test_checks.py @@ -25,7 +25,9 @@ RugReport, RugScore, TokenMeta, + _report_csv_rows, _sparkline_from_change, + _wallet_csv_rows, check_authorities, compute_safety_score, compute_score_components, @@ -34,7 +36,9 @@ estimate_token_age, fetch_token_holders, fetch_token_meta, + format_csv, format_json, + format_jsonl, format_markdown, load_last_history, prune_history, @@ -451,7 +455,7 @@ def test_wallet_scan() -> None: assert "address" in result assert result["address"] == TEST_WALLET assert "total_tokens" in result - assert isinstance(result['total_tokens'], int) + assert isinstance(result["total_tokens"], int) # Sparkline Tests @@ -474,5 +478,177 @@ def test_flat(self): def test_small_change_no_color(self): result = _sparkline_from_change(0.5) assert result is not None - assert '\U0001f7e2' not in result - assert '\U0001f534' not in result + assert "\U0001f7e2" not in result + assert "\U0001f534" not in result + + +# ── Export Tests ─────────────────────────────────────────────────────────── + +class TestExport: + def _make_report(self) -> RugReport: + return RugReport( + token=TokenMeta( + address=BONK_MINT, + symbol="BONK", + name="Bonk", + decimals=5, + supply=100000000000000, + ), + safety_score=85, + risk_level="LOW", + score=RugScore( + mint_authority_risk=0, + freeze_authority_risk=0, + liquidity_risk=5, + holder_concentration_risk=3, + mint_history_risk=0, + honeypot_risk=0, + dev_risk=0, + age_risk=0, + low_liquidity_risk=1, + sniper_risk=0, + name_risk=0, + sub_penny_risk=0, + deployer_dump_risk=0, + overall_score=9, + ), + warnings=["Low volume/liquidity ratio", "Thin liquidity warning"], + recommendation="Token appears safe — standard risks only.", + dex_data={ + "dex": "raydium", + "liquidity_usd": 682000, + "volume_24h": 150000, + "price_usd": 0.00001234, + "price_change_24h": -5.2, + }, + ) + + def test_report_csv_rows(self) -> None: + report = self._make_report() + rows = _report_csv_rows(report) + assert len(rows) == 1 + row = rows[0] + assert row["token_address"] == BONK_MINT + assert row["token_symbol"] == "BONK" + assert row["safety_score"] == 85 + assert row["risk_level"] == "LOW" + assert row["market_liquidity_usd"] == 682000 + + def test_format_csv_basic(self) -> None: + report = self._make_report() + rows = _report_csv_rows(report) + csv_out = format_csv(rows) + assert "token_address" in csv_out + assert "token_symbol" in csv_out + assert "BONK" in csv_out + assert "safety_score" in csv_out + assert "85" in csv_out + + def test_format_csv_escaping(self) -> None: + report = self._make_report() + report.warnings = ["Has, comma inside", 'Has "quotes" inside'] + rows = _report_csv_rows(report) + csv_out = format_csv(rows) + # CSV module handles quoting — roundtrip should preserve data + import csv as _csv + import io + reader = _csv.DictReader(io.StringIO(csv_out)) + row = next(reader) + assert "Has, comma inside" in row["warnings"] + assert 'Has "quotes" inside' in row["warnings"] + + def test_format_jsonl_basic(self) -> None: + report = self._make_report() + rows = _report_csv_rows(report) + jsonl_out = format_jsonl(rows) + lines = jsonl_out.strip().split("\n") + assert len(lines) == 1 + parsed = json.loads(lines[0]) + assert parsed["token_symbol"] == "BONK" + assert parsed["safety_score"] == 85 + + def test_format_jsonl_wallet_scan(self) -> None: + wallet_result = { + "address": TEST_WALLET, + "total_tokens": 5, + "risky_count": 2, + "risky_tokens": [ + { + "mint": BONK_MINT, + "symbol": "BONK", + "balance_raw": 1000000, + "decimals": 5, + "safety_score": 45, + "risk_level": "MEDIUM", + "top_warnings": ["Mint authority active"], + }, + { + "mint": USDC_MINT, + "symbol": "USDC", + "balance_raw": 5000000, + "decimals": 6, + "safety_score": 30, + "risk_level": "HIGH", + "top_warnings": ["Thin liquidity"], + }, + ], + "summary": "Found 2 risky tokens.", + } + rows = _wallet_csv_rows(wallet_result) + assert len(rows) == 2 + assert rows[0]["token_mint"] == BONK_MINT + assert rows[0]["token_symbol"] == "BONK" + assert rows[1]["token_mint"] == USDC_MINT + + def test_wallet_csv_format(self) -> None: + wallet_result = { + "address": TEST_WALLET, + "total_tokens": 3, + "risky_count": 1, + "risky_tokens": [ + { + "mint": BONK_MINT, + "symbol": "BONK", + "balance_raw": 1000000, + "decimals": 5, + "safety_score": 45, + "risk_level": "MEDIUM", + "top_warnings": ["Test warning"], + }, + ], + "summary": "Found 1 risky token.", + } + rows = _wallet_csv_rows(wallet_result) + csv_out = format_csv(rows) + assert "wallet_address" in csv_out + assert "token_mint" in csv_out + assert "BONK" in csv_out + + def test_wallet_jsonl_format(self) -> None: + wallet_result = { + "address": TEST_WALLET, + "total_tokens": 3, + "risky_count": 0, + "risky_tokens": [], + "summary": "No risky tokens.", + } + rows = _wallet_csv_rows(wallet_result) + jsonl_out = format_jsonl(rows) + parsed = json.loads(jsonl_out.strip()) + assert parsed["wallet_address"] == TEST_WALLET + assert parsed["risky_count"] == 0 + + def test_format_csv_empty(self) -> None: + assert format_csv([]) == "" + + def test_format_jsonl_empty(self) -> None: + assert format_jsonl([]) == "" + + def test_export_implies_json_mode(self) -> None: + """--export should not mix with --markdown — export takes precedence.""" + report = self._make_report() + rows = _report_csv_rows(report) + csv_out = format_csv(rows) + # CSV should not contain markdown headers + assert "# " not in csv_out + assert "|" not in csv_out From 6b3d9676cfb93f15134b5748f204f528e111540e Mon Sep 17 00:00:00 2001 From: GraphTheory Date: Thu, 4 Jun 2026 12:36:51 -0400 Subject: [PATCH 2/2] credit co-authors for CSV/JSONL export feature Co-authored-by: copernicusjones Co-authored-by: lb1192176991-lab