diff --git a/README.md b/README.md index 559e19e6..4061852f 100644 --- a/README.md +++ b/README.md @@ -295,6 +295,21 @@ To use topics with your bot, enable them in BotFather: Message [@userinfobot](https://t.me/userinfobot) on Telegram -- it will reply with your user ID number. +## Viewing Logs + +If running as a systemd user service (see [SYSTEMD_SETUP.md](SYSTEMD_SETUP.md)): + +```bash +# Live logs (follow mode) +journalctl --user -u claude-telegram-bot -f + +# Recent logs (last 50 lines) +journalctl --user -u claude-telegram-bot -n 50 + +# Check service status +systemctl --user status claude-telegram-bot +``` + ## Troubleshooting **Bot doesn't respond:** diff --git a/src/bot/core.py b/src/bot/core.py index e617fb0a..4cd1296c 100644 --- a/src/bot/core.py +++ b/src/bot/core.py @@ -51,6 +51,7 @@ async def initialize(self) -> None: builder = Application.builder() builder.token(self.settings.telegram_token_str) builder.rate_limiter(AIORateLimiter(max_retries=1)) + builder.concurrent_updates(True) # Configure connection settings builder.connect_timeout(30) diff --git a/src/bot/orchestrator.py b/src/bot/orchestrator.py index 0df4e1cc..70477a10 100644 --- a/src/bot/orchestrator.py +++ b/src/bot/orchestrator.py @@ -103,6 +103,9 @@ class MessageOrchestrator: def __init__(self, settings: Settings, deps: Dict[str, Any]): self.settings = settings self.deps = deps + # Per-update thread state for concurrent handler safety. + # telegram.Update uses __slots__ so we cannot set arbitrary attributes. + self._update_thread_states: Dict[int, Dict[str, Any]] = {} def _inject_deps(self, handler: Callable) -> Callable: # type: ignore[type-arg] """Wrap handler to inject dependencies into context.bot_data.""" @@ -135,7 +138,7 @@ async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: await handler(update, context) finally: if should_enforce: - self._persist_thread_state(context) + self._persist_thread_state(update, context) return wrapped @@ -202,7 +205,7 @@ async def _apply_thread_routing_context( context.user_data["current_directory"] = current_dir context.user_data["claude_session_id"] = state.get("claude_session_id") - context.user_data["_thread_context"] = { + thread_ctx_dict = { "chat_id": chat.id, "message_thread_id": message_thread_id, "state_key": state_key, @@ -210,16 +213,36 @@ async def _apply_thread_routing_context( "project_root": str(project_root), "project_name": project.name, } + context.user_data["_thread_context"] = thread_ctx_dict + + # Per-update isolated copy for concurrent handler safety. + # context.user_data is shared per-user, so concurrent handlers for + # different topics would overwrite each other's values. + # Note: telegram.Update uses __slots__, so we store state in a dict + # on the orchestrator instance, keyed by update_id. + self._update_thread_states[update.update_id] = { + "current_directory": current_dir, + "claude_session_id": state.get("claude_session_id"), + "_thread_context": thread_ctx_dict, + } return True - def _persist_thread_state(self, context: ContextTypes.DEFAULT_TYPE) -> None: + def _persist_thread_state( + self, update: Update, context: ContextTypes.DEFAULT_TYPE + ) -> None: """Persist compatibility keys back into per-thread state.""" - thread_context = context.user_data.get("_thread_context") + # Read from per-update storage to avoid races with concurrent handlers. + _ts = self._update_thread_states.pop(update.update_id, {}) + thread_context = _ts.get("_thread_context") or context.user_data.get( + "_thread_context" + ) if not thread_context: return project_root = Path(thread_context["project_root"]) - current_dir = context.user_data.get("current_directory", project_root) + current_dir = _ts.get("current_directory") or context.user_data.get( + "current_directory", project_root + ) if not isinstance(current_dir, Path): current_dir = Path(str(current_dir)) current_dir = current_dir.resolve() @@ -229,7 +252,8 @@ def _persist_thread_state(self, context: ContextTypes.DEFAULT_TYPE) -> None: thread_states = context.user_data.setdefault("thread_state", {}) thread_states[thread_context["state_key"]] = { "current_directory": str(current_dir), - "claude_session_id": context.user_data.get("claude_session_id"), + "claude_session_id": _ts.get("claude_session_id") + or context.user_data.get("claude_session_id"), "project_slug": thread_context["project_slug"], } @@ -693,6 +717,19 @@ async def agentic_text( user_id = update.effective_user.id message_text = update.message.text + # Capture thread-specific state immediately (before any await) to avoid + # race conditions with concurrent handlers for different topics. + _ts = self._update_thread_states.get(update.update_id, {}) + current_dir = ( + _ts.get("current_directory") + or context.user_data.get("current_directory") + or self.settings.approved_directory + ) + session_id = _ts.get("claude_session_id") or context.user_data.get( + "claude_session_id" + ) + force_new = bool(context.user_data.get("force_new_session")) + logger.info( "Agentic text message", user_id=user_id, @@ -720,15 +757,6 @@ async def agentic_text( ) return - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) - session_id = context.user_data.get("claude_session_id") - - # Check if /new was used — skip auto-resume for this first message. - # Flag is only cleared after a successful run so retries keep the intent. - force_new = bool(context.user_data.get("force_new_session")) - # --- Verbose progress tracking via stream callback --- tool_log: List[Dict[str, Any]] = [] start_time = time.time() @@ -755,6 +783,9 @@ async def agentic_text( context.user_data["force_new_session"] = False context.user_data["claude_session_id"] = claude_response.session_id + # Also persist to per-update storage for _persist_thread_state + if update.update_id in self._update_thread_states: + self._update_thread_states[update.update_id]["claude_session_id"] = claude_response.session_id # Track directory changes from .handlers.message import _update_working_directory_from_claude_response @@ -852,6 +883,18 @@ async def agentic_document( user_id = update.effective_user.id document = update.message.document + # Capture thread-specific state immediately (before any await). + _ts = self._update_thread_states.get(update.update_id, {}) + current_dir = ( + _ts.get("current_directory") + or context.user_data.get("current_directory") + or self.settings.approved_directory + ) + session_id = _ts.get("claude_session_id") or context.user_data.get( + "claude_session_id" + ) + force_new = bool(context.user_data.get("force_new_session")) + logger.info( "Agentic document upload", user_id=user_id, @@ -920,15 +963,6 @@ async def agentic_document( ) return - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) - session_id = context.user_data.get("claude_session_id") - - # Check if /new was used — skip auto-resume for this first message. - # Flag is only cleared after a successful run so retries keep the intent. - force_new = bool(context.user_data.get("force_new_session")) - verbose_level = self._get_verbose_level(context) tool_log: List[Dict[str, Any]] = [] on_stream = self._make_stream_callback( @@ -950,6 +984,8 @@ async def agentic_document( context.user_data["force_new_session"] = False context.user_data["claude_session_id"] = claude_response.session_id + if update.update_id in self._update_thread_states: + self._update_thread_states[update.update_id]["claude_session_id"] = claude_response.session_id from .handlers.message import _update_working_directory_from_claude_response @@ -990,6 +1026,18 @@ async def agentic_photo( """Process photo -> Claude, minimal chrome.""" user_id = update.effective_user.id + # Capture thread-specific state immediately (before any await). + _ts = self._update_thread_states.get(update.update_id, {}) + current_dir = ( + _ts.get("current_directory") + or context.user_data.get("current_directory") + or self.settings.approved_directory + ) + session_id = _ts.get("claude_session_id") or context.user_data.get( + "claude_session_id" + ) + force_new = bool(context.user_data.get("force_new_session")) + features = context.bot_data.get("features") image_handler = features.get_image_handler() if features else None @@ -1014,15 +1062,6 @@ async def agentic_photo( ) return - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) - session_id = context.user_data.get("claude_session_id") - - # Check if /new was used — skip auto-resume for this first message. - # Flag is only cleared after a successful run so retries keep the intent. - force_new = bool(context.user_data.get("force_new_session")) - verbose_level = self._get_verbose_level(context) tool_log: List[Dict[str, Any]] = [] on_stream = self._make_stream_callback( @@ -1046,6 +1085,8 @@ async def agentic_photo( context.user_data["force_new_session"] = False context.user_data["claude_session_id"] = claude_response.session_id + if update.update_id in self._update_thread_states: + self._update_thread_states[update.update_id]["claude_session_id"] = claude_response.session_id from .utils.formatting import ResponseFormatter diff --git a/src/claude/monitor.py b/src/claude/monitor.py index ada3b777..19413627 100644 --- a/src/claude/monitor.py +++ b/src/claude/monitor.py @@ -14,7 +14,13 @@ # Subdirectories under ~/.claude/ that Claude Code uses internally. # File operations targeting these paths are allowed even when they fall # outside the project's approved directory. -_CLAUDE_INTERNAL_SUBDIRS: Set[str] = {"plans", "todos", "settings.json"} +_CLAUDE_INTERNAL_SUBDIRS: Set[str] = { + "plans", + "todos", + "projects", + "tasks", + "settings.json", +} logger = structlog.get_logger() @@ -162,30 +168,45 @@ def check_bash_directory_boundary( def _is_claude_internal_path(file_path: str) -> bool: - """Check whether *file_path* points inside the ``~/.claude/`` directory. + """Check whether *file_path* points inside a Claude Code internal directory. - Claude Code keeps internal state (plan-mode drafts, todo lists, etc.) - under ``$HOME/.claude/``. These paths are outside the project's - ``approved_directory`` but are safe to read/write because they are - controlled entirely by Claude Code itself. + Claude Code keeps internal state (plan-mode drafts, todo lists, + compressed tool results, task agent outputs, etc.) under + ``$HOME/.claude/`` and ``/tmp/claude-/``. These paths are + outside the project's ``approved_directory`` but are safe to + read/write because they are controlled entirely by Claude Code itself. Only the specific subdirectories listed in ``_CLAUDE_INTERNAL_SUBDIRS`` - are allowed; arbitrary files directly under ``~/.claude/`` are not. + are allowed under ``~/.claude/``; arbitrary files directly under + ``~/.claude/`` are not. All paths under ``/tmp/claude-/`` are + allowed (Claude Code uses this for Task agent output files). """ try: resolved = Path(file_path).resolve() + + # Check ~/.claude/ internal paths home = Path.home().resolve() claude_dir = home / ".claude" - # Path must be inside ~/.claude/ try: rel = resolved.relative_to(claude_dir) + # Must be in one of the known subdirectories (or a known file) + top_part = rel.parts[0] if rel.parts else "" + return top_part in _CLAUDE_INTERNAL_SUBDIRS except ValueError: - return False + pass - # Must be in one of the known subdirectories (or a known file) - top_part = rel.parts[0] if rel.parts else "" - return top_part in _CLAUDE_INTERNAL_SUBDIRS + # Check /tmp/claude-/ paths (Task agent output files) + import os + + tmp_claude_dir = Path(f"/tmp/claude-{os.getuid()}") + try: + resolved.relative_to(tmp_claude_dir) + return True + except ValueError: + pass + + return False except Exception: return False