Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions qracer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,9 +1048,12 @@ def serve(check_interval: int) -> None:
from qracer.autonomous import AutonomousMonitor
from qracer.watchlist import Watchlist

# Watchlist is shared by the autonomous monitor and the Telegram
# /watchlist + /briefing commands, so build it unconditionally.
watchlist = Watchlist(_user_dir() / "watchlist.json")

autonomous_monitor: AutonomousMonitor | None = None
if app_cfg.autonomous_enabled:
watchlist = Watchlist(_user_dir() / "watchlist.json")
autonomous_monitor = AutonomousMonitor(
watchlist,
data_registry,
Expand All @@ -1059,13 +1062,20 @@ def serve(check_interval: int) -> None:
cooldown_minutes=app_cfg.alert_cooldown_minutes,
)

sessions_dir = _user_dir() / "sessions"
reports_dir = _user_dir() / "reports"

server = Server(
alert_monitor,
task_executor,
notifications,
autonomous_monitor=autonomous_monitor,
telegram_poller=telegram_poller,
tick_interval=1.0,
watchlist=watchlist,
data_registry=data_registry,
sessions_dir=sessions_dir,
reports_dir=reports_dir,
)

def _handle_signal(signum: int, _frame: object) -> None:
Expand All @@ -1085,7 +1095,14 @@ def _handle_signal(signum: int, _frame: object) -> None:
f" cooldown={app_cfg.alert_cooldown_minutes}m"
)
if telegram_poller is not None:
click.echo(" Telegram bot: receiving commands (try /help in chat)")
authorised = len(telegram_poller.allowed_chat_ids)
if authorised > 1:
click.echo(
f" Telegram bot: receiving commands ({authorised} authorised chats; "
"try /help in chat)"
)
else:
click.echo(" Telegram bot: receiving commands (try /help in chat)")
click.echo(" Press Ctrl+C to stop.\n")

try:
Expand Down
26 changes: 24 additions & 2 deletions qracer/notifications/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def build_notification_registry(
return registry


def _parse_chat_ids(raw: str) -> list[str]:
"""Split a comma-separated chat-id list and drop blanks."""
return [part.strip() for part in raw.split(",") if part.strip()]


def build_telegram_poller(
credentials: dict[str, str],
*,
Expand All @@ -59,11 +64,28 @@ def build_telegram_poller(
The default ``timeout=1`` keeps the long-poll short enough to coexist
with the 1-second :class:`~qracer.server.Server` tick; standalone
callers can pass a larger value (e.g. 30) for true long-polling.

``TELEGRAM_ALLOWED_CHAT_IDS`` (comma-separated, optional) authorises
additional chats — e.g. ``"111,222"`` lets two users talk to the bot.
The primary chat (``TELEGRAM_CHAT_ID``) is always authorised and used as
the default reply target.
"""
bot_token = credentials.get("TELEGRAM_BOT_TOKEN", "")
chat_id = credentials.get("TELEGRAM_CHAT_ID", "")
if not bot_token or not chat_id:
return None
poller = TelegramBotPoller(bot_token=bot_token, chat_id=chat_id, timeout=timeout)
logger.info("Telegram bot command poller initialised")
allowed = _parse_chat_ids(credentials.get("TELEGRAM_ALLOWED_CHAT_IDS", ""))
poller = TelegramBotPoller(
bot_token=bot_token,
chat_id=chat_id,
allowed_chat_ids=allowed or None,
timeout=timeout,
)
if len(poller.allowed_chat_ids) > 1:
logger.info(
"Telegram bot command poller initialised (authorised chats: %d)",
len(poller.allowed_chat_ids),
)
else:
logger.info("Telegram bot command poller initialised")
return poller
154 changes: 127 additions & 27 deletions qracer/notifications/telegram_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import asyncio
import json
import logging
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from collections import deque
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)
Expand All @@ -27,23 +29,35 @@
# truncation suffixes still fit.
_DEFAULT_MESSAGE_CHAR_LIMIT = 4000

# Default rate-limit: 20 commands per chat per 60 seconds. Balances
# responsiveness for normal use against runaway loops or abuse on a shared
# chat.
_DEFAULT_RATE_LIMIT_COMMANDS = 20
_DEFAULT_RATE_LIMIT_WINDOW_S = 60.0


@dataclass(frozen=True)
class BotCommand:
"""A parsed bot command from a Telegram message.

Example::

BotCommand.parse("/analyze AAPL")
# → BotCommand(action="analyze", args=["AAPL"], raw_text="/analyze AAPL")
BotCommand.parse("/analyze AAPL", chat_id="12345")
# → BotCommand(action="analyze", args=["AAPL"],
# raw_text="/analyze AAPL", chat_id="12345")

``chat_id`` is the sender's chat — callers can use it as the target of
:meth:`TelegramBotPoller.send_reply` so replies go back to whoever asked
(useful when ``allowed_chat_ids`` authorises more than one chat).
"""

action: str
args: list[str]
raw_text: str
chat_id: str = ""

@classmethod
def parse(cls, text: str) -> BotCommand | None:
def parse(cls, text: str, chat_id: str = "") -> BotCommand | None:
"""Parse a Telegram message into a :class:`BotCommand`.

Returns ``None`` if the text is not a recognised command (i.e. does
Expand All @@ -62,61 +76,111 @@ def parse(cls, text: str) -> BotCommand | None:
action = parts[0].split("@", 1)[0].lower()
if not action:
return None
return cls(action=action, args=parts[1:], raw_text=text)
return cls(action=action, args=parts[1:], raw_text=text, chat_id=str(chat_id))


@dataclass
class _RateBucket:
"""Sliding-window counter for a single chat."""

timestamps: deque[float] = field(default_factory=deque)

def admit(self, now: float, limit: int, window: float) -> bool:
"""Return ``True`` if a new command is within limits at ``now``."""
cutoff = now - window
while self.timestamps and self.timestamps[0] <= cutoff:
self.timestamps.popleft()
if len(self.timestamps) >= limit:
return False
self.timestamps.append(now)
return True


class TelegramBotPoller:
"""Receive bot commands from Telegram via the ``getUpdates`` long-poll API.

Tracks the update offset so messages are never returned twice, filters
messages to those originating from the authorised chat, and parses
incoming text into :class:`BotCommand` objects.
messages to those originating from an authorised chat (``chat_id`` plus
any ``allowed_chat_ids``), parses incoming text into :class:`BotCommand`
objects, and enforces a per-chat sliding-window rate limit.

Replies can be sent back to the same chat via :meth:`send_reply`.
Replies can be sent back to any authorised chat via :meth:`send_reply`;
when ``chat_id`` is omitted the primary chat (``self.chat_id``) is used.

Usage::

poller = TelegramBotPoller(bot_token="...", chat_id="123")
poller = TelegramBotPoller(
bot_token="...",
chat_id="123",
allowed_chat_ids=["123", "456"],
)
commands = await poller.poll()
for cmd in commands:
await poller.send_reply(f"Got: {cmd.action}")
await poller.send_reply(f"Got: {cmd.action}", chat_id=cmd.chat_id)
"""

def __init__(
self,
bot_token: str,
chat_id: str,
*,
allowed_chat_ids: list[str] | None = None,
timeout: int = 30,
message_char_limit: int = _DEFAULT_MESSAGE_CHAR_LIMIT,
rate_limit_commands: int = _DEFAULT_RATE_LIMIT_COMMANDS,
rate_limit_window_seconds: float = _DEFAULT_RATE_LIMIT_WINDOW_S,
) -> None:
if not bot_token:
raise ValueError("TELEGRAM_BOT_TOKEN is required but was empty")
if not chat_id:
raise ValueError("TELEGRAM_CHAT_ID is required but was empty")
self._bot_token = bot_token
self._chat_id = str(chat_id)

# Authorised senders. Always include the primary chat_id; merge any
# extras while preserving insertion order and dropping blanks.
authorised: list[str] = [self._chat_id]
for extra in allowed_chat_ids or []:
extra_str = str(extra).strip()
if extra_str and extra_str not in authorised:
authorised.append(extra_str)
self._allowed_chat_ids: tuple[str, ...] = tuple(authorised)

self._timeout = max(0, int(timeout))
self._message_char_limit = message_char_limit
self._offset: int | None = None

if rate_limit_commands < 0:
raise ValueError("rate_limit_commands must be >= 0")
if rate_limit_window_seconds <= 0:
raise ValueError("rate_limit_window_seconds must be > 0")
self._rate_limit_commands = rate_limit_commands
self._rate_limit_window = rate_limit_window_seconds
self._rate_buckets: dict[str, _RateBucket] = {}

@property
def offset(self) -> int | None:
"""Current update offset (``None`` until the first update arrives)."""
return self._offset

@property
def chat_id(self) -> str:
"""The authorised chat ID this poller filters by."""
"""The primary chat ID — default target for :meth:`send_reply`."""
return self._chat_id

@property
def allowed_chat_ids(self) -> tuple[str, ...]:
"""All chat IDs authorised to send commands (primary first)."""
return self._allowed_chat_ids

async def poll(self) -> list[BotCommand]:
"""Long-poll Telegram for new commands.

Returns a list of :class:`BotCommand` parsed from messages that
arrived from the authorised chat. The offset is advanced past
the highest update ID returned, so subsequent calls only return
new messages.
arrived from an authorised chat. The offset is advanced past the
highest update ID returned, so subsequent calls only return new
messages. Commands that exceed the per-chat rate limit are logged
and dropped.

Network and API errors are logged and converted to an empty list
— the caller is expected to retry on the next tick.
Expand Down Expand Up @@ -158,6 +222,7 @@ def _poll_sync(self) -> list[BotCommand]:

commands: list[BotCommand] = []
max_update_id = -1
now = time.monotonic()
for update in payload.get("result", []):
update_id = update.get("update_id")
if isinstance(update_id, int) and update_id > max_update_id:
Expand All @@ -168,39 +233,74 @@ def _poll_sync(self) -> list[BotCommand]:
continue

chat = message.get("chat") or {}
if str(chat.get("id")) != self._chat_id:
logger.debug("Ignoring message from unauthorised chat %s", chat.get("id"))
sender_chat_id = str(chat.get("id"))
if sender_chat_id not in self._allowed_chat_ids:
logger.debug(
"Ignoring message from unauthorised chat %s",
sender_chat_id,
)
continue

text = message.get("text")
if not isinstance(text, str):
continue

cmd = BotCommand.parse(text)
if cmd is not None:
commands.append(cmd)
cmd = BotCommand.parse(text, chat_id=sender_chat_id)
if cmd is None:
continue

if not self._admit(sender_chat_id, now):
logger.warning(
"Rate-limited command from chat %s: /%s",
sender_chat_id,
cmd.action,
)
continue
commands.append(cmd)

if max_update_id >= 0:
self._offset = max_update_id + 1

return commands

async def send_reply(self, text: str) -> bool:
"""Send a plain-text reply to the authorised chat.

Long replies are truncated to ``message_char_limit`` characters
with a trailing ``"..."``. Returns ``True`` on HTTP 200.
def _admit(self, chat_id: str, now: float) -> bool:
"""Return True when this chat is within the sliding-window limit."""
if self._rate_limit_commands == 0:
return False
bucket = self._rate_buckets.get(chat_id)
if bucket is None:
bucket = _RateBucket()
self._rate_buckets[chat_id] = bucket
return bucket.admit(now, self._rate_limit_commands, self._rate_limit_window)

async def send_reply(self, text: str, chat_id: str | None = None) -> bool:
"""Send a plain-text reply.

``chat_id`` defaults to the primary :attr:`chat_id`; pass an explicit
value to reply to a secondary authorised chat (e.g. the sender's
:attr:`BotCommand.chat_id`). Unknown chat IDs fall back to the
primary chat with a warning log.

Long replies are truncated to ``message_char_limit`` characters with
a trailing ``"..."``. Returns ``True`` on HTTP 200.
"""
return await asyncio.to_thread(self._send_reply_sync, text)
target = chat_id if chat_id else self._chat_id
if target not in self._allowed_chat_ids:
logger.warning(
"send_reply called with unauthorised chat %s; falling back to primary",
target,
)
target = self._chat_id
return await asyncio.to_thread(self._send_reply_sync, text, target)

def _send_reply_sync(self, text: str) -> bool:
def _send_reply_sync(self, text: str, chat_id: str) -> bool:
if not text:
return False
if len(text) > self._message_char_limit:
text = text[: self._message_char_limit - 3] + "..."

url = f"{_TELEGRAM_API}/bot{self._bot_token}/sendMessage"
payload = {"chat_id": self._chat_id, "text": text}
payload = {"chat_id": chat_id, "text": text}
data = json.dumps(payload).encode()
req = urllib.request.Request(
url,
Expand Down
Loading
Loading