From 122e9093abe193b137b47a7af6030cf1d3db207a Mon Sep 17 00:00:00 2001
From: Swetha S
Date: Thu, 30 Apr 2026 13:56:33 +0530
Subject: [PATCH] feat: auto pause and system check

---
 .../agents/breeze_buddy/agent/__init__.py |  43 +++
 .../agents/breeze_buddy/managers/calls.py |   9 +
 app/core/config/dynamic.py                |  19 ++
 app/main.py                               |   6 +
 app/services/fallback/__init__.py         | 289 ++++++++++++++++++
 app/services/service_health/__init__.py   |  16 +
 app/services/service_health/monitor.py    | 220 +++++++++++++
 app/services/service_health/rules.json    |  34 +++
 8 files changed, 636 insertions(+)
 create mode 100644 app/services/fallback/__init__.py
 create mode 100644 app/services/service_health/__init__.py
 create mode 100644 app/services/service_health/monitor.py
 create mode 100644 app/services/service_health/rules.json

diff --git a/app/ai/voice/agents/breeze_buddy/agent/__init__.py b/app/ai/voice/agents/breeze_buddy/agent/__init__.py
index 887f39b19..7543cb080 100644
--- a/app/ai/voice/agents/breeze_buddy/agent/__init__.py
+++ b/app/ai/voice/agents/breeze_buddy/agent/__init__.py
@@ -65,6 +65,13 @@
 from app.ai.voice.agents.breeze_buddy.services.telephony.base_provider import (
     VoiceCallProvider,
 )
+from app.ai.voice.agents.breeze_buddy.stt.fallback import (
+    ALERT_STT_TERMINAL_FAILURE,
+    STT_FALLBACK_SLACK_TAG,
+    record_stt_failure,
+    send_templated_alert,
+)
+from app.services.service_health import service_health_monitor
 from app.ai.voice.agents.breeze_buddy.template import TemplateContext
 from app.ai.voice.agents.breeze_buddy.template.builder import FlowConfigBuilder
 from app.ai.voice.agents.breeze_buddy.template.context import with_context
@@ -603,6 +610,42 @@ async def on_pipeline_error(task, error):
                 {"processor": str(processor), "error": error_msg},
             )
 
+            # Detect STT errors by processor name keywords
+            processor_str = str(processor).lower()
+            stt_keywords = (
+                "stt",
+                "soniox",
+                "deepgram",
+                "transcri",
+                "google",
+                "sarvam",
+            )
+            is_stt_error = any(kw in processor_str for kw in stt_keywords)
+
+            if not is_stt_error:
+                return
+
+            logger.warning(f"STT error detected from processor: {processor}")
+
+            # Record failure in the fallback system (once per call, Soniox only)
+            if self.stt_provider == "soniox" and not self._stt_failure_recorded:
+                self._stt_failure_recorded = True
+                try:
+                    await record_stt_failure(
+                        error_msg=str(error_msg)[:200],
+                        call_sid=self.call_sid or "",
+                        context="mid-call",
+                    )
+                except Exception as fb_err:
+                    logger.warning(f"STT fallback record_failure failed: {fb_err}")
+
+            # Alert and end the call; there is no mid-call provider swap in Phase 1
+            fire_and_forget(self._send_mid_call_stt_alert())
+            try:
+                await task.queue_frames([EndFrame()])
+            except Exception:
+                pass
+
         @self.transport.event_handler("on_client_connected")
         async def on_client_connected(transport, client):
             logger.info(f"Client connected: {client}")
diff --git a/app/ai/voice/agents/breeze_buddy/managers/calls.py b/app/ai/voice/agents/breeze_buddy/managers/calls.py
index 9d2eb5b64..e7fe098bc 100644
--- a/app/ai/voice/agents/breeze_buddy/managers/calls.py
+++ b/app/ai/voice/agents/breeze_buddy/managers/calls.py
@@ -69,6 +69,7 @@
 )
 from app.services.gcp.storage.storage import upload_file_to_gcs
 from app.services.redis.client import get_redis_service
+from app.services.service_health import service_health_monitor
 
 
 async def _get_lead_config(lead: LeadCallTracker) -> Optional[CallExecutionConfig]:
@@ -456,6 +457,14 @@ async def process_backlog_leads():
                 await release_lock_on_lead_by_id(locked_lead.id)
                 continue
 
+            # Check the global service health pause (circuit breaker pattern)
+            if await service_health_monitor.is_globally_paused():
+                logger.info(
+                    f"Skipping lead {locked_lead.id} - calls are globally paused due to service health"
+                )
+                await release_lock_on_lead_by_id(locked_lead.id)
+                continue
+
             customer_phone = (locked_lead.payload or {}).get(
                 "customer_mobile_number"
             )
diff --git a/app/core/config/dynamic.py b/app/core/config/dynamic.py
index 68ecc15aa..884fe122a 100644
--- a/app/core/config/dynamic.py
+++ b/app/core/config/dynamic.py
@@ -341,3 +341,22 @@ async def OUTBOUND_RATE_LIMIT_WINDOW_SECONDS() -> int:
 async def OUTBOUND_RATE_LIMIT_BLOCK_ENABLED() -> bool:
     """Returns OUTBOUND_RATE_LIMIT_BLOCK_ENABLED from Redis"""
     return await get_config("OUTBOUND_RATE_LIMIT_BLOCK_ENABLED", False, bool)
+
+
+# --- Service Health Monitoring Configuration ---
+async def ENABLE_SERVICE_HEALTH_MONITORING() -> bool:
+    """Returns ENABLE_SERVICE_HEALTH_MONITORING from Redis.
+
+    When True, service health monitoring is active and will auto-pause
+    calls when upstream service failures exceed thresholds.
+    """
+    return await get_config("ENABLE_SERVICE_HEALTH_MONITORING", True, bool)
+
+
+async def SERVICE_HEALTH_AUTO_RESUME_MINUTES() -> int:
+    """Returns SERVICE_HEALTH_AUTO_RESUME_MINUTES from Redis.
+
+    Number of minutes a circuit stays open (calls paused) before
+    auto-resuming; the health check may resume earlier once the
+    failure window is clean.
+    """
+    return await get_config("SERVICE_HEALTH_AUTO_RESUME_MINUTES", 15, int)
diff --git a/app/main.py b/app/main.py
index 8d8e020c3..50da784f5 100644
--- a/app/main.py
+++ b/app/main.py
@@ -65,6 +65,8 @@
 from app.schemas import (
     AutomaticVoiceUserConnectRequest,
 )
+from app.services.fallback import initialize_fallback_tasks
+from app.services.service_health import initialize_service_health_tasks
 from app.services.langfuse.tasks.task import initialize_langfuse_tasks
 from app.services.redis import (
     close_redis_connections,
@@ -167,6 +169,10 @@ async def lifespan(_app: FastAPI):
     # Initialize Langfuse tasks (if configured)
     await initialize_langfuse_tasks(_background_scheduler)
 
+    # Initialize STT fallback reset and service health tasks
+    await initialize_fallback_tasks(_background_scheduler)
+    await initialize_service_health_tasks(_background_scheduler)
+
     ### Register new tasks here
 
     # Start the scheduler only if tasks are registered
diff --git a/app/services/fallback/__init__.py b/app/services/fallback/__init__.py
new file mode 100644
index 000000000..63de88a59
--- /dev/null
+++ b/app/services/fallback/__init__.py
@@ -0,0 +1,289 @@
+"""Generic Redis-backed service fallback.
+
+State machine:
+    NORMAL -- failure_count >= threshold --> FALLBACK_ACTIVE
+    FALLBACK_ACTIVE -- TTL expires after fallback_duration_secs --> NORMAL
+    NORMAL -- failures again --> FALLBACK_ACTIVE (repeat cycle)
+
+Redis keys:
+    fallback:{service_name}:failure_count   - rolling failure counter with TTL window
+    fallback:{service_name}:active          - flag with TTL=fallback_duration_secs (self-expires)
+    fallback:{service_name}:notified        - NX dedup for the one-time activation alert
+    fallback:{service_name}:alerted:{count} - NX dedup so only one pod fires each per-failure alert
+
+Consumers check ``is_active()`` to decide whether to route to the fallback provider.
+The active key self-expires exactly fallback_duration_secs after activation.
+The background reset task is a secondary mechanism that fires the on_reset_alert callback.
+"""
+""" + +from dataclasses import dataclass +from typing import Awaitable, Callable, Optional + +from app.core.background_tasks import BackgroundTaskScheduler +from app.core.config.dynamic import ( + BB_STT_FALLBACK_DURATION_SECS, + BB_STT_FALLBACK_PROVIDER, + BB_STT_FALLBACK_THRESHOLD, + BB_STT_FALLBACK_WINDOW_SECS, + ENABLE_BB_STT_FALLBACK, +) +from app.core.logger import logger +from app.services.redis.client import get_redis_service + +# Type alias for alert callback. +AlertCallback = Callable[..., Awaitable[None]] + +# Lua script: atomically INCR the failure counter and set TTL on first write. +# Returns the new count. Using Lua ensures INCR + EXPIRE are one indivisible +# operation — eliminates the race where another pod deletes the key between +# the two calls and the EXPIRE lands on a brand-new counter with no TTL. +_LUA_INCR_WITH_EXPIRE = """ +local count = redis.call('INCR', KEYS[1]) +if count == 1 then + redis.call('EXPIRE', KEYS[1], ARGV[1]) +end +return count +""" + + +@dataclass +class ServiceFallbackConfig: + """Configuration for a Redis-backed service fallback. + + Attributes: + service_name: Identifier used in Redis keys and logs (e.g., "stt") + failure_threshold: Number of failures within window to activate fallback + failure_window_secs: Sliding window for failure counter TTL + fallback_duration_secs: How long fallback stays active (background task interval) + fallback_provider_name: Human-readable name for the fallback provider (for alerts/logs) + key_prefix: Prefix for Redis keys (default: "fallback"). Use "circuit" or "health" for non-fallback use cases. + on_failure_alert: Callback fired on each failure + on_trip_alert: Callback fired when fallback activates + on_reset_alert: Callback fired when background task resets to primary + """ + + service_name: str + failure_threshold: int = 2 + failure_window_secs: int = 240 + fallback_duration_secs: int = 1800 + fallback_provider_name: str = "fallback" + key_prefix: str = "fallback" + on_failure_alert: Optional[AlertCallback] = None + on_trip_alert: Optional[AlertCallback] = None + on_reset_alert: Optional[AlertCallback] = None + + +class ServiceFallback: + """Generic Redis-backed service fallback. + + On activation, sets a Redis flag. Consumers check ``is_active()`` to route + to the fallback provider. A background task clears the flag on schedule. + """ + + def __init__(self, config: ServiceFallbackConfig): + self.config = config + prefix = config.key_prefix + self._key_failure_count = f"{prefix}:{config.service_name}:failure_count" + self._key_active = f"{prefix}:{config.service_name}:active" + self._key_notified = f"{prefix}:{config.service_name}:notified" + self._key_alerted_prefix = f"{prefix}:{config.service_name}:alerted" + + async def record_failure( + self, + error_msg: str = "", + call_sid: str = "", + context: str = "unknown", + ) -> bool: + """Increment failure count. Activate fallback if threshold reached. + + Returns: + True if this failure caused fallback activation, False otherwise. + """ + try: + redis = await get_redis_service() + + # Atomically increment and set TTL on first write via Lua script. + # Eliminates the INCR+EXPIRE race where the key could be deleted + # between the two calls, leaving the new counter with no TTL. + count = await redis.run_script( + _LUA_INCR_WITH_EXPIRE, + keys=[self._key_failure_count], + args=[self.config.failure_window_secs], + ) + if count is None: + # Lua script failed (logged inside run_script); fall back to + # non-atomic path so failures are never silently swallowed. 
+                count = await redis.incr(self._key_failure_count)
+
+            logger.info(
+                f"Service fallback ({self.config.service_name}): "
+                f"failure {count}/{self.config.failure_threshold}"
+            )
+
+            # Per-failure alert, deduplicated with NX so only the first pod to
+            # reach count=N fires the Slack alert. Without this, every pod that
+            # records the same failure would send its own alert.
+            if self.config.on_failure_alert:
+                alert_dedup_key = f"{self._key_alerted_prefix}:{count}"
+                is_first = await redis.set(
+                    alert_dedup_key,
+                    "1",
+                    nx=True,
+                    ex=self.config.failure_window_secs,
+                )
+                if is_first:
+                    try:
+                        await self.config.on_failure_alert(
+                            count=count,
+                            threshold=self.config.failure_threshold,
+                            error_msg=error_msg[:500] if error_msg else "",
+                            call_sid=call_sid,
+                            context=context,
+                            service_name=self.config.service_name,
+                        )
+                    except Exception as alert_err:
+                        logger.warning(
+                            f"Service fallback ({self.config.service_name}) "
+                            f"failure alert failed: {alert_err}"
+                        )
+
+            if count >= self.config.failure_threshold:
+                await self._activate(redis)
+                return True
+            return False
+        except Exception as e:
+            logger.error(
+                f"Service fallback ({self.config.service_name}) "
+                f"record_failure failed: {e}"
+            )
+            return False
+
+    async def _activate(self, redis) -> None:
+        """Transition to FALLBACK_ACTIVE: set the Redis flag.
+
+        TTL = fallback_duration_secs ensures the key self-expires exactly the
+        configured cooldown after activation time (not server start). This also
+        prevents a permanently stuck fallback if the reset task is never
+        registered (e.g. the flag was off at startup and flipped later).
+        """
+        # NX ensures only one pod activates; the others skip.
+        # EX ensures the key self-expires after exactly fallback_duration_secs,
+        # regardless of whether the background reset task runs.
+        newly_set = await redis.set(
+            self._key_active, "1", nx=True, ex=self.config.fallback_duration_secs
+        )
+        if not newly_set:
+            return
+
+        # Clear the failure counter
+        await redis.delete(self._key_failure_count)
+
+        cooldown_min = self.config.fallback_duration_secs // 60
+        logger.warning(
+            f"Service fallback ({self.config.service_name}) ACTIVATED "
+            f"(duration={self.config.fallback_duration_secs}s, "
+            f"fallback={self.config.fallback_provider_name})"
+        )
+
+        # Trip alert
+        if self.config.on_trip_alert:
+            try:
+                await self.config.on_trip_alert(
+                    service_name=self.config.service_name,
+                    fallback_name=self.config.fallback_provider_name,
+                    threshold=self.config.failure_threshold,
+                    cooldown_min=cooldown_min,
+                )
+            except Exception as alert_err:
+                logger.warning(
+                    f"Service fallback ({self.config.service_name}) "
+                    f"trip alert failed: {alert_err}"
+                )
+
+    async def is_active(self) -> bool:
+        """Check whether the fallback is currently active."""
+        try:
+            redis = await get_redis_service()
+            return bool(await redis.exists(self._key_active))
+        except Exception as e:
+            logger.error(
+                f"Service fallback ({self.config.service_name}) "
+                f"is_active check failed: {e}"
+            )
+            return False
+
+    async def reset_to_primary(self) -> None:
+        """Reset to primary: clear the fallback flag and counters."""
+        try:
+            redis = await get_redis_service()
+
+            # Clear the fallback flag
+            await redis.delete(self._key_active)
+            # Clear the failure counter
+            await redis.delete(self._key_failure_count)
+            # Clear the notification dedup key
+            await redis.delete(self._key_notified)
+
+            logger.info(
+                f"Service fallback ({self.config.service_name}) reset to primary"
+            )
+
+            # Reset alert
+            if self.config.on_reset_alert:
+                try:
+                    await self.config.on_reset_alert(
+                        service_name=self.config.service_name,
+                    )
+                except Exception as alert_err:
+                    logger.warning(
+                        f"Service fallback ({self.config.service_name}) "
+                        f"reset alert failed: {alert_err}"
+                    )
+        except Exception as e:
+            logger.error(
+                f"Service fallback ({self.config.service_name}) "
+                f"reset_to_primary failed: {e}"
+            )
+
+
+# ---------------------------------------------------------------------------
+# STT Fallback Background Task
+# ---------------------------------------------------------------------------
+
+
+async def check_and_reset_stt_fallback() -> None:
+    """Check whether the STT fallback is active and reset to primary if so."""
+    try:
+        fallback_provider = await BB_STT_FALLBACK_PROVIDER()
+        fallback = ServiceFallback(
+            ServiceFallbackConfig(
+                service_name="stt",
+                failure_threshold=await BB_STT_FALLBACK_THRESHOLD(),
+                failure_window_secs=await BB_STT_FALLBACK_WINDOW_SECS(),
+                fallback_duration_secs=await BB_STT_FALLBACK_DURATION_SECS(),
+                fallback_provider_name=fallback_provider,
+            )
+        )
+        if not await fallback.is_active():
+            return
+
+        logger.info("STT fallback active - resetting to primary provider")
+        await fallback.reset_to_primary()
+    except Exception as e:
+        logger.error(f"STT fallback reset task failed: {e}")
+
+
+async def initialize_fallback_tasks(scheduler: BackgroundTaskScheduler) -> None:
+    """Register the STT fallback reset task if fallback is enabled."""
+    fallback_enabled = await ENABLE_BB_STT_FALLBACK()
+    if not fallback_enabled:
+        logger.info("STT fallback disabled - skipping fallback task registration")
+        return
+
+    duration_secs = await BB_STT_FALLBACK_DURATION_SECS()
+    scheduler.register_task(
+        name="stt_fallback_reset",
+        func=check_and_reset_stt_fallback,
+        interval_seconds=duration_secs,
+    )
+    logger.info(f"Registered STT fallback reset task (interval={duration_secs}s)")
diff --git a/app/services/service_health/__init__.py b/app/services/service_health/__init__.py
new file mode 100644
index 000000000..5a9585018
--- /dev/null
+++ b/app/services/service_health/__init__.py
@@ -0,0 +1,16 @@
+"""Service Health Monitoring package.
+
+Reuses ServiceFallback from app.services.fallback.
+"""
+
+from app.services.service_health.monitor import (
+    ServiceHealthMonitor,
+    initialize_service_health_tasks,
+    service_health_monitor,
+)
+
+__all__ = [
+    "ServiceHealthMonitor",
+    "initialize_service_health_tasks",
+    "service_health_monitor",
+]
diff --git a/app/services/service_health/monitor.py b/app/services/service_health/monitor.py
new file mode 100644
index 000000000..6f1e0b1df
--- /dev/null
+++ b/app/services/service_health/monitor.py
@@ -0,0 +1,220 @@
+"""Service Health Monitor built on ServiceFallback (circuit breaker).
+
+Reuses ServiceFallback from app.services.fallback.
+Error capture point: on_pipeline_error (same as the STT fallback).
+"""
+
+import json
+from pathlib import Path
+from typing import Optional
+
+from app.core.background_tasks import BackgroundTaskScheduler
+from app.core.config.dynamic import (
+    ENABLE_SERVICE_HEALTH_MONITORING,
+    SERVICE_HEALTH_AUTO_RESUME_MINUTES,
+)
+from app.core.logger import logger
+from app.services.fallback import ServiceFallback, ServiceFallbackConfig
+from app.services.redis.client import get_redis_service
+from app.services.slack.alert import Alert
+
+_slack_alert = Alert()
+
+# Processor name -> rule name mapping.
+# NOTE: STT errors (soniox, deepgram, etc.) are handled by the STT fallback.
+# This map covers TTS, LLM, and telephony transports.
+PROCESSOR_RULE_MAP = {
+    # TTS providers
+    "elevenlabs": "elevenlabs",
+    "elevenlabsttsservice": "elevenlabs",
+    "cartesia": "cartesia",
+    "cartesiattsservice": "cartesia",
+    # LLM providers
+    "azure": "llm",
+    "openaillm": "llm",
+    "googlellm": "llm",
+    # Telephony transports (Twilio, Exotel, Plivo)
+    "twilio": "twilio",
+    "twiliotransport": "twilio",
+    "twilioinputtransport": "twilio",
+    "twiliooutputtransport": "twilio",
+    "exotel": "exotel",
+    "exoteltransport": "exotel",
+    "exotelinputtransport": "exotel",
+    "exoteloutputtransport": "exotel",
+    "plivo": "plivo",
+    "plivotransport": "plivo",
+    "plivoinputtransport": "plivo",
+    "plivooutputtransport": "plivo",
+}
+
+
+def _load_rules() -> dict:
+    """Load rule definitions from rules.json."""
+    rules_path = Path(__file__).parent / "rules.json"
+    try:
+        with rules_path.open() as f:
+            return json.load(f)
+    except Exception as exc:
+        logger.warning(f"[ServiceHealth] Could not load rules.json: {exc}")
+        return {}
+
+
+_RULES: dict = _load_rules()
+_CIRCUITS: dict[str, ServiceFallback] = {}
+
+
+def _get_rule_for_processor(processor_name: str) -> Optional[str]:
+    """Map a processor name to a rule name."""
+    return PROCESSOR_RULE_MAP.get(processor_name.lower())
+
+
+async def _get_or_create_circuit(
+    rule: str, config: Optional[dict] = None
+) -> ServiceFallback:
+    """Get an existing ServiceFallback circuit or create a new one."""
+    if rule not in _CIRCUITS:
+        rule_config = config if config is not None else _RULES.get(rule, {})
+        _CIRCUITS[rule] = ServiceFallback(
+            ServiceFallbackConfig(
+                service_name=rule,
+                key_prefix="health",  # "health:" keys instead of "fallback:"
+                failure_threshold=rule_config.get("threshold_count", 10),
+                failure_window_secs=rule_config.get("window_minutes", 5) * 60,
+                fallback_duration_secs=await SERVICE_HEALTH_AUTO_RESUME_MINUTES() * 60,
+                fallback_provider_name="paused",
+                on_failure_alert=None,
+                on_trip_alert=_on_trip_alert,
+                on_reset_alert=_on_reset_alert,
+            )
+        )
+    return _CIRCUITS[rule]
+
+
+# Alert callbacks
+async def _on_trip_alert(**kwargs) -> None:
+    """Alert when a circuit opens (calls paused)."""
+    try:
+        await _slack_alert.send(
+            title="🚨 Service Health: Calls Auto-Paused",
+            fields=[
+                {"name": "Rule", "value": kwargs.get("service_name", "unknown")},
+                {"name": "Status", "value": "Outbound calls are now paused"},
+            ],
+        )
+    except Exception as e:
+        logger.warning(f"[ServiceHealth] Trip alert failed: {e}")
+
+
+async def _on_reset_alert(**kwargs) -> None:
+    """Alert when a circuit closes (calls resumed)."""
+    try:
+        await _slack_alert.send(
+            title="✅ Service Health: Calls Resumed",
+            fields=[
+                {"name": "Rule", "value": kwargs.get("service_name", "unknown")},
+                {"name": "Status", "value": "Outbound calls have resumed"},
+            ],
+        )
+    except Exception as e:
+        logger.warning(f"[ServiceHealth] Reset alert failed: {e}")
+
+
+class ServiceHealthMonitor:
+    """Service health monitor built on ServiceFallback (reused from the STT fallback)."""
+
+    async def record_pipeline_error(
+        self,
+        processor: str,
+        error: str,
+        call_sid: str = "",
+        context: str = "mid-call",
+    ) -> bool:
+        """Record a pipeline error. Called from the on_pipeline_error handler."""
+        if not await ENABLE_SERVICE_HEALTH_MONITORING():
+            return False
+
+        rule = _get_rule_for_processor(processor)
+        if not rule:
+            return False
+
+        circuit = await _get_or_create_circuit(rule)
+        return await circuit.record_failure(
+            error_msg=error,
+            call_sid=call_sid,
+            context=context,
+        )
+
+    async def is_globally_paused(self) -> bool:
+        """Check whether any circuit is open (global pause active)."""
+        for circuit in _CIRCUITS.values():
+            if await circuit.is_active():
+                return True
+        return False
+
+    async def pause_calls(
+        self, reason: str, paused_by: str = "auto", source_rule: Optional[str] = None
+    ) -> None:
+        """Manually open a circuit (dashboard/API)."""
+        rule = source_rule or "manual"
+        # Use the existing rule config if available; otherwise use a default
+        # config for a manual pause.
+        config = (
+            _RULES.get(rule)
+            if rule in _RULES
+            else {"threshold_count": 1, "window_minutes": 1}
+        )
+        circuit = await _get_or_create_circuit(rule, config)
+        await circuit._activate(await get_redis_service())
+
+    async def resume_calls(self, resumed_by: str = "auto") -> None:
+        """Manually close all circuits (dashboard/API)."""
+        for circuit in _CIRCUITS.values():
+            await circuit.reset_to_primary()
+
+    async def get_status(self) -> dict:
+        """Get the current status of all circuits."""
+        open_circuits = []
+        for rule, circuit in _CIRCUITS.items():
+            if await circuit.is_active():
+                open_circuits.append(rule)
+        return {
+            "is_paused": len(open_circuits) > 0,
+            "open_circuits": open_circuits,
+        }
+
+    async def run_auto_health_check(self) -> None:
+        """Evaluate open circuits and reset those whose failure window is clean."""
+        if not await ENABLE_SERVICE_HEALTH_MONITORING():
+            return
+
+        for rule, circuit in _CIRCUITS.items():
+            if not await circuit.is_active():
+                continue
+            # Resume only when no recent failures remain in the window; the
+            # active key's TTL stays the backstop if errors keep arriving.
+            redis = await get_redis_service()
+            if await redis.exists(circuit._key_failure_count):
+                continue
+            await circuit.reset_to_primary()
+
+
+# Background task registration
+async def check_and_reset_circuits() -> None:
+    """Background task to check and reset circuits."""
+    monitor = ServiceHealthMonitor()
+    await monitor.run_auto_health_check()
+
+
+async def initialize_service_health_tasks(scheduler: BackgroundTaskScheduler) -> None:
+    """Register the service health check task if monitoring is enabled."""
+    if not await ENABLE_SERVICE_HEALTH_MONITORING():
+        logger.info("[ServiceHealth] Monitoring disabled")
+        return
+
+    scheduler.register_task(
+        name="service_health_check",
+        func=check_and_reset_circuits,
+        interval_seconds=60,
+    )
+    logger.info("[ServiceHealth] Registered health check task")
+
+
+# Singleton instance
+service_health_monitor = ServiceHealthMonitor()
diff --git a/app/services/service_health/rules.json b/app/services/service_health/rules.json
new file mode 100644
index 000000000..548a08659
--- /dev/null
+++ b/app/services/service_health/rules.json
@@ -0,0 +1,34 @@
+{
+  "elevenlabs": {
+    "threshold_count": 10,
+    "window_minutes": 5
+  },
+  "cartesia": {
+    "threshold_count": 10,
+    "window_minutes": 5
+  },
+  "llm": {
+    "threshold_count": 10,
+    "window_minutes": 5
+  },
+  "twilio": {
+    "threshold_count": 10,
+    "window_minutes": 5
+  },
+  "exotel": {
+    "threshold_count": 10,
+    "window_minutes": 5
+  },
+  "plivo": {
+    "threshold_count": 10,
+    "window_minutes": 5
+  },
+  "call_initiation": {
+    "threshold_count": 20,
+    "window_minutes": 5
+  },
+  "provider_fallback": {
+    "threshold_count": 5,
+    "window_minutes": 5
+  }
+}
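
The fallback module's docstring says consumers check is_active() to route to the
fallback provider, but no consumer-side routing is included in this patch. A
minimal sketch of what that call-setup routing could look like, assuming the
primary STT provider is Soniox and reusing BB_STT_FALLBACK_PROVIDER from
app/core/config/dynamic.py; the helper name select_stt_provider is hypothetical
and not part of this patch:

    from app.core.config.dynamic import BB_STT_FALLBACK_PROVIDER
    from app.services.fallback import ServiceFallback, ServiceFallbackConfig


    async def select_stt_provider(primary: str = "soniox") -> str:
        # Hypothetical helper, not defined in this patch. Only the key names
        # matter for the is_active() check, so the config defaults suffice.
        fallback = ServiceFallback(ServiceFallbackConfig(service_name="stt"))
        if await fallback.is_active():
            # While fallback:stt:active exists in Redis, route new calls to
            # the configured fallback provider instead of the primary.
            return await BB_STT_FALLBACK_PROVIDER()
        return primary

Because routing keys off the self-expiring active flag, new calls return to the
primary provider automatically once the fallback_duration_secs TTL lapses.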
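
Relatedly, monitor.py names on_pipeline_error as its error capture point, and
agent/__init__.py imports service_health_monitor, yet the handler added here
returns early for non-STT errors without calling record_pipeline_error. A sketch
of the wiring that docstring implies, placed before the STT keyword check in
on_pipeline_error; this is an assumption about intent, not code from the patch:

    # Inside on_pipeline_error, before the STT-specific handling: feed every
    # pipeline error to the service health circuit breaker.
    fire_and_forget(
        service_health_monitor.record_pipeline_error(
            processor=str(processor),
            error=str(error_msg)[:200],
            call_sid=self.call_sid or "",
            context="mid-call",
        )
    )

With that in place, TTS, LLM, and transport errors would feed the circuits that
process_backlog_leads consults via is_globally_paused().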