diff --git a/pyproject.toml b/pyproject.toml
index 345e5a4..5a0b296 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -53,6 +53,9 @@ matrix = [
rag = [
"flashrank>=0.2.0,<1.0.0",
]
+pdf = [
+ "pymupdf>=1.25.0,<2.0.0",
+]
dev = [
"pytest>=9.0.0,<10.0.0",
"pytest-asyncio>=1.3.0,<2.0.0",
diff --git a/snapagent/agent/loop.py b/snapagent/agent/loop.py
index ef137d4..9e85fc4 100644
--- a/snapagent/agent/loop.py
+++ b/snapagent/agent/loop.py
@@ -22,6 +22,7 @@
from snapagent.agent.tools.doctor import DoctorCheckTool
from snapagent.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from snapagent.agent.tools.message import MessageTool
+from snapagent.agent.tools.pdf import PdfReaderTool
from snapagent.agent.tools.rag import RagQueryTool
from snapagent.agent.tools.registry import ToolRegistry
from snapagent.agent.tools.shell import ExecTool
@@ -163,6 +164,12 @@ def _register_default_tools(self) -> None:
self.tools.register(SpawnTool(manager=self.subagents))
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
+ try:
+ import fitz
+
+ self.tools.register(PdfReaderTool(workspace=self.workspace, allowed_dir=allowed_dir))
+ except ImportError:
+ pass
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
@@ -233,6 +240,7 @@ async def _run_agent_loop(
session_key: str | None = None,
) -> tuple[str | None, list[str], list[dict]]:
"""Run one orchestrated turn. Returns (final_content, tools_used, messages)."""
+
async def _inject_event(messages: list[dict]) -> bool:
if not session_key:
return False
@@ -243,7 +251,7 @@ async def _inject_event(messages: list[dict]) -> bool:
messages.append(
{
"role": "system",
- "content": f"{event}",
+ "content": f'{event}',
}
)
if flattened_event:
@@ -379,8 +387,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:
channel=msg.channel,
chat_id=msg.chat_id,
content=(
- f"🩺 Doctor precheck blocked (stopped {total} task(s)).\n\n"
- f"{guidance}"
+ f"🩺 Doctor precheck blocked (stopped {total} task(s)).\n\n{guidance}"
),
run_id=run_id,
turn_id=turn_id,
@@ -533,7 +540,9 @@ async def _run_doctor_via_codex_cli(
stderr_text = (await stderr_task).decode("utf-8", "replace").strip()
if exit_code == 0:
- final = output or "Doctor completed via Codex CLI, but no final message was captured."
+ final = (
+ output or "Doctor completed via Codex CLI, but no final message was captured."
+ )
else:
detail = stderr_text or output or f"exited with code {exit_code}"
final = f"🩺 Doctor via Codex CLI failed: {detail}"
@@ -692,9 +701,15 @@ def _doctor_setup_guidance(self) -> str | None:
try:
config_path = get_config_path()
config = load_config()
- snapshot = collect_health_snapshot(config=config, config_path=config_path).to_dict(deep=True)
+ snapshot = collect_health_snapshot(config=config, config_path=config_path).to_dict(
+ deep=True
+ )
provider = next(
- (item for item in snapshot.get("evidence", []) if item.get("component") == "provider"),
+ (
+ item
+ for item in snapshot.get("evidence", [])
+ if item.get("component") == "provider"
+ ),
None,
)
if not provider:
@@ -972,8 +987,7 @@ async def _process_message(
channel=msg.channel,
chat_id=msg.chat_id,
content=(
- "\u26a1 Normal mode — I'll execute tools directly.\n"
- "Use /plan to switch back."
+ "\u26a1 Normal mode — I'll execute tools directly.\nUse /plan to switch back."
),
run_id=run_id,
turn_id=turn_id,
@@ -1001,8 +1015,7 @@ async def _process_message(
doctor_prompt = (
"[Doctor Mode] Diagnose issues using evidence first. "
"Use doctor_check with check=health/status/logs/events as needed. "
- "Cite observed evidence and then propose next actions.\n\n"
- + msg.content
+ "Cite observed evidence and then propose next actions.\n\n" + msg.content
)
if self._doctor_cli_available():
codex_final, codex_ok = await self._run_doctor_via_codex_cli(
diff --git a/snapagent/agent/tools/pdf.py b/snapagent/agent/tools/pdf.py
new file mode 100644
index 0000000..3fa1dc6
--- /dev/null
+++ b/snapagent/agent/tools/pdf.py
@@ -0,0 +1,256 @@
+"""PDF reader tool using PyMuPDF."""
+
+import base64
+import json
+import re
+from pathlib import Path
+from typing import Any
+
+from loguru import logger
+
+from snapagent.agent.tools.base import Tool
+from snapagent.agent.tools.filesystem import _resolve_path
+
+
+class PdfReaderTool(Tool):
+ """Tool to read and extract content from PDF files."""
+
+ def __init__(
+ self,
+ workspace: Path | None = None,
+ allowed_dir: Path | None = None,
+ max_pages: int = 100,
+ extract_images: bool = False,
+ image_output_dir: str | None = None,
+ ):
+ self._workspace = workspace
+ self._allowed_dir = allowed_dir
+ self._max_pages = max_pages
+ self._extract_images = extract_images
+ self._image_output_dir = image_output_dir
+
+ @property
+ def name(self) -> str:
+ return "read_pdf"
+
+ @property
+ def description(self) -> str:
+ return (
+ "Extract text, tables, and metadata from a PDF file. "
+ "Returns structured content with page numbers."
+ )
+
+ @property
+ def parameters(self) -> dict[str, Any]:
+ return {
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Path to the PDF file",
+ },
+ "mode": {
+ "type": "string",
+ "enum": ["text", "tables", "metadata", "full"],
+ "description": "Extraction mode: text (default), tables, metadata, or full",
+ },
+ "pages": {
+ "type": "string",
+ "description": "Page range to extract, e.g. '1-5', '1,3,5', 'all' (default)",
+ },
+ "password": {
+ "type": "string",
+ "description": "Password for encrypted PDF (optional)",
+ },
+ },
+ "required": ["path"],
+ }
+
+ async def execute(
+ self,
+ path: str,
+ mode: str = "text",
+ pages: str = "all",
+ password: str | None = None,
+ **kwargs: Any,
+ ) -> str:
+ try:
+ file_path = _resolve_path(path, self._workspace, self._allowed_dir)
+ if not file_path.exists():
+ return f"Error: File not found: {path}"
+ if not file_path.is_file():
+ return f"Error: Not a file: {path}"
+ if file_path.suffix.lower() != ".pdf":
+ return f"Error: Not a PDF file: {path}"
+ except PermissionError as e:
+ return f"Error: {e}"
+
+ try:
+ import fitz
+ except ImportError:
+ return "Error: PyMuPDF not installed. Install with: pip install snapagent-ai[pdf]"
+
+ try:
+ doc = fitz.open(file_path)
+ if doc.is_encrypted:
+ if not password:
+ doc.close()
+ return "Error: PDF is encrypted. Provide password parameter."
+ if not doc.authenticate(password):
+ doc.close()
+ return "Error: Invalid password for encrypted PDF."
+
+ if mode == "metadata":
+ result = self._extract_metadata(doc)
+ elif mode == "tables":
+ result = self._extract_tables(doc, pages)
+ else:
+ result = self._extract_text(doc, pages, mode == "full")
+
+ doc.close()
+ return result
+
+ except Exception as e:
+ logger.error("PDF extraction error: {}", e)
+ return f"Error extracting PDF: {str(e)}"
+
+ def _parse_page_range(self, pages: str, total: int) -> list[int]:
+ if pages == "all":
+ return list(range(total))
+
+ page_nums = set()
+ for part in pages.split(","):
+ part = part.strip()
+ if "-" in part:
+ start, end = part.split("-", 1)
+ start, end = int(start) - 1, int(end)
+ page_nums.update(range(max(0, start), min(total, end)))
+ else:
+ p = int(part) - 1
+ if 0 <= p < total:
+ page_nums.add(p)
+
+ return sorted(page_nums)[: self._max_pages]
+
+ def _extract_text(self, doc, pages: str, include_images: bool) -> str:
+ total_pages = len(doc)
+ page_nums = self._parse_page_range(pages, total_pages)
+
+ output = []
+ output.append(f"PDF: {doc.name}")
+ output.append(f"Total pages: {total_pages}")
+ output.append(f"Extracting pages: {', '.join(str(p + 1) for p in page_nums)}")
+ output.append("-" * 40)
+
+ for page_num in page_nums:
+ page = doc[page_num]
+ output.append(f"\n[Page {page_num + 1}]\n")
+
+ text = page.get_text("text")
+ text = self._clean_text(text)
+ if text.strip():
+ output.append(text)
+
+ if include_images and self._extract_images:
+ images = self._extract_page_images(doc, page, page_num)
+ if images:
+ output.append(f"\n[Images on page {page_num + 1}]")
+ output.extend(images)
+
+ return "\n".join(output)
+
+ def _extract_tables(self, doc, pages: str) -> str:
+ total_pages = len(doc)
+ page_nums = self._parse_page_range(pages, total_pages)
+
+ output = []
+ output.append(f"PDF: {doc.name}")
+ output.append(f"Extracting tables from {len(page_nums)} pages")
+ output.append("-" * 40)
+
+ tables_found = 0
+ for page_num in page_nums:
+ page = doc[page_num]
+ tables = page.find_tables()
+
+ if tables.tables:
+ for i, table in enumerate(tables.tables, 1):
+ tables_found += 1
+ output.append(f"\n[Table {tables_found} - Page {page_num + 1}]")
+
+ df = table.to_pandas()
+ output.append(df.to_string(index=False))
+ output.append("")
+
+ if tables_found == 0:
+ output.append("\nNo tables found in the specified pages.")
+
+ return "\n".join(output)
+
+ def _extract_metadata(self, doc) -> str:
+ meta = doc.metadata
+
+ output = []
+ output.append(f"PDF Metadata: {doc.name}")
+ output.append("-" * 40)
+
+ fields = {
+ "title": "Title",
+ "author": "Author",
+ "subject": "Subject",
+ "keywords": "Keywords",
+ "creator": "Creator",
+ "producer": "Producer",
+ "creationDate": "Created",
+ "modDate": "Modified",
+ "format": "Format",
+ "encryption": "Encryption",
+ }
+
+ for key, label in fields.items():
+ value = meta.get(key)
+ if value:
+ output.append(f"{label}: {value}")
+
+ output.append(f"Pages: {len(doc)}")
+
+ toc = doc.get_toc()
+ if toc:
+ output.append("\nTable of Contents:")
+ for level, title, page in toc[:20]:
+ indent = " " * (level - 1)
+ output.append(f"{indent}{title} (p.{page})")
+ if len(toc) > 20:
+ output.append(f" ... and {len(toc) - 20} more entries")
+
+ return "\n".join(output)
+
+ def _extract_page_images(self, doc, page, page_num: int) -> list[str]:
+ if not self._image_output_dir:
+ return []
+
+ output_dir = Path(self._image_output_dir)
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ results = []
+ image_list = page.get_images(full=True)
+
+ for img_index, img in enumerate(image_list):
+ xref = img[0]
+ base_image = doc.extract_image(xref)
+ image_bytes = base_image["image"]
+ image_ext = base_image["ext"]
+
+ img_filename = f"page{page_num + 1}_img{img_index + 1}.{image_ext}"
+ img_path = output_dir / img_filename
+ img_path.write_bytes(image_bytes)
+
+ results.append(f" Saved: {img_path}")
+
+ return results
+
+ @staticmethod
+ def _clean_text(text: str) -> str:
+ text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
+ text = re.sub(r"\n{3,}", "\n\n", text)
+ return text.strip()
diff --git a/tests/test_pdf_tool.py b/tests/test_pdf_tool.py
new file mode 100644
index 0000000..4e1f4dd
--- /dev/null
+++ b/tests/test_pdf_tool.py
@@ -0,0 +1,181 @@
+"""Tests for PDF reader tool."""
+
+import tempfile
+from pathlib import Path
+
+import pytest
+
+
+def _has_pymupdf() -> bool:
+ try:
+ import fitz
+
+ return True
+ except ImportError:
+ return False
+
+
+# Skip every test in this module when the optional PyMuPDF dependency is missing.
+pytestmark = pytest.mark.skipif(not _has_pymupdf(), reason="PyMuPDF not installed")
+
+
+@pytest.fixture
+def sample_pdf() -> Path:
+ """Create a simple PDF for testing."""
+ import fitz
+
+ with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
+ doc = fitz.open()
+ page = doc.new_page()
+ page.insert_text((50, 50), "Hello PDF World!")
+ page.insert_text((50, 100), "This is page 1.")
+
+ page2 = doc.new_page()
+ page2.insert_text((50, 50), "Hello Page 2!")
+ page2.insert_text((50, 100), "Another paragraph here.")
+
+ doc.save(f.name)
+ doc.close()
+ return Path(f.name)
+
+
+@pytest.fixture
+def encrypted_pdf(sample_pdf: Path) -> Path:
+ """Create an encrypted PDF for testing."""
+ import fitz
+
+ with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
+ doc = fitz.open(sample_pdf)
+ doc.save(
+ f.name, encryption=fitz.PDF_ENCRYPT_AES_256, owner_pw="owner123", user_pw="user123"
+ )
+ doc.close()
+ return Path(f.name)
+
+
+class TestPdfReaderTool:
+ def test_tool_properties(self) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ assert tool.name == "read_pdf"
+ assert "PDF" in tool.description
+ assert "path" in tool.parameters["properties"]
+ assert "mode" in tool.parameters["properties"]
+
+ @pytest.mark.asyncio
+ async def test_read_text_default(self, sample_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(sample_pdf))
+
+ assert "Hello PDF World" in result
+ assert "Page 1" in result
+ assert "Total pages: 2" in result
+
+ @pytest.mark.asyncio
+ async def test_read_specific_pages(self, sample_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(sample_pdf), pages="1")
+
+ assert "Hello PDF World" in result
+ assert "Page 2" not in result
+
+ @pytest.mark.asyncio
+ async def test_read_page_range(self, sample_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(sample_pdf), pages="1-2")
+
+ assert "Page 1" in result
+ assert "Page 2" in result
+
+ @pytest.mark.asyncio
+ async def test_read_metadata(self, sample_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(sample_pdf), mode="metadata")
+
+ assert "PDF Metadata" in result
+ assert "Pages: 2" in result
+
+ @pytest.mark.asyncio
+ async def test_file_not_found(self) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path="/nonexistent/file.pdf")
+
+ assert "Error" in result
+ assert "not found" in result.lower()
+
+ @pytest.mark.asyncio
+ async def test_not_a_pdf(self) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
+ f.write(b"This is not a PDF")
+ f.flush()
+ tool = PdfReaderTool()
+ result = await tool.execute(path=f.name)
+
+ assert "Error" in result
+ assert "Not a PDF" in result
+
+ @pytest.mark.asyncio
+ async def test_encrypted_pdf_no_password(self, encrypted_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(encrypted_pdf))
+
+ assert "Error" in result
+ assert "encrypted" in result.lower()
+
+ @pytest.mark.asyncio
+ async def test_encrypted_pdf_with_password(self, encrypted_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(encrypted_pdf), password="user123")
+
+ assert "Hello PDF World" in result or "Pages: 2" in result
+
+ @pytest.mark.asyncio
+ async def test_workspace_restriction(self, sample_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ workspace = sample_pdf.parent
+ tool = PdfReaderTool(workspace=workspace, allowed_dir=workspace)
+ result = await tool.execute(path=sample_pdf.name)
+
+ assert "Hello PDF World" in result or "Pages: 2" in result
+
+ @pytest.mark.asyncio
+ async def test_tables_mode_empty(self, sample_pdf: Path) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ result = await tool.execute(path=str(sample_pdf), mode="tables")
+
+ assert "No tables found" in result
+
+ def test_parse_page_range(self) -> None:
+ from snapagent.agent.tools.pdf import PdfReaderTool
+
+ tool = PdfReaderTool()
+ assert tool._parse_page_range("all", 10) == list(range(10))
+ assert tool._parse_page_range("1", 10) == [0]
+ assert tool._parse_page_range("1-3", 10) == [0, 1, 2]
+ assert tool._parse_page_range("1,3,5", 10) == [0, 2, 4]
+ assert tool._parse_page_range("1-5,7,9", 10) == [0, 1, 2, 3, 4, 6, 8]
+
+ def cleanup(self, sample_pdf: Path, encrypted_pdf: Path) -> None:
+ if sample_pdf.exists():
+ sample_pdf.unlink()
+ if encrypted_pdf.exists():
+ encrypted_pdf.unlink()