refactor: update AsyncQueueManager to use a single RabbitMQ priority queue and enhance connection handling; add default queue name and priority mapping #257
base: main
Conversation
📝 Walkthrough
Refactored AsyncQueueManager to consolidate per-priority RabbitMQ queues into a single priority-enabled queue, replacing inefficient polling with push-based async-iterator consumption and adding explicit type hints and improved logging throughout.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
@smokeyScraper This should solve the multiple-queue issue and utilise resources more efficiently.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/core/orchestration/queue_manager.py (1)
145-165: Critical: Swallowed exceptions cause failed messages to be acknowledged.
When a handler raises an exception, `_process_item` catches it (line 164), logs it, and returns normally. Back in `_worker`, execution continues to `message.ack()` (line 134), so the failed message is acknowledged and lost instead of being nacked. The exception must be re-raised so the worker's exception handler can properly nack the message.
Proposed fix
```diff
     async def _process_item(self, item: Dict[str, Any], worker_name: str) -> None:
         """Process a queue item by message type."""
-        try:
-            message_data = item["data"]
-            message_type = message_data.get("type", "unknown")
+        message_data = item["data"]
+        message_type = message_data.get("type", "unknown")

-            handler = self.handlers.get(message_type)
+        handler = self.handlers.get(message_type)

-            if handler:
-                logger.debug(
-                    f"Worker {worker_name} processing {item['id']} (type: {message_type})"
-                )
-                if asyncio.iscoroutinefunction(handler):
-                    await handler(message_data)
-                else:
-                    handler(message_data)
+        if handler:
+            logger.debug(
+                f"Worker {worker_name} processing {item['id']} (type: {message_type})"
+            )
+            if asyncio.iscoroutinefunction(handler):
+                await handler(message_data)
             else:
-                logger.warning(f"No handler found for message type: {message_type}")
-
-        except Exception as e:
-            logger.error(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")
+                handler(message_data)
+        else:
+            logger.warning(f"No handler found for message type: {message_type}")
```
By removing the try/except wrapper, any handler exception will propagate to `_worker`, which will then log it and call `message.nack(requeue=False)`.
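For context, here is a minimal sketch of the worker-side loop this fix relies on, assuming consumption via aio_pika's `queue.iterator()` as described in this PR. The `consume` function and its parameters are illustrative stand-ins, not the repository's actual `_worker` signature.

```python
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Dict

import aio_pika

logger = logging.getLogger(__name__)


async def consume(
    queue: aio_pika.abc.AbstractQueue,
    process_item: Callable[[Dict[str, Any], str], Awaitable[None]],
    worker_name: str,
) -> None:
    """Push-based consumption: ack only after the handler returns, nack on failure."""
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            try:
                item = json.loads(message.body.decode())
                await process_item(item, worker_name)
                await message.ack()  # reached only if process_item did not raise
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception(f"Worker {worker_name} failed to process message")
                await message.nack(requeue=False)  # failed work is not silently acked
```

With the try/except removed from `_process_item`, a handler failure lands in the `except Exception` branch here and the message is nacked rather than acknowledged.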
🧹 Nitpick comments (2)
backend/app/core/orchestration/queue_manager.py (2)
57-59: Use `logging.exception` to capture stack trace.
When logging errors in exception handlers, `logging.exception` automatically includes the stack trace, which aids debugging.
Proposed fix
```diff
         except Exception as e:
-            logger.error(f"Failed to connect to RabbitMQ: {e}")
+            logger.exception(f"Failed to connect to RabbitMQ: {e}")
             raise
```
135-143: Use `logging.exception` for error handlers to capture stack traces.
Per static analysis hints, `logging.exception` automatically includes the traceback, which is helpful for debugging processing and worker errors.
Proposed fix
```diff
                 except asyncio.CancelledError:
                     raise
                 except Exception as e:
-                    logger.error(f"Error processing message: {e}")
+                    logger.exception(f"Error processing message: {e}")
                     await message.nack(requeue=False)
         except asyncio.CancelledError:
             logger.info(f"Worker {worker_name} cancelled")
         except Exception as e:
-            logger.error(f"Worker {worker_name} error: {e}")
+            logger.exception(f"Worker {worker_name} error: {e}")
```
```diff
     async def enqueue(
         self,
         message: Dict[str, Any],
         priority: QueuePriority = QueuePriority.MEDIUM,
         delay: float = 0,
     ) -> None:
         """Add a message to the single priority queue."""
         if delay > 0:
             await asyncio.sleep(delay)

         queue_item = {
             "id": message.get("id", f"msg_{datetime.now().timestamp()}"),
-            "priority": priority,
-            "data": message
+            "priority": priority.value,
+            "data": message,
         }
-        json_message = json.dumps(queue_item).encode()
+        json_body = json.dumps(queue_item).encode()
+        numeric_priority = PRIORITY_MAP[priority]

         await self.channel.default_exchange.publish(
-            aio_pika.Message(body=json_message),
-            routing_key=self.queues[priority]
+            aio_pika.Message(body=json_body, priority=numeric_priority),
+            routing_key=self.queue_name,
         )
         logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")
```
Guard against publishing when not connected.
If enqueue() is called before start() or after stop(), self.channel will be None, causing an AttributeError on self.channel.default_exchange.publish(). Add a connection check to provide a clear error.
Proposed fix
```diff
     async def enqueue(
         self,
         message: Dict[str, Any],
         priority: QueuePriority = QueuePriority.MEDIUM,
         delay: float = 0,
     ) -> None:
         """Add a message to the single priority queue."""
+        if not self.channel:
+            raise RuntimeError("Cannot enqueue: not connected. Call start() first.")
+
         if delay > 0:
             await asyncio.sleep(delay)
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    async def enqueue(
        self,
        message: Dict[str, Any],
        priority: QueuePriority = QueuePriority.MEDIUM,
        delay: float = 0,
    ) -> None:
        """Add a message to the single priority queue."""
        if not self.channel:
            raise RuntimeError("Cannot enqueue: not connected. Call start() first.")

        if delay > 0:
            await asyncio.sleep(delay)

        queue_item = {
            "id": message.get("id", f"msg_{datetime.now().timestamp()}"),
            "priority": priority.value,
            "data": message,
        }
        json_body = json.dumps(queue_item).encode()
        numeric_priority = PRIORITY_MAP[priority]

        await self.channel.default_exchange.publish(
            aio_pika.Message(body=json_body, priority=numeric_priority),
            routing_key=self.queue_name,
        )
        logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")
```
🤖 Prompt for AI Agents
In `@backend/app/core/orchestration/queue_manager.py` around lines 87-109: the `enqueue` method can be called when the manager is not connected (`self.channel` is `None`). Add a guard at the start of `enqueue` that checks connection state (e.g., `self.channel` is not `None` and not closed, or a `self.started` flag set by `start`/`stop`) and, if not connected, raise a clear exception (`ConnectionError` or `RuntimeError`) and/or log an error before returning. Ensure this check runs before calling `self.channel.default_exchange.publish`, and reference `self.queue_name` in the error message for context.
Pull Request
Closes #251
📝 Description
Refactors `AsyncQueueManager` to fix inefficient queue polling and make it production-ready. Replaces three separate queues (`high_task_queue`, `medium_task_queue`, `low_task_queue`) with a single RabbitMQ priority queue and push-based consumption. Workers no longer poll with `queue.get()` and a fixed sleep; priority is enforced by the broker via `x-max-priority` and per-message `priority`, and workers use `queue.iterator()` so the broker pushes messages instead of the app polling.
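As an illustration of the setup this description refers to, here is a minimal sketch (not the PR's actual code) of declaring a single broker-side priority queue with aio_pika; the `setup` function, the `durable=True` flag, and the URL parameter are assumptions.

```python
import aio_pika


async def setup(amqp_url: str, queue_name: str = "task_queue") -> aio_pika.abc.AbstractRobustConnection:
    """Declare one priority queue and cap unacked deliveries per consumer."""
    connection = await aio_pika.connect_robust(amqp_url)
    channel = await connection.channel()

    # At most one unacknowledged message per consumer, so the broker keeps
    # reordering pending work by priority instead of over-delivering.
    await channel.set_qos(prefetch_count=1)

    # One queue for all priorities; ordering is enforced by RabbitMQ itself
    # through the x-max-priority argument rather than by separate queues.
    await channel.declare_queue(
        queue_name,
        durable=True,  # assumption; the PR may declare the queue differently
        arguments={"x-max-priority": 10},
    )
    return connection
```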
🔧 Changes Made
- Single priority queue (`task_queue`) declared with `x-max-priority: 10`. All messages go to this queue with a numeric priority (HIGH=10, MEDIUM=5, LOW=1; see the sketch after this list).
- Workers no longer poll with `queue.get()`. Each worker uses `async with queue.iterator(): async for message in queue_iter` so the broker pushes messages; no polling or `asyncio.sleep(0.1)`.
- `channel.set_qos(prefetch_count=1)` so the broker doesn't over-deliver; each consumer has at most one unacked message.
- `enqueue()` publishes to the single queue with `aio_pika.Message(..., priority=numeric_priority)` and the same `routing_key`. Public API unchanged: `enqueue(message, priority=QueuePriority.MEDIUM, delay=0)`.
- `stop()` clears `worker_tasks` after `gather` and closes channel/connection. Workers break out of the iterator when cancelled or when `self.running` is False.
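The diffs above reference `QueuePriority` and `PRIORITY_MAP` without showing their definitions; a plausible sketch is below. Only the numeric mapping (HIGH=10, MEDIUM=5, LOW=1) comes from this PR; the enum's string values are assumptions.

```python
from enum import Enum


class QueuePriority(Enum):
    """Caller-facing priority levels; the public enqueue() signature keeps using these."""
    HIGH = "high"      # assumed values; only the member names appear in the PR
    MEDIUM = "medium"
    LOW = "low"


# Broker-side numeric priorities, within the 0-10 range implied by x-max-priority: 10.
PRIORITY_MAP = {
    QueuePriority.HIGH: 10,
    QueuePriority.MEDIUM: 5,
    QueuePriority.LOW: 1,
}
```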
📷 Screenshots or Visual Changes (if applicable)
N/A
🤝 Collaboration
N/A
✅ Checklist
Summary by CodeRabbit