Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions alembic/versions/009_update_ingest_scan_directories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Update ingest scanner to scan from root directory

Revision ID: 009
Revises: 008
Create Date: 2026-03-31

Changes the ingest.directories config from a curated list of known
directories (["/misc/", "/SCC2SRT/", "/wisconsinlife/"]) to scan from
root (["/"]) with recursive subdirectory traversal. This ensures new
directories like /IWP/ are automatically discovered.
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
import json

# Alembic revision identifiers: this migration is 009 and directly follows 008.
revision: str = '009'
down_revision: Union[str, None] = '008'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Curated directory list used before this migration (restored by downgrade()).
OLD_DIRECTORIES = ["/misc/", "/SCC2SRT/", "/wisconsinlife/"]
# New setting: scan from the root so new directories are discovered automatically.
NEW_DIRECTORIES = ["/"]


def upgrade() -> None:
    """Point the ingest scanner at the root directory and move the daily scan to 7 AM.

    Rewrites two rows of the ``config`` table:
    ``ingest.directories`` becomes the JSON-encoded ``NEW_DIRECTORIES`` list,
    and ``ingest.scan_time`` moves from midnight to ``07:00``.
    """
    # Replace the curated directory list with the root scan list so newly
    # created top-level directories are picked up without further migrations.
    set_directories = sa.text(
        "UPDATE config SET value = :new_val, updated_at = CURRENT_TIMESTAMP "
        "WHERE key = 'ingest.directories'"
    ).bindparams(new_val=json.dumps(NEW_DIRECTORIES))
    op.execute(set_directories)

    # Shift the scheduled scan from midnight to 7 AM.
    set_scan_time = sa.text(
        "UPDATE config SET value = '07:00', updated_at = CURRENT_TIMESTAMP "
        "WHERE key = 'ingest.scan_time'"
    )
    op.execute(set_scan_time)


def downgrade() -> None:
    """Revert to the curated directory list and the midnight scan time.

    Restores ``ingest.directories`` to the JSON-encoded ``OLD_DIRECTORIES``
    list and puts ``ingest.scan_time`` back to ``00:00``.
    """
    # Put back the original three-directory scan list.
    restore_directories = sa.text(
        "UPDATE config SET value = :old_val, updated_at = CURRENT_TIMESTAMP "
        "WHERE key = 'ingest.directories'"
    ).bindparams(old_val=json.dumps(OLD_DIRECTORIES))
    op.execute(restore_directories)

    # Move the scheduled scan back to midnight.
    restore_scan_time = sa.text(
        "UPDATE config SET value = '00:00', updated_at = CURRENT_TIMESTAMP "
        "WHERE key = 'ingest.scan_time'"
    )
    op.execute(restore_scan_time)
12 changes: 12 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ async def lifespan(app: FastAPI):
app.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler)


# Global exception handler: ensure ALL errors return JSON, never HTML
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Catch-all handler so unhandled exceptions surface as JSON 500 responses.

    Args:
        request: The incoming request; method and path are logged for triage.
        exc: The uncaught exception.

    Returns:
        A ``JSONResponse`` with status 500 and a generic detail message
        (internal error details are logged, never sent to the client).
    """
    from fastapi.responses import JSONResponse

    # Lazy %-style args avoid building the message when the level is disabled;
    # passing the exception as exc_info attaches the traceback explicitly
    # instead of relying on sys.exc_info() still referring to this exception.
    logger.error(
        "Unhandled exception on %s %s: %s",
        request.method,
        request.url.path,
        exc,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )


@app.get("/")
async def root():
"""Health check endpoint."""
Expand Down
5 changes: 3 additions & 2 deletions api/middleware/rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
)

# Rate limit strings used by endpoint decorators.
RATE_EXPENSIVE = "10/minute"
RATE_READ = "60/minute"
# Configurable via env vars for different deployment scenarios.
RATE_EXPENSIVE = os.getenv("RATE_LIMIT_EXPENSIVE", "30/minute")
RATE_READ = os.getenv("RATE_LIMIT_READ", "120/minute")


def rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
Expand Down
2 changes: 1 addition & 1 deletion api/models/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class IngestConfig(BaseModel):
last_scan_success: Optional[bool] = Field(None, description="Whether last scan succeeded")
server_url: str = Field("https://mmingest.pbswi.wisc.edu/", description="Base URL of ingest server")
directories: List[str] = Field(
default_factory=lambda: ["/misc/", "/SCC2SRT/", "/wisconsinlife/"], description="Directories to scan"
default_factory=lambda: ["/"], description="Root directories to scan (recurses into subdirectories)"
)
ignore_directories: List[str] = Field(default_factory=lambda: ["/promos/"], description="Directories to ignore")

Expand Down
16 changes: 15 additions & 1 deletion api/routers/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,11 @@ async def trigger_scan(
scan_base_url = base_url or config.server_url
scan_dirs = directories.split(",") if directories else config.directories

scanner = IngestScanner(base_url=scan_base_url, directories=scan_dirs)
scanner = IngestScanner(
base_url=scan_base_url,
directories=scan_dirs,
ignore_directories=config.ignore_directories,
)
result = await scanner.scan()

# Record scan result in config
Expand Down Expand Up @@ -682,6 +686,7 @@ async def queue_transcript(file_id: int) -> QueueTranscriptResponse:
scanner = IngestScanner(
base_url=config.server_url,
directories=config.directories,
ignore_directories=config.ignore_directories,
)

# Download the file
Expand Down Expand Up @@ -742,6 +747,15 @@ async def queue_transcript(file_id: int) -> QueueTranscriptResponse:

except Exception as e:
logger.error(f"Failed to create job for transcript {file_id}: {e}")
# Reset file status so it doesn't become an orphan
try:
async with get_session() as session:
await session.execute(
text("UPDATE available_files SET status = 'new' WHERE id = :file_id"),
{"file_id": file_id},
)
except Exception as reset_err:
logger.error(f"Failed to reset status for file {file_id}: {reset_err}", exc_info=True)
return QueueTranscriptResponse(
success=False,
file_id=file_id,
Expand Down
8 changes: 7 additions & 1 deletion api/routers/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@ async def upload_transcripts(
raise HTTPException(status_code=400, detail=f"Batch size exceeds maximum of {MAX_BATCH_SIZE} files")

# Ensure transcripts directory exists
TRANSCRIPTS_DIR.mkdir(exist_ok=True)
try:
TRANSCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
except PermissionError:
raise HTTPException(
status_code=500,
detail="Server cannot write to transcripts directory. Check file permissions.",
)

results: List[UploadStatus] = []
uploaded_count = 0
Expand Down
7 changes: 3 additions & 4 deletions api/services/ingest_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
DEFAULT_CONFIG = IngestConfig(
enabled=True,
scan_interval_hours=24,
scan_time="00:00", # Midnight
scan_time="07:00", # 7 AM daily scan
last_scan_at=None,
last_scan_success=None,
server_url="https://mmingest.pbswi.wisc.edu/",
# /exports/ removed - returns 404
# /SCC2SRT/ and /wisconsinlife/ added - contain transcript files
directories=["/misc/", "/SCC2SRT/", "/wisconsinlife/"],
# Scan from root to auto-discover all directories (IWP, SCC2SRT, misc, etc.)
directories=["/"],
ignore_directories=["/promos/"],
)

Expand Down
60 changes: 47 additions & 13 deletions api/services/ingest_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ class IngestScanner:
TRANSCRIPT_EXTENSIONS = {".srt", ".txt"}
SCREENGRAB_EXTENSIONS = {".jpg", ".jpeg", ".png"}

# Maximum recursion depth for subdirectory scanning
MAX_SCAN_DEPTH = 3

def __init__(
self,
base_url: str = "https://mmingest.pbswi.wisc.edu/",
directories: Optional[List[str]] = None,
timeout_seconds: int = 30,
auth: Optional[tuple] = None,
ignore_directories: Optional[List[str]] = None,
):
"""
Initialize scanner.
Expand All @@ -85,11 +89,14 @@ def __init__(
directories: List of directory paths to scan (e.g., ["/exports/", "/images/"])
timeout_seconds: HTTP request timeout
auth: Optional (username, password) tuple for basic auth
ignore_directories: Directory names to skip during recursive scanning.
Matched by name only (e.g., "promos" matches at any depth).
"""
self.base_url = base_url.rstrip("/")
self.directories = directories or ["/"]
self.timeout = timeout_seconds
self.auth = auth
self.ignore_directories = {d.strip("/").lower() for d in (ignore_directories or [])}

async def get_qc_passed_media_ids(self) -> List[str]:
"""
Expand Down Expand Up @@ -354,16 +361,18 @@ async def _scan_directory(
self,
url: str,
directory_path: str,
depth: int = 0,
) -> List[RemoteFile]:
"""
Fetch and parse a directory listing.
Fetch and parse a directory listing, recursing into subdirectories.

Args:
url: Full URL to the directory
directory_path: Path relative to base URL
depth: Current recursion depth (0 = top-level configured directory)

Returns:
List of RemoteFile objects
List of RemoteFile objects (including from subdirectories)
"""
files: List[RemoteFile] = []

Expand All @@ -376,24 +385,42 @@ async def _scan_directory(
response = await client.get(url, auth=auth)
response.raise_for_status()

# Parse HTML
files = self._parse_directory_listing(
# Parse HTML into files and subdirectory links
found_files, subdirs = self._parse_directory_listing(
response.text,
url,
directory_path,
)
files.extend(found_files)

logger.info(f"Found {len(found_files)} files in {directory_path}")

# Recurse into subdirectories if within depth limit
if depth < self.MAX_SCAN_DEPTH:
for subdir_name, subdir_url in subdirs:
subdir_path = f"{directory_path.rstrip('/')}/{subdir_name}/"

# Check against ignore list
if subdir_name.lower() in self.ignore_directories:
logger.debug(f"Skipping ignored directory: {subdir_path}")
continue

try:
sub_files = await self._scan_directory(subdir_url, subdir_path, depth + 1)
files.extend(sub_files)
except Exception as e:
logger.warning(f"Failed to scan subdirectory {subdir_path}: {e}")

logger.info(f"Found {len(files)} files in {directory_path}")
return files

def _parse_directory_listing(
self,
html: str,
base_url: str,
directory_path: str,
) -> List[RemoteFile]:
) -> tuple[List[RemoteFile], List[tuple[str, str]]]:
"""
Parse Apache/nginx autoindex HTML to extract file links.
Parse Apache/nginx autoindex HTML to extract file links and subdirectories.

Typical Apache autoindex format:
<a href="filename.srt">filename.srt</a> 12-Jan-2025 14:30 45K
Expand All @@ -404,9 +431,11 @@ def _parse_directory_listing(
directory_path: Path for tracking

Returns:
List of RemoteFile objects
Tuple of (files, subdirectories) where subdirectories is a list
of (name, url) tuples for recursive scanning
"""
files: List[RemoteFile] = []
subdirs: List[tuple[str, str]] = []
soup = BeautifulSoup(html, "html.parser")

for link in soup.find_all("a"):
Expand All @@ -420,8 +449,11 @@ def _parse_directory_listing(
if href.startswith("?"):
continue
if href.endswith("/"):
# This is a subdirectory - skip for now
# (could recursively scan in future)
# Subdirectory — collect for recursive scanning
subdir_name = href.rstrip("/").split("/")[-1]
if subdir_name and subdir_name not in ("..", "."):
subdir_url = urljoin(base_url + "/", href)
subdirs.append((subdir_name, subdir_url))
continue

# Determine file type by extension
Expand Down Expand Up @@ -457,7 +489,7 @@ def _parse_directory_listing(
)
)

return files
return files, subdirs

def _extract_media_id(self, filename: str) -> Optional[str]:
"""
Expand Down Expand Up @@ -556,8 +588,8 @@ def _parse_file_metadata(self, link_element) -> tuple[Optional[int], Optional[da
if len(parts) >= 3:
file_size = self._parse_size(parts[-1])

except Exception:
pass
except Exception as e:
logger.debug(f"Could not parse metadata for {link_element.get('href', '?')}: {e}")

return file_size, modified_at

Expand Down Expand Up @@ -772,9 +804,11 @@ async def get_pending_screengrabs(self) -> List[dict]:
def get_ingest_scanner(
    base_url: str = "https://mmingest.pbswi.wisc.edu/",
    directories: Optional[List[str]] = None,
    ignore_directories: Optional[List[str]] = None,
) -> IngestScanner:
    """Factory for an ``IngestScanner`` pre-filled with default configuration.

    Args:
        base_url: Base URL of the ingest server.
        directories: Directories to scan; falls back to the root ("/") when
            empty or ``None``.
        ignore_directories: Directory names to skip during scanning, passed
            through unchanged.

    Returns:
        A configured ``IngestScanner`` instance.
    """
    # An empty or missing list means "scan everything from the root".
    scan_dirs = directories if directories else ["/"]
    return IngestScanner(
        base_url=base_url,
        directories=scan_dirs,
        ignore_directories=ignore_directories,
    )
Loading
Loading