From 7a70e9340b89b040d1b155e6fda4c58d864cbe0f Mon Sep 17 00:00:00 2001 From: "devansh.raj@juspay.in" Date: Mon, 20 Apr 2026 12:30:29 +0530 Subject: [PATCH] feat(breeze-buddy): add global pickup rate alert background task Adds a periodic background task that monitors call pickup rates and sends Slack warning alerts when rates drop below a configurable threshold. - Add app/services/pickup_rate/ module with: - config.py: AlertConfig dataclass (enabled, interval, threshold, alert_type, scope) with per-merchant fields stubbed for Phase 2 extension - calculator.py: compute_pickup_rates() mirroring ScoreMonitor._get_daily_call_stats() for both call-based and lead-based rate calculation - monitor.py: PickupRateMonitor class with threshold evaluation, Redis-based dedup (fail-open), and inline slack_alert.send() for alerts - task.py: initialize_pickup_rate_tasks() following langfuse task registration pattern - Add 3 dynamic config getters in dynamic.py: ENABLE_PICKUP_RATE_ALERT, PICKUP_RATE_ALERT_INTERVAL_SECONDS, PICKUP_RATE_ALERT_THRESHOLD - Register task in main.py lifespan after Langfuse task registration Edge cases handled: zero calls/leads skipped, Redis unavailable fails open, DB errors skip the cycle, Slack failures skip mark-alerted for retry. Lookback window derived from interval_seconds (no separate config needed). Per-merchant alerting scaffolded for Phase 2 (call_execution_config + DB filters). --- app/core/config/dynamic.py | 16 + app/main.py | 2 + .../langfuse/tasks/score_monitor/score.py | 30 +- app/services/pickup_rate/__init__.py | 15 + app/services/pickup_rate/calculator.py | 121 ++++++++ app/services/pickup_rate/config.py | 55 ++++ app/services/pickup_rate/monitor.py | 282 ++++++++++++++++++ app/services/pickup_rate/task.py | 55 ++++ 8 files changed, 560 insertions(+), 16 deletions(-) create mode 100644 app/services/pickup_rate/__init__.py create mode 100644 app/services/pickup_rate/calculator.py create mode 100644 app/services/pickup_rate/config.py create mode 100644 app/services/pickup_rate/monitor.py create mode 100644 app/services/pickup_rate/task.py diff --git a/app/core/config/dynamic.py b/app/core/config/dynamic.py index 68ecc15aa..45711ea2c 100644 --- a/app/core/config/dynamic.py +++ b/app/core/config/dynamic.py @@ -46,6 +46,22 @@ async def ENABLE_BACKGROUND_TASKS() -> bool: return await get_config("ENABLE_BACKGROUND_TASKS", "false", bool) +# --- Pickup Rate Alert Configuration --- +async def ENABLE_PICKUP_RATE_ALERT() -> bool: + """Returns ENABLE_PICKUP_RATE_ALERT from Redis""" + return await get_config("ENABLE_PICKUP_RATE_ALERT", "false", bool) + + +async def PICKUP_RATE_ALERT_INTERVAL_SECONDS() -> int: + """Returns PICKUP_RATE_ALERT_INTERVAL_SECONDS from Redis (default: 86400 = 24 h)""" + return await get_config("PICKUP_RATE_ALERT_INTERVAL_SECONDS", 86400, int) + + +async def PICKUP_RATE_ALERT_THRESHOLD() -> float: + """Returns PICKUP_RATE_ALERT_THRESHOLD from Redis (default: 40.0 %)""" + return await get_config("PICKUP_RATE_ALERT_THRESHOLD", 40.0, float) + + async def DAILY_SUMMARY_HOUR() -> int: """Returns DAILY_SUMMARY_HOUR from Redis (24-hour format: 0-23)""" return await get_config("DAILY_SUMMARY_HOUR", 21, int) diff --git a/app/main.py b/app/main.py index ce38c09c2..f55bf4073 100644 --- a/app/main.py +++ b/app/main.py @@ -66,6 +66,7 @@ AutomaticVoiceUserConnectRequest, ) from app.services.langfuse.tasks.task import initialize_langfuse_tasks +from app.services.pickup_rate.task import initialize_pickup_rate_tasks from app.services.redis import ( close_redis_connections, get_redis_service, @@ -168,6 +169,7 @@ async def lifespan(_app: FastAPI): await initialize_langfuse_tasks(_background_scheduler) ### Register new tasks here + await initialize_pickup_rate_tasks(_background_scheduler) # Start the scheduler only if tasks are registered if _background_scheduler.tasks: diff --git a/app/services/langfuse/tasks/score_monitor/score.py b/app/services/langfuse/tasks/score_monitor/score.py index 7399b529b..d5eb87fed 100644 --- a/app/services/langfuse/tasks/score_monitor/score.py +++ b/app/services/langfuse/tasks/score_monitor/score.py @@ -440,24 +440,22 @@ async def _get_daily_call_stats(self) -> Dict[str, Any]: # Process each call tracker for call-based stats for tracker, calling_provider in call_trackers: - # Count attempted calls (FINISHED status) + # Count attempted calls and outcomes only for FINISHED calls if tracker.status and tracker.status.value == "FINISHED": calls_attempted += 1 - - # Count by outcome - outcome_value = tracker.outcome if tracker.outcome else None - if outcome_value == "NO_ANSWER": - calls_no_answer += 1 - elif outcome_value == "CONFIRM": - calls_confirm += 1 - elif outcome_value == "CANCEL": - calls_cancel += 1 - elif outcome_value == "ADDRESS_UPDATED": - calls_address_updated += 1 - elif outcome_value == "BUSY": - calls_busy += 1 - - # Count by provider + outcome_value = tracker.outcome if tracker.outcome else None + if outcome_value == "NO_ANSWER": + calls_no_answer += 1 + elif outcome_value == "CONFIRM": + calls_confirm += 1 + elif outcome_value == "CANCEL": + calls_cancel += 1 + elif outcome_value == "ADDRESS_UPDATED": + calls_address_updated += 1 + elif outcome_value == "BUSY": + calls_busy += 1 + + # Count by provider regardless of status if calling_provider: provider_upper = calling_provider.upper() if provider_upper in provider_counts: diff --git a/app/services/pickup_rate/__init__.py b/app/services/pickup_rate/__init__.py new file mode 100644 index 000000000..2ae050191 --- /dev/null +++ b/app/services/pickup_rate/__init__.py @@ -0,0 +1,15 @@ +""" +Pickup Rate Alert Service + +Monitors call pickup rates and sends Slack warning alerts when rates drop +below configurable thresholds. +""" + +from app.services.pickup_rate.monitor import PickupRateMonitor, pickup_rate_monitor +from app.services.pickup_rate.task import initialize_pickup_rate_tasks + +__all__ = [ + "PickupRateMonitor", + "pickup_rate_monitor", + "initialize_pickup_rate_tasks", +] diff --git a/app/services/pickup_rate/calculator.py b/app/services/pickup_rate/calculator.py new file mode 100644 index 000000000..5425b46bb --- /dev/null +++ b/app/services/pickup_rate/calculator.py @@ -0,0 +1,121 @@ +""" +Pickup Rate Calculator + +Computes call-based and lead-based pickup rates for a given time window, +mirroring the logic in ScoreMonitor._get_daily_call_stats(). + +Phase 1: merchant_id / reseller_id params are accepted but ignored. +Phase 2: pass them through to the DB accessor filters once those are wired up. +""" + +from datetime import datetime +from typing import Any, Dict, Optional + +from app.core.logger import logger +from app.database.accessor.breeze_buddy.lead_call_tracker import ( + get_all_lead_call_trackers, + get_lead_based_analytics, +) + +# Sentinel value returned on DB error – callers must check for None +_ERROR_RESULT = None + + +async def compute_pickup_rates( + start_date: datetime, + end_date: datetime, + reseller_id: Optional[str] = None, # Phase 2: filter by reseller + merchant_id: Optional[str] = None, # Phase 2: filter by merchant +) -> Optional[Dict[str, Any]]: + """ + Calculate call-based and lead-based pickup rates for the given window. + + Args: + start_date: Start of the rolling window (UTC-aware). + end_date: End of the rolling window (UTC-aware). + reseller_id: (Phase 2) Restrict results to this reseller. + merchant_id: (Phase 2) Restrict results to this merchant. + + Returns: + Dictionary with the following keys on success:: + + { + "calls_attempted": int, # FINISHED calls in window + "calls_picked": int, # attempted - NO_ANSWER + "calls_no_answer": int, + "call_pickup_rate": float, # (picked / attempted) * 100 + "total_leads": int, # unique request_ids + "leads_picked": int, # leads where finished > no_answer + "lead_pickup_rate": float, # (leads_picked / total_leads) * 100 + } + + Returns ``None`` if a DB error occurs (caller should skip alerting). + """ + try: + # ------------------------------------------------------------------ + # 1. Call-based metrics + # ------------------------------------------------------------------ + # Phase 2 NOTE: pass reseller_id / merchant_id to the accessor once + # those optional params are supported by get_all_lead_call_trackers(). + call_trackers = await get_all_lead_call_trackers( + start_date=start_date, + end_date=end_date, + ) + + calls_attempted = 0 + calls_no_answer = 0 + + if call_trackers: + for tracker, _provider in call_trackers: + if tracker.status and tracker.status.value == "FINISHED": + calls_attempted += 1 + if tracker.outcome == "NO_ANSWER": + calls_no_answer += 1 + + calls_picked = calls_attempted - calls_no_answer + call_pickup_rate = ( + calls_picked / calls_attempted * 100 if calls_attempted > 0 else 0.0 + ) + + # ------------------------------------------------------------------ + # 2. Lead-based metrics + # ------------------------------------------------------------------ + # Phase 2 NOTE: pass reseller_id / merchant_id once supported. + lead_data = await get_lead_based_analytics( + start_date=start_date, + end_date=end_date, + ) + + total_leads = len(lead_data) if lead_data else 0 + leads_picked = 0 + + if lead_data: + for lead in lead_data: + # A lead is "picked" when more calls were finished than went unanswered + if lead["finished_calls"] > lead["no_answer_calls"]: + leads_picked += 1 + + lead_pickup_rate = leads_picked / total_leads * 100 if total_leads > 0 else 0.0 + + result = { + "calls_attempted": calls_attempted, + "calls_picked": calls_picked, + "calls_no_answer": calls_no_answer, + "call_pickup_rate": call_pickup_rate, + "total_leads": total_leads, + "leads_picked": leads_picked, + "lead_pickup_rate": lead_pickup_rate, + } + + logger.debug( + f"compute_pickup_rates [{start_date} → {end_date}] " + f"call_rate={call_pickup_rate}% lead_rate={lead_pickup_rate}%" + ) + return result + + except Exception as e: + logger.error( + f"compute_pickup_rates failed for window [{start_date} → {end_date}]: {e}", + exc_info=True, + ) + return _ERROR_RESULT diff --git a/app/services/pickup_rate/config.py b/app/services/pickup_rate/config.py new file mode 100644 index 000000000..1f3059ddf --- /dev/null +++ b/app/services/pickup_rate/config.py @@ -0,0 +1,55 @@ +""" +AlertConfig dataclass for the Pickup Rate Alert system. + +Phase 1: global alerting only. +Phase 2 will extend this with optional merchant_id / reseller_id for per-merchant scope. +""" + +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class AlertConfig: + """Configuration for a single pickup-rate alert scope. + + Fields: + enabled: Whether alerting is active for this scope. + interval_seconds: How often (in seconds) to check and potentially alert. + Also used as the Redis TTL for the dedup key. + threshold_percent: Alert fires when pickup rate drops below this value (0-100). + alert_type: Which rate(s) to evaluate. + "CALL_BASED" – only call-based pickup rate. + "LEAD_BASED" – only lead-based pickup rate. + "BOTH" – alert if *either* rate is below threshold. + scope: Human-readable label used in Slack messages and Redis keys + (e.g. "global", "merchant:acme/store-1"). + lookback_hours: Rolling window of call data to evaluate (default: 24 h). + + Phase 2 extension fields (unused in Phase 1 – kept for forward-compatibility): + merchant_id: Merchant identifier to scope DB queries (None = no filter). + reseller_id: Reseller identifier to scope DB queries (None = no filter). + """ + + enabled: bool + interval_seconds: int + threshold_percent: float + alert_type: str = "BOTH" + scope: str = "global" + lookback_hours: int = 24 + + # Phase 2: per-merchant fields – not used yet, wired through to calculator + merchant_id: Optional[str] = None + reseller_id: Optional[str] = None + + # Computed Redis dedup key – derived from scope, not configurable directly + _redis_key: str = field(init=False, repr=False, default="") + + def __post_init__(self) -> None: + safe_scope = self.scope.replace(":", "_").replace("/", "_") + self._redis_key = f"pickup_rate:{safe_scope}:last_alert" + + @property + def redis_dedup_key(self) -> str: + """Redis key used to track last-alert timestamp for this scope.""" + return self._redis_key diff --git a/app/services/pickup_rate/monitor.py b/app/services/pickup_rate/monitor.py new file mode 100644 index 000000000..9af7806d6 --- /dev/null +++ b/app/services/pickup_rate/monitor.py @@ -0,0 +1,282 @@ +""" +Pickup Rate Monitor + +Background task that checks whether global (and, in Phase 2, per-merchant) +call pickup rates have dropped below a configured threshold, and sends a +Slack warning alert when they have. + +Edge-case handling: +- Zero calls / leads in window → skip (no data to evaluate) +- Redis unavailable → local dedup/mark operations fail open (log warning, + proceed with alert); scheduled execution still + requires BackgroundTaskScheduler Redis lock +- DB error in calculator → log error, skip this check cycle +- Slack send failure → log error, do NOT mark-alerted (will retry next cycle) +""" + +from datetime import datetime, timedelta, timezone +from typing import Any, Dict + +from app.core.config.dynamic import ( + ENABLE_PICKUP_RATE_ALERT, + PICKUP_RATE_ALERT_INTERVAL_SECONDS, + PICKUP_RATE_ALERT_THRESHOLD, +) +from app.core.logger import logger +from app.services.pickup_rate.calculator import compute_pickup_rates +from app.services.pickup_rate.config import AlertConfig +from app.services.redis import get_redis_service, is_redis_configured +from app.services.slack.alert import slack_alert + + +class PickupRateMonitor: + """Monitors pickup rates and fires Slack alerts when thresholds are breached.""" + + # ----------------------------------------------------------------------- + # Public API + # ----------------------------------------------------------------------- + + async def check_and_alert(self) -> None: + """Entry point called by BackgroundTaskScheduler on each tick.""" + config = await self.load_config() + + if not config.enabled: + logger.debug("PickupRateMonitor: alerting disabled, skipping check") + return + + now = datetime.now(timezone.utc) + start_date = now - timedelta(hours=config.lookback_hours) + + logger.info( + f"PickupRateMonitor: checking pickup rate " + f"[{start_date.isoformat()} → {now.isoformat()}] " + f"threshold={config.threshold_percent}% type={config.alert_type}" + ) + + rates = await compute_pickup_rates( + start_date=start_date, + end_date=now, + reseller_id=config.reseller_id, + merchant_id=config.merchant_id, + ) + + if rates is None: + logger.error("PickupRateMonitor: DB query failed, skipping alert cycle") + return + + # Skip when there is no data – avoids false alerts on idle systems + if rates["calls_attempted"] == 0 and rates["total_leads"] == 0: + logger.info( + "PickupRateMonitor: no calls or leads in window, skipping alert" + ) + return + + if not self._is_threshold_breached(rates, config): + logger.info( + f"PickupRateMonitor: pickup rate OK " + f"(call={rates['call_pickup_rate']:.1f}% " + f"lead={rates['lead_pickup_rate']:.1f}% " + f"threshold={config.threshold_percent}%)" + ) + return + + # Dedup check – skip if we alerted too recently + if not await self._should_alert(config.redis_dedup_key): + logger.info( + f"PickupRateMonitor: threshold breached but already alerted recently " + f"(key={config.redis_dedup_key})" + ) + return + + # ------------------------------------------------------------------ + # Build and send Slack alert directly + # ------------------------------------------------------------------ + display_date = now.strftime("%d-%m-%y") + title = f"⚠️ Low Pickup Rate Alert - {display_date}" + threshold = config.threshold_percent + + fields = [ + {"name": "Scope", "value": config.scope.replace("_", " ").title()}, + {"name": "Time Window", "value": f"Last {config.lookback_hours} hours"}, + {"name": "Threshold", "value": f"{threshold}%"}, + {"name": "Alert Type", "value": config.alert_type}, + ] + + sections = [] + + if rates["calls_attempted"] > 0: + call_status = ( + "⚠️ BELOW THRESHOLD" + if rates["call_pickup_rate"] < threshold + else "✅ OK" + ) + sections.append( + { + "title": "Call-Based Pickup Rate", + "text": ( + f"• Rate: *{rates['call_pickup_rate']:.1f}%* " + f"(threshold: {threshold}%) {call_status}\n" + f"• Calls Attempted: *{rates['calls_attempted']}*\n" + f"• Calls Picked Up: *{rates['calls_picked']}*\n" + f"• Calls No Answer: *{rates['calls_no_answer']}*" + ), + } + ) + else: + sections.append( + { + "title": "Call-Based Pickup Rate", + "text": "• No calls attempted in this window", + } + ) + + if rates["total_leads"] > 0: + lead_status = ( + "⚠️ BELOW THRESHOLD" + if rates["lead_pickup_rate"] < threshold + else "✅ OK" + ) + sections.append( + { + "title": "Lead-Based Pickup Rate", + "text": ( + f"• Rate: *{rates['lead_pickup_rate']:.1f}%* " + f"(threshold: {threshold}%) {lead_status}\n" + f"• Total Leads: *{rates['total_leads']}*\n" + f"• Leads Picked: *{rates['leads_picked']}*" + ), + } + ) + else: + sections.append( + { + "title": "Lead-Based Pickup Rate", + "text": "• No leads processed in this window", + } + ) + + fallback = ( + f"Low Pickup Rate Alert: call={rates['call_pickup_rate']:.1f}% " + f"lead={rates['lead_pickup_rate']:.1f}% threshold={threshold}%" + ) + + try: + success = await slack_alert.send( + title=title, + fields=fields, + sections=sections, + fallback_text=fallback, + ) + except Exception as e: + logger.error(f"PickupRateMonitor: exception while sending Slack alert: {e}") + success = False + + # Only mark-alerted after a successful send so we retry on Slack failure + if success: + logger.info( + f"PickupRateMonitor: alert sent for scope='{config.scope}' " + f"call_rate={rates['call_pickup_rate']:.1f}% " + f"lead_rate={rates['lead_pickup_rate']:.1f}%" + ) + await self._mark_alerted(config.redis_dedup_key, config.interval_seconds) + else: + logger.warning( + "PickupRateMonitor: Slack send failed – will retry on next cycle" + ) + + # ----------------------------------------------------------------------- + # Configuration loading + # ----------------------------------------------------------------------- + + async def load_config(self) -> AlertConfig: + """Fetch runtime config from dynamic (Redis/DevCycle) getters.""" + enabled = await ENABLE_PICKUP_RATE_ALERT() + interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + threshold_percent = await PICKUP_RATE_ALERT_THRESHOLD() + + return AlertConfig( + enabled=enabled, + interval_seconds=interval_seconds, + threshold_percent=threshold_percent, + alert_type="BOTH", + scope="global", + lookback_hours=max(1, interval_seconds // 3600), + ) + + # ----------------------------------------------------------------------- + # Threshold evaluation + # ----------------------------------------------------------------------- + + def _is_threshold_breached( + self, rates: Dict[str, Any], config: AlertConfig + ) -> bool: + """Return True when the configured rate(s) fall below the threshold.""" + threshold = config.threshold_percent + alert_type = config.alert_type.upper() + + call_breached = rates["call_pickup_rate"] < threshold + lead_breached = rates["lead_pickup_rate"] < threshold + + # Only evaluate the relevant rate(s) depending on alert_type + if alert_type == "CALL_BASED": + # Skip lead check when no call data, but still evaluate call rate + if rates["calls_attempted"] == 0: + return False + return call_breached + + if alert_type == "LEAD_BASED": + if rates["total_leads"] == 0: + return False + return lead_breached + + # Default: "BOTH" – alert if *either* rate is below threshold + # Guard against checking a rate with zero denominator + call_ok = rates["calls_attempted"] == 0 or not call_breached + lead_ok = rates["total_leads"] == 0 or not lead_breached + return not (call_ok and lead_ok) + + # ----------------------------------------------------------------------- + # Redis dedup helpers + # ----------------------------------------------------------------------- + + async def _should_alert(self, redis_key: str) -> bool: + """Return True when it is safe to send an alert (dedup TTL has expired). + + Fails open: if Redis is unavailable the alert is allowed to fire so + we do not silently drop warnings on infrastructure issues. + """ + if not is_redis_configured(): + logger.warning( + "PickupRateMonitor: Redis not configured – skipping dedup check, " + "proceeding with alert" + ) + return True + + try: + redis_service = await get_redis_service() + key_exists = await redis_service.exists(redis_key) + return not key_exists + except Exception as e: + logger.warning( + f"PickupRateMonitor: Redis dedup check failed ({e}) – " + "proceeding with alert (fail-open)" + ) + return True + + async def _mark_alerted(self, redis_key: str, interval_seconds: int) -> None: + """Set the Redis dedup key with TTL = interval_seconds.""" + if not is_redis_configured(): + return + + try: + redis_service = await get_redis_service() + await redis_service.setex(redis_key, "1", interval_seconds) + logger.debug( + f"PickupRateMonitor: marked alerted (key={redis_key} ttl={interval_seconds}s)" + ) + except Exception as e: + logger.warning(f"PickupRateMonitor: failed to mark alerted in Redis: {e}") + + +# Module-level singleton (mirrors score_monitor pattern) +pickup_rate_monitor = PickupRateMonitor() diff --git a/app/services/pickup_rate/task.py b/app/services/pickup_rate/task.py new file mode 100644 index 000000000..858d6e94a --- /dev/null +++ b/app/services/pickup_rate/task.py @@ -0,0 +1,55 @@ +""" +Pickup Rate Alert Background Task Initialization + +Registers the pickup_rate_monitor task with the BackgroundTaskScheduler. +Mirrors the pattern used by app/services/langfuse/tasks/task.py. +""" + +from app.core.config.dynamic import PICKUP_RATE_ALERT_INTERVAL_SECONDS +from app.core.config.static import SLACK_WEBHOOK_URL +from app.core.logger import logger +from app.services.pickup_rate.monitor import pickup_rate_monitor + + +async def initialize_pickup_rate_tasks(scheduler) -> bool: + """ + Register pickup rate alert task if required configuration is present. + + The task is always registered when SLACK_WEBHOOK_URL is configured. + ENABLE_PICKUP_RATE_ALERT is re-read on each scheduler tick inside + check_and_alert(), so alerts can be enabled or disabled at runtime via + Redis/DevCycle without a restart. + + PICKUP_RATE_ALERT_INTERVAL_SECONDS is read at registration time and + determines the scheduler cadence. Changing it in Redis will not affect + an already-registered task until the process restarts. + + Args: + scheduler: BackgroundTaskScheduler instance to register the task with. + + Returns: + True if the task was registered, False if skipped. + """ + if not SLACK_WEBHOOK_URL: + logger.warning( + "PickupRateMonitor: SLACK_WEBHOOK_URL not configured – skipping task registration" + ) + return False + + interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + + try: + scheduler.register_task( + name="pickup_rate_monitor", + func=pickup_rate_monitor.check_and_alert, + interval_seconds=interval_seconds, + ) + logger.info( + f"Registered pickup_rate_monitor background task " + f"(interval={interval_seconds}s)" + ) + return True + + except Exception as e: + logger.error(f"Failed to register pickup_rate_monitor task: {e}") + return False