diff --git a/src/mcp_trove_crunchtools/database.py b/src/mcp_trove_crunchtools/database.py index 5f59b47..dbbf357 100644 --- a/src/mcp_trove_crunchtools/database.py +++ b/src/mcp_trove_crunchtools/database.py @@ -71,6 +71,19 @@ total_chunks INTEGER NOT NULL DEFAULT 0, error_message TEXT ); + +CREATE TABLE IF NOT EXISTS index_errors ( + id INTEGER PRIMARY KEY, + run_id INTEGER REFERENCES index_runs(id), + path TEXT NOT NULL, + error_message TEXT NOT NULL, + error_type TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + resolved_at TEXT, + resolved INTEGER NOT NULL DEFAULT 0 +); +CREATE INDEX IF NOT EXISTS idx_errors_path ON index_errors(path); +CREATE INDEX IF NOT EXISTS idx_errors_resolved ON index_errors(resolved); """ VEC_TABLE_SQL = f""" @@ -314,3 +327,82 @@ def log_run_error(run_id: int, error_message: str) -> None: "error_message = ? WHERE id = ?", (error_message, run_id), ) + + +# --- Per-file error tracking --- + +_TRANSIENT_PATTERNS = ( + "connection reset", + "dns", + "503", + "timeout", + "temporary failure", + "name resolution", + "broken pipe", + "connection refused", + "network unreachable", +) + + +def classify_error(error_message: str) -> str: + """Classify an error as 'transient' or 'permanent'.""" + lower = error_message.lower() + for pattern in _TRANSIENT_PATTERNS: + if pattern in lower: + return "transient" + return "permanent" + + +def insert_error( + run_id: int | None, path: str, error_message: str, error_type: str, +) -> int: + """Record a per-file indexing failure.""" + return execute( + "INSERT INTO index_errors (run_id, path, error_message, error_type) " + "VALUES (?, ?, ?, ?)", + (run_id, path, error_message, error_type), + ) + + +def resolve_errors(path: str) -> int: + """Mark all unresolved errors for a path as resolved. + + Returns the number of rows updated. + """ + db = get_db() + cursor = db.execute( + "UPDATE index_errors SET resolved = 1, resolved_at = datetime('now') " + "WHERE path = ? AND resolved = 0", + (path,), + ) + db.commit() + return cursor.rowcount + + +def query_errors( + *, + resolved: bool | None = False, + path: str | None = None, + limit: int = 100, +) -> list[dict[str, Any]]: + """List per-file indexing errors with optional filters.""" + clauses: list[str] = [] + params: list[Any] = [] + + if resolved is not None: + clauses.append("resolved = ?") + params.append(1 if resolved else 0) + if path: + clauses.append("path LIKE ?") + params.append(path + "%") + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + params.append(limit) + + return query( + f"SELECT id, run_id, path, error_message, error_type, " # noqa: S608 + f"created_at, resolved_at, resolved " + f"FROM index_errors {where} " + f"ORDER BY created_at DESC LIMIT ?", + tuple(params), + ) diff --git a/src/mcp_trove_crunchtools/indexer.py b/src/mcp_trove_crunchtools/indexer.py index aec038d..cfee957 100644 --- a/src/mcp_trove_crunchtools/indexer.py +++ b/src/mcp_trove_crunchtools/indexer.py @@ -154,6 +154,7 @@ def _store_one(extraction: dict[str, Any]) -> dict[str, str | int]: if not chunks: db.update_file(file_id, checksum, file_size, 0) + db.resolve_errors(path_str) return {"path": path_str, "status": "indexed", "chunk_count": 0} embeddings = embed_texts(chunks) @@ -165,6 +166,7 @@ def _store_one(extraction: dict[str, Any]) -> dict[str, str | int]: db.insert_vector(chunk_id, embedding) db.update_file(file_id, checksum, file_size, len(chunks)) + db.resolve_errors(path_str) return {"path": path_str, "status": "indexed", "chunk_count": len(chunks)} @@ -228,6 +230,7 @@ def _partition_unchanged( async def _extract_and_store_batched( to_extract: list[tuple[Path, str, int, int | None]], results: list[dict[str, str | int]], + run_id: int | None = None, ) -> None: """Extract files in batches, storing each batch before starting the next. @@ -267,13 +270,18 @@ async def extract_bounded( # Store this batch immediately, then free extraction data for idx, raw in enumerate(extractions): if isinstance(raw, BaseException): + err_path = str(chunk[idx][0]) + err_msg = str(raw) logger.warning( "Failed to extract %s: %s", chunk[idx][0], raw, ) + db.insert_error( + run_id, err_path, err_msg, db.classify_error(err_msg), + ) results.append({ - "path": str(chunk[idx][0]), + "path": err_path, "status": "error", - "reason": str(raw), + "reason": err_msg, "chunk_count": 0, }) continue @@ -318,7 +326,7 @@ async def index_path_async( results, to_extract = _partition_unchanged(files, force) if to_extract: - await _extract_and_store_batched(to_extract, results) + await _extract_and_store_batched(to_extract, results, run_id) indexed = sum(1 for r in results if r["status"] == "indexed") skipped = sum(1 for r in results if r["status"] == "skipped") diff --git a/src/mcp_trove_crunchtools/server.py b/src/mcp_trove_crunchtools/server.py index 1080b80..94fd953 100644 --- a/src/mcp_trove_crunchtools/server.py +++ b/src/mcp_trove_crunchtools/server.py @@ -11,6 +11,7 @@ trove_index, trove_list, trove_log, + trove_quality, trove_reindex, trove_remove, trove_search, @@ -172,3 +173,23 @@ async def trove_get_chunks_tool( limit: Maximum chunks to return (1-500, default: 50) """ return await trove_get_chunks(file_path, limit) + + +@mcp.tool() +async def trove_quality_tool( + path: str | None = None, + show_resolved: bool = False, + limit: int = 100, +) -> dict[str, Any]: + """Per-file error tracking from indexing runs. + + Shows which files failed, why, and whether they were resolved on a + subsequent successful index. Use this to find files worth retrying + via trove_reindex. + + Args: + path: Optional path prefix to filter errors + show_resolved: Include resolved errors (default: False) + limit: Maximum error records to return (1-500, default: 100) + """ + return await trove_quality(path, show_resolved, limit) diff --git a/src/mcp_trove_crunchtools/tools/__init__.py b/src/mcp_trove_crunchtools/tools/__init__.py index 39a5017..21410f8 100644 --- a/src/mcp_trove_crunchtools/tools/__init__.py +++ b/src/mcp_trove_crunchtools/tools/__init__.py @@ -2,7 +2,7 @@ from .index import trove_index, trove_reindex, trove_remove from .search import trove_search, trove_similar -from .status import trove_get_chunks, trove_list, trove_log, trove_status +from .status import trove_get_chunks, trove_list, trove_log, trove_quality, trove_status __all__ = [ "trove_search", @@ -14,4 +14,5 @@ "trove_list", "trove_log", "trove_get_chunks", + "trove_quality", ] diff --git a/src/mcp_trove_crunchtools/tools/status.py b/src/mcp_trove_crunchtools/tools/status.py index a12a00d..9386d0b 100644 --- a/src/mcp_trove_crunchtools/tools/status.py +++ b/src/mcp_trove_crunchtools/tools/status.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +from collections import Counter from typing import Any from .. import database as db @@ -122,3 +123,29 @@ async def trove_get_chunks( } for row in chunks ] + + +async def trove_quality( + path: str | None = None, + show_resolved: bool = False, + limit: int = 100, +) -> dict[str, Any]: + """Per-file error summary and details from indexing runs.""" + resolved_filter: bool | None = None if show_resolved else False + errors = db.query_errors(resolved=resolved_filter, path=path, limit=limit) + + # Compute aggregate counts across all errors (not just the page returned) + all_errors = db.query_errors(resolved=None, path=path, limit=10_000) + total = len(all_errors) + resolved_count = sum(1 for e in all_errors if e["resolved"]) + unresolved_count = total - resolved_count + + by_type: dict[str, int] = dict(Counter(e["error_type"] for e in all_errors)) + + return { + "total_errors": total, + "unresolved": unresolved_count, + "resolved": resolved_count, + "by_type": by_type, + "errors": errors, + } diff --git a/tests/test_tools.py b/tests/test_tools.py index 44bed36..80377bb 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -11,12 +11,18 @@ from mcp_trove_crunchtools.server import mcp from mcp_trove_crunchtools.tools.index import trove_index, trove_reindex, trove_remove from mcp_trove_crunchtools.tools.search import trove_search, trove_similar -from mcp_trove_crunchtools.tools.status import trove_get_chunks, trove_list, trove_log, trove_status +from mcp_trove_crunchtools.tools.status import ( + trove_get_chunks, + trove_list, + trove_log, + trove_quality, + trove_status, +) if TYPE_CHECKING: import sqlite3 -EXPECTED_TOOL_COUNT = 9 +EXPECTED_TOOL_COUNT = 10 class TestToolCount: @@ -291,3 +297,45 @@ async def test_status_includes_last_run(self, in_memory_db: sqlite3.Connection) assert status["last_run"]["status"] == "completed" Path(path).unlink() + + @pytest.mark.asyncio + async def test_quality_empty(self, in_memory_db: sqlite3.Connection) -> None: + result = await trove_quality() + assert result["total_errors"] == 0 + assert result["unresolved"] == 0 + assert result["resolved"] == 0 + assert result["by_type"] == {} + assert result["errors"] == [] + + @pytest.mark.asyncio + async def test_quality_after_error(self, in_memory_db: sqlite3.Connection) -> None: + from mcp_trove_crunchtools import database as test_db + + run_id = test_db.start_run("/nonexistent/test", 1) + test_db.insert_error( + run_id, "/nonexistent/test/bad.pdf", + "connection reset by peer", "transient", + ) + test_db.insert_error( + run_id, "/nonexistent/test/corrupt.pdf", + "invalid PDF structure", "permanent", + ) + + result = await trove_quality() + assert result["total_errors"] == 2 + assert result["unresolved"] == 2 + assert result["resolved"] == 0 + assert result["by_type"]["transient"] == 1 + assert result["by_type"]["permanent"] == 1 + assert len(result["errors"]) == 2 + + # Resolve one error and verify + test_db.resolve_errors("/nonexistent/test/bad.pdf") + result = await trove_quality(show_resolved=True) + assert result["resolved"] == 1 + assert result["unresolved"] == 1 + + # Default (show_resolved=False) should only return unresolved + result = await trove_quality() + assert len(result["errors"]) == 1 + assert result["errors"][0]["path"] == "/nonexistent/test/corrupt.pdf"