From abc8081ccd2ab51943ae38751a849a25ddc53c6d Mon Sep 17 00:00:00 2001 From: Ganipudi Yugesh Date: Mon, 27 Apr 2026 10:09:34 +0530 Subject: [PATCH] feat: add RAG pipeline for Breeze Buddy knowledge-base Q&A MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Embeds merchant FAQ chunks via Azure AI Foundry (embed-v4.0) into pgvector. On each user turn, the utterance is embedded and the top-K nearest chunks are fetched via cosine similarity search and injected ephemerally into the LLM context before the response — removed immediately after so history stays clean. A per-template min_score threshold (default 0.30, configurable per language) filters irrelevant results. A text-cache and FAISS semantic cache (warmed by SlowThinker background predictions) reduce repeat-query latency to sub-ms. --- .../agents/breeze_buddy/agent/__init__.py | 63 ++- .../agents/breeze_buddy/agent/pipeline.py | 22 +- .../breeze_buddy/processors/__init__.py | 4 + .../breeze_buddy/processors/rag_context.py | 197 ++++++++ .../breeze_buddy/services/rag/__init__.py | 46 ++ .../breeze_buddy/services/rag/chunker.py | 145 ++++++ .../breeze_buddy/services/rag/embeddings.py | 131 ++++++ .../breeze_buddy/services/rag/fast_talker.py | 133 ++++++ .../breeze_buddy/services/rag/gcs_loader.py | 174 ++++++++ .../services/rag/index_manager.py | 224 ++++++++++ .../services/rag/memory_router.py | 338 ++++++++++++++ .../services/rag/semantic_cache.py | 282 ++++++++++++ .../breeze_buddy/services/rag/slow_thinker.py | 279 ++++++++++++ .../agents/breeze_buddy/services/rag/types.py | 139 ++++++ .../breeze_buddy/services/rag/vector_store.py | 98 ++++ .../agents/breeze_buddy/template/types.py | 10 + app/api/routers/breeze_buddy/__init__.py | 6 + .../breeze_buddy/knowledge_base/__init__.py | 419 ++++++++++++++++++ app/core/config/static.py | 23 + app/database/__init__.py | 58 ++- .../accessor/breeze_buddy/rag_embeddings.py | 173 ++++++++ .../023_create_rag_embeddings_table.sql | 56 +++ .../queries/breeze_buddy/rag_embeddings.py | 107 +++++ gigaAi/docs | 1 + pyproject.toml | 2 + uv.lock | 42 ++ 26 files changed, 3169 insertions(+), 3 deletions(-) create mode 100644 app/ai/voice/agents/breeze_buddy/processors/rag_context.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/__init__.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/chunker.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/fast_talker.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/semantic_cache.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/types.py create mode 100644 app/ai/voice/agents/breeze_buddy/services/rag/vector_store.py create mode 100644 app/api/routers/breeze_buddy/knowledge_base/__init__.py create mode 100644 app/database/accessor/breeze_buddy/rag_embeddings.py create mode 100644 app/database/migrations/023_create_rag_embeddings_table.sql create mode 100644 app/database/queries/breeze_buddy/rag_embeddings.py create mode 160000 gigaAi/docs diff --git a/app/ai/voice/agents/breeze_buddy/agent/__init__.py 
b/app/ai/voice/agents/breeze_buddy/agent/__init__.py index 100f6071b..eb62c3783 100644 --- a/app/ai/voice/agents/breeze_buddy/agent/__init__.py +++ b/app/ai/voice/agents/breeze_buddy/agent/__init__.py @@ -59,9 +59,11 @@ from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import ( create_root_span, ) +from app.ai.voice.agents.breeze_buddy.processors.rag_context import RagContextProcessor from app.ai.voice.agents.breeze_buddy.services.inbound_policy import ( get_block_redirect, ) +from app.ai.voice.agents.breeze_buddy.services.rag import RagMemoryRouter from app.ai.voice.agents.breeze_buddy.services.telephony.base_provider import ( VoiceCallProvider, ) @@ -82,7 +84,11 @@ close_websocket_safely, ) from app.ai.voice.agents.breeze_buddy.utils.warm_transfer import set_transfer_flag -from app.core.config.static import ENABLE_BREEZE_BUDDY_TRACING +from app.core.config.static import ( + ENABLE_BREEZE_BUDDY_TRACING, + RAG_EMBEDDING_DEPLOYMENT, + RAG_ENABLED, +) from app.core.logger import logger from app.core.logger.context import ( clear_log_context, @@ -163,6 +169,10 @@ def __init__( # Error tracking self.errors: List[Dict[str, Any]] = [] + # RAG (Retrieval-Augmented Generation) — optional, per-call + self._rag_router: Optional[RagMemoryRouter] = None + self._rag_context_processor: Optional[RagContextProcessor] = None + @property def is_daily_mode(self) -> bool: return self.transport_type == TRANSPORT_TYPE_DAILY @@ -707,6 +717,50 @@ async def _run_with_tracing(self, runner: PipelineRunner) -> None: track_error(self.errors, error_msg) self.root_span.end() + async def _init_rag(self) -> None: + """Initialise RAG router and context processor if knowledge_base is configured.""" + if not RAG_ENABLED: + logger.info("RAG disabled globally (RAG_ENABLED=false) — skipping init") + return + if not self.configurations or not self.template: + return + kb_config = getattr(self.configurations, "knowledge_base", None) + if not kb_config: + return + + merchant_id = self.template.merchant_id or "default" + template_id = self.template.id + + try: + self._rag_router = await RagMemoryRouter.build( + kb_config=kb_config, + merchant_id=merchant_id, + template_id=template_id, + azure_embedding_deployment=RAG_EMBEDDING_DEPLOYMENT, + ) + await self._rag_router.start() + self._rag_context_processor = RagContextProcessor(self._rag_router) + logger.info( + "RAG initialised: gs://.../%s/%s", + merchant_id, + template_id, + ) + except Exception as exc: + logger.error("RAG initialisation failed (proceeding without RAG): %s", exc) + self._rag_router = None + self._rag_context_processor = None + + async def _stop_rag(self) -> None: + """Stop and clean up the RAG router.""" + if self._rag_router is not None: + try: + await self._rag_router.stop() + except Exception as exc: + logger.warning("RAG stop error: %s", exc) + finally: + self._rag_router = None + self._rag_context_processor = None + async def run(self, runner_args: Optional[RunnerArguments] = None) -> None: """Main entry point for running the agent. @@ -743,6 +797,10 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None: # VAD analyzer is passed to build_pipeline where it's configured inside the # LLMUserAggregator. This enables UserTurnStrategies (VAD + Transcription fallback). 
stt, llm, tts = await create_services(self.configurations) + + # Initialise RAG (non-blocking on failure) + await self._init_rag() + ( pipeline, self.context, @@ -757,6 +815,7 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None: self.vad_analyzer, self.configurations, on_user_idle_timeout=self._handle_user_idle_timeout, + rag_context_processor=self._rag_context_processor, ) # Store callback handler for resetting retry count on user activity @@ -816,6 +875,8 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None: except asyncio.CancelledError: logger.info("Pipeline task cancelled. Exiting gracefully.") finally: + # Stop RAG router (logs final metrics, cancels background tasks) + await self._stop_rag() # Safety net: always clear log context when run() exits, regardless of how. # This handles crashes, cancellations, and any exit path missed above. # Double-clearing (if end_conversation already cleared) is harmless. diff --git a/app/ai/voice/agents/breeze_buddy/agent/pipeline.py b/app/ai/voice/agents/breeze_buddy/agent/pipeline.py index a72fff85a..0bb92ae50 100644 --- a/app/ai/voice/agents/breeze_buddy/agent/pipeline.py +++ b/app/ai/voice/agents/breeze_buddy/agent/pipeline.py @@ -47,6 +47,7 @@ from app.ai.voice.agents.breeze_buddy.llm import get_llm_service from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import setup_tracing from app.ai.voice.agents.breeze_buddy.processors import ( + RagContextProcessor, TranscriptionGateProcessor, UserIdleCallbackHandler, create_user_idle_processor, @@ -169,6 +170,7 @@ async def build_pipeline( vad_analyzer: Optional[SileroVADAnalyzer] = None, configurations: Optional[ConfigurationModel] = None, on_user_idle_timeout: Optional[Callable[[int], Any]] = None, + rag_context_processor: Optional[RagContextProcessor] = None, ) -> tuple[ Pipeline, LLMContext, @@ -360,7 +362,7 @@ async def build_pipeline( # Store reference to user aggregator for position lookup user_aggregator = context_aggregator.user() - # Order: stt → transcription_gate → user_aggregator → llm → tts + # Order: stt → transcription_gate → user_aggregator → [rag_context] → llm → tts # Pipecat's LLMUserAggregator natively handles interruptions via # UserTurnStrategies — no custom response gate needed. # Note: RTVIProcessor is added automatically by PipelineTask (pipecat v0.0.102+) @@ -376,6 +378,24 @@ async def build_pipeline( context_aggregator.assistant(), ] + # Insert RAG context processor between user_aggregator and llm. + # It must sit after user_aggregator so it can intercept the LLMContextFrame + # (which carries the assembled context) and inject knowledge ephemerally + # before the LLM sees it. Placing it before user_aggregator would mean it + # only sees raw TranscriptionFrames and has no access to the LLMContext object. + if rag_context_processor is not None: + try: + user_aggregator_idx = pipeline_parts.index(user_aggregator) + pipeline_parts.insert(user_aggregator_idx + 1, rag_context_processor) + logger.info( + "RAG context processor inserted into pipeline (after user_aggregator)" + ) + except ValueError as e: + logger.error( + "Failed to insert RAG context processor into pipeline: %s. 
RAG disabled.", + e, + ) + # Insert user idle processor before user_aggregator to monitor user activity if user_idle: try: diff --git a/app/ai/voice/agents/breeze_buddy/processors/__init__.py b/app/ai/voice/agents/breeze_buddy/processors/__init__.py index 0dbdd84e1..844eaf2a5 100644 --- a/app/ai/voice/agents/breeze_buddy/processors/__init__.py +++ b/app/ai/voice/agents/breeze_buddy/processors/__init__.py @@ -1,5 +1,8 @@ """Breeze Buddy custom processors for pipeline control.""" +from app.ai.voice.agents.breeze_buddy.processors.rag_context import ( + RagContextProcessor, +) from app.ai.voice.agents.breeze_buddy.processors.transcription_gate import ( TranscriptionGateProcessor, ) @@ -9,6 +12,7 @@ ) __all__ = [ + "RagContextProcessor", "TranscriptionGateProcessor", "UserIdleCallbackHandler", "create_user_idle_processor", diff --git a/app/ai/voice/agents/breeze_buddy/processors/rag_context.py b/app/ai/voice/agents/breeze_buddy/processors/rag_context.py new file mode 100644 index 000000000..9462a319e --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/processors/rag_context.py @@ -0,0 +1,197 @@ +""" +RAG Context Processor for Breeze Buddy. + +Sits **after** the user_aggregator in the pipeline. When the user_aggregator +pushes a ``LLMContextFrame`` downstream (i.e. the user's turn has been +aggregated and the context is ready to be sent to the LLM), this processor: + +1. Extracts the latest user utterance directly from the ``LLMContextFrame`` + (the last ``user`` role message already assembled by ``user_aggregator``). +2. Embeds it and fetches relevant knowledge from the ``RagMemoryRouter`` +2. If knowledge is found it is injected **directly into the LLMContext object** + as a temporary ``system`` message — before the context reaches the LLM. +3. After the LLM finishes responding (``LLMFullResponseEndFrame``), the + injected message is **removed from the context** so it does not pollute the + persistent conversation history and does not confuse subsequent turns. + +Why this approach: +- The knowledge never touches the user's ``TranscriptionFrame`` text. +- The knowledge is not a persistent instruction — it disappears after one turn. +- Because the LLM sees a plain ``system`` message with *only* factual content + (no "do this / don't do that" meta-instructions), the node's own task/role + messages remain the sole source of behavioural guidance. The LLM answers the + side question from the facts and then continues its current task. + +Pipeline position:: + + transport.input() + → stt + → TranscriptionGateProcessor + → user_aggregator ← produces LLMContextFrame + → RagContextProcessor ← here: enriches context ephemerally + → llm + → tts + → transport.output() + → context_aggregator.assistant() +""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + LLMFullResponseEndFrame, +) +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +from app.ai.voice.agents.breeze_buddy.services.rag.memory_router import RagMemoryRouter +from app.core.config.static import RAG_MAX_CONTEXT_CHARS +from app.core.logger import logger + +# Role used for the ephemeral RAG message — must match what we look for on cleanup +_RAG_ROLE = "system" +_RAG_MARKER = "[RAG]" + + +class RagContextProcessor(FrameProcessor): + """Ephemerally enriches the LLM context with RAG knowledge on each user turn. 
+ + The retrieved knowledge is injected into the ``LLMContext`` object directly + before it reaches the LLM and removed immediately after the LLM responds. + This keeps the persistent conversation history clean and avoids competing + with the node's own task/role messages. + + Args: + rag_router: Fully initialised ``RagMemoryRouter`` for this call. + """ + + def __init__(self, rag_router: RagMemoryRouter) -> None: + super().__init__() + self._router = rag_router + # The LLMContext we injected into (kept for cleanup after LLM responds) + self._injected_context: Optional[OpenAILLMContext] = None + + # ------------------------------------------------------------------ + # Frame processing + # ------------------------------------------------------------------ + + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: + await super().process_frame(frame, direction) + + if isinstance(frame, LLMContextFrame): + await self._enrich_and_forward(frame, direction) + return + + if isinstance(frame, LLMFullResponseEndFrame): + # LLM finished — remove the ephemeral RAG message from context + self._cleanup_rag_message() + await self.push_frame(frame, direction) + return + + await self.push_frame(frame, direction) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + async def _enrich_and_forward( + self, frame: LLMContextFrame, direction: FrameDirection + ) -> None: + """Fetch RAG context and inject it into the LLMContext before forwarding.""" + llm_context: OpenAILLMContext = frame.context # type: ignore[assignment] + + # Extract the latest user utterance from the context messages + utterance = "" + messages = llm_context.get_messages() or [] + for msg in reversed(messages): + if msg.get("role") == "user": + content = msg.get("content", "") + if isinstance(content, str): + utterance = content.strip() + elif isinstance(content, list): + # OpenAI multi-part content: extract text parts + parts = [ + p.get("text", "") + for p in content + if isinstance(p, dict) and p.get("type") == "text" + ] + utterance = " ".join(parts).strip() + break + + if not utterance: + await self.push_frame(frame, direction) + return + + # Retrieve knowledge (cache hit ≈ 0.1 ms, miss ≈ 50–150 ms) + try: + context = await self._router.get_context(utterance) + except Exception as exc: + logger.warning("RagContextProcessor: context retrieval failed: %s", exc) + await self.push_frame(frame, direction) + return + + if not context: + await self.push_frame(frame, direction) + return + + # Trim to prevent prompt bloat + if len(context) > RAG_MAX_CONTEXT_CHARS: + context = context[:RAG_MAX_CONTEXT_CHARS].rsplit("\n", 1)[0] + + # Inject directly into the LLMContext object as an ephemeral system msg. + # The framing instruction tells the LLM to use the retrieved facts to + # answer the user's question. The marker prefix lets us find and remove + # it on cleanup. + rag_message: Dict[str, Any] = { + "role": _RAG_ROLE, + "content": ( + f"{_RAG_MARKER} The following are relevant facts from the knowledge base. 
" + "Use them to answer the user's question.\n\n" + f"{context}" + ), + } + + llm_context.add_message(rag_message) # type: ignore[arg-type] + self._injected_context = llm_context + + logger.debug( + "RagContextProcessor: injected %d chars of RAG context for '%s…'", + len(context), + utterance[:40], + ) + + await self.push_frame(frame, direction) + + def _cleanup_rag_message(self) -> None: + """Remove the ephemeral RAG system message from the LLMContext.""" + if self._injected_context is None: + return + + messages = self._injected_context.get_messages() + if not messages: + self._injected_context = None + return + + original_len = len(messages) + filtered = [] + for m in messages: + content = m.get("content") + is_rag_msg = ( + m.get("role") == _RAG_ROLE + and isinstance(content, str) + and content.startswith(_RAG_MARKER) + ) + if not is_rag_msg: + filtered.append(m) + + if len(filtered) < original_len: + self._injected_context.set_messages(filtered) + logger.debug( + "RagContextProcessor: cleaned up %d ephemeral RAG message(s)", + original_len - len(filtered), + ) + + self._injected_context = None diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/__init__.py b/app/ai/voice/agents/breeze_buddy/services/rag/__init__.py new file mode 100644 index 000000000..2bf4a6d2f --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/__init__.py @@ -0,0 +1,46 @@ +""" +Breeze Buddy RAG Service + +Voice-optimised Retrieval-Augmented Generation using a dual-agent architecture: + + SlowThinker – background async task that predicts follow-up topics from the + ongoing conversation, retrieves candidate chunks from the + per-template pgvector store and pre-warms the SemanticCache. + + FastTalker – synchronous (sub-ms) cache lookup. Falls back to direct + pgvector search only on a cache miss, so the LLM context is + always ready before the first TTS byte is produced. + +Knowledge files live in GCS under a configurable prefix. They are downloaded +once per index run, chunked, embedded (Azure OpenAI text-embedding-3-small) and +stored in PostgreSQL via pgvector. The IndexManager writes embeddings into the +``rag_embeddings`` table; the PgVectorStore fetches them on demand per call. + +Usage +----- +from app.ai.voice.agents.breeze_buddy.services.rag import RagMemoryRouter + +router = await RagMemoryRouter.build( + kb_config=template.configurations.knowledge_base, + merchant_id=merchant_id, + template_id=template_id, +) +await router.start() + +# In the LLM context-building phase: +context = await router.get_context(user_utterance) + +await router.stop() +""" + +from app.ai.voice.agents.breeze_buddy.services.rag.memory_router import RagMemoryRouter +from app.ai.voice.agents.breeze_buddy.services.rag.types import ( + KnowledgeBaseConfig, + RagMetrics, +) + +__all__ = [ + "RagMemoryRouter", + "KnowledgeBaseConfig", + "RagMetrics", +] diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/chunker.py b/app/ai/voice/agents/breeze_buddy/services/rag/chunker.py new file mode 100644 index 000000000..f5b621e77 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/chunker.py @@ -0,0 +1,145 @@ +""" +Text chunker for the Breeze Buddy RAG pipeline. + +Uses a recursive character splitter that tries paragraph breaks first, then +sentence boundaries, then word boundaries, then hard byte splits. This +produces coherent chunks that stay within the ``chunk_size`` limit while +preserving natural reading units. 
+ +Improved over the VoiceAgentRAG reference implementation: +- Respects both ``chunk_size`` AND ``chunk_overlap`` without duplicating content + (overlap is only injected between *adjacent* chunks, not recursively) +- Strips leading/trailing whitespace from every chunk +- Handles very long tokens (single words > chunk_size) gracefully +""" + +from __future__ import annotations + +from typing import List + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def chunk_text( + text: str, + chunk_size: int = 512, + chunk_overlap: int = 64, +) -> List[str]: + """Split *text* into overlapping chunks. + + Args: + text: Raw text content to split. + chunk_size: Maximum characters per chunk (hard limit). + chunk_overlap: Number of characters to repeat at the start of each + subsequent chunk to maintain cross-chunk context. + + Returns: + List of non-empty chunk strings. + """ + if not text or not text.strip(): + return [] + + if len(text) <= chunk_size: + stripped = text.strip() + return [stripped] if stripped else [] + + raw_chunks = _recursive_split(text, _SEPARATORS, chunk_size) + return _apply_overlap(raw_chunks, chunk_overlap, chunk_size) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +_SEPARATORS = ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "] + + +def _recursive_split(text: str, separators: List[str], chunk_size: int) -> List[str]: + """Recursively split *text* using the first separator that fits.""" + text = text.strip() + if not text: + return [] + if len(text) <= chunk_size: + return [text] + + # Find the best separator available in this text + sep = "" + remaining_seps: List[str] = [] + for i, s in enumerate(separators): + if s in text: + sep = s + remaining_seps = separators[i + 1 :] + break + + if not sep: + # No separator found → hard split + return _hard_split(text, chunk_size) + + parts = text.split(sep) + chunks: List[str] = [] + current_parts: List[str] = [] + current_len = 0 + + for part in parts: + candidate_len = current_len + len(sep) * len(current_parts) + len(part) + if candidate_len <= chunk_size: + current_parts.append(part) + current_len += len(part) + else: + # Flush current accumulation + if current_parts: + joined = sep.join(current_parts).strip() + if joined: + chunks.append(joined) + current_parts = [] + current_len = 0 + + # Handle a single part that is still too large + if len(part) > chunk_size: + if remaining_seps: + chunks.extend(_recursive_split(part, remaining_seps, chunk_size)) + else: + chunks.extend(_hard_split(part, chunk_size)) + else: + current_parts = [part] + current_len = len(part) + + if current_parts: + joined = sep.join(current_parts).strip() + if joined: + chunks.append(joined) + + return [c for c in chunks if c] + + +def _hard_split(text: str, chunk_size: int) -> List[str]: + """Split text into hard character-count slices (last resort).""" + chunks = [] + for i in range(0, len(text), chunk_size): + chunk = text[i : i + chunk_size].strip() + if chunk: + chunks.append(chunk) + return chunks + + +def _apply_overlap(chunks: List[str], chunk_overlap: int, chunk_size: int) -> List[str]: + """Prepend a tail of the previous chunk to each subsequent chunk. 
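+ +    For example, with ``chunk_overlap=4``, ``["abcdefgh", "ijklmnop"]`` becomes +    ``["abcdefgh", "efgh ijklmnop"]`` (assuming ``chunk_size`` still +    accommodates the prepended tail).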
+ + The prepended overlap is taken from the *end* of the previous chunk and is + capped so the resulting chunk does not exceed ``chunk_size``. + """ + if chunk_overlap <= 0 or len(chunks) <= 1: + return chunks + + result = [chunks[0]] + for i in range(1, len(chunks)): + prev_tail = chunks[i - 1][-chunk_overlap:] + candidate = (prev_tail + " " + chunks[i]).strip() + # Truncate if prepended overlap pushes us past chunk_size + if len(candidate) > chunk_size: + candidate = candidate[-chunk_size:].strip() + result.append(candidate) + + return result diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py b/app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py new file mode 100644 index 000000000..57f7bdc2b --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py @@ -0,0 +1,131 @@ +""" +Embedding provider for Breeze Buddy RAG. + +Uses the Azure AI Foundry embedding endpoint (embed-v4.0, 1536-dim) via the +standard openai.OpenAI client (base_url + api_key pattern — NOT AzureOpenAI). + +Credentials are read from: + RAG_EMBEDDING_ENDPOINT = "https://breeze-automatic.services.ai.azure.com/openai/v1/" + RAG_EMBEDDING_API_KEY = "" + RAG_EMBEDDING_DEPLOYMENT = "embed-v-4-0" + +Design: +- Synchronous SDK call wrapped in asyncio.to_thread to avoid blocking the + event loop. +- Batched in groups of 256 (configurable) for throughput. +- Empty strings replaced with "." to avoid API 422 errors. +- Input is always passed as a list (the API requires it). +""" + +from __future__ import annotations + +import asyncio +from typing import List + +import numpy as np + +from app.core.logger import logger + +_DEFAULT_EMBEDDING_MODEL = "embed-v-4-0" +_DEFAULT_EMBEDDING_DIMENSION = 1536 + + +class EmbeddingProvider: + """Async embedding provider backed by Azure AI Foundry (OpenAI-compatible). + + Uses the standard ``openai.OpenAI`` client with ``base_url`` + ``api_key``. + This is different from ``AzureOpenAI`` which uses ``azure_endpoint`` + + ``api_version`` — the Foundry endpoint does not require an api_version. + + Args: + api_key: API key for the embedding endpoint. + endpoint: Base URL, e.g. + ``"https://breeze-automatic.services.ai.azure.com/openai/v1/"``. + deployment: Model/deployment name, e.g. ``"embed-v-4-0"``. + dimension: Expected output dimension (1536 for embed-v4.0). + max_batch_size: Maximum texts per single API call. + """ + + def __init__( + self, + api_key: str, + endpoint: str, + deployment: str = _DEFAULT_EMBEDDING_MODEL, + dimension: int = _DEFAULT_EMBEDDING_DIMENSION, + max_batch_size: int = 256, + ) -> None: + try: + from openai import OpenAI + except ImportError as exc: + raise ImportError( + "openai package is required for EmbeddingProvider. " + "Install via: uv add openai" + ) from exc + + self._client = OpenAI( + base_url=endpoint, + api_key=api_key, + ) + self._deployment = deployment + self._dimension = dimension + self._max_batch_size = max_batch_size + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + @property + def dimension(self) -> int: + """Embedding dimensionality.""" + return self._dimension + + async def embed(self, texts: List[str]) -> np.ndarray: + """Embed a batch of texts asynchronously. + + Args: + texts: Non-empty list of strings. + + Returns: + Float32 numpy array of shape ``(len(texts), dimension)``. 
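+ +        Example (sketch; assumes valid ``RAG_EMBEDDING_*`` credentials):: + +            provider = EmbeddingProvider( +                api_key=RAG_EMBEDDING_API_KEY, endpoint=RAG_EMBEDDING_ENDPOINT +            ) +            vecs = await provider.embed(["How do refunds work?", "Shipping times"]) +            assert vecs.shape == (2, provider.dimension)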
+ """ + if not texts: + return np.empty((0, self._dimension), dtype=np.float32) + + all_embeddings: List[np.ndarray] = [] + for i in range(0, len(texts), self._max_batch_size): + batch = texts[i : i + self._max_batch_size] + batch_embeddings = await asyncio.to_thread(self._embed_sync, batch) + all_embeddings.append(batch_embeddings) + + return np.vstack(all_embeddings).astype(np.float32) + + async def embed_single(self, text: str) -> np.ndarray: + """Embed a single string. + + Args: + text: Input string. + + Returns: + Float32 numpy array of shape ``(dimension,)``. + """ + result = await self.embed([text]) + return result[0] + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _embed_sync(self, texts: List[str]) -> np.ndarray: + """Synchronous embedding call — runs in a thread pool via asyncio.to_thread.""" + # API requires a list; replace empty strings to avoid 422 errors + safe_texts = [t if t.strip() else "." for t in texts] + response = self._client.embeddings.create( + model=self._deployment, + input=safe_texts, + ) + vectors = [item.embedding for item in response.data] + arr = np.array(vectors, dtype=np.float32) + logger.debug( + "EmbeddingProvider: embedded %d texts → shape %s", len(texts), arr.shape + ) + return arr diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/fast_talker.py b/app/ai/voice/agents/breeze_buddy/services/rag/fast_talker.py new file mode 100644 index 000000000..996f8f1ca --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/fast_talker.py @@ -0,0 +1,133 @@ +""" +FastTalker – foreground retrieval agent for Breeze Buddy RAG. + +Reads context from the semantic cache (sub-ms) and falls back to a direct +vector-store search only on a cache miss. The retrieved context is formatted +as a structured string that can be injected directly into the system / task +prompt. +""" + +from __future__ import annotations + +from typing import List + +from app.ai.voice.agents.breeze_buddy.services.rag.embeddings import EmbeddingProvider +from app.ai.voice.agents.breeze_buddy.services.rag.semantic_cache import SemanticCache +from app.ai.voice.agents.breeze_buddy.services.rag.types import RagMetrics +from app.ai.voice.agents.breeze_buddy.services.rag.vector_store import PgVectorStore +from app.core.logger import logger + + +class FastTalker: + """Foreground RAG retrieval agent optimised for minimum latency. + + Cache-hit path: ~ 0.1 ms (FAISS inner-product search over ≤ 1 000 vectors). + Cache-miss path: embedding (async thread) + pgvector search ≈ 20–80 ms. + + Args: + vector_store: The knowledge-base vector store (``PgVectorStore`` in prod). + embedding_provider: Shared embedding provider. + cache: Semantic cache populated by the SlowThinker. + metrics: Shared metrics instance. + top_k: Context chunks to assemble per query. + fallback_to_retrieval: When ``True`` (default), fall back to direct + vector-store search on a cache miss instead of returning empty context. 
+ """ + + def __init__( + self, + vector_store: PgVectorStore, + embedding_provider: EmbeddingProvider, + cache: SemanticCache, + metrics: RagMetrics, + top_k: int = 6, + fallback_to_retrieval: bool = True, + ) -> None: + self._store = vector_store + self._embeddings = embedding_provider + self._cache = cache + self._metrics = metrics + self._top_k = top_k + self._fallback = fallback_to_retrieval + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def get_context(self, query: str) -> str: + """Return formatted context string for the given user query. + + This is the primary entry-point called from ``MemoryRouter.get_context``. + + Args: + query: The user's latest utterance. + + Returns: + A formatted multi-chunk context string, or ``""`` if no relevant + context was found. + """ + self._metrics.total_queries += 1 + + # 1. Embed the query + q_emb = await self._embeddings.embed_single(query) + + # 2. Try the semantic cache (fast path) + cached = await self._cache.get(q_emb, top_k=self._top_k) + if cached: + self._metrics.cache_hits += 1 + chunks = [entry.text for entry in cached] + logger.debug( + "FastTalker cache HIT: %d chunks for '%s…'", len(chunks), query[:40] + ) + return _format_context(chunks) + + # 3. Cache miss — fall back to direct vector-store search + self._metrics.cache_misses += 1 + logger.debug("FastTalker cache MISS for '%s…'", query[:40]) + + if not self._fallback: + return "" + + results = await self._store.search(q_emb, top_k=self._top_k) + if not results: + return "" + + # Populate cache with the retrieved chunks so the next similar query hits. + # Use the query embedding as the cache key (document embeddings no longer + # fetched from the DB). + await self._cache.put_batch( + [ + { + "query_embedding": q_emb, + "text": r.text, + "metadata": r.metadata, + "relevance_score": r.score, + } + for r in results + ] + ) + + chunks = [r.text for r in results] + logger.debug( + "FastTalker fallback: %d chunks retrieved for '%s…'", + len(chunks), + query[:40], + ) + return _format_context(chunks) + + +# --------------------------------------------------------------------------- +# Context formatting +# --------------------------------------------------------------------------- + + +def _format_context(chunks: List[str]) -> str: + """Format retrieved chunks into an LLM-ready context string. + + The numbered format gives the LLM a clear structure and allows it to + cite sources if needed. + """ + if not chunks: + return "" + lines = [f"[{i + 1}] {chunk.strip()}" for i, chunk in enumerate(chunks)] + return "\n\n".join(lines) diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py b/app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py new file mode 100644 index 000000000..876779f6f --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py @@ -0,0 +1,174 @@ +""" +GCS Document Loader for Breeze Buddy RAG. + +Downloads all supported files from a GCS bucket/prefix and chunks them for +embedding. PDF extraction uses pdfminer.six (optional) when available; +falls back to raw text for plain/markdown files. 
+""" + +from __future__ import annotations + +import io +import json +from typing import List, Optional + +from google.cloud import storage +from google.oauth2 import service_account + +from app.ai.voice.agents.breeze_buddy.services.rag.chunker import chunk_text +from app.ai.voice.agents.breeze_buddy.services.rag.types import DocumentChunk +from app.core.config.static import GCS_CREDENTIALS_JSON +from app.core.logger import logger + + +def _get_storage_client(credentials_json: Optional[str] = None) -> storage.Client: + """Build a GCS client from service-account JSON. + + Args: + credentials_json: JSON string for a GCS service account. + Defaults to the ``GCS_CREDENTIALS_JSON`` env var. + + Returns: + Authenticated GCS client. + """ + creds_json = credentials_json or GCS_CREDENTIALS_JSON + if not creds_json: + raise RuntimeError( + "GCS_CREDENTIALS_JSON env var is not set – cannot connect to GCS." + ) + creds_dict = json.loads(creds_json) + credentials = service_account.Credentials.from_service_account_info(creds_dict) + return storage.Client(credentials=credentials, project=creds_dict.get("project_id")) + + +def _extract_text_from_blob(blob: storage.Blob, extension: str) -> Optional[str]: + """Download a GCS blob and extract its text content. + + Args: + blob: GCS Blob object. + extension: Lowercased file extension (e.g. '.pdf', '.txt'). + + Returns: + Extracted text or ``None`` if the file could not be read. + """ + raw_bytes = blob.download_as_bytes() + + if extension == ".pdf": + try: + from pdfminer.high_level import extract_text_to_fp + from pdfminer.layout import LAParams + + output = io.StringIO() + extract_text_to_fp( + io.BytesIO(raw_bytes), + output, + laparams=LAParams(), + output_type="text", + codec="utf-8", + ) + return output.getvalue() + except ImportError: + logger.warning( + "pdfminer.six is not installed — skipping PDF file: %s", blob.name + ) + return None + except Exception as exc: + logger.warning("Failed to extract PDF %s: %s", blob.name, exc) + return None + + # Plain-text / markdown / rst + try: + return raw_bytes.decode("utf-8") + except UnicodeDecodeError: + try: + return raw_bytes.decode("latin-1") + except Exception as exc: + logger.warning("Cannot decode %s: %s", blob.name, exc) + return None + + +def load_gcs_documents( + gcs_bucket: str, + gcs_prefix: str = "", + extensions: Optional[List[str]] = None, + chunk_size: int = 512, + chunk_overlap: int = 64, + credentials_json: Optional[str] = None, +) -> List[DocumentChunk]: + """Download and chunk all knowledge files from GCS. + + Args: + gcs_bucket: GCS bucket name. + gcs_prefix: Path prefix inside the bucket. + extensions: Supported file extensions to ingest. + chunk_size: Maximum characters per chunk. + chunk_overlap: Overlap between adjacent chunks. + credentials_json: Service-account JSON (defaults to env var). + + Returns: + List of ``DocumentChunk`` objects ready for embedding. 
+ """ + _exts: set[str] = ( + {".txt", ".md", ".rst", ".text", ".pdf"} + if extensions is None + else {e.lower() for e in extensions} + ) + + client = _get_storage_client(credentials_json) + bucket = client.bucket(gcs_bucket) + + # Normalise prefix: GCS list expects no leading slash, trailing slash is fine + prefix = gcs_prefix.lstrip("/") + + blobs = list(bucket.list_blobs(prefix=prefix)) + logger.info( + "GCS loader: found %d blobs under gs://%s/%s", + len(blobs), + gcs_bucket, + prefix, + ) + + all_chunks: List[DocumentChunk] = [] + + for blob in blobs: + name: str = blob.name + # Skip "directory marker" blobs (end with /) + if name.endswith("/"): + continue + + ext = "." + name.rsplit(".", 1)[-1].lower() if "." in name else "" + if ext not in _exts: + continue + + try: + text = _extract_text_from_blob(blob, ext) + except Exception as exc: + logger.warning("Failed to download gs://%s/%s: %s", gcs_bucket, name, exc) + continue + + if not text or not text.strip(): + logger.debug("Empty content in gs://%s/%s — skipping", gcs_bucket, name) + continue + + chunks = chunk_text(text, chunk_size=chunk_size, chunk_overlap=chunk_overlap) + for i, chunk in enumerate(chunks): + all_chunks.append( + DocumentChunk( + text=chunk, + metadata={ + "source": f"gs://{gcs_bucket}/{name}", + "chunk_index": i, + "file_name": name.split("/")[-1], + }, + ) + ) + + logger.info("GCS loader: gs://%s/%s → %d chunks", gcs_bucket, name, len(chunks)) + + logger.info( + "GCS loader: total %d chunks from gs://%s/%s", + len(all_chunks), + gcs_bucket, + prefix, + ) + return all_chunks diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py b/app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py new file mode 100644 index 000000000..37dbc2a7f --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py @@ -0,0 +1,224 @@ +""" +Knowledge-base index manager for Breeze Buddy RAG — pgvector edition. + +Responsibilities +---------------- +``build_knowledge_base`` + Download raw files from GCS → chunk → embed → upsert into + ``rag_embeddings`` (PostgreSQL / pgvector). Safe to call from the + management API at any time; idempotent via ON CONFLICT upsert. + +``get_pg_vector_store`` + Return a lightweight ``PgVectorStore`` handle for a given + (merchant_id, template_id) pair. Instantiation is instant — no I/O, + no index build. The store fetches embeddings on demand from pgvector. + +``get_cached_index_stats`` + Query pgvector for live chunk/file counts and the last-indexed timestamp. + Always accurate; no in-process cache. + +``invalidate_index`` + Delete all rows for a (merchant_id, template_id) from ``rag_embeddings``. + +There is intentionally **no** ``_INDEX_CACHE`` dict. pgvector IS the store. +Each pod is stateless with respect to the knowledge base. 
+""" + +from __future__ import annotations + +import asyncio +import time +from typing import Any, Dict + +import numpy as np + +from app.ai.voice.agents.breeze_buddy.services.rag.embeddings import EmbeddingProvider +from app.ai.voice.agents.breeze_buddy.services.rag.gcs_loader import load_gcs_documents +from app.ai.voice.agents.breeze_buddy.services.rag.types import KnowledgeBaseConfig +from app.ai.voice.agents.breeze_buddy.services.rag.vector_store import PgVectorStore +from app.core.config.static import RAG_EMBEDDING_DIMENSION +from app.core.logger import logger + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +async def build_knowledge_base( + kb_config: KnowledgeBaseConfig, + embedding_provider: EmbeddingProvider, + merchant_id: str, + template_id: str, + gcs_bucket: str, + gcs_prefix: str, +) -> int: + """Download GCS documents, embed, and upsert into pgvector. + + This is the equivalent of the old ``force_rebuild_index`` but writes to + PostgreSQL instead of building a FAISS index. + + Args: + kb_config: Knowledge-base tuning config from the template. + embedding_provider: Configured ``EmbeddingProvider`` instance. + merchant_id: Merchant identifier. + template_id: Template UUID. + gcs_bucket: GCS bucket name. + gcs_prefix: GCS path prefix (``//``). + + Returns: + Number of chunks upserted. + """ + from app.database import get_pool + from app.database.accessor.breeze_buddy.rag_embeddings import upsert_knowledge_base + + logger.info( + "IndexManager: building knowledge base for %s/%s from gs://%s/%s …", + merchant_id, + template_id, + gcs_bucket, + gcs_prefix, + ) + t0 = time.time() + + # 1. Load + chunk documents from GCS + chunks = await asyncio.to_thread( + load_gcs_documents, + gcs_bucket, + gcs_prefix, + kb_config.extensions, + kb_config.chunk_size, + kb_config.chunk_overlap, + ) + + if not chunks: + logger.warning( + "IndexManager: no documents found at gs://%s/%s — nothing indexed", + gcs_bucket, + gcs_prefix, + ) + return 0 + + texts = [c.text for c in chunks] + metadata = [c.metadata for c in chunks] + + # 2. Embed all chunks in one call + embeddings_array: np.ndarray = await embedding_provider.embed(texts) + + # 3. Upsert into pgvector — wrapped in a transaction so stale-chunk delete + # and the upsert are atomic (no window where old chunks can be served). + async with get_pool().acquire() as conn: + async with conn.transaction(): + n = await upsert_knowledge_base( + conn, merchant_id, template_id, texts, embeddings_array, metadata + ) + + elapsed = time.time() - t0 + logger.info( + "IndexManager: upserted %d chunks for %s/%s in %.2f s", + n, + merchant_id, + template_id, + elapsed, + ) + return n + + +def get_pg_vector_store( + merchant_id: str, + template_id: str, + dimension: int = RAG_EMBEDDING_DIMENSION, +) -> PgVectorStore: + """Return a ``PgVectorStore`` handle for the given template. + + Instantiation is instant — no database I/O. The store fetches vectors + from pgvector on each ``search()`` call. + + Args: + merchant_id: Merchant identifier. + template_id: Template UUID. + dimension: Embedding dimension (default: ``RAG_EMBEDDING_DIMENSION``). + + Returns: + A ``PgVectorStore`` instance. 
+ """ + return PgVectorStore( + merchant_id=merchant_id, + template_id=template_id, + dimension=dimension, + ) + + +async def get_cached_index_stats( + merchant_id: str, + template_id: str, +) -> Dict[str, Any]: + """Query pgvector for live knowledge-base stats. + + Args: + merchant_id: Merchant identifier. + template_id: Template UUID. + + Returns: + Dict with keys: ``chunk_count``, ``total_documents``, + ``index_size_bytes``, ``last_indexed_at``, ``error_message``. + """ + from app.database import get_pool + from app.database.accessor.breeze_buddy.rag_embeddings import get_kb_stats + + try: + async with get_pool().acquire() as conn: + stats = await get_kb_stats(conn, merchant_id, template_id) + + last_indexed_at = stats.get("last_indexed_at") + last_indexed_str = ( + last_indexed_at.isoformat() if last_indexed_at is not None else None + ) + + chunk_count = stats.get("chunk_count", 0) + return { + "chunk_count": chunk_count, + "total_documents": stats.get("file_count", 0), + # Approximate storage estimate: each vector is dimension * 4 bytes + "index_size_bytes": chunk_count * RAG_EMBEDDING_DIMENSION * 4, + "last_indexed_at": last_indexed_str, + "error_message": None, + } + except Exception as exc: + logger.warning( + "IndexManager: failed to fetch stats for %s/%s: %s", + merchant_id, + template_id, + exc, + ) + return { + "chunk_count": 0, + "total_documents": 0, + "index_size_bytes": 0, + "last_indexed_at": None, + "error_message": str(exc), + } + + +async def invalidate_index( + merchant_id: str, + template_id: str, +) -> int: + """Delete all chunks for a knowledge base from pgvector. + + Args: + merchant_id: Merchant identifier. + template_id: Template UUID. + + Returns: + Number of rows deleted. + """ + from app.database import get_pool + from app.database.accessor.breeze_buddy.rag_embeddings import delete_knowledge_base + + async with get_pool().acquire() as conn: + deleted = await delete_knowledge_base(conn, merchant_id, template_id) + + logger.info( + "IndexManager: deleted %d chunks for %s/%s", deleted, merchant_id, template_id + ) + return deleted diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py b/app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py new file mode 100644 index 000000000..b7c8dabb7 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py @@ -0,0 +1,338 @@ +""" +RagMemoryRouter – central orchestrator for the Breeze Buddy RAG service. + +One ``RagMemoryRouter`` is created per voice-call. It wires together: + + - ``EmbeddingProvider`` – embeddings via Azure AI Foundry + - ``PgVectorStore`` – stateless handle to the pgvector knowledge base + (no in-process FAISS index; safe across pods) + - ``SemanticCache`` – in-memory FAISS semantic cache per call + - ``SlowThinker`` – background prefetch agent (asyncio task) + - ``FastTalker`` – foreground sub-ms context retrieval + +Knowledge files are stored in GCS at:: + + gs:///// + +The bucket is read from ``RAG_GCS_BUCKET`` env var; the path is derived +automatically from the template — no manual configuration is required. 
+ +Usage from the Breeze Buddy pipeline:: + + from app.ai.voice.agents.breeze_buddy.services.rag import RagMemoryRouter + + router = await RagMemoryRouter.build( + kb_config=template.configurations.knowledge_base, + merchant_id=template.merchant_id, + template_id=template.id, + ) + await router.start() + + # Inside the LLM context-assembly callback: + context = await router.get_context(user_utterance, conversation_history) + + await router.stop() +""" + +from __future__ import annotations + +from typing import Optional + +from app.ai.voice.agents.breeze_buddy.services.rag.embeddings import ( + EmbeddingProvider, +) +from app.ai.voice.agents.breeze_buddy.services.rag.fast_talker import FastTalker +from app.ai.voice.agents.breeze_buddy.services.rag.index_manager import ( + get_pg_vector_store, +) +from app.ai.voice.agents.breeze_buddy.services.rag.semantic_cache import SemanticCache +from app.ai.voice.agents.breeze_buddy.services.rag.slow_thinker import SlowThinker +from app.ai.voice.agents.breeze_buddy.services.rag.types import ( + KnowledgeBaseConfig, + RagMetrics, +) +from app.core.config.static import ( + AZURE_BREEZE_BUDDY_OPENAI_MODEL, + AZURE_OPENAI_API_KEY, + AZURE_OPENAI_ENDPOINT, + RAG_EMBEDDING_API_KEY, + RAG_EMBEDDING_DEPLOYMENT, + RAG_EMBEDDING_DIMENSION, + RAG_EMBEDDING_ENDPOINT, + RAG_GCS_BUCKET, +) +from app.core.logger import logger + + +class RagMemoryRouter: + """Voice-optimised dual-agent RAG orchestrator. + + Do not construct directly – use ``RagMemoryRouter.build()``. + """ + + def __init__( + self, + fast_talker: FastTalker, + slow_thinker: SlowThinker, + cache: SemanticCache, + metrics: RagMetrics, + kb_config: KnowledgeBaseConfig, + merchant_id: str, + template_id: str, + ) -> None: + self._fast_talker = fast_talker + self._slow_thinker = slow_thinker + self._cache = cache + self._metrics = metrics + self._kb_config = kb_config + self._merchant_id = merchant_id + self._template_id = template_id + self._conversation_history: list[str] = [] + + # ------------------------------------------------------------------ + # Factory + # ------------------------------------------------------------------ + + @classmethod + async def build( + cls, + kb_config: KnowledgeBaseConfig, + merchant_id: str, + template_id: str, + azure_api_key: Optional[str] = None, + azure_endpoint: Optional[str] = None, + azure_embedding_deployment: str = "text-embedding-3-small", + azure_prediction_deployment: Optional[str] = None, + azure_api_version: str = "2024-02-01", + ) -> "RagMemoryRouter": + """Build a ``RagMemoryRouter`` for a specific template. + + The PgVectorStore is instantiated instantly (no I/O). Embedding + credentials are read from ``RAG_EMBEDDING_*`` env vars. The LLM + used by SlowThinker for follow-up prediction still uses the main + Azure OpenAI credentials (``AZURE_OPENAI_*``). + + Args: + kb_config: Knowledge-base tuning config from the template. + merchant_id: Merchant identifier (from ``TemplateModel.merchant_id``). + template_id: Template UUID (from ``TemplateModel.id``). + azure_api_key: Unused — kept for backward-compat signature only. + azure_endpoint: Unused — kept for backward-compat signature only. + azure_embedding_deployment: Unused — overridden by RAG_EMBEDDING_DEPLOYMENT. + azure_prediction_deployment: Azure deployment for Slow Thinker LLM. + azure_api_version: Azure OpenAI REST API version for Slow Thinker LLM. + + Returns: + A fully wired ``RagMemoryRouter`` (not yet started). 
+ """ + # Embedding provider — uses dedicated RAG_EMBEDDING_* credentials + emb_endpoint = RAG_EMBEDDING_ENDPOINT + emb_api_key = RAG_EMBEDDING_API_KEY + emb_deployment = RAG_EMBEDDING_DEPLOYMENT + emb_dimension = RAG_EMBEDDING_DIMENSION + + if not emb_endpoint or not emb_api_key: + raise ValueError( + "RAG_EMBEDDING_ENDPOINT and RAG_EMBEDDING_API_KEY env vars are required." + ) + + embedding_provider = EmbeddingProvider( + api_key=emb_api_key, + endpoint=emb_endpoint, + deployment=emb_deployment, + dimension=emb_dimension, + ) + + # SlowThinker LLM — still uses main Azure OpenAI credentials + pred_api_key = ( + kb_config.prediction_llm_api_key or azure_api_key or AZURE_OPENAI_API_KEY + ) or None + pred_endpoint = ( + kb_config.prediction_llm_endpoint or azure_endpoint or AZURE_OPENAI_ENDPOINT + ) or None + pred_deployment = ( + kb_config.prediction_llm_model + or azure_prediction_deployment + or AZURE_BREEZE_BUDDY_OPENAI_MODEL + ) + + # PgVectorStore — stateless, instant, no index build needed + vector_store = get_pg_vector_store( + merchant_id=merchant_id, + template_id=template_id, + dimension=emb_dimension, + ) + + # Per-call semantic cache (in-process FAISS, ephemeral) + cache = SemanticCache( + dimension=embedding_provider.dimension, + max_size=kb_config.cache_max_size, + default_ttl=kb_config.cache_ttl_seconds, + similarity_threshold=kb_config.cache_similarity_threshold, + ) + + metrics = RagMetrics() + + # Build LLM client for Slow Thinker predictions + llm_client = cls._build_llm_client( + pred_api_key, pred_endpoint, azure_api_version + ) + + slow_thinker = SlowThinker( + vector_store=vector_store, + embedding_provider=embedding_provider, + cache=cache, + metrics=metrics, + max_predictions=kb_config.max_predictions, + prefetch_top_k=kb_config.prefetch_top_k, + rate_limit_secs=kb_config.slow_thinker_rate_limit, + llm_client=llm_client, + llm_model=pred_deployment, + ) + + fast_talker = FastTalker( + vector_store=vector_store, + embedding_provider=embedding_provider, + cache=cache, + metrics=metrics, + top_k=kb_config.top_k, + ) + + logger.info( + "RagMemoryRouter built for %s/%s (pgvector store)", + merchant_id, + template_id, + ) + + return cls( + fast_talker=fast_talker, + slow_thinker=slow_thinker, + cache=cache, + metrics=metrics, + kb_config=kb_config, + merchant_id=merchant_id, + template_id=template_id, + ) + + @staticmethod + def _build_llm_client( + api_key: Optional[str], endpoint: Optional[str], api_version: str + ) -> Optional[object]: + """Build an Azure OpenAI client for Slow Thinker predictions.""" + if not api_key or not endpoint: + logger.warning( + "SlowThinker LLM credentials missing — will use keyword fallback" + ) + return None + try: + from openai import AzureOpenAI # type: ignore + + return AzureOpenAI( + api_key=api_key, + azure_endpoint=endpoint, + api_version=api_version, + ) + except ImportError: + logger.warning( + "openai package not available — Slow Thinker will use keyword fallback" + ) + return None + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the background Slow Thinker task.""" + await self._slow_thinker.start() + + async def stop(self) -> None: + """Stop the background Slow Thinker task and log final metrics.""" + await self._slow_thinker.stop() + m = self._metrics + logger.info( + "RagMemoryRouter stopped | queries=%d cache_hits=%d cache_misses=%d " + "hit_rate=%.0f%% prefetch_ops=%d 
predictions=%d", + m.total_queries, + m.cache_hits, + m.cache_misses, + m.cache_hit_rate * 100, + m.prefetch_operations, + m.predictions_made, + ) + + # ------------------------------------------------------------------ + # Main API + # ------------------------------------------------------------------ + + async def get_context( + self, + user_utterance: str, + conversation_history: Optional[str] = None, + ) -> str: + """Return formatted context for the user's utterance. + + This should be called *synchronously in the LLM context-assembly phase* + (i.e. after the STT transcript is available but before the first LLM + token is generated). + + The SlowThinker is triggered in the background so it can pre-warm the + cache for the *next* user turn while the LLM is currently responding. + + Args: + user_utterance: The finalised STT transcript for the current turn. + conversation_history: Optional formatted conversation history string + (User: …\\nAssistant: …). Used by SlowThinker for better + prediction. Pass ``None`` if not available. + + Returns: + Formatted context string (empty string if nothing relevant found). + """ + # Update local conversation history + if user_utterance and user_utterance.strip(): + self._conversation_history.append(f"User: {user_utterance}") + # Keep a rolling window to stay within the prompt budget + if len(self._conversation_history) > 20: + self._conversation_history = self._conversation_history[-20:] + + if not user_utterance or not user_utterance.strip(): + return "" + + history = conversation_history or "\n".join(self._conversation_history) + + # Trigger SlowThinker in the background (non-blocking) + self._slow_thinker.on_user_utterance(user_utterance, history) + + # Retrieve context from cache / FAISS (fast path) + context = await self._fast_talker.get_context(user_utterance) + return context + + def record_assistant_response(self, response_text: str) -> None: + """Call this after the bot finishes speaking to keep history up-to-date. + + Args: + response_text: The full assistant response for the completed turn. + """ + if response_text: + self._conversation_history.append(f"Assistant: {response_text}") + + @property + def metrics(self) -> RagMetrics: + """Access live metrics for this router instance.""" + return self._metrics + + @property + def knowledge_base_size(self) -> int: + """Number of entries currently cached in the per-call semantic cache. + + The pgvector store is stateless (no in-process index), so the total + number of indexed chunks is not available without a database round-trip. + Use ``index_manager.get_cached_index_stats()`` for the persistent count. + """ + return self._cache.size + + @property + def gcs_path(self) -> str: + """Full GCS path for this router's knowledge base.""" + gcs_bucket = RAG_GCS_BUCKET or "" + return f"gs://{gcs_bucket}/{self._merchant_id}/{self._template_id}/" diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/semantic_cache.py b/app/ai/voice/agents/breeze_buddy/services/rag/semantic_cache.py new file mode 100644 index 000000000..4ded7a7e4 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/semantic_cache.py @@ -0,0 +1,282 @@ +""" +In-process FAISS semantic cache for the Breeze Buddy RAG service. + +The cache is the bridge between the SlowThinker (writer) and the FastTalker +(reader). Entries are indexed by the embedding of the retrieval query used +when they were cached, and the cached value is the pre-fetched chunk/context +associated with that query. 
Semantically similar future queries reuse the +same prefetched result. + +Design decisions +---------------- +* **Inner-product index over normalised vectors** – equivalent to cosine sim, + faster than L2. +* **Async lock on both reads and writes** – SlowThinker writes and FastTalker + reads run concurrently in separate asyncio tasks. ``_rebuild_index()`` + replaces ``self._index`` and ``self._entries`` in place, so an unguarded + ``get()`` can race with a concurrent ``put()``/eviction and crash or return + stale indices. Both paths acquire ``self._lock``. +* **LRU eviction** – when at capacity the entry with the oldest + ``last_accessed`` timestamp is removed. +* **TTL eviction** – expired entries are purged lazily on each ``put``. +* **Rebuild-on-evict** – removing vectors from a FAISS flat index shifts the + ids of the remaining entries (desyncing them from ``self._entries``), so the + index is rebuilt when entries are removed. Because the cache is small + (default 1000 entries) this is fast (< 0.1 ms for 1000 × 1536-dim). +""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +import faiss +import numpy as np + +from app.core.logger import logger + + +@dataclass +class CachedContext: + """One entry in the semantic cache.""" + + text: str + metadata: Dict + embedding: np.ndarray + relevance_score: float + ttl: float + created_at: float = field(default_factory=time.time) + last_accessed: float = field(default_factory=time.time) + + @property + def is_expired(self) -> bool: + return (time.time() - self.created_at) > self.ttl + + +class SemanticCache: + """FAISS-backed in-memory semantic cache. + + Args: + dimension: Embedding dimensionality (must match the vector store). + max_size: Maximum number of cached entries. + default_ttl: Default TTL in seconds for new entries. + similarity_threshold: Minimum cosine similarity to count as a cache hit. + """ + + def __init__( + self, + dimension: int, + max_size: int = 1000, + default_ttl: float = 300.0, + similarity_threshold: float = 0.40, + ) -> None: + self._dimension = dimension + self._max_size = max_size + self._default_ttl = default_ttl + self._similarity_threshold = similarity_threshold + self._lock = asyncio.Lock() + + self._entries: List[CachedContext] = [] + self._index = faiss.IndexFlatIP(dimension) + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def size(self) -> int: + return len(self._entries) + + # ------------------------------------------------------------------ + # Write path + # ------------------------------------------------------------------ + + async def put( + self, + query_embedding: np.ndarray, + text: str, + metadata: Optional[Dict] = None, + relevance_score: float = 1.0, + ttl: Optional[float] = None, + ) -> None: + """Insert or refresh a single cache entry. + + If a near-identical vector (cosine ≥ 0.95) is already cached, we + refresh it in place instead of adding a duplicate. + + Args: + query_embedding: Embedding used as the cache key. + text: Chunk text. + metadata: Source metadata. + relevance_score: Retrieval score from the vector store. + ttl: Override the default TTL. + """ + async with self._lock: + self._put_locked(query_embedding, text, metadata, relevance_score, ttl) + + async def put_batch( + self, + entries: List[Dict[str, Any]], + ) -> None: + """Insert multiple cache entries under a single lock acquisition. 
+ + Each element of *entries* is a dict with keys: + ``query_embedding``, ``text``, ``metadata`` (optional), + ``relevance_score`` (optional), ``ttl`` (optional). + + This is more efficient than calling ``put()`` N times because the lock + is acquired only once. + """ + async with self._lock: + for e in entries: + self._put_locked( + e["query_embedding"], + e["text"], + e.get("metadata"), + e.get("relevance_score", 1.0), + e.get("ttl"), + ) + + # ------------------------------------------------------------------ + # Read path (lock-protected — SlowThinker writes and FastTalker reads + # run as concurrent asyncio tasks; _rebuild_index() replaces _index + # and _entries in place, so reads must hold the same lock as writes) + # ------------------------------------------------------------------ + + async def get( + self, + query_embedding: np.ndarray, + top_k: int = 5, + similarity_threshold: Optional[float] = None, + ) -> List[CachedContext]: + """Look up cached context for a query embedding. + + Args: + query_embedding: The embedding of the user's current utterance. + top_k: Maximum results to return. + similarity_threshold: Override the instance-level threshold. + + Returns: + List of non-expired ``CachedContext`` entries sorted by relevance + descending, or an empty list on a cache miss. + """ + threshold = ( + similarity_threshold + if similarity_threshold is not None + else self._similarity_threshold + ) + + q = query_embedding.reshape(1, -1).astype(np.float32).copy() + faiss.normalize_L2(q) + + async with self._lock: + if self._index.ntotal == 0: + return [] + + k = min(top_k, self._index.ntotal) + scores, idxs = self._index.search(q, k) # type: ignore[call-arg] + + now = time.time() + hits: List[CachedContext] = [] + for score, idx in zip(scores[0], idxs[0]): + if idx == -1 or idx >= len(self._entries): + continue + entry = self._entries[idx] + if entry.is_expired: + continue + if score >= threshold: + entry.last_accessed = now + hits.append(entry) + + if hits: + logger.debug( + "SemanticCache HIT: %d chunks (best=%.3f)", len(hits), scores[0][0] + ) + else: + logger.debug( + "SemanticCache MISS (best=%.3f < threshold=%.3f)", + scores[0][0] if scores[0][0] > -1 else 0.0, + threshold, + ) + + return sorted(hits, key=lambda e: e.relevance_score, reverse=True) + + # ------------------------------------------------------------------ + # Internal (must be called with lock held) + # ------------------------------------------------------------------ + + def _put_locked( + self, + query_embedding: np.ndarray, + text: str, + metadata: Optional[Dict], + relevance_score: float, + ttl: Optional[float], + ) -> None: + # Check for near-duplicate + if self._index.ntotal > 0: + q = query_embedding.reshape(1, -1).astype(np.float32).copy() + faiss.normalize_L2(q) + scores, idxs = self._index.search(q, 1) # type: ignore[call-arg] + if scores[0][0] > 0.95 and idxs[0][0] != -1: + idx = int(idxs[0][0]) + if idx < len(self._entries): + self._entries[idx].text = text + self._entries[idx].relevance_score = relevance_score + self._entries[idx].created_at = time.time() + self._entries[idx].last_accessed = time.time() + return # refreshed — done + + # Evict expired entries before adding + self._evict_expired() + + # Evict LRU if still at capacity + if len(self._entries) >= self._max_size: + self._evict_lru() + + # Add entry + emb = query_embedding.reshape(1, -1).astype(np.float32).copy() + faiss.normalize_L2(emb) + self._index.add(emb) # type: ignore[call-arg] + self._entries.append( + CachedContext( + text=text, 
+ metadata=metadata or {}, + embedding=query_embedding.copy(), + relevance_score=relevance_score, + ttl=ttl if ttl is not None else self._default_ttl, + ) + ) + + def _evict_expired(self, max_age: Optional[float] = None) -> int: + now = time.time() + to_keep: List[int] = [] + removed = 0 + for i, entry in enumerate(self._entries): + expired = entry.is_expired + if max_age is not None: + expired = expired or (now - entry.created_at) > max_age + if not expired: + to_keep.append(i) + else: + removed += 1 + if removed: + self._rebuild_index(to_keep) + return removed + + def _evict_lru(self) -> None: + if not self._entries: + return + lru_idx = min( + range(len(self._entries)), key=lambda i: self._entries[i].last_accessed + ) + self._rebuild_index([i for i in range(len(self._entries)) if i != lru_idx]) + + def _rebuild_index(self, keep_indices: List[int]) -> None: + self._entries = [self._entries[i] for i in keep_indices] + self._index = faiss.IndexFlatIP(self._dimension) + if self._entries: + vectors = np.stack([e.embedding for e in self._entries]).astype(np.float32) + faiss.normalize_L2(vectors) + self._index.add(vectors) # type: ignore[call-arg] diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py b/app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py new file mode 100644 index 000000000..f52f76ce7 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py @@ -0,0 +1,279 @@ +""" +SlowThinker – background async agent for Breeze Buddy RAG. + +Subscribes to the conversation stream, predicts likely follow-up topics using +the Azure OpenAI LLM, retrieves relevant chunks from the pgvector store, and +pre-warms the semantic cache. All of this happens in the background while the +bot is generating and speaking its response, so the *next* user turn finds +context already waiting in the cache. + +Key design points: +- Prediction and prefetch tasks run in parallel via ``asyncio.gather``. +- All embed calls for a single turn are batched into one HTTP request. +- A rate limiter prevents prediction storms during rapid turns. +""" + +from __future__ import annotations + +import asyncio +import time +from typing import List, Optional + +from app.ai.voice.agents.breeze_buddy.services.rag.embeddings import EmbeddingProvider +from app.ai.voice.agents.breeze_buddy.services.rag.semantic_cache import SemanticCache +from app.ai.voice.agents.breeze_buddy.services.rag.types import RagMetrics +from app.ai.voice.agents.breeze_buddy.services.rag.vector_store import PgVectorStore +from app.core.logger import logger + +# --------------------------------------------------------------------------- +# Prompt template +# --------------------------------------------------------------------------- + +_PREDICT_PROMPT = """\ +Conversation so far: +{conversation} + +Latest user message: {latest} + +List {n} short document-search phrases (NOT questions) predicting what the user +will ask about next. Each phrase should match knowledge-base content. +Reply with a numbered list only — nothing else. +1.""" + + +class SlowThinker: + """Background async agent that pre-warms the semantic cache. + + Args: + vector_store: The knowledge-base vector store (``PgVectorStore``). + embedding_provider: Shared embedding provider. + cache: The semantic cache to populate. + metrics: Shared metrics instance. + max_predictions: How many follow-up topics to predict per turn. + prefetch_top_k: Chunks to retrieve per predicted topic. + rate_limit_secs: Minimum seconds between prediction cycles. 
+ llm_client: An ``openai.AzureOpenAI`` client for predictions. + If ``None``, keyword-based fallback is used. + llm_model: Azure deployment name for the prediction LLM. + """ + + def __init__( + self, + vector_store: PgVectorStore, + embedding_provider: EmbeddingProvider, + cache: SemanticCache, + metrics: RagMetrics, + max_predictions: int = 4, + prefetch_top_k: int = 10, + rate_limit_secs: float = 0.5, + llm_client: Optional[object] = None, + llm_model: str = "gpt-4o-mini", + ) -> None: + self._store = vector_store + self._embeddings = embedding_provider + self._cache = cache + self._metrics = metrics + self._max_predictions = max_predictions + self._prefetch_top_k = prefetch_top_k + self._rate_limit = rate_limit_secs + self._llm = llm_client + self._llm_model = llm_model + + self._running = False + self._task: Optional[asyncio.Task] = None # type: ignore[type-arg] + self._queue: asyncio.Queue[tuple] = asyncio.Queue(maxsize=20) # type: ignore[type-arg] + self._last_run: float = 0.0 + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the background processing loop.""" + self._running = True + self._task = asyncio.create_task(self._run(), name="rag-slow-thinker") + logger.debug("SlowThinker started") + + async def stop(self) -> None: + """Stop the background loop gracefully.""" + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.debug("SlowThinker stopped") + + # ------------------------------------------------------------------ + # Public API (called from MemoryRouter on each user utterance) + # ------------------------------------------------------------------ + + def on_user_utterance(self, text: str, conversation_history: str) -> None: + """Enqueue a user utterance for background processing. + + Non-blocking — drops silently if the queue is full (back-pressure). + """ + try: + self._queue.put_nowait(("utterance", text, conversation_history)) + except asyncio.QueueFull: + logger.debug("SlowThinker queue full — dropping utterance") + + # ------------------------------------------------------------------ + # Background loop + # ------------------------------------------------------------------ + + async def _run(self) -> None: + while self._running: + try: + kind, text, history = await asyncio.wait_for( + self._queue.get(), timeout=5.0 + ) + now = time.time() + remaining = self._rate_limit - (now - self._last_run) + if remaining > 0: + # Rate-limited: wait out the remainder, then process the + # latest utterance (which may have already replaced stale + # ones via the queue drain below). + await asyncio.sleep(remaining) + # Drain any queued-up items; keep only the most recent one. 
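+                    # (Illustrative: if utterances A, B and C arrive while the
+                    # limiter sleeps, A and B are coalesced away and only C is
+                    # processed, so prediction work stays bounded per window.)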
+ latest_kind, latest_text, latest_history = kind, text, history + while not self._queue.empty(): + try: + latest_kind, latest_text, latest_history = ( + self._queue.get_nowait() + ) + except asyncio.QueueEmpty: + break + kind, text, history = latest_kind, latest_text, latest_history + + self._last_run = time.time() + + if kind == "utterance": + await self._handle_utterance(text, history) + + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + except Exception as exc: + logger.error("SlowThinker error: %s", exc, exc_info=True) + + async def _handle_utterance(self, text: str, conversation_history: str) -> None: + """Predict follow-up topics and prefetch context into the cache. + + All queries (current utterance + predictions) are embedded in a single + batched HTTP call to reduce Azure round-trips from N+1 to 1. + """ + predictions = await self._predict_followups(text, conversation_history) + if predictions: + self._metrics.predictions_made += len(predictions) + + # Batch all embed calls: [current_utterance] + predictions + all_queries = [text] + predictions + try: + all_embeddings = await self._embeddings.embed(all_queries) + except Exception as exc: + logger.warning("SlowThinker batch embed failed: %s", exc) + return + + # Launch all retrieval+cache tasks in parallel + tasks = [ + asyncio.create_task( + self._retrieve_and_cache(all_queries[i], all_embeddings[i]) + ) + for i in range(len(all_queries)) + ] + await asyncio.gather(*tasks, return_exceptions=True) + + # ------------------------------------------------------------------ + # Core retrieval + # ------------------------------------------------------------------ + + async def _retrieve_and_cache( + self, query: str, query_embedding: "np.ndarray" # type: ignore[name-defined] + ) -> None: + """Search the vector store with a pre-computed embedding and populate the cache.""" + try: + results = await self._store.search( + query_embedding, top_k=self._prefetch_top_k + ) + await self._cache.put_batch( + [ + { + "query_embedding": query_embedding, + "text": r.text, + "metadata": r.metadata, + "relevance_score": r.score, + } + for r in results + ] + ) + self._metrics.prefetch_operations += 1 + logger.debug( + "SlowThinker: prefetched %d chunks for '%s…'", + len(results), + query[:40], + ) + except Exception as exc: + logger.warning("SlowThinker retrieval error: %s", exc) + + # ------------------------------------------------------------------ + # Prediction + # ------------------------------------------------------------------ + + async def _predict_followups( + self, latest: str, conversation_history: str + ) -> List[str]: + """Predict likely follow-up topics using the LLM (or keyword fallback).""" + if self._llm is None: + return self._keyword_fallback(latest) + try: + return await self._predict_with_llm(latest, conversation_history) + except Exception as exc: + logger.warning( + "SlowThinker LLM prediction failed (%s); using keywords", exc + ) + return self._keyword_fallback(latest) + + async def _predict_with_llm( + self, latest: str, conversation_history: str + ) -> List[str]: + """Call Azure OpenAI for follow-up topic predictions.""" + prompt = _PREDICT_PROMPT.format( + conversation=( + conversation_history[-1500:] if conversation_history else "(none)" + ), + latest=latest, + n=self._max_predictions, + ) + + response = await asyncio.to_thread(self._llm_call_sync, prompt) # type: ignore[arg-type] + + predictions: List[str] = [] + for line in response.strip().splitlines(): + line = line.strip() + if line 
and line[0].isdigit():
+                line = line.split(".", 1)[-1].strip()
+            elif line.startswith("- "):
+                line = line[2:].strip()
+            if line:
+                predictions.append(line)
+        return predictions[: self._max_predictions]
+
+    def _llm_call_sync(self, prompt: str) -> str:
+        """Synchronous Azure OpenAI chat completion (runs in thread pool)."""
+        response = self._llm.chat.completions.create(  # type: ignore[union-attr]
+            model=self._llm_model,
+            messages=[{"role": "user", "content": prompt}],
+            max_tokens=150,
+            temperature=0.3,
+        )
+        return response.choices[0].message.content or ""
+
+    def _keyword_fallback(self, text: str) -> List[str]:
+        """Simple keyword extraction as fallback when LLM is unavailable."""
+        words = [w.strip(".,!?;:") for w in text.split() if len(w) > 4]
+        if words:
+            return [" ".join(words[:6])]
+        return []
diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/types.py b/app/ai/voice/agents/breeze_buddy/services/rag/types.py
new file mode 100644
index 000000000..41433bda8
--- /dev/null
+++ b/app/ai/voice/agents/breeze_buddy/services/rag/types.py
@@ -0,0 +1,139 @@
+"""
+Pydantic types for the Breeze Buddy RAG service.
+"""
+
+from __future__ import annotations
+
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class KnowledgeBaseConfig(BaseModel):
+    """Template-level knowledge-base configuration.
+
+    Stored under ``configurations.knowledge_base`` in the template JSON.
+    Knowledge files are stored in GCS at:
+        ``gs://<bucket>/<merchant_id>/<template_id>/<files>``
+
+    The bucket is set globally via the ``RAG_GCS_BUCKET`` env var.
+    The path is derived automatically from the template — no manual config needed.
+
+    Example — enable RAG with defaults::
+
+        { "knowledge_base": {} }
+
+    Example — with custom chunking::
+
+        {
+            "knowledge_base": {
+                "chunk_size": 400,
+                "chunk_overlap": 40,
+                "top_k": 5
+            }
+        }
+    """
+
+    # Supported file extensions to ingest
+    extensions: List[str] = Field(
+        default=[".txt", ".md", ".rst", ".text", ".pdf"],
+        description="File extensions to ingest from GCS.",
+    )
+
+    # Chunking
+    chunk_size: int = Field(
+        512,
+        ge=64,
+        le=4096,
+        description="Maximum characters per document chunk.",
+    )
+    chunk_overlap: int = Field(
+        64,
+        ge=0,
+        le=512,
+        description="Overlap characters between adjacent chunks.",
+    )
+
+    # Retrieval
+    top_k: int = Field(
+        6,
+        ge=1,
+        le=20,
+        description="Number of chunks to retrieve per query.",
+    )
+    prefetch_top_k: int = Field(
+        10,
+        ge=1,
+        le=40,
+        description="Number of chunks to prefetch per Slow Thinker prediction.",
+    )
+
+    # Cache
+    cache_max_size: int = Field(
+        1000,
+        ge=100,
+        description="Maximum number of entries in the semantic cache.",
+    )
+    cache_ttl_seconds: float = Field(
+        300.0,
+        ge=30.0,
+        description="Time-to-live for cache entries in seconds.",
+    )
+    cache_similarity_threshold: float = Field(
+        0.40,
+        ge=0.0,
+        le=1.0,
+        description="Cosine-similarity threshold for a cache hit.",
+    )
+
+    # Slow Thinker
+    max_predictions: int = Field(
+        4,
+        ge=1,
+        le=10,
+        description="Number of follow-up topics for the Slow Thinker to predict per turn.",
+    )
+    slow_thinker_rate_limit: float = Field(
+        0.5,
+        ge=0.0,
+        description="Minimum seconds between Slow Thinker prediction cycles.",
+    )
+
+    # LLM provider for predictions (falls back to Azure if not set)
+    prediction_llm_api_key: Optional[str] = Field(
+        None,
+        description="Azure OpenAI API key used for Slow Thinker predictions. 
" + "Defaults to the global AZURE_OPENAI_API_KEY env var.", + ) + prediction_llm_endpoint: Optional[str] = Field( + None, + description="Azure OpenAI endpoint used for Slow Thinker predictions. " + "Defaults to the global AZURE_OPENAI_ENDPOINT env var.", + ) + prediction_llm_model: Optional[str] = Field( + None, + description="Azure OpenAI deployment name for predictions. " + "Defaults to AZURE_BREEZE_BUDDY_OPENAI_MODEL.", + ) + + +class DocumentChunk(BaseModel): + """A chunk of a document ready for embedding.""" + + text: str + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class RagMetrics(BaseModel): + """Aggregate runtime metrics for one MemoryRouter instance.""" + + cache_hits: int = 0 + cache_misses: int = 0 + prefetch_operations: int = 0 + predictions_made: int = 0 + total_queries: int = 0 + + @property + def cache_hit_rate(self) -> float: + total = self.cache_hits + self.cache_misses + return self.cache_hits / total if total > 0 else 0.0 diff --git a/app/ai/voice/agents/breeze_buddy/services/rag/vector_store.py b/app/ai/voice/agents/breeze_buddy/services/rag/vector_store.py new file mode 100644 index 000000000..f8a970791 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/rag/vector_store.py @@ -0,0 +1,98 @@ +""" +Vector store for the Breeze Buddy RAG pipeline. + +``PgVectorStore`` is a lightweight stateless handle that delegates all search +to the ``rag_embeddings`` PostgreSQL table via pgvector's ``<=>`` cosine +distance operator. There is no in-process state beyond the merchant/template +identifiers, so it is safe to share across async tasks or instantiate fresh +per call with zero cold-start latency. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +import numpy as np + +from app.core.logger import logger + + +@dataclass +class SearchResult: + """A single vector-search result.""" + + text: str + metadata: Dict[str, Any] + score: float + embedding: Optional[np.ndarray] = field(default=None, repr=False) + + +class PgVectorStore: + """Stateless vector store backed by pgvector in PostgreSQL. + + Each ``search()`` call acquires a connection from the shared asyncpg pool + and executes a cosine-similarity query via the ``<=>`` operator. + + Args: + merchant_id: Merchant identifier. + template_id: Template UUID. + dimension: Expected embedding dimension (default 1536). + """ + + def __init__( + self, + merchant_id: str, + template_id: str, + dimension: int = 1536, + ) -> None: + self._merchant_id = merchant_id + self._template_id = template_id + self._dimension = dimension + + @property + def dimension(self) -> int: + return self._dimension + + async def search( + self, + query_embedding: np.ndarray, + top_k: int = 5, + ) -> List[SearchResult]: + """Cosine similarity search via pgvector. + + Args: + query_embedding: 1-D float32 array of length ``dimension``. + top_k: Maximum number of results. + + Returns: + List of ``SearchResult`` sorted by similarity descending. 
+ """ + from app.database import get_pool + from app.database.accessor.breeze_buddy.rag_embeddings import search_chunks + + async with get_pool().acquire() as conn: + rows = await search_chunks( + conn, + self._merchant_id, + self._template_id, + query_embedding, + top_k=top_k, + ) + + results = [ + SearchResult( + text=row["text"], + metadata=row["metadata"], + score=row["score"], + ) + for row in rows + ] + + logger.debug( + "PgVectorStore: %d results for %s/%s", + len(results), + self._merchant_id, + self._template_id, + ) + return results diff --git a/app/ai/voice/agents/breeze_buddy/template/types.py b/app/ai/voice/agents/breeze_buddy/template/types.py index 7082a309c..9d787a88b 100644 --- a/app/ai/voice/agents/breeze_buddy/template/types.py +++ b/app/ai/voice/agents/breeze_buddy/template/types.py @@ -7,6 +7,7 @@ from pydantic import BaseModel, ConfigDict, Field, SecretStr, model_validator +from app.ai.voice.agents.breeze_buddy.services.rag.types import KnowledgeBaseConfig from app.ai.voice.llm.types import LLMConfiguration from app.core.deprecation import log_deprecated_fields @@ -570,6 +571,15 @@ class ConfigurationModel(BaseModel): description="LLM provider and model configuration", ) + # --- RAG Knowledge Base --- + knowledge_base: Optional[KnowledgeBaseConfig] = Field( + None, + description="Knowledge-base configuration for voice-optimised RAG. " + "When set, Breeze Buddy pre-loads the specified GCS bucket/prefix into a " + "FAISS index and injects relevant context into every LLM turn using the " + "dual-agent (SlowThinker + FastTalker) architecture.", + ) + @model_validator(mode="after") def _backfill_legacy_from_stt_config(self): """Mirror stt_configuration values to legacy fields for backward compat. diff --git a/app/api/routers/breeze_buddy/__init__.py b/app/api/routers/breeze_buddy/__init__.py index e072acd4f..4c460aa11 100644 --- a/app/api/routers/breeze_buddy/__init__.py +++ b/app/api/routers/breeze_buddy/__init__.py @@ -16,6 +16,9 @@ # Daily transport (web/mobile clients via Daily.co) from app.api.routers.breeze_buddy.daily import router as daily_router from app.api.routers.breeze_buddy.demo import router as demo_router + +# RAG knowledge base management +from app.api.routers.breeze_buddy.knowledge_base import router as knowledge_base_router from app.api.routers.breeze_buddy.leads import router as leads_router from app.api.routers.breeze_buddy.merchants import router as merchants_router from app.api.routers.breeze_buddy.numbers import router as numbers_router @@ -80,3 +83,6 @@ router.include_router(pod_router, prefix="/pod", tags=["pod"]) router.include_router(daily_router, prefix="", tags=["daily"]) + +# RAG knowledge base management (upload, index, status, invalidate) +router.include_router(knowledge_base_router, prefix="", tags=["knowledge-base"]) diff --git a/app/api/routers/breeze_buddy/knowledge_base/__init__.py b/app/api/routers/breeze_buddy/knowledge_base/__init__.py new file mode 100644 index 000000000..015c98178 --- /dev/null +++ b/app/api/routers/breeze_buddy/knowledge_base/__init__.py @@ -0,0 +1,419 @@ +""" +Knowledge Base management endpoints for Breeze Buddy RAG. + +These endpoints let operators manage the RAG knowledge base for a given +Breeze Buddy template without touching GCS directly. 
+
+Knowledge files live at:
+    gs://<bucket>/<merchant_id>/<template_id>/
+
+Endpoints:
+    POST   /knowledge-base/upload      - Upload a file into the template's GCS folder
+    POST   /knowledge-base/index       - Trigger (re-)indexing for a template
+    GET    /knowledge-base/status      - Index stats for a template
+    DELETE /knowledge-base/invalidate  - Delete all pgvector chunks for a template
+
+All endpoints require a valid Breeze Buddy JWT (admin role).
+"""
+
+import asyncio
+from typing import Optional
+
+from fastapi import (
+    APIRouter,
+    BackgroundTasks,
+    Depends,
+    File,
+    HTTPException,
+    Query,
+    UploadFile,
+    status,
+)
+from pydantic import BaseModel
+
+from app.api.security.breeze_buddy.rbac_token import get_current_user_with_rbac
+from app.core.config.static import GCS_CREDENTIALS_JSON, RAG_ENABLED, RAG_GCS_BUCKET
+from app.core.logger import logger
+from app.core.security.authorization import require_admin
+from app.schemas import UserInfo
+
+router = APIRouter()
+
+
+# ---------------------------------------------------------------------------
+# Response models
+# ---------------------------------------------------------------------------
+
+
+class UploadResponse(BaseModel):
+    status: str
+    gcs_path: str
+    size_bytes: int
+    message: str
+
+
+class IndexResponse(BaseModel):
+    status: str
+    template_id: str
+    gcs_path: str
+    message: str
+
+
+class StatusResponse(BaseModel):
+    template_id: str
+    gcs_path: str
+    chunk_count: int
+    total_documents: int
+    index_size_bytes: int
+    last_indexed_at: Optional[str]
+    error_message: Optional[str]
+    rag_enabled_globally: bool
+
+
+class InvalidateResponse(BaseModel):
+    status: str
+    gcs_path: str
+    message: str
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _require_rag_enabled() -> None:
+    """Raise 503 if RAG is disabled globally."""
+    if not RAG_ENABLED:
+        raise HTTPException(
+            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+            detail="RAG is disabled globally (RAG_ENABLED=false).",
+        )
+
+
+def _require_rag_bucket() -> str:
+    """Return the RAG GCS bucket or raise 503 if not configured."""
+    if not RAG_GCS_BUCKET:
+        raise HTTPException(
+            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+            detail="RAG_GCS_BUCKET env var is not configured on the server.",
+        )
+    return RAG_GCS_BUCKET
+
+
+async def _load_template(template_id: str):
+    """Return (template, kb_config) or raise 404/422."""
+    from app.database.accessor.breeze_buddy.template import get_template_by_id
+
+    template = await get_template_by_id(template_id)
+    if template is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Template '{template_id}' not found.",
+        )
+
+    configurations = template.configurations
+    if configurations is None:
+        raise HTTPException(
+            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+            detail="Template has no configurations block.",
+        )
+
+    kb_config = getattr(configurations, "knowledge_base", None)
+    if kb_config is None:
+        raise HTTPException(
+            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+            detail="Template does not have a knowledge_base configuration.",
+        )
+
+    return template, kb_config
+
+
+def _gcs_prefix(merchant_id: Optional[str], template_id: str) -> str:
+    """Return the canonical GCS prefix for a template's knowledge files."""
+    merchant = merchant_id or "default"
+    return f"{merchant}/{template_id}/"
+
+
+# ---------------------------------------------------------------------------
+# Endpoints
+# ---------------------------------------------------------------------------
+
+
+@router.post(
+    "/knowledge-base/upload",
+    response_model=UploadResponse,
+    status_code=status.HTTP_201_CREATED,
+    summary="Upload a knowledge file to GCS",
+    tags=["knowledge-base"],
+)
+async def upload_knowledge_file(
+    template_id: str = Query(
+        ..., description="Template UUID to associate the file with"
+    ),
+    file: UploadFile = File(..., description="File to upload (.txt, .md, .pdf, etc.)"),
+    current_user: UserInfo = Depends(get_current_user_with_rbac),
+):
+    """
+    Upload a document to the template's GCS knowledge folder:
+        ``gs://<bucket>/<merchant_id>/<template_id>/<filename>``
+
+    After uploading, call ``POST /knowledge-base/index`` to rebuild the index.
+
+    Permissions: admin only.
+    """
+    require_admin(current_user)
+    _require_rag_enabled()
+    gcs_bucket = _require_rag_bucket()
+
+    if not GCS_CREDENTIALS_JSON:
+        raise HTTPException(
+            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+            detail="GCS_CREDENTIALS_JSON is not configured on the server.",
+        )
+
+    template, _kb_config = await _load_template(template_id)
+    prefix = _gcs_prefix(template.merchant_id, template_id)
+
+    try:
+        import json
+
+        from google.cloud import storage
+        from google.oauth2 import service_account
+
+        creds_dict = json.loads(GCS_CREDENTIALS_JSON)
+        credentials = service_account.Credentials.from_service_account_info(creds_dict)
+        client = storage.Client(
+            project=creds_dict.get("project_id"), credentials=credentials
+        )
+
+        blob_name = f"{prefix}{file.filename}"
+        blob = client.bucket(gcs_bucket).blob(blob_name)
+
+        content = await file.read()
+        await asyncio.to_thread(
+            blob.upload_from_string,
+            content,
+            content_type=file.content_type or "application/octet-stream",
+        )
+
+        gcs_path = f"gs://{gcs_bucket}/{blob_name}"
+        logger.info(
+            "Knowledge file uploaded: %s (%d bytes) by %s",
+            gcs_path,
+            len(content),
+            current_user.username,
+        )
+
+        return UploadResponse(
+            status="success",
+            gcs_path=gcs_path,
+            size_bytes=len(content),
+            message=f"File uploaded to {gcs_path}. Run /knowledge-base/index to rebuild the index.",
+        )
+    except HTTPException:
+        raise
+    except Exception as exc:
+        logger.error("Knowledge file upload failed: %s", exc)
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail="Upload failed. Check server logs for details.",
+        )
+
+
+@router.post(
+    "/knowledge-base/index",
+    response_model=IndexResponse,
+    status_code=status.HTTP_202_ACCEPTED,
+    summary="Trigger a (re-)index of the knowledge base",
+    tags=["knowledge-base"],
+)
+async def index_knowledge_base(
+    background_tasks: BackgroundTasks,
+    template_id: str = Query(..., description="Template UUID to index"),
+    current_user: UserInfo = Depends(get_current_user_with_rbac),
+):
+    """
+    Enqueue a (re-)index of the template's GCS knowledge folder into pgvector.
+
+    Returns **202 Accepted** immediately; the indexing job runs in the
+    background. Poll ``GET /knowledge-base/status`` to check progress.
+
+    Permissions: admin only.
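+
+    Example (illustrative; host and route prefix depend on how the Breeze
+    Buddy router is mounted)::
+
+        curl -X POST "$HOST/knowledge-base/index?template_id=<uuid>" \
+             -H "Authorization: Bearer <admin-jwt>"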
+ """ + require_admin(current_user) + _require_rag_enabled() + gcs_bucket = _require_rag_bucket() + + template, kb_config = await _load_template(template_id) + prefix = _gcs_prefix(template.merchant_id, template_id) + gcs_path = f"gs://{gcs_bucket}/{prefix}" + + from app.ai.voice.agents.breeze_buddy.services.rag.embeddings import ( + EmbeddingProvider, + ) + from app.ai.voice.agents.breeze_buddy.services.rag.index_manager import ( + build_knowledge_base, + ) + from app.core.config.static import ( + RAG_EMBEDDING_API_KEY, + RAG_EMBEDDING_DEPLOYMENT, + RAG_EMBEDDING_DIMENSION, + RAG_EMBEDDING_ENDPOINT, + ) + + emb_endpoint = RAG_EMBEDDING_ENDPOINT + emb_api_key = RAG_EMBEDDING_API_KEY + + if not emb_endpoint or not emb_api_key: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="RAG embedding credentials are not configured " + "(RAG_EMBEDDING_ENDPOINT / RAG_EMBEDDING_API_KEY).", + ) + + embedding_provider = EmbeddingProvider( + api_key=emb_api_key, + endpoint=emb_endpoint, + deployment=RAG_EMBEDDING_DEPLOYMENT, + dimension=RAG_EMBEDDING_DIMENSION, + ) + + async def _run_indexing() -> None: + try: + chunk_count = await build_knowledge_base( + kb_config=kb_config, + embedding_provider=embedding_provider, + merchant_id=template.merchant_id or "default", + template_id=template_id, + gcs_bucket=gcs_bucket, + gcs_prefix=prefix, + ) + logger.info( + "Knowledge base re-indexed for template %s: %d chunks at %s", + template_id, + chunk_count, + gcs_path, + ) + except Exception as exc: + logger.error( + "Background indexing failed for template %s: %s", template_id, exc + ) + + background_tasks.add_task(_run_indexing) + + return IndexResponse( + status="accepted", + template_id=template_id, + gcs_path=gcs_path, + message=f"Indexing job enqueued for {gcs_path}. Poll /knowledge-base/status to track progress.", + ) + + +@router.get( + "/knowledge-base/status", + response_model=StatusResponse, + summary="Get knowledge base index status", + tags=["knowledge-base"], +) +async def knowledge_base_status( + template_id: str = Query(..., description="Template UUID"), + current_user: UserInfo = Depends(get_current_user_with_rbac), +): + """ + Return live stats for the pgvector knowledge base for this template. + + If the knowledge base has not been indexed yet, numeric fields + will be 0 and ``last_indexed_at`` will be null. + + Permissions: admin only. + """ + require_admin(current_user) + _require_rag_enabled() + gcs_bucket = _require_rag_bucket() + + template, _kb_config = await _load_template(template_id) + prefix = _gcs_prefix(template.merchant_id, template_id) + gcs_path = f"gs://{gcs_bucket}/{prefix}" + + try: + from app.ai.voice.agents.breeze_buddy.services.rag.index_manager import ( + get_cached_index_stats, + ) + + stats = await get_cached_index_stats( + template.merchant_id or "default", template_id + ) + + return StatusResponse( + template_id=template_id, + gcs_path=gcs_path, + chunk_count=stats.get("chunk_count", 0), + total_documents=stats.get("total_documents", 0), + index_size_bytes=stats.get("index_size_bytes", 0), + last_indexed_at=stats.get("last_indexed_at"), + error_message=stats.get("error_message"), + rag_enabled_globally=RAG_ENABLED, + ) + except HTTPException: + raise + except Exception as exc: + logger.error("Failed to get KB status for template %s: %s", template_id, exc) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Status check failed. 
Check server logs for details.",
+        )
+
+
+@router.delete(
+    "/knowledge-base/invalidate",
+    response_model=InvalidateResponse,
+    summary="Invalidate (delete) the pgvector knowledge base chunks",
+    tags=["knowledge-base"],
+)
+async def invalidate_knowledge_base(
+    template_id: str = Query(..., description="Template UUID"),
+    current_user: UserInfo = Depends(get_current_user_with_rbac),
+):
+    """
+    Delete all pgvector chunks for this template's knowledge base.
+
+    The next ``POST /knowledge-base/index`` call will rebuild the embeddings
+    from GCS. Use this after uploading new documents to force a clean re-index.
+
+    Permissions: admin only.
+    """
+    require_admin(current_user)
+    _require_rag_enabled()
+    gcs_bucket = _require_rag_bucket()
+
+    template, _kb_config = await _load_template(template_id)
+    prefix = _gcs_prefix(template.merchant_id, template_id)
+    gcs_path = f"gs://{gcs_bucket}/{prefix}"
+
+    try:
+        from app.ai.voice.agents.breeze_buddy.services.rag.index_manager import (
+            invalidate_index,
+        )
+
+        await invalidate_index(template.merchant_id or "default", template_id)
+
+        logger.info(
+            "Knowledge base index invalidated for template %s (%s) by %s",
+            template_id,
+            gcs_path,
+            current_user.username,
+        )
+
+        return InvalidateResponse(
+            status="success",
+            gcs_path=gcs_path,
+            message=f"Index for {gcs_path} deleted from pgvector.",
+        )
+    except HTTPException:
+        raise
+    except Exception as exc:
+        logger.error("KB invalidation failed for template %s: %s", template_id, exc)
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail="Invalidation failed. Check server logs for details.",
+        )
diff --git a/app/core/config/static.py b/app/core/config/static.py
index 0a7693b92..58ad81c72 100644
--- a/app/core/config/static.py
+++ b/app/core/config/static.py
@@ -262,6 +262,29 @@
     "AZURE_BREEZE_BUDDY_OPENAI_MODEL", "gpt-4o-automatic"
 )
+# RAG (Retrieval-Augmented Generation) Configuration — Breeze Buddy
+# Dedicated GCS bucket for knowledge files. Structure: <merchant_id>/<template_id>/<files>
+RAG_GCS_BUCKET = os.environ.get("RAG_GCS_BUCKET", "")
+
+# Embedding endpoint (Azure AI Foundry, OpenAI-compatible base_url pattern)
+#   endpoint  : full base URL including trailing slash and "openai/v1/"
+#   api_key   : API key from the Foundry deployment page
+#   deployment: model/deployment name shown in Foundry (e.g.
"embed-v-4-0") +# dimension : output vector size — confirmed 1536 for embed-v4.0 +RAG_EMBEDDING_ENDPOINT = os.environ.get( + "RAG_EMBEDDING_ENDPOINT", + "https://breeze-automatic.services.ai.azure.com/openai/v1/", +) +RAG_EMBEDDING_API_KEY = os.environ.get("RAG_EMBEDDING_API_KEY", "") +RAG_EMBEDDING_DEPLOYMENT = os.environ.get("RAG_EMBEDDING_DEPLOYMENT", "embed-v-4-0") +RAG_EMBEDDING_DIMENSION = int(os.environ.get("RAG_EMBEDDING_DIMENSION", "1536")) + +# Master switch: set to "false" to disable RAG globally without changing templates +RAG_ENABLED = os.environ.get("RAG_ENABLED", "true").lower() == "true" + +# Maximum characters of RAG context injected per LLM turn (prevents prompt bloat) +RAG_MAX_CONTEXT_CHARS = int(os.environ.get("RAG_MAX_CONTEXT_CHARS", "1200")) + # Twilio settings TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID", "") TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN", "") diff --git a/app/database/__init__.py b/app/database/__init__.py index 6cdcd2a9e..8ce52e1e8 100644 --- a/app/database/__init__.py +++ b/app/database/__init__.py @@ -17,7 +17,61 @@ from app.core.logger import logger from app.services.aws.kms import decrypt_kms -pool = None +pool: asyncpg.Pool | None = None + + +def get_pool() -> asyncpg.Pool: + """Return the initialised connection pool, raising if not yet ready.""" + if pool is None: + raise RuntimeError( + "Database pool is not initialized. Call init_db_pool() first." + ) + return pool + + +async def _init_vector_codec(conn: asyncpg.Connection) -> None: + """Register pgvector <-> Python list codec on a connection. + + asyncpg does not know the ``vector`` type by default. We register a + text codec so that: + - Python lists / numpy arrays are sent as the text representation + ``'[0.1, -0.2, ...]'`` that pgvector accepts. + - Values read back from the DB are returned as Python lists of floats. + + This is called via ``init`` in ``create_pool`` so it runs for every + connection in the pool automatically. + + The ``vector`` extension itself is created by migration 023 — we do not + issue DDL here to avoid requiring elevated privileges on every connection. 
+ """ + # Fetch the OID of the vector type (must already exist via migration) + vector_oid = await conn.fetchval( + "SELECT oid FROM pg_type WHERE typname = 'vector' LIMIT 1" + ) + if vector_oid is None: + logger.warning( + "pgvector extension not available — RAG vector search will not work" + ) + return + + def _encode_vector(value: object) -> str: # list[float] | np.ndarray → str + if hasattr(value, "tolist"): + value = value.tolist() # type: ignore[union-attr] + return "[" + ",".join(str(float(v)) for v in value) + "]" # type: ignore[arg-type] + + def _decode_vector(data: str) -> list: # '[0.1,…]' → list[float] + inner = data.strip("[]").strip() + if not inner: + return [] + return [float(x) for x in inner.split(",")] + + await conn.set_type_codec( + "vector", + encoder=_encode_vector, + decoder=_decode_vector, + schema="public", + format="text", + ) async def init_db_pool(): @@ -56,6 +110,7 @@ async def init_db_pool(): port=POSTGRES_PORT, min_size=POSTGRES_POOL_SIZE, max_size=POSTGRES_POOL_SIZE + POSTGRES_MAX_OVERFLOW, + init=_init_vector_codec, ) logger.info("Database pool initialized successfully.") except Exception as e: @@ -94,5 +149,6 @@ async def close_db_pool(): __all__ = [ "init_db_pool", "get_db_connection", + "get_pool", "close_db_pool", ] diff --git a/app/database/accessor/breeze_buddy/rag_embeddings.py b/app/database/accessor/breeze_buddy/rag_embeddings.py new file mode 100644 index 000000000..c64147332 --- /dev/null +++ b/app/database/accessor/breeze_buddy/rag_embeddings.py @@ -0,0 +1,173 @@ +""" +Database accessor for the rag_embeddings table. + +Provides: + upsert_knowledge_base — insert/replace all chunks for a template (re-index) + search_chunks — cosine similarity search via pgvector + get_kb_stats — chunk count, file count, last indexed timestamp + delete_knowledge_base — wipe all chunks for a template (invalidate) +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Tuple + +import numpy as np + +from app.core.logger import logger +from app.database.queries.breeze_buddy.rag_embeddings import ( + DELETE_ALL_CHUNKS_QUERY, + DELETE_STALE_CHUNKS_QUERY, + GET_KB_STATS_QUERY, + SEARCH_QUERY, + UPSERT_CHUNKS_QUERY, + upsert_chunks_args, +) + + +async def upsert_knowledge_base( + conn: Any, + merchant_id: str, + template_id: str, + texts: List[str], + embeddings: np.ndarray, + metadata: List[Dict[str, Any]], +) -> int: + """Insert or replace all chunks for a knowledge base. + + Uses ``executemany`` with the upsert query so each chunk is an atomic + INSERT … ON CONFLICT DO UPDATE. After upserting, stale rows (chunk_index + beyond the new count) are deleted so a shrinking knowledge base doesn't + leave orphan rows. + + Args: + conn: asyncpg connection (from pool.acquire()). + merchant_id: Merchant identifier. + template_id: Template UUID. + texts: Chunk text strings. + embeddings: Float32 numpy array of shape (N, 1536). + metadata: Per-chunk metadata dicts (must contain ``source`` key). + + Returns: + Number of chunks upserted. 
+ """ + if not texts: + return 0 + + n = len(texts) + chunks: List[Tuple[int, str, str, List[float]]] = [] + for i, (text, emb, meta) in enumerate( + zip(texts, embeddings, metadata, strict=True) + ): + source = meta.get("source", "") + chunks.append((i, source, text, emb.tolist())) + + rows = upsert_chunks_args(merchant_id, template_id, chunks) + + await conn.executemany(UPSERT_CHUNKS_QUERY, rows) + + # Remove any rows with chunk_index >= new count (handles shrinking KBs) + await conn.execute(DELETE_STALE_CHUNKS_QUERY, merchant_id, template_id, n - 1) + + logger.info( + "rag_embeddings: upserted %d chunks for %s/%s", n, merchant_id, template_id + ) + return n + + +async def search_chunks( + conn: Any, + merchant_id: str, + template_id: str, + query_embedding: np.ndarray, + top_k: int = 6, +) -> List[Dict[str, Any]]: + """Cosine similarity search against rag_embeddings via pgvector. + + Args: + conn: asyncpg connection. + merchant_id: Merchant identifier. + template_id: Template UUID. + query_embedding: 1-D float32 numpy array of length 1536. + top_k: Maximum results to return. + + Returns: + List of dicts with keys: text, metadata, score, embedding (np.ndarray). + Sorted by score descending (highest similarity first). + """ + emb_list = query_embedding.tolist() + + rows = await conn.fetch( + SEARCH_QUERY, + emb_list, + merchant_id, + template_id, + top_k, + ) + + results = [] + for row in rows: + results.append( + { + "text": row["chunk_text"], + "metadata": { + "source": row["source_file"], + "chunk_index": row["chunk_index"], + }, + "score": float(row["score"]), + } + ) + + return results + + +async def get_kb_stats( + conn: Any, + merchant_id: str, + template_id: str, +) -> Dict[str, Any]: + """Return chunk count, file count, and last indexed timestamp. + + Args: + conn: asyncpg connection. + merchant_id: Merchant identifier. + template_id: Template UUID. + + Returns: + Dict with: chunk_count (int), file_count (int), + last_indexed_at (datetime | None). + """ + row = await conn.fetchrow(GET_KB_STATS_QUERY, merchant_id, template_id) + + return { + "chunk_count": int(row["chunk_count"]) if row else 0, + "file_count": int(row["file_count"]) if row else 0, + "last_indexed_at": row["last_indexed_at"] if row else None, + } + + +async def delete_knowledge_base( + conn: Any, + merchant_id: str, + template_id: str, +) -> int: + """Delete all chunks for a knowledge base. + + Args: + conn: asyncpg connection. + merchant_id: Merchant identifier. + template_id: Template UUID. + + Returns: + Number of rows deleted. + """ + result = await conn.execute(DELETE_ALL_CHUNKS_QUERY, merchant_id, template_id) + # result is a string like "DELETE 42" + deleted = int(result.split()[-1]) if result else 0 + logger.info( + "rag_embeddings: deleted %d chunks for %s/%s", + deleted, + merchant_id, + template_id, + ) + return deleted diff --git a/app/database/migrations/023_create_rag_embeddings_table.sql b/app/database/migrations/023_create_rag_embeddings_table.sql new file mode 100644 index 000000000..51d2a9d6f --- /dev/null +++ b/app/database/migrations/023_create_rag_embeddings_table.sql @@ -0,0 +1,56 @@ +-- Migration 023: Enable pgvector and create RAG embeddings table +-- +-- pgvector is pre-installed on Cloud SQL for PostgreSQL. +-- No OS-level install needed — just CREATE EXTENSION. +-- +-- Table design: +-- One row per document chunk per knowledge base (merchant + template). +-- Embeddings stored as vector(1536) — matches embed-v4.0 output dimension. 
+-- HNSW index for approximate nearest-neighbour search: +-- m=16, ef_construction=64 — good balance of speed vs accuracy for +-- knowledge bases up to ~50 000 chunks. +-- +-- Search operator: <=> (cosine distance). Lower = more similar. +-- To get cosine similarity: 1 - (embedding <=> query_vector) +-- +-- Chunks are keyed by (merchant_id, template_id, chunk_index) so a +-- re-index (upsert) replaces old chunks atomically without leaving orphans. + +BEGIN; + +-- Enable the pgvector extension (Cloud SQL: already installed, just needs enabling) +CREATE EXTENSION IF NOT EXISTS vector; + +-- RAG embeddings table +CREATE TABLE IF NOT EXISTS rag_embeddings ( + id BIGSERIAL PRIMARY KEY, + merchant_id TEXT NOT NULL, + template_id TEXT NOT NULL, + chunk_index INTEGER NOT NULL, + source_file TEXT NOT NULL DEFAULT '', + chunk_text TEXT NOT NULL, + embedding vector(1536) NOT NULL, + indexed_at TIMESTAMPTZ NOT NULL DEFAULT now(), + + -- Unique key for upsert: one chunk per position per knowledge base + CONSTRAINT rag_embeddings_kb_chunk_key + UNIQUE (merchant_id, template_id, chunk_index) +); + +-- HNSW index for fast cosine similarity search +-- Filters by (merchant_id, template_id) first, then ranks by vector distance. +-- Partial indexes per tenant are not needed at expected scale (<50k total chunks). +CREATE INDEX IF NOT EXISTS idx_rag_embeddings_hnsw + ON rag_embeddings + USING hnsw (embedding vector_cosine_ops) + WITH (m = 16, ef_construction = 64); + +-- B-tree index to make the WHERE merchant_id=? AND template_id=? filter fast +CREATE INDEX IF NOT EXISTS idx_rag_embeddings_kb + ON rag_embeddings (merchant_id, template_id); + +-- Index on indexed_at for TTL/freshness queries from the status endpoint +CREATE INDEX IF NOT EXISTS idx_rag_embeddings_indexed_at + ON rag_embeddings (merchant_id, template_id, indexed_at); + +COMMIT; diff --git a/app/database/queries/breeze_buddy/rag_embeddings.py b/app/database/queries/breeze_buddy/rag_embeddings.py new file mode 100644 index 000000000..0ef1a4f5b --- /dev/null +++ b/app/database/queries/breeze_buddy/rag_embeddings.py @@ -0,0 +1,107 @@ +""" +SQL query builders for the rag_embeddings table. + +All queries use $N positional parameters (asyncpg style). +The ``embedding`` column is ``vector(1536)`` — values are passed as Python +lists of floats thanks to the codec registered in ``app/database/__init__.py``. +""" + +from __future__ import annotations + +from typing import Any, List, Tuple + +# --------------------------------------------------------------------------- +# Upsert (insert or replace on chunk_index conflict) +# --------------------------------------------------------------------------- + +UPSERT_CHUNKS_QUERY = """ +INSERT INTO rag_embeddings + (merchant_id, template_id, chunk_index, source_file, chunk_text, embedding, indexed_at) +VALUES + ($1, $2, $3, $4, $5, $6::vector, now()) +ON CONFLICT (merchant_id, template_id, chunk_index) +DO UPDATE SET + source_file = EXCLUDED.source_file, + chunk_text = EXCLUDED.chunk_text, + embedding = EXCLUDED.embedding, + indexed_at = now() +""" + + +def upsert_chunks_args( + merchant_id: str, + template_id: str, + chunks: List[Tuple[int, str, str, List[float]]], +) -> List[Tuple[Any, ...]]: + """Return a list of row tuples ready for ``executemany``. + + Args: + merchant_id: Merchant identifier. + template_id: Template UUID. + chunks: List of (chunk_index, source_file, chunk_text, embedding_list). + + Returns: + List of 6-tuples: (merchant_id, template_id, chunk_index, source_file, + chunk_text, embedding). 
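+
+    Example (illustrative)::
+
+        rows = upsert_chunks_args(
+            "m-1", "tmpl-uuid", [(0, "faq.md", "Refunds take 5 days.", [0.1] * 1536)]
+        )
+        # -> [("m-1", "tmpl-uuid", 0, "faq.md", "Refunds take 5 days.", [0.1, ...])]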
+ """ + return [ + (merchant_id, template_id, idx, source, text, emb) + for idx, source, text, emb in chunks + ] + + +# --------------------------------------------------------------------------- +# Delete stale chunks after a re-index +# Removes any rows whose chunk_index is beyond the new chunk count. +# --------------------------------------------------------------------------- + +DELETE_STALE_CHUNKS_QUERY = """ +DELETE FROM rag_embeddings +WHERE merchant_id = $1 + AND template_id = $2 + AND chunk_index > $3 +""" + + +# --------------------------------------------------------------------------- +# Similarity search +# --------------------------------------------------------------------------- + +SEARCH_QUERY = """ +SELECT + chunk_text, + source_file, + chunk_index, + 1 - (embedding <=> $1::vector) AS score +FROM rag_embeddings +WHERE merchant_id = $2 + AND template_id = $3 +ORDER BY embedding <=> $1::vector +LIMIT $4 +""" + + +# --------------------------------------------------------------------------- +# Status / metadata queries — single round-trip +# --------------------------------------------------------------------------- + +GET_KB_STATS_QUERY = """ +SELECT + COUNT(*) AS chunk_count, + COUNT(DISTINCT source_file) AS file_count, + MAX(indexed_at) AS last_indexed_at +FROM rag_embeddings +WHERE merchant_id = $1 + AND template_id = $2 +""" + + +# --------------------------------------------------------------------------- +# Delete all chunks for a knowledge base (used by invalidate endpoint) +# --------------------------------------------------------------------------- + +DELETE_ALL_CHUNKS_QUERY = """ +DELETE FROM rag_embeddings +WHERE merchant_id = $1 + AND template_id = $2 +""" diff --git a/gigaAi/docs b/gigaAi/docs new file mode 160000 index 000000000..2d3984e5f --- /dev/null +++ b/gigaAi/docs @@ -0,0 +1 @@ +Subproject commit 2d3984e5f6d9df2f997a3d8ef189a628c2dc4d34 diff --git a/pyproject.toml b/pyproject.toml index e798b5e3d..b80127255 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,8 @@ dependencies = [ "soundfile", "redis[hiredis]>=5.0.0", "nltk", + "faiss-cpu>=1.7.4", + "pdfminer.six>=20251230", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 249ee2da8..3539bf7c7 100644 --- a/uv.lock +++ b/uv.lock @@ -720,6 +720,7 @@ dependencies = [ { name = "bcrypt" }, { name = "boto3" }, { name = "cryptography" }, + { name = "faiss-cpu" }, { name = "fastapi" }, { name = "google-auth" }, { name = "google-cloud-storage", version = "3.4.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.14'" }, @@ -730,6 +731,7 @@ dependencies = [ { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-sdk" }, + { name = "pdfminer-six" }, { name = "pipecat-ai", extra = ["aic", "anthropic", "assemblyai", "azure", "cartesia", "daily", "deepgram", "elevenlabs", "google", "mcp", "openai", "sarvam", "silero", "soniox"] }, { name = "pipecat-ai-flows" }, { name = "plivo" }, @@ -763,6 +765,7 @@ requires-dist = [ { name = "black", marker = "extra == 'dev'" }, { name = "boto3" }, { name = "cryptography", specifier = ">=44.0.1" }, + { name = "faiss-cpu", specifier = ">=1.7.4" }, { name = "fastapi", specifier = "==0.115.12" }, { name = "google-auth", specifier = ">=2.17.0" }, { name = "google-cloud-storage", specifier = ">=2.10.0" }, @@ -773,6 +776,7 @@ requires-dist = [ { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-sdk" }, + { name 
= "pdfminer-six", specifier = ">=20251230" }, { name = "pipecat-ai", extras = ["daily", "google", "assemblyai", "silero", "openai", "azure", "elevenlabs", "aic", "anthropic", "deepgram", "soniox", "mcp", "sarvam", "cartesia"] }, { name = "pipecat-ai-flows" }, { name = "plivo" }, @@ -965,6 +969,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/55/e2/2537ebcff11c1ee1ff17d8d0b6f4db75873e3b0fb32c2d4a2ee31ecb310a/docstring_parser-0.17.0-py3-none-any.whl", hash = "sha256:cf2569abd23dce8099b300f9b4fa8191e9582dda731fd533daf54c4551658708", size = 36896, upload-time = "2025-07-21T07:35:00.684Z" }, ] +[[package]] +name = "faiss-cpu" +version = "1.13.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, + { name = "packaging" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/c9/671f66f6b31ec48e5825d36435f0cb91189fa8bb6b50724029dbff4ca83c/faiss_cpu-1.13.2-cp310-abi3-macosx_14_0_arm64.whl", hash = "sha256:a9064eb34f8f64438dd5b95c8f03a780b1a3f0b99c46eeacb1f0b5d15fc02dc1", size = 3452776, upload-time = "2025-12-24T10:27:01.419Z" }, + { url = "https://files.pythonhosted.org/packages/5a/4a/97150aa1582fb9c2bca95bd8fc37f27d3b470acec6f0a6833844b21e4b40/faiss_cpu-1.13.2-cp310-abi3-macosx_14_0_x86_64.whl", hash = "sha256:c8d097884521e1ecaea6467aeebbf1aa56ee4a36350b48b2ca6b39366565c317", size = 7896434, upload-time = "2025-12-24T10:27:03.592Z" }, + { url = "https://files.pythonhosted.org/packages/0b/d0/0940575f059591ca31b63a881058adb16a387020af1709dcb7669460115c/faiss_cpu-1.13.2-cp310-abi3-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ee330a284042c2480f2e90450a10378fd95655d62220159b1408f59ee83ebf1", size = 11485825, upload-time = "2025-12-24T10:27:05.681Z" }, + { url = "https://files.pythonhosted.org/packages/e7/e1/a5acac02aa593809f0123539afe7b4aff61d1db149e7093239888c9053e1/faiss_cpu-1.13.2-cp310-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ab88ee287c25a119213153d033f7dd64c3ccec466ace267395872f554b648cd7", size = 23845772, upload-time = "2025-12-24T10:27:08.194Z" }, + { url = "https://files.pythonhosted.org/packages/9c/7b/49dcaf354834ec457e85ca769d50bc9b5f3003fab7c94a9dcf08cf742793/faiss_cpu-1.13.2-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:85511129b34f890d19c98b82a0cd5ffb27d89d1cec2ee41d2621ee9f9ef8cf3f", size = 13477567, upload-time = "2025-12-24T10:27:10.822Z" }, + { url = "https://files.pythonhosted.org/packages/f7/6b/12bb4037921c38bb2c0b4cfc213ca7e04bbbebbfea89b0b5746248ce446e/faiss_cpu-1.13.2-cp310-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:8b32eb4065bac352b52a9f5ae07223567fab0a976c7d05017c01c45a1c24264f", size = 25102239, upload-time = "2025-12-24T10:27:13.476Z" }, + { url = "https://files.pythonhosted.org/packages/14/6d/40439a05e4e60a0e889aa68b08ec70f5c8e32901f75f2be25c593a2e050e/faiss_cpu-1.13.2-cp311-cp311-win_amd64.whl", hash = "sha256:7c5944d7807d58fe7244b6aba06be710ee7ed99343365ed92699349efe979f51", size = 18879906, upload-time = "2025-12-24T10:27:19.041Z" }, + { url = "https://files.pythonhosted.org/packages/9f/f9/b97eadbdd9e00f945d1566c7101382344f504596bfb19219465b0fc61e6e/faiss_cpu-1.13.2-cp311-cp311-win_arm64.whl", hash = "sha256:19508a1badfb36e456c1c8664eeb948349f604db5c7545f277a0126b4a84b080", size = 8548280, upload-time = "2025-12-24T10:27:22.114Z" }, + { url = "https://files.pythonhosted.org/packages/87/ff/35ed875423200c17bdd594ce921abfc1812ddd21e09355290b9a94e170ab/faiss_cpu-1.13.2-cp312-cp312-win_amd64.whl", hash = 
"sha256:b82c01d30430dd7b1fa442001b9099735d1a82f6bb72033acdc9206d5ac66a64", size = 18890300, upload-time = "2025-12-24T10:27:24.194Z" }, + { url = "https://files.pythonhosted.org/packages/c5/3a/bbdf5deaf6feb34b46b469c0a0acd40216c3d3c6ecf5aeb71d56b8a650e3/faiss_cpu-1.13.2-cp312-cp312-win_arm64.whl", hash = "sha256:2c4f696ae76e7c97cbc12311db83aaf1e7f4f7be06a3ffea7e5b0e8ec1fd805b", size = 8553157, upload-time = "2025-12-24T10:27:26.38Z" }, + { url = "https://files.pythonhosted.org/packages/60/4b/903d85bf3a8264d49964ec799e45c7ffc91098606b8bc9ef2c904c1a56cb/faiss_cpu-1.13.2-cp313-cp313-win_amd64.whl", hash = "sha256:cb4b5ee184816a4b099162ac93c0d7f0033d81a88e7c1291d0a9cc41ec348984", size = 18891330, upload-time = "2025-12-24T10:27:28.806Z" }, + { url = "https://files.pythonhosted.org/packages/b2/52/5d10642da628f63544aab27e48416be4a7ea25c6b81d8bd65016d8538b00/faiss_cpu-1.13.2-cp313-cp313-win_arm64.whl", hash = "sha256:1243967eeb2298791ff7f3683a4abd2100d7e6ec7542ca05c3b75d47a7f621e5", size = 8553088, upload-time = "2025-12-24T10:27:31.325Z" }, + { url = "https://files.pythonhosted.org/packages/b0/b1/daaab8046f56c60079648bd83774f61b283b59a9930a2f60790ee4cdedfe/faiss_cpu-1.13.2-cp314-cp314-win_amd64.whl", hash = "sha256:c8b645e7d56591aa35dc75415bb53a62e4a494dba010e16f4b67daeffd830bd7", size = 18892621, upload-time = "2025-12-24T10:27:33.923Z" }, + { url = "https://files.pythonhosted.org/packages/06/6f/5eaf3e249c636e616ebb52e369a4a2f1d32b1caf9a611b4f917b3dd21423/faiss_cpu-1.13.2-cp314-cp314-win_arm64.whl", hash = "sha256:8113a2a80b59fe5653cf66f5c0f18be0a691825601a52a614c30beb1fca9bc7c", size = 8556374, upload-time = "2025-12-24T10:27:36.653Z" }, +] + [[package]] name = "fastapi" version = "0.115.12" @@ -2378,6 +2407,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ef/3c/2c197d226f9ea224a9ab8d197933f9da0ae0aac5b6e0f884e2b8d9c8e9f7/pathspec-1.0.4-py3-none-any.whl", hash = "sha256:fb6ae2fd4e7c921a165808a552060e722767cfa526f99ca5156ed2ce45a5c723", size = 55206, upload-time = "2026-01-27T03:59:45.137Z" }, ] +[[package]] +name = "pdfminer-six" +version = "20260107" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "charset-normalizer" }, + { name = "cryptography" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/34/a4/5cec1112009f0439a5ca6afa8ace321f0ab2f48da3255b7a1c8953014670/pdfminer_six-20260107.tar.gz", hash = "sha256:96bfd431e3577a55a0efd25676968ca4ce8fd5b53f14565f85716ff363889602", size = 8512094, upload-time = "2026-01-07T13:29:12.937Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/20/8b/28c4eaec9d6b036a52cb44720408f26b1a143ca9bce76cc19e8f5de00ab4/pdfminer_six-20260107-py3-none-any.whl", hash = "sha256:366585ba97e80dffa8f00cebe303d2f381884d8637af4ce422f1df3ef38111a9", size = 6592252, upload-time = "2026-01-07T13:29:10.742Z" }, +] + [[package]] name = "pillow" version = "11.3.0"