1 change: 1 addition & 0 deletions backend/app/core/config/settings.py
@@ -19,6 +19,7 @@ class Settings(BaseSettings):
    # DB configuration
    supabase_url: str
    supabase_key: str
    database_url: Optional[str] = None

    # LangSmith Tracing
    langsmith_tracing: bool = False
8 changes: 6 additions & 2 deletions backend/app/core/handler/handler_registry.py
@@ -19,13 +19,17 @@ def register(self, event_types: List[EventType], handler_class: Type[BaseHandler

    def get_handler(self, event: BaseEvent) -> BaseHandler:
        """Get handler instance for an event"""
        # Handle both enum and string values for platform and event_type
        platform_val = event.platform.value if hasattr(event.platform, 'value') else event.platform
        event_type_val = event.event_type.value if hasattr(event.event_type, 'value') else event.event_type

        # Try platform-specific handler first
        key = f"{event.platform.value}:{event.event_type.value}"
        key = f"{platform_val}:{event_type_val}"
        handler_class = self.handlers.get(key)

        # Fall back to generic event type handler
        if not handler_class:
            key = event.event_type.value
            key = event_type_val
            handler_class = self.handlers.get(key)

        if not handler_class:
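For context on the registry change above: the lookup now tolerates events whose `platform` or `event_type` arrive as plain strings (e.g. from deserialized payloads) instead of enum members. A minimal sketch of the normalization idea, using illustrative stand-in enums rather than the project's real definitions:

```python
from enum import Enum

class Platform(str, Enum):             # illustrative stand-in, not the real enum
    DISCORD = "discord"

class EventType(str, Enum):            # illustrative stand-in, not the real enum
    MESSAGE_CREATED = "message.created"

def normalize(value):
    """Mirror the registry's hasattr check: accept enum members or raw strings."""
    return value.value if hasattr(value, "value") else value

# Both forms resolve to the same registry key
assert f"{normalize(Platform.DISCORD)}:{normalize(EventType.MESSAGE_CREATED)}" == "discord:message.created"
assert f"{normalize('discord')}:{normalize('message.created')}" == "discord:message.created"
```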
4 changes: 0 additions & 4 deletions backend/app/core/handler/message_handler.py
@@ -43,10 +43,6 @@ async def _handle_message_created(self, event: BaseEvent) -> Dict[str, Any]:
            )
            return await self.faq_handler.handle(faq_event)

        # Implementation for new message creation
        # - Check if it's a command
        # - Check if it's a question
        # - Process natural language
        return {"success": True, "action": "message_processed"}

async def _handle_message_updated(self, event: BaseEvent) -> Dict[str, Any]:
54 changes: 54 additions & 0 deletions backend/app/database/core.py
@@ -0,0 +1,54 @@
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from typing import AsyncGenerator
from app.core.config import settings
import logging

logger = logging.getLogger(__name__)

# Database configuration
DATABASE_URL = settings.database_url

if not DATABASE_URL:
    logger.warning("DATABASE_URL is not set. Database connection pooling will not be available.")
    # Fall back or raise here depending on requirements. For now, avoid crashing on
    # import when the variable is missing (e.g. during build); initialization below
    # is guarded instead.

# Initialize the SQLAlchemy async engine with connection pooling.
# If DATABASE_URL is missing, the engine is None and get_db() raises when called.
engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=20,        # Maintain 20 open connections
    max_overflow=10,     # Allow 10 extra connections during spikes
    pool_timeout=30,     # Wait up to 30s for a connection before raising a timeout
    pool_pre_ping=True,  # Check connection health before handing it out
) if DATABASE_URL else None

# Session factory
# Note: Session's autocommit parameter was removed in SQLAlchemy 2.0, so it is not passed here.
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
) if engine else None


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency that provides a database session from the connection pool.
    Ensures the session is closed after the request is processed.
    """
    if not async_session_maker:
        raise RuntimeError("Database engine is not initialized. Check DATABASE_URL.")

    async with async_session_maker() as session:
        try:
            yield session
            # Commit/rollback is typically handled by the caller or service layer.
        except Exception:
            logger.exception("Database session error")
            await session.rollback()
            raise
        # session.close() is handled automatically by the async context manager
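As a quick sanity check of the new module, `get_db` can also be consumed directly as an async generator outside FastAPI. A minimal sketch, assuming `DATABASE_URL` points at a reachable PostgreSQL instance:

```python
import asyncio

from sqlalchemy import text

from app.database.core import get_db

async def main() -> None:
    # get_db() is an async generator, so it can be iterated manually
    async for session in get_db():
        result = await session.execute(text("SELECT 1"))
        print(result.scalar())  # -> 1

asyncio.run(main())
```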
2 changes: 2 additions & 0 deletions backend/requirements.txt
@@ -12,6 +12,8 @@ attrs==25.3.0
auth0-python==4.9.0
Authlib==1.3.1
autoflake==2.3.1
asyncpg==0.29.0
pytest-asyncio==0.23.5
autopep8==2.3.2
backoff==2.2.1
bcrypt==4.3.0
49 changes: 49 additions & 0 deletions docs/DATABASE_CONNECTION.md
@@ -0,0 +1,49 @@
# Database Connection Management

This document describes the thread-safe database connection management implemented for the Devr.AI backend.

## Overview

We use **SQLAlchemy** (AsyncIO) with **asyncpg** to manage a pool of connections to the Supabase PostgreSQL database. This allows for high-concurrency operations without the limitations of HTTP-based PostgREST calls (which `supabase-py` wraps).

## Configuration

The connection manager reads the `DATABASE_URL` from the application settings (loaded from `.env`).

```env
DATABASE_URL=postgresql+asyncpg://user:password@host:5432/dbname
```

## Key Components

### 1. Engine & Pooling
Located in `app/database/core.py`.
- **Pool Size**: 20 connections maintained open.
- **Max Overflow**: 10 temporary connections allowed during high load.
- **Pool Timeout**: 30 seconds wait time before raising an error.
- **Pre-Ping**: Connections are health-checked before being handed out.

Comment on lines +19 to +25

⚠️ Potential issue | 🟡 Minor

Clarify the module path to match the repo layout.

The file lives under backend/app/... in this repo; adjusting the path avoids confusion for contributors.

✏️ Suggested doc tweak
-Located in `app/database/core.py`.
+Located in `backend/app/database/core.py`.

### 2. Dependency Injection
Use `get_db` in FastAPI routes or other async functions to get a session.

```python
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.database.core import get_db

router = APIRouter()

@router.get("/items")
async def read_items(db: AsyncSession = Depends(get_db)):
    result = await db.execute(text("SELECT * FROM items"))
    return result.mappings().all()
```

The `get_db` generator ensures:
- A session is created from the pool.
- The session is passed to the function.
- The session is **automatically closed** after the function completes (even on error).
- If an error occurs, the transaction is rolled back.

## Testing
Unit tests in `tests/test_db_pool.py` verify:
- Pool configuration.
- Concurrent session acquisition (simulating 50+ parallel requests).
- Proper cleanup (rollback and close) on errors.
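A sketch of what the concurrency check might look like; the real assertions in `tests/test_db_pool.py` may differ, and this assumes a reachable database plus the `asyncio_mode = "auto"` pytest setting added in `pyproject.toml` (so no explicit asyncio marker is needed):

```python
import asyncio

from sqlalchemy import text

from app.database.core import get_db

async def test_concurrent_sessions():
    async def run_query() -> None:
        async for session in get_db():
            result = await session.execute(text("SELECT 1"))
            assert result.scalar() == 1

    # 50 parallel acquisitions: requests beyond pool_size=20 + max_overflow=10
    # simply wait in the pool queue rather than opening new connections.
    await asyncio.gather(*(run_query() for _ in range(50)))
```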
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -40,6 +40,8 @@ dependencies = [
"pygit2 (>=1.18.2,<2.0.0)",
"toml (>=0.10.2,<0.11.0)",
"websockets (>=15.0.1,<16.0.0)",
"sqlalchemy (>=2.0.25,<3.0.0)",
"asyncpg (>=0.29.0,<1.0.0)",
]

[tool.poetry]
@@ -56,3 +58,7 @@ isort = "^6.0.1"
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
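With `asyncio_mode = "auto"`, pytest-asyncio collects `async def` tests without an explicit marker. A minimal illustration (hypothetical test, not part of the suite):

```python
import asyncio

# No @pytest.mark.asyncio decorator required in auto mode
async def test_event_loop_is_available():
    await asyncio.sleep(0)
```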
79 changes: 79 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,79 @@
"""
Shared pytest fixtures for Devr.AI backend tests.
"""
import sys
import os
from datetime import datetime
from typing import Dict, Any
from unittest.mock import MagicMock, AsyncMock

import pytest

# Add backend to path for imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'backend')))


# ---------------------------------------------------------------------------
# Event fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def sample_event_data() -> Dict[str, Any]:
    """Returns minimal valid data for creating a BaseEvent."""
    return {
        "id": "evt-12345",
        "platform": "discord",
        "event_type": "message.created",
        "actor_id": "user-001",
        "actor_name": "TestUser",
        "channel_id": "chan-001",
        "content": "Hello, how do I contribute?",
        "raw_data": {"original": "payload"},
        "metadata": {"source": "test"},
    }


@pytest.fixture
def sample_faq_event_data(sample_event_data) -> Dict[str, Any]:
    """Event data for a FAQ request."""
    data = sample_event_data.copy()
    data["event_type"] = "faq.requested"
    data["content"] = "what is devr.ai?"
    return data


# ---------------------------------------------------------------------------
# Handler fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def mock_discord_bot():
    """Mock Discord bot with channel sending capability."""
    bot = MagicMock()
    channel = MagicMock()
    channel.send = AsyncMock()
    bot.get_channel = MagicMock(return_value=channel)
    return bot


# ---------------------------------------------------------------------------
# LLM fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def mock_llm_client():
    """Mock LLM client that returns a valid JSON triage response."""
    mock_llm = MagicMock()
    mock_response = MagicMock()
    mock_response.content = '{"needs_devrel": true, "priority": "high", "reasoning": "Test reasoning"}'
    mock_llm.ainvoke = AsyncMock(return_value=mock_response)
    return mock_llm


@pytest.fixture
def mock_llm_client_error():
    """Mock LLM client that raises an exception."""
    mock_llm = MagicMock()
    mock_llm.ainvoke = AsyncMock(side_effect=Exception("LLM API Error"))
    return mock_llm
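A brief example of how a test module might consume these fixtures (hypothetical tests; the real suites define their own):

```python
import json

async def test_mock_llm_returns_triage_json(mock_llm_client):
    response = await mock_llm_client.ainvoke("Should DevRel respond to this message?")
    payload = json.loads(response.content)
    assert payload["needs_devrel"] is True
    assert payload["priority"] == "high"

def test_mock_discord_bot_provides_async_send(mock_discord_bot):
    channel = mock_discord_bot.get_channel("chan-001")
    # channel.send is an AsyncMock, ready to be awaited by handler code under test
    assert channel is not None
```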