Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 189 additions & 1 deletion scripts/rugguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,132 @@ def format_markdown(report: RugReport) -> str:
def format_json(report: RugReport) -> str:
"""Format report as pretty JSON."""
return json.dumps(report.to_dict(), indent=2, default=str)
# ── CLI Entry Point ────────────────────────────────────────────────────────


# ── Comparison Table ──────────────────────────────────────────────────────


def _format_comparison_table(reports: list[RugReport],
sort_by: str = "score") -> str:
"""Render a side-by-side ASCII comparison table for multiple tokens.

Each column = one token, auto-sized to widest value.
Tokens are sorted by safety_score ascending (riskiest first).
"""
if not reports:
return ""

# Sort: score asc (riskiest first)
if sort_by == "score":
key_fn = lambda r: r.safety_score
elif sort_by == "name":
key_fn = lambda r: (r.token.name or r.token.symbol or r.token.address)
elif sort_by == "age":
key_fn = lambda r: -r.score.age_risk # less age risk = older
elif sort_by == "liquidity":
key_fn = lambda r: -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0)
else:
key_fn = lambda r: r.safety_score
sorted_reports = sorted(reports, key=key_fn)

# Build rows: each is a list of values, one per token
headers = ["Metric"]
for r in sorted_reports:
sym = r.token.symbol or r.token.name or r.token.address[:8]
headers.append(sym)

rows: list[list[str]] = [headers]

def val(name: str, vals: list[str]) -> None:
rows.append([name] + vals)

val("Safety Score", [str(r.safety_score) for r in sorted_reports])
val("Risk Level", [r.risk_level for r in sorted_reports])

# Price
prices = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("price_usd"):
p = r.dex_data["price_usd"]
prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}")
else:
prices.append("—")
val("Price", prices)

# 24h Change
changes = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("price_change_24h"):
pct = r.dex_data["price_change_24h"]
changes.append(f"{pct:+.2f}%")
else:
changes.append("—")
val("24h Change", changes)

# Liquidity
liqs = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("liquidity_usd"):
liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}")
else:
liqs.append("—")
val("Liquidity", liqs)

# Volume 24h
vols = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("volume_24h"):
vols.append(f"${r.dex_data['volume_24h']:,.0f}")
else:
vols.append("—")
val("Volume 24h", vols)

# Holders
hldrs = []
for r in sorted_reports:
if r.holders:
hldrs.append(str(r.holders.total_holders))
else:
hldrs.append("—")
val("Holders", hldrs)

# Top 10 %
top10 = []
for r in sorted_reports:
if r.holders:
top10.append(f"{r.holders.top_10_pct:.1f}%")
else:
top10.append("—")
val("Top 10%", top10)

# Warnings count
wcounts = [str(len(r.warnings)) for r in sorted_reports]
val("Warnings", wcounts)

# Auto-size column widths
col_widths: list[int] = []
for ci in range(len(rows[0])):
col_widths.append(max(len(r[ci]) for r in rows))
col_widths[0] = max(col_widths[0], 12) # min metric column width

# Render table
sep = " | "
lines = []
# Header
hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])]
lines.append(sep.join(hdr_parts))
# Separator
sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))]
lines.append(sep.join(sep_parts))
# Rows
for r in rows[1:]:
parts = [r[i].ljust(col_widths[i]) for i in range(len(r))]
lines.append(sep.join(parts))

return "\n".join(lines)


# ── Export Helpers (CSV / JSONL) ──────────────────────────────────────────

def cli_token(args: list[str]) -> None:
mint = args[0] if args else ""
Expand Down Expand Up @@ -1660,19 +1785,78 @@ def cli_wallet(args: list[str]) -> None:

if result.get("risky_count", 0) > 0:
sys.exit(2)


def cli_compare(args: list[str]) -> None:
"""Compare multiple tokens side-by-side."""
if not args:
print('Usage: python rugguard.py compare <MINT1> <MINT2> [<MINT3> ...]', file=sys.stderr)
sys.exit(1)

# Parse flags
as_json = "--json" in args
sort_by = "score"
mints = []
for a in args:
if a == "--json":
continue
if a.startswith("--sort="):
sort_by = a.split("=", 1)[1]
elif a == "--sort" and args.index(a) + 1 < len(args):
continue # handled by next arg
else:
mints.append(a)

if len(mints) < 2:
print("Error: need at least 2 mint addresses to compare", file=sys.stderr)
sys.exit(1)

reports: list[RugReport] = []
errors: list[str] = []

for mint in mints:
try:
report = rug_check_token(mint.strip())
reports.append(report)
except Exception as e:
errors.append(f"{mint[:8]}...: {e}")
# Insert a minimal placeholder for failed token
reports.append(RugReport(
token=TokenMeta(address=mint),
safety_score=0, risk_level="ERROR",
score=RugScore(), flags=RugFlags(),
warnings=[], recommendation="Check failed",
))

if errors:
for e in errors:
print(f"Warning: {e}", file=sys.stderr)

if as_json:
import json as _json
print(_json.dumps([r.to_dict() for r in reports], indent=2, default=str))
else:
print(_format_comparison_table(reports, sort_by=sort_by))


def cli_help() -> None:
print("""Solana Rug Guard — On-chain rug-pull detection engine

USAGE:
python rugguard.py token <MINT_ADDRESS> [--json|--markdown]
python rugguard.py wallet <WALLET_ADDRESS>
python rugguard.py compare <MINT1> <MINT2> [<MINT3> ...] [--json]
python rugguard.py watch <MINT_ADDRESS> [--interval 60] [--iterations 0]
[--history PATH] [--webhook URL] [--threshold SCORE]

OPTIONS:
--json Output as JSON (default for token)
--markdown Output as Markdown report
--md Alias for --markdown
--export csv Export as CSV (compatible with spreadsheets)
--export jsonl
Export as JSONL (one JSON object per line)
--sort Sort tokens in compare output (default: score)

WATCH OPTIONS:
--interval Seconds between checks (default: 60)
Expand All @@ -1685,6 +1869,8 @@ def cli_help() -> None:
python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263
python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 --markdown
python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM
python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM --export jsonl
python rugguard.py compare DezXAZ8z... EPjFWdd5... [--json]
python rugguard.py watch <MINT_ADDRESS> --iterations 1 --threshold 70

ENVIRONMENT:
Expand All @@ -1705,6 +1891,8 @@ def main() -> None:
cli_token(args)
elif cmd == "wallet":
cli_wallet(args)
elif cmd == "compare":
cli_compare(args)
elif cmd == "watch":
cli_watch(args)
else:
Expand Down
46 changes: 45 additions & 1 deletion tests/test_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,4 +450,48 @@ def test_wallet_scan() -> None:
assert "address" in result
assert result["address"] == TEST_WALLET
assert "total_tokens" in result
assert isinstance(result["total_tokens"], int)
assert isinstance(result['total_tokens'], int)


# ── Comparison Tests ──────────────────────────────────────────────────────

class TestCompare:
def test_table_two_tokens(self):
from rugguard import _format_comparison_table, RugScore, RugFlags, TokenMeta
flags = RugFlags()
r1 = RugReport(
token=TokenMeta(address='A'), safety_score=80, risk_level='LOW',
score=RugScore(), flags=flags, warnings=[], recommendation='',
)
r2 = RugReport(
token=TokenMeta(address='B'), safety_score=30, risk_level='HIGH',
score=RugScore(), flags=flags, warnings=['Flag'], recommendation='',
)
tbl = _format_comparison_table([r1, r2])
assert 'Safety Score' in tbl
assert '80' in tbl
assert '30' in tbl
assert 'HIGH' in tbl
# Rows should be separated by dashes
assert '---' in tbl

def test_sort_by_name(self):
from rugguard import _format_comparison_table, RugScore, RugFlags, TokenMeta
flags = RugFlags()
r1 = RugReport(
token=TokenMeta(address='A', symbol='ZZZ'), safety_score=50,
risk_level='MEDIUM', score=RugScore(), flags=flags, warnings=[], recommendation='',
)
r2 = RugReport(
token=TokenMeta(address='B', symbol='AAA'), safety_score=50,
risk_level='MEDIUM', score=RugScore(), flags=flags, warnings=[], recommendation='',
)
tbl = _format_comparison_table([r1, r2], sort_by='name')
# AAA should come before ZZZ
idx_aaa = tbl.index('AAA')
idx_zzz = tbl.index('ZZZ')
assert idx_aaa < idx_zzz, 'AAA should be before ZZZ when sorted by name'

def test_empty_no_crash(self):
from rugguard import _format_comparison_table
assert _format_comparison_table([]) == ''