149 changes: 85 additions & 64 deletions backend/app/core/orchestration/queue_manager.py
@@ -9,130 +9,151 @@

logger = logging.getLogger(__name__)

# Single queue name for all priorities (broker handles ordering via x-max-priority)
DEFAULT_QUEUE_NAME = "task_queue"
MAX_PRIORITY = 10 # RabbitMQ priority 0-255; higher = more urgent


class QueuePriority(str, Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"


# Map enum to numeric priority for RabbitMQ (higher number = higher priority)
PRIORITY_MAP = {
QueuePriority.HIGH: 10,
QueuePriority.MEDIUM: 5,
QueuePriority.LOW: 1,
}

class AsyncQueueManager:
"""Queue manager for agent orchestration"""
"""Queue manager for agent orchestration using a single RabbitMQ priority queue."""

def __init__(self):
self.queues = {
QueuePriority.HIGH: 'high_task_queue',
QueuePriority.MEDIUM: 'medium_task_queue',
QueuePriority.LOW: 'low_task_queue'
}
def __init__(self, queue_name: str = DEFAULT_QUEUE_NAME):
self.queue_name = queue_name
self.handlers: Dict[str, Callable] = {}
self.running = False
self.worker_tasks = []
self.worker_tasks: list[asyncio.Task] = []
self.connection: Optional[aio_pika.RobustConnection] = None
self.channel: Optional[aio_pika.abc.AbstractChannel] = None



async def connect(self):
async def connect(self) -> None:
try:
rabbitmq_url = getattr(settings, 'rabbitmq_url', 'amqp://guest:guest@localhost/')
rabbitmq_url = getattr(
settings, "rabbitmq_url", "amqp://guest:guest@localhost/"
)
self.connection = await aio_pika.connect_robust(rabbitmq_url)
self.channel = await self.connection.channel()
# Declare queues
for queue_name in self.queues.values():
await self.channel.declare_queue(queue_name, durable=True)
logger.info("Successfully connected to RabbitMQ")
# Prefetch: broker sends at most this many unacked messages per consumer
await self.channel.set_qos(prefetch_count=1)
# Single priority queue: broker orders by message priority, no polling
await self.channel.declare_queue(
self.queue_name,
durable=True,
arguments={"x-max-priority": MAX_PRIORITY},
)
logger.info("Successfully connected to RabbitMQ (single priority queue)")
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
raise

async def start(self, num_workers: int = 3):
"""Start the queue processing workers"""
async def start(self, num_workers: int = 3) -> None:
"""Start the queue processing workers (push-based consumers, no polling)."""
await self.connect()
self.running = True

for i in range(num_workers):
task = asyncio.create_task(self._worker(f"worker-{i}"))
self.worker_tasks.append(task)

logger.info(f"Started {num_workers} async queue workers")
logger.info(f"Started {num_workers} async queue workers on {self.queue_name}")

async def stop(self):
"""Stop the queue processing"""
async def stop(self) -> None:
"""Stop the queue processing and close connections."""
self.running = False

# Cancel all worker tasks
for task in self.worker_tasks:
task.cancel()

await asyncio.gather(*self.worker_tasks, return_exceptions=True)
self.worker_tasks.clear()
if self.channel:
await self.channel.close()
if self.connection:
await self.connection.close()
logger.info("Stopped all queue workers and closed connection")

async def enqueue(self,
message: Dict[str, Any],
priority: QueuePriority = QueuePriority.MEDIUM,
delay: float = 0):
"""Add a message to the queue"""

async def enqueue(
self,
message: Dict[str, Any],
priority: QueuePriority = QueuePriority.MEDIUM,
delay: float = 0,
) -> None:
"""Add a message to the single priority queue."""
if delay > 0:
await asyncio.sleep(delay)

queue_item = {
"id": message.get("id", f"msg_{datetime.now().timestamp()}"),
"priority": priority,
"data": message
"priority": priority.value,
"data": message,
}
json_message = json.dumps(queue_item).encode()
json_body = json.dumps(queue_item).encode()
numeric_priority = PRIORITY_MAP[priority]

await self.channel.default_exchange.publish(
aio_pika.Message(body=json_message),
routing_key=self.queues[priority]
aio_pika.Message(body=json_body, priority=numeric_priority),
routing_key=self.queue_name,
)
logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")
Comment on lines +87 to +109
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find the queue_manager.py file and examine the current state
find . -name "queue_manager.py" -type f

🏁 Script executed:

# Check for requirements/dependencies to see aio-pika version
find . -type f \( -name "requirements.txt" -o -name "setup.py" -o -name "pyproject.toml" -o -name "poetry.lock" -o -name "Pipfile" \) | head -20

🏁 Script executed:

# Search for PRIORITY_MAP and related queue configuration
rg "PRIORITY_MAP|QueuePriority|delivery_mode" --type python -B 3 -A 3

🏁 Script executed:

# Read the queue_manager.py file to see complete implementation
cat -n ./backend/app/core/orchestration/queue_manager.py

🏁 Script executed:

# Check pyproject.toml and requirements.txt for aio-pika version
cat ./pyproject.toml

🏁 Script executed:

# Search for queue initialization and configuration
rg "queue_declare|durable" -B 5 -A 5

🌐 Web query:

aio-pika 9.5.5 DeliveryMode PERSISTENT RabbitMQ durable queue

💡 Result:

To make messages survive a RabbitMQ broker restart with aio-pika 9.5.5, you need both:

  1. A durable queue (and a durable exchange if you’re not using the default exchange), and
  2. Persistent messages (delivery_mode=DeliveryMode.PERSISTENT, i.e. AMQP delivery-mode = 2). [1][2]
import aio_pika
from aio_pika import Message, DeliveryMode, ExchangeType

connection = await aio_pika.connect("amqp://guest:guest@localhost/")

async with connection:
    channel = await connection.channel(publisher_confirms=True)  # default=True [3][4]

    # If you publish via your own exchange, make it durable:
    exchange = await channel.declare_exchange(
        "my_exchange", ExchangeType.DIRECT, durable=True
    )

    queue = await channel.declare_queue("my_queue", durable=True)  # [1]
    await queue.bind(exchange, routing_key="rk")

    msg = Message(b"hello", delivery_mode=DeliveryMode.PERSISTENT)  # [1]
    await exchange.publish(msg, routing_key="rk")

Notes:

  • RabbitMQ’s docs: durable queues are recovered on boot, including messages in them that were published as persistent; transient messages are discarded during recovery. [2]
  • Persistence alone isn’t a hard guarantee; for stronger safety you should use publisher confirms. In RabbitMQ, a confirm for a persistent message routed to a durable queue implies it was persisted to disk; aio-pika enables confirms on channels by default (publisher_confirms=True). [1][4]

Sources:
[1] aio-pika tutorial (Work Queues: durable queue + DeliveryMode.PERSISTENT, confirms note) (docs.aio-pika.com)
[2] RabbitMQ docs (Queues: durability + persistent publishing requirement) (rabbitmq.com)
[3] aio-pika API reference (channel publisher_confirms: bool = True) (docs.aio-pika.com)
[4] RabbitMQ docs (Confirms: persistent-to-durable implies disk persistence before ack) (rabbitmq.com)

🌐 Web query:

RabbitMQ message persistence vs queue durability delivery_mode

💡 Result:

In RabbitMQ (AMQP 0-9-1), queue/exchange “durability” and message “persistence” (delivery_mode) are different knobs:

1) Durable queue / durable exchange (metadata durability)

  • durable=true on a queue or exchange means its definition/metadata is stored so it can be recreated after a broker restart. (rabbitmq.com)
  • If a queue is non-durable (transient), it is deleted on node boot, and any messages in it are lost. (rabbitmq.com)

2) delivery_mode / persistent message (message durability)

  • A message is “persistent” when published with delivery_mode=2 (many clients expose this as persistent: true). (rabbitmq.com)
  • On restart, only messages that were published as persistent are recovered from a durable queue. Messages published as transient (delivery_mode=1) are discarded during recovery, even if they were in a durable queue. (rabbitmq.com)

The key rule (what you usually want)

To survive a broker restart, you generally need:

  • durable exchange + durable queue (so the topology exists after restart), and
  • persistent messages (delivery_mode=2) (so the messages are recovered). (rabbitmq.com)

Important reliability note: persistence ≠ guaranteed

Even with delivery_mode=2 and durable queues, a message can still be lost if the broker fails before it’s actually flushed to disk. To know when it’s safely stored, enable publisher confirms (then the confirm for a persistent message routed to a durable queue is sent only after it’s persisted). (rabbitmq.com)
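To make the key rule concrete for this PR's queue shape, here is a minimal aio-pika sketch, an illustration rather than the PR's code: a durable priority queue plus a persistent, prioritized message. The queue name and x-max-priority value mirror the diff above; publisher confirms are aio-pika's channel default.

import asyncio

import aio_pika
from aio_pika import DeliveryMode, Message

async def publish_persistent(url: str = "amqp://guest:guest@localhost/") -> None:
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()  # publisher_confirms=True by default
        # Durable queue: the queue definition (including x-max-priority)
        # survives a broker restart.
        await channel.declare_queue(
            "task_queue",
            durable=True,
            arguments={"x-max-priority": 10},
        )
        # Persistent message: delivery_mode=2, so the message body itself
        # is recovered after a restart, not just the queue metadata.
        await channel.default_exchange.publish(
            Message(
                b'{"id": "msg-1", "data": {"type": "demo"}}',
                priority=5,
                delivery_mode=DeliveryMode.PERSISTENT,
            ),
            routing_key="task_queue",
        )

asyncio.run(publish_persistent())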


Add persistent delivery mode to survive broker restarts.

Queue durability and message persistence are separate in RabbitMQ—the durable queue metadata survives restarts, but without delivery_mode=DeliveryMode.PERSISTENT, messages themselves are transient and discarded on broker recovery, causing task loss.

Proposed fix
 await self.channel.default_exchange.publish(
-    aio_pika.Message(body=json_body, priority=numeric_priority),
+    aio_pika.Message(
+        body=json_body,
+        priority=numeric_priority,
+        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+    ),
     routing_key=self.queue_name,
 )
🤖 Prompt for AI Agents
In `@backend/app/core/orchestration/queue_manager.py` around lines 87-109: the
enqueue method currently publishes messages with aio_pika.Message(...) but omits
the delivery mode, so messages are not persisted. Update the call in enqueue
to pass delivery_mode=DeliveryMode.PERSISTENT when constructing aio_pika.Message
(importing DeliveryMode from aio_pika if it is not already present) so messages
survive broker restarts. Keep using PRIORITY_MAP and the existing routing_key,
but ensure the message constructor includes delivery_mode=DeliveryMode.PERSISTENT.
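
For the consuming side, a minimal sketch of the same push-based pattern the new _worker uses, again an illustration rather than the PR's code. message.process() is aio-pika's context-manager alternative to the explicit ack()/nack() calls in the diff: it acks when the block exits cleanly and rejects if an exception escapes.

import asyncio
import json

import aio_pika

async def consume(url: str = "amqp://guest:guest@localhost/") -> None:
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)  # at most one unacked message per consumer
        # Redeclaring with identical arguments is idempotent; mismatched
        # arguments (e.g. a different x-max-priority) raise a channel error.
        queue = await channel.declare_queue(
            "task_queue",
            durable=True,
            arguments={"x-max-priority": 10},
        )
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:  # broker pushes highest-priority first
                async with message.process():  # ack on success, reject on exception
                    item = json.loads(message.body.decode())
                    print(item["id"], item["priority"])

asyncio.run(consume())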


def register_handler(self, message_type: str, handler: Callable):
"""Register a handler for a specific message type"""
def register_handler(self, message_type: str, handler: Callable) -> None:
"""Register a handler for a specific message type."""
self.handlers[message_type] = handler
logger.info(f"Registered handler for message type: {message_type}")

async def _worker(self, worker_name: str):
"""Worker coroutine to process queue items"""
async def _worker(self, worker_name: str) -> None:
"""Worker: long-lived consumer on the single queue (push-based, no polling)."""
logger.info(f"Started queue worker: {worker_name}")
# Each worker listens to all queues by priority
queues = [
await self.channel.declare_queue(self.queues[priority], durable=True)
for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW]
]
while self.running:
for queue in queues:
try:
message = await queue.get(no_ack=False, fail=False)
if message:
try:
item = json.loads(message.body.decode())
await self._process_item(item, worker_name)
await message.ack()
except Exception as e:
logger.error(f"Error processing message: {e}")
await message.nack(requeue=False)
except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
return
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
await asyncio.sleep(0.1)

async def _process_item(self, item: Dict[str, Any], worker_name: str):
"""Process a queue item"""

queue = await self.channel.declare_queue(
self.queue_name,
durable=True,
arguments={"x-max-priority": MAX_PRIORITY},
)

try:
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if not self.running:
break
try:
item = json.loads(message.body.decode())
await self._process_item(item, worker_name)
await message.ack()
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Error processing message: {e}")
await message.nack(requeue=False)
except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")

async def _process_item(self, item: Dict[str, Any], worker_name: str) -> None:
"""Process a queue item by message type."""
try:
message_data = item["data"]
message_type = message_data.get("type", "unknown")

handler = self.handlers.get(message_type)

if handler:
logger.debug(f"Worker {worker_name} processing {item['id']} (type: {message_type})")
logger.debug(
f"Worker {worker_name} processing {item['id']} (type: {message_type})"
)
if asyncio.iscoroutinefunction(handler):
await handler(message_data)
else:
4 changes: 0 additions & 4 deletions frontend/src/components/pages/index.ts

This file was deleted.