diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e5df1eb..abb45dff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- **`/schedule` command**: Create, list, pause, resume, and remove scheduled jobs directly from Telegram. Auto-populates chat, directory, and user from context. Requires `ENABLE_SCHEDULER=true` (#150) + ## [1.5.0] - 2026-03-04 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index 0917d335..66053152 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -122,7 +122,7 @@ All datetimes use timezone-aware UTC: `datetime.now(UTC)` (not `datetime.utcnow( ### Agentic mode -Agentic mode commands: `/start`, `/new`, `/status`, `/verbose`, `/repo`. If `ENABLE_PROJECT_THREADS=true`: `/sync_threads`. To add a new command: +Agentic mode commands: `/start`, `/new`, `/status`, `/verbose`, `/repo`. If `ENABLE_PROJECT_THREADS=true`: `/sync_threads`. If `ENABLE_SCHEDULER=true`: `/schedule`. To add a new command: 1. Add handler function in `src/bot/orchestrator.py` 2. Register in `MessageOrchestrator._register_agentic_handlers()` diff --git a/README.md b/README.md index 34ca52d3..700ba0f2 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ The bot supports two interaction modes: The default conversational mode. Just talk to Claude naturally -- no special commands required. **Commands:** `/start`, `/new`, `/status`, `/verbose`, `/repo` -If `ENABLE_PROJECT_THREADS=true`: `/sync_threads` +If `ENABLE_PROJECT_THREADS=true`: `/sync_threads` | If `ENABLE_SCHEDULER=true`: `/schedule` ``` You: What files are in this project? @@ -157,8 +157,8 @@ Use `/repo` to list cloned repos in your workspace, or `/repo ` to switch Set `AGENTIC_MODE=false` to enable the full 13-command terminal-like interface with directory navigation, inline keyboards, quick actions, git integration, and session export. -**Commands:** `/start`, `/help`, `/new`, `/continue`, `/end`, `/status`, `/cd`, `/ls`, `/pwd`, `/projects`, `/export`, `/actions`, `/git` -If `ENABLE_PROJECT_THREADS=true`: `/sync_threads` +**Commands:** `/start`, `/help`, `/new`, `/continue`, `/end`, `/status`, `/cd`, `/ls`, `/pwd`, `/projects`, `/export`, `/actions`, `/git` +If `ENABLE_PROJECT_THREADS=true`: `/sync_threads` | If `ENABLE_SCHEDULER=true`: `/schedule` ``` You: /cd my-web-app @@ -176,7 +176,7 @@ Bot: [Run Tests] [Install Deps] [Format Code] [Run Linter] Beyond direct chat, the bot can respond to external triggers: - **Webhooks** -- Receive GitHub events (push, PR, issues) and route them through Claude for automated summaries or code review -- **Scheduler** -- Run recurring Claude tasks on a cron schedule (e.g., daily code health checks) +- **Scheduler** -- Run recurring Claude tasks on a cron schedule (e.g., daily code health checks). Manage jobs directly from Telegram with `/schedule` - **Notifications** -- Deliver agent responses to configured Telegram chats Enable with `ENABLE_API_SERVER=true` and `ENABLE_SCHEDULER=true`. See [docs/setup.md](docs/setup.md) for configuration. diff --git a/docs/setup.md b/docs/setup.md index 5b0670bb..5efc4011 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -173,7 +173,23 @@ ENABLE_SCHEDULER=true NOTIFICATION_CHAT_IDS=123456789 # Where to deliver results ``` -Jobs are managed programmatically and persist in the SQLite database. +Jobs persist in the SQLite database and survive bot restarts. Manage them directly from Telegram with the `/schedule` command: + +``` +/schedule list # List all jobs (active + paused) +/schedule add +/schedule remove # Remove a job +/schedule pause # Pause without deleting +/schedule resume # Resume a paused job +``` + +Example -- create a job that runs every weekday at 9 AM: + +``` +/schedule add daily-report 0 9 * * 1-5 Summarize yesterday's git commits +``` + +The `chat_id`, `working_directory`, and `created_by` fields are auto-populated from your current Telegram context. Results are delivered to the chat where the job was created. ### Voice Message Transcription diff --git a/src/bot/handlers/schedule.py b/src/bot/handlers/schedule.py new file mode 100644 index 00000000..19af84e6 --- /dev/null +++ b/src/bot/handlers/schedule.py @@ -0,0 +1,200 @@ +"""Schedule command handler for managing scheduled jobs via Telegram.""" + +from typing import List + +import structlog +from telegram import Update +from telegram.ext import ContextTypes + +from ...scheduler.scheduler import JobScheduler +from ..utils.html_format import escape_html + +logger = structlog.get_logger() + + +def _get_scheduler(context: ContextTypes.DEFAULT_TYPE) -> JobScheduler | None: + """Get the JobScheduler from bot dependencies.""" + return context.bot_data.get("scheduler") + + +async def schedule_command( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """Handle /schedule command with subcommands. + + Usage: + /schedule list + /schedule add + /schedule remove + /schedule pause + /schedule resume + """ + scheduler = _get_scheduler(context) + if not scheduler: + await update.message.reply_text( + "Scheduler is not enabled. Set ENABLE_SCHEDULER=true to use this command." + ) + return + + args: List[str] = context.args or [] + + if not args: + await _show_usage(update) + return + + subcommand = args[0].lower() + sub_args = args[1:] + + if subcommand == "list": + await _handle_list(update, scheduler) + elif subcommand == "add": + await _handle_add(update, context, scheduler, sub_args) + elif subcommand == "remove": + await _handle_remove(update, scheduler, sub_args) + elif subcommand == "pause": + await _handle_pause(update, scheduler, sub_args) + elif subcommand == "resume": + await _handle_resume(update, scheduler, sub_args) + else: + await _show_usage(update) + + +async def _show_usage(update: Update) -> None: + """Show usage information.""" + await update.message.reply_html( + "Usage:\n" + "/schedule list\n" + "/schedule add <name> <cron> <prompt>\n" + "/schedule remove <job_id>\n" + "/schedule pause <job_id>\n" + "/schedule resume <job_id>\n\n" + "Cron format: min hour day month weekday\n" + "Example: /schedule add daily-report 0 9 * * * Check status" + ) + + +async def _handle_list(update: Update, scheduler: JobScheduler) -> None: + """List all scheduled jobs.""" + jobs = await scheduler.list_jobs(include_paused=True) + + if not jobs: + await update.message.reply_text("No scheduled jobs.") + return + + lines = ["Scheduled Jobs:\n"] + for job in jobs: + status = "active" if job.get("is_active") else "paused" + name = escape_html(job.get("job_name", "?")) + cron = escape_html(job.get("cron_expression", "?")) + job_id = escape_html(str(job.get("job_id", "?"))) + lines.append( + f"{name} [{status}]\n" + f" Cron: {cron}\n" + f" ID: {job_id}" + ) + + await update.message.reply_html("\n\n".join(lines)) + + +async def _handle_add( + update: Update, + context: ContextTypes.DEFAULT_TYPE, + scheduler: JobScheduler, + args: List[str], +) -> None: + """Add a new scheduled job. + + Expected args: + The 5 cron fields are joined into a single cron expression. + """ + # Need at least: name + 5 cron fields + 1 prompt word = 7 + if len(args) < 7: + await update.message.reply_html( + "Usage: /schedule add <name> " + "<min> <hour> <day> <month> <weekday> " + "<prompt>\n\n" + "Example: /schedule add daily-report 0 9 * * * Check status" + ) + return + + job_name = args[0] + cron_expression = " ".join(args[1:6]) + prompt = " ".join(args[6:]) + + # Auto-populate fields from context + chat_id = update.effective_chat.id + user_id = update.effective_user.id + settings = context.bot_data.get("settings") + working_dir = context.user_data.get( + "current_directory", + settings.approved_directory if settings else None, + ) + + try: + job_id = await scheduler.add_job( + job_name=job_name, + cron_expression=cron_expression, + prompt=prompt, + target_chat_ids=[chat_id], + working_directory=working_dir, + created_by=user_id, + ) + await update.message.reply_html( + f"Job {escape_html(job_name)} created.\n" + f"ID: {escape_html(job_id)}\n" + f"Cron: {escape_html(cron_expression)}" + ) + except Exception as e: + logger.exception("Failed to add scheduled job", error=str(e)) + await update.message.reply_text(f"Failed to create job: {e}") + + +async def _handle_remove( + update: Update, scheduler: JobScheduler, args: List[str] +) -> None: + """Remove a scheduled job.""" + if not args: + await update.message.reply_text("Usage: /schedule remove ") + return + + job_id = args[0] + await scheduler.remove_job(job_id) + await update.message.reply_html( + f"Job {escape_html(job_id)} removed." + ) + + +async def _handle_pause( + update: Update, scheduler: JobScheduler, args: List[str] +) -> None: + """Pause a scheduled job.""" + if not args: + await update.message.reply_text("Usage: /schedule pause ") + return + + job_id = args[0] + success = await scheduler.pause_job(job_id) + if success: + await update.message.reply_html( + f"Job {escape_html(job_id)} paused." + ) + else: + await update.message.reply_text(f"Job '{job_id}' not found.") + + +async def _handle_resume( + update: Update, scheduler: JobScheduler, args: List[str] +) -> None: + """Resume a paused job.""" + if not args: + await update.message.reply_text("Usage: /schedule resume ") + return + + job_id = args[0] + success = await scheduler.resume_job(job_id) + if success: + await update.message.reply_html( + f"Job {escape_html(job_id)} resumed." + ) + else: + await update.message.reply_text(f"Job '{job_id}' not found or failed to resume.") diff --git a/src/bot/orchestrator.py b/src/bot/orchestrator.py index ac1d5304..e04f94a4 100644 --- a/src/bot/orchestrator.py +++ b/src/bot/orchestrator.py @@ -311,6 +311,10 @@ def _register_agentic_handlers(self, app: Application) -> None: ] if self.settings.enable_project_threads: handlers.append(("sync_threads", command.sync_threads)) + if self.settings.enable_scheduler: + from .handlers import schedule + + handlers.append(("schedule", schedule.schedule_command)) for cmd, handler in handlers: app.add_handler(CommandHandler(cmd, self._inject_deps(handler))) @@ -376,6 +380,10 @@ def _register_classic_handlers(self, app: Application) -> None: ] if self.settings.enable_project_threads: handlers.append(("sync_threads", command.sync_threads)) + if self.settings.enable_scheduler: + from .handlers import schedule + + handlers.append(("schedule", schedule.schedule_command)) for cmd, handler in handlers: app.add_handler(CommandHandler(cmd, self._inject_deps(handler))) @@ -420,6 +428,8 @@ async def get_bot_commands(self) -> list: # type: ignore[type-arg] ] if self.settings.enable_project_threads: commands.append(BotCommand("sync_threads", "Sync project topics")) + if self.settings.enable_scheduler: + commands.append(BotCommand("schedule", "Manage scheduled jobs")) return commands else: commands = [ @@ -440,6 +450,8 @@ async def get_bot_commands(self) -> list: # type: ignore[type-arg] ] if self.settings.enable_project_threads: commands.append(BotCommand("sync_threads", "Sync project topics")) + if self.settings.enable_scheduler: + commands.append(BotCommand("schedule", "Manage scheduled jobs")) return commands # --- Agentic handlers --- diff --git a/src/main.py b/src/main.py index 02660733..4683bcd6 100644 --- a/src/main.py +++ b/src/main.py @@ -181,6 +181,7 @@ async def create_application(config: Settings) -> Dict[str, Any]: "event_bus": event_bus, "project_registry": None, "project_threads_manager": None, + "scheduler": None, } bot = ClaudeCodeBot(config, dependencies) @@ -314,6 +315,7 @@ def signal_handler(signum: int, frame: Any) -> None: default_working_directory=config.approved_directory, ) await scheduler.start() + bot.deps["scheduler"] = scheduler logger.info("Job scheduler enabled") # Shutdown task diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 98d90a55..e06ce33e 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -116,12 +116,89 @@ async def remove_job(self, job_id: str) -> bool: logger.info("Scheduled job removed", job_id=job_id) return True - async def list_jobs(self) -> List[Dict[str, Any]]: - """List all scheduled jobs from the database.""" + async def pause_job(self, job_id: str) -> bool: + """Pause a scheduled job (remove from APScheduler, mark inactive in DB).""" + try: + self._scheduler.remove_job(job_id) + except Exception: + logger.warning("Job not found in scheduler", job_id=job_id) + + async with self.db_manager.get_connection() as conn: + cursor = await conn.execute( + "UPDATE scheduled_jobs SET is_active = 0 WHERE job_id = ?", + (job_id,), + ) + await conn.commit() + if cursor.rowcount == 0: + return False + + logger.info("Scheduled job paused", job_id=job_id) + return True + + async def resume_job(self, job_id: str) -> bool: + """Resume a paused job (re-register with APScheduler, mark active in DB).""" async with self.db_manager.get_connection() as conn: cursor = await conn.execute( - "SELECT * FROM scheduled_jobs WHERE is_active = 1 ORDER BY created_at" + "SELECT * FROM scheduled_jobs WHERE job_id = ?", + (job_id,), + ) + row = await cursor.fetchone() + + if not row: + return False + + row_dict = dict(row) + + # Re-register with APScheduler + try: + trigger = CronTrigger.from_crontab(row_dict["cron_expression"]) + chat_ids_str = row_dict.get("target_chat_ids", "") + chat_ids = ( + [int(x) for x in chat_ids_str.split(",") if x.strip()] + if chat_ids_str + else [] + ) + self._scheduler.add_job( + self._fire_event, + trigger=trigger, + kwargs={ + "job_name": row_dict["job_name"], + "prompt": row_dict["prompt"], + "working_directory": row_dict["working_directory"], + "target_chat_ids": chat_ids, + "skill_name": row_dict.get("skill_name"), + }, + id=job_id, + name=row_dict["job_name"], + replace_existing=True, + ) + except Exception: + logger.exception("Failed to re-register job with scheduler", job_id=job_id) + return False + + # Mark active in DB + async with self.db_manager.get_connection() as conn: + await conn.execute( + "UPDATE scheduled_jobs SET is_active = 1 WHERE job_id = ?", + (job_id,), ) + await conn.commit() + + logger.info("Scheduled job resumed", job_id=job_id) + return True + + async def list_jobs(self, include_paused: bool = False) -> List[Dict[str, Any]]: + """List scheduled jobs from the database.""" + async with self.db_manager.get_connection() as conn: + if include_paused: + cursor = await conn.execute( + "SELECT * FROM scheduled_jobs ORDER BY created_at" + ) + else: + cursor = await conn.execute( + "SELECT * FROM scheduled_jobs" + " WHERE is_active = 1 ORDER BY created_at" + ) rows = await cursor.fetchall() return [dict(row) for row in rows] diff --git a/tests/unit/test_schedule_handler.py b/tests/unit/test_schedule_handler.py new file mode 100644 index 00000000..408a4126 --- /dev/null +++ b/tests/unit/test_schedule_handler.py @@ -0,0 +1,214 @@ +"""Tests for the /schedule command handler.""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.bot.handlers.schedule import schedule_command + + +@pytest.fixture +def tmp_dir(): + with tempfile.TemporaryDirectory() as d: + yield Path(d) + + +@pytest.fixture +def mock_scheduler(): + scheduler = AsyncMock() + scheduler.list_jobs = AsyncMock(return_value=[]) + scheduler.add_job = AsyncMock(return_value="job-123") + scheduler.remove_job = AsyncMock(return_value=True) + scheduler.pause_job = AsyncMock(return_value=True) + scheduler.resume_job = AsyncMock(return_value=True) + return scheduler + + +@pytest.fixture +def update(): + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.message.reply_html = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.id = 12345 + update.effective_user = MagicMock() + update.effective_user.id = 67890 + return update + + +@pytest.fixture +def context(mock_scheduler, tmp_dir): + context = MagicMock() + context.bot_data = { + "scheduler": mock_scheduler, + "settings": MagicMock(approved_directory=tmp_dir), + } + context.user_data = {"current_directory": tmp_dir} + context.args = [] + return context + + +async def test_no_scheduler_shows_error(update, context): + """When scheduler is not injected, show error message.""" + context.bot_data["scheduler"] = None + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "not enabled" in update.message.reply_text.call_args[0][0] + + +async def test_no_args_shows_usage(update, context): + """No subcommand shows usage info.""" + context.args = [] + await schedule_command(update, context) + update.message.reply_html.assert_called_once() + assert "Usage" in update.message.reply_html.call_args[0][0] + + +async def test_unknown_subcommand_shows_usage(update, context): + """Unknown subcommand shows usage info.""" + context.args = ["unknown"] + await schedule_command(update, context) + update.message.reply_html.assert_called_once() + assert "Usage" in update.message.reply_html.call_args[0][0] + + +async def test_list_empty(update, context, mock_scheduler): + """List with no jobs shows empty message.""" + context.args = ["list"] + await schedule_command(update, context) + mock_scheduler.list_jobs.assert_called_once_with(include_paused=True) + update.message.reply_text.assert_called_once_with("No scheduled jobs.") + + +async def test_list_with_jobs(update, context, mock_scheduler): + """List with jobs formats them correctly.""" + mock_scheduler.list_jobs.return_value = [ + { + "job_id": "abc-123", + "job_name": "daily-report", + "cron_expression": "0 9 * * *", + "is_active": 1, + }, + { + "job_id": "def-456", + "job_name": "weekly-backup", + "cron_expression": "0 0 * * 0", + "is_active": 0, + }, + ] + context.args = ["list"] + await schedule_command(update, context) + html = update.message.reply_html.call_args[0][0] + assert "daily-report" in html + assert "active" in html + assert "weekly-backup" in html + assert "paused" in html + assert "abc-123" in html + + +async def test_add_success(update, context, mock_scheduler, tmp_dir): + """Add creates a job with correct parameters.""" + context.args = ["add", "my-job", "0", "9", "*", "*", "1-5", "Run", "status", "check"] + await schedule_command(update, context) + + mock_scheduler.add_job.assert_called_once_with( + job_name="my-job", + cron_expression="0 9 * * 1-5", + prompt="Run status check", + target_chat_ids=[12345], + working_directory=tmp_dir, + created_by=67890, + ) + html = update.message.reply_html.call_args[0][0] + assert "my-job" in html + assert "job-123" in html + + +async def test_add_too_few_args(update, context): + """Add with insufficient args shows usage.""" + context.args = ["add", "my-job", "0", "9"] + await schedule_command(update, context) + update.message.reply_html.assert_called_once() + assert "Usage" in update.message.reply_html.call_args[0][0] + + +async def test_add_failure(update, context, mock_scheduler): + """Add handles scheduler errors gracefully.""" + mock_scheduler.add_job.side_effect = ValueError("Invalid cron expression") + context.args = ["add", "bad-job", "x", "y", "z", "a", "b", "Do", "something"] + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "Failed" in update.message.reply_text.call_args[0][0] + + +async def test_remove(update, context, mock_scheduler): + """Remove calls scheduler.remove_job.""" + context.args = ["remove", "job-123"] + await schedule_command(update, context) + mock_scheduler.remove_job.assert_called_once_with("job-123") + html = update.message.reply_html.call_args[0][0] + assert "job-123" in html + assert "removed" in html + + +async def test_remove_no_id(update, context): + """Remove without job_id shows usage.""" + context.args = ["remove"] + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "Usage" in update.message.reply_text.call_args[0][0] + + +async def test_pause_success(update, context, mock_scheduler): + """Pause calls scheduler.pause_job.""" + context.args = ["pause", "job-123"] + await schedule_command(update, context) + mock_scheduler.pause_job.assert_called_once_with("job-123") + html = update.message.reply_html.call_args[0][0] + assert "paused" in html + + +async def test_pause_not_found(update, context, mock_scheduler): + """Pause with unknown job_id shows not found.""" + mock_scheduler.pause_job.return_value = False + context.args = ["pause", "unknown-id"] + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "not found" in update.message.reply_text.call_args[0][0] + + +async def test_pause_no_id(update, context): + """Pause without job_id shows usage.""" + context.args = ["pause"] + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "Usage" in update.message.reply_text.call_args[0][0] + + +async def test_resume_success(update, context, mock_scheduler): + """Resume calls scheduler.resume_job.""" + context.args = ["resume", "job-123"] + await schedule_command(update, context) + mock_scheduler.resume_job.assert_called_once_with("job-123") + html = update.message.reply_html.call_args[0][0] + assert "resumed" in html + + +async def test_resume_not_found(update, context, mock_scheduler): + """Resume with unknown job_id shows failure message.""" + mock_scheduler.resume_job.return_value = False + context.args = ["resume", "unknown-id"] + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "not found" in update.message.reply_text.call_args[0][0] + + +async def test_resume_no_id(update, context): + """Resume without job_id shows usage.""" + context.args = ["resume"] + await schedule_command(update, context) + update.message.reply_text.assert_called_once() + assert "Usage" in update.message.reply_text.call_args[0][0]