Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions examples/celery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Celery HMR Example

This example demonstrates how to use HMR (Hot Module Reload) with Celery workers and tasks. With HMR, you can modify task implementations and see the changes take effect immediately without restarting the Celery worker.

## What This Example Shows

- **Hot-reloadable Celery tasks**: Modify task functions and see changes immediately
- **Worker lifecycle management**: Proper integration between HMR and Celery worker threads
- **Real-time task testing**: Send tasks while modifying code to see live updates
- **Error handling**: Demonstrate fixing broken tasks without worker restart

## Files Structure

- `app.py`: Celery app configuration with in-memory broker for demo
- `tasks.py`: Example tasks that can be hot-reloaded
- `worker.py`: Main entry point that starts Celery worker with HMR
- `sender.py`: Script to send test tasks to the worker
- `utils.py`: Logging and utility functions

## How to Run

1. **Install dependencies** (from this directory):
```sh
uv sync
```

2. **Start the worker with HMR** (in one terminal):
```sh
hmr worker.py
```

You should see output like:
```
Starting Celery HMR Worker
Worker started! You can now:
1. Run 'python sender.py' in another terminal to send tasks
2. Modify tasks.py to see HMR in action
3. Press Ctrl+C to stop
```

3. **Send test tasks** (in another terminal, from this directory):
```sh
python sender.py
```

## Demonstrating HMR

Once both the worker and sender are running, try these experiments:

### 1. Modify Task Logic
Edit `tasks.py` and change the `process_data` function. For example:
```python
# Change this line in process_data function:
'doubled': data * 2 if isinstance(data, (int, float)) else f"{data}_{data}",
# To:
'tripled': data * 3 if isinstance(data, (int, float)) else f"{data}_{data}_{data}",
```

Then run `python sender.py` again - you'll see the changes immediately!

### 2. Fix the Failing Task
The `failing_task` function is designed to fail. Fix it by uncommenting the return statement:
```python
@app.task
def failing_task():
"""A task that fails - can be fixed with HMR."""
logger.info("This task is about to fail...")
# Uncomment the next line to fix this task:
return "Task fixed with HMR!"
# raise Exception("This task always fails - fix me with HMR!")
```

### 3. Add New Tasks
Add a completely new task to `tasks.py`:
```python
@app.task
def new_task(message):
"""A new task added via HMR."""
logger.info(f"New task received: {message}")
return f"Processed: {message}"
```

Then modify `sender.py` to call your new task and run it again.

### 4. Update Utilities
Modify `utils.py` to change the logging format or level, and see the changes take effect immediately.

## How It Works

1. **HMR Integration**: The `worker.py` file uses a pattern similar to the Flask example, running the Celery worker in a separate thread that can be restarted when modules change.

2. **Task Registration**: Tasks are automatically discovered and registered with the Celery app. When HMR reloads the tasks module, the new task definitions become available.

3. **In-Memory Broker**: This example uses an in-memory message broker for simplicity. In production, you'd use Redis, RabbitMQ, or another persistent broker.

4. **Thread Safety**: The worker thread is properly managed to ensure clean shutdown and restart when code changes.

## Production Notes

- In production, you'd typically use Redis or RabbitMQ as the message broker
- Consider using `celery beat` for scheduled tasks
- Multiple worker processes can be used for better performance
- The HMR pattern here is primarily for development - in production, you'd use proper deployment strategies

## Troubleshooting

- **Worker not starting**: Make sure no other Celery workers are running on the same broker
- **Tasks not updating**: Ensure you're saving the files and HMR is detecting the changes
- **Import errors**: Check that all modules are properly importing each other

Enjoy experimenting with Celery and HMR! 🚀
24 changes: 24 additions & 0 deletions examples/celery/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
Celery app configuration for HMR example.
"""

from celery import Celery

# Create Celery instance
app = Celery("hmr_celery_example")

# Configure Celery
app.conf.update(
broker_url="memory://", # Use in-memory broker for demo
result_backend="cache+memory://", # Use in-memory result backend
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
# Important: Set this to False to allow task updates without restart
worker_hijack_root_logger=False,
)

# Auto-discover tasks
app.autodiscover_tasks(["tasks"])
11 changes: 11 additions & 0 deletions examples/celery/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[project]
name = "celery-example"
version = "0"
requires-python = ">=3.12"
dependencies = [
"celery~=5.4.0",
"hmr~=0.6.0",
]

[tool.uv]
package = false
56 changes: 56 additions & 0 deletions examples/celery/sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Task sender script for testing the Celery HMR example.

This script sends various tasks to the worker to demonstrate HMR functionality.
"""

from app import app
from utils import logger


def send_test_tasks():
"""Send a variety of test tasks to demonstrate HMR."""
logger.info("Sending test tasks to Celery worker...")

# Task 1: Simple addition
logger.info("\n1. Sending addition task...")
result = app.send_task("tasks.add_numbers", args=[10, 5])
logger.info(f"Task ID: {result.id}")
logger.info(f"Result: {result.get(timeout=10)}")

# Task 2: Data processing
logger.info("\n2. Sending data processing task...")
result = app.send_task("tasks.process_data", args=["hello world"])
logger.info(f"Task ID: {result.id}")
logger.info(f"Result: {result.get(timeout=10)}")

# Task 3: Numeric data processing
logger.info("\n3. Sending numeric data processing task...")
result = app.send_task("tasks.process_data", args=[42])
logger.info(f"Task ID: {result.id}")
logger.info(f"Result: {result.get(timeout=10)}")

# Task 4: Slow task (async)
logger.info("\n4. Sending slow task (will run in background)...")
result = app.send_task("tasks.slow_task", args=[3])
logger.info(f"Task ID: {result.id}")
logger.info("Slow task started - check worker logs for progress")

# Task 5: Failing task (to demonstrate error handling)
logger.info("\n5. Sending failing task...")
try:
result = app.send_task("tasks.failing_task")
logger.info(f"Task ID: {result.id}")
result.get(timeout=10)
except Exception as e:
logger.info(f"Task failed as expected: {e}")

logger.info("\n" + "=" * 50)
logger.info("All test tasks sent!")
logger.info("Try modifying tasks.py and running this script again")
logger.info("to see HMR in action.")
logger.info("=" * 50)


if __name__ == "__main__":
send_test_tasks()
50 changes: 50 additions & 0 deletions examples/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Celery tasks that can be hot-reloaded with HMR.
"""

from app import app
from utils import logger


@app.task
def add_numbers(x, y):
"""Simple addition task."""
result = x + y
logger.info(f"Adding {x} + {y} = {result}")
return result


@app.task
def process_data(data):
"""Process some data - this can be updated without restarting worker."""
logger.info(f"Processing data: {data}")

# This logic can be modified and will be hot-reloaded
processed = {"original": data, "length": len(str(data)), "doubled": data * 2 if isinstance(data, int | float) else f"{data}_{data}", "type": type(data).__name__}

logger.info(f"Processed result: {processed}")
return processed


@app.task
def slow_task(seconds=5):
"""A task that takes some time - useful for testing HMR during execution."""
import time

logger.info(f"Starting slow task that will take {seconds} seconds...")

for i in range(seconds):
time.sleep(1)
logger.info(f"Slow task progress: {i + 1}/{seconds}")

logger.info("Slow task completed!")
return f"Completed after {seconds} seconds"


@app.task
def failing_task():
"""A task that fails - can be fixed with HMR."""
logger.info("This task is about to fail...")
# Uncomment the next line to fix this task:
# return "Task fixed with HMR!"
raise Exception("This task always fails - fix me with HMR!")
13 changes: 13 additions & 0 deletions examples/celery/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
Utilities for the Celery HMR example.
"""

import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

logger = logging.getLogger("celery_hmr_example")

# This can be modified to change log level or format
logger.info("Utilities module loaded")
85 changes: 85 additions & 0 deletions examples/celery/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
Celery worker entry point with HMR integration.

This file demonstrates how to run a Celery worker that can hot-reload
task definitions and other modules without restarting the worker process.
"""

import sys
from atexit import register, unregister
from threading import Thread
from typing import cast

import tasks # noqa: F401 - Import needed to register tasks with Celery
from app import app
from utils import logger


class CeleryWorkerThread(Thread):
"""Thread to run Celery worker."""

def __init__(self):
super().__init__(daemon=True)
self.worker = None

def run(self):
"""Start the Celery worker."""
logger.info("Starting Celery worker...")

# Create worker instance
self.worker = app.Worker(
loglevel="INFO",
concurrency=1,
# Important: Don't hijack the root logger to play nice with HMR
hijack_root_logger=False,
)

# Start the worker
try:
self.worker.start()
except Exception as e:
logger.error(f"Worker error: {e}")

def stop(self):
"""Stop the Celery worker."""
if self.worker:
logger.info("Stopping Celery worker...")
self.worker.stop()


def start_worker():
"""Start or restart the Celery worker."""
global worker_thread

logger.info("=" * 50)
logger.info("Starting Celery HMR Worker")
logger.info("=" * 50)

# Stop existing worker if any
if worker_thread := cast("CeleryWorkerThread | None", globals().get("worker_thread")):
unregister(worker_thread.stop)
worker_thread.stop()

# Start new worker
worker_thread = CeleryWorkerThread()
worker_thread.start()
register(worker_thread.stop)

logger.info("Worker started! You can now:")
logger.info("1. Run 'python sender.py' in another terminal to send tasks")
logger.info("2. Modify tasks.py to see HMR in action")
logger.info("3. Press Ctrl+C to stop")


if __name__ == "__main__":
start_worker()

try:
# Keep the main thread alive
while True:
import time

time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
sys.exit(0)
Loading