Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions backend/app/background/celery/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,27 @@

logger = get_logger(__name__)

celery_app = Celery(
"interxai_worker",
broker=settings.REDIS_URL,
backend=settings.REDIS_URL,
)
# ---------------------------------------------------------------------------
# Resolve effective broker / backend URLs based on BROKER_TYPE.
# Fallback: when BROKER_URL is empty, use REDIS_URL for backward compat.
# ---------------------------------------------------------------------------
_broker_url: str = settings.BROKER_URL if settings.BROKER_URL else settings.REDIS_URL

if settings.BROKER_TYPE == "rabbitmq":
# RabbitMQ: use amqp(s):// for the broker; rpc:// for the result backend
# so that no external store (Redis, DB) is required.
celery_app = Celery(
"interxai_worker",
broker=_broker_url,
backend="rpc://",
)
else:
# Redis (default)
celery_app = Celery(
"interxai_worker",
broker=_broker_url,
backend=_broker_url,
)

celery_app.conf.update(
task_serializer="json",
Expand All @@ -31,7 +47,15 @@
enable_utc=True,
)

if settings.REDIS_URL.startswith("rediss://"):
# Apply transport-specific SSL / TLS settings
if settings.BROKER_TYPE == "rabbitmq" and _broker_url.startswith("amqps://"):
celery_app.conf.update(
broker_use_ssl={
"ssl": True,
"ssl_cert_reqs": ssl.CERT_NONE,
},
)
elif _broker_url.startswith("rediss://"):
celery_app.conf.update(
broker_use_ssl={"ssl_cert_reqs": ssl.CERT_NONE},
redis_backend_use_ssl={"ssl_cert_reqs": ssl.CERT_NONE},
Expand Down
49 changes: 38 additions & 11 deletions backend/app/background/taskiq/taskiq.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,44 @@
import ssl

from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from app.config import settings

if settings.REDIS_URL.startswith("rediss://"):
_ssl_ctx: ssl.SSLContext | None = ssl.create_default_context()
assert _ssl_ctx is not None
_ssl_ctx.check_hostname = False
_ssl_ctx.verify_mode = ssl.CERT_NONE
# ---------------------------------------------------------------------------
# Resolve the effective broker URL.
# When BROKER_TYPE is "redis" (or unset) fall back to REDIS_URL so that
# existing deployments that only configure REDIS_URL keep working.
# ---------------------------------------------------------------------------
_broker_url: str = settings.BROKER_URL if settings.BROKER_URL else settings.REDIS_URL

if settings.BROKER_TYPE == "rabbitmq":
# ------------------------------------------------------------------
# RabbitMQ transport via taskiq-aio-pika
# ------------------------------------------------------------------
from taskiq_aio_pika import AioPikaBroker

if _broker_url.startswith("amqps://"):
# For TLS-secured amqps:// connections, disable hostname / cert
# verification (commonly needed with self-signed certs on PaaS).
_amqp_ssl_ctx: ssl.SSLContext = ssl.create_default_context()
_amqp_ssl_ctx.check_hostname = False
_amqp_ssl_ctx.verify_mode = ssl.CERT_NONE
broker = AioPikaBroker(url=_broker_url, ssl_context=_amqp_ssl_ctx)
else:
broker = AioPikaBroker(url=_broker_url)

else:
_ssl_ctx = None
# ------------------------------------------------------------------
# Redis transport (default) via taskiq-redis
# ------------------------------------------------------------------
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

if _broker_url.startswith("rediss://"):
_redis_ssl_ctx: ssl.SSLContext | None = ssl.create_default_context()
assert _redis_ssl_ctx is not None
_redis_ssl_ctx.check_hostname = False
_redis_ssl_ctx.verify_mode = ssl.CERT_NONE
else:
_redis_ssl_ctx = None

broker = ListQueueBroker(url=settings.REDIS_URL).with_result_backend(
RedisAsyncResultBackend(redis_url=settings.REDIS_URL)
)
broker = ListQueueBroker(url=_broker_url).with_result_backend(
RedisAsyncResultBackend(redis_url=_broker_url)
)
10 changes: 10 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ class Settings(BaseSettings):
# Redis/Celery
REDIS_URL: str = "redis://localhost:6379/0"

# Broker – set BROKER_TYPE to "rabbitmq" and provide an amqp(s):// URL
# to use RabbitMQ instead of Redis for both Taskiq and Celery.
# Accepted values: "redis" | "rabbitmq"
BROKER_TYPE: str = "redis"
# When BROKER_TYPE="rabbitmq" this should be an amqp(s):// URL, e.g.
# amqp://guest:guest@localhost:5672/
# amqps://user:pass@rabbit.host:5671/vhost
# When BROKER_TYPE="redis" it defaults to REDIS_URL automatically.
BROKER_URL: str = ""

# LLM
LLM_MODEL_NAME: str = "groq/openai/gpt-oss-120b"
GROQ_API_KEY: str = ""
Expand Down
2 changes: 2 additions & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies = [
"pypdf2>=3.0.1",
"taskiq>=0.11.0",
"taskiq-redis>=0.5.0",
"taskiq-aio-pika>=0.6.0",
"aio-pika>=9.0.0",
"python-multipart>=0.0.27",
"supabase>=2.30.0",
"authlib>=1.7.2",
Expand Down
Loading