From 1c133577e1a14494dd9ec20f1ea63432afa27a78 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 8 Apr 2026 14:54:02 +0000 Subject: [PATCH] feat: telegram bot command receiver (closes #135) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add inbound command receiving via the Telegram getUpdates long-polling API so users can interact with qracer remotely from chat. Builds on the existing TelegramAdapter (urllib only — no new dependencies). - TelegramBotPoller: long-polls getUpdates, tracks offset, filters by authorised chat_id, parses messages into BotCommand objects, and can send plain-text replies (auto-truncated to fit Telegram's limit). - BotCommand.parse: parses "/action arg1 arg2" with @botname suffix stripping and case folding. - Server: optional telegram_poller wired into _tick(); dispatches /status, /alerts, /alert, /tasks, /schedule, /help (and /start) using the existing alert and task stores. /analyze and /portfolio reply with a "use the CLI" message until deeper engine plumbing exists. - build_telegram_poller factory in notifications.factory. - cli serve: instantiates the poller from credentials and announces bot mode at startup. --- qracer/cli.py | 9 +- qracer/notifications/__init__.py | 3 + qracer/notifications/factory.py | 24 ++ qracer/notifications/telegram_poller.py | 222 +++++++++++++ qracer/server.py | 154 +++++++++ tests/notifications/test_telegram_poller.py | 336 ++++++++++++++++++++ tests/test_server.py | 292 ++++++++++++++++- 7 files changed, 1038 insertions(+), 2 deletions(-) create mode 100644 qracer/notifications/telegram_poller.py create mode 100644 tests/notifications/test_telegram_poller.py diff --git a/qracer/cli.py b/qracer/cli.py index ec47581..b31113b 100644 --- a/qracer/cli.py +++ b/qracer/cli.py @@ -943,7 +943,10 @@ def serve(check_interval: int) -> None: from qracer.alert_monitor import AlertMonitor from qracer.alerts import AlertStore - from qracer.notifications.factory import build_notification_registry + from qracer.notifications.factory import ( + build_notification_registry, + build_telegram_poller, + ) from qracer.pidfile import acquire, release from qracer.server import Server from qracer.task_executor import TaskExecutor @@ -983,6 +986,7 @@ def serve(check_interval: int) -> None: config = load_config() notifications = build_notification_registry(config.credentials) + telegram_poller = build_telegram_poller(config.credentials) # Autonomous market monitoring from qracer.autonomous import AutonomousMonitor @@ -1004,6 +1008,7 @@ def serve(check_interval: int) -> None: task_executor, notifications, autonomous_monitor=autonomous_monitor, + telegram_poller=telegram_poller, tick_interval=1.0, ) @@ -1023,6 +1028,8 @@ def _handle_signal(signum: int, _frame: object) -> None: f" Autonomous monitoring: threshold={app_cfg.price_move_threshold_pct}%," f" cooldown={app_cfg.alert_cooldown_minutes}m" ) + if telegram_poller is not None: + click.echo(" Telegram bot: receiving commands (try /help in chat)") click.echo(" Press Ctrl+C to stop.\n") try: diff --git a/qracer/notifications/__init__.py b/qracer/notifications/__init__.py index 57437a7..c4b3b74 100644 --- a/qracer/notifications/__init__.py +++ b/qracer/notifications/__init__.py @@ -5,11 +5,14 @@ ) from qracer.notifications.registry import NotificationRegistry from qracer.notifications.telegram_adapter import TelegramAdapter +from qracer.notifications.telegram_poller import BotCommand, TelegramBotPoller __all__ = [ + "BotCommand", "Notification", "NotificationCategory", "NotificationProvider", "NotificationRegistry", "TelegramAdapter", + "TelegramBotPoller", ] diff --git a/qracer/notifications/factory.py b/qracer/notifications/factory.py index e655626..5894245 100644 --- a/qracer/notifications/factory.py +++ b/qracer/notifications/factory.py @@ -15,6 +15,7 @@ from qracer.notifications.registry import NotificationRegistry from qracer.notifications.telegram_adapter import TelegramAdapter +from qracer.notifications.telegram_poller import TelegramBotPoller logger = logging.getLogger(__name__) @@ -43,3 +44,26 @@ def build_notification_registry( ) return registry + + +def build_telegram_poller( + credentials: dict[str, str], + *, + timeout: int = 1, +) -> TelegramBotPoller | None: + """Build a :class:`TelegramBotPoller` if Telegram credentials are present. + + Returns ``None`` when ``TELEGRAM_BOT_TOKEN`` or ``TELEGRAM_CHAT_ID`` are + missing — callers should treat this as "no inbound bot integration". + + The default ``timeout=1`` keeps the long-poll short enough to coexist + with the 1-second :class:`~qracer.server.Server` tick; standalone + callers can pass a larger value (e.g. 30) for true long-polling. + """ + bot_token = credentials.get("TELEGRAM_BOT_TOKEN", "") + chat_id = credentials.get("TELEGRAM_CHAT_ID", "") + if not bot_token or not chat_id: + return None + poller = TelegramBotPoller(bot_token=bot_token, chat_id=chat_id, timeout=timeout) + logger.info("Telegram bot command poller initialised") + return poller diff --git a/qracer/notifications/telegram_poller.py b/qracer/notifications/telegram_poller.py new file mode 100644 index 0000000..9321f22 --- /dev/null +++ b/qracer/notifications/telegram_poller.py @@ -0,0 +1,222 @@ +"""TelegramBotPoller — receive bot commands from Telegram via long-polling. + +Companion to :mod:`qracer.notifications.telegram_adapter` (which only sends). +This module adds inbound command receiving so users can interact with the +qracer service remotely from their phone. + +Uses only the stdlib ``urllib`` so there is no extra dependency, mirroring +the existing TelegramAdapter design. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from typing import Any + +logger = logging.getLogger(__name__) + +_TELEGRAM_API = "https://api.telegram.org" + +# Telegram caps a single message at 4096 characters; leave a small margin so +# truncation suffixes still fit. +_DEFAULT_MESSAGE_CHAR_LIMIT = 4000 + + +@dataclass(frozen=True) +class BotCommand: + """A parsed bot command from a Telegram message. + + Example:: + + BotCommand.parse("/analyze AAPL") + # → BotCommand(action="analyze", args=["AAPL"], raw_text="/analyze AAPL") + """ + + action: str + args: list[str] + raw_text: str + + @classmethod + def parse(cls, text: str) -> BotCommand | None: + """Parse a Telegram message into a :class:`BotCommand`. + + Returns ``None`` if the text is not a recognised command (i.e. does + not start with ``/`` or contains no action token). + + Telegram bot commands may include a ``@botname`` suffix when sent in + groups (e.g. ``/analyze@qracerbot AAPL``); the suffix is stripped so + the action is just ``analyze``. + """ + text = (text or "").strip() + if not text or not text.startswith("/"): + return None + parts = text[1:].split() + if not parts: + return None + action = parts[0].split("@", 1)[0].lower() + if not action: + return None + return cls(action=action, args=parts[1:], raw_text=text) + + +class TelegramBotPoller: + """Receive bot commands from Telegram via the ``getUpdates`` long-poll API. + + Tracks the update offset so messages are never returned twice, filters + messages to those originating from the authorised chat, and parses + incoming text into :class:`BotCommand` objects. + + Replies can be sent back to the same chat via :meth:`send_reply`. + + Usage:: + + poller = TelegramBotPoller(bot_token="...", chat_id="123") + commands = await poller.poll() + for cmd in commands: + await poller.send_reply(f"Got: {cmd.action}") + """ + + def __init__( + self, + bot_token: str, + chat_id: str, + *, + timeout: int = 30, + message_char_limit: int = _DEFAULT_MESSAGE_CHAR_LIMIT, + ) -> None: + if not bot_token: + raise ValueError("TELEGRAM_BOT_TOKEN is required but was empty") + if not chat_id: + raise ValueError("TELEGRAM_CHAT_ID is required but was empty") + self._bot_token = bot_token + self._chat_id = str(chat_id) + self._timeout = max(0, int(timeout)) + self._message_char_limit = message_char_limit + self._offset: int | None = None + + @property + def offset(self) -> int | None: + """Current update offset (``None`` until the first update arrives).""" + return self._offset + + @property + def chat_id(self) -> str: + """The authorised chat ID this poller filters by.""" + return self._chat_id + + async def poll(self) -> list[BotCommand]: + """Long-poll Telegram for new commands. + + Returns a list of :class:`BotCommand` parsed from messages that + arrived from the authorised chat. The offset is advanced past + the highest update ID returned, so subsequent calls only return + new messages. + + Network and API errors are logged and converted to an empty list + — the caller is expected to retry on the next tick. + """ + return await asyncio.to_thread(self._poll_sync) + + def _poll_sync(self) -> list[BotCommand]: + url = f"{_TELEGRAM_API}/bot{self._bot_token}/getUpdates" + params: dict[str, Any] = { + "timeout": self._timeout, + "allowed_updates": json.dumps(["message"]), + } + if self._offset is not None: + params["offset"] = self._offset + full_url = f"{url}?{urllib.parse.urlencode(params)}" + + try: + with urllib.request.urlopen(full_url, timeout=self._timeout + 5) as resp: + if resp.status != 200: + logger.warning("Telegram getUpdates returned status %s", resp.status) + return [] + payload = json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + logger.error("Telegram getUpdates failed (HTTP %s): %s", exc.code, exc.reason) + return [] + except urllib.error.URLError as exc: + logger.error("Telegram getUpdates failed (network): %s", exc.reason) + return [] + except (json.JSONDecodeError, ValueError) as exc: + logger.error("Telegram getUpdates returned invalid JSON: %s", exc) + return [] + + if not isinstance(payload, dict) or not payload.get("ok"): + logger.warning( + "Telegram getUpdates returned ok=false: %s", + payload.get("description") if isinstance(payload, dict) else payload, + ) + return [] + + commands: list[BotCommand] = [] + max_update_id = -1 + for update in payload.get("result", []): + update_id = update.get("update_id") + if isinstance(update_id, int) and update_id > max_update_id: + max_update_id = update_id + + message = update.get("message") + if not isinstance(message, dict): + continue + + chat = message.get("chat") or {} + if str(chat.get("id")) != self._chat_id: + logger.debug("Ignoring message from unauthorised chat %s", chat.get("id")) + continue + + text = message.get("text") + if not isinstance(text, str): + continue + + cmd = BotCommand.parse(text) + if cmd is not None: + commands.append(cmd) + + if max_update_id >= 0: + self._offset = max_update_id + 1 + + return commands + + async def send_reply(self, text: str) -> bool: + """Send a plain-text reply to the authorised chat. + + Long replies are truncated to ``message_char_limit`` characters + with a trailing ``"..."``. Returns ``True`` on HTTP 200. + """ + return await asyncio.to_thread(self._send_reply_sync, text) + + def _send_reply_sync(self, text: str) -> bool: + if not text: + return False + if len(text) > self._message_char_limit: + text = text[: self._message_char_limit - 3] + "..." + + url = f"{_TELEGRAM_API}/bot{self._bot_token}/sendMessage" + payload = {"chat_id": self._chat_id, "text": text} + data = json.dumps(payload).encode() + req = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=10) as resp: + ok = resp.status == 200 + if not ok: + logger.warning("Telegram sendMessage returned status %s", resp.status) + return ok + except urllib.error.HTTPError as exc: + logger.error("Telegram sendMessage failed (HTTP %s): %s", exc.code, exc.reason) + return False + except urllib.error.URLError as exc: + logger.error("Telegram sendMessage failed (network): %s", exc.reason) + return False diff --git a/qracer/server.py b/qracer/server.py index 75c7d38..c9fcb8f 100644 --- a/qracer/server.py +++ b/qracer/server.py @@ -8,12 +8,16 @@ import asyncio import logging +import time from qracer.alert_monitor import AlertMonitor +from qracer.alerts import AlertCondition from qracer.autonomous import AutonomousMonitor from qracer.notifications.providers import Notification, NotificationCategory from qracer.notifications.registry import NotificationRegistry +from qracer.notifications.telegram_poller import BotCommand, TelegramBotPoller from qracer.task_executor import TaskExecutor +from qracer.tasks import TaskActionType logger = logging.getLogger(__name__) @@ -35,18 +39,22 @@ def __init__( notifications: NotificationRegistry | None = None, *, autonomous_monitor: AutonomousMonitor | None = None, + telegram_poller: TelegramBotPoller | None = None, tick_interval: float = 1.0, ) -> None: self._alert_monitor = alert_monitor self._task_executor = task_executor self._autonomous_monitor = autonomous_monitor self._notifications = notifications or NotificationRegistry() + self._telegram_poller = telegram_poller self._tick_interval = tick_interval self._shutdown_event = asyncio.Event() + self._started_at: float | None = None async def run(self) -> None: """Main loop — runs until shutdown() is called.""" logger.info("Server started (tick=%.1fs)", self._tick_interval) + self._started_at = time.monotonic() while not self._shutdown_event.is_set(): await self._tick() @@ -104,6 +112,137 @@ async def _tick(self) -> None: except Exception: logger.debug("Autonomous check failed", exc_info=True) + if self._telegram_poller is not None: + try: + commands = await self._telegram_poller.poll() + except Exception: + logger.debug("Telegram poll failed", exc_info=True) + commands = [] + for command in commands: + await self._handle_bot_command(command) + + async def _handle_bot_command(self, command: BotCommand) -> None: + """Dispatch an incoming bot command and reply with the result.""" + try: + reply = self._dispatch_bot_command(command) + except Exception as exc: + logger.exception("Bot command handler failed: /%s", command.action) + reply = f"Error handling /{command.action}: {exc}" + if reply and self._telegram_poller is not None: + await self._telegram_poller.send_reply(reply) + + def _dispatch_bot_command(self, command: BotCommand) -> str: + """Route a :class:`BotCommand` to the matching handler. + + Handlers return the reply text to send back to the user. Long + replies are truncated by the poller before transmission. + """ + action = command.action + if action in {"help", "start"}: + return self._cmd_help() + if action == "status": + return self._cmd_status() + if action == "alerts": + return self._cmd_alerts() + if action == "alert": + return self._cmd_create_alert(command.args) + if action == "tasks": + return self._cmd_tasks() + if action == "schedule": + return self._cmd_schedule(command.args) + if action in {"analyze", "portfolio"}: + return ( + f"/{action} is not supported in bot mode yet — " + "use the qracer CLI on the host. Try /help." + ) + return f"Unknown command: /{action}. Try /help." + + # ------------------------------------------------------------------ + # Individual command handlers + # ------------------------------------------------------------------ + + @staticmethod + def _cmd_help() -> str: + return ( + "qracer bot commands:\n" + "/status — server status and uptime\n" + "/alerts — list active price alerts\n" + "/alert TICKER above|below PRICE — create a price alert\n" + "/tasks — list scheduled tasks\n" + "/schedule ACTION TICKER SCHEDULE — schedule a task\n" + " e.g. /schedule analyze AAPL every 1h\n" + "/help — show this message" + ) + + def _cmd_status(self) -> str: + uptime = "unknown" + if self._started_at is not None: + uptime = _format_duration(time.monotonic() - self._started_at) + channels = ", ".join(self._notifications.channels) or "none" + autonomous = "on" if self._autonomous_monitor else "off" + return ( + "qracer status\n" + f" uptime: {uptime}\n" + f" notifications: {channels}\n" + f" autonomous: {autonomous}" + ) + + def _cmd_alerts(self) -> str: + alerts = self._alert_monitor.store.get_active() + if not alerts: + return "No active alerts." + lines = ["Active alerts:"] + for a in alerts: + lines.append(f" {a.id} {a.describe()}") + return "\n".join(lines) + + def _cmd_create_alert(self, args: list[str]) -> str: + if len(args) < 3: + return "Usage: /alert TICKER above|below PRICE (e.g. /alert AAPL above 200)" + ticker, condition_str, price_str = args[0], args[1].lower(), args[2] + try: + condition = AlertCondition(condition_str) + except ValueError: + return f"Unknown condition '{condition_str}'. Use 'above' or 'below'." + if condition is AlertCondition.CHANGE_PCT: + return "Use 'above' or 'below' from the bot — change_pct alerts need the CLI." + try: + threshold = float(price_str) + except ValueError: + return f"Invalid price '{price_str}' — must be a number." + alert = self._alert_monitor.store.create(ticker, condition, threshold) + return f"Created alert {alert.id}: {alert.describe()}" + + def _cmd_tasks(self) -> str: + tasks = self._task_executor.store.get_active() + if not tasks: + return "No scheduled tasks." + lines = ["Scheduled tasks:"] + for t in tasks: + lines.append(f" {t.id} {t.describe()}") + return "\n".join(lines) + + def _cmd_schedule(self, args: list[str]) -> str: + if len(args) < 3: + return ( + "Usage: /schedule ACTION TICKER SCHEDULE\n" + " ACTION: analyze | news_scan | portfolio_snapshot\n" + " e.g. /schedule analyze AAPL every 1h" + ) + action_str = args[0].lower() + ticker = args[1].upper() + schedule_spec = " ".join(args[2:]) + try: + action_type = TaskActionType(action_str) + except ValueError: + valid = ", ".join(t.value for t in TaskActionType) + return f"Unknown action '{action_str}'. Valid: {valid}" + try: + task = self._task_executor.store.create(action_type, {"ticker": ticker}, schedule_spec) + except ValueError as exc: + return f"Invalid schedule: {exc}" + return f"Scheduled task {task.id}: {task.describe()}" + async def _notify(self, category: NotificationCategory, title: str, body: str) -> None: """Send a notification if any channels are registered.""" if not self._notifications.channels: @@ -114,3 +253,18 @@ async def _notify(self, category: NotificationCategory, title: str, body: str) - def shutdown(self) -> None: """Signal the server to stop after the current tick.""" self._shutdown_event.set() + + +def _format_duration(seconds: float) -> str: + """Format a duration in seconds as ``"1h 23m 45s"`` (omitting empty units).""" + seconds = max(0, int(seconds)) + hours, rem = divmod(seconds, 3600) + minutes, secs = divmod(rem, 60) + parts: list[str] = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + if secs or not parts: + parts.append(f"{secs}s") + return " ".join(parts) diff --git a/tests/notifications/test_telegram_poller.py b/tests/notifications/test_telegram_poller.py new file mode 100644 index 0000000..fd0a713 --- /dev/null +++ b/tests/notifications/test_telegram_poller.py @@ -0,0 +1,336 @@ +"""Tests for TelegramBotPoller and BotCommand.""" + +from __future__ import annotations + +import io +import json +import urllib.error +from unittest.mock import MagicMock, patch + +import pytest + +from qracer.notifications.telegram_poller import ( + BotCommand, + TelegramBotPoller, +) + +# --------------------------------------------------------------------------- +# BotCommand.parse +# --------------------------------------------------------------------------- + + +class TestBotCommandParse: + def test_simple_command(self) -> None: + cmd = BotCommand.parse("/status") + assert cmd is not None + assert cmd.action == "status" + assert cmd.args == [] + assert cmd.raw_text == "/status" + + def test_command_with_args(self) -> None: + cmd = BotCommand.parse("/analyze AAPL") + assert cmd is not None + assert cmd.action == "analyze" + assert cmd.args == ["AAPL"] + + def test_command_with_multiple_args(self) -> None: + cmd = BotCommand.parse("/alert AAPL above 200") + assert cmd is not None + assert cmd.action == "alert" + assert cmd.args == ["AAPL", "above", "200"] + + def test_command_with_botname_suffix(self) -> None: + cmd = BotCommand.parse("/analyze@qracerbot AAPL") + assert cmd is not None + assert cmd.action == "analyze" + assert cmd.args == ["AAPL"] + + def test_action_lowercased(self) -> None: + cmd = BotCommand.parse("/STATUS") + assert cmd is not None + assert cmd.action == "status" + + def test_leading_trailing_whitespace(self) -> None: + cmd = BotCommand.parse(" /tasks ") + assert cmd is not None + assert cmd.action == "tasks" + + def test_non_command_returns_none(self) -> None: + assert BotCommand.parse("hello world") is None + + def test_empty_string_returns_none(self) -> None: + assert BotCommand.parse("") is None + + def test_whitespace_only_returns_none(self) -> None: + assert BotCommand.parse(" ") is None + + def test_lone_slash_returns_none(self) -> None: + assert BotCommand.parse("/") is None + + def test_only_botname_after_slash_returns_none(self) -> None: + assert BotCommand.parse("/@bot") is None + + +# --------------------------------------------------------------------------- +# Construction validation +# --------------------------------------------------------------------------- + + +class TestTelegramBotPollerInit: + def test_requires_bot_token(self) -> None: + with pytest.raises(ValueError, match="TELEGRAM_BOT_TOKEN"): + TelegramBotPoller(bot_token="", chat_id="12345") + + def test_requires_chat_id(self) -> None: + with pytest.raises(ValueError, match="TELEGRAM_CHAT_ID"): + TelegramBotPoller(bot_token="tok", chat_id="") + + def test_chat_id_coerced_to_str(self) -> None: + poller = TelegramBotPoller(bot_token="tok", chat_id=12345) # type: ignore[arg-type] + assert poller.chat_id == "12345" + + def test_initial_offset_is_none(self) -> None: + poller = TelegramBotPoller(bot_token="tok", chat_id="1") + assert poller.offset is None + + +# --------------------------------------------------------------------------- +# poll() — mocked HTTP +# --------------------------------------------------------------------------- + + +def _mock_response(payload: dict) -> MagicMock: + """Build a context-manager mock that ``urlopen`` returns.""" + body = json.dumps(payload).encode() + resp = MagicMock() + resp.status = 200 + resp.read.return_value = body + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + return resp + + +def _make_update(update_id: int, chat_id: int, text: str) -> dict: + return { + "update_id": update_id, + "message": { + "message_id": update_id, + "chat": {"id": chat_id, "type": "private"}, + "text": text, + }, + } + + +class TestTelegramBotPollerPoll: + @pytest.fixture() + def poller(self) -> TelegramBotPoller: + return TelegramBotPoller(bot_token="tok", chat_id="999", timeout=1) + + async def test_poll_parses_commands_and_advances_offset( + self, poller: TelegramBotPoller + ) -> None: + payload = { + "ok": True, + "result": [ + _make_update(10, 999, "/status"), + _make_update(11, 999, "/analyze AAPL"), + ], + } + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=_mock_response(payload)): + commands = await poller.poll() + + assert len(commands) == 2 + assert commands[0].action == "status" + assert commands[1].action == "analyze" + assert commands[1].args == ["AAPL"] + assert poller.offset == 12 # max update_id + 1 + + async def test_poll_filters_unauthorised_chats(self, poller: TelegramBotPoller) -> None: + payload = { + "ok": True, + "result": [ + _make_update(1, 999, "/status"), + _make_update(2, 1234, "/leak"), # different chat — must be ignored + _make_update(3, 999, "/tasks"), + ], + } + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=_mock_response(payload)): + commands = await poller.poll() + + actions = [c.action for c in commands] + assert actions == ["status", "tasks"] + # Offset still advances past all updates so we don't re-fetch them. + assert poller.offset == 4 + + async def test_poll_skips_non_text_messages(self, poller: TelegramBotPoller) -> None: + payload = { + "ok": True, + "result": [ + { + "update_id": 5, + "message": { + "message_id": 5, + "chat": {"id": 999, "type": "private"}, + "photo": [{"file_id": "abc"}], + }, + }, + _make_update(6, 999, "/status"), + ], + } + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=_mock_response(payload)): + commands = await poller.poll() + + assert len(commands) == 1 + assert commands[0].action == "status" + assert poller.offset == 7 + + async def test_poll_skips_non_command_text(self, poller: TelegramBotPoller) -> None: + payload = { + "ok": True, + "result": [_make_update(1, 999, "hello there")], + } + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=_mock_response(payload)): + commands = await poller.poll() + + assert commands == [] + assert poller.offset == 2 # update_id still consumed + + async def test_poll_passes_offset_on_subsequent_calls(self, poller: TelegramBotPoller) -> None: + payload1 = {"ok": True, "result": [_make_update(20, 999, "/status")]} + payload2 = {"ok": True, "result": [_make_update(21, 999, "/tasks")]} + + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + responses = [_mock_response(payload1), _mock_response(payload2)] + with patch(target, side_effect=responses) as mock: + await poller.poll() + await poller.poll() + + # First call has no offset, second call must include offset=21. + first_url = mock.call_args_list[0][0][0] + second_url = mock.call_args_list[1][0][0] + assert "offset=" not in first_url + assert "offset=21" in second_url + + async def test_poll_returns_empty_on_http_error(self, poller: TelegramBotPoller) -> None: + exc = urllib.error.HTTPError( + url="https://api.telegram.org", + code=500, + msg="Server Error", + hdrs=None, # type: ignore[arg-type] + fp=None, + ) + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, side_effect=exc): + commands = await poller.poll() + + assert commands == [] + assert poller.offset is None + + async def test_poll_returns_empty_on_url_error(self, poller: TelegramBotPoller) -> None: + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, side_effect=urllib.error.URLError("Connection refused")): + commands = await poller.poll() + assert commands == [] + + async def test_poll_returns_empty_on_invalid_json(self, poller: TelegramBotPoller) -> None: + resp = MagicMock() + resp.status = 200 + resp.read.return_value = b"" + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=resp): + commands = await poller.poll() + assert commands == [] + + async def test_poll_returns_empty_on_ok_false(self, poller: TelegramBotPoller) -> None: + payload = {"ok": False, "description": "bad token"} + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=_mock_response(payload)): + commands = await poller.poll() + assert commands == [] + + async def test_poll_uses_token_in_url(self, poller: TelegramBotPoller) -> None: + payload = {"ok": True, "result": []} + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=_mock_response(payload)) as mock: + await poller.poll() + url = mock.call_args[0][0] + assert "/bottok/getUpdates" in url + + +# --------------------------------------------------------------------------- +# send_reply — mocked HTTP +# --------------------------------------------------------------------------- + + +class TestTelegramBotPollerReply: + @pytest.fixture() + def poller(self) -> TelegramBotPoller: + return TelegramBotPoller( + bot_token="tok", + chat_id="999", + timeout=1, + message_char_limit=50, + ) + + async def test_send_reply_success(self, poller: TelegramBotPoller) -> None: + resp = MagicMock() + resp.status = 200 + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=resp) as mock: + ok = await poller.send_reply("hello") + + assert ok is True + req = mock.call_args[0][0] + assert "/bottok/sendMessage" in req.full_url + body = json.loads(req.data) + assert body == {"chat_id": "999", "text": "hello"} + + async def test_send_reply_truncates_long_text(self, poller: TelegramBotPoller) -> None: + resp = MagicMock() + resp.status = 200 + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + + long_text = "x" * 200 + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, return_value=resp) as mock: + await poller.send_reply(long_text) + + body = json.loads(mock.call_args[0][0].data) + assert len(body["text"]) == 50 # message_char_limit + assert body["text"].endswith("...") + + async def test_send_reply_empty_text_returns_false(self, poller: TelegramBotPoller) -> None: + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target) as mock: + ok = await poller.send_reply("") + assert ok is False + mock.assert_not_called() + + async def test_send_reply_http_error_returns_false(self, poller: TelegramBotPoller) -> None: + exc = urllib.error.HTTPError( + url="https://api.telegram.org", + code=403, + msg="Forbidden", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(b""), + ) + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, side_effect=exc): + ok = await poller.send_reply("hello") + assert ok is False + + async def test_send_reply_url_error_returns_false(self, poller: TelegramBotPoller) -> None: + target = "qracer.notifications.telegram_poller.urllib.request.urlopen" + with patch(target, side_effect=urllib.error.URLError("offline")): + ok = await poller.send_reply("hello") + assert ok is False diff --git a/tests/test_server.py b/tests/test_server.py index 4f75072..ded6641 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -5,7 +5,10 @@ import asyncio from unittest.mock import AsyncMock, MagicMock -from qracer.server import Server +from qracer.alerts import Alert, AlertCondition +from qracer.notifications.telegram_poller import BotCommand +from qracer.server import Server, _format_duration +from qracer.tasks import Task, TaskActionType, TaskScheduleType, TaskStatus def _make_monitor(triggered=None): @@ -22,6 +25,13 @@ def _make_executor(results=None): return executor +def _make_poller(): + poller = MagicMock() + poller.poll = AsyncMock(return_value=[]) + poller.send_reply = AsyncMock(return_value=True) + return poller + + class TestServer: async def test_tick_checks_alerts_and_tasks(self) -> None: monitor = _make_monitor() @@ -121,3 +131,283 @@ async def test_tick_handles_task_error_gracefully(self) -> None: server = Server(monitor, executor) await server._tick() # Should not raise + + +# --------------------------------------------------------------------------- +# Telegram bot command integration +# --------------------------------------------------------------------------- + + +def _alert(id_: str, ticker: str, threshold: float, active: bool = True) -> Alert: + return Alert( + id=id_, + ticker=ticker, + condition=AlertCondition.ABOVE, + threshold=threshold, + created_at="2026-01-01T00:00:00+00:00", + active=active, + ) + + +def _task(id_: str, ticker: str, schedule: str = "every 1h") -> Task: + return Task( + id=id_, + action_type=TaskActionType.ANALYZE, + action_params={"ticker": ticker}, + schedule_type=TaskScheduleType.RECURRING, + schedule_spec=schedule, + status=TaskStatus.PENDING, + created_at="2026-01-01T00:00:00+00:00", + next_run_at="2026-01-01T01:00:00+00:00", + ) + + +class TestServerTelegramPoller: + async def test_tick_polls_telegram_when_configured(self) -> None: + monitor = _make_monitor() + executor = _make_executor() + poller = _make_poller() + server = Server(monitor, executor, telegram_poller=poller) + + await server._tick() + + poller.poll.assert_awaited_once() + + async def test_tick_skips_telegram_when_not_configured(self) -> None: + monitor = _make_monitor() + executor = _make_executor() + server = Server(monitor, executor) + + await server._tick() # No poller — should not raise. + + async def test_tick_dispatches_command_and_replies(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.return_value = [] + executor = _make_executor() + poller = _make_poller() + poller.poll = AsyncMock( + return_value=[BotCommand(action="alerts", args=[], raw_text="/alerts")] + ) + server = Server(monitor, executor, telegram_poller=poller) + + await server._tick() + + poller.send_reply.assert_awaited_once() + assert "No active alerts" in poller.send_reply.await_args[0][0] + + async def test_tick_handles_poll_failure_gracefully(self) -> None: + monitor = _make_monitor() + executor = _make_executor() + poller = _make_poller() + poller.poll = AsyncMock(side_effect=RuntimeError("boom")) + server = Server(monitor, executor, telegram_poller=poller) + + await server._tick() # Should not raise + + poller.send_reply.assert_not_called() + + async def test_tick_handles_dispatch_exception_with_error_reply(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.side_effect = RuntimeError("db down") + executor = _make_executor() + poller = _make_poller() + poller.poll = AsyncMock( + return_value=[BotCommand(action="alerts", args=[], raw_text="/alerts")] + ) + server = Server(monitor, executor, telegram_poller=poller) + + await server._tick() + + poller.send_reply.assert_awaited_once() + msg = poller.send_reply.await_args[0][0] + assert "Error handling /alerts" in msg + assert "db down" in msg + + +class TestBotCommandHandlers: + @staticmethod + def _server(monitor=None, executor=None, **kwargs) -> Server: + return Server( + monitor or _make_monitor(), + executor or _make_executor(), + **kwargs, + ) + + def test_help(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("help", [], "/help")) + assert "/status" in out + assert "/alerts" in out + assert "/alert" in out + assert "/tasks" in out + assert "/schedule" in out + + def test_start_aliases_help(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("start", [], "/start")) + assert "/status" in out + + def test_unknown_command(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("nope", [], "/nope")) + assert "Unknown command" in out + + def test_analyze_returns_not_supported(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("analyze", ["AAPL"], "/analyze AAPL")) + assert "not supported" in out.lower() + + def test_portfolio_returns_not_supported(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("portfolio", [], "/portfolio")) + assert "not supported" in out.lower() + + def test_status(self) -> None: + notifications = MagicMock() + notifications.channels = ["telegram"] + server = self._server(notifications=notifications) + out = server._dispatch_bot_command(BotCommand("status", [], "/status")) + assert "uptime" in out + assert "telegram" in out + assert "autonomous: off" in out + + def test_alerts_empty(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.return_value = [] + server = self._server(monitor=monitor) + out = server._dispatch_bot_command(BotCommand("alerts", [], "/alerts")) + assert out == "No active alerts." + + def test_alerts_lists_each(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.return_value = [ + _alert("a1", "AAPL", 200), + _alert("b2", "MSFT", 410), + ] + server = self._server(monitor=monitor) + out = server._dispatch_bot_command(BotCommand("alerts", [], "/alerts")) + assert "a1" in out + assert "AAPL" in out + assert "b2" in out + assert "MSFT" in out + + def test_create_alert_validates_args(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("alert", ["AAPL"], "/alert AAPL")) + assert "Usage" in out + + def test_create_alert_rejects_unknown_condition(self) -> None: + server = self._server() + out = server._dispatch_bot_command( + BotCommand("alert", ["AAPL", "near", "200"], "/alert AAPL near 200") + ) + assert "Unknown condition" in out + + def test_create_alert_rejects_change_pct(self) -> None: + server = self._server() + out = server._dispatch_bot_command( + BotCommand("alert", ["AAPL", "change_pct", "5"], "/alert AAPL change_pct 5") + ) + assert "change_pct" in out + assert "CLI" in out + + def test_create_alert_rejects_invalid_price(self) -> None: + server = self._server() + out = server._dispatch_bot_command( + BotCommand("alert", ["AAPL", "above", "abc"], "/alert AAPL above abc") + ) + assert "Invalid price" in out + + def test_create_alert_persists(self) -> None: + monitor = _make_monitor() + created = _alert("xx", "AAPL", 200) + monitor.store.create.return_value = created + server = self._server(monitor=monitor) + out = server._dispatch_bot_command( + BotCommand("alert", ["AAPL", "above", "200"], "/alert AAPL above 200") + ) + monitor.store.create.assert_called_once_with("AAPL", AlertCondition.ABOVE, 200.0) + assert "Created alert xx" in out + + def test_tasks_empty(self) -> None: + executor = _make_executor() + executor.store.get_active.return_value = [] + server = self._server(executor=executor) + out = server._dispatch_bot_command(BotCommand("tasks", [], "/tasks")) + assert out == "No scheduled tasks." + + def test_tasks_lists_each(self) -> None: + executor = _make_executor() + executor.store.get_active.return_value = [ + _task("t1", "AAPL"), + _task("t2", "MSFT", schedule="daily 09:30"), + ] + server = self._server(executor=executor) + out = server._dispatch_bot_command(BotCommand("tasks", [], "/tasks")) + assert "t1" in out + assert "AAPL" in out + assert "t2" in out + assert "daily 09:30" in out + + def test_schedule_validates_args(self) -> None: + server = self._server() + out = server._dispatch_bot_command(BotCommand("schedule", ["analyze"], "/schedule analyze")) + assert "Usage" in out + + def test_schedule_rejects_unknown_action(self) -> None: + server = self._server() + out = server._dispatch_bot_command( + BotCommand( + "schedule", + ["foo", "AAPL", "every", "1h"], + "/schedule foo AAPL every 1h", + ) + ) + assert "Unknown action" in out + + def test_schedule_rejects_invalid_spec(self) -> None: + executor = _make_executor() + executor.store.create.side_effect = ValueError("bad spec") + server = self._server(executor=executor) + out = server._dispatch_bot_command( + BotCommand( + "schedule", + ["analyze", "AAPL", "tomorrow"], + "/schedule analyze AAPL tomorrow", + ) + ) + assert "Invalid schedule" in out + assert "bad spec" in out + + def test_schedule_creates_task(self) -> None: + executor = _make_executor() + executor.store.create.return_value = _task("nn", "AAPL") + server = self._server(executor=executor) + out = server._dispatch_bot_command( + BotCommand( + "schedule", + ["analyze", "aapl", "every", "1h"], + "/schedule analyze aapl every 1h", + ) + ) + executor.store.create.assert_called_once_with( + TaskActionType.ANALYZE, {"ticker": "AAPL"}, "every 1h" + ) + assert "Scheduled task nn" in out + + +class TestFormatDuration: + def test_zero(self) -> None: + assert _format_duration(0) == "0s" + + def test_seconds(self) -> None: + assert _format_duration(45) == "45s" + + def test_minutes(self) -> None: + assert _format_duration(125) == "2m 5s" + + def test_hours(self) -> None: + assert _format_duration(3725) == "1h 2m 5s" + + def test_negative_clamped_to_zero(self) -> None: + assert _format_duration(-5) == "0s"