Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/fastapi/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ typing-extensions>=4.0

# HTTP client and retry helpers
httpx>=0.24
# Enable HTTP/2 support for httpx when available
h2>=4.1
tenacity>=8.2

# Testing
Expand Down
5 changes: 5 additions & 0 deletions backend/fastapi/src/adapter/factory/service_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def get_conversation_output_port() -> ConversationOutputPort:
conv_repo, _, _, _ = _make_repos_and_ports()
return conv_repo

@staticmethod
def get_message_output_port() -> MessageOutputPort:
_, msg_repo, _, _ = _make_repos_and_ports()
return msg_repo

@staticmethod
def get_health_input_port() -> HealthInputPort:
_, _, _, health_input = _make_repos_and_ports()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from fastapi import APIRouter, Query, Depends
from typing import Optional
from typing import Optional, List
from src.domain.vo.message_response import MessageResponse
from src.application.ports.input.conversation_input_port import ConversationInputPort
from src.domain.vo.conversation_response import ConversationResponse
from src.domain.vo.conversation_update_request import ConversationUpdateRequest
from src.domain.vo.list_response import ListResponse
from src.adapter.factory.service_factory import ServiceFactory
from src.adapter.input.controllers.response_utils import success_response


# Router organized as a grouped resource; prefix is applied when included in the app
Expand All @@ -19,38 +21,43 @@ async def list_conversations(
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
"""Cursor pagination: `after` is the id of the anchor element; return `limit` items after that anchor, skipping `offset` items, ordered by `created_at`."""
return await conversation_service.get_conversation_list(limit=limit, after=after, order=order)
data = await conversation_service.get_conversation_list(limit=limit, after=after, order=order)
return success_response(data=data, message="ok", status_code=200)


@router.get("/{conversation_id}", response_model=ConversationResponse)
async def get_conversation(
conversation_id: str,
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
return await conversation_service.get_conversation_detail(conversation_id)
data = await conversation_service.get_conversation_detail(conversation_id)
return success_response(data=data, message="ok", status_code=200)


@router.post("/", response_model=ConversationResponse)
async def create_conversation(
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
return await conversation_service.create_conversation()
data = await conversation_service.create_conversation()
return success_response(data=data, message="created", status_code=201)


@router.put("/", response_model=ConversationResponse)
async def update_conversation(
request: ConversationUpdateRequest,
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
return await conversation_service.update_conversation(request)
data = await conversation_service.update_conversation(request)
return success_response(data=data, message="updated", status_code=200)


@router.delete("/{conversation_id}", response_model=ConversationResponse)
async def delete_conversation(
conversation_id: str,
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
return await conversation_service.delete_conversation(conversation_id)
result = await conversation_service.delete_conversation(conversation_id)
return success_response(data={"deleted": result}, message="deleted", status_code=200)


@router.post("/{conversation_id}/messages")
Expand All @@ -59,4 +66,28 @@ async def post_message(
content: str,
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
return await conversation_service.post_message(conversation_id, content)
data = await conversation_service.post_message(conversation_id, content)
return success_response(data=data, message="created", status_code=201)


@router.get("/{conversation_id}/messages", response_model=ListResponse[MessageResponse])
async def get_conversation_messages(
conversation_id: str,
after: Optional[str] = Query(None),
limit: int = Query(10, gt=0),
order: str = Query("desc", regex="^(asc|desc)$"),
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
"""Get messages for a conversation (cursor pagination)."""
data = await conversation_service.get_conversation_messages(conversation_id=conversation_id, after=after, limit=limit, order=order)
return success_response(data=data, message="ok", status_code=200)


@router.get("/{conversation_id}/messages/recent", response_model=List[MessageResponse])
async def get_recent_messages(
conversation_id: str,
k: int = Query(5, gt=0, description="Number of latest messages to return (default 5)"),
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port)
):
data = await conversation_service.get_recent_messages(conversation_id=conversation_id, k=k)
return success_response(data=data, message="ok", status_code=200)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from src.application.ports.input.gemini_input_port import GeminiInputPort
from src.domain.vo.message_request import MessageRequest
from src.adapter.factory.service_factory import ServiceFactory
from src.adapter.input.controllers.response_utils import success_response
import json


router = APIRouter(prefix="/gemini", tags=["gemini"])
Expand All @@ -15,7 +17,8 @@ async def query(
gemini_service: GeminiInputPort = Depends(ServiceFactory.get_gemini_input_port),
):
"""Synchronous (non-streaming) Gemini query returning the full assistant text."""
return await gemini_service.query(message_request)
resp = await gemini_service.query(message_request)
return success_response(data=resp, message="ok", status_code=200)


@router.post("/stream")
Expand All @@ -30,8 +33,24 @@ async def query_stream(
"""

async def generator() -> AsyncIterator[bytes]:
# Yield Server-Sent Events (SSE) style 'data:' frames so clients such as
# EventSource or curl can process parts immediately. Each event is
# terminated by a blank line. This also tends to reduce buffering in
# intermediate proxies.
async for chunk in gemini_service.query_stream(message_request):
# encode each string chunk as utf-8 bytes
yield chunk.encode("utf-8")

return StreamingResponse(generator(), media_type="text/plain; charset=utf-8")
if chunk is None:
continue
# Ensure chunk is a str and strip accidental newlines
text = str(chunk)
# SSE data frame
data = f"data: {json.dumps(text, ensure_ascii=False)}\n\n"
yield data.encode("utf-8")

headers = {
# Prevent proxies from buffering the response
"Cache-Control": "no-cache, no-transform",
# For nginx / proxy buffering bypass
"X-Accel-Buffering": "no",
}

return StreamingResponse(generator(), media_type="text/event-stream", headers=headers)
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from fastapi import APIRouter, Depends
from src.application.ports.input.health_input_port import HealthInputPort
from src.adapter.factory.service_factory import ServiceFactory
from src.adapter.input.controllers.response_utils import success_response

router = APIRouter(prefix="/health", tags=["health"])


@router.get("/", summary="Liveness probe")
async def health(health_service: HealthInputPort = Depends(ServiceFactory.get_health_input_port)):
return await health_service.check_health()
data = await health_service.check_health()
return success_response(data=data, message="ok", status_code=200)


@router.get("/ready", summary="Readiness probe")
async def ready(health_service: HealthInputPort = Depends(ServiceFactory.get_health_input_port)):
return await health_service.check_readiness()
data = await health_service.check_readiness()
return success_response(data=data, message="ready", status_code=200)
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from fastapi import APIRouter, Query, Depends
from typing import Optional, List
from src.adapter.factory.service_factory import ServiceFactory
from src.application.ports.output.message_output_port import MessageOutputPort
from src.application.ports.input.conversation_input_port import ConversationInputPort
from src.domain.vo.message_response import MessageResponse
from src.domain.vo.list_response import ListResponse
from src.domain.vo.message_update_request import MessageUpdateRequest
from src.domain.models.message_domain import MessageDomain
from src.adapter.input.controllers.response_utils import success_response


router = APIRouter(prefix="/messages", tags=["messages"])


@router.get("/", response_model=ListResponse[MessageResponse])
async def list_messages(
conversation_id: Optional[str] = Query(None, description="Filter by conversation id"),
after: Optional[str] = Query(None),
limit: int = Query(20, gt=0),
order: str = Query("desc", regex="^(asc|desc)$"),
message_repo: MessageOutputPort = Depends(ServiceFactory.get_message_output_port),
):
"""List messages. If `conversation_id` is provided, returns messages for that conversation (cursor pagination)."""
if conversation_id:
messages, has_more = await message_repo.get_list_by_conversation(conversation_id, after, limit, order)
data: List[MessageResponse] = [MessageResponse.from_domain(m) for m in messages]
first_id = data[0].id if data else None
last_id = data[-1].id if data else None
payload = ListResponse[MessageResponse](data=data, first_id=first_id, last_id=last_id, has_more=has_more)
return success_response(data=payload, message="ok", status_code=200)
# If no conversation_id provided, return empty list
payload = ListResponse[MessageResponse](data=[], first_id=None, last_id=None, has_more=False)
return success_response(data=payload, message="ok", status_code=200)


@router.get("/{message_id}", response_model=MessageResponse)
async def get_message(
message_id: str,
message_repo: MessageOutputPort = Depends(ServiceFactory.get_message_output_port),
):
msg = await message_repo.get_by_id(message_id)
return success_response(data=MessageResponse.from_domain(msg), message="ok", status_code=200)


@router.put("/", response_model=MessageResponse)
async def update_message(
request: MessageUpdateRequest = Depends(MessageUpdateRequest.as_body),
message_repo: MessageOutputPort = Depends(ServiceFactory.get_message_output_port),
):
# retrieve existing message
existing = await message_repo.get_by_id(request.id)
# apply updates
updated = MessageDomain(
id=existing.id,
conversation_id=existing.conversation_id,
role=request.role if request.role is not None else existing.role,
content=request.content,
created_at=existing.created_at,
)
saved = await message_repo.update(updated)
return success_response(data=MessageResponse.from_domain(saved), message="ok", status_code=200)


@router.delete("/{message_id}")
async def delete_message(
message_id: str,
message_repo: MessageOutputPort = Depends(ServiceFactory.get_message_output_port),
):
existing = await message_repo.get_by_id(message_id)
result = await message_repo.delete(existing)
return success_response(data={"deleted": result}, message="deleted", status_code=200)


# Helper endpoint: messages by conversation (convenience)
@router.get("/by-conversation/{conversation_id}", response_model=ListResponse[MessageResponse])
async def get_messages_by_conversation(
conversation_id: str,
after: Optional[str] = Query(None),
limit: int = Query(20, gt=0),
order: str = Query("desc", regex="^(asc|desc)$"),
conversation_service: ConversationInputPort = Depends(ServiceFactory.get_conversation_input_port),
):
data = await conversation_service.get_conversation_messages(conversation_id=conversation_id, after=after, limit=limit, order=order)
return success_response(data=data, message="ok", status_code=200)
23 changes: 23 additions & 0 deletions backend/fastapi/src/adapter/input/controllers/response_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from fastapi.responses import JSONResponse
from typing import Any

from pydantic import BaseModel

def to_serializable(obj: Any):
if isinstance(obj, BaseModel):
return obj.model_dump()
if isinstance(obj, list):
return [to_serializable(o) for o in obj]
if isinstance(obj, dict):
return {k: to_serializable(v) for k, v in obj.items()}
return obj


def success_response(data: Any = None, message: str = "OK", status_code: int = 200) -> JSONResponse:
payload = {"status_code": status_code, "message": message, "data": to_serializable(data)}
return JSONResponse(content=payload, status_code=status_code)


def error_response(message: str = "Error", status_code: int = 500, data: Any = None) -> JSONResponse:
payload = {"status_code": status_code, "message": message, "data": to_serializable(data)}
return JSONResponse(content=payload, status_code=status_code)
Loading
Loading