-
Notifications
You must be signed in to change notification settings - Fork 57
feat: auto pause and system check #731
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -65,6 +65,13 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.ai.voice.agents.breeze_buddy.services.telephony.base_provider import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| VoiceCallProvider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.ai.voice.agents.breeze_buddy.stt.fallback import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ALERT_STT_TERMINAL_FAILURE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| STT_FALLBACK_SLACK_TAG, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| record_stt_failure, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| send_templated_alert, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.services.service_health import service_health_monitor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.ai.voice.agents.breeze_buddy.template import TemplateContext | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.ai.voice.agents.breeze_buddy.template.builder import FlowConfigBuilder | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from app.ai.voice.agents.breeze_buddy.template.context import with_context | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -603,6 +610,42 @@ async def on_pipeline_error(task, error): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| {"processor": str(processor), "error": error_msg}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Detect STT errors by processor name keywords | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| processor_str = str(processor).lower() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stt_keywords = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "stt", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "soniox", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "deepgram", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "transcri", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "google", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "sarvam", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| is_stt_error = any(kw in processor_str for kw in stt_keywords) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+613
to
+623
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Detect STT errors by processor name keywords | |
| processor_str = str(processor).lower() | |
| stt_keywords = ( | |
| "stt", | |
| "soniox", | |
| "deepgram", | |
| "transcri", | |
| "google", | |
| "sarvam", | |
| ) | |
| is_stt_error = any(kw in processor_str for kw in stt_keywords) | |
| # Detect STT errors using STT-specific processor identifiers only. | |
| # Avoid broad provider substrings like "google", which can also match | |
| # non-STT processors such as Google-backed LLM components. | |
| processor_str = str(processor).lower() | |
| processor_type_str = processor.__class__.__name__.lower() | |
| stt_keywords = ( | |
| "stt", | |
| "soniox", | |
| "deepgram", | |
| "transcri", | |
| "sarvam", | |
| "speech", | |
| ) | |
| is_stt_error = any( | |
| kw in processor_str or kw in processor_type_str | |
| for kw in stt_keywords | |
| ) |
Copilot
AI
Apr 30, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
service_health_monitor is imported but never used in on_pipeline_error. As a result, non‑STT pipeline errors (TTS/LLM/telephony) aren’t recorded into the service health circuits, so auto‑pause can’t trigger from those failures. Consider calling service_health_monitor.record_pipeline_error(...) for non‑STT errors before returning.
| if not is_stt_error: | |
| if not is_stt_error: | |
| try: | |
| service_health_monitor.record_pipeline_error( | |
| processor=str(processor), | |
| error_message=str(error_msg), | |
| ) | |
| except Exception as health_err: | |
| logger.warning( | |
| f"Failed to record non-STT pipeline error in service health monitor: {health_err}" | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if fire_and_forget is defined or imported anywhere in the codebase
echo "=== Searching for fire_and_forget definition/import ==="
rg -n "fire_and_forget" --type=py
echo ""
echo "=== Checking EndFrame import in agent/__init__.py ==="
rg -n "from pipecat.frames.frames import" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Checking stt_provider attribute initialization ==="
rg -n "self\.stt_provider" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Checking _stt_failure_recorded attribute initialization ==="
rg -n "self\._stt_failure_recorded" app/ai/voice/agents/breeze_buddy/agent/__init__.pyRepository: juspay/clairvoyance
Length of output: 2978
🏁 Script executed:
#!/bin/bash
# Get first 100 lines to see all imports
echo "=== First 100 lines (imports section) ==="
head -100 app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Check if fire_and_forget is defined in agent/__init__.py ==="
rg -n "def fire_and_forget" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Search for __init__ method and surrounding context ==="
rg -n "def __init__" app/ai/voice/agents/breeze_buddy/agent/__init__.pyRepository: juspay/clairvoyance
Length of output: 3709
🏁 Script executed:
#!/bin/bash
# Read the __init__ method body (lines 116-200 to capture most of it)
echo "=== __init__ method body (lines 116-250) ==="
sed -n '116,250p' app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Search for fire_and_forget definition anywhere in file ==="
grep -n "def fire_and_forget\|fire_and_forget.*=" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Search for any fire_and_forget function or import in entire file ==="
grep -n "fire_and_forget" app/ai/voice/agents/breeze_buddy/agent/__init__.pyRepository: juspay/clairvoyance
Length of output: 5457
🏁 Script executed:
#!/bin/bash
# Verify the exact line range (613-647) matches the code snippet shown in review
echo "=== Lines 613-647 to verify against review snippet ==="
sed -n '613,647p' app/ai/voice/agents/breeze_buddy/agent/__init__.pyRepository: juspay/clairvoyance
Length of output: 1466
Critical: Undefined names and attributes will cause runtime errors.
The code contains multiple undefined names and attributes that will raise NameError and AttributeError at runtime:
fire_and_forget(line 643) — not imported or defined anywhere in the fileEndFrame(line 645) — not imported frompipecat.frames.framesself.stt_provider(line 631) — not initialized in__init__self._stt_failure_recorded(line 632) — not initialized in__init__
The silent except: pass (lines 646-647) will also swallow errors without any indication of failure.
Proposed fixes
Add missing import:
-from pipecat.frames.frames import LLMMessagesAppendFrame, TTSSpeakFrame
+from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrameImport or define fire_and_forget:
+import asyncio
+
+def fire_and_forget(coro):
+ """Schedule coroutine without awaiting."""
+ asyncio.create_task(coro)Initialize attributes in __init__ after line 195 (error tracking section):
# Error tracking
self.errors: List[Dict[str, Any]] = []
+
+ # STT fallback tracking
+ self.stt_provider: Optional[str] = None
+ self._stt_failure_recorded: bool = FalseReplace silent exception:
- try:
- await task.queue_frames([EndFrame()])
- except Exception:
- pass
+ try:
+ await task.queue_frames([EndFrame()])
+ except Exception as e:
+ logger.warning(f"Failed to queue EndFrame: {e}")🧰 Tools
🪛 Ruff (0.15.12)
[warning] 639-639: Do not catch blind exception: Exception
(BLE001)
[error] 643-643: Undefined name fire_and_forget
(F821)
[error] 645-645: Undefined name EndFrame
(F821)
[error] 646-647: try-except-pass detected, consider logging the exception
(S110)
[warning] 646-646: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 613 - 647,
The block handling STT errors uses undefined symbols and uninitialized
attributes: import or define fire_and_forget and EndFrame (from
pipecat.frames.frames) so calls to
fire_and_forget(self._send_mid_call_stt_alert()) and EndFrame() resolve;
initialize self.stt_provider and self._stt_failure_recorded in the class
__init__ (e.g., set default provider string and False) so checks in that block
won't raise AttributeError; and replace the silent "except: pass" around await
task.queue_frames([EndFrame()]) with a specific exception handler that logs the
error (use logger.warning or logger.exception) to avoid swallowing failures.
Ensure references to record_stt_failure and _send_mid_call_stt_alert remain
unchanged.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,7 @@ | |
| ) | ||
| from app.services.gcp.storage.storage import upload_file_to_gcs | ||
| from app.services.redis.client import get_redis_service | ||
| from app.services.service_health import service_health_monitor | ||
|
|
||
|
|
||
| async def _get_lead_config(lead: LeadCallTracker) -> Optional[CallExecutionConfig]: | ||
|
|
@@ -456,6 +457,14 @@ async def process_backlog_leads(): | |
| await release_lock_on_lead_by_id(locked_lead.id) | ||
| continue | ||
|
|
||
| # Check global service health pause (circuit breaker pattern) | ||
| if await service_health_monitor.is_globally_paused(): | ||
| logger.info( | ||
| f"Skipping lead {locked_lead.id} - calls are globally paused due to service health" | ||
| ) | ||
| await release_lock_on_lead_by_id(locked_lead.id) | ||
| continue | ||
|
Comment on lines
+460
to
+466
|
||
|
|
||
| customer_phone = (locked_lead.payload or {}).get( | ||
| "customer_mobile_number" | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -341,3 +341,22 @@ async def OUTBOUND_RATE_LIMIT_WINDOW_SECONDS() -> int: | |||||||||||||||||||||
| async def OUTBOUND_RATE_LIMIT_BLOCK_ENABLED() -> bool: | ||||||||||||||||||||||
| """Returns OUTBOUND_RATE_LIMIT_BLOCK_ENABLED from Redis""" | ||||||||||||||||||||||
| return await get_config("OUTBOUND_RATE_LIMIT_BLOCK_ENABLED", False, bool) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # --- Service Health Monitoring Configuration --- | ||||||||||||||||||||||
| async def ENABLE_SERVICE_HEALTH_MONITORING() -> bool: | ||||||||||||||||||||||
| """Returns ENABLE_SERVICE_HEALTH_MONITORING from Redis. | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| When True, service health monitoring is active and will auto-pause | ||||||||||||||||||||||
| calls when upstream service failures exceed thresholds. | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| return await get_config("ENABLE_SERVICE_HEALTH_MONITORING", True, bool) | ||||||||||||||||||||||
|
Comment on lines
+350
to
+353
|
||||||||||||||||||||||
| When True, service health monitoring is active and will auto-pause | |
| calls when upstream service failures exceed thresholds. | |
| """ | |
| return await get_config("ENABLE_SERVICE_HEALTH_MONITORING", True, bool) | |
| When False (default), service health monitoring is disabled unless | |
| explicitly enabled via Redis/DevCycle rollout. | |
| When True, service health monitoring is active and will auto-pause | |
| calls when upstream service failures exceed thresholds. | |
| """ | |
| return await get_config("ENABLE_SERVICE_HEALTH_MONITORING", False, bool) |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -65,6 +65,8 @@ | |||||||||||||||||||||||
| from app.schemas import ( | ||||||||||||||||||||||||
| AutomaticVoiceUserConnectRequest, | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| from app.services.fallback import initialize_fallback_tasks | ||||||||||||||||||||||||
| from app.services.service_health import initialize_service_health_tasks | ||||||||||||||||||||||||
| from app.services.langfuse.tasks.task import initialize_langfuse_tasks | ||||||||||||||||||||||||
| from app.services.redis import ( | ||||||||||||||||||||||||
| close_redis_connections, | ||||||||||||||||||||||||
|
|
@@ -167,6 +169,9 @@ async def lifespan(_app: FastAPI): | |||||||||||||||||||||||
| # Initialize Langfuse tasks (if configured) | ||||||||||||||||||||||||
| await initialize_langfuse_tasks(_background_scheduler) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Initialize STT fallback reset tasks | ||||||||||||||||||||||||
| await initialize_fallback_tasks(_background_scheduler) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
| # Initialize service health monitoring tasks | |
| await initialize_service_health_tasks(_background_scheduler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing call to initialize_service_health_tasks.
initialize_service_health_tasks is imported on line 69 but never invoked. The service health background task (check_and_reset_circuits) will not be registered, meaning circuits will never auto-reset based on the configured schedule.
🐛 Proposed fix to register service health tasks
# Initialize STT fallback reset tasks
await initialize_fallback_tasks(_background_scheduler)
+ # Initialize service health check tasks
+ await initialize_service_health_tasks(_background_scheduler)
+
### Register new tasks here📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Initialize STT fallback reset tasks | |
| await initialize_fallback_tasks(_background_scheduler) | |
| ### Register new tasks here | |
| # Initialize STT fallback reset tasks | |
| await initialize_fallback_tasks(_background_scheduler) | |
| # Initialize service health check tasks | |
| await initialize_service_health_tasks(_background_scheduler) | |
| ### Register new tasks here |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/main.py` around lines 172 - 175, The import
initialize_service_health_tasks is never invoked, so the service health job
(check_and_reset_circuits) isn’t registered; add a call to await
initialize_service_health_tasks(_background_scheduler) (similar to the existing
await initialize_fallback_tasks(_background_scheduler)) in the initialization
sequence (e.g., immediately after initialize_fallback_tasks) so the background
scheduler registers the check_and_reset_circuits task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Several newly added imports appear unused in this module (
ALERT_STT_TERMINAL_FAILURE,STT_FALLBACK_SLACK_TAG,send_templated_alert, andservice_health_monitorunless used elsewhere). If they aren’t used later in the file, they should be removed to avoid confusion and keep imports accurate.