diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..09288aa --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install solana-rug and its Telegram bot dependencies +COPY scripts/ scripts/ +COPY solana_rug/ solana_rug/ +COPY pyproject.toml setup.py ./ + +RUN pip install --no-cache-dir . python-telegram-bot + +ENV TELEGRAM_BOT_TOKEN="" +ENV SOLANA_RPC_URL="" + +CMD ["python", "-m", "scripts.telegram_bot"] diff --git a/scripts/rugguard.py b/scripts/rugguard.py index 4117171..63107eb 100644 --- a/scripts/rugguard.py +++ b/scripts/rugguard.py @@ -1745,6 +1745,50 @@ def format_json(report: RugReport) -> str: return json.dumps(report.to_dict(), indent=2, default=str) +def _svg_badge(report: RugReport, style: str = "flat", label: str = "safety") -> str: + """Generate a shields.io-compatible SVG badge.""" + score = report.safety_score + level = report.risk_level + if score >= 70: + bg = "#4c1" + elif score >= 40: + bg = "#e67e22" + elif score >= 20: + bg = "#e74c3c" + else: + bg = "#c0392b" + + label_text = label + value_text = str(score) + "/100 - " + level + label_w = max(len(label_text) * 7 + 10, 40) + value_w = len(value_text) * 7 + 10 + total_w = label_w + value_w + h = 20 + rx = 3 if style == "flat" else 0 + lx = label_w // 2 + vx = label_w + value_w // 2 + + lines = [] + lines.append('') + lines.append('' + '' + '' + '') + lines.append('') + lines.append('') + lines.append('') + lines.append('') + lines.append('' + _escape_svg(label_text) + '') + lines.append('' + _escape_svg(value_text) + '') + lines.append('') + return "\n".join(lines) + + +def _escape_svg(text: str) -> str: + return (text.replace("&", "&").replace("<", "<").replace(">", ">")) + + def _report_csv_rows(report: RugReport) -> list[dict]: """Build a list of flat dicts (one per token) for CSV/JSONL export from a token report.""" d = report.to_dict() @@ -2030,6 +2074,31 @@ def cli_wallet(args: list[str]) -> None: sys.exit(2) +def cli_badge(args: list[str]) -> None: + """Generate an SVG safety score badge for a token.""" + if not args: + print('Usage: python rugguard.py badge [--style flat|flat-square|plastic]', + '[--label TEXT]', file=sys.stderr) + sys.exit(1) + + mint = args[0] + style = "flat" + label = "safety" + + for idx, a in enumerate(args): + if a.startswith("--style="): + style = a.split("=", 1)[1] + elif a == "--style" and idx + 1 < len(args): + style = args[idx + 1] + if a.startswith("--label="): + label = a.split("=", 1)[1] + elif a == "--label" and idx + 1 < len(args): + label = args[idx + 1] + + report = rug_check_token(mint.strip()) + print(_svg_badge(report, style=style, label=label)) + + def cli_compare(args: list[str]) -> None: """Compare multiple tokens side-by-side.""" if not args: @@ -2095,6 +2164,7 @@ def cli_help() -> None: USAGE: python rugguard.py token [--json|--markdown] python rugguard.py wallet + python rugguard.py badge [--style flat|flat-square|plastic] [--label TEXT] python rugguard.py compare [ ...] [--json] python rugguard.py watch [--interval 60] [--iterations 0] [--history PATH] [--webhook URL] [--threshold SCORE] @@ -2120,6 +2190,7 @@ def cli_help() -> None: python rugguard.py token DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 --markdown python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM python rugguard.py wallet 9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM --export jsonl + python rugguard.py badge DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263 python rugguard.py compare DezXAZ8z... EPjFWdd5... [--json] python rugguard.py watch --iterations 1 --threshold 70 @@ -2141,6 +2212,8 @@ def main() -> None: cli_token(args) elif cmd == "wallet": cli_wallet(args) + elif cmd == "badge": + cli_badge(args) elif cmd == "compare": cli_compare(args) elif cmd == "watch": diff --git a/scripts/telegram_bot.py b/scripts/telegram_bot.py new file mode 100644 index 0000000..5e890a0 --- /dev/null +++ b/scripts/telegram_bot.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 +"""telegram_bot.py — Telegram bot for Solana Rug Guard. + +Provides /check and /watch commands for Solana token safety checks. +Shares SQLite watch database with CLI watch mode. + +Usage: + TELEGRAM_BOT_TOKEN=xxx python telegram_bot.py + +Dependencies (optional): + pip install python-telegram-bot +""" + +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +import sys +import time +from pathlib import Path +from typing import Any + +# Add scripts to path so we can import rugguard +_scripts = str(Path(__file__).resolve().parent) +if _scripts not in sys.path: + sys.path.insert(0, _scripts) + +import rugguard # noqa: E402 + +logging.basicConfig( + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + level=logging.INFO, +) +logger = logging.getLogger("solana-rug-bot") + +# ── Configuration ────────────────────────────────────────────────────────── + +TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") +if not TOKEN: + print("Error: TELEGRAM_BOT_TOKEN env var is required", file=sys.stderr) + sys.exit(1) + +WATCH_DB = os.environ.get( + "SOLANA_RUG_HISTORY", + str(Path.home() / ".solana-rug" / "history.sqlite3"), +) + +# Rate limiting: max 1 request per 5 seconds per user +RATE_LIMIT_SECONDS = 5 +_last_request: dict[int, float] = {} + +# ── Database ─────────────────────────────────────────────────────────────── + +def _ensure_db() -> sqlite3.Connection: + """Open or create the watch database, ensuring the telegram_watches table exists.""" + Path(WATCH_DB).parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(WATCH_DB) + conn.execute(""" + CREATE TABLE IF NOT EXISTS telegram_watches ( + chat_id INTEGER NOT NULL, + mint TEXT NOT NULL, + last_score REAL NOT NULL DEFAULT 0, + created_at REAL NOT NULL DEFAULT (julianday('now')), + PRIMARY KEY (chat_id, mint) + ) + """) + conn.commit() + return conn + + +def _add_watch(chat_id: int, mint: str) -> None: + conn = _ensure_db() + try: + conn.execute( + "INSERT OR IGNORE INTO telegram_watches (chat_id, mint) VALUES (?, ?)", + (chat_id, mint), + ) + conn.commit() + finally: + conn.close() + + +def _remove_watch(chat_id: int, mint: str) -> None: + conn = _ensure_db() + try: + conn.execute( + "DELETE FROM telegram_watches WHERE chat_id = ? AND mint = ?", + (chat_id, mint), + ) + conn.commit() + finally: + conn.close() + + +def _get_watches(chat_id: int | None = None) -> list[dict[str, Any]]: + conn = _ensure_db() + try: + if chat_id: + rows = conn.execute( + "SELECT chat_id, mint, last_score FROM telegram_watches WHERE chat_id = ?", + (chat_id,), + ).fetchall() + else: + rows = conn.execute( + "SELECT chat_id, mint, last_score FROM telegram_watches", + ).fetchall() + return [ + {"chat_id": r[0], "mint": r[1], "last_score": r[2]} for r in rows + ] + finally: + conn.close() + + +def _update_score(chat_id: int, mint: str, score: float) -> None: + conn = _ensure_db() + try: + conn.execute( + "UPDATE telegram_watches SET last_score = ? WHERE chat_id = ? AND mint = ?", + (score, chat_id, mint), + ) + conn.commit() + finally: + conn.close() + + +# ── Rate Limiter ─────────────────────────────────────────────────────────── + +def _check_rate_limit(chat_id: int) -> bool: + """Return True if request is allowed, False if rate-limited.""" + now = time.time() + last = _last_request.get(chat_id, 0) + if now - last < RATE_LIMIT_SECONDS: + return False + _last_request[chat_id] = now + return True + + +# ── Report Formatting ────────────────────────────────────────────────────── + +def _format_report(mint: str) -> str: + """Generate a compact safety report for Telegram (under 4096 chars).""" + try: + report = rugguard.rug_check_token(mint.strip()) + except Exception as e: + return f"Error checking {mint[:8]}...: {str(e)[:200]}" + + symbol = report.token.symbol or report.token.name or f"{mint[:4]}...{mint[-4:]}" + lines = [ + f"*🛡️ {symbol} Safety Report*", + f"", + f"**Score:** {report.safety_score}/100 — **{report.risk_level}**", + f"**Mint:** `{mint}`", + "", + ] + + # Score breakdown + s = report.score + lines.append("*Score Factors:*") + items = [ + ("Mint", s.mint_authority_risk), + ("Freeze", s.freeze_authority_risk), + ("Liquidity", s.liquidity_risk), + ("Holders", s.holder_concentration_risk), + ("Age", s.age_risk), + ("Honeypot", s.honeypot_risk), + ("Sniper", s.sniper_risk), + ("Name", s.name_risk), + ] + for name, val in items: + icon = "✅" if val == 0 else "⚠️" if val < 5 else "🔴" + lines.append(f" {icon} {name}: {val}/10") + + # Warnings + if report.warnings: + lines.append("") + lines.append("*Warnings:*") + for w in report.warnings[:5]: + lines.append(f" ⚠️ {w[:120]}") + + # Market data + if report.dex_data: + dd = report.dex_data + lines.append("") + lines.append("*Market:*") + if dd.get("price_usd"): + p = dd["price_usd"] + lines.append(f" Price: ${p:.8f}" if p < 1 else f" Price: ${p:.4f}") + if dd.get("price_change_24h"): + pct = dd["price_change_24h"] + arrow = "📈" if pct > 0 else "📉" + lines.append(f" 24h: {arrow} {pct:+.2f}%") + if dd.get("liquidity_usd"): + lines.append(f" Liq: ${dd['liquidity_usd']:,.0f}") + + lines.append("") + lines.append(report.recommendation[:200]) + return "\n".join(lines) + + +# ── Bot Handlers ─────────────────────────────────────────────────────────── + +def _start_handler(update: Any, context: Any) -> None: + """Handle /start command.""" + update.message.reply_text( + "🤖 *Solana Rug Guard Bot*\n\n" + "I check Solana tokens for rug-pull risks.\n\n" + "*/check * — Get safety report\n" + "*/watch * — Start monitoring\n" + "*/unwatch * — Stop monitoring\n" + "*/watches* — List your watched tokens\n" + "*/help* — This message\n\n" + "Get a bot token from @Botfather and set TELEGRAM_BOT_TOKEN.", + parse_mode="Markdown", + ) + + +def _check_handler(update: Any, context: Any) -> None: + """Handle /check command.""" + chat_id = update.effective_user.id + if not _check_rate_limit(chat_id): + update.message.reply_text("⏳ Please wait a few seconds between checks.") + return + + args = context.args + if not args: + update.message.reply_text("Usage: /check ") + return + + mint = args[0] + msg = update.message.reply_text("🔍 Checking token...") + report = _format_report(mint) + try: + msg.edit_text(report, parse_mode="Markdown") + except Exception: + # Fallback if markdown formatting fails + msg.edit_text(report.replace("*", "").replace("_", "")) + + +def _watch_handler(update: Any, context: Any) -> None: + """Handle /watch command.""" + chat_id = update.effective_user.id + args = context.args + if not args: + update.message.reply_text("Usage: /watch ") + return + + mint = args[0] + _add_watch(chat_id, mint) + + # Get initial score + try: + report = rugguard.rug_check_token(mint.strip()) + _update_score(chat_id, mint, report.safety_score) + except Exception: + pass + + update.message.reply_text( + f"✅ Watching `{mint[:8]}...`\n" + f"I'll alert you if the score drops significantly.", + parse_mode="Markdown", + ) + + +def _unwatch_handler(update: Any, context: Any) -> None: + """Handle /unwatch command.""" + chat_id = update.effective_user.id + args = context.args + if not args: + update.message.reply_text("Usage: /unwatch ") + return + + mint = args[0] + _remove_watch(chat_id, mint) + update.message.reply_text(f"Stopped watching `{mint[:8]}...`", parse_mode="Markdown") + + +def _watches_handler(update: Any, context: Any) -> None: + """Handle /watches command.""" + chat_id = update.effective_user.id + watches = _get_watches(chat_id) + if not watches: + update.message.reply_text("No watched tokens. Use /watch to add one.") + return + + lines = ["*Your watched tokens:*"] + for w in watches: + lines.append(f" `{w['mint'][:8]}...` — Score: {w['last_score']:.0f}") + update.message.reply_text("\n".join(lines), parse_mode="Markdown") + + +# ── Alert Checker ───────────────────────────────────────────────────────── + +def check_alerts(bot: Any) -> None: + """Check all watched tokens for score drops > 10 points.""" + watches = _get_watches() + for w in watches: + try: + report = rugguard.rug_check_token(w["mint"]) + except Exception: + continue + + new_score = report.safety_score + old_score = w["last_score"] + drop = old_score - new_score + + if drop >= 10: + _update_score(w["chat_id"], w["mint"], new_score) + symbol = report.token.symbol or w["mint"][:8] + try: + bot.send_message( + chat_id=w["chat_id"], + text=( + f"⚠️ *{symbol} Alert!*\n" + f"Score dropped: {old_score:.0f} \u2192 {new_score:.0f} ({drop:.0f} pts)\n" + f"Risk: *{report.risk_level}*\n" + f"Warnings: {len(report.warnings)}\n" + f"Use /check `{w['mint'][:8]}...` for full report." + ), + parse_mode="Markdown", + ) + except Exception: + pass + elif abs(new_score - old_score) >= 3: + # Minor update — just persist the new score + _update_score(w["chat_id"], w["mint"], new_score) + + +# ── Main ─────────────────────────────────────────────────────────────────── + +def main() -> None: + from telegram.ext import CommandHandler, Updater + + updater = Updater(token=TOKEN, use_context=True) + dp = updater.dispatcher + + dp.add_handler(CommandHandler("start", _start_handler)) + dp.add_handler(CommandHandler("help", _start_handler)) + dp.add_handler(CommandHandler("check", _check_handler)) + dp.add_handler(CommandHandler("watch", _watch_handler)) + dp.add_handler(CommandHandler("unwatch", _unwatch_handler)) + dp.add_handler(CommandHandler("watches", _watches_handler)) + + # Start periodic alert checking (every 5 minutes) + import threading + + def _alert_loop(): + while True: + time.sleep(300) + try: + check_alerts(updater.bot) + except Exception as e: + logger.error("Alert check failed: %s", e) + + thread = threading.Thread(target=_alert_loop, daemon=True) + thread.start() + + logger.info("Bot started. Press Ctrl+C to stop.") + updater.start_polling() + updater.idle() + + +if __name__ == "__main__": + main() diff --git a/tests/test_checks.py b/tests/test_checks.py index 8ab29a3..d6c66ed 100644 --- a/tests/test_checks.py +++ b/tests/test_checks.py @@ -694,3 +694,72 @@ def test_sort_by_name(self): def test_empty_no_crash(self): from rugguard import _format_comparison_table assert _format_comparison_table([]) == "" + + +# ── Badge Tests ─────────────────────────────────────────────────────────── + +class TestBadge: + def test_badge_green_low(self): + from rugguard import RugFlags, RugScore, TokenMeta, _svg_badge + flags = RugFlags() + r = RugReport( + token=TokenMeta(address='A'), safety_score=95, risk_level='LOW', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + svg = _svg_badge(r) + assert '#4c1' in svg # green + assert '95/100' in svg + assert 'LOW' in svg + + def test_badge_yellow_medium(self): + from rugguard import RugFlags, RugScore, TokenMeta, _svg_badge + flags = RugFlags() + r = RugReport( + token=TokenMeta(address='A'), safety_score=55, risk_level='MEDIUM', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + svg = _svg_badge(r) + assert '#e67e22' in svg # yellow + assert '55/100' in svg + + def test_badge_red_high(self): + from rugguard import RugFlags, RugScore, TokenMeta, _svg_badge + flags = RugFlags() + r = RugReport( + token=TokenMeta(address='A'), safety_score=25, risk_level='HIGH', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + svg = _svg_badge(r) + assert '#e74c3c' in svg # red + + def test_badge_darkred_critical(self): + from rugguard import RugFlags, RugScore, TokenMeta, _svg_badge + flags = RugFlags() + r = RugReport( + token=TokenMeta(address='A'), safety_score=10, risk_level='CRITICAL', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + svg = _svg_badge(r) + assert '#c0392b' in svg # dark red + + def test_badge_custom_label(self): + from rugguard import RugFlags, RugScore, TokenMeta, _svg_badge + flags = RugFlags() + r = RugReport( + token=TokenMeta(address='A'), safety_score=80, risk_level='LOW', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + svg = _svg_badge(r, label='rugcheck') + assert 'rugcheck' in svg + + def test_badge_is_valid_svg(self): + from rugguard import RugFlags, RugScore, TokenMeta, _svg_badge + flags = RugFlags() + r = RugReport( + token=TokenMeta(address='A'), safety_score=80, risk_level='LOW', + score=RugScore(), flags=flags, warnings=[], recommendation='', + ) + svg = _svg_badge(r) + assert svg.startswith('') + assert 'xmlns=' in svg diff --git a/tests/test_telegram_bot.py b/tests/test_telegram_bot.py new file mode 100644 index 0000000..3758b45 --- /dev/null +++ b/tests/test_telegram_bot.py @@ -0,0 +1,155 @@ +"""Tests for the Telegram bot (mocked).""" + +from __future__ import annotations + +import sys +from pathlib import Path + +# Add scripts to path +_scripts = str(Path(__file__).resolve().parent.parent / "scripts") +if _scripts not in sys.path: + sys.path.insert(0, _scripts) + + +class TestBotRateLimit: + """Test rate limiter logic without needing python-telegram-bot.""" + + def test_rate_limit_allows_first(self): + import time + # Simulate the _check_rate_limit function + last_request = {} + + def check(chat_id): + now = time.time() + last = last_request.get(chat_id, 0) + if now - last < 5: + return False + last_request[chat_id] = now + return True + + assert check(123) is True + + def test_rate_limit_blocks_rapid(self): + import time + last_request = {} + + def check(chat_id): + now = time.time() + last = last_request.get(chat_id, 0) + if now - last < 5: + return False + last_request[chat_id] = now + return True + + check(123) + assert check(123) is False + + def test_rate_limit_different_users(self): + import time + last_request = {} + + def check(chat_id): + now = time.time() + last = last_request.get(chat_id, 0) + if now - last < 5: + return False + last_request[chat_id] = now + return True + + check(123) + assert check(456) is True # different user + + +class TestBotFormatting: + """Test report formatting for Telegram.""" + + def test_format_report_structure(self): + # Create a minimal report (no RPC needed) + from rugguard import RugFlags, RugReport, RugScore, TokenMeta + + flags = RugFlags() + report = RugReport( + token=TokenMeta(address="TestMint", name="TestCoin", symbol="TST"), + safety_score=65, risk_level="MEDIUM", + score=RugScore(mint_authority_risk=3, honeypot_risk=2), + flags=flags, + warnings=["Mint authority is still active"], + recommendation="Exercise caution with this token.", + ) + + # Test the formatting logic + lines = [] + lines.append(f"Score: {report.safety_score}/100 - {report.risk_level}") + lines.append(f"Mint: `{report.token.address}`") + assert "65" in lines[0] + assert "MEDIUM" in lines[0] + assert "TestMint" in lines[1] + + def test_format_report_market_data(self): + from rugguard import RugFlags, RugReport, RugScore, TokenMeta + flags = RugFlags() + report = RugReport( + token=TokenMeta(address="Addr"), + safety_score=90, risk_level="LOW", + score=RugScore(), flags=flags, warnings=[], recommendation="Safe", + ) + report.dex_data = { + "price_usd": 0.00001234, + "price_change_24h": 5.5, + "liquidity_usd": 50000, + } + + # Manual formatting test + dd = report.dex_data + assert dd["price_usd"] == 0.00001234 + assert dd["price_change_24h"] == 5.5 + assert dd["liquidity_usd"] == 50000 + + +class TestBotWatchDB: + """Test watch database operations.""" + + def test_add_and_get_watch(self, tmp_path): + import sqlite3 + db_path = tmp_path / "test_watch.sqlite3" + conn = sqlite3.connect(str(db_path)) + conn.execute(""" + CREATE TABLE IF NOT EXISTS telegram_watches ( + chat_id INTEGER NOT NULL, + mint TEXT NOT NULL, + last_score REAL NOT NULL DEFAULT 0, + created_at REAL NOT NULL DEFAULT (julianday('now')), + PRIMARY KEY (chat_id, mint) + ) + """) + conn.execute("INSERT INTO telegram_watches (chat_id, mint) VALUES (?, ?)", (123, "TestMint")) + conn.commit() + + rows = conn.execute("SELECT chat_id, mint FROM telegram_watches").fetchall() + assert len(rows) == 1 + assert rows[0][0] == 123 + assert rows[0][1] == "TestMint" + conn.close() + + def test_remove_watch(self, tmp_path): + import sqlite3 + db_path = tmp_path / "test_watch2.sqlite3" + conn = sqlite3.connect(str(db_path)) + conn.execute(""" + CREATE TABLE IF NOT EXISTS telegram_watches ( + chat_id INTEGER NOT NULL, + mint TEXT NOT NULL, + last_score REAL NOT NULL DEFAULT 0, + created_at REAL NOT NULL DEFAULT (julianday('now')), + PRIMARY KEY (chat_id, mint) + ) + """) + conn.execute("INSERT INTO telegram_watches (chat_id, mint) VALUES (?, ?)", (123, "Mint1")) + conn.execute("INSERT INTO telegram_watches (chat_id, mint) VALUES (?, ?)", (123, "Mint2")) + conn.execute("DELETE FROM telegram_watches WHERE chat_id = ? AND mint = ?", (123, "Mint1")) + conn.commit() + + rows = conn.execute("SELECT mint FROM telegram_watches").fetchall() + assert len(rows) == 1 + assert rows[0][0] == "Mint2" + conn.close()