63 changes: 62 additions & 1 deletion app/ai/voice/agents/breeze_buddy/agent/__init__.py
@@ -59,9 +59,11 @@
from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import (
create_root_span,
)
from app.ai.voice.agents.breeze_buddy.processors.rag_context import RagContextProcessor
from app.ai.voice.agents.breeze_buddy.services.inbound_policy import (
get_block_redirect,
)
from app.ai.voice.agents.breeze_buddy.services.rag import RagMemoryRouter
from app.ai.voice.agents.breeze_buddy.services.telephony.base_provider import (
VoiceCallProvider,
)
@@ -82,7 +84,11 @@
close_websocket_safely,
)
from app.ai.voice.agents.breeze_buddy.utils.warm_transfer import set_transfer_flag
from app.core.config.static import ENABLE_BREEZE_BUDDY_TRACING
from app.core.config.static import (
ENABLE_BREEZE_BUDDY_TRACING,
RAG_EMBEDDING_DEPLOYMENT,
RAG_ENABLED,
)
from app.core.logger import logger
from app.core.logger.context import (
clear_log_context,
@@ -163,6 +169,10 @@ def __init__(
# Error tracking
self.errors: List[Dict[str, Any]] = []

# RAG (Retrieval-Augmented Generation) — optional, per-call
self._rag_router: Optional[RagMemoryRouter] = None
self._rag_context_processor: Optional[RagContextProcessor] = None

@property
def is_daily_mode(self) -> bool:
return self.transport_type == TRANSPORT_TYPE_DAILY
Expand Down Expand Up @@ -707,6 +717,50 @@ async def _run_with_tracing(self, runner: PipelineRunner) -> None:
track_error(self.errors, error_msg)
self.root_span.end()

async def _init_rag(self) -> None:
"""Initialise RAG router and context processor if knowledge_base is configured."""
if not RAG_ENABLED:
logger.info("RAG disabled globally (RAG_ENABLED=false) — skipping init")
return
if not self.configurations or not self.template:
return
kb_config = getattr(self.configurations, "knowledge_base", None)
if not kb_config:
return
Comment on lines +725 to +729

Copilot AI Apr 27, 2026

_init_rag() doesn’t check the global RAG_ENABLED switch, so when RAG is disabled it will still attempt to build/start the router and may log errors if embedding env vars/DB aren’t configured. Consider short-circuiting here when RAG_ENABLED is false to avoid noisy logs and unnecessary work.

merchant_id = self.template.merchant_id or "default"
template_id = self.template.id

try:
self._rag_router = await RagMemoryRouter.build(
kb_config=kb_config,
merchant_id=merchant_id,
template_id=template_id,
azure_embedding_deployment=RAG_EMBEDDING_DEPLOYMENT,
)
await self._rag_router.start()
self._rag_context_processor = RagContextProcessor(self._rag_router)
logger.info(
"RAG initialised: gs://.../%s/%s",
merchant_id,
template_id,
)
except Exception as exc:
logger.error("RAG initialisation failed (proceeding without RAG): %s", exc)
self._rag_router = None
self._rag_context_processor = None
Comment on lines +720 to +751
⚠️ Potential issue | 🟠 Major

_init_rag ignores the global RAG_ENABLED master switch.

The PR adds RAG_ENABLED to app/core/config/static.py as a documented "master switch ... to disable RAG globally without changing templates", and the knowledge-base API router enforces it via _require_rag_enabled(). However, the per-call agent path here only checks configurations.knowledge_base — when RAG_ENABLED=false, any template that has knowledge_base configured will still build a RagMemoryRouter, hit Azure embeddings, and query pgvector on every turn. The kill-switch becomes effectively half-disabled (admin endpoints off, runtime on).

🔧 Suggested fix
+from app.core.config.static import (
+    ENABLE_BREEZE_BUDDY_TRACING,
+    RAG_EMBEDDING_DEPLOYMENT,
+    RAG_ENABLED,
+)
@@
     async def _init_rag(self) -> None:
         """Initialise RAG router and context processor if knowledge_base is configured."""
+        if not RAG_ENABLED:
+            return
         if not self.configurations or not self.template:
             return
🧰 Tools
🪛 Ruff (0.15.11)

[warning] 741-741: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 716-744:
the _init_rag method currently builds RagMemoryRouter whenever
configurations.knowledge_base exists. Update _init_rag to first check the global
RAG_ENABLED master switch and return early if it is False, so that no
RagMemoryRouter is constructed and both self._rag_router and
self._rag_context_processor remain None. Specifically, add a guard at the start
of _init_rag that consults RAG_ENABLED before consulting
configurations.knowledge_base, preventing Azure embeddings/pgvector calls when
RAG is globally disabled.
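
Both reviewers converge on the same master-switch pattern. As a rough, hypothetical sketch (the real `app/core/config/static.py` is not shown in this diff), such a flag is typically read from the environment; the `_env_flag` helper and the default deployment name below are assumptions, not the PR's code:

```python
# Hypothetical sketch of a master-switch flag; the actual definitions in
# app/core/config/static.py are not part of this diff.
import os

def _env_flag(name: str, default: bool = False) -> bool:
    # Treat "1", "true", "yes" (case-insensitive) as enabled.
    return os.getenv(name, str(default)).strip().lower() in {"1", "true", "yes"}

# Global kill-switch: disables RAG at runtime without editing templates.
RAG_ENABLED: bool = _env_flag("RAG_ENABLED", default=False)

# Azure OpenAI embedding deployment passed to RagMemoryRouter.build(...).
RAG_EMBEDDING_DEPLOYMENT: str = os.getenv(
    "RAG_EMBEDDING_DEPLOYMENT", "text-embedding-3-small"
)
```

With a guard like `if not RAG_ENABLED: return` at the top of `_init_rag` (which the merged diff above now includes), one switch covers both the admin endpoints and the per-call runtime path.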


async def _stop_rag(self) -> None:
"""Stop and clean up the RAG router."""
if self._rag_router is not None:
try:
await self._rag_router.stop()
except Exception as exc:
logger.warning("RAG stop error: %s", exc)
finally:
self._rag_router = None
self._rag_context_processor = None

async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
"""Main entry point for running the agent.

@@ -743,6 +797,10 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
# VAD analyzer is passed to build_pipeline where it's configured inside the
# LLMUserAggregator. This enables UserTurnStrategies (VAD + Transcription fallback).
stt, llm, tts = await create_services(self.configurations)

# Initialise RAG (non-blocking on failure)
await self._init_rag()

(
pipeline,
self.context,
@@ -757,6 +815,7 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
self.vad_analyzer,
self.configurations,
on_user_idle_timeout=self._handle_user_idle_timeout,
rag_context_processor=self._rag_context_processor,
)

# Store callback handler for resetting retry count on user activity
@@ -816,6 +875,8 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
except asyncio.CancelledError:
logger.info("Pipeline task cancelled. Exiting gracefully.")
finally:
# Stop RAG router (logs final metrics, cancels background tasks)
await self._stop_rag()
# Safety net: always clear log context when run() exits, regardless of how.
# This handles crashes, cancellations, and any exit path missed above.
# Double-clearing (if end_conversation already cleared) is harmless.
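Stepping back from the individual hunks: the diff gives RAG a strict per-call lifecycle in `run()`, init before the pipeline is built and teardown in `finally`. A condensed sketch of that shape, where `_run_pipeline` is a stand-in for the real build-and-run sequence rather than an actual method:

```python
# Condensed lifecycle sketch; not the actual run() body from the diff.
class AgentLifecycleSketch:
    async def _init_rag(self) -> None: ...      # see _init_rag in the diff above
    async def _stop_rag(self) -> None: ...      # see _stop_rag in the diff above
    async def _run_pipeline(self) -> None: ...  # placeholder, not a real method

    async def run(self) -> None:
        await self._init_rag()           # best-effort: on failure the call proceeds without RAG
        try:
            await self._run_pipeline()
        finally:
            await self._stop_rag()       # always runs, even on crash or cancellation
```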
22 changes: 21 additions & 1 deletion app/ai/voice/agents/breeze_buddy/agent/pipeline.py
@@ -47,6 +47,7 @@
from app.ai.voice.agents.breeze_buddy.llm import get_llm_service
from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import setup_tracing
from app.ai.voice.agents.breeze_buddy.processors import (
RagContextProcessor,
TranscriptionGateProcessor,
UserIdleCallbackHandler,
create_user_idle_processor,
@@ -169,6 +170,7 @@ async def build_pipeline(
vad_analyzer: Optional[SileroVADAnalyzer] = None,
configurations: Optional[ConfigurationModel] = None,
on_user_idle_timeout: Optional[Callable[[int], Any]] = None,
rag_context_processor: Optional[RagContextProcessor] = None,
) -> tuple[
Pipeline,
LLMContext,
@@ -360,7 +362,7 @@
# Store reference to user aggregator for position lookup
user_aggregator = context_aggregator.user()

# Order: stt → transcription_gate → user_aggregator → llm → tts
# Order: stt → transcription_gate → user_aggregator → [rag_context] → llm → tts
# Pipecat's LLMUserAggregator natively handles interruptions via
# UserTurnStrategies — no custom response gate needed.
# Note: RTVIProcessor is added automatically by PipelineTask (pipecat v0.0.102+)
@@ -376,6 +378,24 @@
context_aggregator.assistant(),
]

# Insert RAG context processor between user_aggregator and llm.
# It must sit after user_aggregator so it can intercept the LLMContextFrame
# (which carries the assembled context) and inject knowledge ephemerally
# before the LLM sees it. Placing it before user_aggregator would mean it
# only sees raw TranscriptionFrames and has no access to the LLMContext object.
if rag_context_processor is not None:
try:
user_aggregator_idx = pipeline_parts.index(user_aggregator)
pipeline_parts.insert(user_aggregator_idx + 1, rag_context_processor)
logger.info(
"RAG context processor inserted into pipeline (after user_aggregator)"
)
except ValueError as e:
logger.error(
"Failed to insert RAG context processor into pipeline: %s. RAG disabled.",
e,
)

# Insert user idle processor before user_aggregator to monitor user activity
if user_idle:
try:
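The placement logic above is plain `list.index` plus `list.insert`; a self-contained sketch with string placeholders (the real `pipeline_parts` holds pipecat processors, not strings):

```python
# Stand-in for the processor-ordering logic in build_pipeline().
parts = ["stt", "transcription_gate", "user_aggregator", "llm", "tts"]

try:
    idx = parts.index("user_aggregator")
    parts.insert(idx + 1, "rag_context")  # lands between user_aggregator and llm
except ValueError:
    # user_aggregator not in the list: skip insertion, RAG stays disabled
    pass

assert parts == [
    "stt", "transcription_gate", "user_aggregator", "rag_context", "llm", "tts"
]
```

Because the processor sits downstream of user_aggregator, it receives the fully assembled LLMContextFrame rather than raw TranscriptionFrames, which is exactly what the in-code comment above requires.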
4 changes: 4 additions & 0 deletions app/ai/voice/agents/breeze_buddy/processors/__init__.py
@@ -1,5 +1,8 @@
"""Breeze Buddy custom processors for pipeline control."""

from app.ai.voice.agents.breeze_buddy.processors.rag_context import (
RagContextProcessor,
)
from app.ai.voice.agents.breeze_buddy.processors.transcription_gate import (
TranscriptionGateProcessor,
)
@@ -9,6 +12,7 @@
)

__all__ = [
"RagContextProcessor",
"TranscriptionGateProcessor",
"UserIdleCallbackHandler",
"create_user_idle_processor",
197 changes: 197 additions & 0 deletions app/ai/voice/agents/breeze_buddy/processors/rag_context.py
@@ -0,0 +1,197 @@
"""
RAG Context Processor for Breeze Buddy.

Sits **after** the user_aggregator in the pipeline. When the user_aggregator
pushes a ``LLMContextFrame`` downstream (i.e. the user's turn has been
aggregated and the context is ready to be sent to the LLM), this processor:

1. Extracts the latest user utterance directly from the ``LLMContextFrame``
(the last ``user`` role message already assembled by ``user_aggregator``).
2. Embeds it and fetches relevant knowledge from the ``RagMemoryRouter``.
3. If knowledge is found, it is injected **directly into the LLMContext object**
as a temporary ``system`` message — before the context reaches the LLM.
4. After the LLM finishes responding (``LLMFullResponseEndFrame``), the
injected message is **removed from the context** so it does not pollute the
persistent conversation history and does not confuse subsequent turns.

Why this approach:
- The knowledge never touches the user's ``TranscriptionFrame`` text.
- The knowledge is not a persistent instruction — it disappears after one turn.
- Because the LLM sees a plain ``system`` message with *only* factual content
(no "do this / don't do that" meta-instructions), the node's own task/role
messages remain the sole source of behavioural guidance. The LLM answers the
side question from the facts and then continues its current task.

Pipeline position::

transport.input()
→ stt
→ TranscriptionGateProcessor
→ user_aggregator ← produces LLMContextFrame
→ RagContextProcessor ← here: enriches context ephemerally
→ llm
→ tts
→ transport.output()
→ context_aggregator.assistant()
"""

from __future__ import annotations

from typing import Any, Dict, Optional

from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

from app.ai.voice.agents.breeze_buddy.services.rag.memory_router import RagMemoryRouter
from app.core.config.static import RAG_MAX_CONTEXT_CHARS
from app.core.logger import logger

# Role used for the ephemeral RAG message — must match what we look for on cleanup
_RAG_ROLE = "system"
_RAG_MARKER = "[RAG]"


class RagContextProcessor(FrameProcessor):
"""Ephemerally enriches the LLM context with RAG knowledge on each user turn.

The retrieved knowledge is injected into the ``LLMContext`` object directly
before it reaches the LLM and removed immediately after the LLM responds.
This keeps the persistent conversation history clean and avoids competing
with the node's own task/role messages.

Args:
rag_router: Fully initialised ``RagMemoryRouter`` for this call.
"""

def __init__(self, rag_router: RagMemoryRouter) -> None:
super().__init__()
self._router = rag_router
# The LLMContext we injected into (kept for cleanup after LLM responds)
self._injected_context: Optional[OpenAILLMContext] = None

# ------------------------------------------------------------------
# Frame processing
# ------------------------------------------------------------------

async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
await super().process_frame(frame, direction)

if isinstance(frame, LLMContextFrame):
await self._enrich_and_forward(frame, direction)
return

if isinstance(frame, LLMFullResponseEndFrame):
# LLM finished — remove the ephemeral RAG message from context
self._cleanup_rag_message()
await self.push_frame(frame, direction)
return

await self.push_frame(frame, direction)

# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

async def _enrich_and_forward(
self, frame: LLMContextFrame, direction: FrameDirection
) -> None:
"""Fetch RAG context and inject it into the LLMContext before forwarding."""
llm_context: OpenAILLMContext = frame.context # type: ignore[assignment]

# Extract the latest user utterance from the context messages
utterance = ""
messages = llm_context.get_messages() or []
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, str):
utterance = content.strip()
elif isinstance(content, list):
# OpenAI multi-part content: extract text parts
parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
utterance = " ".join(parts).strip()
break

if not utterance:
await self.push_frame(frame, direction)
return

# Retrieve knowledge (cache hit ≈ 0.1 ms, miss ≈ 50–150 ms)
try:
context = await self._router.get_context(utterance)
except Exception as exc:
logger.warning("RagContextProcessor: context retrieval failed: %s", exc)
await self.push_frame(frame, direction)
return

if not context:
await self.push_frame(frame, direction)
return

# Trim to prevent prompt bloat
if len(context) > RAG_MAX_CONTEXT_CHARS:
context = context[:RAG_MAX_CONTEXT_CHARS].rsplit("\n", 1)[0]

# Inject directly into the LLMContext object as an ephemeral system msg.
# The framing instruction tells the LLM to use the retrieved facts to
# answer the user's question. The marker prefix lets us find and remove
# it on cleanup.
rag_message: Dict[str, Any] = {
"role": _RAG_ROLE,
"content": (
f"{_RAG_MARKER} The following are relevant facts from the knowledge base. "
"Use them to answer the user's question.\n\n"
f"{context}"
),
}

llm_context.add_message(rag_message) # type: ignore[arg-type]
self._injected_context = llm_context

logger.debug(
"RagContextProcessor: injected %d chars of RAG context for '%s…'",
len(context),
utterance[:40],
)

await self.push_frame(frame, direction)

def _cleanup_rag_message(self) -> None:
"""Remove the ephemeral RAG system message from the LLMContext."""
if self._injected_context is None:
return

messages = self._injected_context.get_messages()
if not messages:
self._injected_context = None
return

original_len = len(messages)
filtered = []
for m in messages:
content = m.get("content")
is_rag_msg = (
m.get("role") == _RAG_ROLE
and isinstance(content, str)
and content.startswith(_RAG_MARKER)
)
if not is_rag_msg:
filtered.append(m)

if len(filtered) < original_len:
self._injected_context.set_messages(filtered)
logger.debug(
"RagContextProcessor: cleaned up %d ephemeral RAG message(s)",
original_len - len(filtered),
)

self._injected_context = None
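
To make the ephemeral-injection contract concrete, here is a rough round-trip using a stand-in context object. `FakeContext` is hypothetical and only mimics the three OpenAILLMContext methods the processor relies on (`get_messages`, `add_message`, `set_messages`); the fact string is invented:

```python
# Illustrative round-trip of inject -> respond -> cleanup (not part of the PR).
class FakeContext:
    def __init__(self, messages: list[dict]) -> None:
        self._messages = messages

    def get_messages(self) -> list[dict]:
        return self._messages

    def add_message(self, message: dict) -> None:
        self._messages.append(message)

    def set_messages(self, messages: list[dict]) -> None:
        self._messages = messages


ctx = FakeContext([{"role": "user", "content": "Do you deliver on Sundays?"}])

# 1. On LLMContextFrame: inject marked, ephemeral knowledge.
ctx.add_message({"role": "system", "content": "[RAG] Facts: Sunday delivery 9am-1pm."})
assert len(ctx.get_messages()) == 2  # the LLM sees the user turn plus the RAG facts

# 2. On LLMFullResponseEndFrame: strip every marked system message.
ctx.set_messages([
    m for m in ctx.get_messages()
    if not (m.get("role") == "system"
            and isinstance(m.get("content"), str)
            and m["content"].startswith("[RAG]"))
])
assert len(ctx.get_messages()) == 1  # persistent history is clean again
```

The marker-prefix convention is what makes cleanup cheap: no bookkeeping of message indices, just a filter over the current message list.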