diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..4b31bf66 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -133,3 +133,45 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class JobStatus(str, Enum): + PENDING = "PENDING" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + RETRYING = "RETRYING" + DEAD = "DEAD" + CANCELLED = "CANCELLED" + + +class BackgroundJob(db.Model): + """Persistent background job with retry tracking.""" + __tablename__ = "background_jobs" + + id = db.Column(db.Integer, primary_key=True) + job_type = db.Column(db.String(100), nullable=False, index=True) + payload = db.Column(db.Text, nullable=True) + status = db.Column(db.String(20), default=JobStatus.PENDING.value, nullable=False, index=True) + attempt_count = db.Column(db.Integer, default=0, nullable=False) + max_retries = db.Column(db.Integer, default=3, nullable=False) + error_log = db.Column(db.Text, nullable=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) + next_retry_at = db.Column(db.DateTime, nullable=True, index=True) + completed_at = db.Column(db.DateTime, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + def to_dict(self): + return { + "id": self.id, + "job_type": self.job_type, + "status": self.status, + "attempt_count": self.attempt_count, + "max_retries": self.max_retries, + "error_log": self.error_log, + "user_id": self.user_id, + "next_retry_at": self.next_retry_at.isoformat() if self.next_retry_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "created_at": self.created_at.isoformat() if self.created_at else None, + 
"updated_at": self.updated_at.isoformat() if self.updated_at else None, + } diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 00000000..05febb13 --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,106 @@ +""" +REST API endpoints for background job monitoring and management. +GET /api/jobs - list jobs with filters +GET /api/jobs/stats - aggregate status counts +GET /api/jobs/<job_id> - single job detail +POST /api/jobs/<job_id>/requeue - requeue a dead job +POST /api/jobs/<job_id>/cancel - cancel pending/retrying job +""" +from datetime import datetime +from flask import Blueprint, jsonify, request +from flask_jwt_extended import get_jwt_identity, jwt_required + +from ..models import BackgroundJob, JobStatus +from ..services.job_service import ( + cancel_job, + get_dead_letter_jobs, + get_job_stats, + requeue_dead_job, +) + +jobs_bp = Blueprint("jobs", __name__, url_prefix="/api/jobs") + + +@jobs_bp.get("") +@jwt_required() +def list_jobs(): + """List jobs for the current user with optional status filter.""" + user_id = int(get_jwt_identity()) + status = request.args.get("status") + limit = min(int(request.args.get("limit", 50)), 200) + offset = int(request.args.get("offset", 0)) + + query = BackgroundJob.query.filter_by(user_id=user_id) + if status: + try: + JobStatus(status) + except ValueError: + return jsonify({"error": f"Invalid status. 
Valid values: {[s.value for s in JobStatus]}"}), 400 + query = query.filter_by(status=status) + + total = query.count() + jobs = query.order_by(BackgroundJob.created_at.desc()).offset(offset).limit(limit).all() + return jsonify({ + "total": total, + "items": [j.to_dict() for j in jobs], + "offset": offset, + "limit": limit, + }) + + +@jobs_bp.get("/stats") +@jwt_required() +def job_stats(): + """Aggregate job counts per status for the current user.""" + user_id = int(get_jwt_identity()) + stats = get_job_stats(user_id=user_id) + return jsonify(stats) + + +@jobs_bp.get("/dead-letter") +@jwt_required() +def dead_letter_queue(): + """Return dead-letter jobs for the current user.""" + user_id = int(get_jwt_identity()) + limit = min(int(request.args.get("limit", 100)), 500) + jobs = get_dead_letter_jobs(user_id=user_id, limit=limit) + return jsonify({"items": [j.to_dict() for j in jobs], "count": len(jobs)}) + + +@jobs_bp.get("/<int:job_id>") +@jwt_required() +def get_job(job_id: int): + """Retrieve a single job by ID (must belong to current user).""" + user_id = int(get_jwt_identity()) + job = BackgroundJob.query.filter_by(id=job_id, user_id=user_id).first() + if not job: + return jsonify({"error": "Job not found"}), 404 + return jsonify(job.to_dict()) + + +@jobs_bp.post("/<int:job_id>/requeue") +@jwt_required() +def requeue_job(job_id: int): + """Requeue a dead-letter job for retry.""" + user_id = int(get_jwt_identity()) + job = BackgroundJob.query.filter_by(id=job_id, user_id=user_id).first() + if not job: + return jsonify({"error": "Job not found"}), 404 + result = requeue_dead_job(job_id) + if not result: + return jsonify({"error": "Only DEAD jobs can be requeued"}), 400 + return jsonify(result.to_dict()) + + +@jobs_bp.post("/<int:job_id>/cancel") +@jwt_required() +def cancel(job_id: int): + """Cancel a pending or retrying job.""" + user_id = int(get_jwt_identity()) + job = BackgroundJob.query.filter_by(id=job_id, user_id=user_id).first() + if not job: + return jsonify({"error": "Job not found"}), 
404 + success = cancel_job(job_id) + if not success: + return jsonify({"error": "Job cannot be cancelled in its current state"}), 400 + return jsonify(job.to_dict()) diff --git a/packages/backend/app/services/job_service.py b/packages/backend/app/services/job_service.py new file mode 100644 index 00000000..131c8eaa --- /dev/null +++ b/packages/backend/app/services/job_service.py @@ -0,0 +1,127 @@ +""" +Background job retry service with exponential backoff, dead-letter queue, and monitoring. +Implements resilient async job execution for FinMind. +""" +import json +import threading +import traceback +from datetime import datetime, timedelta +from typing import Any, Callable, Dict, Optional + +from ..extensions import db +from ..models import BackgroundJob, JobStatus + + +DEFAULT_MAX_RETRIES = 3 +DEFAULT_BASE_DELAY_SECONDS = 5 +DEFAULT_MAX_DELAY_SECONDS = 300 +JOB_LOCK = threading.Lock() + + +def _calculate_backoff(attempt: int, base_delay: int = DEFAULT_BASE_DELAY_SECONDS, + max_delay: int = DEFAULT_MAX_DELAY_SECONDS) -> int: + """Exponential backoff: min(base * 2^attempt, max).""" + return int(min(base_delay * (2 ** attempt), max_delay)) + + +def enqueue_job(job_type: str, payload: Dict[str, Any], + max_retries: int = DEFAULT_MAX_RETRIES, + user_id: Optional[int] = None) -> BackgroundJob: + """Create and persist a new background job.""" + job = BackgroundJob( + job_type=job_type, + payload=json.dumps(payload), + status=JobStatus.PENDING.value, + max_retries=max_retries, + attempt_count=0, + user_id=user_id, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + next_retry_at=datetime.utcnow(), + ) + db.session.add(job) + db.session.commit() + return job + + +def execute_job(job: BackgroundJob, handler: Callable[[Dict], Any]) -> bool: + """Execute a job with retry logic. 
Returns True on success.""" + with JOB_LOCK: + job.status = JobStatus.RUNNING.value + job.attempt_count += 1 + job.updated_at = datetime.utcnow() + db.session.commit() + + try: + payload = json.loads(job.payload or "{}") + handler(payload) + with JOB_LOCK: + job.status = JobStatus.COMPLETED.value + job.completed_at = datetime.utcnow() + job.updated_at = datetime.utcnow() + db.session.commit() + return True + except Exception: + error_msg = f"Attempt {job.attempt_count}: {traceback.format_exc()}" + with JOB_LOCK: + existing = job.error_log or "" + job.error_log = f"{existing}\n---\n{error_msg}".strip() + if job.attempt_count >= job.max_retries: + job.status = JobStatus.DEAD.value + else: + delay = _calculate_backoff(job.attempt_count) + job.status = JobStatus.RETRYING.value + job.next_retry_at = datetime.utcnow() + timedelta(seconds=delay) + job.updated_at = datetime.utcnow() + db.session.commit() + return False + + +def get_pending_jobs(limit: int = 50): + """Return jobs ready for execution.""" + now = datetime.utcnow() + return BackgroundJob.query.filter( + BackgroundJob.status.in_([JobStatus.PENDING.value, JobStatus.RETRYING.value]), + BackgroundJob.next_retry_at <= now, + ).order_by(BackgroundJob.next_retry_at).limit(limit).all() + + +def get_dead_letter_jobs(user_id: Optional[int] = None, limit: int = 100): + """Return jobs in the dead-letter queue.""" + query = BackgroundJob.query.filter_by(status=JobStatus.DEAD.value) + if user_id is not None: + query = query.filter_by(user_id=user_id) + return query.order_by(BackgroundJob.updated_at.desc()).limit(limit).all() + + +def requeue_dead_job(job_id: int) -> Optional[BackgroundJob]: + """Manually requeue a dead-letter job.""" + job = BackgroundJob.query.get(job_id) + if job and job.status == JobStatus.DEAD.value: + job.status = JobStatus.PENDING.value + job.attempt_count = 0 + job.error_log = None + job.next_retry_at = datetime.utcnow() + job.updated_at = datetime.utcnow() + db.session.commit() + return job + return 
None + + +def get_job_stats(user_id: Optional[int] = None) -> Dict[str, int]: + """Return aggregate stats: counts per status.""" + query = BackgroundJob.query + if user_id is not None: + query = query.filter_by(user_id=user_id) + return {status.value: query.filter_by(status=status.value).count() for status in JobStatus} + + +def cancel_job(job_id: int) -> bool: + """Cancel a pending or retrying job.""" + job = BackgroundJob.query.get(job_id) + if job and job.status in (JobStatus.PENDING.value, JobStatus.RETRYING.value): + job.status = JobStatus.CANCELLED.value + job.updated_at = datetime.utcnow() + db.session.commit() + return True + return False diff --git a/packages/backend/tests/test_job_service.py b/packages/backend/tests/test_job_service.py new file mode 100644 index 00000000..c63113f6 --- /dev/null +++ b/packages/backend/tests/test_job_service.py @@ -0,0 +1,208 @@ +""" +Tests for background job retry service and monitoring endpoints. +Covers: enqueue, success, retry with backoff, dead-letter, requeue, cancel, stats. 
+""" +import json +from datetime import datetime, timedelta +from unittest.mock import MagicMock, patch, call +import pytest + +from app.models import BackgroundJob, JobStatus +from app.services.job_service import ( + _calculate_backoff, + cancel_job, + enqueue_job, + execute_job, + get_dead_letter_jobs, + get_job_stats, + get_pending_jobs, + requeue_dead_job, +) + + +# --- Unit tests for backoff calculation --- + +class TestCalculateBackoff: + def test_first_attempt(self): + assert _calculate_backoff(0) == 5 # 5 * 2^0 = 5 + + def test_second_attempt(self): + assert _calculate_backoff(1) == 10 # 5 * 2^1 = 10 + + def test_third_attempt(self): + assert _calculate_backoff(2) == 20 # 5 * 2^2 = 20 + + def test_max_cap(self): + assert _calculate_backoff(100, max_delay=300) == 300 + + def test_custom_base(self): + assert _calculate_backoff(1, base_delay=10) == 20 + + +# --- Integration-style tests using in-memory SQLite --- + +@pytest.fixture +def app_ctx(app): + with app.app_context(): + yield + + +@pytest.fixture +def sample_job(app_ctx, db): + job = BackgroundJob( + job_type="test_job", + payload=json.dumps({"key": "value"}), + status=JobStatus.PENDING.value, + max_retries=3, + attempt_count=0, + next_retry_at=datetime.utcnow() - timedelta(seconds=1), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + db.session.add(job) + db.session.commit() + return job + + +class TestExecuteJob: + def test_success_marks_completed(self, sample_job): + handler = MagicMock() + result = execute_job(sample_job, handler) + assert result is True + assert sample_job.status == JobStatus.COMPLETED.value + assert sample_job.attempt_count == 1 + assert sample_job.completed_at is not None + handler.assert_called_once_with({"key": "value"}) + + def test_failure_increments_attempts_and_schedules_retry(self, sample_job): + def failing_handler(payload): + raise ValueError("test error") + + result = execute_job(sample_job, failing_handler) + assert result is False + assert 
sample_job.status == JobStatus.RETRYING.value + assert sample_job.attempt_count == 1 + assert sample_job.error_log is not None + assert "test error" in sample_job.error_log + assert sample_job.next_retry_at > datetime.utcnow() + + def test_max_retries_moves_to_dead(self, sample_job): + sample_job.attempt_count = 2 # already at max - 1 + sample_job.max_retries = 3 + + def failing_handler(payload): + raise RuntimeError("fatal error") + + result = execute_job(sample_job, failing_handler) + assert result is False + assert sample_job.status == JobStatus.DEAD.value + + def test_error_log_accumulates(self, sample_job): + def failing_handler(payload): + raise ValueError("error one") + + execute_job(sample_job, failing_handler) + error_log_after_first = sample_job.error_log + + sample_job.status = JobStatus.RETRYING.value + sample_job.next_retry_at = datetime.utcnow() - timedelta(seconds=1) + + def failing_handler2(payload): + raise ValueError("error two") + + execute_job(sample_job, failing_handler2) + assert "error one" in sample_job.error_log + assert "error two" in sample_job.error_log + + +class TestGetPendingJobs: + def test_returns_pending_jobs(self, sample_job): + jobs = get_pending_jobs() + assert any(j.id == sample_job.id for j in jobs) + + def test_does_not_return_future_retries(self, sample_job, db): + future_job = BackgroundJob( + job_type="future", + payload="{}", + status=JobStatus.RETRYING.value, + attempt_count=1, + max_retries=3, + next_retry_at=datetime.utcnow() + timedelta(hours=1), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + db.session.add(future_job) + db.session.commit() + jobs = get_pending_jobs() + assert not any(j.id == future_job.id for j in jobs) + + +class TestRequeueAndCancel: + def test_requeue_dead_job(self, sample_job): + sample_job.status = JobStatus.DEAD.value + from app.extensions import db + db.session.commit() + result = requeue_dead_job(sample_job.id) + assert result is not None + assert result.status == 
JobStatus.PENDING.value + assert result.attempt_count == 0 + + def test_requeue_non_dead_returns_none(self, sample_job): + result = requeue_dead_job(sample_job.id) + assert result is None + + def test_cancel_pending_job(self, sample_job): + success = cancel_job(sample_job.id) + assert success is True + assert sample_job.status == JobStatus.CANCELLED.value + + def test_cancel_completed_job_fails(self, sample_job): + sample_job.status = JobStatus.COMPLETED.value + from app.extensions import db + db.session.commit() + success = cancel_job(sample_job.id) + assert success is False + + +class TestJobStats: + def test_stats_returns_all_statuses(self, sample_job): + stats = get_job_stats() + for status in JobStatus: + assert status.value in stats + assert stats[JobStatus.PENDING.value] >= 1 + + +class TestJobEndpoints: + def test_list_jobs_requires_auth(self, client): + resp = client.get("/api/jobs") + assert resp.status_code == 401 + + def test_stats_requires_auth(self, client): + resp = client.get("/api/jobs/stats") + assert resp.status_code == 401 + + def test_list_jobs_authenticated(self, auth_client, sample_job): + resp = auth_client.get("/api/jobs") + assert resp.status_code == 200 + data = resp.get_json() + assert "items" in data + assert "total" in data + + def test_dead_letter_queue(self, auth_client, db): + from app.extensions import db as _db + dead_job = BackgroundJob( + job_type="dead", + payload="{}", + status=JobStatus.DEAD.value, + attempt_count=3, + max_retries=3, + next_retry_at=datetime.utcnow(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + _db.session.add(dead_job) + _db.session.commit() + resp = auth_client.get("/api/jobs/dead-letter") + assert resp.status_code == 200 + data = resp.get_json() + assert data["count"] >= 1