RAG Integration <> Buddy #725
Conversation
|
Important: Review skipped — auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: Organization UI · Review profile: CHILL · Plan: Pro
Walkthrough
This PR introduces comprehensive RAG (Retrieval-Augmented Generation) infrastructure to Breeze Buddy, enabling context-aware LLM responses. It adds per-call RAG initialization via RagMemoryRouter, integrating semantic caching, Azure embeddings, pgvector indexing, document ingestion from GCS, background prediction (SlowThinker), low-latency retrieval (FastTalker), and API endpoints for knowledge base management, along with database schema, configuration constants, and test/seed scripts.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant User
participant Agent as Breeze Buddy<br/>Agent
participant RagMemoryRouter
participant FastTalker
participant SlowThinker
participant SemanticCache
participant VectorStore
participant EmbeddingProvider
participant LLM
User->>Agent: User utterance
par FastPath
Agent->>RagMemoryRouter: get_context(utterance)
RagMemoryRouter->>EmbeddingProvider: embed(utterance)
EmbeddingProvider-->>RagMemoryRouter: embedding
RagMemoryRouter->>FastTalker: get_context(query_embedding)
FastTalker->>SemanticCache: get(embedding, top_k)
alt Cache Hit
SemanticCache-->>FastTalker: cached_contexts
else Cache Miss
FastTalker->>VectorStore: search(embedding, top_k)
VectorStore-->>FastTalker: search_results
FastTalker->>SemanticCache: put(embedding, results)
end
FastTalker-->>RagMemoryRouter: formatted_context
RagMemoryRouter-->>Agent: context
and BackgroundPath
Agent->>SlowThinker: on_user_utterance(utterance)
SlowThinker->>LLM: generate_search_phrases(utterance)
LLM-->>SlowThinker: predicted_phrases
SlowThinker->>EmbeddingProvider: embed_batch([utterance, phrases...])
EmbeddingProvider-->>SlowThinker: embeddings
SlowThinker->>VectorStore: search_batch(embeddings)
VectorStore-->>SlowThinker: results
SlowThinker->>SemanticCache: put_batch(results)
end
Agent->>Agent: inject_rag_context(context)
Agent->>LLM: forward with RAG context
LLM->>LLM: generate response
LLM-->>Agent: response
Agent->>RagMemoryRouter: record_assistant_response(text)
Agent->>User: speak response
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
b71d6eb to 49fd62e (Compare)
Pull request overview
Adds a Breeze Buddy Retrieval-Augmented Generation (RAG) pipeline backed by pgvector in Postgres, plus admin endpoints and local/dev scripts to upload/index/query a template knowledge base.
Changes:
- Introduces pgvector-backed KB storage (`rag_embeddings`) with migration, query builders, and DB accessor.
- Adds Breeze Buddy RAG runtime components (embedding provider, GCS loader + chunker, pgvector store, FAISS semantic cache, SlowThinker/FastTalker, RagMemoryRouter) and pipeline integration via `RagContextProcessor`.
- Adds Breeze Buddy admin API endpoints for KB upload/index/status/invalidate, plus dev scripts and new deps (`faiss-cpu`, `pdfminer.six`).
Reviewed changes
Copilot reviewed 24 out of 25 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Locks new dependencies needed for FAISS cache + PDF ingestion. |
| pyproject.toml | Adds faiss-cpu and pdfminer.six runtime deps. |
| scripts/test_rag_local.py | Local smoke test for semantic cache + FastTalker cache-hit path. |
| scripts/seed_rag_freshbus.py | Local dev seed script to embed and upsert sample KB data. |
| app/database/migrations/023_create_rag_embeddings_table.sql | Creates rag_embeddings table and pgvector indexes. |
| app/database/queries/breeze_buddy/rag_embeddings.py | Adds SQL query strings for KB upsert/search/stats/delete. |
| app/database/accessor/breeze_buddy/rag_embeddings.py | Implements KB upsert/search/stats/delete via asyncpg. |
| app/database/__init__.py | Adds get_pool() and registers pgvector codec during pool init. |
| app/core/config/static.py | Adds RAG env vars (RAG_GCS_BUCKET, embedding config, RAG_ENABLED). |
| app/api/routers/breeze_buddy/knowledge_base/__init__.py | Adds KB management endpoints (upload/index/status/invalidate). |
| app/api/routers/breeze_buddy/__init__.py | Registers the new knowledge-base router. |
| app/ai/voice/agents/breeze_buddy/template/types.py | Adds knowledge_base config to template configuration model. |
| app/ai/voice/agents/breeze_buddy/services/rag/* | Implements RAG runtime (chunking, embeddings, retrieval, caching, orchestration). |
| app/ai/voice/agents/breeze_buddy/processors/rag_context.py | Injects RAG context ephemerally into LLMContextFrame. |
| app/ai/voice/agents/breeze_buddy/processors/__init__.py | Exports RagContextProcessor. |
| app/ai/voice/agents/breeze_buddy/agent/pipeline.py | Inserts RagContextProcessor after user_aggregator. |
| app/ai/voice/agents/breeze_buddy/agent/__init__.py | Initializes/stops RAG per call when template enables it. |
| """Number of indexed chunks in the knowledge base (cached from last refresh).""" | ||
| return self._fast_talker._store.size # type: ignore[attr-defined] | ||
|
|
knowledge_base_size reads self._fast_talker._store.size, but PgVectorStore doesn’t define a size attribute/property (it’s a stateless DB-backed store). This will raise at runtime if accessed. Consider removing this property, or implement it via get_cached_index_stats() / get_kb_stats() instead of reaching into a private field.
| """Number of indexed chunks in the knowledge base (cached from last refresh).""" | |
| return self._fast_talker._store.size # type: ignore[attr-defined] | |
| """Number of indexed chunks in the knowledge base, when available.""" | |
| stats_getter = getattr(self._fast_talker, "get_cached_index_stats", None) | |
| if callable(stats_getter): | |
| try: | |
| stats = stats_getter() | |
| except Exception: | |
| logger.debug( | |
| "Failed to read cached index stats for knowledge base size", | |
| exc_info=True, | |
| ) | |
| else: | |
| if isinstance(stats, dict): | |
| for key in ( | |
| "size", | |
| "kb_size", | |
| "knowledge_base_size", | |
| "indexed_chunks", | |
| "total_chunks", | |
| ): | |
| value = stats.get(key) | |
| if value is not None: | |
| try: | |
| return int(value) | |
| except (TypeError, ValueError): | |
| break | |
| kb_stats_getter = getattr(self._fast_talker, "get_kb_stats", None) | |
| if callable(kb_stats_getter): | |
| try: | |
| stats = kb_stats_getter() | |
| except Exception: | |
| logger.debug( | |
| "Failed to read knowledge base stats for knowledge base size", | |
| exc_info=True, | |
| ) | |
| else: | |
| if isinstance(stats, dict): | |
| for key in ( | |
| "size", | |
| "kb_size", | |
| "knowledge_base_size", | |
| "indexed_chunks", | |
| "total_chunks", | |
| ): | |
| value = stats.get(key) | |
| if value is not None: | |
| try: | |
| return int(value) | |
| except (TypeError, ValueError): | |
| break | |
| store = getattr(self._fast_talker, "_store", None) | |
| size = getattr(store, "size", None) | |
| if size is not None: | |
| try: | |
| return int(size) | |
| except (TypeError, ValueError): | |
| logger.debug( | |
| "Store size was present but could not be converted to int", | |
| exc_info=True, | |
| ) | |
| return 0 |
```text
(reader). Documents are stored keyed by their *own* embedding vector – not the
query that retrieved them – so a wide variety of user phrasing can hit the same
pre-fetched chunk.
```
This module-level docstring says cache entries are keyed by the document’s embedding (not the retrieval query), but the implementation stores/searches vectors using the provided query_embedding (see put() / _put_locked() usage and callers in FastTalker/SlowThinker). Please align the docstring with the actual keying strategy, or update the implementation if the intent is to key by chunk embeddings.
Suggested change:

```diff
-(reader). Documents are stored keyed by their *own* embedding vector – not the
-query that retrieved them – so a wide variety of user phrasing can hit the same
-pre-fetched chunk.
+(reader). Entries are indexed by the embedding of the retrieval query used when
+they were cached, and the cached value is the pre-fetched chunk/context
+associated with that query. This lets semantically similar future queries reuse
+the same prefetched result.
```
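To make the query-keyed behaviour concrete, here is a minimal, self-contained sketch of that caching pattern (not the PR's FAISS-backed implementation; the class and method names are illustrative): entries are stored under the embedding of the query that fetched them, and a later, semantically similar query reuses them.

```python
import numpy as np
from typing import Optional

class QueryKeyedCacheSketch:
    """Toy semantic cache: keys are *query* embeddings, values are retrieved chunks."""

    def __init__(self, threshold: float = 0.9) -> None:
        self._keys: list[np.ndarray] = []      # query embeddings recorded at put() time
        self._values: list[list[str]] = []     # chunks that were retrieved for that query
        self._threshold = threshold

    def put(self, query_embedding: np.ndarray, chunks: list[str]) -> None:
        self._keys.append(query_embedding / np.linalg.norm(query_embedding))
        self._values.append(chunks)

    def get(self, query_embedding: np.ndarray) -> Optional[list[str]]:
        if not self._keys:
            return None
        q = query_embedding / np.linalg.norm(query_embedding)
        sims = np.array([float(k @ q) for k in self._keys])
        best = int(sims.argmax())
        return self._values[best] if sims[best] >= self._threshold else None

cache = QueryKeyedCacheSketch()
cache.put(np.array([1.0, 0.0]), ["Refunds are processed within 5 days."])
print(cache.get(np.array([0.98, 0.05])))  # near-identical query embedding -> cache hit
```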
```python
    asyncpg does not know the ``vector`` type by default. We register a
    text codec so that:
      - Python lists / numpy arrays are sent as the text representation
        ``'[0.1, -0.2, ...]'`` that pgvector accepts.
      - Values read back from the DB are returned as Python lists of floats.

    This is called via ``init`` in ``create_pool`` so it runs for every
    connection in the pool automatically.
    """
    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
    # Fetch the OID of the vector type (created by the extension above)
```
_init_vector_codec() runs CREATE EXTENSION IF NOT EXISTS vector during pool connection initialization. This is redundant with migration 023 and can fail in environments where the app DB role lacks CREATE EXTENSION, which would prevent pool initialization. Consider removing this DDL (rely on migrations) or catching/handling failures so the app can still start (with RAG disabled) when the extension can’t be created.
Suggested change:

```diff
     asyncpg does not know the ``vector`` type by default. We register a
     text codec so that:
       - Python lists / numpy arrays are sent as the text representation
         ``'[0.1, -0.2, ...]'`` that pgvector accepts.
       - Values read back from the DB are returned as Python lists of floats.

     This is called via ``init`` in ``create_pool`` so it runs for every
-    connection in the pool automatically.
+    connection in the pool automatically. The pgvector extension is expected
+    to be installed by migrations; if it is unavailable, codec registration
+    is skipped and vector search remains disabled.
     """
-    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
-    # Fetch the OID of the vector type (created by the extension above)
+    # Fetch the OID of the vector type. The extension should already exist
+    # from migrations; avoid running privileged DDL during pool init.
```
```python
        kb_config.prediction_llm_api_key or azure_api_key or AZURE_OPENAI_API_KEY
    )
    pred_endpoint = (
        kb_config.prediction_llm_endpoint or azure_endpoint or AZURE_OPENAI_ENDPOINT
    )
```
pred_api_key / pred_endpoint can end up as empty strings (e.g., missing AZURE_OPENAI_* and no per-template overrides). In that case _build_llm_client() will still create an AzureOpenAI client, and SlowThinker will repeatedly error and fall back. Consider treating empty credentials as “not configured” and returning None so SlowThinker uses the keyword fallback without noisy failures.
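A small sketch of the "empty means not configured" guard being suggested (the `_build_llm_client` name comes from the comment above; the body and API version string are illustrative, not the PR's code):

```python
from typing import Optional
from openai import AzureOpenAI

def _build_llm_client(pred_api_key: str, pred_endpoint: str) -> Optional[AzureOpenAI]:
    # Empty strings mean "not configured" (no env var, no per-template override):
    # return None so SlowThinker falls back to keyword prediction instead of
    # erroring on every utterance with a client built from blank credentials.
    if not pred_api_key.strip() or not pred_endpoint.strip():
        return None
    return AzureOpenAI(
        api_key=pred_api_key,
        azure_endpoint=pred_endpoint,
        api_version="2024-02-01",  # illustrative version string
    )
```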
```python
user_aggregator = context_aggregator.user()

# Order: stt → transcription_gate → user_aggregator → llm → tts
# Order: stt → transcription_gate → [rag_context] → user_aggregator → llm → tts
```
The pipeline order comment is inconsistent with the actual insertion logic: rag_context_processor is inserted after user_aggregator (between user_aggregator and llm), but the comment states it runs before user_aggregator. Please update the comment to reflect the real order so future pipeline changes don’t get made based on incorrect documentation.
Suggested change:

```diff
-# Order: stt → transcription_gate → [rag_context] → user_aggregator → llm → tts
+# Order: stt → transcription_gate → user_aggregator → [rag_context] → llm → tts
```
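For context, a toy sketch of the insertion order under discussion (stand-in strings instead of the real pipecat processors; the actual assembly lives in pipeline.py): inserting the RAG processor at the LLM's index places it between `user_aggregator` and `llm`, which is what the corrected comment describes.

```python
# stand-ins for the real processor objects
stt, gate, user_aggregator, llm, tts = "stt", "transcription_gate", "user_aggregator", "llm", "tts"
rag_context = "rag_context"

processors = [stt, gate, user_aggregator, llm, tts]
processors.insert(processors.index(llm), rag_context)
print(processors)
# ['stt', 'transcription_gate', 'user_aggregator', 'rag_context', 'llm', 'tts']
```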
```text
POST /knowledge-base/upload - Upload a file into the template's GCS folder
POST /knowledge-base/index - Trigger (re-)indexing for a template
GET /knowledge-base/status - Index stats for a template
DELETE /knowledge-base/invalidate - Expire the in-process index cache
```
The knowledge-base router’s header comment says DELETE /knowledge-base/invalidate “Expire the in-process index cache”, but the implementation deletes rows from rag_embeddings (pgvector), and the index manager states there is no in-process cache. Please update the description to reflect the actual behavior (deletes the pgvector knowledge base).
Suggested change:

```diff
-DELETE /knowledge-base/invalidate - Expire the in-process index cache
+DELETE /knowledge-base/invalidate - Delete the template's pgvector knowledge base
```
```python
    if not self.configurations or not self.template:
        return
    kb_config = getattr(self.configurations, "knowledge_base", None)
    if not kb_config:
        return
```
_init_rag() doesn’t check the global RAG_ENABLED switch, so when RAG is disabled it will still attempt to build/start the router and may log errors if embedding env vars/DB aren’t configured. Consider short-circuiting here when RAG_ENABLED is false to avoid noisy logs and unnecessary work.
```python
# ---------------------------------------------------------------------------
# Delete stale chunks after a re-index
# Removes any rows whose chunk_index is NOT in the new set.
```
The comment above DELETE_STALE_CHUNKS_QUERY says it removes rows whose chunk_index is “NOT in the new set”, but the query only deletes chunk_index > $3 (indices above the new max). Please adjust the comment to match the actual behavior to avoid confusion during future re-index logic changes.
Suggested change:

```diff
-# Removes any rows whose chunk_index is NOT in the new set.
+# Removes any rows whose chunk_index is greater than the new maximum index.
```
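For illustration, the delete the reviewer describes would have roughly this shape (a sketch, assuming `$1`/`$2`/`$3` are merchant, template, and the new maximum chunk index; the real query text lives in rag_embeddings.py):

```python
# Sketch of the described behaviour of DELETE_STALE_CHUNKS_QUERY: it trims indices above
# the new maximum rather than deleting every index "NOT in the new set".
DELETE_STALE_CHUNKS_SKETCH = """
DELETE FROM rag_embeddings
WHERE merchant_id = $1
  AND template_id = $2
  AND chunk_index > $3
"""
```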
```python
        if isinstance(frame, BotStoppedSpeakingFrame):
            if self._last_bot_text:
                self._router.record_assistant_response(self._last_bot_text)
                self._last_bot_text = None

        await self.push_frame(frame, direction)
```
RagContextProcessor attempts to record assistant responses on BotStoppedSpeakingFrame using _last_bot_text, but _last_bot_text is never set anywhere (the capture_bot_text() method is unused; no other code calls it). As a result, RagMemoryRouter’s conversation history will only contain user messages. Either wire capture_bot_text() into the LLM/TTS path, or remove this unused tracking to avoid misleading behavior.
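A self-contained sketch of the wiring the comment asks for (stand-in frame and router classes, not pipecat's or the PR's real types): the assistant text is captured on the text path first, so it is available when the "bot stopped speaking" event arrives.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class BotTextFrame:
    """Stand-in for the frame carrying the assistant's final text."""
    text: str

class BotStoppedSpeaking:
    """Stand-in for BotStoppedSpeakingFrame."""

class RouterStub:
    """Stand-in for RagMemoryRouter."""
    def record_assistant_response(self, text: str) -> None:
        print(f"recorded assistant response: {text!r}")

class RagContextProcessorSketch:
    def __init__(self, router: RouterStub) -> None:
        self._router = router
        self._last_bot_text: Optional[str] = None

    def capture_bot_text(self, text: str) -> None:
        self._last_bot_text = text

    def process_frame(self, frame: object) -> None:
        if isinstance(frame, BotTextFrame):
            # wire the existing hook on the text path, *before* the stop event arrives
            self.capture_bot_text(frame.text)
        elif isinstance(frame, BotStoppedSpeaking) and self._last_bot_text:
            self._router.record_assistant_response(self._last_bot_text)
            self._last_bot_text = None

proc = RagContextProcessorSketch(RouterStub())
proc.process_frame(BotTextFrame("Your bus departs at 9:40."))
proc.process_frame(BotStoppedSpeaking())  # -> recorded assistant response: ...
```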
Actionable comments posted: 18
🧹 Nitpick comments (12)
app/ai/voice/agents/breeze_buddy/services/rag/types.py (1)
103-117: Consider `SecretStr` for the API-key field.
`prediction_llm_api_key` holds an Azure OpenAI credential. Elsewhere in the codebase (e.g., `HttpAuthConfig.token`) keys/secrets use Pydantic `SecretStr` so they don't leak via `model_dump()` / `repr()` / log lines. Same treatment here would prevent accidental exposure if a `KnowledgeBaseConfig` instance is serialized for debugging.

```diff
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, SecretStr
@@
-    prediction_llm_api_key: Optional[str] = Field(
+    prediction_llm_api_key: Optional[SecretStr] = Field(
         None,
         description="Azure OpenAI API key used for Slow Thinker predictions. "
         "Defaults to the global AZURE_OPENAI_API_KEY env var.",
     )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/types.py` around lines 103 - 117, Change the prediction_llm_api_key field on the KnowledgeBaseConfig (in types.py) from Optional[str] to Optional[SecretStr] and import SecretStr from pydantic; keep the Field(...) description and default None, and update any downstream uses of prediction_llm_api_key (call sites that expect a str) to call .get_secret_value() when the raw string is needed so consumers don’t accidentally log or dump the secret.app/ai/voice/agents/breeze_buddy/services/rag/chunker.py (1)
127-145: Overlap effectively collapses between near-full-size chunks. When two adjacent raw chunks are both close to
`chunk_size`, `candidate = prev_tail + " " + chunks[i]` exceeds `chunk_size`, and the truncation `candidate[-chunk_size:]` keeps only `chunks[i]` (plus possibly 1–2 chars of overlap). So the actual cross-chunk overlap can be near zero in exactly the case where it matters most for retrieval recall. This isn't a correctness bug (no content is lost), but if cross-chunk context preservation is a stated goal, consider shrinking the chunks during recursive split to leave headroom for overlap (e.g., split using
`chunk_size - chunk_overlap` as the target), so adding overlap doesn't force a truncate.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/chunker.py` around lines 127 - 145, The current overlap logic in _apply_overlap can get truncated away when adjacent raw chunks are near chunk_size; fix by making the upstream chunking produce headroom by splitting with a target of (chunk_size - chunk_overlap) instead of chunk_size so adding prev_tail won't force truncation. Locate the function that produces the initial raw chunks (the recursive splitter used before _apply_overlap) and change its split target to max(1, chunk_size - chunk_overlap) or otherwise ensure non-negative headroom; keep _apply_overlap unchanged except optionally verifying chunk_overlap is clamped to a sensible value relative to chunk_size.app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py (2)
34-41: Wrap credential JSON parsing in a clear error. If
`GCS_CREDENTIALS_JSON` is set but malformed, `json.loads(creds_json)` raises a generic `JSONDecodeError`, leaking the value's offset in the trace and giving operators a confusing failure mode. A small guard makes the failure mode explicit and avoids any chance of partial creds bleeding into error logs.

```diff
-    creds_dict = json.loads(creds_json)
+    try:
+        creds_dict = json.loads(creds_json)
+    except json.JSONDecodeError as exc:
+        raise RuntimeError(
+            "GCS_CREDENTIALS_JSON is set but is not valid JSON"
+        ) from exc
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py` around lines 34 - 41, Wrap the JSON parsing of creds_json in a try/except to catch json.JSONDecodeError and raise a clear, non-sensitive RuntimeError if parsing fails; specifically, around the json.loads(creds_json) call handle JSONDecodeError and raise a RuntimeError like "GCS_CREDENTIALS_JSON is present but malformed" (do not include the raw JSON or offset), then proceed to create credentials via service_account.Credentials.from_service_account_info(creds_dict) and return storage.Client as before (symbols: creds_json, GCS_CREDENTIALS_JSON, json.loads, service_account.Credentials.from_service_account_info, storage.Client).
90-174: The sync I/O pattern is already properly handled via `asyncio.to_thread()`.
`load_gcs_documents` uses synchronous I/O (google-cloud-storage `list_blobs()`, `download_as_bytes()`, and pdfminer extraction), which violates the guideline: "Use async/await for all I/O operations; do not use synchronous I/O." However, the only call site in `index_manager.py:84` already wraps this function correctly:

```python
chunks = await asyncio.to_thread(
    load_gcs_documents,
    gcs_bucket,
    gcs_prefix,
    kb_config.extensions,
    kb_config.chunk_size,
    kb_config.chunk_overlap,
)
```

Since there is no other caller, the risk of accidental event-loop blocking is eliminated. Consider renaming the function to
`load_gcs_documents_blocking()` to make the synchronous nature explicit in the API, reducing future misuse potential (optional; not required given current single-caller pattern).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py` around lines 90 - 174, The function load_gcs_documents is synchronous and already used via asyncio.to_thread at its sole call site (the asyncio.to_thread(..., load_gcs_documents, ...) invocation), so make its blocking nature explicit by renaming load_gcs_documents to load_gcs_documents_blocking (update the definition and all references), and update the caller that invokes asyncio.to_thread to call load_gcs_documents_blocking; alternatively, if you prefer not to rename, add a clear docstring note on load_gcs_documents stating it is blocking and must be called via asyncio.to_thread (but prefer renaming to prevent accidental future misuse).app/database/migrations/023_create_rag_embeddings_table.sql (1)
48-54: `idx_rag_embeddings_kb` is redundant with the unique constraint. The
`UNIQUE (merchant_id, template_id, chunk_index)` constraint already creates a btree index whose leftmost prefix `(merchant_id, template_id)` is fully usable for tenant-filter queries. `idx_rag_embeddings_kb` adds storage and write cost on every upsert without serving a query that the unique index cannot. Same applies to `idx_rag_embeddings_indexed_at`'s leading columns. Consider dropping
`idx_rag_embeddings_kb` and keeping only `idx_rag_embeddings_indexed_at` (which extends the prefix with `indexed_at` and is therefore not redundant).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/database/migrations/023_create_rag_embeddings_table.sql` around lines 48 - 54, The idx_rag_embeddings_kb index is redundant because the existing UNIQUE (merchant_id, template_id, chunk_index) constraint already creates the same btree prefix; remove the CREATE INDEX IF NOT EXISTS idx_rag_embeddings_kb statement from the migration and leave the CREATE INDEX IF NOT EXISTS idx_rag_embeddings_indexed_at (which includes indexed_at and is not redundant). Ensure the UNIQUE constraint on rag_embeddings (merchant_id, template_id, chunk_index) remains present so tenant-prefix queries still use that index.app/ai/voice/agents/breeze_buddy/agent/__init__.py (1)
736-742: Misleading log message and inconsistent log formatting style.
- Line 737:
"RAG initialised: gs://.../%s/%s"interpolatesmerchant_idandtemplate_idinto a fakegs://URI. The actual bucket isRAG_GCS_BUCKET; the literal.../is misleading when grepping logs or debugging.- Lines 737, 742, 752: use
%s-style positional args. The rest of this file (and the project guideline "Use Loguru logging with contextvars") uses f-strings /{}placeholders. Loguru does support%svia*argsbut mixing styles within one file hurts readability.🔧 Suggested fix
- logger.info( - "RAG initialised: gs://.../%s/%s", - merchant_id, - template_id, - ) + logger.info( + f"RAG initialised for merchant_id={merchant_id}, " + f"template_id={template_id}" + ) except Exception as exc: - logger.error("RAG initialisation failed (proceeding without RAG): %s", exc) + logger.error( + f"RAG initialisation failed (proceeding without RAG): {exc}" + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 736 - 742, Update the misleading and inconsistent logs in the RAG init block: replace the fake "gs://.../%s/%s" message with the real bucket variable (RAG_GCS_BUCKET) and the merchant_id/template_id so the URI is accurate, and switch both logger.info and logger.error to the project's Loguru-style formatting (use f-strings or `{}` placeholders) instead of `%s`-style arguments; locate the logger calls around the RAG initialization in __init__.py (the logger.info(...) that references merchant_id and template_id and the except block logger.error(..., exc)) and format the error log to include the exception (exc) consistently with the chosen Loguru style.app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py (1)
158-158: Nit: prefer iterable unpacking.Ruff (RUF005).
♻️ Proposed nit
- all_queries = [text] + predictions + all_queries = [text, *predictions]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py` at line 158, Replace the list concatenation with iterable unpacking: instead of constructing all_queries via [text] + predictions, build it using a single list literal with unpacking (e.g., [text, *predictions]) so the variable all_queries is created with iterable unpacking; update the assignment where all_queries is defined in slow_thinker.py (the use of text and predictions) to follow this pattern.scripts/seed_rag_freshbus.py (1)
38-47: Use `get_required_env()` from `app/core/config/static.py` instead of `os.environ` directly. These mandatory env vars should be loaded centrally to keep config sourcing consistent across the codebase. Even for seed scripts, this makes failures (missing env) emit the same canonical error and avoids drift with the production embedding/Postgres config wiring.
As per coding guidelines: "Load ALL configuration from
`app/core/config/static.py` using `get_required_env()` for mandatory variables; never import directly from `os.environ` elsewhere".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/seed_rag_freshbus.py` around lines 38 - 47, Replace all direct os.environ accesses for EMBEDDING_ENDPOINT, EMBEDDING_API_KEY, EMBEDDING_DEPLOYMENT, EMBEDDING_DIMENSION and the POSTGRES_* pieces that build POSTGRES_DSN with calls to get_required_env() from app/core/config/static.py; e.g. assign EMBEDDING_ENDPOINT = get_required_env("RAG_EMBEDDING_ENDPOINT"), wrap EMBEDDING_DIMENSION = int(get_required_env("RAG_EMBEDDING_DIMENSION")) (preserving the integer conversion), and construct POSTGRES_DSN by pulling POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB via get_required_env() instead of os.environ so missing vars produce the canonical error path.app/api/routers/breeze_buddy/knowledge_base/__init__.py (2)
222-307: Operational concern: 30–120 s blocking index call on a request thread. The docstring already calls out that this can take a couple of minutes. In practice that runs into ingress / gateway timeouts (most defaults are 30–60 s), and ties up an event-loop worker holding the embedding HTTP connection. If long indexing jobs are expected, prefer kicking off the build via
`BackgroundTasks` (or an `asyncio.create_task` tracked at app level) and returning a `202 Accepted` with a job id; the existing `/knowledge-base/status` endpoint already exposes progress via `last_indexed_at` / `chunk_count`. "BackgroundTasks is intended more for shorter, low-compute tasks like triggering an email send or similar. For tasks involving multi-minute calls to subprocesses, I think long term you would benefit from putting in the effort to set up a celery worker and a task queue" — for ≤2 min cases starlette `BackgroundTasks` is usually sufficient.
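A hedged sketch of that 202-plus-background pattern (route path matches this PR's endpoint naming, but the handler signature and job function are assumptions, not the repository's code):

```python
from fastapi import APIRouter, BackgroundTasks, status

router = APIRouter()

async def build_index_job(merchant_id: str, template_id: str) -> None:
    """Stand-in for the long-running build_knowledge_base(...) call."""
    ...

@router.post("/knowledge-base/index", status_code=status.HTTP_202_ACCEPTED)
async def trigger_index(
    merchant_id: str,
    template_id: str,
    background_tasks: BackgroundTasks,
) -> dict:
    # Queue the slow embedding/indexing work and answer immediately; callers can poll
    # the existing status endpoint for last_indexed_at / chunk_count to track progress.
    background_tasks.add_task(build_index_job, merchant_id, template_id)
    return {"status": "accepted", "merchant_id": merchant_id, "template_id": template_id}
```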
171-180: Move the lazy imports up to module scope.
`json`, `google.cloud.storage`, `service_account`, `EmbeddingProvider`, `build_knowledge_base`, `get_cached_index_stats`, `invalidate_index`, and `RAG_EMBEDDING_*` are imported inside `try:` blocks per request. Two side-effects:
- An
`ImportError` (e.g., misconfigured deployment without `google-cloud-storage`) gets reported as a generic `500 "Upload failed: No module named …"` instead of failing fast at app startup.
If the lazy import is intentional (to avoid importing heavy GCP / openai at module load), keep it but wrap it in a separate
try/except ImportErrorreturning a503 Service Unavailableso genuine bugs in downstream code don't masquerade as missing-dependency errors.Also applies to: 244-255, 337-339, 392-394
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/api/routers/breeze_buddy/knowledge_base/__init__.py` around lines 171 - 180, Move the lazy imports (json, google.cloud.storage as storage, google.oauth2.service_account as service_account, EmbeddingProvider, build_knowledge_base, get_cached_index_stats, invalidate_index and RAG_EMBEDDING_* constants) out of per-request try blocks into module scope so ImportError surfaces at startup; if you must keep them lazy, wrap each import block in a try/except ImportError that raises/returns an HTTP 503 Service Unavailable with a clear message (e.g., "Dependency missing: google-cloud-storage") instead of letting the missing-module error propagate as a 500 during request handling; update the code locations using functions/classes like build_knowledge_base, EmbeddingProvider, get_cached_index_stats and invalidate_index to rely on the module-level imports (or the 503-wrapping import helper) to eliminate per-request overhead and improve fail-fast behavior.app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py (1)
65-72: Add timeout and dimension validation to OpenAI embeddings client.Two small hardening tweaks worth considering:
- The OpenAI client is constructed without a
timeout=, so on Azure Foundry hiccups a single embed call can block itsto_threadworker for the SDK default (600 seconds). A bounded timeout (e.g., 15–30 s) keeps requests from accumulating stuck threads during outages.- After parsing
response.data, assertingarr.shape[1] == self._dimensionwould catch deployment/model mismatches early instead of letting a bad-shape vector reach pgvector (which will then fail thevector(1536)cast with a less obvious error).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py` around lines 65 - 72, The OpenAI client is created without a timeout and embeddings responses aren't validated against the expected dimension; update the OpenAI constructor call that sets self._client (the OpenAI(...) instantiation) to include a bounded timeout (e.g., timeout=20) to avoid long blocking during outages, and after you parse the embeddings into the numpy array (the variable `arr` derived from response.data) add an assertion or explicit check that arr.shape[1] == self._dimension (raise a clear error if not) so mismatched deployment/model dimensions are detected early before storing into pgvector; keep references to self._deployment, self._dimension and self._max_batch_size unchanged.app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py (1)
316-319: Don't reach into `_fast_talker._store`; expose a public accessor instead.
`self._fast_talker._store.size` pierces two layers of private state and requires a `# type: ignore[attr-defined]`. Add a public `size` / `store_size` property on `FastTalker` (or have it forward `PgVectorStore.size`) and call that here so this property doesn't break the moment `FastTalker`'s internals are renamed/refactored.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py` around lines 316 - 319, The knowledge_base_size property is accessing private internals via self._fast_talker._store.size; add a public accessor on FastTalker (e.g., a size or store_size property/method that forwards to its internal PgVectorStore.size) and update memory_router.py to call self._fast_talker.size (or store_size) instead of reaching into _store and dropping the type: ignore; reference FastTalker and the knowledge_base_size property when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py`:
- Around line 716-744: The _init_rag method currently builds RagMemoryRouter
whenever configurations.knowledge_base exists; update _init_rag to first check
the global RAG_ENABLED master switch (import/inspect RAG_ENABLED) and return
early if it is False, so no RagMemoryRouter is constructed and both
self._rag_router and self._rag_context_processor remain None; specifically, add
a guard at the start of _init_rag that consults RAG_ENABLED before consulting
configurations.knowledge_base to prevent Azure embeddings/pgvector calls when
RAG is globally disabled.
In `@app/ai/voice/agents/breeze_buddy/processors/rag_context.py`:
- Around line 117-131: The code is reading and mutating the private _messages on
llm_context; update reads to use the public API (e.g.,
llm_context.get_messages() or llm_context.messages) and replace direct writes
with llm_context.set_messages(filtered_list); specifically, change the
getattr(llm_context, "_messages", None) usage to
llm_context.get_messages()/messages when extracting the last user utterance and
use llm_context.set_messages(...) instead of assigning to _messages when
cleaning up or updating messages (keep existing add_message(...) calls as they
are).
- Around line 98-103: capture_bot_text is never called, so _last_bot_text stays
None and record_assistant_response is never invoked when a
BotStoppedSpeakingFrame arrives; wire the LLM/TTS output path to call
processor.capture_bot_text(text) with the assistant's plain text before the
BotStoppedSpeakingFrame is pushed. Locate the LLM response handler or TTS
observer (the component that receives raw assistant text or TTS-ready payload)
and invoke capture_bot_text(...) on this processor instance, ensuring the call
happens prior to push_frame(...)/emitting BotStoppedSpeakingFrame so
_last_bot_text is set and record_assistant_response triggers in the existing
BotStoppedSpeakingFrame branch. Ensure the captured text is the final assistant
output (not audio) and that you use the same processor object that implements
_last_bot_text and push_frame.
In `@app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py`:
- Around line 175-183: The index_size_bytes calculation in index_manager.py uses
the global RAG_EMBEDDING_DIMENSION which can differ from the dimension actually
stored for a given KB; update get_kb_stats (or the code that reads
rag_embeddings) to read and return the actual stored vector dimension (e.g., a
"vector_dimension" or "embedding_dim" field from rag_embeddings/metadata) and
compute index_size_bytes as chunk_count * actual_dimension * 4, falling back to
RAG_EMBEDDING_DIMENSION if the stored dimension is missing; ensure the returned
dict uses the new field name (e.g., "vector_dimension") so callers can see the
real dimension.
- Around line 107-111: build_knowledge_base currently calls
upsert_knowledge_base under get_pool().acquire() without a transaction, so a
mid-run failure can leave rag_embeddings inconsistent; wrap the upsert +
subsequent stale-delete into a single asyncpg transaction so both statements
commit or rollback together. Modify build_knowledge_base to use
conn.transaction() (or adjust upsert_knowledge_base to accept/use an active
transaction) and ensure the executemany upsert and the DELETE_STALE_CHUNKS_QUERY
run inside that transaction boundary so failures roll back atomically.
In `@app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py`:
- Around line 286-300: The SlowThinker is being invoked even for
empty/whitespace STT transcripts; update the call to
self._slow_thinker.on_user_utterance(user_utterance, history) so it only runs
when the utterance is non-empty after trimming (e.g., if user_utterance and
user_utterance.strip()), mirroring the existing conversation-history update that
checks self._conversation_history; leave the subsequent fast path context
retrieval via self._fast_talker.get_context(user_utterance) unchanged.
- Around line 102-127: The docstring for the RagMemoryRouter builder misstates
that azure_api_key and azure_endpoint are unused while the implementation still
uses them as fallbacks for pred_api_key/pred_endpoint, and
azure_embedding_deployment is unused yet has a misleading default; update the
code by either (A) removing the fallback usage of azure_api_key and
azure_endpoint in the builder so they truly are unused (locate the fallback
logic around pred_api_key/pred_endpoint in the RagMemoryRouter factory) or (B)
keep the fallbacks but correct the docstring to state they are used as
fallbacks; additionally either remove the unused azure_embedding_deployment
parameter from the signature or change its default to match the actual
RAG_EMBEDDING_DEPLOYMENT value (and update the docstring to reflect that
embeddings are controlled by RAG_EMBEDDING_DEPLOYMENT), ensuring references to
the symbols azure_api_key, azure_endpoint, azure_embedding_deployment and
pred_api_key/pred_endpoint are updated accordingly.
In `@app/ai/voice/agents/breeze_buddy/services/rag/semantic_cache.py`:
- Around line 211-222: The near-duplicate refresh path in put() updates
created_at but not last_accessed, so hot entries refreshed via put() can still
be evicted by _evict_lru(); modify the near-duplicate branch in
semantic_cache.put (the block checking self._index.search and where it updates
self._entries[idx].text, .relevance_score, .created_at) to also set
self._entries[idx].last_accessed = time.time() so that refreshes bump LRU
recency along with TTL.
In `@app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py`:
- Around line 126-146: The loop in SlowThinker._run drops dequeued items when
rate-limited (now - self._last_run < self._rate_limit) causing lost utterances;
instead drain the queue and keep only the most recent utterance (or requeue the
latest) so we process the last one after the rate window — modify _run to, upon
hitting the rate-limit, drain self._queue (or peek) to find the newest (kind,
text, history) and put that back (or call self._handle_utterance for it later)
rather than just continue; use existing symbols _queue, _last_run, _rate_limit,
and _handle_utterance to implement the drain-or-requeue behavior.
- Around line 16-26: Add a TYPE_CHECKING guard import for numpy and remove the
inline type ignore: import TYPE_CHECKING from typing and inside "if
TYPE_CHECKING:" add "import numpy as np"; then delete the "# type:
ignore[name-defined]" comment on the query_embedding annotation in
slow_thinker.py (the function/variable named query_embedding where the
annotation is "np.ndarray") so the forward string annotation works with from
__future__ import annotations and static type checks can see numpy only during
type checking.
In `@app/api/routers/breeze_buddy/knowledge_base/__init__.py`:
- Around line 206-213: The except blocks currently log the exception and then
include the raw exception string in the HTTPException.detail (see the except
Exception as exc block that calls logger.error("Knowledge file upload failed:
%s", exc) and then raises HTTPException(detail=f"Upload failed: {exc}")). Change
the client-facing message to a static safe string (e.g., "Upload failed" or
"Internal server error while uploading knowledge file") and do NOT interpolate
exc into detail; instead raise the HTTPException chained from the original
exception (raise HTTPException(..., detail="Upload failed") from exc) so the
traceback is preserved but no raw exception text is echoed. Apply the same fix
to the three other except blocks mentioned (lines around 298-307, 355-362,
410-417) in this module.
In `@app/core/config/static.py`:
- Line 280: RAG_EMBEDDING_DIMENSION is configurable but the DB schema and
PgVectorStore default are fixed at 1536; either remove the env var or validate
it at startup. Update the startup/config code that defines
RAG_EMBEDDING_DIMENSION to parse the env var and assert it equals 1536 (the
value used by the migration 023_create_rag_embeddings_table.sql and
PgVectorStore dimension), and if it does not match raise a clear startup error
explaining the mismatch and that a migration/schema change is required before
changing this value. Ensure the error references RAG_EMBEDDING_DIMENSION,
PgVectorStore, and migration 023_create_rag_embeddings_table.sql so maintainers
know what to change.
In `@app/database/__init__.py`:
- Around line 60-61: _decode_vector currently fails for empty or malformed
inputs (e.g., '[]' becomes [''] and float('') raises); update the _decode_vector
function to first strip brackets and check if the inner string is empty or in
unexpected tokens (like '' or 'NULL') and return an empty list in that case,
otherwise parse the comma-separated values; additionally wrap the float
conversion in a try/except to raise a clear ValueError with context (include the
original data) so malformed inputs don’t crash the connection codec.
- Around line 32-69: Remove the DDL from the per-connection codec initializer
and ensure the extension is created once at startup: delete the "CREATE
EXTENSION IF NOT EXISTS vector" call and the unused vector_oid lookup from
_init_vector_codec so it only registers the codec via conn.set_type_codec;
instead, after creating the pool in init_db_pool(), acquire a single connection
and run await conn.execute("CREATE EXTENSION IF NOT EXISTS vector") inside a
try/except that catches asyncpg.exceptions.InsufficientPrivilegeError and logs a
warning, leaving migrations as the preferred way to install the extension.
In `@app/database/accessor/breeze_buddy/rag_embeddings.py`:
- Around line 57-65: Replace the silent-truncating zip in the loop with strict
zipping to fail fast: change the for-loop that currently uses zip(texts,
embeddings, metadata) (the loop that builds chunks before calling
upsert_chunks_args) to use zip(texts, embeddings, metadata, strict=True); ensure
the ValueError from a length mismatch is either allowed to propagate or caught
and re-raised/logged with clear context (merchant_id/template_id and lengths) so
mismatched lengths are detected immediately.
In `@pyproject.toml`:
- Around line 35-36: Update the pdfminer.six constraint to pin a safe minimum
(change the existing "pdfminer.six>=20221105" entry to "pdfminer.six>=20251230")
so the dependency set excludes versions with known CVEs; also address the
faiss-cpu image impact by either selecting a smaller/specific wheel or moving
"faiss-cpu" to an optional/dev dependency and switching your Docker build to a
multi-stage approach (or use a platform-specific slim wheel) to avoid inflating
the final deploy image — search for the dependency lines containing
"pdfminer.six" and "faiss-cpu" in pyproject.toml and apply these changes.
In `@scripts/seed_rag_freshbus.py`:
- Line 353: The print statement uses an unnecessary f-string; replace the call
to print(f"\n=== FreshBus RAG seed script ===") with a normal string literal
(remove the leading f) so it becomes print("\n=== FreshBus RAG seed script
==="); locate the exact statement in scripts/seed_rag_freshbus.py (the print of
the FreshBus RAG seed script banner) and remove the redundant f prefix.
- Around line 319-326: _dead code: `_init_vector_codec` is defined but never
used; either remove it or invoke it on the DB connection before any vector
operations. Fix by calling await _init_vector_codec(conn) immediately after
obtaining the asyncpg connection (the same conn used for inserts/queries) so
conn.set_type_codec runs, or alternatively remove the `_init_vector_codec`
function and its imports if you intentionally rely on pre-formatted text and SQL
casts; reference the helper `_init_vector_codec` and the `conn.set_type_codec`
call when making the change.
---
Nitpick comments:
In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py`:
- Around line 736-742: Update the misleading and inconsistent logs in the RAG
init block: replace the fake "gs://.../%s/%s" message with the real bucket
variable (RAG_GCS_BUCKET) and the merchant_id/template_id so the URI is
accurate, and switch both logger.info and logger.error to the project's
Loguru-style formatting (use f-strings or `{}` placeholders) instead of
`%s`-style arguments; locate the logger calls around the RAG initialization in
__init__.py (the logger.info(...) that references merchant_id and template_id
and the except block logger.error(..., exc)) and format the error log to include
the exception (exc) consistently with the chosen Loguru style.
In `@app/ai/voice/agents/breeze_buddy/services/rag/chunker.py`:
- Around line 127-145: The current overlap logic in _apply_overlap can get
truncated away when adjacent raw chunks are near chunk_size; fix by making the
upstream chunking produce headroom by splitting with a target of (chunk_size -
chunk_overlap) instead of chunk_size so adding prev_tail won't force truncation.
Locate the function that produces the initial raw chunks (the recursive splitter
used before _apply_overlap) and change its split target to max(1, chunk_size -
chunk_overlap) or otherwise ensure non-negative headroom; keep _apply_overlap
unchanged except optionally verifying chunk_overlap is clamped to a sensible
value relative to chunk_size.
In `@app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py`:
- Around line 65-72: The OpenAI client is created without a timeout and
embeddings responses aren't validated against the expected dimension; update the
OpenAI constructor call that sets self._client (the OpenAI(...) instantiation)
to include a bounded timeout (e.g., timeout=20) to avoid long blocking during
outages, and after you parse the embeddings into the numpy array (the variable
`arr` derived from response.data) add an assertion or explicit check that
arr.shape[1] == self._dimension (raise a clear error if not) so mismatched
deployment/model dimensions are detected early before storing into pgvector;
keep references to self._deployment, self._dimension and self._max_batch_size
unchanged.
In `@app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py`:
- Around line 34-41: Wrap the JSON parsing of creds_json in a try/except to
catch json.JSONDecodeError and raise a clear, non-sensitive RuntimeError if
parsing fails; specifically, around the json.loads(creds_json) call handle
JSONDecodeError and raise a RuntimeError like "GCS_CREDENTIALS_JSON is present
but malformed" (do not include the raw JSON or offset), then proceed to create
credentials via
service_account.Credentials.from_service_account_info(creds_dict) and return
storage.Client as before (symbols: creds_json, GCS_CREDENTIALS_JSON, json.loads,
service_account.Credentials.from_service_account_info, storage.Client).
- Around line 90-174: The function load_gcs_documents is synchronous and already
used via asyncio.to_thread at its sole call site (the asyncio.to_thread(...,
load_gcs_documents, ...) invocation), so make its blocking nature explicit by
renaming load_gcs_documents to load_gcs_documents_blocking (update the
definition and all references), and update the caller that invokes
asyncio.to_thread to call load_gcs_documents_blocking; alternatively, if you
prefer not to rename, add a clear docstring note on load_gcs_documents stating
it is blocking and must be called via asyncio.to_thread (but prefer renaming to
prevent accidental future misuse).
In `@app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py`:
- Around line 316-319: The knowledge_base_size property is accessing private
internals via self._fast_talker._store.size; add a public accessor on FastTalker
(e.g., a size or store_size property/method that forwards to its internal
PgVectorStore.size) and update memory_router.py to call self._fast_talker.size
(or store_size) instead of reaching into _store and dropping the type: ignore;
reference FastTalker and the knowledge_base_size property when making the
change.
In `@app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py`:
- Line 158: Replace the list concatenation with iterable unpacking: instead of
constructing all_queries via [text] + predictions, build it using a single list
literal with unpacking (e.g., [text, *predictions]) so the variable all_queries
is created with iterable unpacking; update the assignment where all_queries is
defined in slow_thinker.py (the use of text and predictions) to follow this
pattern.
In `@app/ai/voice/agents/breeze_buddy/services/rag/types.py`:
- Around line 103-117: Change the prediction_llm_api_key field on the
KnowledgeBaseConfig (in types.py) from Optional[str] to Optional[SecretStr] and
import SecretStr from pydantic; keep the Field(...) description and default
None, and update any downstream uses of prediction_llm_api_key (call sites that
expect a str) to call .get_secret_value() when the raw string is needed so
consumers don’t accidentally log or dump the secret.
In `@app/api/routers/breeze_buddy/knowledge_base/__init__.py`:
- Around line 171-180: Move the lazy imports (json, google.cloud.storage as
storage, google.oauth2.service_account as service_account, EmbeddingProvider,
build_knowledge_base, get_cached_index_stats, invalidate_index and
RAG_EMBEDDING_* constants) out of per-request try blocks into module scope so
ImportError surfaces at startup; if you must keep them lazy, wrap each import
block in a try/except ImportError that raises/returns an HTTP 503 Service
Unavailable with a clear message (e.g., "Dependency missing:
google-cloud-storage") instead of letting the missing-module error propagate as
a 500 during request handling; update the code locations using functions/classes
like build_knowledge_base, EmbeddingProvider, get_cached_index_stats and
invalidate_index to rely on the module-level imports (or the 503-wrapping import
helper) to eliminate per-request overhead and improve fail-fast behavior.
In `@app/database/migrations/023_create_rag_embeddings_table.sql`:
- Around line 48-54: The idx_rag_embeddings_kb index is redundant because the
existing UNIQUE (merchant_id, template_id, chunk_index) constraint already
creates the same btree prefix; remove the CREATE INDEX IF NOT EXISTS
idx_rag_embeddings_kb statement from the migration and leave the CREATE INDEX IF
NOT EXISTS idx_rag_embeddings_indexed_at (which includes indexed_at and is not
redundant). Ensure the UNIQUE constraint on rag_embeddings (merchant_id,
template_id, chunk_index) remains present so tenant-prefix queries still use
that index.
In `@scripts/seed_rag_freshbus.py`:
- Around line 38-47: Replace all direct os.environ accesses for
EMBEDDING_ENDPOINT, EMBEDDING_API_KEY, EMBEDDING_DEPLOYMENT, EMBEDDING_DIMENSION
and the POSTGRES_* pieces that build POSTGRES_DSN with calls to
get_required_env() from app/core/config/static.py; e.g. assign
EMBEDDING_ENDPOINT = get_required_env("RAG_EMBEDDING_ENDPOINT"), wrap
EMBEDDING_DIMENSION = int(get_required_env("RAG_EMBEDDING_DIMENSION"))
(preserving the integer conversion), and construct POSTGRES_DSN by pulling
POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB via
get_required_env() instead of os.environ so missing vars produce the canonical
error path.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3e08ecba-c192-4218-9092-0358cf327bf4
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (27)
- app/ai/voice/agents/breeze_buddy/agent/__init__.py
- app/ai/voice/agents/breeze_buddy/agent/pipeline.py
- app/ai/voice/agents/breeze_buddy/processors/__init__.py
- app/ai/voice/agents/breeze_buddy/processors/rag_context.py
- app/ai/voice/agents/breeze_buddy/services/rag/__init__.py
- app/ai/voice/agents/breeze_buddy/services/rag/chunker.py
- app/ai/voice/agents/breeze_buddy/services/rag/embeddings.py
- app/ai/voice/agents/breeze_buddy/services/rag/fast_talker.py
- app/ai/voice/agents/breeze_buddy/services/rag/gcs_loader.py
- app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py
- app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py
- app/ai/voice/agents/breeze_buddy/services/rag/semantic_cache.py
- app/ai/voice/agents/breeze_buddy/services/rag/slow_thinker.py
- app/ai/voice/agents/breeze_buddy/services/rag/types.py
- app/ai/voice/agents/breeze_buddy/services/rag/vector_store.py
- app/ai/voice/agents/breeze_buddy/template/types.py
- app/api/routers/breeze_buddy/__init__.py
- app/api/routers/breeze_buddy/knowledge_base/__init__.py
- app/core/config/static.py
- app/database/__init__.py
- app/database/accessor/breeze_buddy/rag_embeddings.py
- app/database/migrations/023_create_rag_embeddings_table.sql
- app/database/queries/breeze_buddy/rag_embeddings.py
- gigaAi/docs
- pyproject.toml
- scripts/seed_rag_freshbus.py
- scripts/test_rag_local.py
```python
    async def _init_rag(self) -> None:
        """Initialise RAG router and context processor if knowledge_base is configured."""
        if not self.configurations or not self.template:
            return
        kb_config = getattr(self.configurations, "knowledge_base", None)
        if not kb_config:
            return

        merchant_id = self.template.merchant_id or "default"
        template_id = self.template.id

        try:
            self._rag_router = await RagMemoryRouter.build(
                kb_config=kb_config,
                merchant_id=merchant_id,
                template_id=template_id,
                azure_embedding_deployment=RAG_EMBEDDING_DEPLOYMENT,
            )
            await self._rag_router.start()
            self._rag_context_processor = RagContextProcessor(self._rag_router)
            logger.info(
                "RAG initialised: gs://.../%s/%s",
                merchant_id,
                template_id,
            )
        except Exception as exc:
            logger.error("RAG initialisation failed (proceeding without RAG): %s", exc)
            self._rag_router = None
            self._rag_context_processor = None
```
_init_rag ignores the global RAG_ENABLED master switch.
The PR adds RAG_ENABLED to app/core/config/static.py as a documented "master switch ... to disable RAG globally without changing templates", and the knowledge-base API router enforces it via _require_rag_enabled(). However, the per-call agent path here only checks configurations.knowledge_base — when RAG_ENABLED=false, any template that has knowledge_base configured will still build a RagMemoryRouter, hit Azure embeddings, and query pgvector on every turn. The kill-switch becomes effectively half-disabled (admin endpoints off, runtime on).
🔧 Suggested fix
```diff
+from app.core.config.static import (
+    ENABLE_BREEZE_BUDDY_TRACING,
+    RAG_EMBEDDING_DEPLOYMENT,
+    RAG_ENABLED,
+)
@@
     async def _init_rag(self) -> None:
         """Initialise RAG router and context processor if knowledge_base is configured."""
+        if not RAG_ENABLED:
+            return
         if not self.configurations or not self.template:
             return
```

🧰 Tools
🪛 Ruff (0.15.11)
[warning] 741-741: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 716 - 744,
The _init_rag method currently builds RagMemoryRouter whenever
configurations.knowledge_base exists; update _init_rag to first check the global
RAG_ENABLED master switch (import/inspect RAG_ENABLED) and return early if it is
False, so no RagMemoryRouter is constructed and both self._rag_router and
self._rag_context_processor remain None; specifically, add a guard at the start
of _init_rag that consults RAG_ENABLED before consulting
configurations.knowledge_base to prevent Azure embeddings/pgvector calls when
RAG is globally disabled.
```python
        if isinstance(frame, BotStoppedSpeakingFrame):
            if self._last_bot_text:
                self._router.record_assistant_response(self._last_bot_text)
                self._last_bot_text = None

        await self.push_frame(frame, direction)
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify capture_bot_text is wired from the agent / pipeline assembly.
rg -nP -C3 '\bcapture_bot_text\s*\(' --type=py

Repository: juspay/clairvoyance
Length of output: 692
🏁 Script executed:
# Search for calls to capture_bot_text (not just the definition)
rg 'capture_bot_text\(' --type=py -A 2

Repository: juspay/clairvoyance
Length of output: 374
🏁 Script executed:
# Check the breeze_buddy agent pipeline assembly and instantiation
fd -e py -path '*breeze_buddy*' | xargs rg -l 'RagContextProcessor|rag_context' --type=py

Repository: juspay/clairvoyance
Length of output: 467
🏁 Script executed:
# Look for TTS-related frame handling in the processor file
cat -n app/ai/voice/agents/breeze_buddy/processors/rag_context.py | head -250
Repository: juspay/clairvoyance
Length of output: 9344
🏁 Script executed:
cat -n app/ai/voice/agents/breeze_buddy/agent/pipeline.py
Repository: juspay/clairvoyance
Length of output: 21842
🏁 Script executed:
cat -n app/ai/voice/agents/breeze_buddy/agent/__init__.py | head -150
Repository: juspay/clairvoyance
Length of output: 6575
🏁 Script executed:
# Search for capture_bot_text across the entire agents directory
rg 'capture_bot_text' app/ai/voice/agents/ -n
Repository: juspay/clairvoyance
Length of output: 177
🏁 Script executed:
rg -n 'LLMFullResponseEndFrame|LLMResponseStartFrame|BotStartedSpeakingFrame' app/ai/voice/agents/breeze_buddy --type py
Repository: juspay/clairvoyance
Length of output: 1387
🏁 Script executed:
# Check for any TTS frame observers or handlers
rg -n 'class.*Observer|def.*tts|TextFrame|SpeechFrame' app/ai/voice/agents/breeze_buddy/handlers --type py | head -50
Repository: juspay/clairvoyance
Length of output: 45
🏁 Script executed:
# Search for where record_assistant_response is called
rg -n 'record_assistant_response' app/ai/voice/agents/breeze_buddy --type py
Repository: juspay/clairvoyance
Length of output: 338
🏁 Script executed:
# Check the TTS service and LLM to see if they provide hooks for text capture
rg -n 'class.*TTS|class.*LLM|def.*started|def.*text' app/ai/voice/agents/breeze_buddy/tts.py app/ai/voice/agents/breeze_buddy/llm.py --type py | head -40
Repository: juspay/clairvoyance
Length of output: 223
🏁 Script executed:
# Look at the agent main loop to see if there's supposed to be a hook
grep -n "capture_bot_text\|processor\|rag_context" app/ai/voice/agents/breeze_buddy/agent/__init__.py | head -30Repository: juspay/clairvoyance
Length of output: 1742
🏁 Script executed:
# Check if there's any LLMResponseStartFrame or TextFrame handler that could wire this
rg -n 'TextFrame|LLMResponseStartFrame|\.text' app/ai/voice/agents/breeze_buddy/handlers --type py | grep -v test | head -50
Repository: juspay/clairvoyance
Length of output: 45
🏁 Script executed:
# Look for where TTS text extraction happens or LLM response handling
fd -e py -path '*breeze_buddy*' | xargs grep -l 'TextFrame\|get_text\|response.*text' | head -10
Repository: juspay/clairvoyance
Length of output: 232
🏁 Script executed:
# Check the builtin_dispatcher to see if it's where bot text should be captured
cat -n app/ai/voice/agents/breeze_buddy/handlers/internal/builtin_dispatcher.py | head -100
Repository: juspay/clairvoyance
Length of output: 4786
🏁 Script executed:
# Look for LLMResponseStartFrame or similar that might contain bot text
rg -n 'LLMResponseStartFrame|LLMAssistantContextFrame|llm.*text|response.*frame' app/ai/voice/agents/breeze_buddy/handlers --type py | head -40
Repository: juspay/clairvoyance
Length of output: 45
🏁 Script executed:
# Check if there's an action manager or frame observer that should capture text
rg -n 'class ActionManager|def.*on_frame|TextFrame' app/ai/voice/agents/breeze_buddy --type py | head -30
Repository: juspay/clairvoyance
Length of output: 196
🏁 Script executed:
# Look at how pipecat LLM frames work - check what frames are available after LLM processes
rg -n 'from pipecat.*llm.*frame\|LLMResponseStartFrame\|TextFrame' app/ai/voice/agents/breeze_buddy --type py
Repository: juspay/clairvoyance
Length of output: 45
🏁 Script executed:
# Search for TextFrame and how it's used in the codebase
fd -type f -name "*.py" -path "*breeze_buddy*" -exec grep -l "TextFrame" {} \;
Repository: juspay/clairvoyance
Length of output: 234
🏁 Script executed:
# Check pipecat imports to understand what frames are available for bot text
rg -n "from pipecat.*frames" app/ai/voice/agents/breeze_buddy/processors/rag_context.pyRepository: juspay/clairvoyance
Length of output: 102
🏁 Script executed:
# Look at the complete pipeline to see where bot text should be captured
rg -n "LLMResponseStartFrame|TextFrame|get_text\(\)" app/ai/voice/agents/breeze_buddy --type pyRepository: juspay/clairvoyance
Length of output: 45
🏁 Script executed:
# Check the memory_router to understand what record_assistant_response expects
cat -n app/ai/voice/agents/breeze_buddy/services/rag/memory_router.py | sed -n '295,320p'
Repository: juspay/clairvoyance
Length of output: 1253
capture_bot_text is never invoked; wiring is missing and must be added.
The method exists but nothing calls it anywhere in the codebase. As a result, _last_bot_text always remains None and record_assistant_response never fires when BotStoppedSpeakingFrame is received (line 98–101). The conversation history will only contain user utterances, never assistant responses, breaking RAG's ability to build complete context for SlowThinker's history.
The bot's response text must be captured from the LLM pipeline (likely via an LLM response frame handler or TTS observer) and passed to processor.capture_bot_text(...) before BotStoppedSpeakingFrame is processed. Without this wiring, the agent will never call record_assistant_response() and RAG context enrichment will be incomplete.
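One possible wiring, sketched below as a small frame processor placed after the LLM: it aggregates the assistant's streamed TextFrames and hands the final text to capture_bot_text() once the response completes, before the stop-speaking event reaches the RAG processor. The frame classes are pipecat's; the BotTextCapture class, its placement in the pipeline, and the aggregation details are illustrative assumptions, not the PR's actual design.

from pipecat.frames.frames import LLMFullResponseEndFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class BotTextCapture(FrameProcessor):
    """Hypothetical helper: collects streamed assistant text and forwards it
    to RagContextProcessor.capture_bot_text() when the LLM response ends."""

    def __init__(self, rag_processor) -> None:
        super().__init__()
        self._rag_processor = rag_processor  # the RagContextProcessor instance
        self._parts: list[str] = []

    async def process_frame(self, frame, direction: FrameDirection) -> None:
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame) and direction == FrameDirection.DOWNSTREAM:
            self._parts.append(frame.text)  # assistant text streamed towards TTS
        elif isinstance(frame, LLMFullResponseEndFrame) and self._parts:
            # Hand the full utterance over before BotStoppedSpeakingFrame arrives.
            self._rag_processor.capture_bot_text(" ".join(self._parts).strip())
            self._parts = []
        await self.push_frame(frame, direction)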
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/processors/rag_context.py` around lines 98 -
103, capture_bot_text is never called, so _last_bot_text stays None and
record_assistant_response is never invoked when a BotStoppedSpeakingFrame
arrives; wire the LLM/TTS output path to call processor.capture_bot_text(text)
with the assistant's plain text before the BotStoppedSpeakingFrame is pushed.
Locate the LLM response handler or TTS observer (the component that receives raw
assistant text or TTS-ready payload) and invoke capture_bot_text(...) on this
processor instance, ensuring the call happens prior to push_frame(...)/emitting
BotStoppedSpeakingFrame so _last_bot_text is set and record_assistant_response
triggers in the existing BotStoppedSpeakingFrame branch. Ensure the captured
text is the final assistant output (not audio) and that you use the same
processor object that implements _last_bot_text and push_frame.
| messages = getattr(llm_context, "_messages", None) or [] | ||
| for msg in reversed(messages): | ||
| if msg.get("role") == "user": | ||
| content = msg.get("content", "") | ||
| if isinstance(content, str): | ||
| utterance = content.strip() | ||
| elif isinstance(content, list): | ||
| # OpenAI multi-part content: extract text parts | ||
| parts = [ | ||
| p.get("text", "") | ||
| for p in content | ||
| if isinstance(p, dict) and p.get("type") == "text" | ||
| ] | ||
| utterance = " ".join(parts).strip() | ||
| break |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm whether OpenAILLMContext exposes a public messages property/setter in the version on disk.
fd -t f openai_llm_context.py | head
fd -t f openai_llm_context.py -x rg -nP -C2 '\b(def\s+(get_messages|set_messages|add_message)|@property\s*\n\s*def\s+messages|self\._messages)\b' {}
Repository: juspay/clairvoyance
Length of output: 45
🌐 Web query:
pipecat OpenAILLMContext public API messages property add_message set_messages
💡 Result:
OpenAILLMContext in Pipecat is a class for managing OpenAI-specific LLM conversation context, including messages, tools, and tool_choice. It has a public messages property that returns the list of ChatCompletionMessageParam messages. It provides methods: add_message(message) to append a single message, add_messages(messages) to append multiple, set_messages(messages) to replace all messages, get_messages to get a copy of the list, plus a from_messages(messages) static method and others like get_messages_json.
Example usage:
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext

context = OpenAILLMContext()
context.add_message({"role": "system", "content": "You are helpful."})
print(context.messages)              # access the public messages property
context.set_messages(new_messages)
Note: Deprecated since v0.0.99. Migrate to the universal LLMContext and LLMContextAggregatorPair:
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context_aggregator_pair import LLMContextAggregatorPair

context = LLMContext(messages, tools)
# LLMContext has an identical public API: messages property, add_message, add_messages, set_messages, etc.
LLMContext offers the same interface with OpenAI-compatible format plus support for LLMSpecificMessage.
Citations:
- 1: https://reference-server.pipecat.ai/en/latest/api/pipecat.processors.aggregators.openai%5Fllm%5Fcontext.html
- 2: https://reference-server.pipecat.ai/en/stable/api/pipecat.processors.aggregators.openai%5Fllm%5Fcontext.html
- 3: https://reference-server.pipecat.ai/en/latest/_modules/pipecat/processors/aggregators/openai_llm_context.html
- 4: https://github.com/pipecat-ai/pipecat/blob/dc97ffc9/src/pipecat/processors/aggregators/openai_llm_context.py
- 5: https://reference-server.pipecat.ai/en/latest/api/pipecat.processors.aggregators.llm_context.html
- 6: https://reference-server.pipecat.ai/en/stable/api/pipecat.processors.aggregators.llm_context.html
- 7: https://reference-server.pipecat.ai/en/latest/_modules/pipecat/processors/aggregators/llm_context.html
🏁 Script executed:
#!/bin/bash
# Read the file to see the actual code at the mentioned lines
wc -l app/ai/voice/agents/breeze_buddy/processors/rag_context.py
Repository: juspay/clairvoyance
Length of output: 126
🏁 Script executed:
#!/bin/bash
# Get the full file content (check size first if needed)
cat -n app/ai/voice/agents/breeze_buddy/processors/rag_context.py | head -200
Repository: juspay/clairvoyance
Length of output: 9091
🏁 Script executed:
#!/bin/bash
# Check for imports to determine which context class is used
grep -n "from.*LLMContext\|from.*openai_llm_context\|import.*OpenAI\|import.*LLMContext" app/ai/voice/agents/breeze_buddy/processors/rag_context.pyRepository: juspay/clairvoyance
Length of output: 302
Replace direct _messages access with public OpenAILLMContext API methods.
The processor reads and writes the private _messages list inconsistently:
- Line 160: correctly uses the public add_message()
- Lines 117 and 176: read via getattr(..., "_messages")
- Line 182: directly mutates _messages with assignment
This is fragile to pipecat version changes (e.g., _messages renaming or restructuring), and is inconsistent with the public API used elsewhere in the same method. Use the public accessors instead:
- Line 117: Replace getattr(llm_context, "_messages", None) with llm_context.messages or llm_context.get_messages()
- Line 182: Replace direct mutation with llm_context.set_messages(filtered_list)
Cleanup refactor example
def _cleanup_rag_message(self) -> None:
if self._injected_context is None:
return
messages = self._injected_context.get_messages() # or .messages
if not messages:
self._injected_context = None
return
original_len = len(messages)
filtered = [
m for m in messages
if not (
m.get("role") == _RAG_ROLE
and isinstance(m.get("content"), str)
and m["content"].startswith(_RAG_MARKER)
)
]
if len(filtered) < original_len:
self._injected_context.set_messages(filtered)
logger.debug(
"RagContextProcessor: cleaned up %d ephemeral RAG message(s)",
original_len - len(filtered),
)
self._injected_context = None🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/processors/rag_context.py` around lines 117
- 131, The code is reading and mutating the private _messages on llm_context;
update reads to use the public API (e.g., llm_context.get_messages() or
llm_context.messages) and replace direct writes with
llm_context.set_messages(filtered_list); specifically, change the
getattr(llm_context, "_messages", None) usage to
llm_context.get_messages()/messages when extracting the last user utterance and
use llm_context.set_messages(...) instead of assigning to _messages when
cleaning up or updating messages (keep existing add_message(...) calls as they
are).
| # 3. Upsert into pgvector | ||
| async with get_pool().acquire() as conn: | ||
| n = await upsert_knowledge_base( | ||
| conn, merchant_id, template_id, texts, embeddings_array, metadata | ||
| ) |
Wrap the KB rebuild in a transaction for atomicity.
upsert_knowledge_base (app/database/accessor/breeze_buddy/rag_embeddings.py) performs an executemany upsert followed by a separate DELETE_STALE_CHUNKS_QUERY to drop orphan chunks. With autocommit (no enclosing transaction), a failure between those two statements—or a failure mid-executemany when shrinking a KB—leaves the rag_embeddings table in an inconsistent state (mixed old/new chunks, or orphans whose chunk_index >= new_count). Since build_knowledge_base is the single entry point for rebuilds and is documented as idempotent, run both statements inside a single asyncpg transaction so a failed rebuild rolls back cleanly.
🛡️ Suggested fix
- # 3. Upsert into pgvector
- async with get_pool().acquire() as conn:
- n = await upsert_knowledge_base(
- conn, merchant_id, template_id, texts, embeddings_array, metadata
- )
+ # 3. Upsert into pgvector (atomic upsert + stale-row cleanup)
+ async with get_pool().acquire() as conn:
+ async with conn.transaction():
+ n = await upsert_knowledge_base(
+ conn, merchant_id, template_id, texts, embeddings_array, metadata
+ )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py` around lines
107 - 111, build_knowledge_base currently calls upsert_knowledge_base under
get_pool().acquire() without a transaction, so a mid-run failure can leave
rag_embeddings inconsistent; wrap the upsert + subsequent stale-delete into a
single asyncpg transaction so both statements commit or rollback together.
Modify build_knowledge_base to use conn.transaction() (or adjust
upsert_knowledge_base to accept/use an active transaction) and ensure the
executemany upsert and the DELETE_STALE_CHUNKS_QUERY run inside that transaction
boundary so failures roll back atomically.
| chunk_count = stats.get("chunk_count", 0) | ||
| return { | ||
| "chunk_count": chunk_count, | ||
| "total_documents": stats.get("file_count", 0), | ||
| # Approximate storage estimate: each vector is dimension * 4 bytes | ||
| "index_size_bytes": chunk_count * RAG_EMBEDDING_DIMENSION * 4, | ||
| "last_indexed_at": last_indexed_str, | ||
| "error_message": None, | ||
| } |
index_size_bytes may misreport when the stored vector dimension differs from the env default.
The size estimate multiplies by RAG_EMBEDDING_DIMENSION (the current env default), not by the dimension actually stored for this (merchant_id, template_id). If RAG_EMBEDDING_DIMENSION is ever changed without re-indexing all KBs, this stat will silently be wrong for legacy KBs. Either record the dimension in rag_embeddings/get_kb_stats and use that, or document the figure as "current-config approximation only".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/services/rag/index_manager.py` around lines
175 - 183, The index_size_bytes calculation in index_manager.py uses the global
RAG_EMBEDDING_DIMENSION which can differ from the dimension actually stored for
a given KB; update get_kb_stats (or the code that reads rag_embeddings) to read
and return the actual stored vector dimension (e.g., a "vector_dimension" or
"embedding_dim" field from rag_embeddings/metadata) and compute index_size_bytes
as chunk_count * actual_dimension * 4, falling back to RAG_EMBEDDING_DIMENSION
if the stored dimension is missing; ensure the returned dict uses the new field
name (e.g., "vector_dimension") so callers can see the real dimension.
| def _decode_vector(data: str) -> list: # '[0.1,…]' → list[float] | ||
| return [float(x) for x in data.strip("[]").split(",")] |
_decode_vector raises on empty / unexpected formats.
'[]'.strip('[]').split(',') == [''] and float('') raises. While zero-dim vectors are unlikely in practice, the decoder will also raise on any unexpected pgvector text format (e.g., NULL semantics handled by asyncpg, but malformed values would panic the connection codec). Consider a small defensive guard:
🛡️ Proposed fix
- def _decode_vector(data: str) -> list: # '[0.1,…]' → list[float]
- return [float(x) for x in data.strip("[]").split(",")]
+ def _decode_vector(data: str) -> list[float]: # '[0.1,…]' → list[float]
+ body = data.strip("[]").strip()
+ if not body:
+ return []
+ return [float(x) for x in body.split(",")]📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def _decode_vector(data: str) -> list: # '[0.1,…]' → list[float] | |
| return [float(x) for x in data.strip("[]").split(",")] | |
| def _decode_vector(data: str) -> list[float]: # '[0.1,…]' → list[float] | |
| body = data.strip("[]").strip() | |
| if not body: | |
| return [] | |
| return [float(x) for x in body.split(",")] |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/database/__init__.py` around lines 60 - 61, _decode_vector currently
fails for empty or malformed inputs (e.g., '[]' becomes [''] and float('')
raises); update the _decode_vector function to first strip brackets and check if
the inner string is empty or in unexpected tokens (like '' or 'NULL') and return
an empty list in that case, otherwise parse the comma-separated values;
additionally wrap the float conversion in a try/except to raise a clear
ValueError with context (include the original data) so malformed inputs don’t
crash the connection codec.
| n = len(texts) | ||
| chunks: List[Tuple[int, str, str, List[float]]] = [] | ||
| for i, (text, emb, meta) in enumerate(zip(texts, embeddings, metadata)): | ||
| source = meta.get("source", "") | ||
| chunks.append((i, source, text, emb.tolist())) | ||
|
|
||
| rows = upsert_chunks_args(merchant_id, template_id, chunks) | ||
|
|
||
| await conn.executemany(UPSERT_CHUNKS_QUERY, rows) |
There was a problem hiding this comment.
Use zip(..., strict=True) to fail fast on length mismatch.
texts, embeddings, and metadata come from three separate code paths in index_manager.build_knowledge_base (chunker output, embed call, chunker metadata). If one is ever shorter than the others (e.g., a partial embedding response), zip silently truncates and the knowledge base gets indexed with missing chunks — the bug surfaces only as poor retrieval quality much later. strict=True raises ValueError immediately. Ruff also flags this (B905).
🛡️ Proposed fix
- for i, (text, emb, meta) in enumerate(zip(texts, embeddings, metadata)):
+ for i, (text, emb, meta) in enumerate(
+ zip(texts, embeddings, metadata, strict=True)
+ ):
source = meta.get("source", "")
chunks.append((i, source, text, emb.tolist()))🧰 Tools
🪛 Ruff (0.15.11)
[warning] 59-59: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/database/accessor/breeze_buddy/rag_embeddings.py` around lines 57 - 65,
Replace the silent-truncating zip in the loop with strict zipping to fail fast:
change the for-loop that currently uses zip(texts, embeddings, metadata) (the
loop that builds chunks before calling upsert_chunks_args) to use zip(texts,
embeddings, metadata, strict=True); ensure the ValueError from a length mismatch
is either allowed to propagate or caught and re-raised/logged with clear context
(merchant_id/template_id and lengths) so mismatched lengths are detected
immediately.
| "faiss-cpu>=1.7.4", | ||
| "pdfminer.six>=20221105", |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
gh api graphql -f query='
{
securityVulnerabilities(first: 10, ecosystem: PIP, package: "pdfminer.six") {
nodes { advisory { summary severity publishedAt } vulnerableVersionRange firstPatchedVersion { identifier } }
}
}'
gh api graphql -f query='
{
securityVulnerabilities(first: 10, ecosystem: PIP, package: "faiss-cpu") {
nodes { advisory { summary severity publishedAt } vulnerableVersionRange firstPatchedVersion { identifier } }
}
}'
Repository: juspay/clairvoyance
Length of output: 903
Critical security constraint on pdfminer.six; image size consideration for faiss-cpu.
Two operational issues with the new dependencies:
- pdfminer.six version constraint is too loose and allows vulnerable versions. Version >=20221105 permits installation of versions with HIGH severity CVEs, including Arbitrary Code Execution via crafted PDF input (fixed in 20251107) and Insecure Deserialization via pickle (fixed in 20251230). Since pdfminer.six parses untrusted PDFs from GCS, you must pin to at least >=20251230 to exclude all known vulnerabilities.
- faiss-cpu wheels weigh ~50–80 MB and include AVX2/MKL dependencies. If your deploy image was previously slim, this will measurably grow it; consider a multi-stage build or explicit slim wheel target if image size is constrained.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pyproject.toml` around lines 35 - 36, Update the pdfminer.six constraint to
pin a safe minimum (change the existing "pdfminer.six>=20221105" entry to
"pdfminer.six>=20251230") so the dependency set excludes versions with known
CVEs; also address the faiss-cpu image impact by either selecting a
smaller/specific wheel or moving "faiss-cpu" to an optional/dev dependency and
switching your Docker build to a multi-stage approach (or use a
platform-specific slim wheel) to avoid inflating the final deploy image — search
for the dependency lines containing "pdfminer.six" and "faiss-cpu" in
pyproject.toml and apply these changes.
| async def _init_vector_codec(conn: asyncpg.Connection) -> None: | ||
| await conn.set_type_codec( | ||
| "vector", | ||
| encoder=lambda v: "[" + ",".join(str(x) for x in v) + "]", | ||
| decoder=lambda s: [float(x) for x in s.strip("[]").split(",")], | ||
| schema="public", | ||
| format="text", | ||
| ) |
Dead code: _init_vector_codec is defined but never registered.
The connection opened at line 367 never calls await conn.set_type_codec(...), so this helper has no effect. Writes work because line 377 pre-formats the vector as '[…]' text and the SQL casts via $6::vector; reads at lines 427–439 only fetch chunk_text and a numeric score, so no decoder is needed either. Either drop the helper to avoid confusion or actually invoke it on conn before use (the production version in app/database/__init__.py:_init_vector_codec is wired through the pool's init= parameter for this reason).
♻️ Suggested cleanup
-async def _init_vector_codec(conn: asyncpg.Connection) -> None:
- await conn.set_type_codec(
- "vector",
- encoder=lambda v: "[" + ",".join(str(x) for x in v) + "]",
- decoder=lambda s: [float(x) for x in s.strip("[]").split(",")],
- schema="public",
- format="text",
- )
-
-
# ---------------------------------------------------------------------------
# Embedding helper
# ---------------------------------------------------------------------------📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def _init_vector_codec(conn: asyncpg.Connection) -> None: | |
| await conn.set_type_codec( | |
| "vector", | |
| encoder=lambda v: "[" + ",".join(str(x) for x in v) + "]", | |
| decoder=lambda s: [float(x) for x in s.strip("[]").split(",")], | |
| schema="public", | |
| format="text", | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Embedding helper | |
| # --------------------------------------------------------------------------- |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/seed_rag_freshbus.py` around lines 319 - 326, dead code:
`_init_vector_codec` is defined but never used; either remove it or invoke it on
the DB connection before any vector operations. Fix by calling await
_init_vector_codec(conn) immediately after obtaining the asyncpg connection (the
same conn used for inserts/queries) so conn.set_type_codec runs, or
alternatively remove the `_init_vector_codec` function and its imports if you
intentionally rely on pre-formatted text and SQL casts; reference the helper
`_init_vector_codec` and the `conn.set_type_codec` call when making the change.
|
|
||
|
|
||
| async def main() -> None: | ||
| print(f"\n=== FreshBus RAG seed script ===") |
Drop the redundant f prefix.
Ruff (F541): f-string without placeholders.
🔧 Proposed fix
- print(f"\n=== FreshBus RAG seed script ===")
+ print("\n=== FreshBus RAG seed script ===")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| print(f"\n=== FreshBus RAG seed script ===") | |
| print("\n=== FreshBus RAG seed script ===") |
🧰 Tools
🪛 Ruff (0.15.11)
[error] 353-353: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/seed_rag_freshbus.py` at line 353, The print statement uses an
unnecessary f-string; replace the call to print(f"\n=== FreshBus RAG seed script
===") with a normal string literal (remove the leading f) so it becomes
print("\n=== FreshBus RAG seed script ==="); locate the exact statement in
scripts/seed_rag_freshbus.py (the print of the FreshBus RAG seed script banner)
and remove the redundant f prefix.
49fd62e to 32d1fc1 Compare
| # Check for near-duplicate | ||
| if self._index.ntotal > 0: | ||
| q = query_embedding.reshape(1, -1).astype(np.float32).copy() | ||
| faiss.normalize_L2(q) | ||
| scores, idxs = self._index.search(q, 1) # type: ignore[call-arg] | ||
| if scores[0][0] > 0.95 and idxs[0][0] != -1: | ||
| idx = int(idxs[0][0]) | ||
| if idx < len(self._entries): | ||
| self._entries[idx].text = text | ||
| self._entries[idx].relevance_score = relevance_score | ||
| self._entries[idx].created_at = time.time() | ||
| self._entries[idx].last_accessed = time.time() | ||
| return # refreshed — done |
SemanticCache._put_locked() treats query_embedding as the stored vector and also performs a near-duplicate refresh when cosine > 0.95. Since SlowThinker/FastTalker currently call put_batch() with the same query_embedding for multiple retrieved chunks, this logic will overwrite the existing entry and you’ll end up caching only one chunk per query. Consider storing per-chunk/document embeddings in the cache (and passing those into put/put_batch), or remove/adjust the near-duplicate refresh behavior so multiple chunks can coexist.
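One cache-side adjustment, sketched under the assumption that callers switch to per-chunk embeddings (as the FastTalker/SlowThinker suggestions below propose): keep the near-duplicate probe, but only treat a hit as a refresh when the cached text matches, so distinct chunks with similar vectors can coexist as separate entries. Variable names mirror the quoted snippet; this is not the review's committed fix.

# Inside _put_locked(), replacing the unconditional refresh-on-similarity:
scores, idxs = self._index.search(q, 1)  # type: ignore[call-arg]
idx = int(idxs[0][0])
if scores[0][0] > 0.95 and idx != -1 and idx < len(self._entries):
    entry = self._entries[idx]
    if entry.text == text:  # same chunk resurfacing: refresh metadata only
        entry.relevance_score = relevance_score
        entry.created_at = entry.last_accessed = time.time()
        return
# Different chunk (even with a close vector): fall through and append a new
# entry so multiple chunks per query survive in the cache.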
| if self._index.ntotal == 0: | ||
| return [] | ||
|
|
||
| q = query_embedding.reshape(1, -1).astype(np.float32).copy() | ||
| faiss.normalize_L2(q) | ||
|
|
||
| k = min(top_k, self._index.ntotal) | ||
| scores, idxs = self._index.search(q, k) # type: ignore[call-arg] | ||
|
|
||
| now = time.time() | ||
| hits: List[CachedContext] = [] | ||
| for score, idx in zip(scores[0], idxs[0]): | ||
| if idx == -1 or idx >= len(self._entries): | ||
| continue | ||
| entry = self._entries[idx] | ||
| if entry.is_expired: | ||
| continue | ||
| if score >= threshold: | ||
| entry.last_accessed = now | ||
| hits.append(entry) | ||
|
|
The cache is read without a lock while writes can mutate/rebuild self._index and self._entries under the write lock. In this RAG design, SlowThinker writes concurrently with FastTalker reads, so get() can race with _rebuild_index() / add() and potentially crash or return inconsistent results. Consider protecting reads with the same lock (or a RW-lock / copy-on-write swap) to make FAISS index access safe under concurrent read/write.
| if self._index.ntotal == 0: | |
| return [] | |
| q = query_embedding.reshape(1, -1).astype(np.float32).copy() | |
| faiss.normalize_L2(q) | |
| k = min(top_k, self._index.ntotal) | |
| scores, idxs = self._index.search(q, k) # type: ignore[call-arg] | |
| now = time.time() | |
| hits: List[CachedContext] = [] | |
| for score, idx in zip(scores[0], idxs[0]): | |
| if idx == -1 or idx >= len(self._entries): | |
| continue | |
| entry = self._entries[idx] | |
| if entry.is_expired: | |
| continue | |
| if score >= threshold: | |
| entry.last_accessed = now | |
| hits.append(entry) | |
| q = query_embedding.reshape(1, -1).astype(np.float32).copy() | |
| faiss.normalize_L2(q) | |
| async with self._lock: | |
| if self._index.ntotal == 0: | |
| return [] | |
| k = min(top_k, self._index.ntotal) | |
| scores, idxs = self._index.search(q, k) # type: ignore[call-arg] | |
| now = time.time() | |
| hits: List[CachedContext] = [] | |
| for score, idx in zip(scores[0], idxs[0]): | |
| if idx == -1 or idx >= len(self._entries): | |
| continue | |
| entry = self._entries[idx] | |
| if entry.is_expired: | |
| continue | |
| if score >= threshold: | |
| entry.last_accessed = now | |
| hits.append(entry) |
| # Use the query embedding as the cache key (document embeddings no longer | ||
| # fetched from the DB). | ||
| await self._cache.put_batch( | ||
| [ | ||
| { | ||
| "query_embedding": q_emb, | ||
| "text": r.text, | ||
| "metadata": r.metadata, | ||
| "relevance_score": r.score, | ||
| } | ||
| for r in results | ||
| ] | ||
| ) | ||
|
|
FastTalker’s fallback path populates the semantic cache using the query embedding for every retrieved chunk. This makes all cached entries identical in vector space and prevents the cache from storing/returning multiple distinct chunks. Consider caching by each chunk’s embedding (e.g., return embedding from PgVectorStore.search() / pgvector query) so future semantically similar queries can hit the same cached chunks.
| # Use the query embedding as the cache key (document embeddings no longer | |
| # fetched from the DB). | |
| await self._cache.put_batch( | |
| [ | |
| { | |
| "query_embedding": q_emb, | |
| "text": r.text, | |
| "metadata": r.metadata, | |
| "relevance_score": r.score, | |
| } | |
| for r in results | |
| ] | |
| ) | |
| # Cache each chunk by its own embedding, not the original query embedding, | |
| # so distinct chunks remain distinct in vector space. | |
| cache_entries = [] | |
| missing_embeddings = 0 | |
| for r in results: | |
| chunk_embedding = getattr(r, "embedding", None) | |
| if chunk_embedding is None: | |
| missing_embeddings += 1 | |
| continue | |
| cache_entries.append( | |
| { | |
| "query_embedding": chunk_embedding, | |
| "text": r.text, | |
| "metadata": r.metadata, | |
| "relevance_score": r.score, | |
| } | |
| ) | |
| if cache_entries: | |
| await self._cache.put_batch(cache_entries) | |
| if missing_embeddings: | |
| logger.debug( | |
| "FastTalker fallback: skipped caching %d retrieved chunks without embeddings for '%s…'", | |
| missing_embeddings, | |
| query[:40], | |
| ) |
| "When set, Breeze Buddy pre-loads the specified GCS bucket/prefix into a " | ||
| "FAISS index and injects relevant context into every LLM turn using the " | ||
| "dual-agent (SlowThinker + FastTalker) architecture.", |
The knowledge_base field description says Breeze Buddy “pre-loads … into a FAISS index”, but this PR’s implementation uses pgvector in Postgres as the persistent vector store (FAISS is only used for the per-call semantic cache). Updating this description will prevent template authors from being misled about where/when indexing happens.
| "When set, Breeze Buddy pre-loads the specified GCS bucket/prefix into a " | |
| "FAISS index and injects relevant context into every LLM turn using the " | |
| "dual-agent (SlowThinker + FastTalker) architecture.", | |
| "When set, Breeze Buddy indexes and retrieves documents from the " | |
| "configured GCS bucket/prefix using the persistent pgvector-backed " | |
| "Postgres store, and injects relevant context into every LLM turn " | |
| "using the dual-agent (SlowThinker + FastTalker) architecture.", |
| blob_name = f"{prefix}{file.filename}" | ||
| blob = client.bucket(gcs_bucket).blob(blob_name) | ||
|
|
blob_name is constructed directly from file.filename. If the filename contains path separators or .. segments, it can create unexpected object keys under the prefix (and make later listing/cleanup ambiguous). Consider normalizing to a basename and stripping disallowed characters (and optionally enforcing an allowlist of extensions / max upload size) before writing to GCS.
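A minimal sanitizer along those lines; the helper name, the allowed-extension set, and the character policy are illustrative assumptions rather than the PR's implementation. The upload endpoint would then build the key as blob_name = safe_blob_name(prefix, file.filename).

import os
import re

_ALLOWED_EXTENSIONS = {".pdf", ".txt", ".md"}  # assumption: adjust to whatever the KB ingests


def safe_blob_name(prefix: str, filename: str) -> str:
    """Reduce an uploaded filename to a safe basename under the GCS prefix."""
    base = os.path.basename((filename or "").replace("\\", "/"))  # drop any path segments
    base = re.sub(r"[^A-Za-z0-9._-]", "_", base)                  # strip disallowed characters
    ext = os.path.splitext(base)[1].lower()
    if not base.strip("._") or ext not in _ALLOWED_EXTENSIONS:
        raise ValueError(f"unsupported or unsafe filename: {filename!r}")
    return f"{prefix}{base}"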
| router.include_router(daily_router, prefix="", tags=["daily"]) | ||
|
|
||
| # RAG knowledge base management (upload, index, status, invalidate) | ||
| router.include_router(knowledge_base_router, prefix="", tags=["knowledge-base"]) |
Routes under knowledge_base_router already declare tags=["knowledge-base"] on each endpoint, and include_router(..., tags=["knowledge-base"]) adds the same tag again. This can lead to duplicated tags in the OpenAPI schema. Consider removing tags from either the include_router call or the individual route decorators.
| router.include_router(knowledge_base_router, prefix="", tags=["knowledge-base"]) | |
| router.include_router(knowledge_base_router, prefix="") |
| RAG_EMBEDDING_ENDPOINT = os.environ.get( | ||
| "RAG_EMBEDDING_ENDPOINT", | ||
| "https://breeze-automatic.services.ai.azure.com/openai/v1/", | ||
| ) |
RAG_EMBEDDING_ENDPOINT defaults to a concrete Azure Foundry URL. Given this is environment-specific, a non-empty default risks accidentally coupling local/staging to a production endpoint (or obscuring misconfiguration). Consider defaulting this to an empty string and requiring it to be set explicitly when RAG is enabled.
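A sketch of that approach for app/core/config/static.py, which already imports os and defines RAG_ENABLED per this PR; the validate_rag_config helper and where it gets called (startup, or alongside _require_rag_enabled) are assumptions:

RAG_EMBEDDING_ENDPOINT = os.environ.get("RAG_EMBEDDING_ENDPOINT", "")


def validate_rag_config() -> None:
    """Fail fast instead of silently falling back to a shared Azure resource."""
    if RAG_ENABLED and not RAG_EMBEDDING_ENDPOINT:
        raise RuntimeError(
            "RAG is enabled but RAG_EMBEDDING_ENDPOINT is not set; "
            "configure the Azure Foundry endpoint explicitly."
        )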
| await self._cache.put_batch( | ||
| [ | ||
| { | ||
| "query_embedding": query_embedding, | ||
| "text": r.text, | ||
| "metadata": r.metadata, | ||
| "relevance_score": r.score, | ||
| } | ||
| for r in results | ||
| ] | ||
| ) | ||
| self._metrics.prefetch_operations += 1 | ||
| logger.debug( | ||
| "SlowThinker: prefetched %d chunks for '%s…'", | ||
| len(results), | ||
| query[:40], |
SlowThinker._retrieve_and_cache() passes the query embedding into SemanticCache.put_batch() for every retrieved chunk. With the current cache implementation this collapses all chunks for a query into a single cached vector/entry. Once the cache is fixed to key on document/chunk embeddings, this callsite should pass each result’s embedding instead of the query embedding (or otherwise ensure each cached entry has a distinct vector).
| await self._cache.put_batch( | |
| [ | |
| { | |
| "query_embedding": query_embedding, | |
| "text": r.text, | |
| "metadata": r.metadata, | |
| "relevance_score": r.score, | |
| } | |
| for r in results | |
| ] | |
| ) | |
| self._metrics.prefetch_operations += 1 | |
| logger.debug( | |
| "SlowThinker: prefetched %d chunks for '%s…'", | |
| len(results), | |
| query[:40], | |
| cache_items = [] | |
| skipped_results = 0 | |
| for r in results: | |
| result_embedding = getattr(r, "embedding", None) | |
| if result_embedding is None: | |
| skipped_results += 1 | |
| logger.warning( | |
| "SlowThinker: skipping cache entry for retrieved chunk without embedding" | |
| ) | |
| continue | |
| cache_items.append( | |
| { | |
| "query_embedding": result_embedding, | |
| "text": r.text, | |
| "metadata": r.metadata, | |
| "relevance_score": r.score, | |
| } | |
| ) | |
| if cache_items: | |
| await self._cache.put_batch(cache_items) | |
| self._metrics.prefetch_operations += 1 | |
| logger.debug( | |
| "SlowThinker: prefetched %d/%d chunks for '%s…'%s", | |
| len(cache_items), | |
| len(results), | |
| query[:40], | |
| f" ({skipped_results} skipped without embeddings)" | |
| if skipped_results | |
| else "", |
| SEARCH_QUERY = """ | ||
| SELECT | ||
| chunk_text, | ||
| source_file, | ||
| chunk_index, | ||
| 1 - (embedding <=> $1::vector) AS score | ||
| FROM rag_embeddings | ||
| WHERE merchant_id = $2 | ||
| AND template_id = $3 | ||
| ORDER BY embedding <=> $1::vector | ||
| LIMIT $4 | ||
| """ |
SEARCH_QUERY/search_chunks() only returns text + metadata + score. To support semantic caching keyed by document/chunk embeddings (as described in SemanticCache), the search query likely needs to also select embedding and return it (decoded via the pgvector codec) so callers can cache using per-chunk vectors rather than reusing the query vector for every entry.
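A drop-in variant of the constant showing the shape of that change; only the extra embedding column is new, and it assumes the pool's vector codec decodes the value to a Python list that callers can reuse as the cache key:

SEARCH_QUERY = """
SELECT
    chunk_text,
    source_file,
    chunk_index,
    embedding,                               -- per-chunk vector for the semantic cache
    1 - (embedding <=> $1::vector) AS score
FROM rag_embeddings
WHERE merchant_id = $2
  AND template_id = $3
ORDER BY embedding <=> $1::vector
LIMIT $4
"""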
| once per index run, chunked, embedded (Azure OpenAI text-embedding-3-small) and | ||
| stored in PostgreSQL via pgvector. The IndexManager writes embeddings into the | ||
| ``rag_embeddings`` table; the PgVectorStore fetches them on demand per call. |
Module docstring says documents are embedded with “Azure OpenAI text-embedding-3-small”, but the EmbeddingProvider and config default use the Foundry deployment embed-v-4-0. Please update the docstring to reflect the actual embedding deployment/model to avoid operational confusion.
| once per index run, chunked, embedded (Azure OpenAI text-embedding-3-small) and | |
| stored in PostgreSQL via pgvector. The IndexManager writes embeddings into the | |
| ``rag_embeddings`` table; the PgVectorStore fetches them on demand per call. | |
| once per index run, chunked, embedded using the configured Foundry deployment | |
| (`embed-v-4-0` by default), and stored in PostgreSQL via pgvector. The | |
| IndexManager writes embeddings into the ``rag_embeddings`` table; the | |
| PgVectorStore fetches them on demand per call. |
32d1fc1 to abc8081 Compare
feat: add RAG pipeline for Breeze Buddy knowledge-base Q&A
Embeds merchant FAQ chunks via Azure AI Foundry (embed-v4.0) into pgvector. On each user turn, the utterance is embedded and the top-K nearest chunks are fetched via cosine similarity search and injected ephemerally into the LLM context before the response — removed immediately after so history stays clean. A per-template min_score threshold (default 0.30, configurable per language) filters irrelevant results. A text-cache and FAISS semantic cache (warmed by SlowThinker background predictions) reduce repeat-query latency to sub-ms.
Summary by CodeRabbit
Release Notes
New Features
Dependencies