diff --git a/app/src/api/jobs.ts b/app/src/api/jobs.ts
new file mode 100644
index 00000000..471d0293
--- /dev/null
+++ b/app/src/api/jobs.ts
@@ -0,0 +1,51 @@
+import { api } from './client';
+
+export interface JobExecution {
+  id: number;
+  job_id: string;
+  job_name: string;
+  status: 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'RETRYING' | 'DEAD';
+  attempt: number;
+  max_retries: number;
+  error_message: string | null;
+  started_at: string | null;
+  finished_at: string | null;
+  next_retry_at: string | null;
+  duration_ms: number | null;
+  created_at: string | null;
+}
+
+export interface JobStats {
+  PENDING: number;
+  RUNNING: number;
+  SUCCESS: number;
+  FAILED: number;
+  RETRYING: number;
+  DEAD: number;
+  total: number;
+}
+
+export function listJobs(params?: {
+  limit?: number;
+  status?: string;
+  job_name?: string;
+}): Promise<JobExecution[]> {
+  const query = new URLSearchParams();
+  if (params?.limit) query.set('limit', String(params.limit));
+  if (params?.status) query.set('status', params.status);
+  if (params?.job_name) query.set('job_name', params.job_name);
+  const qs = query.toString();
+  return api(`/jobs/${qs ?
`?${qs}` : ''}`);
+}
+
+export function getJobStats(): Promise<JobStats> {
+  return api('/jobs/stats');
+}
+
+export function getJobDetail(jobId: string): Promise<JobExecution> {
+  return api(`/jobs/${jobId}`);
+}
+
+export function retryJob(executionId: number): Promise<JobExecution> {
+  return api(`/jobs/retry/${executionId}`, { method: 'POST' });
+}
diff --git a/app/src/pages/Jobs.tsx b/app/src/pages/Jobs.tsx
new file mode 100644
index 00000000..7b4a9cf6
--- /dev/null
+++ b/app/src/pages/Jobs.tsx
@@ -0,0 +1,282 @@
+import { useEffect, useState, useCallback } from 'react';
+import {
+  FinancialCard,
+  FinancialCardContent,
+  FinancialCardHeader,
+  FinancialCardTitle,
+} from '@/components/ui/financial-card';
+import { Button } from '@/components/ui/button';
+import { Badge } from '@/components/ui/badge';
+import {
+  Select,
+  SelectContent,
+  SelectItem,
+  SelectTrigger,
+  SelectValue,
+} from '@/components/ui/select';
+import {
+  Table,
+  TableBody,
+  TableCell,
+  TableHead,
+  TableHeader,
+  TableRow,
+} from '@/components/ui/table';
+import {
+  Activity,
+  CheckCircle2,
+  XCircle,
+  Clock,
+  AlertTriangle,
+  Skull,
+  RefreshCw,
+  Loader2,
+  RotateCcw,
+} from 'lucide-react';
+import { useToast } from '@/components/ui/use-toast';
+import {
+  listJobs,
+  getJobStats,
+  retryJob,
+  type JobExecution,
+  type JobStats,
+} from '@/api/jobs';
+
+const STATUS_CONFIG: Record<
+  string,
+  { label: string; variant: 'default' | 'secondary' | 'destructive' | 'outline'; icon: React.ElementType; color: string }
+> = {
+  PENDING: { label: 'Pending', variant: 'secondary', icon: Clock, color: 'text-muted-foreground' },
+  RUNNING: { label: 'Running', variant: 'default', icon: Loader2, color: 'text-blue-600' },
+  SUCCESS: { label: 'Success', variant: 'outline', icon: CheckCircle2, color: 'text-green-600' },
+  FAILED: { label: 'Failed', variant: 'destructive', icon: XCircle, color: 'text-red-500' },
+  RETRYING: { label: 'Retrying', variant: 'secondary', icon: RefreshCw, color: 'text-amber-500' },
+  DEAD: { label:
'Dead', variant: 'destructive', icon: Skull, color: 'text-red-700' }, +}; + +function StatusBadge({ status }: { status: string }) { + const cfg = STATUS_CONFIG[status] || STATUS_CONFIG.PENDING; + const Icon = cfg.icon; + return ( + + + {cfg.label} + + ); +} + +function StatCard({ + title, + value, + icon: Icon, + color, +}: { + title: string; + value: number; + icon: React.ElementType; + color: string; +}) { + return ( + + +
+ +
+
+

{value}

+

{title}

+
+
+
+ ); +} + +function formatDuration(ms: number | null): string { + if (ms === null || ms === undefined) return '—'; + if (ms < 1000) return `${ms}ms`; + return `${(ms / 1000).toFixed(1)}s`; +} + +function formatTime(iso: string | null): string { + if (!iso) return '—'; + try { + const d = new Date(iso); + return d.toLocaleString(undefined, { + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + }); + } catch { + return iso; + } +} + +export default function Jobs() { + const [jobs, setJobs] = useState([]); + const [stats, setStats] = useState(null); + const [statusFilter, setStatusFilter] = useState('ALL'); + const [loading, setLoading] = useState(true); + const [retrying, setRetrying] = useState(null); + const { toast } = useToast(); + + const fetchData = useCallback(async () => { + try { + setLoading(true); + const filter = statusFilter === 'ALL' ? undefined : statusFilter; + const [jobList, jobStats] = await Promise.all([ + listJobs({ limit: 100, status: filter }), + getJobStats(), + ]); + setJobs(jobList); + setStats(jobStats); + } catch (err: unknown) { + toast({ + title: 'Error loading jobs', + description: err instanceof Error ? err.message : 'Unknown error', + variant: 'destructive', + }); + } finally { + setLoading(false); + } + }, [statusFilter, toast]); + + useEffect(() => { + void fetchData(); + const interval = setInterval(() => void fetchData(), 15000); // auto-refresh 15s + return () => clearInterval(interval); + }, [fetchData]); + + const handleRetry = async (id: number) => { + setRetrying(id); + try { + await retryJob(id); + toast({ title: 'Job queued for retry' }); + void fetchData(); + } catch (err: unknown) { + toast({ + title: 'Retry failed', + description: err instanceof Error ? err.message : 'Unknown error', + variant: 'destructive', + }); + } finally { + setRetrying(null); + } + }; + + return ( +
+ {/* Header */} +
+
+

Job Monitor

+

+ Background job executions, retries & health +

+
+ +
+ + {/* Stats cards */} + {stats && ( +
+ + + + + + +
+ )} + + {/* Filter + table */} + + + Recent Executions + + + + {loading && jobs.length === 0 ? ( +
+ Loading jobs… +
+ ) : jobs.length === 0 ? ( +
+ +

No job executions found

+
+ ) : ( +
+ + + + Job Name + Status + Attempt + Duration + Started + Error + Actions + + + + {jobs.map((job) => ( + + + {job.job_name} + + + + + + {job.attempt}/{job.max_retries + 1} + + {formatDuration(job.duration_ms)} + {formatTime(job.started_at)} + + {job.error_message || '—'} + + + {job.status === 'DEAD' && ( + + )} + + + ))} + +
+
+ )} +
+
+
+ ); +} diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45..8b5527de 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -49,6 +49,11 @@ def create_app(settings: Settings | None = None) -> Flask: CORS(app, resources={r"*": {"origins": "*"}}, supports_credentials=True) # Redis (already global) + # Register default job alert callbacks + from .services.job_alerts import register_default_alerts + + register_default_alerts() + # Blueprint routes register_routes(app) diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189de..c6cdecf4 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,29 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +-- Job execution tracking for resilient retry & monitoring +DO $$ BEGIN + CREATE TYPE job_status AS ENUM ('PENDING','RUNNING','SUCCESS','FAILED','RETRYING','DEAD'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +CREATE TABLE IF NOT EXISTS job_executions ( + id SERIAL PRIMARY KEY, + job_id VARCHAR(200) NOT NULL, + job_name VARCHAR(200) NOT NULL, + status job_status NOT NULL DEFAULT 'PENDING', + attempt INT NOT NULL DEFAULT 1, + max_retries INT NOT NULL DEFAULT 3, + error_message TEXT, + error_traceback TEXT, + started_at TIMESTAMP, + finished_at TIMESTAMP, + next_retry_at TIMESTAMP, + duration_ms INT, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_job_executions_job_id ON job_executions(job_id); +CREATE INDEX IF NOT EXISTS idx_job_executions_status ON job_executions(status); +CREATE INDEX IF NOT EXISTS idx_job_executions_created ON job_executions(created_at DESC); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..f7be0a0e 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -3,6 
+3,9 @@ from sqlalchemy import Enum as SAEnum from .extensions import db +# Re-export job models so `import app.models` registers them with SQLAlchemy +from .models.job_execution import JobExecution, JobStatus # noqa: F401 + class Role(str, Enum): USER = "USER" diff --git a/packages/backend/app/models/__init__.py b/packages/backend/app/models/__init__.py new file mode 100644 index 00000000..6c2cc39c --- /dev/null +++ b/packages/backend/app/models/__init__.py @@ -0,0 +1,2 @@ +"""Job execution models package.""" +from .job_execution import JobExecution, JobStatus # noqa: F401 diff --git a/packages/backend/app/models/job_execution.py b/packages/backend/app/models/job_execution.py new file mode 100644 index 00000000..1ad5d394 --- /dev/null +++ b/packages/backend/app/models/job_execution.py @@ -0,0 +1,59 @@ +"""Job execution tracking model for resilient retry & monitoring.""" + +from datetime import datetime +from enum import Enum + +from sqlalchemy import Enum as SAEnum, Text + +from ..extensions import db + + +class JobStatus(str, Enum): + PENDING = "PENDING" + RUNNING = "RUNNING" + SUCCESS = "SUCCESS" + FAILED = "FAILED" + RETRYING = "RETRYING" + DEAD = "DEAD" # exhausted all retries + + +class JobExecution(db.Model): + """Tracks every background job execution with retry metadata.""" + + __tablename__ = "job_executions" + + id = db.Column(db.Integer, primary_key=True) + job_id = db.Column(db.String(200), nullable=False, index=True) + job_name = db.Column(db.String(200), nullable=False) + status = db.Column( + SAEnum(JobStatus, name="job_status", create_constraint=False), + nullable=False, + default=JobStatus.PENDING, + ) + attempt = db.Column(db.Integer, nullable=False, default=1) + max_retries = db.Column(db.Integer, nullable=False, default=3) + error_message = db.Column(Text, nullable=True) + error_traceback = db.Column(Text, nullable=True) + started_at = db.Column(db.DateTime, nullable=True) + finished_at = db.Column(db.DateTime, nullable=True) + next_retry_at = 
db.Column(db.DateTime, nullable=True) + duration_ms = db.Column(db.Integer, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + def to_dict(self): + return { + "id": self.id, + "job_id": self.job_id, + "job_name": self.job_name, + "status": self.status.value if self.status else None, + "attempt": self.attempt, + "max_retries": self.max_retries, + "error_message": self.error_message, + "started_at": self.started_at.isoformat() if self.started_at else None, + "finished_at": self.finished_at.isoformat() if self.finished_at else None, + "next_retry_at": ( + self.next_retry_at.isoformat() if self.next_retry_at else None + ), + "duration_ms": self.duration_ms, + "created_at": self.created_at.isoformat() if self.created_at else None, + } diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..8d585f25 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .jobs import bp as jobs_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(jobs_bp, url_prefix="/jobs") diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 00000000..6577c237 --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,83 @@ +"""Job monitoring API endpoints. 
+
+Provides:
+- GET /jobs/ – list recent executions (filterable)
+- GET /jobs/stats – aggregate status counts
+- GET /jobs/<job_id> – single execution detail
+- POST /jobs/retry/<execution_id> – manually re-trigger a DEAD job
+"""
+
+from flask import Blueprint, jsonify, request
+from flask_jwt_extended import jwt_required
+
+from ..models.job_execution import JobExecution, JobStatus
+from ..services.job_retry import (
+    get_cached_status,
+    get_job_stats,
+    get_recent_executions,
+)
+from ..extensions import db
+
+bp = Blueprint("jobs", __name__)
+
+
+@bp.get("/")
+@jwt_required()
+def list_jobs():
+    """Return recent job executions with optional filters."""
+    limit = request.args.get("limit", 50, type=int)
+    status = request.args.get("status", None)
+    job_name = request.args.get("job_name", None)
+    limit = min(max(limit, 1), 200)
+
+    executions = get_recent_executions(limit=limit, status=status, job_name=job_name)
+    return jsonify([e.to_dict() for e in executions]), 200
+
+
+@bp.get("/stats")
+@jwt_required()
+def job_stats():
+    """Aggregate job status counts for the dashboard."""
+    stats = get_job_stats()
+    return jsonify(stats), 200
+
+
+@bp.get("/<job_id>")
+@jwt_required()
+def job_detail(job_id: str):
+    """Get detailed info for a specific job execution."""
+    # Try Redis cache first for speed
+    cached = get_cached_status(job_id)
+
+    # Always fetch full record from DB for detail view
+    execution = JobExecution.query.filter_by(job_id=job_id).first()
+    if not execution:
+        if cached:
+            return jsonify(cached), 200
+        return jsonify(error="Job not found"), 404
+
+    return jsonify(execution.to_dict()), 200
+
+
+@bp.post("/retry/<int:execution_id>")
+@jwt_required()
+def retry_job(execution_id: int):
+    """Manually reset a DEAD job to PENDING for re-execution.
+
+    Note: actual re-execution requires the scheduler or a manual trigger;
+    this endpoint resets the status so the monitoring dashboard reflects it.
+ """ + execution = db.session.get(JobExecution, execution_id) + if not execution: + return jsonify(error="Execution not found"), 404 + if execution.status != JobStatus.DEAD: + return jsonify(error="Only DEAD jobs can be retried"), 400 + + execution.status = JobStatus.PENDING + execution.attempt = 0 + execution.error_message = None + execution.error_traceback = None + execution.next_retry_at = None + db.session.commit() + + return jsonify(execution.to_dict()), 200 diff --git a/packages/backend/app/services/job_alerts.py b/packages/backend/app/services/job_alerts.py new file mode 100644 index 00000000..d8aac62a --- /dev/null +++ b/packages/backend/app/services/job_alerts.py @@ -0,0 +1,91 @@ +"""Pluggable alerting for failed/dead background jobs. + +Ships with: +- Console/log alert (always active) +- Email alert (active when SMTP is configured) +- Webhook alert (active when JOB_ALERT_WEBHOOK_URL env var is set) + +Register additional callbacks via ``job_retry.register_alert_callback``. +""" + +import json +import logging +import os +from typing import TYPE_CHECKING + +import requests + +from ..models.job_execution import JobStatus + +if TYPE_CHECKING: + from ..models.job_execution import JobExecution + +logger = logging.getLogger("finmind.job_alerts") + + +def log_alert(execution: "JobExecution"): + """Always-on: log to application logger.""" + level = logging.ERROR if execution.status == JobStatus.DEAD else logging.WARNING + logger.log( + level, + "JOB ALERT [%s] %s — attempt %d/%d — %s: %s", + execution.status.value, + execution.job_name, + execution.attempt, + execution.max_retries, + execution.job_id, + execution.error_message or "no error message", + ) + + +def email_alert(execution: "JobExecution"): + """Send email alert for DEAD jobs when SMTP is configured.""" + if execution.status != JobStatus.DEAD: + return + try: + from .reminders import send_email + from ..config import Settings + + cfg = Settings() + if not cfg.smtp_url or not cfg.email_from: + return + 
subject = f"[FinMind] Job DEAD: {execution.job_name}" + body = ( + f"Job: {execution.job_name}\n" + f"ID: {execution.job_id}\n" + f"Attempts: {execution.attempt}/{execution.max_retries}\n" + f"Error: {execution.error_message}\n\n" + f"Traceback:\n{execution.error_traceback or 'N/A'}" + ) + send_email(cfg.email_from, subject, body) + except Exception: + logger.debug("Email alert failed for %s", execution.job_id) + + +def webhook_alert(execution: "JobExecution"): + """POST to an external webhook URL for DEAD or FAILED jobs.""" + url = os.getenv("JOB_ALERT_WEBHOOK_URL") + if not url: + return + try: + payload = { + "event": "job_alert", + "status": execution.status.value, + "job_name": execution.job_name, + "job_id": execution.job_id, + "attempt": execution.attempt, + "max_retries": execution.max_retries, + "error": execution.error_message, + } + requests.post(url, json=payload, timeout=5) + except Exception: + logger.debug("Webhook alert failed for %s", execution.job_id) + + +def register_default_alerts(): + """Wire up all built-in alert callbacks.""" + from .job_retry import register_alert_callback + + register_alert_callback(log_alert) + register_alert_callback(email_alert) + register_alert_callback(webhook_alert) diff --git a/packages/backend/app/services/job_retry.py b/packages/backend/app/services/job_retry.py new file mode 100644 index 00000000..24474556 --- /dev/null +++ b/packages/backend/app/services/job_retry.py @@ -0,0 +1,288 @@ +"""Resilient job retry engine with exponential backoff. 
+ +Wraps any callable into a tracked, retryable background job with: +- Exponential backoff (base * 2^attempt) with jitter +- Per-job execution logging to DB +- Dead-letter status after max retries exhausted +- Redis-backed real-time status cache for fast polling +- Pluggable alert hooks for failed/dead jobs +""" + +import json +import logging +import math +import random +import time +import traceback +import uuid +from datetime import datetime, timedelta +from functools import wraps +from typing import Any, Callable, Optional + +from ..extensions import db, redis_client +from ..models.job_execution import JobExecution, JobStatus + +logger = logging.getLogger("finmind.jobs") + +# --------------------------------------------------------------------------- +# Configuration defaults +# --------------------------------------------------------------------------- +DEFAULT_MAX_RETRIES = 3 +DEFAULT_BACKOFF_BASE_SECONDS = 10 +DEFAULT_BACKOFF_MAX_SECONDS = 600 # 10 min ceiling +JITTER_FACTOR = 0.25 # ±25 % jitter + +# Redis key prefix for live status +_REDIS_PREFIX = "finmind:job:" + +# --------------------------------------------------------------------------- +# Alert callbacks (pluggable) +# --------------------------------------------------------------------------- +_alert_callbacks: list[Callable[[JobExecution], None]] = [] + + +def register_alert_callback(fn: Callable[[JobExecution], None]): + """Register a callback invoked when a job fails or becomes DEAD.""" + _alert_callbacks.append(fn) + + +def _fire_alerts(execution: JobExecution): + for cb in _alert_callbacks: + try: + cb(execution) + except Exception: + logger.exception("Alert callback error for job %s", execution.job_id) + + +# --------------------------------------------------------------------------- +# Backoff calculator +# --------------------------------------------------------------------------- +def compute_backoff( + attempt: int, + base: float = DEFAULT_BACKOFF_BASE_SECONDS, + cap: float = 
DEFAULT_BACKOFF_MAX_SECONDS, +) -> float: + """Exponential backoff with jitter. attempt is 0-indexed retry number.""" + delay = min(base * (2**attempt), cap) + jitter = delay * JITTER_FACTOR * (2 * random.random() - 1) + return max(0, delay + jitter) + + +# --------------------------------------------------------------------------- +# Core execution wrapper +# --------------------------------------------------------------------------- +def execute_with_retry( + fn: Callable[..., Any], + args: tuple = (), + kwargs: dict | None = None, + job_name: str | None = None, + max_retries: int = DEFAULT_MAX_RETRIES, + backoff_base: float = DEFAULT_BACKOFF_BASE_SECONDS, + app=None, +) -> JobExecution: + """Run *fn* with automatic retry + tracking. + + This is the **synchronous** entry point. For APScheduler integration + the ``resilient_job`` decorator is preferred. + + Returns the final :class:`JobExecution` row. + """ + kwargs = kwargs or {} + job_id = f"{job_name or fn.__name__}_{uuid.uuid4().hex[:12]}" + resolved_name = job_name or fn.__name__ + + ctx = app.app_context() if app else _noop_ctx() + + with ctx: + execution = JobExecution( + job_id=job_id, + job_name=resolved_name, + status=JobStatus.PENDING, + attempt=1, + max_retries=max_retries, + ) + db.session.add(execution) + db.session.commit() + _cache_status(execution) + + for attempt in range(max_retries + 1): + execution.attempt = attempt + 1 + execution.status = JobStatus.RUNNING + execution.started_at = datetime.utcnow() + execution.next_retry_at = None + db.session.commit() + _cache_status(execution) + + t0 = time.monotonic() + try: + fn(*args, **kwargs) + elapsed = int((time.monotonic() - t0) * 1000) + execution.status = JobStatus.SUCCESS + execution.finished_at = datetime.utcnow() + execution.duration_ms = elapsed + execution.error_message = None + execution.error_traceback = None + db.session.commit() + _cache_status(execution) + logger.info( + "Job %s succeeded on attempt %d (%d ms)", + job_id, + attempt + 1, 
+ elapsed, + ) + return execution + except Exception as exc: + elapsed = int((time.monotonic() - t0) * 1000) + execution.duration_ms = elapsed + execution.error_message = str(exc)[:2000] + execution.error_traceback = traceback.format_exc()[:4000] + execution.finished_at = datetime.utcnow() + + if attempt < max_retries: + delay = compute_backoff(attempt, backoff_base) + execution.status = JobStatus.RETRYING + execution.next_retry_at = datetime.utcnow() + timedelta( + seconds=delay + ) + db.session.commit() + _cache_status(execution) + _fire_alerts(execution) + logger.warning( + "Job %s attempt %d failed (%s), retrying in %.1fs", + job_id, + attempt + 1, + exc, + delay, + ) + time.sleep(delay) + else: + execution.status = JobStatus.DEAD + db.session.commit() + _cache_status(execution) + _fire_alerts(execution) + logger.error( + "Job %s DEAD after %d attempts: %s", + job_id, + max_retries + 1, + exc, + ) + return execution + + return execution # unreachable, but keeps type-checkers happy + + +# --------------------------------------------------------------------------- +# Decorator for APScheduler jobs +# --------------------------------------------------------------------------- +def resilient_job( + max_retries: int = DEFAULT_MAX_RETRIES, + backoff_base: float = DEFAULT_BACKOFF_BASE_SECONDS, + job_name: str | None = None, +): + """Decorator that wraps an APScheduler job function with retry logic. + + Usage:: + + @resilient_job(max_retries=5, backoff_base=15) + def my_scheduled_task(): + ... 
+ """ + + def decorator(fn: Callable): + @wraps(fn) + def wrapper(*args, **kwargs): + from flask import current_app + + app = current_app._get_current_object() # type: ignore[attr-defined] + return execute_with_retry( + fn, + args=args, + kwargs=kwargs, + job_name=job_name or fn.__name__, + max_retries=max_retries, + backoff_base=backoff_base, + app=app, + ) + + # Preserve original name so APScheduler id works + wrapper.__wrapped__ = fn # type: ignore[attr-defined] + return wrapper + + return decorator + + +# --------------------------------------------------------------------------- +# Redis status cache helpers +# --------------------------------------------------------------------------- +def _cache_status(execution: JobExecution, ttl: int = 86400): + """Push lightweight status into Redis for fast API reads.""" + try: + key = f"{_REDIS_PREFIX}{execution.job_id}" + data = { + "id": execution.id, + "job_id": execution.job_id, + "job_name": execution.job_name, + "status": execution.status.value, + "attempt": execution.attempt, + "max_retries": execution.max_retries, + "error_message": execution.error_message or "", + "updated_at": datetime.utcnow().isoformat(), + } + redis_client.set(key, json.dumps(data), ex=ttl) + except Exception: + logger.debug("Redis cache write failed for %s", execution.job_id) + + +def get_cached_status(job_id: str) -> dict | None: + """Read cached status from Redis (or None).""" + try: + raw = redis_client.get(f"{_REDIS_PREFIX}{job_id}") + return json.loads(raw) if raw else None + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Query helpers +# --------------------------------------------------------------------------- +def get_recent_executions( + limit: int = 50, + status: Optional[str] = None, + job_name: Optional[str] = None, +) -> list[JobExecution]: + """Return recent job executions, optionally filtered.""" + q = 
JobExecution.query.order_by(JobExecution.created_at.desc()) + if status: + try: + q = q.filter(JobExecution.status == JobStatus(status)) + except ValueError: + pass + if job_name: + q = q.filter(JobExecution.job_name == job_name) + return q.limit(limit).all() + + +def get_job_stats() -> dict: + """Aggregate counts by status for the dashboard summary.""" + rows = ( + db.session.query(JobExecution.status, db.func.count(JobExecution.id)) + .group_by(JobExecution.status) + .all() + ) + stats = {s.value: 0 for s in JobStatus} + for status, count in rows: + key = status.value if isinstance(status, JobStatus) else status + stats[key] = count + stats["total"] = sum(stats.values()) + return stats + + +# --------------------------------------------------------------------------- +# Noop context manager +# --------------------------------------------------------------------------- +class _noop_ctx: + def __enter__(self): + return self + + def __exit__(self, *a): + pass diff --git a/packages/backend/tests/test_job_alerts.py b/packages/backend/tests/test_job_alerts.py new file mode 100644 index 00000000..0949456c --- /dev/null +++ b/packages/backend/tests/test_job_alerts.py @@ -0,0 +1,81 @@ +"""Tests for job alert callbacks.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from app.models.job_execution import JobExecution, JobStatus +from app.services.job_alerts import ( + email_alert, + log_alert, + webhook_alert, +) + + +def _make_execution(status=JobStatus.DEAD, **overrides): + """Create a mock-like JobExecution without DB.""" + defaults = dict( + id=1, + job_id="test_alert_123", + job_name="test_job", + status=status, + attempt=3, + max_retries=3, + error_message="something broke", + error_traceback="Traceback ...", + ) + defaults.update(overrides) + e = MagicMock(spec=JobExecution) + for k, v in defaults.items(): + setattr(e, k, v) + return e + + +class TestLogAlert: + def test_logs_dead_as_error(self): + execution = 
_make_execution(status=JobStatus.DEAD) + with patch("app.services.job_alerts.logger") as mock_logger: + log_alert(execution) + mock_logger.log.assert_called_once() + # First arg is log level (ERROR = 40) + assert mock_logger.log.call_args[0][0] == 40 + + def test_logs_retrying_as_warning(self): + execution = _make_execution(status=JobStatus.RETRYING) + with patch("app.services.job_alerts.logger") as mock_logger: + log_alert(execution) + assert mock_logger.log.call_args[0][0] == 30 # WARNING + + +class TestWebhookAlert: + @patch.dict("os.environ", {"JOB_ALERT_WEBHOOK_URL": "https://hooks.example.com/job"}) + @patch("app.services.job_alerts.requests.post") + def test_sends_webhook(self, mock_post): + execution = _make_execution() + webhook_alert(execution) + mock_post.assert_called_once() + payload = mock_post.call_args[1]["json"] + assert payload["status"] == "DEAD" + assert payload["job_name"] == "test_job" + + @patch.dict("os.environ", {}, clear=True) + def test_skips_without_url(self): + execution = _make_execution() + # Should not raise + webhook_alert(execution) + + +class TestEmailAlert: + def test_skips_non_dead(self): + execution = _make_execution(status=JobStatus.RETRYING) + # Should not raise or send + email_alert(execution) + + @patch("app.services.job_alerts.send_email") + def test_sends_for_dead(self, mock_send): + execution = _make_execution(status=JobStatus.DEAD) + with patch("app.services.job_alerts.Settings") as MockSettings: + MockSettings.return_value.smtp_url = "smtp+ssl://u:p@h:465" + MockSettings.return_value.email_from = "admin@test.com" + email_alert(execution) + mock_send.assert_called_once() diff --git a/packages/backend/tests/test_job_retry.py b/packages/backend/tests/test_job_retry.py new file mode 100644 index 00000000..2cbdb181 --- /dev/null +++ b/packages/backend/tests/test_job_retry.py @@ -0,0 +1,203 @@ +"""Tests for the resilient job retry system.""" + +import time +from unittest.mock import MagicMock, patch + +import pytest + +from 
app.models.job_execution import JobExecution, JobStatus +from app.services.job_retry import ( + DEFAULT_BACKOFF_BASE_SECONDS, + compute_backoff, + execute_with_retry, + get_job_stats, + get_recent_executions, + register_alert_callback, + _alert_callbacks, +) +from app.extensions import db + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- +@pytest.fixture(autouse=True) +def _clean_alert_callbacks(): + """Ensure alert callbacks don't leak between tests.""" + original = _alert_callbacks.copy() + yield + _alert_callbacks.clear() + _alert_callbacks.extend(original) + + +# --------------------------------------------------------------------------- +# Backoff tests +# --------------------------------------------------------------------------- +class TestComputeBackoff: + def test_first_attempt_near_base(self): + delay = compute_backoff(0, base=10, cap=600) + # With 25% jitter: 7.5 – 12.5 + assert 5 <= delay <= 15 + + def test_exponential_growth(self): + d0 = compute_backoff(0, base=10, cap=600) + d2 = compute_backoff(2, base=10, cap=600) + # Attempt 2 → base * 4 = 40, so should be much larger than attempt 0 + assert d2 > d0 + + def test_cap_respected(self): + delay = compute_backoff(20, base=10, cap=100) + # Even with jitter, should not exceed cap + 25% + assert delay <= 100 * 1.30 + + def test_never_negative(self): + for attempt in range(10): + assert compute_backoff(attempt) >= 0 + + +# --------------------------------------------------------------------------- +# execute_with_retry tests +# --------------------------------------------------------------------------- +class TestExecuteWithRetry: + def test_success_on_first_try(self, app_fixture): + fn = MagicMock(return_value=None) + with app_fixture.app_context(): + result = execute_with_retry( + fn, job_name="test_success", max_retries=3, backoff_base=0.01, + app=app_fixture, + ) + assert 
result.status == JobStatus.SUCCESS + assert result.attempt == 1 + assert result.duration_ms is not None + assert result.error_message is None + fn.assert_called_once() + + def test_retry_then_success(self, app_fixture): + call_count = {"n": 0} + + def flaky(): + call_count["n"] += 1 + if call_count["n"] < 3: + raise RuntimeError("transient failure") + + with app_fixture.app_context(): + result = execute_with_retry( + flaky, job_name="test_flaky", max_retries=3, backoff_base=0.01, + app=app_fixture, + ) + assert result.status == JobStatus.SUCCESS + assert result.attempt == 3 + assert call_count["n"] == 3 + + def test_dead_after_max_retries(self, app_fixture): + fn = MagicMock(side_effect=ValueError("permanent error")) + with app_fixture.app_context(): + result = execute_with_retry( + fn, job_name="test_dead", max_retries=2, backoff_base=0.01, + app=app_fixture, + ) + assert result.status == JobStatus.DEAD + assert result.attempt == 3 # 1 initial + 2 retries + assert "permanent error" in result.error_message + assert fn.call_count == 3 + + def test_error_traceback_captured(self, app_fixture): + def bad(): + raise TypeError("type issue") + + with app_fixture.app_context(): + result = execute_with_retry( + bad, job_name="test_tb", max_retries=0, backoff_base=0.01, + app=app_fixture, + ) + assert result.status == JobStatus.DEAD + assert "TypeError" in result.error_traceback + + def test_alert_fired_on_failure(self, app_fixture): + alert_mock = MagicMock() + register_alert_callback(alert_mock) + + with app_fixture.app_context(): + execute_with_retry( + MagicMock(side_effect=RuntimeError("boom")), + job_name="test_alert", + max_retries=1, + backoff_base=0.01, + app=app_fixture, + ) + # Called on RETRYING + DEAD = 2 times + assert alert_mock.call_count == 2 + + def test_execution_persisted_to_db(self, app_fixture): + with app_fixture.app_context(): + execute_with_retry( + lambda: None, + job_name="test_persist", + max_retries=0, + backoff_base=0.01, + app=app_fixture, + ) 
+ rows = JobExecution.query.filter_by(job_name="test_persist").all() + assert len(rows) == 1 + assert rows[0].status == JobStatus.SUCCESS + + +# --------------------------------------------------------------------------- +# Query helper tests +# --------------------------------------------------------------------------- +class TestQueryHelpers: + def _seed(self, app_fixture, statuses): + with app_fixture.app_context(): + for i, s in enumerate(statuses): + e = JobExecution( + job_id=f"seed_{i}", + job_name="seeded", + status=s, + attempt=1, + max_retries=3, + ) + db.session.add(e) + db.session.commit() + + def test_get_recent_executions_limit(self, app_fixture): + self._seed(app_fixture, [JobStatus.SUCCESS] * 5) + with app_fixture.app_context(): + results = get_recent_executions(limit=3) + assert len(results) == 3 + + def test_get_recent_executions_filter_status(self, app_fixture): + self._seed( + app_fixture, + [JobStatus.SUCCESS, JobStatus.DEAD, JobStatus.SUCCESS], + ) + with app_fixture.app_context(): + results = get_recent_executions(status="DEAD") + assert all(r.status == JobStatus.DEAD for r in results) + + def test_get_job_stats(self, app_fixture): + self._seed( + app_fixture, + [JobStatus.SUCCESS, JobStatus.SUCCESS, JobStatus.DEAD], + ) + with app_fixture.app_context(): + stats = get_job_stats() + assert stats["SUCCESS"] == 2 + assert stats["DEAD"] == 1 + assert stats["total"] == 3 + + +# --------------------------------------------------------------------------- +# Decorator tests +# --------------------------------------------------------------------------- +class TestResilientJobDecorator: + def test_decorator_wraps_function(self, app_fixture): + from app.services.job_retry import resilient_job + + @resilient_job(max_retries=1, backoff_base=0.01) + def my_task(): + return "done" + + with app_fixture.app_context(): + result = my_task() + assert result.status == JobStatus.SUCCESS + assert result.job_name == "my_task" diff --git 
# --- packages/backend/tests/test_jobs_api.py (new file) --------------------
"""Tests for the /jobs API endpoints."""

import pytest

from app.extensions import db
from app.models.job_execution import JobExecution, JobStatus


def _seed_jobs(app):
    """Create sample job executions: two SUCCESS, one DEAD, one RUNNING."""
    seed_statuses = [
        JobStatus.SUCCESS,
        JobStatus.DEAD,
        JobStatus.RUNNING,
        JobStatus.SUCCESS,
    ]
    with app.app_context():
        for i, status in enumerate(seed_statuses):
            db.session.add(
                JobExecution(
                    job_id=f"api_test_{i}",
                    job_name="test_job",
                    status=status,
                    # Successful jobs finish on the first attempt; the rest
                    # are seeded as having burned all three attempts.
                    attempt=1 if status == JobStatus.SUCCESS else 3,
                    max_retries=3,
                    error_message="fail" if status == JobStatus.DEAD else None,
                )
            )
        db.session.commit()


class TestJobsListEndpoint:
    """GET /jobs/ — listing, filters, and auth."""

    def test_list_requires_auth(self, client):
        resp = client.get("/jobs/")
        # 401 (missing token) or 422 (malformed header), depending on the
        # JWT library's rejection path.
        assert resp.status_code in (401, 422)

    def test_list_returns_jobs(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        resp = client.get("/jobs/", headers=auth_header)
        assert resp.status_code == 200
        payload = resp.get_json()
        assert isinstance(payload, list)
        assert len(payload) == 4

    def test_list_filter_by_status(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        resp = client.get("/jobs/?status=DEAD", headers=auth_header)
        assert resp.status_code == 200
        payload = resp.get_json()
        assert all(job["status"] == "DEAD" for job in payload)

    def test_list_filter_by_job_name(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        resp = client.get("/jobs/?job_name=test_job", headers=auth_header)
        assert resp.status_code == 200
        assert len(resp.get_json()) == 4

    def test_list_limit(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        resp = client.get("/jobs/?limit=2", headers=auth_header)
        assert resp.status_code == 200
        assert len(resp.get_json()) == 2


class TestJobsStatsEndpoint:
    """GET /jobs/stats — aggregate counters."""

    def test_stats_requires_auth(self, client):
        resp = client.get("/jobs/stats")
        assert resp.status_code in (401, 422)

    def test_stats_returns_counts(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        resp = client.get("/jobs/stats", headers=auth_header)
        assert resp.status_code == 200
        payload = resp.get_json()
        assert payload["total"] == 4
        assert payload["SUCCESS"] == 2
        assert payload["DEAD"] == 1


class TestJobDetailEndpoint:
    """GET /jobs/<job_id> — single-execution lookup."""

    def test_detail_found(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        resp = client.get("/jobs/api_test_0", headers=auth_header)
        assert resp.status_code == 200
        assert resp.get_json()["job_id"] == "api_test_0"

    def test_detail_not_found(self, client, auth_header):
        resp = client.get("/jobs/nonexistent_xyz", headers=auth_header)
        assert resp.status_code == 404


class TestRetryEndpoint:
    """POST /jobs/retry/<id> — manual re-queue of DEAD executions."""

    def test_retry_dead_job(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        # Look up the seeded DEAD row; capture its id inside the app context.
        with app_fixture.app_context():
            dead = JobExecution.query.filter_by(status=JobStatus.DEAD).first()
            dead_id = dead.id

        resp = client.post(f"/jobs/retry/{dead_id}", headers=auth_header)
        assert resp.status_code == 200
        payload = resp.get_json()
        assert payload["status"] == "PENDING"

    def test_retry_non_dead_rejected(self, client, auth_header, app_fixture):
        _seed_jobs(app_fixture)
        with app_fixture.app_context():
            success = JobExecution.query.filter_by(
                status=JobStatus.SUCCESS
            ).first()
            sid = success.id

        resp = client.post(f"/jobs/retry/{sid}", headers=auth_header)
        assert resp.status_code == 400

    def test_retry_not_found(self, client, auth_header):
        resp = client.post("/jobs/retry/99999", headers=auth_header)
        assert resp.status_code == 404