Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions scripts/rugguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,129 @@ def format_jsonl(rows: list[dict]) -> str:
return "\n".join(lines) + "\n" if lines else ""


# ── Comparison Table ──────────────────────────────────────────────────────


def _format_comparison_table(reports: list[RugReport],
sort_by: str = "score") -> str:
"""Render a side-by-side ASCII comparison table for multiple tokens.

Each column = one token, auto-sized to widest value.
Tokens are sorted by safety_score ascending (riskiest first).
"""
if not reports:
return ""

# Sort: score asc (riskiest first)
def _sort_key_score(r: RugReport) -> int:
return r.safety_score

def _sort_key_name(r: RugReport) -> str:
return r.token.name or r.token.symbol or r.token.address

def _sort_key_age(r: RugReport) -> int:
return -r.score.age_risk

def _sort_key_liquidity(r: RugReport) -> float:
return -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0)

if sort_by == "score":
key_fn = _sort_key_score
elif sort_by == "name":
key_fn = _sort_key_name
elif sort_by == "age":
key_fn = _sort_key_age
elif sort_by == "liquidity":
key_fn = _sort_key_liquidity
else:
key_fn = _sort_key_score
sorted_reports = sorted(reports, key=key_fn)

# Build rows: each is a list of values, one per token
headers = ["Metric"]
for r in sorted_reports:
sym = r.token.symbol or r.token.name or r.token.address[:8]
headers.append(sym)

rows: list[list[str]] = [headers]

def val(name: str, vals: list[str]) -> None:
rows.append([name] + vals)

val("Safety Score", [str(r.safety_score) for r in sorted_reports])
val("Risk Level", [r.risk_level for r in sorted_reports])

prices = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("price_usd"):
p = r.dex_data["price_usd"]
prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}")
else:
prices.append("-")
val("Price", prices)

changes = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("price_change_24h"):
pct = r.dex_data["price_change_24h"]
changes.append(f"{pct:+.2f}%")
else:
changes.append("-")
val("24h Change", changes)

liqs = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("liquidity_usd"):
liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}")
else:
liqs.append("-")
val("Liquidity", liqs)

vols = []
for r in sorted_reports:
if r.dex_data and r.dex_data.get("volume_24h"):
vols.append(f"${r.dex_data['volume_24h']:,.0f}")
else:
vols.append("-")
val("Volume 24h", vols)

hldrs = []
for r in sorted_reports:
if r.holders:
hldrs.append(str(r.holders.total_holders))
else:
hldrs.append("-")
val("Holders", hldrs)

top10 = []
for r in sorted_reports:
if r.holders:
top10.append(f"{r.holders.top_10_pct:.1f}%")
else:
top10.append("-")
val("Top 10%", top10)

wcounts = [str(len(r.warnings)) for r in sorted_reports]
val("Warnings", wcounts)

col_widths: list[int] = []
for ci in range(len(rows[0])):
col_widths.append(max(len(r[ci]) for r in rows))
col_widths[0] = max(col_widths[0], 12)

sep = " | "
lines = []
hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])]
lines.append(sep.join(hdr_parts))
sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))]
lines.append(sep.join(sep_parts))
for r in rows[1:]:
parts = [r[i].ljust(col_widths[i]) for i in range(len(r))]
lines.append(sep.join(parts))

return "\n".join(lines)


# ── CLI Entry Point ────────────────────────────────────────────────────────

def cli_token(args: list[str]) -> None:
Expand Down Expand Up @@ -1831,19 +1954,85 @@ def cli_wallet(args: list[str]) -> None:

if result.get("risky_count", 0) > 0:
sys.exit(2)


def cli_compare(args: list[str]) -> None:
"""Compare multiple tokens side-by-side."""
if not args:
print('Usage: python rugguard.py compare <MINT1> <MINT2> [<MINT3> ...]', file=sys.stderr)
sys.exit(1)

# Parse flags
as_json = "--json" in args
sort_by = "score"
mints = []
i = 0
while i < len(args):
a = args[i]
if a == "--json":
i += 1
continue
if a == "--sort" and i + 1 < len(args):
sort_by = args[i + 1]
i += 2
continue
if a.startswith("--sort="):
sort_by = a.split("=", 1)[1]
i += 1
continue
mints.append(a)
i += 1

if len(mints) < 2:
print("Error: need at least 2 mint addresses to compare", file=sys.stderr)
sys.exit(1)

reports: list[RugReport] = []
errors: list[str] = []

for mint in mints:
try:
report = rug_check_token(mint.strip())
reports.append(report)
except Exception as e:
errors.append(f"{mint[:8]}...: {e}")
# Insert a minimal placeholder for failed token
reports.append(RugReport(
token=TokenMeta(address=mint),
safety_score=0, risk_level="ERROR",
score=RugScore(), flags=RugFlags(),
warnings=[], recommendation="Check failed",
))

if errors:
for e in errors:
print(f"Warning: {e}", file=sys.stderr)

if as_json:
import json as _json
print(_json.dumps([r.to_dict() for r in reports], indent=2, default=str))
else:
print(_format_comparison_table(reports, sort_by=sort_by))


def cli_help() -> None:
print("""Solana Rug Guard — On-chain rug-pull detection engine

USAGE:
python rugguard.py token <MINT_ADDRESS> [--json|--markdown]
python rugguard.py wallet <WALLET_ADDRESS>
python rugguard.py compare <MINT1> <MINT2> [<MINT3> ...] [--json]
python rugguard.py watch <MINT_ADDRESS> [--interval 60] [--iterations 0]
[--history PATH] [--webhook URL] [--threshold SCORE]

OPTIONS:
--json Output as JSON (default for token)
--markdown Output as Markdown report
--md Alias for --markdown
--export csv Export as CSV (compatible with spreadsheets)
--export jsonl
Export as JSONL (one JSON object per line)
--sort Sort tokens in compare output (default: score)

WATCH OPTIONS:
--interval Seconds between checks (default: 60)
Expand All @@ -1856,6 +2045,8 @@ def cli_help() -> None:
python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263
python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 --markdown
python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM
python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM --export jsonl
python rugguard.py compare DezXAZ8z... EPjFWdd5... [--json]
python rugguard.py watch <MINT_ADDRESS> --iterations 1 --threshold 70

ENVIRONMENT:
Expand All @@ -1876,6 +2067,8 @@ def main() -> None:
cli_token(args)
elif cmd == "wallet":
cli_wallet(args)
elif cmd == "compare":
cli_compare(args)
elif cmd == "watch":
cli_watch(args)
else:
Expand Down
135 changes: 135 additions & 0 deletions scripts/rugguard.py.rej
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
diff a/scripts/rugguard.py b/scripts/rugguard.py (rejected hunks)
@@ -1627,7 +1627,132 @@ def format_markdown(report: RugReport) -> str:
def format_json(report: RugReport) -> str:
"""Format report as pretty JSON."""
return json.dumps(report.to_dict(), indent=2, default=str)
-# ── CLI Entry Point ────────────────────────────────────────────────────────
+
+
+# ── Comparison Table ──────────────────────────────────────────────────────
+
+
+def _format_comparison_table(reports: list[RugReport],
+ sort_by: str = "score") -> str:
+ """Render a side-by-side ASCII comparison table for multiple tokens.
+
+ Each column = one token, auto-sized to widest value.
+ Tokens are sorted by safety_score ascending (riskiest first).
+ """
+ if not reports:
+ return ""
+
+ # Sort: score asc (riskiest first)
+ if sort_by == "score":
+ key_fn = lambda r: r.safety_score
+ elif sort_by == "name":
+ key_fn = lambda r: (r.token.name or r.token.symbol or r.token.address)
+ elif sort_by == "age":
+ key_fn = lambda r: -r.score.age_risk # less age risk = older
+ elif sort_by == "liquidity":
+ key_fn = lambda r: -(r.dex_data.get("liquidity_usd", 0) if r.dex_data else 0)
+ else:
+ key_fn = lambda r: r.safety_score
+ sorted_reports = sorted(reports, key=key_fn)
+
+ # Build rows: each is a list of values, one per token
+ headers = ["Metric"]
+ for r in sorted_reports:
+ sym = r.token.symbol or r.token.name or r.token.address[:8]
+ headers.append(sym)
+
+ rows: list[list[str]] = [headers]
+
+ def val(name: str, vals: list[str]) -> None:
+ rows.append([name] + vals)
+
+ val("Safety Score", [str(r.safety_score) for r in sorted_reports])
+ val("Risk Level", [r.risk_level for r in sorted_reports])
+
+ # Price
+ prices = []
+ for r in sorted_reports:
+ if r.dex_data and r.dex_data.get("price_usd"):
+ p = r.dex_data["price_usd"]
+ prices.append(f"${p:.8f}" if p < 1 else f"${p:.4f}")
+ else:
+ prices.append("—")
+ val("Price", prices)
+
+ # 24h Change
+ changes = []
+ for r in sorted_reports:
+ if r.dex_data and r.dex_data.get("price_change_24h"):
+ pct = r.dex_data["price_change_24h"]
+ changes.append(f"{pct:+.2f}%")
+ else:
+ changes.append("—")
+ val("24h Change", changes)
+
+ # Liquidity
+ liqs = []
+ for r in sorted_reports:
+ if r.dex_data and r.dex_data.get("liquidity_usd"):
+ liqs.append(f"${r.dex_data['liquidity_usd']:,.0f}")
+ else:
+ liqs.append("—")
+ val("Liquidity", liqs)
+
+ # Volume 24h
+ vols = []
+ for r in sorted_reports:
+ if r.dex_data and r.dex_data.get("volume_24h"):
+ vols.append(f"${r.dex_data['volume_24h']:,.0f}")
+ else:
+ vols.append("—")
+ val("Volume 24h", vols)
+
+ # Holders
+ hldrs = []
+ for r in sorted_reports:
+ if r.holders:
+ hldrs.append(str(r.holders.total_holders))
+ else:
+ hldrs.append("—")
+ val("Holders", hldrs)
+
+ # Top 10 %
+ top10 = []
+ for r in sorted_reports:
+ if r.holders:
+ top10.append(f"{r.holders.top_10_pct:.1f}%")
+ else:
+ top10.append("—")
+ val("Top 10%", top10)
+
+ # Warnings count
+ wcounts = [str(len(r.warnings)) for r in sorted_reports]
+ val("Warnings", wcounts)
+
+ # Auto-size column widths
+ col_widths: list[int] = []
+ for ci in range(len(rows[0])):
+ col_widths.append(max(len(r[ci]) for r in rows))
+ col_widths[0] = max(col_widths[0], 12) # min metric column width
+
+ # Render table
+ sep = " | "
+ lines = []
+ # Header
+ hdr_parts = [h.ljust(col_widths[i]) for i, h in enumerate(rows[0])]
+ lines.append(sep.join(hdr_parts))
+ # Separator
+ sep_parts = ["-" * col_widths[i] for i in range(len(col_widths))]
+ lines.append(sep.join(sep_parts))
+ # Rows
+ for r in rows[1:]:
+ parts = [r[i].ljust(col_widths[i]) for i in range(len(r))]
+ lines.append(sep.join(parts))
+
+ return "\n".join(lines)
+
+
+# ── Export Helpers (CSV / JSONL) ──────────────────────────────────────────

def cli_token(args: list[str]) -> None:
mint = args[0] if args else ""
Loading
Loading