42 changes: 42 additions & 0 deletions packages/backend/app/models.py
@@ -133,3 +133,45 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


from enum import Enum  # needed for JobStatus below; belongs with the module's other top-level imports if not already there

class JobStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
RETRYING = "RETRYING"
DEAD = "DEAD"
CANCELLED = "CANCELLED"


class BackgroundJob(db.Model):
"""Persistent background job with retry tracking."""
__tablename__ = "background_jobs"

id = db.Column(db.Integer, primary_key=True)
job_type = db.Column(db.String(100), nullable=False, index=True)
payload = db.Column(db.Text, nullable=True)
status = db.Column(db.String(20), default=JobStatus.PENDING.value, nullable=False, index=True)
attempt_count = db.Column(db.Integer, default=0, nullable=False)
max_retries = db.Column(db.Integer, default=3, nullable=False)
error_log = db.Column(db.Text, nullable=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
next_retry_at = db.Column(db.DateTime, nullable=True, index=True)
completed_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

def to_dict(self):
return {
"id": self.id,
"job_type": self.job_type,
"status": self.status,
"attempt_count": self.attempt_count,
"max_retries": self.max_retries,
"error_log": self.error_log,
"user_id": self.user_id,
"next_retry_at": self.next_retry_at.isoformat() if self.next_retry_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
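
The new background_jobs table will also need a schema migration, which this diff does not include. Below is a minimal sketch of what the Alembic upgrade might look like, assuming the project uses Flask-Migrate/Alembic; the columns simply mirror the model above and the revision identifiers are placeholders.

# Hypothetical Alembic migration for background_jobs (revision ids are placeholders).
from alembic import op
import sqlalchemy as sa

revision = "xxxx_add_background_jobs"
down_revision = None  # placeholder; the real parent revision depends on the project's history


def upgrade():
    op.create_table(
        "background_jobs",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("job_type", sa.String(length=100), nullable=False),
        sa.Column("payload", sa.Text(), nullable=True),
        sa.Column("status", sa.String(length=20), nullable=False),
        sa.Column("attempt_count", sa.Integer(), nullable=False),
        sa.Column("max_retries", sa.Integer(), nullable=False),
        sa.Column("error_log", sa.Text(), nullable=True),
        sa.Column("user_id", sa.Integer(), nullable=True),
        sa.Column("next_retry_at", sa.DateTime(), nullable=True),
        sa.Column("completed_at", sa.DateTime(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["user_id"], ["users.id"]),
        sa.PrimaryKeyConstraint("id"),
    )
    # Indexes matching index=True on the model columns
    op.create_index("ix_background_jobs_job_type", "background_jobs", ["job_type"])
    op.create_index("ix_background_jobs_status", "background_jobs", ["status"])
    op.create_index("ix_background_jobs_next_retry_at", "background_jobs", ["next_retry_at"])


def downgrade():
    op.drop_index("ix_background_jobs_next_retry_at", table_name="background_jobs")
    op.drop_index("ix_background_jobs_status", table_name="background_jobs")
    op.drop_index("ix_background_jobs_job_type", table_name="background_jobs")
    op.drop_table("background_jobs")
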
106 changes: 106 additions & 0 deletions packages/backend/app/routes/jobs.py
@@ -0,0 +1,106 @@
"""
REST API endpoints for background job monitoring and management.
GET /api/jobs - list jobs with filters
GET /api/jobs/stats - aggregate status counts
GET /api/jobs/dead-letter - list dead-letter jobs
GET /api/jobs/<id> - single job detail
POST /api/jobs/<id>/requeue - requeue a dead job
POST /api/jobs/<id>/cancel - cancel pending/retrying job
"""
from flask import Blueprint, jsonify, request
from flask_jwt_extended import get_jwt_identity, jwt_required

from ..models import BackgroundJob, JobStatus
from ..services.job_service import (
cancel_job,
get_dead_letter_jobs,
get_job_stats,
requeue_dead_job,
)

jobs_bp = Blueprint("jobs", __name__, url_prefix="/api/jobs")


@jobs_bp.get("")
@jwt_required()
def list_jobs():
"""List jobs for the current user with optional status filter."""
user_id = int(get_jwt_identity())
status = request.args.get("status")
limit = min(int(request.args.get("limit", 50)), 200)
offset = int(request.args.get("offset", 0))

query = BackgroundJob.query.filter_by(user_id=user_id)
if status:
try:
JobStatus(status)
except ValueError:
return jsonify({"error": f"Invalid status. Valid values: {[s.value for s in JobStatus]}"}), 400
query = query.filter_by(status=status)

total = query.count()
jobs = query.order_by(BackgroundJob.created_at.desc()).offset(offset).limit(limit).all()
return jsonify({
"total": total,
"items": [j.to_dict() for j in jobs],
"offset": offset,
"limit": limit,
})


@jobs_bp.get("/stats")
@jwt_required()
def job_stats():
"""Aggregate job counts per status for the current user."""
user_id = int(get_jwt_identity())
stats = get_job_stats(user_id=user_id)
return jsonify(stats)


@jobs_bp.get("/dead-letter")
@jwt_required()
def dead_letter_queue():
"""Return dead-letter jobs for the current user."""
user_id = int(get_jwt_identity())
limit = min(int(request.args.get("limit", 100)), 500)
jobs = get_dead_letter_jobs(user_id=user_id, limit=limit)
return jsonify({"items": [j.to_dict() for j in jobs], "count": len(jobs)})


@jobs_bp.get("/<int:job_id>")
@jwt_required()
def get_job(job_id: int):
"""Retrieve a single job by ID (must belong to current user)."""
user_id = int(get_jwt_identity())
job = BackgroundJob.query.filter_by(id=job_id, user_id=user_id).first()
if not job:
return jsonify({"error": "Job not found"}), 404
return jsonify(job.to_dict())


@jobs_bp.post("/<int:job_id>/requeue")
@jwt_required()
def requeue_job(job_id: int):
"""Requeue a dead-letter job for retry."""
user_id = int(get_jwt_identity())
job = BackgroundJob.query.filter_by(id=job_id, user_id=user_id).first()
if not job:
return jsonify({"error": "Job not found"}), 404
result = requeue_dead_job(job_id)
if not result:
return jsonify({"error": "Only DEAD jobs can be requeued"}), 400
return jsonify(result.to_dict())


@jobs_bp.post("/<int:job_id>/cancel")
@jwt_required()
def cancel(job_id: int):
"""Cancel a pending or retrying job."""
user_id = int(get_jwt_identity())
job = BackgroundJob.query.filter_by(id=job_id, user_id=user_id).first()
if not job:
return jsonify({"error": "Job not found"}), 404
success = cancel_job(job_id)
if not success:
return jsonify({"error": "Job cannot be cancelled in its current state"}), 400
return jsonify(job.to_dict())
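
These endpoints only become reachable once the blueprint is registered on the app. A minimal sketch of that registration, assuming a standard create_app() factory in app/__init__.py (the factory itself is not part of this diff, so its exact shape is an assumption):

# Sketch of registering jobs_bp in a hypothetical application factory.
from flask import Flask


def create_app() -> Flask:
    app = Flask(__name__)
    # ... db.init_app(app), JWT setup, and the other blueprints go here ...
    from app.routes.jobs import jobs_bp
    app.register_blueprint(jobs_bp)  # url_prefix="/api/jobs" is already set on the blueprint
    return app

With that in place, requests such as GET /api/jobs?status=DEAD&limit=20 hit the routes above behind JWT auth.
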
127 changes: 127 additions & 0 deletions packages/backend/app/services/job_service.py
@@ -0,0 +1,127 @@
"""
Background job retry service with exponential backoff, dead-letter queue, and monitoring.
Implements resilient async job execution for FinMind.
"""
import json
import threading
import traceback
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional

from ..extensions import db
from ..models import BackgroundJob, JobStatus


DEFAULT_MAX_RETRIES = 3
DEFAULT_BASE_DELAY_SECONDS = 5
DEFAULT_MAX_DELAY_SECONDS = 300
JOB_LOCK = threading.Lock()


def _calculate_backoff(attempt: int, base_delay: int = DEFAULT_BASE_DELAY_SECONDS,
max_delay: int = DEFAULT_MAX_DELAY_SECONDS) -> int:
"""Exponential backoff: min(base * 2^attempt, max)."""
return int(min(base_delay * (2 ** attempt), max_delay))


def enqueue_job(job_type: str, payload: Dict[str, Any],
max_retries: int = DEFAULT_MAX_RETRIES,
user_id: Optional[int] = None) -> BackgroundJob:
"""Create and persist a new background job."""
job = BackgroundJob(
job_type=job_type,
payload=json.dumps(payload),
status=JobStatus.PENDING.value,
max_retries=max_retries,
attempt_count=0,
user_id=user_id,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
next_retry_at=datetime.utcnow(),
)
db.session.add(job)
db.session.commit()
return job


def execute_job(job: BackgroundJob, handler: Callable[[Dict], Any]) -> bool:
"""Execute a job with retry logic. Returns True on success."""
with JOB_LOCK:
job.status = JobStatus.RUNNING.value
job.attempt_count += 1
job.updated_at = datetime.utcnow()
db.session.commit()

try:
payload = json.loads(job.payload or "{}")
handler(payload)
with JOB_LOCK:
job.status = JobStatus.COMPLETED.value
job.completed_at = datetime.utcnow()
job.updated_at = datetime.utcnow()
db.session.commit()
return True
except Exception:
error_msg = f"Attempt {job.attempt_count}: {traceback.format_exc()}"
with JOB_LOCK:
existing = job.error_log or ""
job.error_log = f"{existing}\n---\n{error_msg}".strip()
if job.attempt_count >= job.max_retries:
job.status = JobStatus.DEAD.value
else:
delay = _calculate_backoff(job.attempt_count)
job.status = JobStatus.RETRYING.value
job.next_retry_at = datetime.utcnow() + timedelta(seconds=delay)
job.updated_at = datetime.utcnow()
db.session.commit()
return False


def get_pending_jobs(limit: int = 50):
"""Return jobs ready for execution."""
now = datetime.utcnow()
return BackgroundJob.query.filter(
BackgroundJob.status.in_([JobStatus.PENDING.value, JobStatus.RETRYING.value]),
BackgroundJob.next_retry_at <= now,
).order_by(BackgroundJob.next_retry_at).limit(limit).all()


def get_dead_letter_jobs(user_id: Optional[int] = None, limit: int = 100):
"""Return jobs in the dead-letter queue."""
query = BackgroundJob.query.filter_by(status=JobStatus.DEAD.value)
if user_id is not None:
query = query.filter_by(user_id=user_id)
return query.order_by(BackgroundJob.updated_at.desc()).limit(limit).all()


def requeue_dead_job(job_id: int) -> Optional[BackgroundJob]:
"""Manually requeue a dead-letter job."""
job = BackgroundJob.query.get(job_id)
if job and job.status == JobStatus.DEAD.value:
job.status = JobStatus.PENDING.value
job.attempt_count = 0
job.error_log = None
job.next_retry_at = datetime.utcnow()
job.updated_at = datetime.utcnow()
db.session.commit()
return job
return None


def get_job_stats(user_id: Optional[int] = None) -> Dict[str, int]:
"""Return aggregate stats: counts per status."""
query = BackgroundJob.query
if user_id is not None:
query = query.filter_by(user_id=user_id)
return {status.value: query.filter_by(status=status.value).count() for status in JobStatus}


def cancel_job(job_id: int) -> bool:
"""Cancel a pending or retrying job."""
job = BackgroundJob.query.get(job_id)
if job and job.status in (JobStatus.PENDING.value, JobStatus.RETRYING.value):
job.status = JobStatus.CANCELLED.value
job.updated_at = datetime.utcnow()
db.session.commit()
return True
return False
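
The service exposes enqueue/execute primitives but no poller is included in this diff. Here is a minimal worker-loop sketch; create_app, the app.services.job_service import path, and the send_report_email job type are illustrative assumptions, not part of this change.

# Hypothetical worker loop that drains the queue; names and import paths are assumptions.
import time
from typing import Any, Callable, Dict

from app import create_app  # assumed application factory
from app.services.job_service import enqueue_job, execute_job, get_pending_jobs

# Registry mapping job_type -> handler(payload); the entry below is illustrative.
HANDLERS: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "send_report_email": lambda payload: print("sending report", payload),
}


def run_worker(poll_interval_seconds: int = 5) -> None:
    app = create_app()
    with app.app_context():
        # Example producer call; normally enqueue_job is invoked from request handlers.
        enqueue_job("send_report_email", {"report_id": 1}, user_id=1)
        while True:
            for job in get_pending_jobs(limit=20):
                handler = HANDLERS.get(job.job_type)
                if handler is None:
                    continue  # leave unknown job types untouched until a handler exists
                # execute_job records COMPLETED, RETRYING (with backoff), or DEAD itself
                execute_job(job, handler)
            time.sleep(poll_interval_seconds)


if __name__ == "__main__":
    run_worker()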