diff --git a/app/ai/voice/agents/breeze_buddy/agent/__init__.py b/app/ai/voice/agents/breeze_buddy/agent/__init__.py index d8ed55adc..b64cf28a2 100644 --- a/app/ai/voice/agents/breeze_buddy/agent/__init__.py +++ b/app/ai/voice/agents/breeze_buddy/agent/__init__.py @@ -9,7 +9,7 @@ from opentelemetry import trace from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMMessagesAppendFrame, TTSSpeakFrame +from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext @@ -83,12 +83,14 @@ ) from app.ai.voice.agents.breeze_buddy.template.vad import create_vad_analyzer from app.ai.voice.agents.breeze_buddy.utils.common import ( + fire_and_forget, track_error, ) from app.ai.voice.agents.breeze_buddy.utils.transport.websockets import ( close_websocket_safely, ) from app.ai.voice.agents.breeze_buddy.utils.warm_transfer import set_transfer_flag +from app.core.config.dynamic import BB_STT_SERVICE from app.core.config.static import ENABLE_BREEZE_BUDDY_TRACING from app.core.logger import logger from app.core.logger.context import ( @@ -102,6 +104,12 @@ ) from app.schemas import CallProvider from app.schemas.breeze_buddy.core import ExecutionMode, LeadCallTracker +from app.services.fallback import ( + BB_FALLBACK_CONFIG, + ServiceFallback, + ServiceFallbackConfig, +) +from app.services.slack import slack_alert DEFAULT_OUTCOME = "BUSY" TTS_SPEAK_MAX_CHARS = 2000 @@ -174,6 +182,12 @@ def __init__( # Error tracking self.errors: List[Dict[str, Any]] = [] + # STT fallback state + self.stt_provider: Optional[str] = None + self._stt_service: Any = None + self._stt_failure_recorded: bool = False + self._mid_call_alert_sent: bool = False + @property def is_daily_mode(self) -> bool: return self.transport_type == TRANSPORT_TYPE_DAILY @@ -273,6 +287,32 @@ async def _handle_post_greeting_idle(self, user_idle_config) -> None: logger.debug("Post-greeting idle timer cancelled.") return + async def _send_mid_call_stt_alert(self) -> None: + """Send Slack alert when STT fails mid-call and call must end.""" + from app.core.config.static import SLACK_TAG_USERS + + _fallback_tag = "@breeze-sentinals" + tag = f"{_fallback_tag},{SLACK_TAG_USERS}" if SLACK_TAG_USERS else _fallback_tag + provider = (self.stt_provider or "unknown").capitalize() + try: + await slack_alert.send( + title="🚨 STT Failed — Call Ended (Breeze Buddy)", + fields=[ + {"name": "Provider", "value": provider}, + {"name": "Call SID", "value": self.call_sid or "unknown"}, + ], + sections=[ + { + "title": "What Happened", + "text": "STT failed mid-call. Call could not continue.", + } + ], + fallback_text=f"STT failed, call ended — {self.call_sid or 'unknown'}", + tag_users=tag, + ) + except Exception as e: + logger.warning(f"Failed to send mid-call STT alert: {e}") + async def _setup_daily_transport(self, runner_args: RunnerArguments) -> None: """Initialize transport for Daily mode.""" if not runner_args or not runner_args.body: @@ -644,7 +684,7 @@ def _register_event_handlers(self) -> None: @self.task.event_handler("on_pipeline_error") async def on_pipeline_error(task, error): - """Capture TTS/STT/LLM pipeline failures.""" + """Handle pipeline errors — record STT failures in circuit breaker and end call.""" processor = getattr(error, "processor", "unknown") error_msg = getattr(error, "error", str(error)) detailed_msg = f"[PIPELINE] {processor}: {error_msg}" @@ -656,6 +696,57 @@ async def on_pipeline_error(task, error): {"processor": str(processor), "error": error_msg}, ) + # Detect STT errors by processor name keywords + processor_str = str(processor).lower() + stt_keywords = ( + "stt", + "soniox", + "deepgram", + "transcri", + "google", + "sarvam", + ) + is_stt_error = any(kw in processor_str for kw in stt_keywords) + + if not is_stt_error: + return + + logger.warning(f"STT error detected from processor: {processor}") + + # Record failure in fallback system (once per call, any STT provider) + if not self._stt_failure_recorded: + self._stt_failure_recorded = True + try: + cfg = await BB_FALLBACK_CONFIG("stt") + if cfg.enabled: + primary_provider = await BB_STT_SERVICE() + fb = ServiceFallback( + ServiceFallbackConfig( + service_name="stt", + failure_threshold=cfg.threshold, + failure_window_secs=cfg.window_secs, + fallback_duration_secs=cfg.duration_secs, + primary_provider_name=primary_provider, + fallback_provider_name=cfg.fallback_provider, + ) + ) + await fb.record_failure( + error_msg=str(error_msg)[:200], + call_sid=self.call_sid or "", + context="mid-call", + ) + except Exception as fb_err: + logger.warning(f"STT fallback record_failure failed: {fb_err}") + + # Alert and end call — no mid-call swap in Phase 1 + if not self._mid_call_alert_sent: + self._mid_call_alert_sent = True + fire_and_forget(self._send_mid_call_stt_alert()) + try: + await task.queue_frames([EndFrame()]) + except Exception as e: + logger.warning(f"Failed to queue EndFrame after STT error: {e}") + @self.transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected: {client}") @@ -677,6 +768,7 @@ async def on_client_disconnected(transport, client): logger.info( "Cancelling the post greeting task due to client disconnect" ) + await self._handle_unexpected_disconnect("client_disconnected") @self.task.event_handler("on_idle_timeout") @@ -888,17 +980,18 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None: f"Invalid TTS provider '{payload_provider}' in payload, keeping existing config" ) - # Build services and pipeline. Stream mode skips LLM creation and - # runs build_pipeline with mode="stream" (no LLM processor, no - # assistant aggregator, transcript collector inserted, no user idle). - # All other wiring is identical. + # Create services and pipeline + # VAD analyzer is passed to build_pipeline where it's configured inside the + # LLMUserAggregator. This enables UserTurnStrategies (VAD + Transcription fallback). is_stream = self.is_stream_mode - stt, llm, tts = await create_services( + stt_result, llm, tts = await create_services( self.configurations, include_llm=not is_stream ) if not is_stream: assert llm is not None, "LLM is required in agent mode" - + if stt_result is not None: + self.stt_provider = stt_result.provider + self._stt_service = stt_result.service ( pipeline, context, @@ -908,7 +1001,7 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None: self._transcript_collector, ) = await build_pipeline( self.transport, - stt, + stt_result.service if stt_result is not None else None, llm, tts, self.vad_analyzer, diff --git a/app/ai/voice/agents/breeze_buddy/agent/pipeline.py b/app/ai/voice/agents/breeze_buddy/agent/pipeline.py index 243e5de35..59725981b 100644 --- a/app/ai/voice/agents/breeze_buddy/agent/pipeline.py +++ b/app/ai/voice/agents/breeze_buddy/agent/pipeline.py @@ -138,7 +138,7 @@ async def create_services( logger.info( f"Using template STT configuration: provider={stt_configuration.provider.value}" ) - stt = await get_stt_service(stt_configuration=stt_configuration) + stt_result = await get_stt_service(stt_configuration=stt_configuration) else: # Legacy path: build from scattered fields stt_language = getattr(configurations, "stt_language", None) @@ -149,7 +149,7 @@ async def create_services( logger.info(f"Using STT language from template: {stt_language}") if soniox_context: logger.info("Using Soniox context from template") - stt = await get_stt_service( + stt_result = await get_stt_service( language_hints=stt_language, soniox_context=soniox_context ) @@ -169,7 +169,7 @@ async def create_services( logger.info(f"Resolved voice config: provider={voice_config.provider.value}") tts = await get_tts_service(voice_config) - return stt, llm, tts + return stt_result, llm, tts def _wire_user_idle_event( diff --git a/app/ai/voice/agents/breeze_buddy/stt/__init__.py b/app/ai/voice/agents/breeze_buddy/stt/__init__.py index 886fa1647..66939f4e6 100644 --- a/app/ai/voice/agents/breeze_buddy/stt/__init__.py +++ b/app/ai/voice/agents/breeze_buddy/stt/__init__.py @@ -1,13 +1,15 @@ """Breeze Buddy STT service creation. Central routing: accepts a normalized ``STTConfiguration`` from the template -and casts it to the provider-specific builder config. Defaults are baked into -the Pydantic models — no env/dynamic config needed (except API keys). +and casts it to the provider-specific builder config. Wraps the result in +``STTServiceResult`` so callers know which provider was actually used +(may differ from requested if fallback kicked in). """ from __future__ import annotations -from typing import Optional +from dataclasses import dataclass +from typing import Any, Optional from pipecat.transcriptions.language import Language @@ -33,6 +35,7 @@ BB_SARVAM_STT_MODEL, BB_SARVAM_STT_PROMPT, BB_SARVAM_STT_VAD_SIGNALS, + BB_STT_SERVICE, ) from app.core.config.static import ( BREEZE_BUDDY_SONIOX_CONTEXT, @@ -40,7 +43,6 @@ BREEZE_BUDDY_SONIOX_MAX_ENDPOINT_DELAY_MS, BREEZE_BUDDY_SONIOX_MODEL, BREEZE_BUDDY_SONIOX_VAD_FORCE_TURN_ENDPOINT, - BREEZE_BUDDY_STT_SERVICE, DEEPGRAM_API_KEY, GOOGLE_CREDENTIALS_JSON, OPENAI_STT_API_KEY, @@ -50,6 +52,24 @@ SONIOX_API_KEY, ) from app.core.logger import logger +from app.services.fallback import ( + BB_FALLBACK_CONFIG, + ServiceFallback, + ServiceFallbackConfig, +) + + +@dataclass +class STTServiceResult: + """Result of STT service creation. + + ``provider`` reflects the *actual* provider used — may differ from + the requested one if fallback kicked in (e.g. requested soniox but + fallback forced deepgram). + """ + + provider: str + service: Any def _normalize_language(language: str | list[str] | None) -> str | None: @@ -81,18 +101,15 @@ def _deepgram_language(language: str | list[str] | None) -> str: return language -async def create_stt_from_config(config: STTConfiguration): - """Create STT service from normalized STTConfiguration. +async def _build_stt_provider(config: STTConfiguration) -> Any: + """Pure builder: create an STT service from config. No fallback logic. - Central routing: reads ``config.provider`` and casts the normalized - config to the provider-specific builder config. All tuning params - come from the template config with sensible defaults baked in. + Raises on missing API keys or provider init errors — callers handle fallback. """ if config.provider == STTProvider.DEEPGRAM: if not DEEPGRAM_API_KEY: raise ValueError("DEEPGRAM_API_KEY is required for deepgram STT") - # All defaults are in DeepgramSTTConfig — no env/dynamic lookup needed dg = config.deepgram or DeepgramSTTConfig() logger.info("Using Deepgram Nova-3 STT service for Breeze Buddy") @@ -105,7 +122,7 @@ async def create_stt_from_config(config: STTConfiguration): smart_format=dg.smart_format, punctuate=dg.punctuate, endpointing=dg.endpointing_ms, - utterance_end_ms=dg.utterance_end_ms, # None = disabled + utterance_end_ms=dg.utterance_end_ms, interim_results=True, profanity_filter=dg.profanity_filter, numerals=dg.numerals, @@ -180,22 +197,112 @@ async def create_stt_from_config(config: STTConfiguration): return build_google_stt(credentials_json=GOOGLE_CREDENTIALS_JSON) +async def create_stt_from_config(config: STTConfiguration) -> STTServiceResult: + """Create STT service with fallback support. + + When ``BB_FALLBACK`` DevCycle flag has ``stt.enabled: true``: + 1. If fallback is currently active, proactively routes to the fallback provider. + 2. Otherwise, wraps the build in try/except — on init failure, records the + failure and falls back to the configured fallback provider. + When disabled, builds directly without fallback. + """ + cfg = await BB_FALLBACK_CONFIG("stt") + provider_name = config.provider.value + + if not cfg.enabled: + service = await _build_stt_provider(config) + return STTServiceResult(provider=provider_name, service=service) + + # Fallback enabled + fallback_provider = cfg.fallback_provider + primary_provider = await BB_STT_SERVICE() + + def _make_fallback_obj() -> ServiceFallback: + return ServiceFallback( + ServiceFallbackConfig( + service_name="stt", + failure_threshold=cfg.threshold, + failure_window_secs=cfg.window_secs, + fallback_duration_secs=cfg.duration_secs, + primary_provider_name=primary_provider, + fallback_provider_name=fallback_provider, + ) + ) + + # Proactive routing: if fallback is active, skip primary entirely + if provider_name != fallback_provider: + fb = _make_fallback_obj() + if await fb.is_active(): + logger.info( + f"STT fallback active — using {fallback_provider} " + f"instead of {provider_name}" + ) + fallback_config = STTConfiguration( + provider=STTProvider(fallback_provider), + language=config.language, + ) + service = await _build_stt_provider(fallback_config) + # One-time alert that calls are routing through fallback (NX dedup in _activate) + return STTServiceResult(provider=fallback_provider, service=service) + + # Try primary, with init-time fallback on failure + try: + service = await _build_stt_provider(config) + except Exception as primary_err: + # Skip fallback if primary == fallback (nothing to fall to) + if provider_name == fallback_provider: + raise + + logger.error( + f"{provider_name.capitalize()} STT initialization failed, " + f"falling back to {fallback_provider.capitalize()}: {primary_err}" + ) + + # Record failure (increments counter, may activate fallback circuit breaker) + fb = _make_fallback_obj() + await fb.record_failure(error_msg=str(primary_err)[:200], context="init") + + # Try fallback provider + fallback_config = STTConfiguration( + provider=STTProvider(fallback_provider), + language=config.language, + ) + try: + fallback_service = await _build_stt_provider(fallback_config) + logger.info( + f"Successfully initialized {fallback_provider.capitalize()} STT " + f"as fallback for {provider_name.capitalize()} failure" + ) + return STTServiceResult( + provider=fallback_provider, service=fallback_service + ) + except Exception as fallback_err: + logger.error( + f"{fallback_provider.capitalize()} fallback also failed: {fallback_err}. " + f"Original {provider_name.capitalize()} error: {primary_err}" + ) + raise primary_err from fallback_err + + return STTServiceResult(provider=provider_name, service=service) + + async def get_stt_service( language_hints: str | None = None, soniox_context: str | None = None, stt_configuration: Optional[STTConfiguration] = None, -): - """Returns an STT service instance. +) -> STTServiceResult: + """Returns an STTServiceResult wrapping the STT service and actual provider. If ``stt_configuration`` is provided (from template), routes through - :func:`create_stt_from_config`. Otherwise falls back to env-var-based - provider selection (legacy path). + :func:`create_stt_from_config`. Otherwise falls back to dynamic-config-based + provider selection (legacy path — respects fallback overrides via BB_STT_SERVICE). """ # --- New path: template-level STTConfiguration --- if stt_configuration is not None: return await create_stt_from_config(stt_configuration) - # --- Legacy path: env var BREEZE_BUDDY_STT_SERVICE --- + # --- Legacy path: dynamic BB_STT_SERVICE (respects config:override) --- + effective_service = await BB_STT_SERVICE() provider_map = { "soniox": STTProvider.SONIOX, "deepgram": STTProvider.DEEPGRAM, @@ -203,11 +310,12 @@ async def get_stt_service( "openai": STTProvider.OPENAI, "google": STTProvider.GOOGLE, } - provider = provider_map.get(BREEZE_BUDDY_STT_SERVICE, STTProvider.GOOGLE) + provider = provider_map.get(effective_service, STTProvider.GOOGLE) legacy_config = STTConfiguration( provider=provider, language=language_hints, soniox=SonioxSTTConfig(context=soniox_context) if soniox_context else None, ) + return await create_stt_from_config(legacy_config) diff --git a/app/ai/voice/agents/breeze_buddy/utils/common.py b/app/ai/voice/agents/breeze_buddy/utils/common.py index 317c31090..e67ae1f57 100644 --- a/app/ai/voice/agents/breeze_buddy/utils/common.py +++ b/app/ai/voice/agents/breeze_buddy/utils/common.py @@ -1,3 +1,4 @@ +import asyncio import audioop import base64 import json @@ -15,6 +16,16 @@ from app.core.security.sha import calculate_hmac_sha256 from app.services.redis.client import get_redis_service +# Background task references to prevent premature GC of fire-and-forget tasks +_background_tasks: set[asyncio.Task] = set() + + +def fire_and_forget(coro) -> None: + """Schedule a coroutine as a fire-and-forget task, preventing GC.""" + task = asyncio.create_task(coro) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + def track_error( errors: Optional[List[Dict[str, Any]]], diff --git a/app/ai/voice/stt/soniox/config.py b/app/ai/voice/stt/soniox/config.py index 8bfa97e5d..9c6bc1b15 100644 --- a/app/ai/voice/stt/soniox/config.py +++ b/app/ai/voice/stt/soniox/config.py @@ -39,6 +39,7 @@ class SonioxConfig: client_reference_id: Optional[str] = None log_context: str = "Soniox" language_hints_strict: bool = False + reconnect_on_error: bool = True def _parse_soniox_context( @@ -169,4 +170,5 @@ def build_soniox_stt(config: SonioxConfig): settings=soniox_settings, vad_force_turn_endpoint=config.vad_force_turn_endpoint, max_endpoint_delay_ms=config.max_endpoint_delay_ms, + reconnect_on_error=config.reconnect_on_error, ) diff --git a/app/core/config/dynamic.py b/app/core/config/dynamic.py index 658e58ce3..e3132a914 100644 --- a/app/core/config/dynamic.py +++ b/app/core/config/dynamic.py @@ -403,6 +403,44 @@ async def BB_ENABLE_ELEVENLABS_INDIAN_RESIDENCY() -> bool: return await get_config("BB_ENABLE_ELEVENLABS_INDIAN_RESIDENCY", True, bool) +# --- Breeze Buddy STT Service & Fallback Configuration --- +async def BB_STT_SERVICE() -> str: + """Returns BB_STT_SERVICE from Redis (soniox, deepgram, sarvam, openai, google)""" + return await get_config("BB_STT_SERVICE", "soniox", str) + + +async def BB_FALLBACK_RAW_CONFIG(service: str) -> dict: + """Return the raw fallback config dict for the given service from Redis. + + Reads the ``BB_FALLBACK`` DevCycle/Redis flag (a JSON string encoding an + object keyed by service name) and returns the service sub-dict. + + Example flag value:: + + { + "stt": { + "enabled": true, + "fallback_provider": "deepgram", + "threshold": 2, + "duration_secs": 1800, + "window_secs": 240 + } + } + + Returns an empty dict when the flag is absent or the service key is missing. + Consumers (e.g. ``app.services.fallback``) apply defaults and type the result. + """ + redis_json = await get_config("BB_FALLBACK", None, str) + if redis_json: + try: + raw = json.loads(redis_json) + if isinstance(raw, dict): + return raw.get(service, {}) + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"Failed to parse BB_FALLBACK from Redis: {e}") + return {} + + # --- Breeze Buddy Transfer Configuration --- async def BB_TRANSFER_CONFERENCE_TIMEOUT() -> int: """Seconds to wait for agent to join conference""" diff --git a/app/main.py b/app/main.py index d46eb89db..477680427 100644 --- a/app/main.py +++ b/app/main.py @@ -70,6 +70,7 @@ from app.schemas import ( AutomaticVoiceUserConnectRequest, ) +from app.services.fallback import initialize_fallback_tasks from app.services.langfuse.tasks.task import initialize_langfuse_tasks from app.services.redis import ( close_redis_connections, @@ -180,6 +181,10 @@ async def lifespan(_app: FastAPI): func=end_idle_chat_sessions, interval_seconds=CHAT_SESSION_END_TIMEOUT_LOOP_INTERVAL_SECONDS, ) + # Initialize STT fallback reset tasks + await initialize_fallback_tasks(_background_scheduler) + + ### Register new tasks here # Start the scheduler only if tasks are registered if _background_scheduler.tasks: diff --git a/app/services/fallback/__init__.py b/app/services/fallback/__init__.py new file mode 100644 index 000000000..ab1e0cdfe --- /dev/null +++ b/app/services/fallback/__init__.py @@ -0,0 +1,469 @@ +"""Generic Redis-backed service fallback. + +State machine: + NORMAL -- failure_count >= threshold --> FALLBACK_ACTIVE + FALLBACK_ACTIVE -- TTL expires after duration_secs --> NORMAL + NORMAL -- failures again --> FALLBACK_ACTIVE (repeat cycle) + +Redis keys: + fallback:config:{service} - JSON config (permanent, managed externally) + fallback:{service}:failure_count - rolling failure counter with TTL=window_secs + fallback:{service}:active - flag with TTL=duration_secs (self-expires) + fallback:{service}:notified - NX dedup for one-time activation alert + fallback:{service}:alerted:{count} - NX dedup so only one pod fires per-failure alert + buddy:{service}:health:{provider} - "unhealthy" marker for monitoring (TTL=duration_secs) + +Consumers check ``is_active()`` to decide whether to route to the fallback provider. +The active key self-expires after exactly duration_secs from activation time. +The background reset task fires the reset Slack alert when it detects expiry. +""" + +from dataclasses import dataclass + +from app.core.background_tasks import BackgroundTaskScheduler +from app.core.config.dynamic import BB_FALLBACK_RAW_CONFIG, BB_STT_SERVICE +from app.core.config.static import SLACK_TAG_USERS +from app.core.logger import logger +from app.services.redis.client import get_redis_service +from app.services.slack import slack_alert + +# Slack tag used for all fallback alerts +_FALLBACK_TAG = "@breeze-sentinals" +_ALERT_TAG = f"{_FALLBACK_TAG},{SLACK_TAG_USERS}" if SLACK_TAG_USERS else _FALLBACK_TAG + +# Lua script: atomically INCR the failure counter and set TTL on first write. +# Returns the new count. Using Lua ensures INCR + EXPIRE are one indivisible +# operation — eliminates the race where another pod deletes the key between +# the two calls and the EXPIRE lands on a brand-new counter with no TTL. +_LUA_INCR_WITH_EXPIRE = """ +local count = redis.call('INCR', KEYS[1]) +if count == 1 then + redis.call('EXPIRE', KEYS[1], ARGV[1]) +end +return count +""" + + +@dataclass +class FallbackSettings: + """Typed config for a single service's fallback behaviour. + + Sourced from the ``BB_FALLBACK`` DevCycle/Redis flag, keyed by service name. + + Attributes: + enabled: Whether fallback is active for this service. + fallback_provider: Provider to route to when primary fails. + threshold: Failure count within window_secs to trip the circuit. + duration_secs: How long the fallback stays active before auto-reset. + window_secs: Rolling window for the failure counter TTL. + """ + + enabled: bool = False + fallback_provider: str = "" + threshold: int = 2 + duration_secs: int = 1800 + window_secs: int = 240 + + +# Per-service sensible defaults for fallback_provider. +# Add "tts" and "llm" entries in their respective future PRs. +_FALLBACK_DEFAULTS: dict[str, FallbackSettings] = { + "stt": FallbackSettings(fallback_provider="deepgram"), + # "tts": FallbackSettings(fallback_provider="cartesia"), # future PR + # "llm": FallbackSettings(fallback_provider="openai"), # future PR +} + + +async def BB_FALLBACK_CONFIG(service: str) -> FallbackSettings: + """Return typed fallback config for the given service. + + Merges the raw Redis dict from ``BB_FALLBACK_RAW_CONFIG`` with the + per-service defaults defined in ``_FALLBACK_DEFAULTS``. Callers always + receive a valid ``FallbackSettings`` — never ``None`` or a bare dict. + """ + defaults = _FALLBACK_DEFAULTS.get(service, FallbackSettings()) + service_cfg = await BB_FALLBACK_RAW_CONFIG(service) + if not service_cfg: + return defaults + return FallbackSettings( + enabled=service_cfg.get("enabled", defaults.enabled), + fallback_provider=service_cfg.get( + "fallback_provider", defaults.fallback_provider + ), + threshold=service_cfg.get("threshold", defaults.threshold), + duration_secs=service_cfg.get("duration_secs", defaults.duration_secs), + window_secs=service_cfg.get("window_secs", defaults.window_secs), + ) + + +@dataclass +class ServiceFallbackConfig: + """Configuration for a Redis-backed service fallback. + + Attributes: + service_name: Identifier used in Redis keys and logs (e.g., "stt") + failure_threshold: Number of failures within window to activate fallback + failure_window_secs: Sliding window for failure counter TTL + fallback_duration_secs: How long fallback stays active + primary_provider_name: Human-readable primary provider name (for alerts/logs) + fallback_provider_name: Human-readable fallback provider name (for alerts/logs) + """ + + service_name: str + failure_threshold: int = 2 + failure_window_secs: int = 240 + fallback_duration_secs: int = 1800 + primary_provider_name: str = "primary" + fallback_provider_name: str = "fallback" + + +class ServiceFallback: + """Generic Redis-backed service fallback. + + On activation, sets a Redis flag. Consumers check ``is_active()`` to route + to the fallback provider. A background task clears the flag on schedule. + + Inline Slack alerts are sent at each key lifecycle event: + - Each failure: ⚠️ STT Failure on {Provider} ({count}/{threshold}) + - Activation: 🔴 {Service} Fallback Activated — {Provider} + - Reset: ✅ {Service} Fallback Reset — Back to {Provider} + """ + + def __init__(self, config: ServiceFallbackConfig): + self.config = config + self._key_failure_count = f"fallback:{config.service_name}:failure_count" + self._key_active = f"fallback:{config.service_name}:active" + self._key_notified = f"fallback:{config.service_name}:notified" + self._key_alerted_prefix = f"fallback:{config.service_name}:alerted" + self._key_health_prefix = f"buddy:{config.service_name}:health" + # Persistent sentinel set by the background poller while fallback is + # active. When the poller next runs and finds is_active() False but + # this sentinel present, it knows the TTL just expired and fires the + # reset alert. No TTL — cleared explicitly on reset. + self._key_seen_active = f"fallback:{config.service_name}:seen_active" + + # ------------------------------------------------------------------ + # Inline alert helpers + # ------------------------------------------------------------------ + + async def _send_failure_alert(self, count: int, error_msg: str) -> None: + provider = self.config.primary_provider_name.capitalize() + threshold = self.config.failure_threshold + try: + await slack_alert.send( + title=f"⚠️ STT Failure on {provider} ({count}/{threshold})", + fields=[{"name": "Fail Count", "value": f"{count}/{threshold}"}], + sections=( + [{"title": "Error", "text": f"```{error_msg}```"}] + if error_msg + else [] + ), + fallback_text=f"STT failure on {provider} ({count}/{threshold})", + tag_users=_ALERT_TAG, + ) + except Exception as e: + logger.warning( + f"Service fallback ({self.config.service_name}) " + f"failure alert failed: {e}" + ) + + async def _send_activation_alert(self, cooldown_min: int) -> None: + primary = self.config.primary_provider_name.capitalize() + fallback = self.config.fallback_provider_name.capitalize() + service = self.config.service_name.upper() + try: + await slack_alert.send( + title=f"🔴 {service} Fallback Activated — {primary}", + fields=[{"name": "Duration", "value": f"{cooldown_min} minutes"}], + sections=[ + { + "title": "What Happened", + "text": ( + f"{primary} {service} hit {self.config.failure_threshold} failures. " + f"All new calls will use {fallback} for {cooldown_min} minutes. " + f"{primary} will be retried automatically after the duration expires." + ), + } + ], + fallback_text=f"{service} fallback activated — {fallback} for {cooldown_min} min", + tag_users=_ALERT_TAG, + ) + except Exception as e: + logger.warning( + f"Service fallback ({self.config.service_name}) " + f"activation alert failed: {e}" + ) + + async def _send_reset_alert(self) -> None: + primary = self.config.primary_provider_name.capitalize() + service = self.config.service_name.upper() + try: + await slack_alert.send( + title=f"✅ {service} Fallback Reset — Back to {primary}", + sections=[ + { + "title": "What Happened", + "text": ( + f"{service} fallback duration expired. " + f"Calls are back on primary {primary} provider. " + "Normal operation resumed." + ), + } + ], + fallback_text=f"{service} fallback reset — back to {primary}", + tag_users=_ALERT_TAG, + ) + except Exception as e: + logger.warning( + f"Service fallback ({self.config.service_name}) " + f"reset alert failed: {e}" + ) + + # ------------------------------------------------------------------ + # Health tracking + # ------------------------------------------------------------------ + + async def _mark_provider_unhealthy(self, redis, provider: str) -> None: + """Set a monitoring health key for the given provider.""" + try: + health_key = f"{self._key_health_prefix}:{provider}" + await redis.set( + health_key, + "unhealthy", + ex=self.config.fallback_duration_secs, + ) + except Exception as e: + logger.warning( + f"Service fallback ({self.config.service_name}) " + f"health mark failed: {e}" + ) + + async def _clear_provider_health(self, redis, provider: str) -> None: + """Clear the monitoring health key for the given provider.""" + try: + health_key = f"{self._key_health_prefix}:{provider}" + await redis.delete(health_key) + except Exception as e: + logger.warning( + f"Service fallback ({self.config.service_name}) " + f"health clear failed: {e}" + ) + + # ------------------------------------------------------------------ + # Core state machine + # ------------------------------------------------------------------ + + async def record_failure( + self, + error_msg: str = "", + call_sid: str = "", + context: str = "unknown", + ) -> bool: + """Increment failure count. Activate fallback if threshold reached. + + Returns: + True if this failure caused fallback activation, False otherwise. + """ + try: + redis = await get_redis_service() + + # Atomically increment and set TTL on first write via Lua script. + count = await redis.run_script( + _LUA_INCR_WITH_EXPIRE, + keys=[self._key_failure_count], + args=[self.config.failure_window_secs], + ) + if count is None: + count = await redis.incr(self._key_failure_count) + + logger.info( + f"Service fallback ({self.config.service_name}): " + f"failure {count}/{self.config.failure_threshold}" + ) + + # Per-failure alert — deduplicated with NX so only the first pod fires. + alert_dedup_key = f"{self._key_alerted_prefix}:{count}" + is_first = await redis.set( + alert_dedup_key, + "1", + nx=True, + ex=self.config.failure_window_secs, + ) + if is_first: + await self._send_failure_alert( + count=count, + error_msg=error_msg[:500] if error_msg else "", + ) + + if count >= self.config.failure_threshold: + await self._activate(redis) + return True + return False + except Exception as e: + logger.error( + f"Service fallback ({self.config.service_name}) " + f"record_failure failed: {e}" + ) + return False + + async def _activate(self, redis) -> None: + """Transition to FALLBACK_ACTIVE: set Redis flag. + + TTL = fallback_duration_secs ensures the key self-expires after exactly + the configured cooldown. NX ensures only one pod activates. + """ + newly_set = await redis.set( + self._key_active, "1", nx=True, ex=self.config.fallback_duration_secs + ) + if not newly_set: + return + + # Clear failure counter + await redis.delete(self._key_failure_count) + + # Mark primary provider unhealthy for monitoring + await self._mark_provider_unhealthy( + redis, self.config.primary_provider_name.lower() + ) + + cooldown_min = self.config.fallback_duration_secs // 60 + logger.warning( + f"Service fallback ({self.config.service_name}) ACTIVATED " + f"(duration={self.config.fallback_duration_secs}s, " + f"fallback={self.config.fallback_provider_name})" + ) + + # Activation alert — deduplicated with NX so only one pod fires. + notified = await redis.set( + self._key_notified, + "1", + nx=True, + ex=self.config.fallback_duration_secs, + ) + if notified: + await self._send_activation_alert(cooldown_min=cooldown_min) + + async def is_active(self) -> bool: + """Check if fallback is currently active.""" + try: + redis = await get_redis_service() + return bool(await redis.exists(self._key_active)) + except Exception as e: + logger.error( + f"Service fallback ({self.config.service_name}) " + f"is_active check failed: {e}" + ) + return False + + async def reset_to_primary(self) -> None: + """Reset to primary: clear fallback flag and send reset alert.""" + try: + redis = await get_redis_service() + + await redis.delete(self._key_active) + await redis.delete(self._key_failure_count) + await redis.delete(self._key_notified) + + # Clear health mark for primary provider + await self._clear_provider_health( + redis, self.config.primary_provider_name.lower() + ) + + logger.info( + f"Service fallback ({self.config.service_name}) reset to primary" + ) + + await self._send_reset_alert() + except Exception as e: + logger.error( + f"Service fallback ({self.config.service_name}) " + f"reset_to_primary failed: {e}" + ) + + async def notify_on_expiry(self) -> None: + """Poll-friendly check: send reset alert exactly when TTL expires. + + Called by the background task on a short interval (e.g. 60 s). + + - If fallback is still active: mark sentinel and return. + - If fallback just expired (sentinel set, key gone): send reset alert + and clear sentinel so the alert fires only once. + - If neither: nothing to do (sentinel not set means we never saw it + active in this server lifetime). + """ + try: + redis = await get_redis_service() + active = bool(await redis.exists(self._key_active)) + + if active: + # Still within the fallback window — record that we've seen it. + await redis.set(self._key_seen_active, "1") + return + + # Not active — check if it *was* active (sentinel present). + seen = bool(await redis.exists(self._key_seen_active)) + if not seen: + return # Never activated during this server lifetime. + + # TTL just expired — clear sentinel and fire the reset alert. + await redis.delete(self._key_seen_active) + await self._clear_provider_health( + redis, self.config.primary_provider_name.lower() + ) + logger.info( + f"Service fallback ({self.config.service_name}) TTL expired — " + "sending reset alert" + ) + await self._send_reset_alert() + except Exception as e: + logger.error( + f"Service fallback ({self.config.service_name}) " + f"notify_on_expiry failed: {e}" + ) + + +# --------------------------------------------------------------------------- +# STT Fallback Background Task +# --------------------------------------------------------------------------- + + +async def check_and_reset_stt_fallback() -> None: + """Poll STT fallback state and fire the reset alert when TTL expires. + + Runs every 60 s. Delegates to ``ServiceFallback.notify_on_expiry()`` + which sets a sentinel while the fallback is active and fires the ✅ reset + alert the first time it observes the active key has expired. The Redis + TTL on the active key remains the sole source of truth for when routing + reverts to primary — this task only handles the Slack notification. + """ + try: + cfg = await BB_FALLBACK_CONFIG("stt") + primary_provider = await BB_STT_SERVICE() + fallback = ServiceFallback( + ServiceFallbackConfig( + service_name="stt", + failure_threshold=cfg.threshold, + failure_window_secs=cfg.window_secs, + fallback_duration_secs=cfg.duration_secs, + primary_provider_name=primary_provider, + fallback_provider_name=cfg.fallback_provider, + ) + ) + await fallback.notify_on_expiry() + except Exception as e: + logger.error(f"STT fallback reset task failed: {e}") + + +async def initialize_fallback_tasks(scheduler: BackgroundTaskScheduler) -> None: + """Register STT fallback reset task if fallback is enabled.""" + cfg = await BB_FALLBACK_CONFIG("stt") + if not cfg.enabled: + logger.info("STT fallback disabled — skipping fallback task registration") + return + + scheduler.register_task( + name="stt_fallback_reset", + func=check_and_reset_stt_fallback, + interval_seconds=60, + ) + logger.info("Registered STT fallback reset task (interval=60s)") diff --git a/app/services/live_config/utils.py b/app/services/live_config/utils.py index ac3eb218b..706d4abbd 100644 --- a/app/services/live_config/utils.py +++ b/app/services/live_config/utils.py @@ -35,6 +35,15 @@ def convert_type(value: Any, target_type: type) -> Any: return value str_value = str(value) if value else "" return [item.strip() for item in str_value.split(",") if item.strip()] + elif target_type == dict: + if isinstance(value, dict): + return value + try: + import json + + return json.loads(value) + except (ValueError, TypeError): + return None else: # str return str(value) diff --git a/app/services/slack/alert.py b/app/services/slack/alert.py index 40b65903d..424852382 100644 --- a/app/services/slack/alert.py +++ b/app/services/slack/alert.py @@ -29,6 +29,7 @@ async def send( links: Optional[List[Dict[str, str]]] = None, fallback_text: Optional[str] = None, include_tags: bool = True, + tag_users: Optional[str] = None, ) -> bool: """ Generic function to send Slack alerts with customizable content. @@ -41,6 +42,8 @@ async def send( fallback_text: Optional fallback text for notifications (defaults to title) include_tags: Whether to include @mention tags (default True). Set to False to suppress tagging and reduce Slack notification noise. + tag_users: Optional comma-separated users to tag. Overrides SLACK_TAG_USERS + when provided. Only used when include_tags is True. Returns: True if sent successfully, False otherwise @@ -102,10 +105,15 @@ async def send( ) # Add notifications section if users are configured for tagging - if include_tags and SLACK_TAG_USERS: + effective_tag_users = ( + tag_users if tag_users is not None else SLACK_TAG_USERS + ) + if include_tags and effective_tag_users: # Parse comma-separated usernames and filter out empty ones users = [ - user.strip() for user in SLACK_TAG_USERS.split(",") if user.strip() + user.strip() + for user in effective_tag_users.split(",") + if user.strip() ] if users: # Format users as proper Slack mentions