
Commit 96ab2da

mriechers and claude authored
fix: Ingest scanner, bulk queue, error handling, metrics, and MCP job submission (#60)
* feat: Add recursive directory scanning to ingest scanner

  The scanner was only checking files in the top-level configured
  directories (/misc/, /SCC2SRT/, /wisconsinlife/) and skipping all
  subdirectories. This meant directories like /IWP/ were completely
  invisible. Now scans from root (/) and recurses into subdirectories up
  to 3 levels deep, respecting the ignore_directories config.

  - Add recursive scanning with MAX_SCAN_DEPTH=3 to _scan_directory
  - Split _parse_directory_listing to return (files, subdirs) tuple
  - Change default directories from curated list to ["/"]
  - Wire ignore_directories through router to scanner
  - Update default scan_time from midnight to 07:00
  - Add migration 009 to update stored config values
  - Add 6 new tests for recursive scanning behavior

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Bulk queue shows error despite succeeding (fixes #45)

  The frontend expected the bulk queue response to be a flat array but
  the API returns a BulkQueueResponse wrapper object. Calling .filter()
  on the object threw TypeError, caught as a generic "Failed to queue"
  error — even though the files were actually queued on the backend.

  Fix: Read .queued and .failed from the BulkQueueResponse directly.

  Also fixes orphaned file records: if download_file succeeds but
  create_job fails, the file status is now reset to 'new' instead of
  being stuck as 'queued' with no job_id (invisible on Ready for Work).

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Return JSON errors instead of HTML for unhandled exceptions (fixes #30)

  Added global exception handler in api/main.py to ensure all unhandled
  server errors return JSON responses, not FastAPI's default HTML page.
  The frontend was getting "Unexpected token '<'" when trying to parse
  HTML error responses as JSON.

  Also fixes:
  - upload.py: mkdir now uses parents=True and catches PermissionError
    with a clear message instead of an unhandled exception
  - TranscriptUploader.tsx: gracefully handles non-JSON error responses
    instead of crashing on response.json()

  Closes #30, relates to #32 (Docker permissions)

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Persist transcript metrics to database on job processing (fixes #44)

  Transcript metrics (word_count, duration_minutes) were calculated
  in-memory for routing decisions but never saved to the database. If a
  job failed and was retried, the metrics remained None because the
  initial calculation was lost.

  Now persists metrics to the job record after calculation, with a guard
  to only write when the values are missing (avoids overwriting on retry
  of already-populated jobs).

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Relax rate limits and make configurable (fixes #41, #44, #39)

  Bumped RATE_EXPENSIVE from 10/min to 30/min and RATE_READ from 60/min
  to 120/min — the previous limits were too restrictive for a single-user
  internal editorial tool. Both are now configurable via
  RATE_LIMIT_EXPENSIVE and RATE_LIMIT_READ env vars.

  Also includes the transcript metrics persistence fix from the previous
  commit (workers now save word_count and duration_minutes to the job
  record after calculation).

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: Add submit_processing_job MCP tool (fixes #47)

  New tool allows queuing transcript processing jobs by Media ID from any
  MCP-connected workspace (Claude Desktop, other projects). The tool:

  1. Checks for existing jobs to avoid duplicates
  2. Searches local transcripts/ dir for matching files
  3. Falls back to ingest server available_files
  4. Queues via the appropriate API endpoint

  This enables workflows like scheduling Content Calendar entries and
  triggering processing in the same session without switching projects.

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Per-job cost tracking prevents race condition (fixes #43)

  The cost tracker was a single module-level global that got silently
  overwritten when multiple jobs ran concurrently. The second job's
  start_run_tracking() destroyed the first job's tracker, causing the
  first job to write actual_cost=0.

  Fix: replaced single global with a dict of trackers keyed by job_id.
  Each job now gets its own isolated tracker. The chat() method looks up
  the correct tracker by job_id, and end_run_tracking(job_id) retrieves
  the right one.

  Also:
  - Set TESTING=1 early in conftest.py to disable rate limiter in tests
  - Tests now pass: 307 passed, 0 failed

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: Apply black formatting to ingest_scanner and MCP server

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Address PR review findings — error handling and info leaks

  Fixes 5 critical issues from PR review:

  1. Global exception handler: return generic "Internal server error"
     instead of leaking str(exc) to clients; add exc_info=True to log
  2. Upload error: remove filesystem path from PermissionError detail
  3. MCP submit_processing_job: replace bare except:pass with logged
     warnings for duplicate check and ingest search failures
  4. Worker metrics persistence: wrap in try/except so transient DB
     errors don't kill the whole job for a non-critical backfill
  5. Orphaned file reset: include exception details in error log

  Also adds logging module to MCP server.

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: Address remaining PR review findings

  Important issues:
  - Add finally cleanup for _run_trackers dict to prevent memory leak
    when jobs crash before end_run_tracking (worker.py)
  - Add 2 concurrency tests for per-job cost tracker isolation
    (test_llm.py)
  - Replace 3 pre-existing bare except:pass in MCP server helpers with
    debug-level logging (server.py)

  Suggestions:
  - Add DEBUG logging to _parse_file_metadata bare except
    (ingest_scanner.py)
  - Document ignore_directories matches by name only, not path
    (ingest_scanner.py)

  309 tests passing.

  [Agent: Main Assistant]
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
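Of the fixes above, the per-job cost tracker (#43) lands in worker and LLM code that is not among the diffs shown below. A minimal sketch of the pattern the message describes, a dict of trackers keyed by job_id with cleanup on completion; CostTracker and the function names here are illustrative, not the project's actual API:

from typing import Dict

class CostTracker:
    """Accumulates LLM spend for a single job run (illustrative)."""

    def __init__(self) -> None:
        self.total_cost = 0.0

    def add(self, cost: float) -> None:
        self.total_cost += cost

# One tracker per job_id; a module-level singleton here would be silently
# overwritten by the second concurrent job, which is exactly bug #43.
_run_trackers: Dict[int, CostTracker] = {}

def start_run_tracking(job_id: int) -> None:
    _run_trackers[job_id] = CostTracker()

def record_cost(job_id: int, cost: float) -> None:
    tracker = _run_trackers.get(job_id)
    if tracker is not None:
        tracker.add(cost)

def end_run_tracking(job_id: int) -> float:
    # pop() doubles as cleanup; callers should also pop in a finally block
    # so a job that crashes before this call cannot leak its tracker.
    tracker = _run_trackers.pop(job_id, None)
    return tracker.total_cost if tracker is not None else 0.0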
1 parent cc49864 commit 96ab2da

16 files changed

Lines changed: 685 additions & 63 deletions

New Alembic migration (revision 009)

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+"""Update ingest scanner to scan from root directory
+
+Revision ID: 009
+Revises: 008
+Create Date: 2026-03-31
+
+Changes the ingest.directories config from a curated list of known
+directories (["/misc/", "/SCC2SRT/", "/wisconsinlife/"]) to scan from
+root (["/"]) with recursive subdirectory traversal. This ensures new
+directories like /IWP/ are automatically discovered.
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+import json
+
+revision: str = '009'
+down_revision: Union[str, None] = '008'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+OLD_DIRECTORIES = ["/misc/", "/SCC2SRT/", "/wisconsinlife/"]
+NEW_DIRECTORIES = ["/"]
+
+
+def upgrade() -> None:
+    # Update ingest.directories to scan from root
+    op.execute(
+        sa.text(
+            "UPDATE config SET value = :new_val, updated_at = CURRENT_TIMESTAMP "
+            "WHERE key = 'ingest.directories'"
+        ).bindparams(new_val=json.dumps(NEW_DIRECTORIES))
+    )
+
+    # Update scan_time from midnight to 7 AM
+    op.execute(
+        sa.text(
+            "UPDATE config SET value = '07:00', updated_at = CURRENT_TIMESTAMP "
+            "WHERE key = 'ingest.scan_time'"
+        )
+    )
+
+
+def downgrade() -> None:
+    # Restore original curated directory list
+    op.execute(
+        sa.text(
+            "UPDATE config SET value = :old_val, updated_at = CURRENT_TIMESTAMP "
+            "WHERE key = 'ingest.directories'"
+        ).bindparams(old_val=json.dumps(OLD_DIRECTORIES))
+    )
+
+    # Restore midnight scan time
+    op.execute(
+        sa.text(
+            "UPDATE config SET value = '00:00', updated_at = CURRENT_TIMESTAMP "
+            "WHERE key = 'ingest.scan_time'"
+        )
+    )
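A quick post-migration spot check; the database URL is a placeholder, and the config table's key/value layout is taken from the UPDATE statements above:

import json
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///app.db")  # placeholder URL
with engine.connect() as conn:
    dirs = conn.execute(
        text("SELECT value FROM config WHERE key = 'ingest.directories'")
    ).scalar_one()
    scan_time = conn.execute(
        text("SELECT value FROM config WHERE key = 'ingest.scan_time'")
    ).scalar_one()

assert json.loads(dirs) == ["/"]
assert scan_time == "07:00"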

api/main.py

Lines changed: 12 additions & 0 deletions
@@ -112,6 +112,18 @@ async def lifespan(app: FastAPI):
 app.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler)
 
 
+# Global exception handler: ensure ALL errors return JSON, never HTML
+@app.exception_handler(Exception)
+async def global_exception_handler(request, exc):
+    from fastapi.responses import JSONResponse
+
+    logger.error(f"Unhandled exception on {request.method} {request.url.path}: {exc}", exc_info=True)
+    return JSONResponse(
+        status_code=500,
+        content={"detail": "Internal server error"},
+    )
+
+
 @app.get("/")
 async def root():
     """Health check endpoint."""

api/middleware/rate_limit.py

Lines changed: 3 additions & 2 deletions
@@ -19,8 +19,9 @@
 )
 
 # Rate limit strings used by endpoint decorators.
-RATE_EXPENSIVE = "10/minute"
-RATE_READ = "60/minute"
+# Configurable via env vars for different deployment scenarios.
+RATE_EXPENSIVE = os.getenv("RATE_LIMIT_EXPENSIVE", "30/minute")
+RATE_READ = os.getenv("RATE_LIMIT_READ", "120/minute")
 
 
 def rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
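For context, a sketch of how these strings are consumed, assuming the project uses slowapi (consistent with the RateLimitExceeded handler above); the endpoint and limiter wiring here are illustrative, not this repo's actual code:

from fastapi import FastAPI, Request
from slowapi import Limiter
from slowapi.util import get_remote_address

from api.middleware.rate_limit import RATE_EXPENSIVE  # "30/minute" unless overridden

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/api/jobs")
@limiter.limit(RATE_EXPENSIVE)  # raise via RATE_LIMIT_EXPENSIVE env var, no code change
async def create_job(request: Request):
    return {"status": "queued"}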

api/models/ingest.py

Lines changed: 1 addition & 1 deletion
@@ -204,7 +204,7 @@ class IngestConfig(BaseModel):
     last_scan_success: Optional[bool] = Field(None, description="Whether last scan succeeded")
     server_url: str = Field("https://mmingest.pbswi.wisc.edu/", description="Base URL of ingest server")
     directories: List[str] = Field(
-        default_factory=lambda: ["/misc/", "/SCC2SRT/", "/wisconsinlife/"], description="Directories to scan"
+        default_factory=lambda: ["/"], description="Root directories to scan (recurses into subdirectories)"
     )
     ignore_directories: List[str] = Field(default_factory=lambda: ["/promos/"], description="Directories to ignore")
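The changed default, as seen by callers that construct a bare config (assuming the remaining IngestConfig fields all carry defaults):

from api.models.ingest import IngestConfig  # assumed import path

cfg = IngestConfig()
assert cfg.directories == ["/"]
assert cfg.ignore_directories == ["/promos/"]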

api/routers/ingest.py

Lines changed: 15 additions & 1 deletion
@@ -294,7 +294,11 @@ async def trigger_scan(
     scan_base_url = base_url or config.server_url
     scan_dirs = directories.split(",") if directories else config.directories
 
-    scanner = IngestScanner(base_url=scan_base_url, directories=scan_dirs)
+    scanner = IngestScanner(
+        base_url=scan_base_url,
+        directories=scan_dirs,
+        ignore_directories=config.ignore_directories,
+    )
     result = await scanner.scan()
 
     # Record scan result in config
@@ -682,6 +686,7 @@ async def queue_transcript(file_id: int) -> QueueTranscriptResponse:
     scanner = IngestScanner(
         base_url=config.server_url,
         directories=config.directories,
+        ignore_directories=config.ignore_directories,
     )
 
     # Download the file
@@ -742,6 +747,15 @@ async def queue_transcript(file_id: int) -> QueueTranscriptResponse:
 
     except Exception as e:
         logger.error(f"Failed to create job for transcript {file_id}: {e}")
+        # Reset file status so it doesn't become an orphan
+        try:
+            async with get_session() as session:
+                await session.execute(
+                    text("UPDATE available_files SET status = 'new' WHERE id = :file_id"),
+                    {"file_id": file_id},
+                )
+        except Exception as reset_err:
+            logger.error(f"Failed to reset status for file {file_id}: {reset_err}", exc_info=True)
         return QueueTranscriptResponse(
             success=False,
             file_id=file_id,
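The bulk-queue fix (#45) lives in frontend code not shown on this page; for reference, a sketch of the wrapper shape the frontend must now read, where only the queued and failed field names come from the commit message and the types are assumptions:

from typing import List
from pydantic import BaseModel

class QueueFailure(BaseModel):  # hypothetical per-file failure record
    file_id: int
    error: str

class BulkQueueResponse(BaseModel):
    queued: List[int]           # IDs queued successfully (assumed type)
    failed: List[QueueFailure]  # per-file failures (assumed shape)

# Frontend-side lesson: read response.queued / response.failed; calling
# .filter() on this wrapper object is the TypeError that masked success.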

api/routers/upload.py

Lines changed: 7 additions & 1 deletion
@@ -81,7 +81,13 @@ async def upload_transcripts(
         raise HTTPException(status_code=400, detail=f"Batch size exceeds maximum of {MAX_BATCH_SIZE} files")
 
     # Ensure transcripts directory exists
-    TRANSCRIPTS_DIR.mkdir(exist_ok=True)
+    try:
+        TRANSCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
+    except PermissionError:
+        raise HTTPException(
+            status_code=500,
+            detail="Server cannot write to transcripts directory. Check file permissions.",
+        )
 
     results: List[UploadStatus] = []
     uploaded_count = 0

api/services/ingest_config.py

Lines changed: 3 additions & 4 deletions
@@ -40,13 +40,12 @@
 DEFAULT_CONFIG = IngestConfig(
     enabled=True,
     scan_interval_hours=24,
-    scan_time="00:00",  # Midnight
+    scan_time="07:00",  # 7 AM daily scan
     last_scan_at=None,
     last_scan_success=None,
     server_url="https://mmingest.pbswi.wisc.edu/",
-    # /exports/ removed - returns 404
-    # /SCC2SRT/ and /wisconsinlife/ added - contain transcript files
-    directories=["/misc/", "/SCC2SRT/", "/wisconsinlife/"],
+    # Scan from root to auto-discover all directories (IWP, SCC2SRT, misc, etc.)
+    directories=["/"],
     ignore_directories=["/promos/"],
 )

api/services/ingest_scanner.py

Lines changed: 47 additions & 13 deletions
@@ -70,12 +70,16 @@ class IngestScanner:
     TRANSCRIPT_EXTENSIONS = {".srt", ".txt"}
     SCREENGRAB_EXTENSIONS = {".jpg", ".jpeg", ".png"}
 
+    # Maximum recursion depth for subdirectory scanning
+    MAX_SCAN_DEPTH = 3
+
     def __init__(
         self,
         base_url: str = "https://mmingest.pbswi.wisc.edu/",
         directories: Optional[List[str]] = None,
         timeout_seconds: int = 30,
         auth: Optional[tuple] = None,
+        ignore_directories: Optional[List[str]] = None,
     ):
         """
         Initialize scanner.
@@ -85,11 +89,14 @@ def __init__(
             directories: List of directory paths to scan (e.g., ["/exports/", "/images/"])
             timeout_seconds: HTTP request timeout
             auth: Optional (username, password) tuple for basic auth
+            ignore_directories: Directory names to skip during recursive scanning.
+                Matched by name only (e.g., "promos" matches at any depth).
         """
         self.base_url = base_url.rstrip("/")
         self.directories = directories or ["/"]
         self.timeout = timeout_seconds
         self.auth = auth
+        self.ignore_directories = {d.strip("/").lower() for d in (ignore_directories or [])}
 
     async def get_qc_passed_media_ids(self) -> List[str]:
         """
@@ -354,16 +361,18 @@ async def _scan_directory(
         self,
         url: str,
         directory_path: str,
+        depth: int = 0,
     ) -> List[RemoteFile]:
         """
-        Fetch and parse a directory listing.
+        Fetch and parse a directory listing, recursing into subdirectories.
 
         Args:
             url: Full URL to the directory
             directory_path: Path relative to base URL
+            depth: Current recursion depth (0 = top-level configured directory)
 
         Returns:
-            List of RemoteFile objects
+            List of RemoteFile objects (including from subdirectories)
         """
         files: List[RemoteFile] = []
 
@@ -376,24 +385,42 @@ async def _scan_directory(
             response = await client.get(url, auth=auth)
             response.raise_for_status()
 
-            # Parse HTML
-            files = self._parse_directory_listing(
+            # Parse HTML into files and subdirectory links
+            found_files, subdirs = self._parse_directory_listing(
                 response.text,
                 url,
                 directory_path,
             )
+            files.extend(found_files)
+
+            logger.info(f"Found {len(found_files)} files in {directory_path}")
+
+            # Recurse into subdirectories if within depth limit
+            if depth < self.MAX_SCAN_DEPTH:
+                for subdir_name, subdir_url in subdirs:
+                    subdir_path = f"{directory_path.rstrip('/')}/{subdir_name}/"
+
+                    # Check against ignore list
+                    if subdir_name.lower() in self.ignore_directories:
+                        logger.debug(f"Skipping ignored directory: {subdir_path}")
+                        continue
+
+                    try:
+                        sub_files = await self._scan_directory(subdir_url, subdir_path, depth + 1)
+                        files.extend(sub_files)
+                    except Exception as e:
+                        logger.warning(f"Failed to scan subdirectory {subdir_path}: {e}")
 
-        logger.info(f"Found {len(files)} files in {directory_path}")
         return files
 
     def _parse_directory_listing(
         self,
         html: str,
         base_url: str,
         directory_path: str,
-    ) -> List[RemoteFile]:
+    ) -> tuple[List[RemoteFile], List[tuple[str, str]]]:
         """
-        Parse Apache/nginx autoindex HTML to extract file links.
+        Parse Apache/nginx autoindex HTML to extract file links and subdirectories.
 
         Typical Apache autoindex format:
         <a href="filename.srt">filename.srt</a> 12-Jan-2025 14:30 45K
@@ -404,9 +431,11 @@ def _parse_directory_listing(
             directory_path: Path for tracking
 
         Returns:
-            List of RemoteFile objects
+            Tuple of (files, subdirectories) where subdirectories is a list
+            of (name, url) tuples for recursive scanning
         """
         files: List[RemoteFile] = []
+        subdirs: List[tuple[str, str]] = []
         soup = BeautifulSoup(html, "html.parser")
 
         for link in soup.find_all("a"):
@@ -420,8 +449,11 @@ def _parse_directory_listing(
             if href.startswith("?"):
                 continue
             if href.endswith("/"):
-                # This is a subdirectory - skip for now
-                # (could recursively scan in future)
+                # Subdirectory — collect for recursive scanning
+                subdir_name = href.rstrip("/").split("/")[-1]
+                if subdir_name and subdir_name not in ("..", "."):
+                    subdir_url = urljoin(base_url + "/", href)
+                    subdirs.append((subdir_name, subdir_url))
                 continue
 
             # Determine file type by extension
@@ -457,7 +489,7 @@ def _parse_directory_listing(
                 )
             )
 
-        return files
+        return files, subdirs
 
     def _extract_media_id(self, filename: str) -> Optional[str]:
         """
@@ -556,8 +588,8 @@ def _parse_file_metadata(self, link_element) -> tuple[Optional[int], Optional[datetime]]:
             if len(parts) >= 3:
                 file_size = self._parse_size(parts[-1])
 
-        except Exception:
-            pass
+        except Exception as e:
+            logger.debug(f"Could not parse metadata for {link_element.get('href', '?')}: {e}")
 
         return file_size, modified_at
 
@@ -772,9 +804,11 @@ async def get_pending_screengrabs(self) -> List[dict]:
 def get_ingest_scanner(
     base_url: str = "https://mmingest.pbswi.wisc.edu/",
     directories: Optional[List[str]] = None,
+    ignore_directories: Optional[List[str]] = None,
 ) -> IngestScanner:
     """Create IngestScanner instance with default config."""
     return IngestScanner(
         base_url=base_url,
         directories=directories or ["/"],
+        ignore_directories=ignore_directories,
     )
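A usage sketch for the recursive scanner; scan() is the entry point the router calls above, and the argument values mirror the new defaults:

import asyncio
from api.services.ingest_scanner import IngestScanner  # assumed import path

async def main():
    scanner = IngestScanner(
        base_url="https://mmingest.pbswi.wisc.edu/",
        directories=["/"],                # recurses up to MAX_SCAN_DEPTH=3
        ignore_directories=["/promos/"],  # matched by name at any depth
    )
    result = await scanner.scan()
    print(result)

asyncio.run(main())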
