Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,21 @@ To use topics with your bot, enable them in BotFather:

Message [@userinfobot](https://t.me/userinfobot) on Telegram -- it will reply with your user ID number.

## Viewing Logs

If running as a systemd user service (see [SYSTEMD_SETUP.md](SYSTEMD_SETUP.md)):

```bash
# Live logs (follow mode)
journalctl --user -u claude-telegram-bot -f

# Recent logs (last 50 lines)
journalctl --user -u claude-telegram-bot -n 50

# Check service status
systemctl --user status claude-telegram-bot
```

## Troubleshooting

**Bot doesn't respond:**
Expand Down
1 change: 1 addition & 0 deletions src/bot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def initialize(self) -> None:
builder = Application.builder()
builder.token(self.settings.telegram_token_str)
builder.rate_limiter(AIORateLimiter(max_retries=1))
builder.concurrent_updates(True)

# Configure connection settings
builder.connect_timeout(30)
Expand Down
107 changes: 74 additions & 33 deletions src/bot/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ class MessageOrchestrator:
def __init__(self, settings: Settings, deps: Dict[str, Any]):
self.settings = settings
self.deps = deps
# Per-update thread state for concurrent handler safety.
# telegram.Update uses __slots__ so we cannot set arbitrary attributes.
self._update_thread_states: Dict[int, Dict[str, Any]] = {}

def _inject_deps(self, handler: Callable) -> Callable: # type: ignore[type-arg]
"""Wrap handler to inject dependencies into context.bot_data."""
Expand Down Expand Up @@ -135,7 +138,7 @@ async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await handler(update, context)
finally:
if should_enforce:
self._persist_thread_state(context)
self._persist_thread_state(update, context)

return wrapped

Expand Down Expand Up @@ -202,24 +205,44 @@ async def _apply_thread_routing_context(

context.user_data["current_directory"] = current_dir
context.user_data["claude_session_id"] = state.get("claude_session_id")
context.user_data["_thread_context"] = {
thread_ctx_dict = {
"chat_id": chat.id,
"message_thread_id": message_thread_id,
"state_key": state_key,
"project_slug": project.slug,
"project_root": str(project_root),
"project_name": project.name,
}
context.user_data["_thread_context"] = thread_ctx_dict

# Per-update isolated copy for concurrent handler safety.
# context.user_data is shared per-user, so concurrent handlers for
# different topics would overwrite each other's values.
# Note: telegram.Update uses __slots__, so we store state in a dict
# on the orchestrator instance, keyed by update_id.
self._update_thread_states[update.update_id] = {
"current_directory": current_dir,
"claude_session_id": state.get("claude_session_id"),
"_thread_context": thread_ctx_dict,
}
return True

def _persist_thread_state(self, context: ContextTypes.DEFAULT_TYPE) -> None:
def _persist_thread_state(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Persist compatibility keys back into per-thread state."""
thread_context = context.user_data.get("_thread_context")
# Read from per-update storage to avoid races with concurrent handlers.
_ts = self._update_thread_states.pop(update.update_id, {})
thread_context = _ts.get("_thread_context") or context.user_data.get(
"_thread_context"
)
if not thread_context:
return

project_root = Path(thread_context["project_root"])
current_dir = context.user_data.get("current_directory", project_root)
current_dir = _ts.get("current_directory") or context.user_data.get(
"current_directory", project_root
)
if not isinstance(current_dir, Path):
current_dir = Path(str(current_dir))
current_dir = current_dir.resolve()
Expand All @@ -229,7 +252,8 @@ def _persist_thread_state(self, context: ContextTypes.DEFAULT_TYPE) -> None:
thread_states = context.user_data.setdefault("thread_state", {})
thread_states[thread_context["state_key"]] = {
"current_directory": str(current_dir),
"claude_session_id": context.user_data.get("claude_session_id"),
"claude_session_id": _ts.get("claude_session_id")
or context.user_data.get("claude_session_id"),
"project_slug": thread_context["project_slug"],
}

Expand Down Expand Up @@ -693,6 +717,19 @@ async def agentic_text(
user_id = update.effective_user.id
message_text = update.message.text

# Capture thread-specific state immediately (before any await) to avoid
# race conditions with concurrent handlers for different topics.
_ts = self._update_thread_states.get(update.update_id, {})
current_dir = (
_ts.get("current_directory")
or context.user_data.get("current_directory")
or self.settings.approved_directory
)
session_id = _ts.get("claude_session_id") or context.user_data.get(
"claude_session_id"
)
force_new = bool(context.user_data.get("force_new_session"))

logger.info(
"Agentic text message",
user_id=user_id,
Expand Down Expand Up @@ -720,15 +757,6 @@ async def agentic_text(
)
return

current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
session_id = context.user_data.get("claude_session_id")

# Check if /new was used — skip auto-resume for this first message.
# Flag is only cleared after a successful run so retries keep the intent.
force_new = bool(context.user_data.get("force_new_session"))

# --- Verbose progress tracking via stream callback ---
tool_log: List[Dict[str, Any]] = []
start_time = time.time()
Expand All @@ -755,6 +783,9 @@ async def agentic_text(
context.user_data["force_new_session"] = False

context.user_data["claude_session_id"] = claude_response.session_id
# Also persist to per-update storage for _persist_thread_state
if update.update_id in self._update_thread_states:
self._update_thread_states[update.update_id]["claude_session_id"] = claude_response.session_id

# Track directory changes
from .handlers.message import _update_working_directory_from_claude_response
Expand Down Expand Up @@ -852,6 +883,18 @@ async def agentic_document(
user_id = update.effective_user.id
document = update.message.document

# Capture thread-specific state immediately (before any await).
_ts = self._update_thread_states.get(update.update_id, {})
current_dir = (
_ts.get("current_directory")
or context.user_data.get("current_directory")
or self.settings.approved_directory
)
session_id = _ts.get("claude_session_id") or context.user_data.get(
"claude_session_id"
)
force_new = bool(context.user_data.get("force_new_session"))

logger.info(
"Agentic document upload",
user_id=user_id,
Expand Down Expand Up @@ -920,15 +963,6 @@ async def agentic_document(
)
return

current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
session_id = context.user_data.get("claude_session_id")

# Check if /new was used — skip auto-resume for this first message.
# Flag is only cleared after a successful run so retries keep the intent.
force_new = bool(context.user_data.get("force_new_session"))

verbose_level = self._get_verbose_level(context)
tool_log: List[Dict[str, Any]] = []
on_stream = self._make_stream_callback(
Expand All @@ -950,6 +984,8 @@ async def agentic_document(
context.user_data["force_new_session"] = False

context.user_data["claude_session_id"] = claude_response.session_id
if update.update_id in self._update_thread_states:
self._update_thread_states[update.update_id]["claude_session_id"] = claude_response.session_id

from .handlers.message import _update_working_directory_from_claude_response

Expand Down Expand Up @@ -990,6 +1026,18 @@ async def agentic_photo(
"""Process photo -> Claude, minimal chrome."""
user_id = update.effective_user.id

# Capture thread-specific state immediately (before any await).
_ts = self._update_thread_states.get(update.update_id, {})
current_dir = (
_ts.get("current_directory")
or context.user_data.get("current_directory")
or self.settings.approved_directory
)
session_id = _ts.get("claude_session_id") or context.user_data.get(
"claude_session_id"
)
force_new = bool(context.user_data.get("force_new_session"))

features = context.bot_data.get("features")
image_handler = features.get_image_handler() if features else None

Expand All @@ -1014,15 +1062,6 @@ async def agentic_photo(
)
return

current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
session_id = context.user_data.get("claude_session_id")

# Check if /new was used — skip auto-resume for this first message.
# Flag is only cleared after a successful run so retries keep the intent.
force_new = bool(context.user_data.get("force_new_session"))

verbose_level = self._get_verbose_level(context)
tool_log: List[Dict[str, Any]] = []
on_stream = self._make_stream_callback(
Expand All @@ -1046,6 +1085,8 @@ async def agentic_photo(
context.user_data["force_new_session"] = False

context.user_data["claude_session_id"] = claude_response.session_id
if update.update_id in self._update_thread_states:
self._update_thread_states[update.update_id]["claude_session_id"] = claude_response.session_id

from .utils.formatting import ResponseFormatter

Expand Down
45 changes: 33 additions & 12 deletions src/claude/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
# Subdirectories under ~/.claude/ that Claude Code uses internally.
# File operations targeting these paths are allowed even when they fall
# outside the project's approved directory.
_CLAUDE_INTERNAL_SUBDIRS: Set[str] = {"plans", "todos", "settings.json"}
_CLAUDE_INTERNAL_SUBDIRS: Set[str] = {
"plans",
"todos",
"projects",
"tasks",
"settings.json",
}

logger = structlog.get_logger()

Expand Down Expand Up @@ -162,30 +168,45 @@ def check_bash_directory_boundary(


def _is_claude_internal_path(file_path: str) -> bool:
"""Check whether *file_path* points inside the ``~/.claude/`` directory.
"""Check whether *file_path* points inside a Claude Code internal directory.

Claude Code keeps internal state (plan-mode drafts, todo lists, etc.)
under ``$HOME/.claude/``. These paths are outside the project's
``approved_directory`` but are safe to read/write because they are
controlled entirely by Claude Code itself.
Claude Code keeps internal state (plan-mode drafts, todo lists,
compressed tool results, task agent outputs, etc.) under
``$HOME/.claude/`` and ``/tmp/claude-<uid>/``. These paths are
outside the project's ``approved_directory`` but are safe to
read/write because they are controlled entirely by Claude Code itself.

Only the specific subdirectories listed in ``_CLAUDE_INTERNAL_SUBDIRS``
are allowed; arbitrary files directly under ``~/.claude/`` are not.
are allowed under ``~/.claude/``; arbitrary files directly under
``~/.claude/`` are not. All paths under ``/tmp/claude-<uid>/`` are
allowed (Claude Code uses this for Task agent output files).
"""
try:
resolved = Path(file_path).resolve()

# Check ~/.claude/ internal paths
home = Path.home().resolve()
claude_dir = home / ".claude"

# Path must be inside ~/.claude/
try:
rel = resolved.relative_to(claude_dir)
# Must be in one of the known subdirectories (or a known file)
top_part = rel.parts[0] if rel.parts else ""
return top_part in _CLAUDE_INTERNAL_SUBDIRS
except ValueError:
return False
pass

# Must be in one of the known subdirectories (or a known file)
top_part = rel.parts[0] if rel.parts else ""
return top_part in _CLAUDE_INTERNAL_SUBDIRS
# Check /tmp/claude-<uid>/ paths (Task agent output files)
import os

tmp_claude_dir = Path(f"/tmp/claude-{os.getuid()}")
try:
resolved.relative_to(tmp_claude_dir)
return True
except ValueError:
pass

return False

except Exception:
return False
Expand Down
Loading