diff --git a/examples/celery/README.md b/examples/celery/README.md new file mode 100644 index 0000000..3e60df8 --- /dev/null +++ b/examples/celery/README.md @@ -0,0 +1,111 @@ +# Celery HMR Example + +This example demonstrates how to use HMR (Hot Module Reload) with Celery workers and tasks. With HMR, you can modify task implementations and see the changes take effect immediately without restarting the Celery worker. + +## What This Example Shows + +- **Hot-reloadable Celery tasks**: Modify task functions and see changes immediately +- **Worker lifecycle management**: Proper integration between HMR and Celery worker threads +- **Real-time task testing**: Send tasks while modifying code to see live updates +- **Error handling**: Demonstrate fixing broken tasks without worker restart + +## Files Structure + +- `app.py`: Celery app configuration with in-memory broker for demo +- `tasks.py`: Example tasks that can be hot-reloaded +- `worker.py`: Main entry point that starts Celery worker with HMR +- `sender.py`: Script to send test tasks to the worker +- `utils.py`: Logging and utility functions + +## How to Run + +1. **Install dependencies** (from this directory): + ```sh + uv sync + ``` + +2. **Start the worker with HMR** (in one terminal): + ```sh + hmr worker.py + ``` + + You should see output like: + ``` + Starting Celery HMR Worker + Worker started! You can now: + 1. Run 'python sender.py' in another terminal to send tasks + 2. Modify tasks.py to see HMR in action + 3. Press Ctrl+C to stop + ``` + +3. **Send test tasks** (in another terminal, from this directory): + ```sh + python sender.py + ``` + +## Demonstrating HMR + +Once both the worker and sender are running, try these experiments: + +### 1. Modify Task Logic +Edit `tasks.py` and change the `process_data` function. For example: +```python +# Change this line in process_data function: +'doubled': data * 2 if isinstance(data, (int, float)) else f"{data}_{data}", +# To: +'tripled': data * 3 if isinstance(data, (int, float)) else f"{data}_{data}_{data}", +``` + +Then run `python sender.py` again - you'll see the changes immediately! + +### 2. Fix the Failing Task +The `failing_task` function is designed to fail. Fix it by uncommenting the return statement: +```python +@app.task +def failing_task(): + """A task that fails - can be fixed with HMR.""" + logger.info("This task is about to fail...") + # Uncomment the next line to fix this task: + return "Task fixed with HMR!" + # raise Exception("This task always fails - fix me with HMR!") +``` + +### 3. Add New Tasks +Add a completely new task to `tasks.py`: +```python +@app.task +def new_task(message): + """A new task added via HMR.""" + logger.info(f"New task received: {message}") + return f"Processed: {message}" +``` + +Then modify `sender.py` to call your new task and run it again. + +### 4. Update Utilities +Modify `utils.py` to change the logging format or level, and see the changes take effect immediately. + +## How It Works + +1. **HMR Integration**: The `worker.py` file uses a pattern similar to the Flask example, running the Celery worker in a separate thread that can be restarted when modules change. + +2. **Task Registration**: Tasks are automatically discovered and registered with the Celery app. When HMR reloads the tasks module, the new task definitions become available. + +3. **In-Memory Broker**: This example uses an in-memory message broker for simplicity. In production, you'd use Redis, RabbitMQ, or another persistent broker. + +4. **Thread Safety**: The worker thread is properly managed to ensure clean shutdown and restart when code changes. + +## Production Notes + +- In production, you'd typically use Redis or RabbitMQ as the message broker +- Consider using `celery beat` for scheduled tasks +- Multiple worker processes can be used for better performance +- The HMR pattern here is primarily for development - in production, you'd use proper deployment strategies + +## Troubleshooting + +- **Worker not starting**: Make sure no other Celery workers are running on the same broker +- **Tasks not updating**: Ensure you're saving the files and HMR is detecting the changes +- **Import errors**: Check that all modules are properly importing each other + +Enjoy experimenting with Celery and HMR! 🚀 \ No newline at end of file diff --git a/examples/celery/app.py b/examples/celery/app.py new file mode 100644 index 0000000..b6f0017 --- /dev/null +++ b/examples/celery/app.py @@ -0,0 +1,24 @@ +""" +Celery app configuration for HMR example. +""" + +from celery import Celery + +# Create Celery instance +app = Celery("hmr_celery_example") + +# Configure Celery +app.conf.update( + broker_url="memory://", # Use in-memory broker for demo + result_backend="cache+memory://", # Use in-memory result backend + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="UTC", + enable_utc=True, + # Important: Set this to False to allow task updates without restart + worker_hijack_root_logger=False, +) + +# Auto-discover tasks +app.autodiscover_tasks(["tasks"]) diff --git a/examples/celery/pyproject.toml b/examples/celery/pyproject.toml new file mode 100644 index 0000000..2a39789 --- /dev/null +++ b/examples/celery/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "celery-example" +version = "0" +requires-python = ">=3.12" +dependencies = [ + "celery~=5.4.0", + "hmr~=0.6.0", +] + +[tool.uv] +package = false \ No newline at end of file diff --git a/examples/celery/sender.py b/examples/celery/sender.py new file mode 100644 index 0000000..4b7feb0 --- /dev/null +++ b/examples/celery/sender.py @@ -0,0 +1,56 @@ +""" +Task sender script for testing the Celery HMR example. + +This script sends various tasks to the worker to demonstrate HMR functionality. +""" + +from app import app +from utils import logger + + +def send_test_tasks(): + """Send a variety of test tasks to demonstrate HMR.""" + logger.info("Sending test tasks to Celery worker...") + + # Task 1: Simple addition + logger.info("\n1. Sending addition task...") + result = app.send_task("tasks.add_numbers", args=[10, 5]) + logger.info(f"Task ID: {result.id}") + logger.info(f"Result: {result.get(timeout=10)}") + + # Task 2: Data processing + logger.info("\n2. Sending data processing task...") + result = app.send_task("tasks.process_data", args=["hello world"]) + logger.info(f"Task ID: {result.id}") + logger.info(f"Result: {result.get(timeout=10)}") + + # Task 3: Numeric data processing + logger.info("\n3. Sending numeric data processing task...") + result = app.send_task("tasks.process_data", args=[42]) + logger.info(f"Task ID: {result.id}") + logger.info(f"Result: {result.get(timeout=10)}") + + # Task 4: Slow task (async) + logger.info("\n4. Sending slow task (will run in background)...") + result = app.send_task("tasks.slow_task", args=[3]) + logger.info(f"Task ID: {result.id}") + logger.info("Slow task started - check worker logs for progress") + + # Task 5: Failing task (to demonstrate error handling) + logger.info("\n5. Sending failing task...") + try: + result = app.send_task("tasks.failing_task") + logger.info(f"Task ID: {result.id}") + result.get(timeout=10) + except Exception as e: + logger.info(f"Task failed as expected: {e}") + + logger.info("\n" + "=" * 50) + logger.info("All test tasks sent!") + logger.info("Try modifying tasks.py and running this script again") + logger.info("to see HMR in action.") + logger.info("=" * 50) + + +if __name__ == "__main__": + send_test_tasks() diff --git a/examples/celery/tasks.py b/examples/celery/tasks.py new file mode 100644 index 0000000..76cc241 --- /dev/null +++ b/examples/celery/tasks.py @@ -0,0 +1,50 @@ +""" +Celery tasks that can be hot-reloaded with HMR. +""" + +from app import app +from utils import logger + + +@app.task +def add_numbers(x, y): + """Simple addition task.""" + result = x + y + logger.info(f"Adding {x} + {y} = {result}") + return result + + +@app.task +def process_data(data): + """Process some data - this can be updated without restarting worker.""" + logger.info(f"Processing data: {data}") + + # This logic can be modified and will be hot-reloaded + processed = {"original": data, "length": len(str(data)), "doubled": data * 2 if isinstance(data, int | float) else f"{data}_{data}", "type": type(data).__name__} + + logger.info(f"Processed result: {processed}") + return processed + + +@app.task +def slow_task(seconds=5): + """A task that takes some time - useful for testing HMR during execution.""" + import time + + logger.info(f"Starting slow task that will take {seconds} seconds...") + + for i in range(seconds): + time.sleep(1) + logger.info(f"Slow task progress: {i + 1}/{seconds}") + + logger.info("Slow task completed!") + return f"Completed after {seconds} seconds" + + +@app.task +def failing_task(): + """A task that fails - can be fixed with HMR.""" + logger.info("This task is about to fail...") + # Uncomment the next line to fix this task: + # return "Task fixed with HMR!" + raise Exception("This task always fails - fix me with HMR!") diff --git a/examples/celery/utils.py b/examples/celery/utils.py new file mode 100644 index 0000000..70464ff --- /dev/null +++ b/examples/celery/utils.py @@ -0,0 +1,13 @@ +""" +Utilities for the Celery HMR example. +""" + +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + +logger = logging.getLogger("celery_hmr_example") + +# This can be modified to change log level or format +logger.info("Utilities module loaded") diff --git a/examples/celery/worker.py b/examples/celery/worker.py new file mode 100644 index 0000000..bb08dcb --- /dev/null +++ b/examples/celery/worker.py @@ -0,0 +1,85 @@ +""" +Celery worker entry point with HMR integration. + +This file demonstrates how to run a Celery worker that can hot-reload +task definitions and other modules without restarting the worker process. +""" + +import sys +from atexit import register, unregister +from threading import Thread +from typing import cast + +import tasks # noqa: F401 - Import needed to register tasks with Celery +from app import app +from utils import logger + + +class CeleryWorkerThread(Thread): + """Thread to run Celery worker.""" + + def __init__(self): + super().__init__(daemon=True) + self.worker = None + + def run(self): + """Start the Celery worker.""" + logger.info("Starting Celery worker...") + + # Create worker instance + self.worker = app.Worker( + loglevel="INFO", + concurrency=1, + # Important: Don't hijack the root logger to play nice with HMR + hijack_root_logger=False, + ) + + # Start the worker + try: + self.worker.start() + except Exception as e: + logger.error(f"Worker error: {e}") + + def stop(self): + """Stop the Celery worker.""" + if self.worker: + logger.info("Stopping Celery worker...") + self.worker.stop() + + +def start_worker(): + """Start or restart the Celery worker.""" + global worker_thread + + logger.info("=" * 50) + logger.info("Starting Celery HMR Worker") + logger.info("=" * 50) + + # Stop existing worker if any + if worker_thread := cast("CeleryWorkerThread | None", globals().get("worker_thread")): + unregister(worker_thread.stop) + worker_thread.stop() + + # Start new worker + worker_thread = CeleryWorkerThread() + worker_thread.start() + register(worker_thread.stop) + + logger.info("Worker started! You can now:") + logger.info("1. Run 'python sender.py' in another terminal to send tasks") + logger.info("2. Modify tasks.py to see HMR in action") + logger.info("3. Press Ctrl+C to stop") + + +if __name__ == "__main__": + start_worker() + + try: + # Keep the main thread alive + while True: + import time + + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down...") + sys.exit(0)