Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 132 additions & 4 deletions scripts/rugguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,22 +1670,126 @@ def format_markdown(report: RugReport) -> str:
def format_json(report: RugReport) -> str:
"""Format report as pretty JSON."""
return json.dumps(report.to_dict(), indent=2, default=str)


def _report_csv_rows(report: RugReport) -> list[dict]:
"""Build a list of flat dicts (one per token) for CSV/JSONL export from a token report."""
d = report.to_dict()
flat: dict[str, Any] = {}
flat["token_address"] = report.token.address
flat["token_symbol"] = report.token.symbol
flat["token_name"] = report.token.name
flat["token_decimals"] = report.token.decimals
flat["safety_score"] = report.safety_score
flat["risk_level"] = report.risk_level
flat["recommendation"] = report.recommendation
flat["warnings"] = "; ".join(report.warnings)
# Score breakdown
score = d.get("score", {})
for k, v in score.items():
flat[f"score_{k}"] = v
# Flags
flags = d.get("flags", {})
for k, v in flags.items():
flat[f"flag_{k}"] = int(bool(v))
# Market data
market = d.get("market_data", {})
for k, v in market.items():
flat[f"market_{k}"] = v if v is not None else ""
return [flat]


def _wallet_csv_rows(wallet_result: dict) -> list[dict]:
"""Build a list of flat dicts (one per risky token) for CSV/JSONL export from a wallet scan."""
rows: list[dict] = []
base: dict[str, Any] = {
"wallet_address": wallet_result.get("address", ""),
"total_tokens": wallet_result.get("total_tokens", 0),
"risky_count": wallet_result.get("risky_count", 0),
"summary": wallet_result.get("summary", ""),
}
risky = wallet_result.get("risky_tokens", [])
if not risky:
rows.append(base)
else:
for t in risky:
row = dict(base)
row["token_mint"] = t.get("mint", "")
row["token_symbol"] = t.get("symbol", "")
row["balance_raw"] = t.get("balance_raw", 0)
row["token_decimals"] = t.get("decimals", 0)
row["safety_score"] = t.get("safety_score", 0)
row["risk_level"] = t.get("risk_level", "")
row["top_warnings"] = "; ".join(t.get("top_warnings", []))
rows.append(row)
return rows


def format_csv(rows: list[dict]) -> str:
"""Format a list of flat dicts as CSV string."""
import csv as _csv
import io
if not rows:
return ""
buf = io.StringIO()
writer = _csv.DictWriter(buf, fieldnames=list(rows[0].keys()), quoting=_csv.QUOTE_MINIMAL)
writer.writeheader()
for row in rows:
writer.writerow({k: v if v is not None else "" for k, v in row.items()})
return buf.getvalue()


def format_jsonl(rows: list[dict]) -> str:
"""Format a list of flat dicts as JSONL (one JSON object per line)."""
lines = [json.dumps(row, default=str) for row in rows]
return "\n".join(lines) + "\n" if lines else ""


# ── CLI Entry Point ────────────────────────────────────────────────────────

def cli_token(args: list[str]) -> None:
mint = args[0] if args else ""
if not mint:
print('Usage: python rugguard.py token <MINT_ADDRESS>', file=sys.stderr)
print('Usage: python rugguard.py token <MINT_ADDRESS> [--json|--markdown|--export csv|jsonl]', file=sys.stderr)
sys.stderr.write('\n')
sys.exit(1)

mode = "json"
export_fmt = None
if "--json" in args:
mode = "json"
if "--markdown" in args or "--md" in args:
mode = "markdown"
# Handle --export csv, --export=jsonl, etc.
for a in args:
if a == "--export":
idx = args.index("--export")
if idx + 1 < len(args):
export_fmt = args[idx + 1].lower()
else:
print("--export requires a value: csv or jsonl", file=sys.stderr)
sys.exit(1)
mode = "export"
break
elif a.startswith("--export="):
export_fmt = a.split("=", 1)[1].lower()
if export_fmt not in ("csv", "jsonl"):
print(f"Unknown --export format: {export_fmt} (use csv or jsonl)", file=sys.stderr)
sys.exit(1)
mode = "export"
break

report = rug_check_token(mint.strip())
if mode == "markdown":
if mode == "export":
rows = _report_csv_rows(report)
if export_fmt == "csv":
print(format_csv(rows))
elif export_fmt == "jsonl":
print(format_jsonl(rows))
else:
print(f"Unknown --export format: {export_fmt} (use csv or jsonl)", file=sys.stderr)
sys.exit(1)
elif mode == "markdown":
print(format_markdown(report))
else:
print(format_json(report))
Expand All @@ -1695,11 +1799,35 @@ def cli_token(args: list[str]) -> None:
def cli_wallet(args: list[str]) -> None:
address = args[0] if args else ""
if not address:
print('Usage: python rugguard.py wallet <ADDRESS>', file=sys.stderr)
print('Usage: python rugguard.py wallet <ADDRESS> [--export csv|jsonl]', file=sys.stderr)
sys.exit(1)

export_fmt = None
for a in args:
if a == "--export":
idx = args.index("--export")
if idx + 1 < len(args):
export_fmt = args[idx + 1].lower()
else:
print("--export requires a value: csv or jsonl", file=sys.stderr)
sys.exit(1)
break
elif a.startswith("--export="):
export_fmt = a.split("=", 1)[1].lower()
if export_fmt not in ("csv", "jsonl"):
print(f"Unknown --export format: {export_fmt} (use csv or jsonl)", file=sys.stderr)
sys.exit(1)
break

result = rug_check_wallet(address.strip())
print(json.dumps(result, indent=2, default=str))
if export_fmt == "csv":
rows = _wallet_csv_rows(result)
print(format_csv(rows))
elif export_fmt == "jsonl":
rows = _wallet_csv_rows(result)
print(format_jsonl(rows))
else:
print(json.dumps(result, indent=2, default=str))

if result.get("risky_count", 0) > 0:
sys.exit(2)
Expand Down
182 changes: 179 additions & 3 deletions tests/test_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
RugReport,
RugScore,
TokenMeta,
_report_csv_rows,
_sparkline_from_change,
_wallet_csv_rows,
check_authorities,
compute_safety_score,
compute_score_components,
Expand All @@ -34,7 +36,9 @@
estimate_token_age,
fetch_token_holders,
fetch_token_meta,
format_csv,
format_json,
format_jsonl,
format_markdown,
load_last_history,
prune_history,
Expand Down Expand Up @@ -451,7 +455,7 @@ def test_wallet_scan() -> None:
assert "address" in result
assert result["address"] == TEST_WALLET
assert "total_tokens" in result
assert isinstance(result['total_tokens'], int)
assert isinstance(result["total_tokens"], int)


# Sparkline Tests
Expand All @@ -474,5 +478,177 @@ def test_flat(self):
def test_small_change_no_color(self):
result = _sparkline_from_change(0.5)
assert result is not None
assert '\U0001f7e2' not in result
assert '\U0001f534' not in result
assert "\U0001f7e2" not in result
assert "\U0001f534" not in result


# ── Export Tests ───────────────────────────────────────────────────────────

class TestExport:
def _make_report(self) -> RugReport:
return RugReport(
token=TokenMeta(
address=BONK_MINT,
symbol="BONK",
name="Bonk",
decimals=5,
supply=100000000000000,
),
safety_score=85,
risk_level="LOW",
score=RugScore(
mint_authority_risk=0,
freeze_authority_risk=0,
liquidity_risk=5,
holder_concentration_risk=3,
mint_history_risk=0,
honeypot_risk=0,
dev_risk=0,
age_risk=0,
low_liquidity_risk=1,
sniper_risk=0,
name_risk=0,
sub_penny_risk=0,
deployer_dump_risk=0,
overall_score=9,
),
warnings=["Low volume/liquidity ratio", "Thin liquidity warning"],
recommendation="Token appears safe — standard risks only.",
dex_data={
"dex": "raydium",
"liquidity_usd": 682000,
"volume_24h": 150000,
"price_usd": 0.00001234,
"price_change_24h": -5.2,
},
)

def test_report_csv_rows(self) -> None:
report = self._make_report()
rows = _report_csv_rows(report)
assert len(rows) == 1
row = rows[0]
assert row["token_address"] == BONK_MINT
assert row["token_symbol"] == "BONK"
assert row["safety_score"] == 85
assert row["risk_level"] == "LOW"
assert row["market_liquidity_usd"] == 682000

def test_format_csv_basic(self) -> None:
report = self._make_report()
rows = _report_csv_rows(report)
csv_out = format_csv(rows)
assert "token_address" in csv_out
assert "token_symbol" in csv_out
assert "BONK" in csv_out
assert "safety_score" in csv_out
assert "85" in csv_out

def test_format_csv_escaping(self) -> None:
report = self._make_report()
report.warnings = ["Has, comma inside", 'Has "quotes" inside']
rows = _report_csv_rows(report)
csv_out = format_csv(rows)
# CSV module handles quoting — roundtrip should preserve data
import csv as _csv
import io
reader = _csv.DictReader(io.StringIO(csv_out))
row = next(reader)
assert "Has, comma inside" in row["warnings"]
assert 'Has "quotes" inside' in row["warnings"]

def test_format_jsonl_basic(self) -> None:
report = self._make_report()
rows = _report_csv_rows(report)
jsonl_out = format_jsonl(rows)
lines = jsonl_out.strip().split("\n")
assert len(lines) == 1
parsed = json.loads(lines[0])
assert parsed["token_symbol"] == "BONK"
assert parsed["safety_score"] == 85

def test_format_jsonl_wallet_scan(self) -> None:
wallet_result = {
"address": TEST_WALLET,
"total_tokens": 5,
"risky_count": 2,
"risky_tokens": [
{
"mint": BONK_MINT,
"symbol": "BONK",
"balance_raw": 1000000,
"decimals": 5,
"safety_score": 45,
"risk_level": "MEDIUM",
"top_warnings": ["Mint authority active"],
},
{
"mint": USDC_MINT,
"symbol": "USDC",
"balance_raw": 5000000,
"decimals": 6,
"safety_score": 30,
"risk_level": "HIGH",
"top_warnings": ["Thin liquidity"],
},
],
"summary": "Found 2 risky tokens.",
}
rows = _wallet_csv_rows(wallet_result)
assert len(rows) == 2
assert rows[0]["token_mint"] == BONK_MINT
assert rows[0]["token_symbol"] == "BONK"
assert rows[1]["token_mint"] == USDC_MINT

def test_wallet_csv_format(self) -> None:
wallet_result = {
"address": TEST_WALLET,
"total_tokens": 3,
"risky_count": 1,
"risky_tokens": [
{
"mint": BONK_MINT,
"symbol": "BONK",
"balance_raw": 1000000,
"decimals": 5,
"safety_score": 45,
"risk_level": "MEDIUM",
"top_warnings": ["Test warning"],
},
],
"summary": "Found 1 risky token.",
}
rows = _wallet_csv_rows(wallet_result)
csv_out = format_csv(rows)
assert "wallet_address" in csv_out
assert "token_mint" in csv_out
assert "BONK" in csv_out

def test_wallet_jsonl_format(self) -> None:
wallet_result = {
"address": TEST_WALLET,
"total_tokens": 3,
"risky_count": 0,
"risky_tokens": [],
"summary": "No risky tokens.",
}
rows = _wallet_csv_rows(wallet_result)
jsonl_out = format_jsonl(rows)
parsed = json.loads(jsonl_out.strip())
assert parsed["wallet_address"] == TEST_WALLET
assert parsed["risky_count"] == 0

def test_format_csv_empty(self) -> None:
assert format_csv([]) == ""

def test_format_jsonl_empty(self) -> None:
assert format_jsonl([]) == ""

def test_export_implies_json_mode(self) -> None:
"""--export should not mix with --markdown — export takes precedence."""
report = self._make_report()
rows = _report_csv_rows(report)
csv_out = format_csv(rows)
# CSV should not contain markdown headers
assert "# " not in csv_out
assert "|" not in csv_out
Loading