Auto pause calls on alerts #711
Conversation
Walkthrough

This pull request introduces a comprehensive service-health monitoring system that detects error patterns via log matching, automatically pauses service calls when configured thresholds are exceeded, and exposes API endpoints for manual pause/resume control. The system uses Redis for state management, includes auto-recovery logic with configurable delays, and integrates Slack notifications.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Logger as App Logger
    participant Sink as Health Log Sink
    participant Monitor as ServiceHealthMonitor
    participant Redis as Redis Store
    participant BGTask as Background Task
    participant Slack as Slack Notifier
    autonumber
    Logger->>Sink: Log warning/error message
    Sink->>Monitor: Pattern match detected
    Monitor->>Redis: Increment error count (per-rule, per-minute)
    BGTask->>Monitor: run_auto_health_check() [every 60s]
    Monitor->>Redis: Sum errors across window
    alt Any rule threshold exceeded?
        Monitor->>Redis: Set global_paused=true, paused_by="auto", source_rule=rule_name, paused_at=timestamp
        Monitor->>Slack: Send pause alert
    else All rules clear & pause triggered by "auto"?
        alt Recovery delay elapsed?
            Monitor->>Redis: Delete pause state
            Monitor->>Slack: Send resume alert
        end
    end
    Monitor->>BGTask: Return (pause state unchanged)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~55 minutes

🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/api/routers/systems.py`:
- Around line 408-453: The three operational endpoints service_health_pause,
service_health_resume, and service_health_alert are currently unprotected and
accept client-supplied paused_by/resumed_by which allows spoofing; fix by adding
an admin authentication dependency to each route (i.e., require an authenticated
admin principal via your auth dependency) and stop using client-provided
paused_by/resumed_by: extract the actor from the authenticated principal and
pass that actor to service_health_monitor.pause_calls and resume_calls, remove
or ignore those fields from the request body, and only allow pause triggers from
service_health_alert when the caller is an authorized system/admin principal.
In `@app/services/service_health/monitor.py`:
- Around line 366-380: The resume eligibility check is using paused_at
(f"{_NS}:paused_at") which measures pause age rather than how long the system
has been continuously healthy; replace this by reading and parsing a "clear
since" timestamp (e.g., f"{_NS}:clear_since") that is written when all rules
first drop below threshold, then compute elapsed_minutes = (now -
clear_since).total_seconds() / 60 and compare that to
SERVICE_HEALTH_AUTO_RESUME_MINUTES; also ensure the timestamp parsing still uses
datetime.fromisoformat with timezone-aware datetimes. Update the code that
detects when all rules are below threshold (where health rules are evaluated) to
set/clear the f"{_NS}:clear_since" key appropriately so this check has the
correct reference time.
- Around line 99-110: The sink is re-processing logs emitted by the same
service-health module and can self-trigger; update sink(message) to early-return
when the record originates from the health monitor itself by checking
record.get("name") / record.get("logger") or record.get("module") /
record.get("funcName") for identifiers like
"app.services.service_health.monitor", "service_health_monitor" or
"record_error" before iterating pattern_map; keep the existing level check and
exception handling but perform this origin check first in sink so calls to
service_health_monitor.record_error cannot re-enter the same matching/scheduling
logic.
In `@app/services/service_health/rules.json`:
- Around line 6-10: Remove the catch-all "soniox" entry from the error-match
list in rules.json so the monitor only matches concrete failure phrases; edit
the array that currently contains "http 502", "unable to connect to soniox",
"exception processing startframe", "soniox" and delete the "soniox" element,
then ensure the JSON array remains syntactically valid (commas adjusted) so only
specific phrases trigger auto-pauses.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c8d28ab6-cc4a-48bf-ab21-d5411082f5a4
📒 Files selected for processing (6)
- app/api/routers/systems.py
- app/core/config/static.py
- app/main.py
- app/services/service_health/__init__.py
- app/services/service_health/monitor.py
- app/services/service_health/rules.json
```python
@router.get("/health/service")
async def service_health_status():
    """Return current service-health state: pause overlay + per-rule error counts."""
    from app.services.service_health.monitor import service_health_monitor

    status = await service_health_monitor.get_status()
    return JSONResponse(status)


@router.post("/health/pause")
async def service_health_pause(request: Request):
    """Manually pause outbound calls."""
    from app.services.service_health.monitor import service_health_monitor

    body = await request.json()
    reason = body.get("reason", "Manual pause via API")
    paused_by = body.get("paused_by", "team_member")
    await service_health_monitor.pause_calls(
        reason=reason, paused_by=paused_by, source_rule=None
    )
    return JSONResponse({"status": "paused", "reason": reason, "paused_by": paused_by})


@router.post("/health/resume")
async def service_health_resume(request: Request):
    """Manually resume outbound calls."""
    from app.services.service_health.monitor import service_health_monitor

    body = await request.json()
    resumed_by = body.get("resumed_by", "team_member")
    await service_health_monitor.resume_calls(resumed_by=resumed_by)
    return JSONResponse({"status": "resumed", "resumed_by": resumed_by})


@router.post("/health/alert")
async def service_health_alert(request: Request):
    """Webhook endpoint for external alert systems to trigger a service pause."""
    from app.services.service_health.monitor import service_health_monitor

    body = await request.json()
    rule = body.get("rule", "external_alert")
    reason = body.get("reason", f"External alert received for rule: {rule}")
    await service_health_monitor.pause_calls(
        reason=reason, paused_by="alert_webhook", source_rule=rule
    )
    return JSONResponse({"status": "paused", "rule": rule, "reason": reason})
```
Protect these operational endpoints with real auth.
/health/pause, /health/resume, and /health/alert are public right now, and Line 424 / Line 437 let the caller spoof who performed the action. Any unauthenticated caller can globally stop or resume outbound calling. Gate these routes behind an admin auth dependency and derive paused_by / resumed_by from the authenticated principal server-side.
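The "derive the actor server-side" part of this suggestion can be sketched as a small helper; `Principal` and the admin check are hypothetical stand-ins for whatever auth dependency the project already has, not the real API:

```python
# Sketch: derive the actor server-side instead of trusting the request body.
# The Principal shape and admin flag are assumptions; adapt to the project's
# real auth dependency.
from dataclasses import dataclass


@dataclass
class Principal:
    username: str
    is_admin: bool


def build_pause_args(principal: Principal, body: dict) -> dict:
    """Take the free-text reason from the body, but never the actor."""
    if not principal.is_admin:
        raise PermissionError("admin required")
    return {
        "reason": body.get("reason", "Manual pause via API"),
        "paused_by": principal.username,  # ignore any client-supplied paused_by
        "source_rule": None,
    }
```

With this shape, a spoofed `paused_by` in the request body is simply discarded in favor of the authenticated identity.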
```python
def sink(message):
    record = message.record
    if record["level"].no < 30:  # WARNING = 30, skip DEBUG/INFO
        return
    text = record["message"].lower()
    try:
        loop = asyncio.get_running_loop()
        for rule, phrases in pattern_map.items():
            if any(phrase in text for phrase in phrases):
                loop.create_task(service_health_monitor.record_error(rule))
    except Exception:
        pass  # Never let health monitoring affect normal logging
```
Exclude service-health’s own logs from the sink.
This sink processes warnings emitted by app/services/service_health/monitor.py itself. When a call like record_error("soniox") fails, the warning message includes soniox, matches rules.json again, and schedules another record_error() task. During Redis/logging failures this can spiral into self-triggered error storms.
Suggested fix:

```diff
 def sink(message):
     record = message.record
+    if record["name"] == __name__:
+        return
     if record["level"].no < 30:  # WARNING = 30, skip DEBUG/INFO
         return
     text = record["message"].lower()
     try:
         loop = asyncio.get_running_loop()
```

🧰 Tools
🪛 Ruff (0.15.10)

- [warning] 108-108: Store a reference to the return value of loop.create_task (RUF006)
- [error] 109-110: try-except-pass detected, consider logging the exception (S110)
- [warning] 109-109: Do not catch blind exception: Exception (BLE001)
```python
# Verify the pause has been active long enough
paused_at_raw = await redis.get(f"{_NS}:paused_at")
if paused_at_raw:
    paused_at_str = (
        paused_at_raw.decode()
        if isinstance(paused_at_raw, bytes)
        else paused_at_raw
    )
    try:
        paused_at = datetime.fromisoformat(paused_at_str)
        elapsed_minutes = (
            datetime.now(timezone.utc) - paused_at
        ).total_seconds() / 60
        if elapsed_minutes < SERVICE_HEALTH_AUTO_RESUME_MINUTES:
            return  # Not yet eligible for auto-resume
```
Base auto-resume on “clear since”, not pause age.
Lines 366-380 use paused_at to decide resume eligibility. That resumes too early whenever the outage clears late: if the system is unhealthy for 14 minutes and only becomes clean in minute 15, this code resumes after ~1 clean minute instead of after SERVICE_HEALTH_AUTO_RESUME_MINUTES of clean time. Persist the timestamp when all rules first drop below threshold and compare against that instead.
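The proposed check can be sketched as a pure function; the "clear since" timestamp is an assumption from this comment (written when all rules first drop below threshold, deleted whenever any rule re-trips), and the config value is a placeholder:

```python
# Sketch of the "clear since" eligibility check, assuming clear_since is
# stored as an ISO-format UTC timestamp (str or bytes) in Redis.
from datetime import datetime, timezone

SERVICE_HEALTH_AUTO_RESUME_MINUTES = 10  # placeholder config value


def eligible_for_auto_resume(clear_since_raw, now=None):
    if clear_since_raw is None:
        return False  # never been continuously healthy since the pause began
    raw = clear_since_raw.decode() if isinstance(clear_since_raw, bytes) else clear_since_raw
    clear_since = datetime.fromisoformat(raw)
    now = now or datetime.now(timezone.utc)
    elapsed_minutes = (now - clear_since).total_seconds() / 60
    return elapsed_minutes >= SERVICE_HEALTH_AUTO_RESUME_MINUTES
```

Because the reference point resets every time a rule re-trips, the monitor only resumes after a full continuous window of clean time, regardless of when the pause started.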
```json
    "http 502",
    "unable to connect to soniox",
    "exception processing startframe",
    "soniox"
]
```
Remove the catch-all "soniox" match.
Line 9 will count any WARNING/ERROR that merely mentions Soniox, not just actual provider failures. With the monitor doing plain substring matching on every WARNING+ log, this makes false auto-pauses much more likely. Keep only concrete failure phrases here.
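A quick illustration of the false-positive risk. The log texts below are hypothetical, but the matching logic mirrors the substring check described above:

```python
# Why the bare "soniox" pattern is risky: the sink does plain substring
# matching on every WARNING+ message, so benign mentions count as errors.
phrases = ["http 502", "unable to connect to soniox", "exception processing startframe"]
catch_all = phrases + ["soniox"]


def matches(text, patterns):
    text = text.lower()
    return any(p in text for p in patterns)


benign = "Soniox reconnect succeeded after retry"       # hypothetical warning text
real_failure = "Unable to connect to Soniox: HTTP 502"  # hypothetical failure text
```

The benign warning trips the catch-all list but not the specific-phrase list, while genuine failures still match.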
Pull request overview
Adds a Redis-backed “service health” monitoring system intended to auto-pause outbound calls when upstream/provider error rates cross thresholds, with manual pause/resume controls and Slack notifications.
Changes:
- Introduces rule-based error counting (via Loguru sink + Redis sliding window counters) and auto pause/resume logic with Slack alerts.
- Wires the monitor into app startup and registers a periodic background health-check task.
- Adds Systems API endpoints to view status and to pause/resume via API/webhook, plus config/env toggles.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| app/services/service_health/rules.json | Defines log-pattern rules and thresholds for service health monitoring. |
| app/services/service_health/monitor.py | Implements log sink, Redis counters, pause overlay keys, auto-resume logic, and Slack alerts. |
| app/services/service_health/__init__.py | Exposes the service health monitor singleton for imports. |
| app/main.py | Installs the Loguru sink at startup and schedules the periodic health check task. |
| app/core/config/static.py | Adds env-driven switches for enabling auto-pause and configuring auto-resume delay. |
| app/api/routers/systems.py | Adds endpoints for service-health status plus manual pause/resume and an alert webhook. |
```python
    Tracks upstream service failures via Redis sliding-window counters.
    When error counts exceed configured thresholds, calls are paused globally
    via a Redis overlay flag (no database mutation). A background task runs
    every 60 seconds to evaluate all rules and auto-resume after a recovery
```
This module sets a Redis "global_paused" overlay, but there is currently no call path that reads/enforces it. A repo-wide search shows no usages of {service_health}:global_paused outside this file, so pausing via rules/alerts won’t actually stop outbound calls. Add an enforcement check in the call initiation/dispatch path (e.g., Breeze Buddy lead processing / provider call initiation) to block new calls when the overlay is active.
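A minimal sketch of such an enforcement check, to be called from the call-initiation path. The key name follows the `{service_health}:global_paused` overlay named in this comment; the async Redis client interface and the fail-open policy are assumptions:

```python
# Sketch of an overlay check for the call-dispatch path. Fails open so a
# Redis outage in the health monitor cannot itself block all calling.
_NS = "service_health"


async def outbound_calls_paused(redis) -> bool:
    """Return True when the pause overlay is set; fail open on Redis errors."""
    try:
        raw = await redis.get(f"{_NS}:global_paused")
    except Exception:
        return False  # monitoring must not take down calling when Redis is down
    if raw is None:
        return False
    value = raw.decode() if isinstance(raw, bytes) else raw
    return value == "true"
```

The dispatch code would then skip (or queue) new outbound calls whenever this returns True.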
```python
bucket = int(time.time()) // 60
key = f"{_NS}:errors:{rule}:{bucket}"
count = await redis.incr(key)
# Only set TTL on the first write to avoid hammering Redis
if count == 1:
    await redis.expire(key, 600)  # 10-minute TTL
except Exception as e:
```
Per-minute error bucket TTL is hard-coded to 600s. Since the docstring says new services can be added by editing rules.json only, a rule with window_minutes > 10 would silently undercount (old buckets expire before the window completes). Consider computing TTL from the max configured window (plus some buffer) so rules.json changes can’t break counting semantics.
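One way to derive the TTL, as suggested above. The rule dictionary shape (`window_minutes` per rule) is assumed from the review discussion, not confirmed against rules.json:

```python
# Sketch: compute the bucket TTL from the largest configured window so a
# rules.json edit can't silently undercount. Rule shape is an assumption.
def bucket_ttl_seconds(rules: dict, buffer_minutes: int = 2) -> int:
    """TTL = max window across all rules, plus a small buffer for clock skew."""
    max_window = max(
        (rule.get("window_minutes", 5) for rule in rules.values()),
        default=5,
    )
    return (max_window + buffer_minutes) * 60
```

Computing this once at sink-installation time (and passing it to `redis.expire`) keeps the "edit rules.json only" promise intact.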
```python
@router.post("/health/pause")
async def service_health_pause(request: Request):
    """Manually pause outbound calls."""
    from app.services.service_health.monitor import service_health_monitor

    body = await request.json()
    reason = body.get("reason", "Manual pause via API")
    paused_by = body.get("paused_by", "team_member")
    await service_health_monitor.pause_calls(
        reason=reason, paused_by=paused_by, source_rule=None
    )
    return JSONResponse({"status": "paused", "reason": reason, "paused_by": paused_by})


@router.post("/health/resume")
async def service_health_resume(request: Request):
    """Manually resume outbound calls."""
    from app.services.service_health.monitor import service_health_monitor

    body = await request.json()
    resumed_by = body.get("resumed_by", "team_member")
    await service_health_monitor.resume_calls(resumed_by=resumed_by)
    return JSONResponse({"status": "resumed", "resumed_by": resumed_by})
```
The new /health/pause and /health/resume endpoints allow any caller to globally pause/resume outbound calls with no authentication/authorization. This is a high-impact control plane action and should be protected (e.g., reuse existing RBAC/JWT dependencies, require an internal API key, or restrict by network), otherwise an external party could trivially DoS calling.
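Of the options listed, an internal API key is the smallest change; a sketch, with the header name and env var as placeholders:

```python
# Minimal internal-API-key check. SERVICE_HEALTH_API_KEY is a placeholder
# env var; a constant-time compare avoids leaking the key through timing.
import hmac
import os
from typing import Optional


def check_internal_api_key(provided: Optional[str]) -> bool:
    expected = os.environ.get("SERVICE_HEALTH_API_KEY", "")
    if not expected or provided is None:
        return False  # refuse when unconfigured rather than allowing everyone
    return hmac.compare_digest(provided.encode(), expected.encode())
```

A FastAPI dependency could read the key from a request header and raise 401/403 when this returns False; reusing the project's existing RBAC/JWT dependency would be stronger still.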
```python
@router.post("/health/alert")
async def service_health_alert(request: Request):
    """Webhook endpoint for external alert systems to trigger a service pause."""
    from app.services.service_health.monitor import service_health_monitor

    body = await request.json()
    rule = body.get("rule", "external_alert")
    reason = body.get("reason", f"External alert received for rule: {rule}")
    await service_health_monitor.pause_calls(
        reason=reason, paused_by="alert_webhook", source_rule=rule
    )
    return JSONResponse({"status": "paused", "rule": rule, "reason": reason})
```
/health/alert is a webhook that can pause calls, but it currently trusts any POST body and has no signature/secret validation. Please add request authentication (shared secret/signature verification, allowlisted source, etc.) before using the payload to trigger a global pause.
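The signature-verification option can be sketched like this. The `sha256=<hexdigest>`-over-raw-body scheme mirrors common webhook conventions (GitHub-style); the header name and secret handling are assumptions to adapt:

```python
# Sketch of shared-secret webhook signature verification over the raw
# request body, before any payload field is trusted.
import hashlib
import hmac


def verify_webhook_signature(raw_body: bytes, signature_header: str, secret: str) -> bool:
    if not signature_header.startswith("sha256="):
        return False
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature_header[len("sha256="):], expected)
```

The handler would read the raw body bytes first, verify, and only then parse JSON and call `pause_calls`.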
```python
def sink(message):
    record = message.record
    if record["level"].no < 30:  # WARNING = 30, skip DEBUG/INFO
        return
    text = record["message"].lower()
    try:
        loop = asyncio.get_running_loop()
        for rule, phrases in pattern_map.items():
            if any(phrase in text for phrase in phrases):
                loop.create_task(service_health_monitor.record_error(rule))
    except Exception:
        pass  # Never let health monitoring affect normal logging
The Loguru sink will also process logs emitted by the service-health monitor itself (e.g., pause/resume logs and Redis error warnings). Because some rule patterns are broad (like "soniox"), these self-logs can match rules and recursively schedule more record_error() calls, inflating counters and potentially creating a runaway loop during Redis failures. Add an explicit guard to ignore [ServiceHealth] logs / this module's logger records, or tighten the logger.add(..., filter=...) to exclude service-health records from the sink.
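The `filter=...` option mentioned here can be sketched as a plain predicate on the record dict (Loguru passes the record to the filter and drops it when the filter returns False); the module path is an assumption based on this PR's layout:

```python
# Sketch of the filter approach: exclude service-health records before they
# ever reach the sink, so self-logs can't re-trigger the matcher.
def exclude_service_health(record: dict) -> bool:
    """Return False to drop records emitted by the health monitor itself."""
    name = record.get("name") or ""
    return not name.startswith("app.services.service_health")

# Intended wiring (not executed here):
# logger.add(sink, filter=exclude_service_health)
```

This keeps the exclusion in one place instead of duplicating origin checks inside `sink()`.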
```python
@router.post("/health/alert")
async def service_health_alert(request: Request):
    """Webhook endpoint for external alert systems to trigger a service pause."""
    from app.services.service_health.monitor import service_health_monitor
```
```python
# ------------------------------------------------------------------ #

# Master switch — set ENABLE_AUTO_PAUSE_CALLS=false to disable entirely
ENABLE_AUTO_PAUSE_CALLS = (
```
```python
    has exceeded SERVICE_HEALTH_AUTO_RESUME_MINUTES → resume + Slack alert
    - If currently paused by a human ("paused_by" != "auto") → never auto-resume
    """
    if not ENABLE_AUTO_PAUSE_CALLS:
```
This doesn't correspond to the function it's used in? I guess we can change the variable name here.
Force-pushed: 7faea54 to b44ced0
```python
class ServiceHealthMonitor:
    """
```
Avoid too many comments. Let's keep only the ones explaining why we did what we did; the rest of the code should be self-explanatory.
```python
### Register new tasks here

from app.core.config.static import SERVICE_HEALTH_CHECKER_ENABLED
```
Move to top if possible
```python
@router.post("/health/resume")
async def service_health_resume(
```
Again, same as above. Let's keep only the comments that explain why we're doing what we're doing.
@Swetha-160303 Looks good, minor nits 👍
Force-pushed: b44ced0 to 1dc2966
Summary by CodeRabbit
DEV PROOF:
