Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/bot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ async def initialize(self) -> None:
builder.token(self.settings.telegram_token_str)
builder.defaults(Defaults(do_quote=self.settings.reply_quote))
builder.rate_limiter(AIORateLimiter(max_retries=1))
# Allow concurrent update processing so follow-up messages
# can interrupt a running Claude task
builder.concurrent_updates(True)

# Configure connection settings
builder.connect_timeout(30)
Expand Down
8 changes: 4 additions & 4 deletions src/bot/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,13 @@ async def handle_text_message(

# Enhanced stream updates handler with progress tracking
async def stream_handler(update_obj):
# Intercept send_image_to_user MCP tool calls.
# Intercept send_file_to_user / send_image_to_user MCP tool calls.
# The SDK namespaces MCP tools as "mcp__<server>__<tool>".
if update_obj.tool_calls:
for tc in update_obj.tool_calls:
tc_name = tc.get("name", "")
if tc_name == "send_image_to_user" or tc_name.endswith(
"__send_image_to_user"
if tc_name in ("send_file_to_user", "send_image_to_user") or tc_name.endswith(
("__send_file_to_user", "__send_image_to_user")
):
tc_input = tc.get("input", {})
file_path = tc_input.get("file_path", "")
Expand Down Expand Up @@ -439,7 +439,7 @@ async def stream_handler(update_obj):
# Delete progress message
await progress_msg.delete()

# Use MCP-collected images (from send_image_to_user tool calls)
# Use MCP-collected files (from send_file_to_user tool calls)
images: list[ImageAttachment] = mcp_images

# Try to combine text + images when response fits in a caption
Expand Down
643 changes: 467 additions & 176 deletions src/bot/orchestrator.py

Large diffs are not rendered by default.

209 changes: 186 additions & 23 deletions src/bot/utils/draft_streamer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
"""Stream partial responses to Telegram via sendMessageDraft."""
"""Stream partial responses to Telegram via sendMessageDraft.

Uses Telegram Bot API 9.3+ sendMessageDraft for smooth token-by-token
streaming in private chats. Falls back to editMessageText for group chats
where sendMessageDraft is unavailable.
"""

import secrets
import time
Expand All @@ -14,6 +19,17 @@
# Max tool lines shown in the draft header
_MAX_TOOL_LINES = 10

# Minimum characters before sending the first draft (avoids triggering
# push notifications with just a few characters)
_MIN_INITIAL_CHARS = 20

# Error messages that indicate the draft transport is unavailable
_DRAFT_UNAVAILABLE_ERRORS = frozenset({
"TEXTDRAFT_PEER_INVALID",
"Bad Request: draft can't be sent",
"Bad Request: peer doesn't support drafts",
})


def generate_draft_id() -> int:
"""Generate a non-zero positive draft ID.
Expand All @@ -30,18 +46,21 @@ class DraftStreamer:
The draft is composed of two sections:

1. **Tool header** β€” compact lines showing tool calls and reasoning
snippets as they arrive, e.g. ``"πŸ“– Read | πŸ” Grep | 🐚 Bash"``.
snippets as they arrive.
2. **Response body** β€” the actual assistant response text, streamed
token-by-token.

Both sections are combined into a single draft message and sent via
``sendMessageDraft``.
``sendMessageDraft`` (private chats) or ``editMessageText`` (groups).

Key design decisions:
Key design decisions (inspired by OpenClaw):
- Plain text drafts (no parse_mode) to avoid partial HTML/markdown errors.
- Tail-truncation for messages >4096 chars: shows ``"\\u2026" + last 4093 chars``.
- Self-disabling: any API error silently disables the streamer so the
request continues with normal (non-streaming) delivery.
- Tail-truncation for messages >4096 chars.
- Min initial chars: waits for ~20 chars before first send.
- Anti-regressive: skips updates where text got shorter.
- Error classification: distinguishes draft-unavailable (fall back to edit)
from other errors (disable entirely).
- Self-disabling: persistent errors silently disable the streamer.
"""

def __init__(
Expand All @@ -50,7 +69,8 @@ def __init__(
chat_id: int,
draft_id: int,
message_thread_id: Optional[int] = None,
throttle_interval: float = 0.3,
throttle_interval: float = 0.4,
is_private_chat: bool = True,
) -> None:
self.bot = bot
self.chat_id = chat_id
Expand All @@ -61,7 +81,18 @@ def __init__(
self._tool_lines: List[str] = []
self._accumulated_text = ""
self._last_send_time = 0.0
self._last_sent_length = 0 # anti-regressive tracking
self._enabled = True
self._error_count = 0
self._max_errors = 3

# Transport mode: "draft" for private chats, "edit" for groups
self._use_draft = is_private_chat
self._edit_message_id: Optional[int] = None # for edit-based transport

@property
def enabled(self) -> bool:
return self._enabled

async def append_tool(self, line: str) -> None:
"""Append a tool activity line and send a draft if throttled."""
Expand All @@ -81,16 +112,24 @@ async def append_text(self, text: str) -> None:
if (now - self._last_send_time) >= self.throttle_interval:
await self._send_draft()

def reset_text(self) -> None:
"""Reset accumulated text (call after text was shown via a πŸ’¬ message)."""
self._accumulated_text = ""

async def flush(self) -> None:
"""Force-send the current accumulated text as a draft."""
if not self._enabled:
return
if not self._accumulated_text and not self._tool_lines:
return
await self._send_draft()
await self._send_draft(force=True)

def _compose_draft(self, is_final: bool = False) -> str:
"""Combine tool header and response body into a single draft.

def _compose_draft(self) -> str:
"""Combine tool header and response body into a single draft."""
Appends a blinking cursor β–Œ during streaming (like OpenClaw)
to indicate the response is still being generated.
"""
parts: List[str] = []

if self._tool_lines:
Expand All @@ -103,33 +142,157 @@ def _compose_draft(self) -> str:
if self._accumulated_text:
if parts:
parts.append("") # blank separator line
parts.append(self._accumulated_text)
text = self._accumulated_text
if not is_final:
text += " β–Œ"
parts.append(text)

return "\n".join(parts)

async def _send_draft(self) -> None:
"""Send the composed draft (tools + text) as a message draft."""
async def _send_draft(self, force: bool = False) -> None:
"""Send the composed draft via the appropriate transport."""
draft_text = self._compose_draft()
if not draft_text.strip():
return

# Min initial chars gate (skip if force-flushing)
if not force and self._last_sent_length == 0:
if len(self._accumulated_text) < _MIN_INITIAL_CHARS and not self._tool_lines:
return

# Anti-regressive: skip if text got shorter (can happen with
# tool header rotation)
current_len = len(draft_text)
if not force and current_len < self._last_sent_length:
return

# Tail-truncate if over Telegram limit
if len(draft_text) > TELEGRAM_MAX_MESSAGE_LENGTH:
draft_text = "\u2026" + draft_text[-(TELEGRAM_MAX_MESSAGE_LENGTH - 1) :]
draft_text = "\u2026" + draft_text[-(TELEGRAM_MAX_MESSAGE_LENGTH - 1):]

try:
if self._use_draft:
await self._send_via_draft(draft_text)
else:
await self._send_via_edit(draft_text)
self._last_send_time = time.time()
self._last_sent_length = current_len
self._error_count = 0 # reset on success
except telegram.error.BadRequest as e:
error_str = str(e)
if any(err in error_str for err in _DRAFT_UNAVAILABLE_ERRORS):
# Draft transport unavailable β€” fall back to edit
logger.info(
"Draft transport unavailable, falling back to edit",
chat_id=self.chat_id,
error=error_str,
)
self._use_draft = False
# Retry immediately with edit transport
try:
await self._send_via_edit(draft_text)
self._last_send_time = time.time()
self._last_sent_length = current_len
except Exception:
self._handle_error()
elif "Message is not modified" in error_str:
# Same content β€” not an error, just skip
self._last_send_time = time.time()
elif "Message to edit not found" in error_str:
# Message was deleted β€” re-create
self._edit_message_id = None
try:
await self._send_via_edit(draft_text)
self._last_send_time = time.time()
self._last_sent_length = current_len
except Exception:
self._handle_error()
else:
self._handle_error()
except Exception:
self._handle_error()

def _handle_error(self) -> None:
"""Track errors and disable after too many."""
self._error_count += 1
if self._error_count >= self._max_errors:
logger.debug(
"Draft streamer disabled after repeated errors",
chat_id=self.chat_id,
error_count=self._error_count,
)
self._enabled = False

async def _send_via_draft(self, text: str) -> None:
"""Send via sendMessageDraft (private chats)."""
kwargs = {
"chat_id": self.chat_id,
"text": text,
"draft_id": self.draft_id,
}
if self.message_thread_id is not None:
kwargs["message_thread_id"] = self.message_thread_id
logger.debug(
"Sending draft",
transport="draft",
text_len=len(text),
preview=text[:80],
)
await self.bot.send_message_draft(**kwargs)

async def _send_via_edit(self, text: str) -> None:
"""Send via editMessageText (group chat fallback).

Creates a message on first call, then edits it on subsequent calls.
"""
if self._edit_message_id is None:
# Send initial message
kwargs = {
"chat_id": self.chat_id,
"text": draft_text,
"draft_id": self.draft_id,
"text": text,
}
if self.message_thread_id is not None:
kwargs["message_thread_id"] = self.message_thread_id
await self.bot.send_message_draft(**kwargs)
self._last_send_time = time.time()
except Exception:
logger.debug(
"Draft send failed, disabling streamer",
msg = await self.bot.send_message(**kwargs)
self._edit_message_id = msg.message_id
else:
await self.bot.edit_message_text(
text,
chat_id=self.chat_id,
message_id=self._edit_message_id,
)
self._enabled = False

async def clear(self) -> None:
"""Clear the draft bubble by sending an empty draft.

Call this before sending the final response message so the draft
bubble disappears cleanly instead of overlapping with the real message.
"""
if not self._enabled:
return
try:
if self._use_draft:
# Send empty draft to dismiss the typing bubble
await self.bot.send_message_draft(
chat_id=self.chat_id,
text="",
draft_id=self.draft_id,
)
elif self._edit_message_id is not None:
# For edit-based transport, delete the preview message
try:
await self.bot.delete_message(
chat_id=self.chat_id,
message_id=self._edit_message_id,
)
except Exception:
pass
self._edit_message_id = None
except Exception:
pass
self._enabled = False

@property
def edit_message_id(self) -> Optional[int]:
"""Return the message ID used by edit transport (for cleanup)."""
return self._edit_message_id
Loading