Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/mcp_trove_crunchtools/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@
total_chunks INTEGER NOT NULL DEFAULT 0,
error_message TEXT
);

CREATE TABLE IF NOT EXISTS index_errors (
id INTEGER PRIMARY KEY,
run_id INTEGER REFERENCES index_runs(id),
path TEXT NOT NULL,
error_message TEXT NOT NULL,
error_type TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
resolved_at TEXT,
resolved INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_errors_path ON index_errors(path);
CREATE INDEX IF NOT EXISTS idx_errors_resolved ON index_errors(resolved);
"""

VEC_TABLE_SQL = f"""
Expand Down Expand Up @@ -314,3 +327,82 @@ def log_run_error(run_id: int, error_message: str) -> None:
"error_message = ? WHERE id = ?",
(error_message, run_id),
)


# --- Per-file error tracking ---

# Lower-case substrings whose presence marks a failure as transient, i.e. one
# that may succeed if the same file is simply tried again later.
_TRANSIENT_PATTERNS = (
    "connection reset",
    "dns",
    "timeout",
    "timed out",  # socket/OS wording, e.g. "[Errno 110] Connection timed out"
    "temporary failure",
    "name resolution",
    "broken pipe",
    "connection refused",
    "network unreachable",
)

# Numeric status codes that mark a failure as transient. They are matched
# separately from the substrings above: a bare "503" substring would also hit
# longer numbers (byte offsets, sizes, digits in file names) that show up in
# otherwise permanent errors.
_TRANSIENT_STATUS_CODES = ("503",)


def classify_error(error_message: str) -> str:
    """Classify an error as 'transient' or 'permanent'.

    Matching is case-insensitive. A message is transient when it contains any
    entry of ``_TRANSIENT_PATTERNS`` as a substring, or any entry of
    ``_TRANSIENT_STATUS_CODES`` that is not part of a longer run of digits.
    Everything else, including an empty message, is permanent.

    Args:
        error_message: Text of the failure to classify.

    Returns:
        ``"transient"`` or ``"permanent"``.
    """
    # Local import so this block does not depend on the module's import list.
    import re

    lower = error_message.lower()
    if any(pattern in lower for pattern in _TRANSIENT_PATTERNS):
        return "transient"
    # (?<!\d)...(?!\d): the code must not be embedded in a longer number.
    if any(
        re.search(rf"(?<!\d){re.escape(code)}(?!\d)", lower)
        for code in _TRANSIENT_STATUS_CODES
    ):
        return "transient"
    return "permanent"


def insert_error(
    run_id: int | None, path: str, error_message: str, error_type: str,
) -> int:
    """Store one per-file indexing failure in ``index_errors``.

    Args:
        run_id: Id of the indexing run the failure belongs to, or ``None``.
        path: Path of the file that failed.
        error_message: Text describing the failure.
        error_type: Classification label stored alongside the message.

    Returns:
        The value returned by ``execute`` for the INSERT (presumably the new
        row id; confirm against ``execute``).
    """
    statement = (
        "INSERT INTO index_errors (run_id, path, error_message, error_type) "
        "VALUES (?, ?, ?, ?)"
    )
    row_values = (run_id, path, error_message, error_type)
    return execute(statement, row_values)


def resolve_errors(path: str) -> int:
    """Flag every still-unresolved error recorded for ``path`` as resolved.

    Sets ``resolved = 1`` and stamps ``resolved_at`` with ``datetime('now')``
    on the matching rows, then commits.

    Args:
        path: Exact path whose open errors should be closed.

    Returns:
        Number of rows the UPDATE touched.
    """
    connection = get_db()
    update_sql = (
        "UPDATE index_errors SET resolved = 1, resolved_at = datetime('now') "
        "WHERE path = ? AND resolved = 0"
    )
    result = connection.execute(update_sql, (path,))
    connection.commit()
    return result.rowcount


def query_errors(
    *,
    resolved: bool | None = False,
    path: str | None = None,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """List per-file indexing errors, newest first, with optional filters.

    Args:
        resolved: ``False`` (default) returns only unresolved errors, ``True``
            only resolved ones, and ``None`` disables the filter.
        path: Optional path prefix, matched literally. ``None`` or an empty
            string disables the filter.
        limit: Maximum number of rows; bound to the SQL ``LIMIT`` as given.

    Returns:
        The rows produced by ``query`` for the SELECT, each carrying ``id``,
        ``run_id``, ``path``, ``error_message``, ``error_type``,
        ``created_at``, ``resolved_at`` and ``resolved``.
    """
    clauses: list[str] = []
    params: list[Any] = []

    if resolved is not None:
        clauses.append("resolved = ?")
        params.append(1 if resolved else 0)
    if path:
        # LIKE treats "%" and "_" as wildcards, and "_" is common in real
        # paths. Escape them (and the escape character itself, first) so the
        # prefix is matched literally instead of over-matching.
        # NOTE(review): SQLite's LIKE is case-insensitive for ASCII unless
        # case_sensitive_like is enabled; that is unchanged here.
        literal_prefix = (
            path.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        clauses.append("path LIKE ? ESCAPE '\\'")
        params.append(literal_prefix + "%")

    # Only fixed clause text is interpolated; every value is a bound parameter.
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    params.append(limit)

    # created_at defaults to datetime('now'), which has one-second resolution,
    # so id breaks ties to keep the order (and the LIMIT cut) deterministic.
    return query(
        f"SELECT id, run_id, path, error_message, error_type, "  # noqa: S608
        f"created_at, resolved_at, resolved "
        f"FROM index_errors {where} "
        f"ORDER BY created_at DESC, id DESC LIMIT ?",
        tuple(params),
    )
14 changes: 11 additions & 3 deletions src/mcp_trove_crunchtools/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def _store_one(extraction: dict[str, Any]) -> dict[str, str | int]:

if not chunks:
db.update_file(file_id, checksum, file_size, 0)
db.resolve_errors(path_str)
return {"path": path_str, "status": "indexed", "chunk_count": 0}

embeddings = embed_texts(chunks)
Expand All @@ -165,6 +166,7 @@ def _store_one(extraction: dict[str, Any]) -> dict[str, str | int]:
db.insert_vector(chunk_id, embedding)

db.update_file(file_id, checksum, file_size, len(chunks))
db.resolve_errors(path_str)

return {"path": path_str, "status": "indexed", "chunk_count": len(chunks)}

Expand Down Expand Up @@ -228,6 +230,7 @@ def _partition_unchanged(
async def _extract_and_store_batched(
to_extract: list[tuple[Path, str, int, int | None]],
results: list[dict[str, str | int]],
run_id: int | None = None,
) -> None:
"""Extract files in batches, storing each batch before starting the next.

Expand Down Expand Up @@ -267,13 +270,18 @@ async def extract_bounded(
# Store this batch immediately, then free extraction data
for idx, raw in enumerate(extractions):
if isinstance(raw, BaseException):
err_path = str(chunk[idx][0])
err_msg = str(raw)
logger.warning(
"Failed to extract %s: %s", chunk[idx][0], raw,
)
db.insert_error(
run_id, err_path, err_msg, db.classify_error(err_msg),
)
results.append({
"path": str(chunk[idx][0]),
"path": err_path,
"status": "error",
"reason": str(raw),
"reason": err_msg,
"chunk_count": 0,
})
continue
Expand Down Expand Up @@ -318,7 +326,7 @@ async def index_path_async(
results, to_extract = _partition_unchanged(files, force)

if to_extract:
await _extract_and_store_batched(to_extract, results)
await _extract_and_store_batched(to_extract, results, run_id)

indexed = sum(1 for r in results if r["status"] == "indexed")
skipped = sum(1 for r in results if r["status"] == "skipped")
Expand Down
21 changes: 21 additions & 0 deletions src/mcp_trove_crunchtools/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
trove_index,
trove_list,
trove_log,
trove_quality,
trove_reindex,
trove_remove,
trove_search,
Expand Down Expand Up @@ -172,3 +173,23 @@ async def trove_get_chunks_tool(
limit: Maximum chunks to return (1-500, default: 50)
"""
return await trove_get_chunks(file_path, limit)


@mcp.tool()
async def trove_quality_tool(
    path: str | None = None,
    show_resolved: bool = False,
    limit: int = 100,
) -> dict[str, Any]:
    """Per-file error tracking from indexing runs.

    Shows which files failed, why, and whether they were resolved on a
    subsequent successful index. Use this to find files worth retrying
    via trove_reindex.

    Args:
        path: Optional path prefix to filter errors
        show_resolved: Include resolved errors (default: False)
        limit: Maximum error records to return (1-500, default: 100)
    """
    # NOTE(review): the docstring above is presumably what @mcp.tool()
    # publishes to clients as the tool description; confirm before rewording.
    # Thin wrapper: the three arguments are forwarded unchanged.
    # NOTE(review): nothing in this function enforces the 1-500 range that
    # the docstring states for `limit`; confirm where it is meant to be
    # validated.
    return await trove_quality(path, show_resolved, limit)
3 changes: 2 additions & 1 deletion src/mcp_trove_crunchtools/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .index import trove_index, trove_reindex, trove_remove
from .search import trove_search, trove_similar
from .status import trove_get_chunks, trove_list, trove_log, trove_status
from .status import trove_get_chunks, trove_list, trove_log, trove_quality, trove_status

__all__ = [
"trove_search",
Expand All @@ -14,4 +14,5 @@
"trove_list",
"trove_log",
"trove_get_chunks",
"trove_quality",
]
27 changes: 27 additions & 0 deletions src/mcp_trove_crunchtools/tools/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import json
from collections import Counter
from typing import Any

from .. import database as db
Expand Down Expand Up @@ -122,3 +123,29 @@ async def trove_get_chunks(
}
for row in chunks
]


async def trove_quality(
    path: str | None = None,
    show_resolved: bool = False,
    limit: int = 100,
) -> dict[str, Any]:
    """Per-file error summary and details from indexing runs.

    Args:
        path: Optional path filter, passed through to ``db.query_errors``.
        show_resolved: When true, the detail list includes resolved errors;
            otherwise only unresolved ones are listed.
        limit: Maximum number of detail records, passed through to
            ``db.query_errors`` unchanged.
    """
    # query_errors treats None as "no resolved filter" and False as
    # "unresolved only", so show_resolved=True means all errors, not only the
    # resolved ones.
    resolved_filter: bool | None = None if show_resolved else False
    errors = db.query_errors(resolved=resolved_filter, path=path, limit=limit)

    # Compute aggregate counts across all errors (not just the page returned)
    # NOTE(review): this second query is capped at 10,000 rows and loads every
    # matching record, so the counts below undercount once more rows than
    # that match; a COUNT/GROUP BY in SQL would avoid both.
    all_errors = db.query_errors(resolved=None, path=path, limit=10_000)
    total = len(all_errors)
    resolved_count = sum(1 for e in all_errors if e["resolved"])
    unresolved_count = total - resolved_count

    # Number of errors per error_type label, over the same capped row set.
    by_type: dict[str, int] = dict(Counter(e["error_type"] for e in all_errors))
Comment on lines +137 to +143
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation for calculating aggregate error statistics is inefficient. It fetches up to 10,000 full error records into memory to perform calculations. This approach is memory-intensive, will be slow at scale, and will produce incorrect totals if the number of errors exceeds the 10,000 limit.

This can be performed much more efficiently by executing aggregate queries (COUNT, SUM, GROUP BY) directly in the database, which avoids loading all records into memory and removes the arbitrary limit.

Suggested change
# Compute aggregate counts across all errors (not just the page returned)
all_errors = db.query_errors(resolved=None, path=path, limit=10_000)
total = len(all_errors)
resolved_count = sum(1 for e in all_errors if e["resolved"])
unresolved_count = total - resolved_count
by_type: dict[str, int] = dict(Counter(e["error_type"] for e in all_errors))
# Compute aggregate counts efficiently in the database
where_sql = "WHERE path LIKE ?" if path else ""
params = (path + "%",) if path else ()
stats = db.query_one(
f"SELECT COUNT(*) AS total, SUM(resolved) AS resolved_count FROM index_errors {where_sql}", # noqa: S608
params,
)
total = stats["total"] if stats else 0
resolved_count = stats["resolved_count"] if stats and stats["resolved_count"] else 0
unresolved_count = total - resolved_count
type_rows = db.query(
f"SELECT error_type, COUNT(*) as count FROM index_errors {where_sql} GROUP BY error_type", # noqa: S608
params,
)
by_type: dict[str, int] = {row["error_type"]: row["count"] for row in type_rows}


return {
"total_errors": total,
"unresolved": unresolved_count,
"resolved": resolved_count,
"by_type": by_type,
"errors": errors,
}
52 changes: 50 additions & 2 deletions tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@
from mcp_trove_crunchtools.server import mcp
from mcp_trove_crunchtools.tools.index import trove_index, trove_reindex, trove_remove
from mcp_trove_crunchtools.tools.search import trove_search, trove_similar
from mcp_trove_crunchtools.tools.status import trove_get_chunks, trove_list, trove_log, trove_status
from mcp_trove_crunchtools.tools.status import (
trove_get_chunks,
trove_list,
trove_log,
trove_quality,
trove_status,
)

if TYPE_CHECKING:
import sqlite3

EXPECTED_TOOL_COUNT = 9
EXPECTED_TOOL_COUNT = 10


class TestToolCount:
Expand Down Expand Up @@ -291,3 +297,45 @@ async def test_status_includes_last_run(self, in_memory_db: sqlite3.Connection)
assert status["last_run"]["status"] == "completed"

Path(path).unlink()

@pytest.mark.asyncio
async def test_quality_empty(self, in_memory_db: sqlite3.Connection) -> None:
    """With no recorded errors, every counter is zero and both collections are empty."""
    summary = await trove_quality()
    for counter in ("total_errors", "unresolved", "resolved"):
        assert summary[counter] == 0
    assert summary["by_type"] == {}
    assert summary["errors"] == []

@pytest.mark.asyncio
async def test_quality_after_error(self, in_memory_db: sqlite3.Connection) -> None:
    """Recorded errors are counted, grouped by type, and drop out of the default listing once resolved."""
    from mcp_trove_crunchtools import database as test_db

    transient_path = "/nonexistent/test/bad.pdf"
    permanent_path = "/nonexistent/test/corrupt.pdf"

    run_id = test_db.start_run("/nonexistent/test", 1)
    failures = (
        (transient_path, "connection reset by peer", "transient"),
        (permanent_path, "invalid PDF structure", "permanent"),
    )
    for failed_path, message, kind in failures:
        test_db.insert_error(run_id, failed_path, message, kind)

    # Two open errors, one of each type.
    summary = await trove_quality()
    assert summary["total_errors"] == 2
    assert summary["unresolved"] == 2
    assert summary["resolved"] == 0
    assert summary["by_type"]["transient"] == 1
    assert summary["by_type"]["permanent"] == 1
    assert len(summary["errors"]) == 2

    # Resolving one path moves exactly one error to the resolved count.
    test_db.resolve_errors(transient_path)
    with_resolved = await trove_quality(show_resolved=True)
    assert with_resolved["resolved"] == 1
    assert with_resolved["unresolved"] == 1

    # The default call (show_resolved=False) lists only what is still open.
    open_only = await trove_quality()
    assert len(open_only["errors"]) == 1
    assert open_only["errors"][0]["path"] == permanent_path
Loading