From ed252eac0da80c5878d7fc679dbd2273eac33a51 Mon Sep 17 00:00:00 2001 From: Anurag Singh Date: Sun, 31 May 2026 18:03:31 +0530 Subject: [PATCH] feat: add pluginable RabbitMQ broker support (closes #28) Introduces BROKER_TYPE and BROKER_URL settings so operators can switch between Redis and RabbitMQ without code changes. Changes ------- * config.py - new BROKER_TYPE ("redis"|"rabbitmq") and BROKER_URL settings; BROKER_URL falls back to REDIS_URL when left empty so existing deployments are unaffected. * taskiq/taskiq.py - creates AioPikaBroker when BROKER_TYPE=rabbitmq (via taskiq-aio-pika); retains ListQueueBroker + RedisAsyncResultBackend for Redis. TLS/SSL context built automatically for amqps:// and rediss://. * celery/celery.py - creates Celery with amqp(s):// broker and rpc:// result backend when BROKER_TYPE=rabbitmq (no extra store required); uses Redis URL + backend otherwise. SSL options applied per transport. * pyproject.toml - adds taskiq-aio-pika>=0.6.0 and aio-pika>=9.0.0 as project dependencies. Usage (RabbitMQ) ---------------- BROKER_TYPE=rabbitmq BROKER_URL=amqp://guest:guest@localhost:5672/ Leave both unset (or BROKER_TYPE=redis) to keep existing Redis behaviour. --- backend/app/background/celery/celery.py | 36 +++++++++++++++--- backend/app/background/taskiq/taskiq.py | 49 +++++++++++++++++++------ backend/app/config.py | 10 +++++ backend/pyproject.toml | 2 + 4 files changed, 80 insertions(+), 17 deletions(-) diff --git a/backend/app/background/celery/celery.py b/backend/app/background/celery/celery.py index 3c69122..ec2da65 100644 --- a/backend/app/background/celery/celery.py +++ b/backend/app/background/celery/celery.py @@ -17,11 +17,27 @@ logger = get_logger(__name__) -celery_app = Celery( - "interxai_worker", - broker=settings.REDIS_URL, - backend=settings.REDIS_URL, -) +# --------------------------------------------------------------------------- +# Resolve effective broker / backend URLs based on BROKER_TYPE. +# Fallback: when BROKER_URL is empty, use REDIS_URL for backward compat. +# --------------------------------------------------------------------------- +_broker_url: str = settings.BROKER_URL if settings.BROKER_URL else settings.REDIS_URL + +if settings.BROKER_TYPE == "rabbitmq": + # RabbitMQ: use amqp(s):// for the broker; rpc:// for the result backend + # so that no external store (Redis, DB) is required. + celery_app = Celery( + "interxai_worker", + broker=_broker_url, + backend="rpc://", + ) +else: + # Redis (default) + celery_app = Celery( + "interxai_worker", + broker=_broker_url, + backend=_broker_url, + ) celery_app.conf.update( task_serializer="json", @@ -31,7 +47,15 @@ enable_utc=True, ) -if settings.REDIS_URL.startswith("rediss://"): +# Apply transport-specific SSL / TLS settings +if settings.BROKER_TYPE == "rabbitmq" and _broker_url.startswith("amqps://"): + celery_app.conf.update( + broker_use_ssl={ + "ssl": True, + "ssl_cert_reqs": ssl.CERT_NONE, + }, + ) +elif _broker_url.startswith("rediss://"): celery_app.conf.update( broker_use_ssl={"ssl_cert_reqs": ssl.CERT_NONE}, redis_backend_use_ssl={"ssl_cert_reqs": ssl.CERT_NONE}, diff --git a/backend/app/background/taskiq/taskiq.py b/backend/app/background/taskiq/taskiq.py index 372234f..1bcd02f 100644 --- a/backend/app/background/taskiq/taskiq.py +++ b/backend/app/background/taskiq/taskiq.py @@ -1,17 +1,44 @@ import ssl -from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend - from app.config import settings -if settings.REDIS_URL.startswith("rediss://"): - _ssl_ctx: ssl.SSLContext | None = ssl.create_default_context() - assert _ssl_ctx is not None - _ssl_ctx.check_hostname = False - _ssl_ctx.verify_mode = ssl.CERT_NONE +# --------------------------------------------------------------------------- +# Resolve the effective broker URL. +# When BROKER_TYPE is "redis" (or unset) fall back to REDIS_URL so that +# existing deployments that only configure REDIS_URL keep working. +# --------------------------------------------------------------------------- +_broker_url: str = settings.BROKER_URL if settings.BROKER_URL else settings.REDIS_URL + +if settings.BROKER_TYPE == "rabbitmq": + # ------------------------------------------------------------------ + # RabbitMQ transport via taskiq-aio-pika + # ------------------------------------------------------------------ + from taskiq_aio_pika import AioPikaBroker + + if _broker_url.startswith("amqps://"): + # For TLS-secured amqps:// connections, disable hostname / cert + # verification (commonly needed with self-signed certs on PaaS). + _amqp_ssl_ctx: ssl.SSLContext = ssl.create_default_context() + _amqp_ssl_ctx.check_hostname = False + _amqp_ssl_ctx.verify_mode = ssl.CERT_NONE + broker = AioPikaBroker(url=_broker_url, ssl_context=_amqp_ssl_ctx) + else: + broker = AioPikaBroker(url=_broker_url) + else: - _ssl_ctx = None + # ------------------------------------------------------------------ + # Redis transport (default) via taskiq-redis + # ------------------------------------------------------------------ + from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend + + if _broker_url.startswith("rediss://"): + _redis_ssl_ctx: ssl.SSLContext | None = ssl.create_default_context() + assert _redis_ssl_ctx is not None + _redis_ssl_ctx.check_hostname = False + _redis_ssl_ctx.verify_mode = ssl.CERT_NONE + else: + _redis_ssl_ctx = None -broker = ListQueueBroker(url=settings.REDIS_URL).with_result_backend( - RedisAsyncResultBackend(redis_url=settings.REDIS_URL) -) + broker = ListQueueBroker(url=_broker_url).with_result_backend( + RedisAsyncResultBackend(redis_url=_broker_url) + ) diff --git a/backend/app/config.py b/backend/app/config.py index dea15df..2414ceb 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -17,6 +17,16 @@ class Settings(BaseSettings): # Redis/Celery REDIS_URL: str = "redis://localhost:6379/0" + # Broker – set BROKER_TYPE to "rabbitmq" and provide an amqp(s):// URL + # to use RabbitMQ instead of Redis for both Taskiq and Celery. + # Accepted values: "redis" | "rabbitmq" + BROKER_TYPE: str = "redis" + # When BROKER_TYPE="rabbitmq" this should be an amqp(s):// URL, e.g. + # amqp://guest:guest@localhost:5672/ + # amqps://user:pass@rabbit.host:5671/vhost + # When BROKER_TYPE="redis" it defaults to REDIS_URL automatically. + BROKER_URL: str = "" + # LLM LLM_MODEL_NAME: str = "groq/openai/gpt-oss-120b" GROQ_API_KEY: str = "" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 369060f..7bed285 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -25,6 +25,8 @@ dependencies = [ "pypdf2>=3.0.1", "taskiq>=0.11.0", "taskiq-redis>=0.5.0", + "taskiq-aio-pika>=0.6.0", + "aio-pika>=9.0.0", "python-multipart>=0.0.27", "supabase>=2.30.0", "authlib>=1.7.2",