FastAPI + Celery Web Content Processor

A comprehensive demonstration of building async web content processing pipelines using FastAPI + Celery, designed for deployment on Upsun as a multi-app architecture.

🚀 Features

  • FastAPI Web Application: High-performance async web server with automatic OpenAPI documentation
  • Celery Task Queue: Distributed task processing for AI/ML operations
  • Redis Message Broker: Fast and reliable message queuing
  • Flower Monitoring: Web-based monitoring dashboard for Celery workers
  • Web Content Processing: Web scraping and content summarization using OpenAI
  • Multi-App Architecture: Designed for Upsun platform deployment
  • Comprehensive Logging: Structured logging with JSON output
  • Error Handling: Robust error handling with custom exception types
  • Validation: Thorough input validation and file type checking

📁 Project Structure

fastapi-celery/
├── api/                    # FastAPI web application (Upsun app)
│   ├── main.py             # Main FastAPI app with endpoints
│   ├── pyproject.toml      # API service dependencies
│   ├── uv.lock             # UV lock file for API
│   ├── shared/             # Local copy of shared utilities
│   └── .venv/              # Virtual environment
├── worker/                 # Celery worker processes (Upsun app)
│   ├── tasks.py            # Celery task definitions
│   ├── web_scraper.py      # Web content scraping and processing
│   ├── pyproject.toml      # Worker service dependencies
│   ├── uv.lock             # UV lock file for worker
│   ├── shared/             # Local copy of shared utilities
│   └── .venv/              # Virtual environment
├── monitoring/             # Flower monitoring (Upsun app)
│   ├── tasks.py            # Celery config for monitoring
│   ├── flowerconfig.py     # Flower configuration
│   ├── start_flower.py     # Flower startup script
│   ├── pyproject.toml      # Monitoring service dependencies
│   ├── uv.lock             # UV lock file for monitoring
│   ├── shared/             # Local copy of shared utilities
│   └── .venv/              # Virtual environment
├── shared/                 # Original shared utilities (template)
│   ├── __init__.py         # Package exports
│   ├── config.py           # Configuration management
│   ├── logging_config.py   # Logging utilities
│   ├── utils.py            # Common utilities
│   ├── exceptions.py       # Custom exception classes
│   └── pyproject.toml      # Shared utilities dependencies
├── .upsun/
│   └── config.yaml         # Upsun multi-app deployment config
├── docker-compose.yml      # Local development with Redis
├── pyproject.toml          # Root project dependencies (dev only)
├── test_system.py          # System integration tests
├── main.py                 # Legacy main file
└── README.md               # This file

🛠️ Dependencies

The project uses the following key dependencies:

  • Web Framework: FastAPI, Uvicorn
  • Task Queue: Celery, Redis, Flower
  • AI/ML: OpenAI API for content summarization
  • Web Scraping: BeautifulSoup, requests, readability-lxml
  • Utilities: Pydantic, python-dotenv, structlog, tenacity
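
Together, these cover the core worker pipeline: fetch a page, extract the readable article text, and summarize it with OpenAI. The sketch below shows how those libraries typically combine; it is an illustration under assumptions (function name, model choice, prompt), not the worker's actual code.

# Hypothetical scrape-and-summarize flow; worker/web_scraper.py may differ.
import requests
from bs4 import BeautifulSoup
from openai import OpenAI
from readability import Document

def scrape_and_summarize(url: str) -> str:
    # Fetch the page using the configured User-Agent (see USER_AGENT below)
    html = requests.get(url, headers={"User-Agent": "FastAPI-Celery-WebScraper/1.0"}, timeout=30).text
    # readability-lxml isolates the main article markup
    article_html = Document(html).summary()
    # BeautifulSoup reduces that markup to plain text
    text = BeautifulSoup(article_html, "html.parser").get_text(separator="\n", strip=True)
    # OPENAI_API_KEY is read from the environment; the model is an assumption
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Summarize this article:\n\n{text[:8000]}"}],
    )
    return response.choices[0].message.content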

⚙️ Configuration

The application uses environment-based configuration with the following key variables:

Required Environment Variables

# Redis/Celery Configuration
REDIS_URL=redis://localhost:6379/0
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0

# API Configuration
API_HOST=0.0.0.0
API_PORT=8000

# Logging
LOG_LEVEL=INFO
LOG_FORMAT=json

# AI/ML (Required for summarization)
OPENAI_API_KEY=your_openai_api_key

Optional Environment Variables

# File Processing Limits
MAX_FILE_SIZE_MB=100

# Task Configuration
TASK_TIMEOUT_SECONDS=300
MAX_RETRIES=3

# Flower Monitoring
FLOWER_PORT=5555
FLOWER_BASIC_AUTH=admin:admin

# CORS Configuration
CORS_ORIGINS=*

# Web Scraping Configuration
STORAGE_PATH=/app/data
USER_AGENT=FastAPI-Celery-WebScraper/1.0
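
How these variables are read is up to shared/config.py; the sketch below shows one version-agnostic way to load them with python-dotenv and a plain settings object. It is an assumption about the pattern, not the repository's actual implementation (which may use Pydantic models instead).

# Hypothetical settings loader; shared/config.py may differ.
import os
from dataclasses import dataclass
from dotenv import load_dotenv

load_dotenv()  # pick up a local .env file during development

@dataclass(frozen=True)
class Settings:
    redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    celery_broker_url: str = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
    celery_result_backend: str = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
    api_host: str = os.getenv("API_HOST", "0.0.0.0")
    api_port: int = int(os.getenv("API_PORT", "8000"))
    log_level: str = os.getenv("LOG_LEVEL", "INFO")
    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")

settings = Settings()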

🚀 Quick Start

1. Install Dependencies

pip install -e .

2. Start Redis Server

Option A: Using Docker Compose (Recommended)

docker-compose up redis -d

Option B: Using standalone Docker

docker run -d --name redis -p 6379:6379 redis:7-alpine

Option C: Local Redis installation

redis-server

3. Start Celery Worker

cd worker
celery -A tasks worker --loglevel=info --concurrency=4
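
The -A tasks flag expects worker/tasks.py to expose a configured Celery application. A minimal sketch of that wiring, assuming the broker variables from the Configuration section (the task name and module contents are illustrative):

# Hypothetical Celery app setup; the repository's worker/tasks.py may differ.
import os
from celery import Celery

app = Celery(
    "worker",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"),
)
app.conf.result_expires = 3600  # keep results long enough for the API to fetch them (assumption)

@app.task(bind=True, name="tasks.summarize_web_page")
def summarize_web_page(self, url: str, summary_type: str = "standard"):
    # The real task delegates to web_scraper.py; this stub only shows the wiring.
    return {"url": url, "summary_type": summary_type}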

4. Start FastAPI Application

cd api
python main.py

# Or using uvicorn directly
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
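
Running python main.py implies the module launches Uvicorn itself; a minimal sketch of such an entry point using the API_HOST/API_PORT settings above (the actual api/main.py defines far more than this):

# Hypothetical entry point; api/main.py defines the real routes.
import os
import uvicorn
from fastapi import FastAPI

app = FastAPI(title="FastAPI + Celery Web Content Processor")

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host=os.getenv("API_HOST", "0.0.0.0"),
        port=int(os.getenv("API_PORT", "8000")),
        reload=True,  # convenient for local development
    )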

5. Start Flower Monitoring (Optional)

cd monitoring
python start_flower.py

# Or using celery flower command
celery -A tasks flower --port=5555
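
Flower can also pick up its settings from flowerconfig.py, where options are plain module-level variables named after its command-line flags. A minimal sketch matching the FLOWER_* variables above (the repository's file may differ):

# Hypothetical flowerconfig.py; values mirror the FLOWER_* environment variables.
import os

port = int(os.getenv("FLOWER_PORT", "5555"))
# One or more user:password pairs, e.g. "admin:admin"
basic_auth = os.getenv("FLOWER_BASIC_AUTH", "admin:admin").split(",")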

🐳 Docker Development Setup

The project includes a docker-compose.yml for easy local development:

# Start Redis only
docker-compose up redis -d

# Start all services (when Docker images are built)
docker-compose up -d

# View logs
docker-compose logs -f

# Stop services
docker-compose down

The Docker Compose setup includes:

  • Redis: Message broker on port 6379
  • API: FastAPI application on port 8000 (when built)
  • Worker: Celery worker processes (when built)
  • Flower: Monitoring dashboard on port 5555 (when built)

🧪 Testing the System

Automated Testing

Run the comprehensive test suite:

python test_system.py --verbose

This will test:

  • API health checks
  • Web content processing endpoints
  • Task status monitoring
  • Worker statistics

Manual Testing

1. Check API Health

curl http://localhost:8000/health

2. Scrape and Summarize a Web Page

curl -X POST "http://localhost:8000/tasks/web/summarize" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://example.com/article",
    "summary_type": "standard"
  }'

3. Get Task Result from Storage

curl http://localhost:8000/tasks/{task_id}/result

4. Check Task Status

curl http://localhost:8000/tasks/{task_id}/status

5. View Active Tasks

curl http://localhost:8000/tasks
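
The same submit-and-poll workflow can be scripted; the sketch below drives the endpoints above with requests. The JSON field names and status values are assumptions about the API's response shape.

# Hypothetical end-to-end check; adjust field names to the API's actual responses.
import time
import requests

BASE = "http://localhost:8000"

submit = requests.post(
    f"{BASE}/tasks/web/summarize",
    json={"url": "https://example.com/article", "summary_type": "standard"},
    timeout=30,
)
task_id = submit.json()["task_id"]

# Poll the status endpoint until the worker reports a terminal state
while True:
    status = requests.get(f"{BASE}/tasks/{task_id}/status", timeout=10).json()
    if status.get("status") in ("SUCCESS", "FAILURE"):
        break
    time.sleep(2)

print(requests.get(f"{BASE}/tasks/{task_id}/result", timeout=10).json())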

📊 Monitoring

FastAPI Documentation

Interactive API documentation is generated automatically by FastAPI and served at http://localhost:8000/docs (Swagger UI) and http://localhost:8000/redoc.

Flower Dashboard

The Flower dashboard is available at http://localhost:5555; credentials come from FLOWER_BASIC_AUTH (admin:admin by default).

API Endpoints

  • GET / - API information
  • GET /health - Health check with dependencies
  • POST /tasks/web/summarize - Scrape and summarize web pages
  • GET /tasks/{task_id}/result - Get detailed results from storage
  • GET /tasks/{task_id}/status - Get task status
  • GET /tasks - List active tasks
  • GET /tasks/stats - Worker statistics
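
The body accepted by POST /tasks/web/summarize matches the curl example in the testing section; below is a sketch of a request model FastAPI could validate it with (the model and handler names in api/main.py may differ):

# Hypothetical request model and endpoint; api/main.py may differ.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class SummarizeRequest(BaseModel):
    url: str                        # page to scrape
    summary_type: str = "standard"  # matches the curl example above

@app.post("/tasks/web/summarize")
async def summarize_web_page(request: SummarizeRequest):
    # FastAPI validates the JSON body against SummarizeRequest before this
    # handler runs; in the real app the request is then queued on Celery.
    return {"url": request.url, "summary_type": request.summary_type}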

☁️ Upsun Deployment

This project is designed for deployment on the Upsun platform using a multi-app architecture. Each service runs in its own container with isolated dependencies.

Multi-App Architecture

The project uses Upsun's multi-app deployment with three separate applications:

  1. API Application (api/)

    • Source root: api/
    • Python 3.13 runtime
    • UV package manager for fast dependency resolution
    • Isolated dependencies in pyproject.toml and uv.lock
    • FastAPI-specific dependencies only
  2. Worker Application (worker/)

    • Source root: worker/
    • Python 3.13 runtime
    • UV package manager for fast dependency resolution
    • Web scraping dependencies (BeautifulSoup, requests, readability)
    • Celery worker with web content processing capabilities
  3. Flower Application (monitoring/)

    • Source root: monitoring/
    • Python 3.13 runtime
    • UV package manager for fast dependency resolution
    • Minimal dependencies for monitoring dashboard
    • Flower monitoring with Celery inspection

UV Package Manager

Each application uses UV for fast dependency installation:

dependencies:
  python3:
    uv: "*"
hooks:
  build: |
    # Use uv for fast dependency installation
    uv sync --frozen

Benefits of UV:

  • Fast Installation: 10-100x faster than pip
  • Reproducible Builds: Locked dependencies with uv.lock
  • Isolated Environments: Each app has its own dependencies
  • Better Caching: Efficient dependency resolution

Deployment Steps

  1. Connect Repository: Link your Git repository to Upsun
  2. Configure Environment Variables: Set required environment variables in Upsun dashboard
  3. Deploy: Push to main branch to trigger deployment

Each application will:

  • Create its own container
  • Install only required dependencies using UV
  • Use shared utilities from local copies
  • Connect to the shared Redis service

Upsun Services

  • API: FastAPI web application (https://api.{default}/)
  • Worker: Celery background processors (internal service)
  • Flower: Monitoring dashboard (https://monitor.{default}/)
  • Redis: Message broker service (internal service)

Access URLs

  • API: https://api.{default}/
  • API Documentation: https://api.{default}/docs
  • Monitoring: https://monitor.{default}/
  • Main: https://www.{default}/ (redirects to API)

Environment Variables

Each application uses environment-based configuration:

# Required for all services
CELERY_BROKER_URL=redis://redis.internal:6379/0
CELERY_RESULT_BACKEND=redis://redis.internal:6379/0

# Required AI/ML API keys (for worker)
OPENAI_API_KEY=your_openai_key

# Web Scraping Configuration
STORAGE_PATH=/app/data
USER_AGENT=FastAPI-Celery-WebScraper/1.0

🔧 Development

Adding New Tasks

  1. Define Task: Add new task function in worker/tasks.py
  2. Add Processing Logic: Create processing module in worker/
  3. Add API Endpoint: Add new endpoint in api/main.py
  4. Update Tests: Add tests in test_system.py

Example New Task

# worker/tasks.py
@app.task(bind=True, name='tasks.new_ai_task')
def new_ai_task(self, input_data: dict, task_id: str):
    # Processing logic goes here; return a JSON-serializable result
    # so it lands in the Celery result backend.
    return {"task_id": task_id, "status": "completed"}

# api/main.py
@app.post("/tasks/new-ai/process")
async def process_new_ai_task(request: NewAIRequest):
    # Dispatch the Celery task by name and return the task id so clients can
    # poll GET /tasks/{task_id}/status. `celery_app` and the payload shape
    # here are illustrative, not the repository's exact code.
    task = celery_app.send_task("tasks.new_ai_task", kwargs={
        "input_data": request.model_dump(),  # use request.dict() on Pydantic v1
        "task_id": "generate-a-unique-id-here",
    })
    return {"task_id": task.id, "status": "queued"}

Configuration Changes

Modify settings in shared/config.py and update environment variables accordingly.

Logging

The application uses structured logging with JSON output. Logs include:

  • Request/response logging
  • Task execution tracking
  • Error handling with context
  • Performance metrics
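
structlog (listed under Utilities) is the likely source of that JSON output; below is a minimal configuration sketch, not necessarily what shared/logging_config.py actually does:

# Hypothetical structlog setup producing one JSON object per log line.
import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,           # adds a "level" field
        structlog.processors.TimeStamper(fmt="iso"),  # adds an ISO-8601 "timestamp"
        structlog.processors.JSONRenderer(),          # renders the event as JSON
    ]
)

log = structlog.get_logger()
log.info("task_submitted", task_id="abc123", url="https://example.com/article")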

🐛 Troubleshooting

Common Issues

  1. Redis Connection Failed

    • Ensure Redis is running: docker-compose up redis -d
    • Check REDIS_URL environment variable
    • Verify port 6379 is not in use by other services
  2. Tasks Not Processing

    • Verify Celery worker is running
    • Check worker logs for errors
    • Ensure tasks are properly registered
  3. File Upload Errors

    • Check file size limits (MAX_FILE_SIZE_MB)
    • Verify file types are supported
    • Check disk space
  4. High Memory Usage

    • Reduce worker concurrency
    • Implement result expiration
    • Monitor task memory usage
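
For issue 2 in particular, Celery's inspection API shows whether workers are alive and which tasks they have registered. A small check, assuming the Celery application in worker/tasks.py is named app:

# Run from a Python shell inside worker/ to inspect live workers.
from tasks import app

inspector = app.control.inspect()
print(inspector.ping())        # e.g. {'celery@host': {'ok': 'pong'}} when workers respond
print(inspector.registered())  # task names each worker has registered
print(inspector.active())      # tasks currently executing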

Debug Mode

Enable debug logging:

export LOG_LEVEL=DEBUG
export DEBUG=true

Health Checks

Monitor service health:

# API Health
curl http://localhost:8000/health

# Worker Stats  
curl http://localhost:8000/tasks/stats
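
A health endpoint that reports dependencies typically pings the broker as part of the check; below is a sketch of that pattern (the repository's /health handler may differ):

# Hypothetical /health handler that also checks the Redis broker.
import os
import redis
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health():
    status = {"api": "ok"}
    try:
        redis.Redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0")).ping()
        status["redis"] = "ok"
    except redis.exceptions.ConnectionError:
        status["redis"] = "unavailable"
    return status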

📝 License

This project is a demonstration/example for Upsun deployment and is provided as-is for educational purposes.

🀝 Contributing

This is a demonstration project. For production use, consider:

  • Adding authentication/authorization
  • Implementing real AI models
  • Adding database persistence
  • Implementing rate limiting
  • Adding comprehensive monitoring
  • Setting up CI/CD pipelines
