Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions scripts/rugguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,160 @@ def val(name: str, vals: list[str]) -> None:
return "\n".join(lines)


# ── Timeline ──────────────────────────────────────────────────────────────


def _fetch_timeline_events(mint: str) -> list[dict]:
"""Fetch chronological events for a token mint address.

Fetches up to 100 signatures and classifies each transaction.
Returns list of dicts with: time, rel_time, event, tx_sig, details
"""
events = []

# Fetch signatures
sigs = _rpc_call("getSignaturesForAddress", [mint, {"limit": 100}])
if not sigs:
return events

# Use first sig time as T0
t0 = None
for s in sigs:
bt = s.get("blockTime")
if bt:
t0 = bt
break

if not t0:
# Fallback: use earliest sig
t0 = sigs[-1].get("blockTime", time.time())

for sig_info in sigs:
tx_sig = sig_info.get("signature", "")
bt = sig_info.get("blockTime", 0)
if not tx_sig or not bt:
continue

rel_time = bt - t0
if rel_time < 0:
rel_time = 0

# Format relative time
if rel_time < 60:
rel_str = f"T+{rel_time}s"
elif rel_time < 3600:
rel_str = f"T+{rel_time // 60}m"
elif rel_time < 86400:
rel_str = f"T+{rel_time // 3600}h"
else:
rel_str = f"+{rel_time // 86400}d"

# Fetch transaction details
tx = _rpc_call("getTransaction", [tx_sig, {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}])
if not tx:
events.append({
"time": bt,
"rel_time": rel_str,
"event": "Transaction",
"tx_sig": tx_sig[:16] + "...",
"details": "",
"suspicious": False,
})
continue

meta = tx.get("meta", {})
if meta and meta.get("err"):
events.append({
"time": bt,
"rel_time": rel_str,
"event": "Failed Transaction",
"tx_sig": tx_sig[:16] + "...",
"details": str(meta.get("err", ""))[:80],
"suspicious": False,
})
continue

# Check instructions for classification
tx_data = tx.get("transaction", {})
msg = tx_data.get("message", {})
instructions = msg.get("instructions", [])

event_type = "Transaction"
details = ""
suspicious = rel_time < 3 # within 3s = sniper

for ix in instructions:
parsed = ix.get("parsed", {})

if "initializeMint" in str(ix) or "InitializeMint" in str(ix):
event_type = "Token Created"
suspicious = False
break
if "setAuthority" in str(ix) or "SetAuthority" in str(ix):
auth_info = parsed.get("info", {})
auth_type = auth_info.get("authorityType", "")
new_auth = auth_info.get("newAuthority", "none")
event_type = f"Authority Change ({auth_type})"
details = f"New: {new_auth[:8]}..." if new_auth != "none" else "Revoked"
if "revoke" in str(ix).lower() or new_auth == "none":
suspicious = False
else:
suspicious = True
break
if "initializeAccount" in str(ix).lower():
continue # noise
if "transfer" in str(ix).lower() or "Transfer" in str(ix):
# Check if large transfer
try:
amt = int(parsed.get("info", {}).get("amount", "0"))
except (ValueError, TypeError):
amt = 0
if amt > 1_000_000_000_000: # > 1M tokens (rough)
event_type = "Large Transfer"
suspicious = True
else:
event_type = "Transfer"
break

events.append({
"time": bt,
"rel_time": rel_str,
"event": event_type,
"tx_sig": tx_sig[:16] + "...",
"details": details,
"suspicious": suspicious,
})

# Sort chronologically
events.sort(key=lambda e: e["time"])
return events


def _format_timeline(mint: str, events: list[dict]) -> str:
"""Format timeline events as human-readable output."""
if not events:
return f"No events found for {mint[:8]}..."

lines = [f"# Timeline for {mint[:8]}..."]
lines.append("")

for e in events:
marker = "⚠️ " if e["suspicious"] else " "
detail = f" — {e['details']}" if e["details"] else ""
lines.append(f"{marker}{e['rel_time']}: {e['event']}{detail}")
lines.append(f" Tx: {e['tx_sig']}")

return "\n".join(lines)


def _format_timeline_json(events: list[dict]) -> str:
"""Format timeline events as JSON."""
clean = [{"rel_time": e["rel_time"], "event": e["event"],
"tx_sig": e["tx_sig"], "details": e["details"],
"suspicious": e["suspicious"]} for e in events]
return json.dumps(clean, indent=2)


# ── CLI Entry Point ────────────────────────────────────────────────────────

def cli_token(args: list[str]) -> None:
Expand Down Expand Up @@ -2158,6 +2312,22 @@ def cli_compare(args: list[str]) -> None:
print(_format_comparison_table(reports, sort_by=sort_by))


def cli_timeline(args: list[str]) -> None:
"""Show token timeline events."""
if not args:
print('Usage: python rugguard.py timeline <MINT> [--json]', file=sys.stderr)
sys.exit(1)

mint = args[0]
as_json = "--json" in args[1:]

events = _fetch_timeline_events(mint.strip())
if as_json:
print(_format_timeline_json(events))
else:
print(_format_timeline(mint, events))


def cli_help() -> None:
print("""Solana Rug Guard — On-chain rug-pull detection engine

Expand All @@ -2166,6 +2336,7 @@ def cli_help() -> None:
python rugguard.py wallet <WALLET_ADDRESS>
python rugguard.py badge <MINT> [--style flat|flat-square|plastic] [--label TEXT]
python rugguard.py compare <MINT1> <MINT2> [<MINT3> ...] [--json]
python rugguard.py timeline <MINT> [--json]
python rugguard.py watch <MINT_ADDRESS> [--interval 60] [--iterations 0]
[--history PATH] [--webhook URL] [--threshold SCORE]

Expand All @@ -2192,6 +2363,7 @@ def cli_help() -> None:
python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM --export jsonl
python rugguard.py badge DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263
python rugguard.py compare DezXAZ8z... EPjFWdd5... [--json]
python rugguard.py timeline DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263
python rugguard.py watch <MINT_ADDRESS> --iterations 1 --threshold 70

ENVIRONMENT:
Expand All @@ -2216,6 +2388,8 @@ def main() -> None:
cli_badge(args)
elif cmd == "compare":
cli_compare(args)
elif cmd == "timeline":
cli_timeline(args)
elif cmd == "watch":
cli_watch(args)
else:
Expand Down
Loading
Loading