Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 102 additions & 9 deletions app/ai/voice/agents/breeze_buddy/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from opentelemetry import trace
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMMessagesAppendFrame, TTSSpeakFrame
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
Expand Down Expand Up @@ -83,12 +83,14 @@
)
from app.ai.voice.agents.breeze_buddy.template.vad import create_vad_analyzer
from app.ai.voice.agents.breeze_buddy.utils.common import (
fire_and_forget,
track_error,
)
from app.ai.voice.agents.breeze_buddy.utils.transport.websockets import (
close_websocket_safely,
)
from app.ai.voice.agents.breeze_buddy.utils.warm_transfer import set_transfer_flag
from app.core.config.dynamic import BB_STT_SERVICE
from app.core.config.static import ENABLE_BREEZE_BUDDY_TRACING
from app.core.logger import logger
from app.core.logger.context import (
Expand All @@ -102,6 +104,12 @@
)
from app.schemas import CallProvider
from app.schemas.breeze_buddy.core import ExecutionMode, LeadCallTracker
from app.services.fallback import (
BB_FALLBACK_CONFIG,
ServiceFallback,
ServiceFallbackConfig,
)
from app.services.slack import slack_alert

DEFAULT_OUTCOME = "BUSY"
TTS_SPEAK_MAX_CHARS = 2000
Expand Down Expand Up @@ -174,6 +182,12 @@ def __init__(
# Error tracking
self.errors: List[Dict[str, Any]] = []

# STT fallback state
self.stt_provider: Optional[str] = None
self._stt_service: Any = None
self._stt_failure_recorded: bool = False
self._mid_call_alert_sent: bool = False

@property
def is_daily_mode(self) -> bool:
return self.transport_type == TRANSPORT_TYPE_DAILY
Expand Down Expand Up @@ -273,6 +287,32 @@ async def _handle_post_greeting_idle(self, user_idle_config) -> None:
logger.debug("Post-greeting idle timer cancelled.")
return

async def _send_mid_call_stt_alert(self) -> None:
    """Notify Slack that STT broke mid-call and the call had to be ended.

    Best-effort: any exception raised while posting the alert is logged
    at warning level and swallowed, so alerting can never interfere with
    call teardown.
    """
    # Imported locally (mirrors the original) so module import order is unchanged.
    from app.core.config.static import SLACK_TAG_USERS

    base_tag = "@breeze-sentinals"
    if SLACK_TAG_USERS:
        mention = f"{base_tag},{SLACK_TAG_USERS}"
    else:
        mention = base_tag

    sid = self.call_sid or "unknown"
    stt_name = (self.stt_provider or "unknown").capitalize()

    try:
        await slack_alert.send(
            title="🚨 STT Failed — Call Ended (Breeze Buddy)",
            fields=[
                {"name": "Provider", "value": stt_name},
                {"name": "Call SID", "value": sid},
            ],
            sections=[
                {
                    "title": "What Happened",
                    "text": "STT failed mid-call. Call could not continue.",
                }
            ],
            fallback_text=f"STT failed, call ended — {sid}",
            tag_users=mention,
        )
    except Exception as exc:
        logger.warning(f"Failed to send mid-call STT alert: {exc}")

async def _setup_daily_transport(self, runner_args: RunnerArguments) -> None:
"""Initialize transport for Daily mode."""
if not runner_args or not runner_args.body:
Expand Down Expand Up @@ -644,7 +684,7 @@ def _register_event_handlers(self) -> None:

@self.task.event_handler("on_pipeline_error")
async def on_pipeline_error(task, error):
"""Capture TTS/STT/LLM pipeline failures."""
"""Handle pipeline errors — record STT failures in circuit breaker and end call."""
processor = getattr(error, "processor", "unknown")
error_msg = getattr(error, "error", str(error))
detailed_msg = f"[PIPELINE] {processor}: {error_msg}"
Expand All @@ -656,6 +696,57 @@ async def on_pipeline_error(task, error):
{"processor": str(processor), "error": error_msg},
)

# Detect STT errors by processor name keywords
processor_str = str(processor).lower()
stt_keywords = (
"stt",
"soniox",
"deepgram",
"transcri",
"google",
"sarvam",
)
is_stt_error = any(kw in processor_str for kw in stt_keywords)
Comment on lines +699 to +709
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Substring classification has false positives and a missing provider.

"google" matches any google_* processor (Google LLM/TTS/Vertex) — not just STT — so non-STT errors will be misclassified and trigger STT fallback recording + alert + EndFrame(). Conversely, "openai" is missing despite OpenAI being a supported STT provider per the learning above.

Prefer matching on self._stt_service's class identity (or processor name from the service instance) instead of substring sniffing the error's processor attribute. At minimum, anchor on more specific tokens (e.g. "_stt", "google_stt") and add "openai" if you keep this approach.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 625 - 635,
The substring check that builds processor_str and stt_keywords leads to false
positives (e.g., "google" matching non‑STT Google processors) and misses
"openai"; replace it by determining STT identity from the actual STT service
instance instead of the error's processor string — use self._stt_service (or a
provider/name attribute on that instance) to classify STT errors, falling back
to a tighter substring match only if the service instance is unavailable (e.g.,
require tokens like "_stt" or "google_stt" and include "openai" in
stt_keywords); update the logic that sets is_stt_error (and references to
processor_str and stt_keywords) to use this service-based check so only true STT
providers trigger STT fallback recording, alerting, and EndFrame().


if not is_stt_error:
return

logger.warning(f"STT error detected from processor: {processor}")

# Record failure in fallback system (once per call, any STT provider)
if not self._stt_failure_recorded:
self._stt_failure_recorded = True
try:
cfg = await BB_FALLBACK_CONFIG("stt")
if cfg.enabled:
primary_provider = await BB_STT_SERVICE()
fb = ServiceFallback(
ServiceFallbackConfig(
service_name="stt",
failure_threshold=cfg.threshold,
failure_window_secs=cfg.window_secs,
fallback_duration_secs=cfg.duration_secs,
primary_provider_name=primary_provider,
fallback_provider_name=cfg.fallback_provider,
)
)
await fb.record_failure(
error_msg=str(error_msg)[:200],
call_sid=self.call_sid or "",
context="mid-call",
)
except Exception as fb_err:
logger.warning(f"STT fallback record_failure failed: {fb_err}")

# Alert and end call — no mid-call swap in Phase 1
if not self._mid_call_alert_sent:
self._mid_call_alert_sent = True
fire_and_forget(self._send_mid_call_stt_alert())
try:
await task.queue_frames([EndFrame()])
except Exception as e:
logger.warning(f"Failed to queue EndFrame after STT error: {e}")

@self.transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
Expand All @@ -677,6 +768,7 @@ async def on_client_disconnected(transport, client):
logger.info(
"Cancelling the post greeting task due to client disconnect"
)

await self._handle_unexpected_disconnect("client_disconnected")

@self.task.event_handler("on_idle_timeout")
Expand Down Expand Up @@ -888,17 +980,18 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
f"Invalid TTS provider '{payload_provider}' in payload, keeping existing config"
)

# Build services and pipeline. Stream mode skips LLM creation and
# runs build_pipeline with mode="stream" (no LLM processor, no
# assistant aggregator, transcript collector inserted, no user idle).
# All other wiring is identical.
# Create services and pipeline
# VAD analyzer is passed to build_pipeline where it's configured inside the
# LLMUserAggregator. This enables UserTurnStrategies (VAD + Transcription fallback).
is_stream = self.is_stream_mode
stt, llm, tts = await create_services(
stt_result, llm, tts = await create_services(
self.configurations, include_llm=not is_stream
)
if not is_stream:
assert llm is not None, "LLM is required in agent mode"

if stt_result is not None:
self.stt_provider = stt_result.provider
self._stt_service = stt_result.service
(
pipeline,
context,
Expand All @@ -908,7 +1001,7 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
self._transcript_collector,
) = await build_pipeline(
self.transport,
stt,
stt_result.service if stt_result is not None else None,
llm,
tts,
self.vad_analyzer,
Expand Down
6 changes: 3 additions & 3 deletions app/ai/voice/agents/breeze_buddy/agent/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def create_services(
logger.info(
f"Using template STT configuration: provider={stt_configuration.provider.value}"
)
stt = await get_stt_service(stt_configuration=stt_configuration)
stt_result = await get_stt_service(stt_configuration=stt_configuration)
else:
# Legacy path: build from scattered fields
stt_language = getattr(configurations, "stt_language", None)
Expand All @@ -149,7 +149,7 @@ async def create_services(
logger.info(f"Using STT language from template: {stt_language}")
if soniox_context:
logger.info("Using Soniox context from template")
stt = await get_stt_service(
stt_result = await get_stt_service(
language_hints=stt_language, soniox_context=soniox_context
)

Expand All @@ -169,7 +169,7 @@ async def create_services(
logger.info(f"Resolved voice config: provider={voice_config.provider.value}")
tts = await get_tts_service(voice_config)

return stt, llm, tts
return stt_result, llm, tts
Comment on lines 170 to +172
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_services() now returns stt_result (an STTServiceResult) rather than a raw STT service, but the docstring still says the tuple contains stt_service. Update the return description (and/or type alias) so call sites don’t misinterpret the first element.

Copilot uses AI. Check for mistakes.


def _wire_user_idle_event(
Expand Down
Loading
Loading