ParzivalXIII/arq-task-planner

Distributed Task Orchestrator

This repository contains a FastAPI-based backend for a distributed task orchestration system. It includes:

  • Async HTTP API for submitting and monitoring tasks
  • PostgreSQL persistence via SQLModel
  • Redis-based event queue with ARQ workers
  • Exponential backoff retry logic and dead-letter handling
  • Structured logging and health checks
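The retry and dead-letter behaviour listed above can be sketched in a few lines. This is only an illustration: the base delay, the cap, the optional jitter, and the names `backoff_delay` and `should_dead_letter` are assumptions, not values or functions taken from this repository.

```python
import random


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 300.0,
                  jitter: bool = False) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    The delay doubles on each attempt and is clamped to `cap`.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = min(cap, base * 2 ** (attempt - 1))
    if jitter:
        # Optional full jitter: pick a random delay up to the computed value
        # so that many failing tasks do not retry at the same instant.
        delay = random.uniform(0, delay)
    return delay


def should_dead_letter(attempt: int, max_retries: int) -> bool:
    """True once the task has used up its retry budget."""
    return attempt > max_retries
```

With these assumed defaults, attempts 1, 2, and 3 wait 2, 4, and 8 seconds, and a task submitted with `max_retries` set to 5 would be dead-lettered when a sixth retry is requested.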

Documentation

  • API Reference - HTTP endpoint specifications and examples
  • Worker Runtime - Task processing, retry policies, and handler development
  • PRD - Project requirements and architecture

Quick Start

Prerequisites

  • Python 3.12+
  • PostgreSQL 15+ or SQLite
  • Redis 7+
  • Docker & Docker Compose (optional)

Local Development

# 1. Install dependencies
uv sync

# 2. Set up environment
cp .env.example .env

# 3. Run migrations (if using PostgreSQL)
uv run alembic upgrade head

# 4. Terminal 1: Start API server
uv run python main.py

# 5. Terminal 2: Start worker
uv run python -m src.workers.runner

Docker Compose

# Start all services (API, Worker, DB, Redis)
docker-compose up

# Stop all services
docker-compose down

Usage

Submit a Task

curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "email_notification",
    "payload": {"email": "user@example.com", "subject": "Welcome"},
    "priority": 1,
    "max_retries": 5
  }'
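The same request can be sent from Python with only the standard library. This is a sketch under assumptions: the helper names `build_submit_request` and `submit_task` are hypothetical, and the shape of the JSON response is not specified here, so it is returned as-is.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"


def build_submit_request(task_type: str, payload: dict, priority: int = 1,
                         max_retries: int = 5,
                         base_url: str = BASE_URL) -> urllib.request.Request:
    """Build the POST /tasks request without sending it."""
    body = json.dumps({
        "task_type": task_type,
        "payload": payload,
        "priority": priority,
        "max_retries": max_retries,
    }).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/tasks",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_task(task_type: str, payload: dict, **kwargs) -> dict:
    """Send the request and return the decoded JSON response."""
    request = build_submit_request(task_type, payload, **kwargs)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

Splitting request construction from sending keeps the payload easy to inspect in a unit test without a running API server.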

Monitor Task Status

# Replace {task_id} with the ID of the task you want to inspect
curl http://localhost:8000/tasks/{task_id}
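A client that needs the final result can poll this endpoint until the task settles. The sketch below assumes the response JSON carries a `status` field that uses the names from the Task Status Flow section; `fetch_task` and `wait_for_task` are hypothetical helper names.

```python
import json
import time
import urllib.request
from typing import Callable

# Assumption: these are the states a task never leaves on its own.
TERMINAL_STATUSES = {"COMPLETED", "DEAD_LETTER"}


def fetch_task(task_id: str, base_url: str = "http://localhost:8000") -> dict:
    """GET /tasks/{task_id} and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/tasks/{task_id}", timeout=10) as resp:
        return json.load(resp)


def wait_for_task(fetch: Callable[[], dict], interval: float = 1.0,
                  timeout: float = 60.0,
                  sleep: Callable[[float], None] = time.sleep) -> dict:
    """Call `fetch` repeatedly until the task reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while True:
        task = fetch()
        if task.get("status") in TERMINAL_STATUSES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not reach a terminal status in time")
        sleep(interval)
```

Typical use would be `wait_for_task(lambda: fetch_task(task_id))`. Passing the fetch step in as a callable lets the loop be tested with canned responses.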

List Tasks

curl "http://localhost:8000/tasks?status=PENDING"

Manual Retry

# Replace {task_id} with the ID of the task to retry
curl -X POST http://localhost:8000/tasks/{task_id}/retry

Health Check

curl http://localhost:8000/health

Project Structure

src/
  api/              # FastAPI routes and schemas
  services/         # Business logic (task service)
  workers/          # ARQ worker runtime
  models/           # SQLModel database models
  db/               # Database session management
  observability/    # Logging and metrics

tests/
  unit/             # Unit tests
  integration/      # Integration tests

docs/               # Documentation
  API.md            # API specification
  WORKER.md         # Worker documentation
  PRD.md            # Project requirements

migrations/         # Alembic database migrations

Testing

# Run all tests
uv run pytest tests/ -v

# Run with coverage
uv run pytest tests/ --cov=src --cov-report=html

# Run specific test category
uv run pytest tests/unit/ -v
uv run pytest tests/integration/ -v

Task Status Flow

PENDING → QUEUED → PROCESSING → COMPLETED
                 ↓
                FAILED/RETRYING* → QUEUED → ...
                            ↓
                       DEAD_LETTER*

* Failed tasks are retried automatically with exponential backoff; once retries are exhausted, the task moves to DEAD_LETTER.
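One way to read the diagram is as a table of allowed transitions. The sketch below encodes that reading; the `TaskStatus` enum, the `ALLOWED_TRANSITIONS` table, and `can_transition` are illustrative names, and the exact edges (for example, whether a manual retry can move a DEAD_LETTER task back to QUEUED) are assumptions rather than the repository's actual rules.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    RETRYING = "RETRYING"
    DEAD_LETTER = "DEAD_LETTER"


S = TaskStatus

# Assumed reading of the diagram: each key maps to the states it may move to.
ALLOWED_TRANSITIONS = {
    S.PENDING: {S.QUEUED},
    S.QUEUED: {S.PROCESSING},
    S.PROCESSING: {S.COMPLETED, S.FAILED, S.RETRYING},
    S.FAILED: {S.QUEUED, S.DEAD_LETTER},
    S.RETRYING: {S.QUEUED, S.DEAD_LETTER},
    S.COMPLETED: set(),      # terminal
    S.DEAD_LETTER: set(),    # terminal in this sketch
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True if moving from `current` to `new` follows the table."""
    return new in ALLOWED_TRANSITIONS[current]
```

Checking every status change against a table like this keeps invalid jumps, such as PENDING straight to COMPLETED, from being persisted.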

Configuration

Environment variables (see .env.example):

  • DATABASE_URL - PostgreSQL or SQLite connection URL
  • REDIS_URL - Redis connection URL
  • LOG_LEVEL - Logging verbosity
  • JOB_TIMEOUT - Max task execution time (seconds)
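These variables could be read into a typed settings object along the following lines. The `Settings` class, the `load_settings` function, and every default value shown are assumptions for illustration; the authoritative values are in .env.example.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    log_level: str
    job_timeout: int  # seconds


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping, defaulting to the process environment."""
    env = os.environ if env is None else env
    return Settings(
        # All fallback values below are placeholders, not project defaults.
        database_url=env.get("DATABASE_URL", "sqlite:///./tasks.db"),
        redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
        job_timeout=int(env.get("JOB_TIMEOUT", "300")),
    )
```

Parsing `JOB_TIMEOUT` with `int()` at startup means a malformed value fails immediately instead of surfacing later inside a worker.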
