Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ matrix = [
rag = [
"flashrank>=0.2.0,<1.0.0",
]
pdf = [
"pymupdf>=1.25.0,<2.0.0",
]
dev = [
"pytest>=9.0.0,<10.0.0",
"pytest-asyncio>=1.3.0,<2.0.0",
Expand Down
33 changes: 23 additions & 10 deletions snapagent/agent/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from snapagent.agent.tools.doctor import DoctorCheckTool
from snapagent.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from snapagent.agent.tools.message import MessageTool
from snapagent.agent.tools.pdf import PdfReaderTool
from snapagent.agent.tools.rag import RagQueryTool
from snapagent.agent.tools.registry import ToolRegistry
from snapagent.agent.tools.shell import ExecTool
Expand Down Expand Up @@ -163,6 +164,12 @@ def _register_default_tools(self) -> None:
self.tools.register(SpawnTool(manager=self.subagents))
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
try:
import fitz

self.tools.register(PdfReaderTool(workspace=self.workspace, allowed_dir=allowed_dir))
except ImportError:
pass

async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
Expand Down Expand Up @@ -233,6 +240,7 @@ async def _run_agent_loop(
session_key: str | None = None,
) -> tuple[str | None, list[str], list[dict]]:
"""Run one orchestrated turn. Returns (final_content, tools_used, messages)."""

async def _inject_event(messages: list[dict]) -> bool:
if not session_key:
return False
Expand All @@ -243,7 +251,7 @@ async def _inject_event(messages: list[dict]) -> bool:
messages.append(
{
"role": "system",
"content": f"<SYS_EVENT type=\"user_interrupt\">{event}</SYS_EVENT>",
"content": f'<SYS_EVENT type="user_interrupt">{event}</SYS_EVENT>',
}
)
if flattened_event:
Expand Down Expand Up @@ -379,8 +387,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:
channel=msg.channel,
chat_id=msg.chat_id,
content=(
f"🩺 Doctor precheck blocked (stopped {total} task(s)).\n\n"
f"{guidance}"
f"🩺 Doctor precheck blocked (stopped {total} task(s)).\n\n{guidance}"
),
run_id=run_id,
turn_id=turn_id,
Expand Down Expand Up @@ -533,7 +540,9 @@ async def _run_doctor_via_codex_cli(
stderr_text = (await stderr_task).decode("utf-8", "replace").strip()

if exit_code == 0:
final = output or "Doctor completed via Codex CLI, but no final message was captured."
final = (
output or "Doctor completed via Codex CLI, but no final message was captured."
)
else:
detail = stderr_text or output or f"exited with code {exit_code}"
final = f"🩺 Doctor via Codex CLI failed: {detail}"
Expand Down Expand Up @@ -692,9 +701,15 @@ def _doctor_setup_guidance(self) -> str | None:
try:
config_path = get_config_path()
config = load_config()
snapshot = collect_health_snapshot(config=config, config_path=config_path).to_dict(deep=True)
snapshot = collect_health_snapshot(config=config, config_path=config_path).to_dict(
deep=True
)
provider = next(
(item for item in snapshot.get("evidence", []) if item.get("component") == "provider"),
(
item
for item in snapshot.get("evidence", [])
if item.get("component") == "provider"
),
None,
)
if not provider:
Expand Down Expand Up @@ -972,8 +987,7 @@ async def _process_message(
channel=msg.channel,
chat_id=msg.chat_id,
content=(
"\u26a1 Normal mode — I'll execute tools directly.\n"
"Use /plan to switch back."
"\u26a1 Normal mode — I'll execute tools directly.\nUse /plan to switch back."
),
run_id=run_id,
turn_id=turn_id,
Expand Down Expand Up @@ -1001,8 +1015,7 @@ async def _process_message(
doctor_prompt = (
"[Doctor Mode] Diagnose issues using evidence first. "
"Use doctor_check with check=health/status/logs/events as needed. "
"Cite observed evidence and then propose next actions.\n\n"
+ msg.content
"Cite observed evidence and then propose next actions.\n\n" + msg.content
)
if self._doctor_cli_available():
codex_final, codex_ok = await self._run_doctor_via_codex_cli(
Expand Down
256 changes: 256 additions & 0 deletions snapagent/agent/tools/pdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
"""PDF reader tool using PyMuPDF."""

import base64
import json
import re
from pathlib import Path
from typing import Any

from loguru import logger

from snapagent.agent.tools.base import Tool
from snapagent.agent.tools.filesystem import _resolve_path


class PdfReaderTool(Tool):
    """Tool to read and extract content from PDF files.

    Supports plain-text extraction, table extraction, and metadata/TOC
    inspection. PyMuPDF (``fitz``) is imported lazily inside ``execute`` so
    that the module can be imported even when the optional dependency is
    missing.
    """

    # Control characters stripped from extracted text (tab, LF and CR are kept).
    _CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
    # Runs of three or more newlines are collapsed to a single blank line.
    _EXCESS_NEWLINES = re.compile(r"\n{3,}")

    def __init__(
        self,
        workspace: Path | None = None,
        allowed_dir: Path | None = None,
        max_pages: int = 100,
        extract_images: bool = False,
        image_output_dir: str | None = None,
    ):
        """Configure the tool.

        Args:
            workspace: Passed to ``_resolve_path`` when resolving ``path``.
            allowed_dir: Passed to ``_resolve_path`` when resolving ``path``.
            max_pages: Upper bound on the number of pages processed per call.
            extract_images: Whether "full" mode also saves embedded images.
            image_output_dir: Directory where extracted images are written;
                image extraction is skipped when this is not set.
        """
        self._workspace = workspace
        self._allowed_dir = allowed_dir
        self._max_pages = max_pages
        self._extract_images = extract_images
        self._image_output_dir = image_output_dir

    @property
    def name(self) -> str:
        """Tool name exposed to the agent."""
        return "read_pdf"

    @property
    def description(self) -> str:
        """Short human-readable description of the tool."""
        return (
            "Extract text, tables, and metadata from a PDF file. "
            "Returns structured content with page numbers."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema description of the arguments accepted by ``execute``."""
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the PDF file",
                },
                "mode": {
                    "type": "string",
                    "enum": ["text", "tables", "metadata", "full"],
                    "description": "Extraction mode: text (default), tables, metadata, or full",
                },
                "pages": {
                    "type": "string",
                    "description": "Page range to extract, e.g. '1-5', '1,3,5', 'all' (default)",
                },
                "password": {
                    "type": "string",
                    "description": "Password for encrypted PDF (optional)",
                },
            },
            "required": ["path"],
        }

    async def execute(
        self,
        path: str,
        mode: str = "text",
        pages: str = "all",
        password: str | None = None,
        **kwargs: Any,
    ) -> str:
        """Extract content from the PDF at ``path``.

        Args:
            path: Path to the PDF file, resolved via ``_resolve_path``.
            mode: ``"metadata"``, ``"tables"``, ``"full"`` or ``"text"``; any
                other value is treated like ``"text"``.
            pages: Page selection such as ``"1-5"``, ``"1,3,5"`` or ``"all"``.
            password: Password used when the document is encrypted.
            **kwargs: Ignored; accepted for interface compatibility.

        Returns:
            The extracted content, or a string starting with ``"Error"`` that
            describes what went wrong. Extraction failures are reported in the
            return value rather than raised.
        """
        try:
            file_path = _resolve_path(path, self._workspace, self._allowed_dir)
            if not file_path.exists():
                return f"Error: File not found: {path}"
            if not file_path.is_file():
                return f"Error: Not a file: {path}"
            if file_path.suffix.lower() != ".pdf":
                return f"Error: Not a PDF file: {path}"
        except PermissionError as e:
            return f"Error: {e}"

        try:
            import fitz
        except ImportError:
            return "Error: PyMuPDF not installed. Install with: pip install snapagent-ai[pdf]"

        doc = None
        try:
            doc = fitz.open(file_path)
            if doc.is_encrypted:
                if not password:
                    return "Error: PDF is encrypted. Provide password parameter."
                if not doc.authenticate(password):
                    return "Error: Invalid password for encrypted PDF."

            if mode == "metadata":
                return self._extract_metadata(doc)
            if mode == "tables":
                return self._extract_tables(doc, pages)
            return self._extract_text(doc, pages, mode == "full")
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("PDF extraction error: {}", e)
            return f"Error extracting PDF: {e}"
        finally:
            # Close on every path (early return, success, or exception) so the
            # underlying file handle is never leaked.
            if doc is not None:
                doc.close()

    def _parse_page_range(self, pages: str, total: int) -> list[int]:
        """Translate a 1-based page specification into sorted 0-based indices.

        Args:
            pages: ``"all"`` (case-insensitive, also the meaning of an empty
                value) or a comma-separated list of page numbers and inclusive
                ``start-end`` ranges, e.g. ``"1-3,7"``.
            total: Number of pages in the document.

        Returns:
            Sorted, de-duplicated 0-based page indices within ``[0, total)``,
            truncated to at most ``max_pages`` entries. The cap applies to
            ``"all"`` as well as to explicit ranges.

        Raises:
            ValueError: If a part of ``pages`` is not a number or a range.
        """
        spec = (pages or "all").strip().lower()
        if spec in ("", "all"):
            # Slicing the range keeps the cap consistent with explicit ranges.
            return list(range(total)[: self._max_pages])

        selected: set[int] = set()
        for raw_part in spec.split(","):
            part = raw_part.strip()
            if not part:
                # Tolerate stray separators such as a trailing comma.
                continue
            try:
                if "-" in part:
                    first, last = part.split("-", 1)
                    start, stop = int(first) - 1, int(last)
                    selected.update(range(max(0, start), min(total, stop)))
                else:
                    index = int(part) - 1
                    if 0 <= index < total:
                        selected.add(index)
            except ValueError:
                raise ValueError(f"Invalid page range: {part!r}") from None

        return sorted(selected)[: self._max_pages]

    def _extract_text(self, doc, pages: str, include_images: bool) -> str:
        """Return the cleaned text of the selected pages, with a short header.

        Args:
            doc: Open PyMuPDF document.
            pages: Page specification understood by ``_parse_page_range``.
            include_images: When true and image extraction is enabled on the
                tool, embedded images are saved and listed per page.
        """
        total_pages = len(doc)
        page_nums = self._parse_page_range(pages, total_pages)

        output = [
            f"PDF: {doc.name}",
            f"Total pages: {total_pages}",
            f"Extracting pages: {', '.join(str(p + 1) for p in page_nums)}",
            "-" * 40,
        ]

        for page_num in page_nums:
            page = doc[page_num]
            output.append(f"\n[Page {page_num + 1}]\n")

            text = self._clean_text(page.get_text("text"))
            if text.strip():
                output.append(text)

            if include_images and self._extract_images:
                images = self._extract_page_images(doc, page, page_num)
                if images:
                    output.append(f"\n[Images on page {page_num + 1}]")
                    output.extend(images)

        return "\n".join(output)

    def _extract_tables(self, doc, pages: str) -> str:
        """Return a text rendering of every table found on the selected pages.

        Args:
            doc: Open PyMuPDF document.
            pages: Page specification understood by ``_parse_page_range``.
        """
        page_nums = self._parse_page_range(pages, len(doc))

        output = [
            f"PDF: {doc.name}",
            f"Extracting tables from {len(page_nums)} pages",
            "-" * 40,
        ]

        tables_found = 0
        for page_num in page_nums:
            finder = doc[page_num].find_tables()
            for table in finder.tables:
                tables_found += 1
                output.append(f"\n[Table {tables_found} - Page {page_num + 1}]")
                output.append(self._render_table(table))
                output.append("")

        if tables_found == 0:
            output.append("\nNo tables found in the specified pages.")

        return "\n".join(output)

    @staticmethod
    def _render_table(table) -> str:
        """Render one PyMuPDF table as text.

        Uses the pandas rendering when pandas is importable; otherwise falls
        back to the raw cell rows so table extraction still works with only
        PyMuPDF installed.
        """
        try:
            return table.to_pandas().to_string(index=False)
        except ImportError:
            # pandas is optional: render the extracted rows directly.
            return "\n".join(
                " | ".join(
                    "" if cell is None else str(cell).replace("\n", " ") for cell in row
                )
                for row in table.extract()
            )

    def _extract_metadata(self, doc) -> str:
        """Return document metadata, page count and up to 20 TOC entries.

        Args:
            doc: Open PyMuPDF document.
        """
        # Guard against a missing metadata dictionary.
        meta = doc.metadata or {}

        output = [f"PDF Metadata: {doc.name}", "-" * 40]

        fields = {
            "title": "Title",
            "author": "Author",
            "subject": "Subject",
            "keywords": "Keywords",
            "creator": "Creator",
            "producer": "Producer",
            "creationDate": "Created",
            "modDate": "Modified",
            "format": "Format",
            "encryption": "Encryption",
        }

        for key, label in fields.items():
            value = meta.get(key)
            if value:
                output.append(f"{label}: {value}")

        output.append(f"Pages: {len(doc)}")

        toc = doc.get_toc()
        if toc:
            output.append("\nTable of Contents:")
            for level, title, page in toc[:20]:
                indent = " " * (level - 1)
                output.append(f"{indent}{title} (p.{page})")
            if len(toc) > 20:
                output.append(f" ... and {len(toc) - 20} more entries")

        return "\n".join(output)

    def _extract_page_images(self, doc, page, page_num: int) -> list[str]:
        """Save the images embedded in ``page`` and describe where they went.

        Args:
            doc: Open PyMuPDF document that owns ``page``.
            page: Page whose images are extracted.
            page_num: 0-based page index, used to build the file names.

        Returns:
            One ``" Saved: <path>"`` line per image written, or an empty list
            when no image output directory is configured.
        """
        if not self._image_output_dir:
            return []

        # NOTE(review): file names depend only on page/image index, so images
        # from different PDFs written to the same directory overwrite each
        # other - confirm whether that is acceptable.
        output_dir = Path(self._image_output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        results = []
        for img_index, img in enumerate(page.get_images(full=True)):
            xref = img[0]
            base_image = doc.extract_image(xref)

            img_filename = f"page{page_num + 1}_img{img_index + 1}.{base_image['ext']}"
            img_path = output_dir / img_filename
            img_path.write_bytes(base_image["image"])

            results.append(f" Saved: {img_path}")

        return results

    @classmethod
    def _clean_text(cls, text: str) -> str:
        """Strip control characters, collapse blank-line runs and trim ``text``."""
        text = cls._CONTROL_CHARS.sub("", text)
        text = cls._EXCESS_NEWLINES.sub("\n\n", text)
        return text.strip()
Loading
Loading