From 7a70e9340b89b040d1b155e6fda4c58d864cbe0f Mon Sep 17 00:00:00 2001 From: "devansh.raj@juspay.in" Date: Mon, 20 Apr 2026 12:30:29 +0530 Subject: [PATCH 1/2] feat(breeze-buddy): add global pickup rate alert background task Adds a periodic background task that monitors call pickup rates and sends Slack warning alerts when rates drop below a configurable threshold. - Add app/services/pickup_rate/ module with: - config.py: AlertConfig dataclass (enabled, interval, threshold, alert_type, scope) with per-merchant fields stubbed for Phase 2 extension - calculator.py: compute_pickup_rates() mirroring ScoreMonitor._get_daily_call_stats() for both call-based and lead-based rate calculation - monitor.py: PickupRateMonitor class with threshold evaluation, Redis-based dedup (fail-open), and inline slack_alert.send() for alerts - task.py: initialize_pickup_rate_tasks() following langfuse task registration pattern - Add 3 dynamic config getters in dynamic.py: ENABLE_PICKUP_RATE_ALERT, PICKUP_RATE_ALERT_INTERVAL_SECONDS, PICKUP_RATE_ALERT_THRESHOLD - Register task in main.py lifespan after Langfuse task registration Edge cases handled: zero calls/leads skipped, Redis unavailable fails open, DB errors skip the cycle, Slack failures skip mark-alerted for retry. Lookback window derived from interval_seconds (no separate config needed). Per-merchant alerting scaffolded for Phase 2 (call_execution_config + DB filters). 
--- app/core/config/dynamic.py | 16 + app/main.py | 2 + .../langfuse/tasks/score_monitor/score.py | 30 +- app/services/pickup_rate/__init__.py | 15 + app/services/pickup_rate/calculator.py | 121 ++++++++ app/services/pickup_rate/config.py | 55 ++++ app/services/pickup_rate/monitor.py | 282 ++++++++++++++++++ app/services/pickup_rate/task.py | 55 ++++ 8 files changed, 560 insertions(+), 16 deletions(-) create mode 100644 app/services/pickup_rate/__init__.py create mode 100644 app/services/pickup_rate/calculator.py create mode 100644 app/services/pickup_rate/config.py create mode 100644 app/services/pickup_rate/monitor.py create mode 100644 app/services/pickup_rate/task.py diff --git a/app/core/config/dynamic.py b/app/core/config/dynamic.py index 68ecc15aa..45711ea2c 100644 --- a/app/core/config/dynamic.py +++ b/app/core/config/dynamic.py @@ -46,6 +46,22 @@ async def ENABLE_BACKGROUND_TASKS() -> bool: return await get_config("ENABLE_BACKGROUND_TASKS", "false", bool) +# --- Pickup Rate Alert Configuration --- +async def ENABLE_PICKUP_RATE_ALERT() -> bool: + """Returns ENABLE_PICKUP_RATE_ALERT from Redis""" + return await get_config("ENABLE_PICKUP_RATE_ALERT", "false", bool) + + +async def PICKUP_RATE_ALERT_INTERVAL_SECONDS() -> int: + """Returns PICKUP_RATE_ALERT_INTERVAL_SECONDS from Redis (default: 86400 = 24 h)""" + return await get_config("PICKUP_RATE_ALERT_INTERVAL_SECONDS", 86400, int) + + +async def PICKUP_RATE_ALERT_THRESHOLD() -> float: + """Returns PICKUP_RATE_ALERT_THRESHOLD from Redis (default: 40.0 %)""" + return await get_config("PICKUP_RATE_ALERT_THRESHOLD", 40.0, float) + + async def DAILY_SUMMARY_HOUR() -> int: """Returns DAILY_SUMMARY_HOUR from Redis (24-hour format: 0-23)""" return await get_config("DAILY_SUMMARY_HOUR", 21, int) diff --git a/app/main.py b/app/main.py index ce38c09c2..f55bf4073 100644 --- a/app/main.py +++ b/app/main.py @@ -66,6 +66,7 @@ AutomaticVoiceUserConnectRequest, ) from app.services.langfuse.tasks.task import 
initialize_langfuse_tasks +from app.services.pickup_rate.task import initialize_pickup_rate_tasks from app.services.redis import ( close_redis_connections, get_redis_service, @@ -168,6 +169,7 @@ async def lifespan(_app: FastAPI): await initialize_langfuse_tasks(_background_scheduler) ### Register new tasks here + await initialize_pickup_rate_tasks(_background_scheduler) # Start the scheduler only if tasks are registered if _background_scheduler.tasks: diff --git a/app/services/langfuse/tasks/score_monitor/score.py b/app/services/langfuse/tasks/score_monitor/score.py index 7399b529b..d5eb87fed 100644 --- a/app/services/langfuse/tasks/score_monitor/score.py +++ b/app/services/langfuse/tasks/score_monitor/score.py @@ -440,24 +440,22 @@ async def _get_daily_call_stats(self) -> Dict[str, Any]: # Process each call tracker for call-based stats for tracker, calling_provider in call_trackers: - # Count attempted calls (FINISHED status) + # Count attempted calls and outcomes only for FINISHED calls if tracker.status and tracker.status.value == "FINISHED": calls_attempted += 1 - - # Count by outcome - outcome_value = tracker.outcome if tracker.outcome else None - if outcome_value == "NO_ANSWER": - calls_no_answer += 1 - elif outcome_value == "CONFIRM": - calls_confirm += 1 - elif outcome_value == "CANCEL": - calls_cancel += 1 - elif outcome_value == "ADDRESS_UPDATED": - calls_address_updated += 1 - elif outcome_value == "BUSY": - calls_busy += 1 - - # Count by provider + outcome_value = tracker.outcome if tracker.outcome else None + if outcome_value == "NO_ANSWER": + calls_no_answer += 1 + elif outcome_value == "CONFIRM": + calls_confirm += 1 + elif outcome_value == "CANCEL": + calls_cancel += 1 + elif outcome_value == "ADDRESS_UPDATED": + calls_address_updated += 1 + elif outcome_value == "BUSY": + calls_busy += 1 + + # Count by provider regardless of status if calling_provider: provider_upper = calling_provider.upper() if provider_upper in provider_counts: diff --git 
a/app/services/pickup_rate/__init__.py b/app/services/pickup_rate/__init__.py new file mode 100644 index 000000000..2ae050191 --- /dev/null +++ b/app/services/pickup_rate/__init__.py @@ -0,0 +1,15 @@ +""" +Pickup Rate Alert Service + +Monitors call pickup rates and sends Slack warning alerts when rates drop +below configurable thresholds. +""" + +from app.services.pickup_rate.monitor import PickupRateMonitor, pickup_rate_monitor +from app.services.pickup_rate.task import initialize_pickup_rate_tasks + +__all__ = [ + "PickupRateMonitor", + "pickup_rate_monitor", + "initialize_pickup_rate_tasks", +] diff --git a/app/services/pickup_rate/calculator.py b/app/services/pickup_rate/calculator.py new file mode 100644 index 000000000..5425b46bb --- /dev/null +++ b/app/services/pickup_rate/calculator.py @@ -0,0 +1,121 @@ +""" +Pickup Rate Calculator + +Computes call-based and lead-based pickup rates for a given time window, +mirroring the logic in ScoreMonitor._get_daily_call_stats(). + +Phase 1: merchant_id / reseller_id params are accepted but ignored. +Phase 2: pass them through to the DB accessor filters once those are wired up. +""" + +from datetime import datetime +from typing import Any, Dict, Optional + +from app.core.logger import logger +from app.database.accessor.breeze_buddy.lead_call_tracker import ( + get_all_lead_call_trackers, + get_lead_based_analytics, +) + +# Sentinel value returned on DB error – callers must check for None +_ERROR_RESULT = None + + +async def compute_pickup_rates( + start_date: datetime, + end_date: datetime, + reseller_id: Optional[str] = None, # Phase 2: filter by reseller + merchant_id: Optional[str] = None, # Phase 2: filter by merchant +) -> Optional[Dict[str, Any]]: + """ + Calculate call-based and lead-based pickup rates for the given window. + + Args: + start_date: Start of the rolling window (UTC-aware). + end_date: End of the rolling window (UTC-aware). + reseller_id: (Phase 2) Restrict results to this reseller. 
+ merchant_id: (Phase 2) Restrict results to this merchant. + + Returns: + Dictionary with the following keys on success:: + + { + "calls_attempted": int, # FINISHED calls in window + "calls_picked": int, # attempted - NO_ANSWER + "calls_no_answer": int, + "call_pickup_rate": float, # (picked / attempted) * 100 + "total_leads": int, # unique request_ids + "leads_picked": int, # leads where finished > no_answer + "lead_pickup_rate": float, # (leads_picked / total_leads) * 100 + } + + Returns ``None`` if a DB error occurs (caller should skip alerting). + """ + try: + # ------------------------------------------------------------------ + # 1. Call-based metrics + # ------------------------------------------------------------------ + # Phase 2 NOTE: pass reseller_id / merchant_id to the accessor once + # those optional params are supported by get_all_lead_call_trackers(). + call_trackers = await get_all_lead_call_trackers( + start_date=start_date, + end_date=end_date, + ) + + calls_attempted = 0 + calls_no_answer = 0 + + if call_trackers: + for tracker, _provider in call_trackers: + if tracker.status and tracker.status.value == "FINISHED": + calls_attempted += 1 + if tracker.outcome == "NO_ANSWER": + calls_no_answer += 1 + + calls_picked = calls_attempted - calls_no_answer + call_pickup_rate = ( + calls_picked / calls_attempted * 100 if calls_attempted > 0 else 0.0 + ) + + # ------------------------------------------------------------------ + # 2. Lead-based metrics + # ------------------------------------------------------------------ + # Phase 2 NOTE: pass reseller_id / merchant_id once supported. 
+ lead_data = await get_lead_based_analytics( + start_date=start_date, + end_date=end_date, + ) + + total_leads = len(lead_data) if lead_data else 0 + leads_picked = 0 + + if lead_data: + for lead in lead_data: + # A lead is "picked" when more calls were finished than went unanswered + if lead["finished_calls"] > lead["no_answer_calls"]: + leads_picked += 1 + + lead_pickup_rate = leads_picked / total_leads * 100 if total_leads > 0 else 0.0 + + result = { + "calls_attempted": calls_attempted, + "calls_picked": calls_picked, + "calls_no_answer": calls_no_answer, + "call_pickup_rate": call_pickup_rate, + "total_leads": total_leads, + "leads_picked": leads_picked, + "lead_pickup_rate": lead_pickup_rate, + } + + logger.debug( + f"compute_pickup_rates [{start_date} → {end_date}] " + f"call_rate={call_pickup_rate}% lead_rate={lead_pickup_rate}%" + ) + return result + + except Exception as e: + logger.error( + f"compute_pickup_rates failed for window [{start_date} → {end_date}]: {e}", + exc_info=True, + ) + return _ERROR_RESULT diff --git a/app/services/pickup_rate/config.py b/app/services/pickup_rate/config.py new file mode 100644 index 000000000..1f3059ddf --- /dev/null +++ b/app/services/pickup_rate/config.py @@ -0,0 +1,55 @@ +""" +AlertConfig dataclass for the Pickup Rate Alert system. + +Phase 1: global alerting only. +Phase 2 will extend this with optional merchant_id / reseller_id for per-merchant scope. +""" + +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class AlertConfig: + """Configuration for a single pickup-rate alert scope. + + Fields: + enabled: Whether alerting is active for this scope. + interval_seconds: How often (in seconds) to check and potentially alert. + Also used as the Redis TTL for the dedup key. + threshold_percent: Alert fires when pickup rate drops below this value (0-100). + alert_type: Which rate(s) to evaluate. + "CALL_BASED" – only call-based pickup rate. 
+ "LEAD_BASED" – only lead-based pickup rate. + "BOTH" – alert if *either* rate is below threshold. + scope: Human-readable label used in Slack messages and Redis keys + (e.g. "global", "merchant:acme/store-1"). + lookback_hours: Rolling window of call data to evaluate (default: 24 h). + + Phase 2 extension fields (unused in Phase 1 – kept for forward-compatibility): + merchant_id: Merchant identifier to scope DB queries (None = no filter). + reseller_id: Reseller identifier to scope DB queries (None = no filter). + """ + + enabled: bool + interval_seconds: int + threshold_percent: float + alert_type: str = "BOTH" + scope: str = "global" + lookback_hours: int = 24 + + # Phase 2: per-merchant fields – not used yet, wired through to calculator + merchant_id: Optional[str] = None + reseller_id: Optional[str] = None + + # Computed Redis dedup key – derived from scope, not configurable directly + _redis_key: str = field(init=False, repr=False, default="") + + def __post_init__(self) -> None: + safe_scope = self.scope.replace(":", "_").replace("/", "_") + self._redis_key = f"pickup_rate:{safe_scope}:last_alert" + + @property + def redis_dedup_key(self) -> str: + """Redis key used to track last-alert timestamp for this scope.""" + return self._redis_key diff --git a/app/services/pickup_rate/monitor.py b/app/services/pickup_rate/monitor.py new file mode 100644 index 000000000..9af7806d6 --- /dev/null +++ b/app/services/pickup_rate/monitor.py @@ -0,0 +1,282 @@ +""" +Pickup Rate Monitor + +Background task that checks whether global (and, in Phase 2, per-merchant) +call pickup rates have dropped below a configured threshold, and sends a +Slack warning alert when they have. 
+ +Edge-case handling: +- Zero calls / leads in window → skip (no data to evaluate) +- Redis unavailable → local dedup/mark operations fail open (log warning, + proceed with alert); scheduled execution still + requires BackgroundTaskScheduler Redis lock +- DB error in calculator → log error, skip this check cycle +- Slack send failure → log error, do NOT mark-alerted (will retry next cycle) +""" + +from datetime import datetime, timedelta, timezone +from typing import Any, Dict + +from app.core.config.dynamic import ( + ENABLE_PICKUP_RATE_ALERT, + PICKUP_RATE_ALERT_INTERVAL_SECONDS, + PICKUP_RATE_ALERT_THRESHOLD, +) +from app.core.logger import logger +from app.services.pickup_rate.calculator import compute_pickup_rates +from app.services.pickup_rate.config import AlertConfig +from app.services.redis import get_redis_service, is_redis_configured +from app.services.slack.alert import slack_alert + + +class PickupRateMonitor: + """Monitors pickup rates and fires Slack alerts when thresholds are breached.""" + + # ----------------------------------------------------------------------- + # Public API + # ----------------------------------------------------------------------- + + async def check_and_alert(self) -> None: + """Entry point called by BackgroundTaskScheduler on each tick.""" + config = await self.load_config() + + if not config.enabled: + logger.debug("PickupRateMonitor: alerting disabled, skipping check") + return + + now = datetime.now(timezone.utc) + start_date = now - timedelta(hours=config.lookback_hours) + + logger.info( + f"PickupRateMonitor: checking pickup rate " + f"[{start_date.isoformat()} → {now.isoformat()}] " + f"threshold={config.threshold_percent}% type={config.alert_type}" + ) + + rates = await compute_pickup_rates( + start_date=start_date, + end_date=now, + reseller_id=config.reseller_id, + merchant_id=config.merchant_id, + ) + + if rates is None: + logger.error("PickupRateMonitor: DB query failed, skipping alert cycle") + return + + # 
Skip when there is no data – avoids false alerts on idle systems + if rates["calls_attempted"] == 0 and rates["total_leads"] == 0: + logger.info( + "PickupRateMonitor: no calls or leads in window, skipping alert" + ) + return + + if not self._is_threshold_breached(rates, config): + logger.info( + f"PickupRateMonitor: pickup rate OK " + f"(call={rates['call_pickup_rate']:.1f}% " + f"lead={rates['lead_pickup_rate']:.1f}% " + f"threshold={config.threshold_percent}%)" + ) + return + + # Dedup check – skip if we alerted too recently + if not await self._should_alert(config.redis_dedup_key): + logger.info( + f"PickupRateMonitor: threshold breached but already alerted recently " + f"(key={config.redis_dedup_key})" + ) + return + + # ------------------------------------------------------------------ + # Build and send Slack alert directly + # ------------------------------------------------------------------ + display_date = now.strftime("%d-%m-%y") + title = f"⚠️ Low Pickup Rate Alert - {display_date}" + threshold = config.threshold_percent + + fields = [ + {"name": "Scope", "value": config.scope.replace("_", " ").title()}, + {"name": "Time Window", "value": f"Last {config.lookback_hours} hours"}, + {"name": "Threshold", "value": f"{threshold}%"}, + {"name": "Alert Type", "value": config.alert_type}, + ] + + sections = [] + + if rates["calls_attempted"] > 0: + call_status = ( + "⚠️ BELOW THRESHOLD" + if rates["call_pickup_rate"] < threshold + else "✅ OK" + ) + sections.append( + { + "title": "Call-Based Pickup Rate", + "text": ( + f"• Rate: *{rates['call_pickup_rate']:.1f}%* " + f"(threshold: {threshold}%) {call_status}\n" + f"• Calls Attempted: *{rates['calls_attempted']}*\n" + f"• Calls Picked Up: *{rates['calls_picked']}*\n" + f"• Calls No Answer: *{rates['calls_no_answer']}*" + ), + } + ) + else: + sections.append( + { + "title": "Call-Based Pickup Rate", + "text": "• No calls attempted in this window", + } + ) + + if rates["total_leads"] > 0: + lead_status = ( + "⚠️ 
BELOW THRESHOLD" + if rates["lead_pickup_rate"] < threshold + else "✅ OK" + ) + sections.append( + { + "title": "Lead-Based Pickup Rate", + "text": ( + f"• Rate: *{rates['lead_pickup_rate']:.1f}%* " + f"(threshold: {threshold}%) {lead_status}\n" + f"• Total Leads: *{rates['total_leads']}*\n" + f"• Leads Picked: *{rates['leads_picked']}*" + ), + } + ) + else: + sections.append( + { + "title": "Lead-Based Pickup Rate", + "text": "• No leads processed in this window", + } + ) + + fallback = ( + f"Low Pickup Rate Alert: call={rates['call_pickup_rate']:.1f}% " + f"lead={rates['lead_pickup_rate']:.1f}% threshold={threshold}%" + ) + + try: + success = await slack_alert.send( + title=title, + fields=fields, + sections=sections, + fallback_text=fallback, + ) + except Exception as e: + logger.error(f"PickupRateMonitor: exception while sending Slack alert: {e}") + success = False + + # Only mark-alerted after a successful send so we retry on Slack failure + if success: + logger.info( + f"PickupRateMonitor: alert sent for scope='{config.scope}' " + f"call_rate={rates['call_pickup_rate']:.1f}% " + f"lead_rate={rates['lead_pickup_rate']:.1f}%" + ) + await self._mark_alerted(config.redis_dedup_key, config.interval_seconds) + else: + logger.warning( + "PickupRateMonitor: Slack send failed – will retry on next cycle" + ) + + # ----------------------------------------------------------------------- + # Configuration loading + # ----------------------------------------------------------------------- + + async def load_config(self) -> AlertConfig: + """Fetch runtime config from dynamic (Redis/DevCycle) getters.""" + enabled = await ENABLE_PICKUP_RATE_ALERT() + interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + threshold_percent = await PICKUP_RATE_ALERT_THRESHOLD() + + return AlertConfig( + enabled=enabled, + interval_seconds=interval_seconds, + threshold_percent=threshold_percent, + alert_type="BOTH", + scope="global", + lookback_hours=max(1, interval_seconds // 3600), + 
) + + # ----------------------------------------------------------------------- + # Threshold evaluation + # ----------------------------------------------------------------------- + + def _is_threshold_breached( + self, rates: Dict[str, Any], config: AlertConfig + ) -> bool: + """Return True when the configured rate(s) fall below the threshold.""" + threshold = config.threshold_percent + alert_type = config.alert_type.upper() + + call_breached = rates["call_pickup_rate"] < threshold + lead_breached = rates["lead_pickup_rate"] < threshold + + # Only evaluate the relevant rate(s) depending on alert_type + if alert_type == "CALL_BASED": + # Lead rate is ignored here; with no call data there is nothing to evaluate, so report no breach + if rates["calls_attempted"] == 0: + return False + return call_breached + + if alert_type == "LEAD_BASED": + if rates["total_leads"] == 0: + return False + return lead_breached + + # Default: "BOTH" – alert if *either* rate is below threshold + # Guard against checking a rate with zero denominator + call_ok = rates["calls_attempted"] == 0 or not call_breached + lead_ok = rates["total_leads"] == 0 or not lead_breached + return not (call_ok and lead_ok) + + # ----------------------------------------------------------------------- + # Redis dedup helpers + # ----------------------------------------------------------------------- + + async def _should_alert(self, redis_key: str) -> bool: + """Return True when it is safe to send an alert (dedup TTL has expired). + + Fails open: if Redis is unavailable the alert is allowed to fire so + we do not silently drop warnings on infrastructure issues. 
+ """ + if not is_redis_configured(): + logger.warning( + "PickupRateMonitor: Redis not configured – skipping dedup check, " + "proceeding with alert" + ) + return True + + try: + redis_service = await get_redis_service() + key_exists = await redis_service.exists(redis_key) + return not key_exists + except Exception as e: + logger.warning( + f"PickupRateMonitor: Redis dedup check failed ({e}) – " + "proceeding with alert (fail-open)" + ) + return True + + async def _mark_alerted(self, redis_key: str, interval_seconds: int) -> None: + """Set the Redis dedup key with TTL = interval_seconds.""" + if not is_redis_configured(): + return + + try: + redis_service = await get_redis_service() + await redis_service.setex(redis_key, "1", interval_seconds) + logger.debug( + f"PickupRateMonitor: marked alerted (key={redis_key} ttl={interval_seconds}s)" + ) + except Exception as e: + logger.warning(f"PickupRateMonitor: failed to mark alerted in Redis: {e}") + + +# Module-level singleton (mirrors score_monitor pattern) +pickup_rate_monitor = PickupRateMonitor() diff --git a/app/services/pickup_rate/task.py b/app/services/pickup_rate/task.py new file mode 100644 index 000000000..858d6e94a --- /dev/null +++ b/app/services/pickup_rate/task.py @@ -0,0 +1,55 @@ +""" +Pickup Rate Alert Background Task Initialization + +Registers the pickup_rate_monitor task with the BackgroundTaskScheduler. +Mirrors the pattern used by app/services/langfuse/tasks/task.py. +""" + +from app.core.config.dynamic import PICKUP_RATE_ALERT_INTERVAL_SECONDS +from app.core.config.static import SLACK_WEBHOOK_URL +from app.core.logger import logger +from app.services.pickup_rate.monitor import pickup_rate_monitor + + +async def initialize_pickup_rate_tasks(scheduler) -> bool: + """ + Register pickup rate alert task if required configuration is present. + + The task is always registered when SLACK_WEBHOOK_URL is configured. 
+ ENABLE_PICKUP_RATE_ALERT is re-read on each scheduler tick inside + check_and_alert(), so alerts can be enabled or disabled at runtime via + Redis/DevCycle without a restart. + + PICKUP_RATE_ALERT_INTERVAL_SECONDS is read at registration time and + determines the scheduler cadence. Changing it in Redis will not affect + an already-registered task until the process restarts. + + Args: + scheduler: BackgroundTaskScheduler instance to register the task with. + + Returns: + True if the task was registered, False if skipped. + """ + if not SLACK_WEBHOOK_URL: + logger.warning( + "PickupRateMonitor: SLACK_WEBHOOK_URL not configured – skipping task registration" + ) + return False + + interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + + try: + scheduler.register_task( + name="pickup_rate_monitor", + func=pickup_rate_monitor.check_and_alert, + interval_seconds=interval_seconds, + ) + logger.info( + f"Registered pickup_rate_monitor background task " + f"(interval={interval_seconds}s)" + ) + return True + + except Exception as e: + logger.error(f"Failed to register pickup_rate_monitor task: {e}") + return False From 351f824cbb00109ed756d5337ad3cdeb977120cf Mon Sep 17 00:00:00 2001 From: "devansh.raj@juspay.in" Date: Tue, 21 Apr 2026 12:56:21 +0530 Subject: [PATCH 2/2] feat: extend pickup rate alerting to per-merchant configuration - Add migration 025 to extend merchants table with pickup_rate_alert_enabled and pickup_rate_alert_threshold columns - Add pickup_rate_alert_enabled/threshold fields to MerchantCreate, MerchantUpdate and MerchantResponse schemas - Update merchants query/accessor/decoder to carry new alert config fields - Wire merchant_id filter through lead_call_tracker query and accessor layers to scope pickup rate calculations per merchant - Refactor PickupRateMonitor.check_and_alert() to delegate shared alert logic to _run_alert_for_config() for reuse across global and per-merchant paths - Add PickupRateMonitor.check_all_merchants() to loop all 
enabled merchants with isolated per-merchant error handling and scoped Redis dedup keys - Register pickup_rate_monitor_merchants background task alongside existing global task; global alerting unchanged and runs in parallel --- .../breeze_buddy/merchants/handlers.py | 12 ++ app/core/config/dynamic.py | 16 +- .../breeze_buddy/lead_call_tracker.py | 28 +++ .../accessor/breeze_buddy/merchants.py | 35 ++++ .../decoder/breeze_buddy/merchants.py | 2 + ...025_add_pickup_rate_alert_to_merchants.sql | 14 ++ .../queries/breeze_buddy/lead_call_tracker.py | 50 +++++ .../queries/breeze_buddy/merchants.py | 77 ++++++- app/schemas/breeze_buddy/merchants.py | 12 ++ app/services/pickup_rate/__init__.py | 2 +- app/services/pickup_rate/calculator.py | 36 +--- app/services/pickup_rate/config.py | 19 +- app/services/pickup_rate/monitor.py | 197 +++++++++++++----- app/services/pickup_rate/task.py | 76 +++++-- 14 files changed, 447 insertions(+), 129 deletions(-) create mode 100644 app/database/migrations/025_add_pickup_rate_alert_to_merchants.sql diff --git a/app/api/routers/breeze_buddy/merchants/handlers.py b/app/api/routers/breeze_buddy/merchants/handlers.py index b806c7e5d..b46550579 100644 --- a/app/api/routers/breeze_buddy/merchants/handlers.py +++ b/app/api/routers/breeze_buddy/merchants/handlers.py @@ -107,6 +107,8 @@ async def create_merchant_handler( merchant_data.is_active if merchant_data.is_active is not None else True ), reseller_id=reseller_id, + pickup_rate_alert_enabled=merchant_data.pickup_rate_alert_enabled or False, + pickup_rate_alert_threshold=merchant_data.pickup_rate_alert_threshold, ) if not merchant: @@ -235,12 +237,22 @@ async def update_merchant_handler( ) reseller_id = merchant_data.reseller_id + # Detect if pickup_rate_alert_threshold was explicitly set to null + # (vs simply not included in the request body). 
+ clear_threshold = ( + "pickup_rate_alert_threshold" in merchant_data.model_fields_set + and merchant_data.pickup_rate_alert_threshold is None + ) + updated_merchant = await merchant_accessors.update_merchant( merchant_id=merchant_id, name=merchant_data.name, description=merchant_data.description, is_active=merchant_data.is_active, reseller_id=reseller_id, + pickup_rate_alert_enabled=merchant_data.pickup_rate_alert_enabled, + pickup_rate_alert_threshold=merchant_data.pickup_rate_alert_threshold, + clear_pickup_rate_alert_threshold=clear_threshold, ) if not updated_merchant: diff --git a/app/core/config/dynamic.py b/app/core/config/dynamic.py index 45711ea2c..4dfb847e6 100644 --- a/app/core/config/dynamic.py +++ b/app/core/config/dynamic.py @@ -53,13 +53,21 @@ async def ENABLE_PICKUP_RATE_ALERT() -> bool: async def PICKUP_RATE_ALERT_INTERVAL_SECONDS() -> int: - """Returns PICKUP_RATE_ALERT_INTERVAL_SECONDS from Redis (default: 86400 = 24 h)""" - return await get_config("PICKUP_RATE_ALERT_INTERVAL_SECONDS", 86400, int) + """Returns PICKUP_RATE_ALERT_INTERVAL_SECONDS from Redis (default: 86400 = 24 h). + + Minimum enforced value is 60 seconds to prevent runaway task loops. + """ + value = await get_config("PICKUP_RATE_ALERT_INTERVAL_SECONDS", 86400, int) + return max(60, value) async def PICKUP_RATE_ALERT_THRESHOLD() -> float: - """Returns PICKUP_RATE_ALERT_THRESHOLD from Redis (default: 40.0 %)""" - return await get_config("PICKUP_RATE_ALERT_THRESHOLD", 40.0, float) + """Returns PICKUP_RATE_ALERT_THRESHOLD from Redis (default: 40.0 %). + + Clamped to [0, 100] to guard against misconfiguration. 
+ """ + value = await get_config("PICKUP_RATE_ALERT_THRESHOLD", 40.0, float) + return max(0.0, min(100.0, value)) async def DAILY_SUMMARY_HOUR() -> int: diff --git a/app/database/accessor/breeze_buddy/lead_call_tracker.py b/app/database/accessor/breeze_buddy/lead_call_tracker.py index 308e3209e..44cf03eec 100644 --- a/app/database/accessor/breeze_buddy/lead_call_tracker.py +++ b/app/database/accessor/breeze_buddy/lead_call_tracker.py @@ -14,6 +14,7 @@ abort_lead_by_id_query, acquire_lock_on_lead_by_id_query, get_all_lead_call_trackers_query, + get_call_based_pickup_rate_query, get_lead_based_analytics_query, get_lead_by_call_id_query, get_lead_by_id_query, @@ -426,6 +427,7 @@ async def get_all_lead_call_trackers( shop_name: Optional[str] = None, page: Optional[int] = None, page_size: Optional[int] = None, + merchant_id: Optional[str] = None, ) -> List[Tuple[LeadCallTracker, Optional[str]]]: """ Get all lead call trackers with optional filters and pagination. @@ -444,6 +446,7 @@ async def get_all_lead_call_trackers( shop_name=shop_name, limit=limit, offset=offset, + merchant_id=merchant_id, ) result = await run_parameterized_query(query_text, values) if result: @@ -513,9 +516,33 @@ async def get_lead_call_trackers_count( return 0 +async def get_call_based_pickup_rate( + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + merchant_id: Optional[str] = None, +) -> Tuple[int, int]: + """Get the counts used for the call-based pickup rate as a single aggregated SQL result. + + Returns: + Tuple of (calls_attempted, calls_no_answer) — both ints. + Raises on DB error so the caller can handle failure explicitly. 
+ """ + query_text, values = get_call_based_pickup_rate_query( + start_date=start_date, + end_date=end_date, + merchant_id=merchant_id, + ) + result = await run_parameterized_query(query_text, values) + if result: + row = result[0] + return int(row["calls_attempted"]), int(row["calls_no_answer"]) + return 0, 0 + + async def get_lead_based_analytics( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + merchant_id: Optional[str] = None, ) -> List[asyncpg.Record]: """ Get per-lead call data for analytics. @@ -527,6 +554,7 @@ async def get_lead_based_analytics( query_text, values = get_lead_based_analytics_query( start_date=start_date, end_date=end_date, + merchant_id=merchant_id, ) result = await run_parameterized_query(query_text, values) return result if result else [] diff --git a/app/database/accessor/breeze_buddy/merchants.py b/app/database/accessor/breeze_buddy/merchants.py index 8826a79fd..91c4bb439 100644 --- a/app/database/accessor/breeze_buddy/merchants.py +++ b/app/database/accessor/breeze_buddy/merchants.py @@ -19,6 +19,7 @@ get_merchant_by_merchant_identifier_query, get_merchants_by_ids_query, get_merchants_by_reseller_query, + get_merchants_with_pickup_rate_alert_enabled_query, update_merchant_query, ) from app.schemas.breeze_buddy.merchants import MerchantResponse @@ -49,6 +50,8 @@ async def create_merchant( name: Optional[str] = None, description: Optional[str] = None, is_active: bool = True, + pickup_rate_alert_enabled: bool = False, + pickup_rate_alert_threshold: Optional[float] = None, ) -> Optional[MerchantResponse]: """Create a new merchant entity (business entity). 
@@ -58,6 +61,8 @@ async def create_merchant( name: Optional display name description: Optional description is_active: Active status (default: true) + pickup_rate_alert_enabled: Enable per-merchant pickup rate alerting + pickup_rate_alert_threshold: Alert threshold %; None falls back to global config Returns: MerchantResponse if successful, None otherwise @@ -68,6 +73,8 @@ async def create_merchant( name=name, description=description, is_active=is_active, + pickup_rate_alert_enabled=pickup_rate_alert_enabled, + pickup_rate_alert_threshold=pickup_rate_alert_threshold, ) try: @@ -258,6 +265,9 @@ async def update_merchant( description: Optional[str] = None, is_active: Optional[bool] = None, reseller_id: Optional[str] = None, + pickup_rate_alert_enabled: Optional[bool] = None, + pickup_rate_alert_threshold: Optional[float] = None, + clear_pickup_rate_alert_threshold: bool = False, ) -> Optional[MerchantResponse]: """Update merchant entity (merchant_id cannot be changed). @@ -267,6 +277,11 @@ async def update_merchant( description: New description is_active: New active status reseller_id: New reseller owner ID + pickup_rate_alert_enabled: Toggle per-merchant pickup rate alerting + pickup_rate_alert_threshold: New alert threshold %; None leaves unchanged + clear_pickup_rate_alert_threshold: Set to True to explicitly clear the + threshold to NULL (fall back to global config). Takes precedence over + pickup_rate_alert_threshold. 
Returns: MerchantResponse if successful, None if not found @@ -277,6 +292,9 @@ async def update_merchant( description=description, is_active=is_active, reseller_id=reseller_id, + pickup_rate_alert_enabled=pickup_rate_alert_enabled, + pickup_rate_alert_threshold=pickup_rate_alert_threshold, + clear_pickup_rate_alert_threshold=clear_pickup_rate_alert_threshold, ) if not values: @@ -338,3 +356,20 @@ async def delete_merchant(merchant_id: str) -> bool: except Exception as e: logger.error(f"Error deleting merchant entity {merchant_id}: {e}") raise + + +async def get_merchants_with_pickup_rate_alert_enabled() -> List[MerchantResponse]: + """Get all active merchants that have pickup rate alerting enabled. + + Returns: + List of MerchantResponse for merchants where pickup_rate_alert_enabled = true + and is_active = true. + """ + query, values = get_merchants_with_pickup_rate_alert_enabled_query() + + try: + rows = await run_parameterized_query(query, values) + return [decode_merchant(row) for row in rows] if rows else [] + except Exception as e: + logger.error(f"Error fetching merchants with pickup rate alert enabled: {e}") + raise diff --git a/app/database/decoder/breeze_buddy/merchants.py b/app/database/decoder/breeze_buddy/merchants.py index 16e8eb962..48f244c71 100644 --- a/app/database/decoder/breeze_buddy/merchants.py +++ b/app/database/decoder/breeze_buddy/merchants.py @@ -20,6 +20,8 @@ def decode_merchant(row) -> MerchantResponse: description=row.get("description"), is_active=row.get("is_active", True), reseller_id=str(row["reseller_id"]) if row.get("reseller_id") else None, + pickup_rate_alert_enabled=row.get("pickup_rate_alert_enabled", False), + pickup_rate_alert_threshold=row.get("pickup_rate_alert_threshold"), created_at=row["created_at"], updated_at=row["updated_at"], ) diff --git a/app/database/migrations/025_add_pickup_rate_alert_to_merchants.sql b/app/database/migrations/025_add_pickup_rate_alert_to_merchants.sql new file mode 100644 index 
000000000..2ff4ba428 --- /dev/null +++ b/app/database/migrations/025_add_pickup_rate_alert_to_merchants.sql @@ -0,0 +1,14 @@ +-- Migration 025: Add per-merchant pickup rate alert configuration +-- Extends merchants table with opt-in alerting columns. +-- NULL threshold means "use global PICKUP_RATE_ALERT_THRESHOLD dynamic config". + +BEGIN; + +ALTER TABLE merchants + ADD COLUMN IF NOT EXISTS pickup_rate_alert_enabled BOOLEAN NOT NULL DEFAULT false, + ADD COLUMN IF NOT EXISTS pickup_rate_alert_threshold FLOAT DEFAULT NULL; + +CREATE INDEX IF NOT EXISTS idx_merchants_pickup_rate_alert_enabled + ON merchants(pickup_rate_alert_enabled); + +COMMIT; diff --git a/app/database/queries/breeze_buddy/lead_call_tracker.py b/app/database/queries/breeze_buddy/lead_call_tracker.py index 34eba680d..bc22e88d7 100644 --- a/app/database/queries/breeze_buddy/lead_call_tracker.py +++ b/app/database/queries/breeze_buddy/lead_call_tracker.py @@ -338,6 +338,7 @@ def get_all_lead_call_trackers_query( shop_name: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, + merchant_id: Optional[str] = None, ) -> Tuple[str, List[Any]]: """ Generate query to get all lead call trackers within a date range with optional filters and pagination. @@ -374,6 +375,10 @@ def get_all_lead_call_trackers_query( values.append(f"%{shop_name}%") conditions.append(f"payload->>'shop_name' LIKE ${len(values)}") + if merchant_id: + values.append(merchant_id) + conditions.append(f'lct."merchant_id" = ${len(values)}') + if conditions: text += " WHERE " + " AND ".join(conditions) @@ -451,9 +456,50 @@ def get_lead_call_trackers_count_query( return text, values +def get_call_based_pickup_rate_query( + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + merchant_id: Optional[str] = None, +) -> Tuple[str, List[Any]]: + """ + Generate query to aggregate call-based pickup rate metrics in a single SQL pass. 
+ + Returns one row with: + calls_attempted - COUNT of FINISHED calls in window + calls_no_answer - COUNT of FINISHED calls with NO_ANSWER outcome + """ + values: List[Any] = [] + conditions = [] + + if start_date: + values.append(start_date) + conditions.append(f'"call_initiated_time" >= ${len(values)}') + + if end_date: + values.append(end_date) + conditions.append(f'"call_initiated_time" < ${len(values)}') + + if merchant_id: + values.append(merchant_id) + conditions.append(f'"merchant_id" = ${len(values)}') + + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + + text = f""" + SELECT + COUNT(*) FILTER (WHERE status = 'FINISHED') AS calls_attempted, + COUNT(*) FILTER (WHERE status = 'FINISHED' AND outcome = 'NO_ANSWER') + AS calls_no_answer + FROM "{LEAD_CALL_TRACKER_TABLE}" + {where_clause}; + """ + return text, values + + def get_lead_based_analytics_query( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + merchant_id: Optional[str] = None, ) -> Tuple[str, List[Any]]: """ Generate query to get per-lead call data. 
@@ -470,6 +516,10 @@ def get_lead_based_analytics_query( values.append(end_date) conditions.append(f'"call_initiated_time" < ${len(values)}') + if merchant_id: + values.append(merchant_id) + conditions.append(f'"merchant_id" = ${len(values)}') + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" text = f""" diff --git a/app/database/queries/breeze_buddy/merchants.py b/app/database/queries/breeze_buddy/merchants.py index 33f0f868f..622060d88 100644 --- a/app/database/queries/breeze_buddy/merchants.py +++ b/app/database/queries/breeze_buddy/merchants.py @@ -25,18 +25,34 @@ def create_merchant_query( name: Optional[str] = None, description: Optional[str] = None, is_active: bool = True, + pickup_rate_alert_enabled: bool = False, + pickup_rate_alert_threshold: Optional[float] = None, ) -> Tuple[str, List[Any]]: """Generate query to create a new merchant entity.""" query = f""" INSERT INTO {MERCHANTS_TABLE} ( merchant_id, name, description, - is_active, reseller_id, created_at, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7) + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING merchant_id, name, description, - is_active, reseller_id, created_at, updated_at + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at """ now = datetime.now(timezone.utc) - values = [merchant_id, name, description, is_active, reseller_id, now, now] + values = [ + merchant_id, + name, + description, + is_active, + reseller_id, + pickup_rate_alert_enabled, + pickup_rate_alert_threshold, + now, + now, + ] return query, values @@ -46,7 +62,9 @@ def get_merchant_by_merchant_identifier_query( """Generate query to get merchant by merchant_id.""" query = f""" SELECT merchant_id, name, description, - is_active, reseller_id, created_at, updated_at + is_active, reseller_id, + pickup_rate_alert_enabled, 
pickup_rate_alert_threshold, + created_at, updated_at FROM {MERCHANTS_TABLE} WHERE merchant_id = $1 """ @@ -109,7 +127,9 @@ def get_all_merchants_query( # Build queries query = f""" SELECT merchant_id, name, description, - is_active, reseller_id, created_at, updated_at + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at FROM {MERCHANTS_TABLE} {where_clause} ORDER BY {sort_by} {sort_order.upper()} @@ -137,7 +157,9 @@ def get_merchants_by_reseller_query( query = f""" SELECT merchant_id, name, description, - is_active, reseller_id, created_at, updated_at + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at FROM {MERCHANTS_TABLE} WHERE reseller_id = $1 ORDER BY created_at DESC @@ -159,6 +181,9 @@ def update_merchant_query( description: Optional[str] = None, is_active: Optional[bool] = None, reseller_id: Optional[str] = None, + pickup_rate_alert_enabled: Optional[bool] = None, + pickup_rate_alert_threshold: Optional[float] = None, + clear_pickup_rate_alert_threshold: bool = False, ) -> Tuple[str, List[Any]]: """Generate query to update a merchant entity. 
@@ -189,6 +214,21 @@ def update_merchant_query( params.append(reseller_id) param_idx += 1 + if pickup_rate_alert_enabled is not None: + set_clauses.append(f"pickup_rate_alert_enabled = ${param_idx}") + params.append(pickup_rate_alert_enabled) + param_idx += 1 + + if clear_pickup_rate_alert_threshold: + # Explicit null clear - set column to NULL + set_clauses.append(f"pickup_rate_alert_threshold = ${param_idx}") + params.append(None) + param_idx += 1 + elif pickup_rate_alert_threshold is not None: + set_clauses.append(f"pickup_rate_alert_threshold = ${param_idx}") + params.append(pickup_rate_alert_threshold) + param_idx += 1 + if not set_clauses: # No fields to update return "", [] @@ -206,7 +246,9 @@ def update_merchant_query( SET {', '.join(set_clauses)} WHERE merchant_id = ${param_idx} RETURNING merchant_id, name, description, - is_active, reseller_id, created_at, updated_at + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at """ return query, params @@ -262,7 +304,9 @@ def get_merchants_by_ids_query( query = f""" SELECT merchant_id, name, description, - is_active, reseller_id, created_at, updated_at + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at FROM {MERCHANTS_TABLE} {where_clause} ORDER BY {sort_by} {sort_order.upper()} @@ -298,3 +342,18 @@ def delete_merchant_query(merchant_id: str) -> Tuple[str, List[Any]]: (SELECT COUNT(*) FROM delete_operation) as deleted_count """ return query, [merchant_id] + + +def get_merchants_with_pickup_rate_alert_enabled_query() -> Tuple[str, List[Any]]: + """Generate query to get all active merchants with pickup rate alerting enabled.""" + query = f""" + SELECT merchant_id, name, description, + is_active, reseller_id, + pickup_rate_alert_enabled, pickup_rate_alert_threshold, + created_at, updated_at + FROM {MERCHANTS_TABLE} + WHERE pickup_rate_alert_enabled = true + AND is_active = true + ORDER BY merchant_id ASC 
+ """ + return query, [] diff --git a/app/schemas/breeze_buddy/merchants.py b/app/schemas/breeze_buddy/merchants.py index 7e60aca64..01b3ad3dd 100644 --- a/app/schemas/breeze_buddy/merchants.py +++ b/app/schemas/breeze_buddy/merchants.py @@ -29,6 +29,14 @@ class MerchantCreate(BaseModel): description="Reseller user ID who owns this merchant. " "Admin-only field; auto-set for resellers.", ) + pickup_rate_alert_enabled: Optional[bool] = Field( + False, description="Enable per-merchant pickup rate alerting" + ) + pickup_rate_alert_threshold: Optional[float] = Field( + None, + description="Alert threshold % for this merchant. " + "If null, falls back to global PICKUP_RATE_ALERT_THRESHOLD.", + ) class MerchantUpdate(BaseModel): @@ -40,6 +48,8 @@ class MerchantUpdate(BaseModel): reseller_id: Optional[str] = Field( None, description="Update reseller assignment (admin only)" ) + pickup_rate_alert_enabled: Optional[bool] = None + pickup_rate_alert_threshold: Optional[float] = None class MerchantResponse(BaseModel): @@ -50,6 +60,8 @@ class MerchantResponse(BaseModel): description: Optional[str] = None is_active: bool = True reseller_id: Optional[str] = None + pickup_rate_alert_enabled: bool = False + pickup_rate_alert_threshold: Optional[float] = None created_at: Optional[datetime] = None updated_at: Optional[datetime] = None diff --git a/app/services/pickup_rate/__init__.py b/app/services/pickup_rate/__init__.py index 2ae050191..c4e2445ab 100644 --- a/app/services/pickup_rate/__init__.py +++ b/app/services/pickup_rate/__init__.py @@ -10,6 +10,6 @@ __all__ = [ "PickupRateMonitor", - "pickup_rate_monitor", "initialize_pickup_rate_tasks", + "pickup_rate_monitor", ] diff --git a/app/services/pickup_rate/calculator.py b/app/services/pickup_rate/calculator.py index 5425b46bb..b3d5003e7 100644 --- a/app/services/pickup_rate/calculator.py +++ b/app/services/pickup_rate/calculator.py @@ -3,9 +3,6 @@ Computes call-based and lead-based pickup rates for a given time window, mirroring 
the logic in ScoreMonitor._get_daily_call_stats(). - -Phase 1: merchant_id / reseller_id params are accepted but ignored. -Phase 2: pass them through to the DB accessor filters once those are wired up. """ from datetime import datetime @@ -13,19 +10,18 @@ from app.core.logger import logger from app.database.accessor.breeze_buddy.lead_call_tracker import ( - get_all_lead_call_trackers, + get_call_based_pickup_rate, get_lead_based_analytics, ) -# Sentinel value returned on DB error – callers must check for None +# Sentinel value returned on DB error - callers must check for None _ERROR_RESULT = None async def compute_pickup_rates( start_date: datetime, end_date: datetime, - reseller_id: Optional[str] = None, # Phase 2: filter by reseller - merchant_id: Optional[str] = None, # Phase 2: filter by merchant + merchant_id: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """ Calculate call-based and lead-based pickup rates for the given window. @@ -33,8 +29,7 @@ async def compute_pickup_rates( Args: start_date: Start of the rolling window (UTC-aware). end_date: End of the rolling window (UTC-aware). - reseller_id: (Phase 2) Restrict results to this reseller. - merchant_id: (Phase 2) Restrict results to this merchant. + merchant_id: Restrict results to this merchant (None = all merchants). Returns: Dictionary with the following keys on success:: @@ -53,25 +48,14 @@ async def compute_pickup_rates( """ try: # ------------------------------------------------------------------ - # 1. Call-based metrics + # 1. Call-based metrics - single SQL aggregation (no Python-side loop) # ------------------------------------------------------------------ - # Phase 2 NOTE: pass reseller_id / merchant_id to the accessor once - # those optional params are supported by get_all_lead_call_trackers(). 
- call_trackers = await get_all_lead_call_trackers( + calls_attempted, calls_no_answer = await get_call_based_pickup_rate( start_date=start_date, end_date=end_date, + merchant_id=merchant_id, ) - calls_attempted = 0 - calls_no_answer = 0 - - if call_trackers: - for tracker, _provider in call_trackers: - if tracker.status and tracker.status.value == "FINISHED": - calls_attempted += 1 - if tracker.outcome == "NO_ANSWER": - calls_no_answer += 1 - calls_picked = calls_attempted - calls_no_answer call_pickup_rate = ( calls_picked / calls_attempted * 100 if calls_attempted > 0 else 0.0 @@ -80,10 +64,10 @@ async def compute_pickup_rates( # ------------------------------------------------------------------ # 2. Lead-based metrics # ------------------------------------------------------------------ - # Phase 2 NOTE: pass reseller_id / merchant_id once supported. lead_data = await get_lead_based_analytics( start_date=start_date, end_date=end_date, + merchant_id=merchant_id, ) total_leads = len(lead_data) if lead_data else 0 @@ -108,14 +92,14 @@ async def compute_pickup_rates( } logger.debug( - f"compute_pickup_rates [{start_date} → {end_date}] " + f"compute_pickup_rates [{start_date} -> {end_date}] " f"call_rate={call_pickup_rate}% lead_rate={lead_pickup_rate}%" ) return result except Exception as e: logger.error( - f"compute_pickup_rates failed for window [{start_date} → {end_date}]: {e}", + f"compute_pickup_rates failed for window [{start_date} -> {end_date}]: {e}", exc_info=True, ) return _ERROR_RESULT diff --git a/app/services/pickup_rate/config.py b/app/services/pickup_rate/config.py index 1f3059ddf..4424e10f2 100644 --- a/app/services/pickup_rate/config.py +++ b/app/services/pickup_rate/config.py @@ -1,8 +1,7 @@ """ AlertConfig dataclass for the Pickup Rate Alert system. -Phase 1: global alerting only. -Phase 2 will extend this with optional merchant_id / reseller_id for per-merchant scope. +Supports both global alerting and per-merchant scoping via merchant_id. 
""" from dataclasses import dataclass, field @@ -19,16 +18,15 @@ class AlertConfig: Also used as the Redis TTL for the dedup key. threshold_percent: Alert fires when pickup rate drops below this value (0-100). alert_type: Which rate(s) to evaluate. - "CALL_BASED" – only call-based pickup rate. - "LEAD_BASED" – only lead-based pickup rate. - "BOTH" – alert if *either* rate is below threshold. + "CALL_BASED" - only call-based pickup rate. + "LEAD_BASED" - only lead-based pickup rate. + "BOTH" - alert if *either* rate is below threshold. scope: Human-readable label used in Slack messages and Redis keys (e.g. "global", "merchant:acme/store-1"). lookback_hours: Rolling window of call data to evaluate (default: 24 h). - Phase 2 extension fields (unused in Phase 1 – kept for forward-compatibility): - merchant_id: Merchant identifier to scope DB queries (None = no filter). - reseller_id: Reseller identifier to scope DB queries (None = no filter). + Optional scope fields: + merchant_id: Merchant identifier to scope DB queries (None = all merchants). 
""" enabled: bool @@ -38,11 +36,10 @@ class AlertConfig: scope: str = "global" lookback_hours: int = 24 - # Phase 2: per-merchant fields – not used yet, wired through to calculator + # Per-merchant scoping - wired through to calculator for DB query filtering merchant_id: Optional[str] = None - reseller_id: Optional[str] = None - # Computed Redis dedup key – derived from scope, not configurable directly + # Computed Redis dedup key - derived from scope, not configurable directly _redis_key: str = field(init=False, repr=False, default="") def __post_init__(self) -> None: diff --git a/app/services/pickup_rate/monitor.py b/app/services/pickup_rate/monitor.py index 9af7806d6..ec259e231 100644 --- a/app/services/pickup_rate/monitor.py +++ b/app/services/pickup_rate/monitor.py @@ -1,21 +1,19 @@ """ Pickup Rate Monitor -Background task that checks whether global (and, in Phase 2, per-merchant) -call pickup rates have dropped below a configured threshold, and sends a -Slack warning alert when they have. +Background task that checks whether global (and per-merchant) call pickup rates +have dropped below a configured threshold, and sends a Slack warning alert when +they have. 
Edge-case handling: - Zero calls / leads in window → skip (no data to evaluate) -- Redis unavailable → local dedup/mark operations fail open (log warning, - proceed with alert); scheduled execution still - requires BackgroundTaskScheduler Redis lock +- Redis unavailable → fail-open: log warning, allow alert to fire - DB error in calculator → log error, skip this check cycle - Slack send failure → log error, do NOT mark-alerted (will retry next cycle) """ from datetime import datetime, timedelta, timezone -from typing import Any, Dict +from typing import Any, Dict, Optional from app.core.config.dynamic import ( ENABLE_PICKUP_RATE_ALERT, @@ -23,6 +21,9 @@ PICKUP_RATE_ALERT_THRESHOLD, ) from app.core.logger import logger +from app.database.accessor.breeze_buddy.merchants import ( + get_merchants_with_pickup_rate_alert_enabled, +) from app.services.pickup_rate.calculator import compute_pickup_rates from app.services.pickup_rate.config import AlertConfig from app.services.redis import get_redis_service, is_redis_configured @@ -37,18 +38,113 @@ class PickupRateMonitor: # ----------------------------------------------------------------------- async def check_and_alert(self) -> None: - """Entry point called by BackgroundTaskScheduler on each tick.""" + """Global alert entry point called by BackgroundTaskScheduler on each tick.""" config = await self.load_config() if not config.enabled: logger.debug("PickupRateMonitor: alerting disabled, skipping check") return + await self._run_alert_for_config(config) + + async def check_all_merchants(self) -> None: + """Per-merchant alert entry point called by BackgroundTaskScheduler on each tick. + + Loads all active merchants with pickup_rate_alert_enabled=true and runs + the alert logic for each one independently. A failure on one merchant + does not block the others. 
+ """ + try: + merchants = await get_merchants_with_pickup_rate_alert_enabled() + except Exception as e: + logger.error( + f"PickupRateMonitor: failed to load merchants for per-merchant alert: {e}" + ) + return + + if not merchants: + logger.debug( + "PickupRateMonitor: no merchants with pickup rate alert enabled" + ) + return + + interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + global_threshold = await PICKUP_RATE_ALERT_THRESHOLD() + + logger.info( + f"PickupRateMonitor: running per-merchant check for {len(merchants)} merchant(s)" + ) + + for merchant in merchants: + try: + threshold = ( + merchant.pickup_rate_alert_threshold + if merchant.pickup_rate_alert_threshold is not None + else global_threshold + ) + config = self._build_alert_config( + interval_seconds=interval_seconds, + threshold_percent=threshold, + scope=f"merchant:{merchant.merchant_id}", + merchant_id=merchant.merchant_id, + ) + await self._run_alert_for_config(config) + except Exception as e: + logger.error( + f"PickupRateMonitor: error processing merchant " + f"'{merchant.merchant_id}': {e}" + ) + + # ----------------------------------------------------------------------- + # Configuration loading + # ----------------------------------------------------------------------- + + def _build_alert_config( + self, + interval_seconds: int, + threshold_percent: float, + scope: str = "global", + merchant_id: Optional[str] = None, + enabled: bool = True, + ) -> AlertConfig: + """Build an AlertConfig with shared defaults applied. + + All alert scopes use alert_type='BOTH'. Callers only need to supply + the fields that differ between global and per-merchant configs. 
+ """ + return AlertConfig( + enabled=enabled, + interval_seconds=interval_seconds, + threshold_percent=threshold_percent, + alert_type="BOTH", + scope=scope, + merchant_id=merchant_id, + ) + + async def load_config(self) -> AlertConfig: + """Fetch runtime config from dynamic (Redis/DevCycle) getters.""" + enabled = await ENABLE_PICKUP_RATE_ALERT() + interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + threshold_percent = await PICKUP_RATE_ALERT_THRESHOLD() + + return self._build_alert_config( + interval_seconds=interval_seconds, + threshold_percent=threshold_percent, + enabled=enabled, + scope="global", + ) + + # ----------------------------------------------------------------------- + # Core alert logic (shared by global and per-merchant paths) + # ----------------------------------------------------------------------- + + async def _run_alert_for_config(self, config: AlertConfig) -> None: + """Run a full alert cycle for a given AlertConfig scope.""" now = datetime.now(timezone.utc) start_date = now - timedelta(hours=config.lookback_hours) logger.info( - f"PickupRateMonitor: checking pickup rate " + f"PickupRateMonitor: checking pickup rate for scope='{config.scope}' " f"[{start_date.isoformat()} → {now.isoformat()}] " f"threshold={config.threshold_percent}% type={config.alert_type}" ) @@ -56,42 +152,47 @@ async def check_and_alert(self) -> None: rates = await compute_pickup_rates( start_date=start_date, end_date=now, - reseller_id=config.reseller_id, merchant_id=config.merchant_id, ) if rates is None: - logger.error("PickupRateMonitor: DB query failed, skipping alert cycle") + logger.error( + f"PickupRateMonitor: DB query failed for scope='{config.scope}', " + "skipping alert cycle" + ) return - # Skip when there is no data – avoids false alerts on idle systems + # Skip when there is no data - avoids false alerts on idle systems if rates["calls_attempted"] == 0 and rates["total_leads"] == 0: logger.info( - "PickupRateMonitor: no calls or leads in 
window, skipping alert" + f"PickupRateMonitor: no calls or leads in window for " + f"scope='{config.scope}', skipping alert" ) return if not self._is_threshold_breached(rates, config): logger.info( - f"PickupRateMonitor: pickup rate OK " - f"(call={rates['call_pickup_rate']:.1f}% " - f"lead={rates['lead_pickup_rate']:.1f}% " + f"PickupRateMonitor: pickup rate OK for scope='{config.scope}' " + f"(call={rates['call_pickup_rate']}% " + f"lead={rates['lead_pickup_rate']}% " f"threshold={config.threshold_percent}%)" ) return - # Dedup check – skip if we alerted too recently - if not await self._should_alert(config.redis_dedup_key): + # Dedup check - skip if we alerted too recently + if not await self._should_alert( + config.redis_dedup_key, config.interval_seconds + ): logger.info( - f"PickupRateMonitor: threshold breached but already alerted recently " - f"(key={config.redis_dedup_key})" + f"PickupRateMonitor: threshold breached for scope='{config.scope}' " + f"but already alerted recently (key={config.redis_dedup_key})" ) return # ------------------------------------------------------------------ - # Build and send Slack alert directly + # Build and send Slack alert # ------------------------------------------------------------------ - display_date = now.strftime("%d-%m-%y") + display_date = datetime.now(timezone.utc).strftime("%d-%m-%y") title = f"⚠️ Low Pickup Rate Alert - {display_date}" threshold = config.threshold_percent @@ -114,7 +215,7 @@ async def check_and_alert(self) -> None: { "title": "Call-Based Pickup Rate", "text": ( - f"• Rate: *{rates['call_pickup_rate']:.1f}%* " + f"• Rate: *{rates['call_pickup_rate']}%* " f"(threshold: {threshold}%) {call_status}\n" f"• Calls Attempted: *{rates['calls_attempted']}*\n" f"• Calls Picked Up: *{rates['calls_picked']}*\n" @@ -140,7 +241,7 @@ async def check_and_alert(self) -> None: { "title": "Lead-Based Pickup Rate", "text": ( - f"• Rate: *{rates['lead_pickup_rate']:.1f}%* " + f"• Rate: *{rates['lead_pickup_rate']}%* " 
f"(threshold: {threshold}%) {lead_status}\n" f"• Total Leads: *{rates['total_leads']}*\n" f"• Leads Picked: *{rates['leads_picked']}*" @@ -156,8 +257,9 @@ async def check_and_alert(self) -> None: ) fallback = ( - f"Low Pickup Rate Alert: call={rates['call_pickup_rate']:.1f}% " - f"lead={rates['lead_pickup_rate']:.1f}% threshold={threshold}%" + f"Low Pickup Rate Alert [{config.scope}]: " + f"call={rates['call_pickup_rate']}% " + f"lead={rates['lead_pickup_rate']}% threshold={threshold}%" ) try: @@ -168,41 +270,25 @@ async def check_and_alert(self) -> None: fallback_text=fallback, ) except Exception as e: - logger.error(f"PickupRateMonitor: exception while sending Slack alert: {e}") + logger.error( + f"PickupRateMonitor: exception while sending Slack alert " + f"for scope='{config.scope}': {e}" + ) success = False - # Only mark-alerted after a successful send so we retry on Slack failure if success: logger.info( f"PickupRateMonitor: alert sent for scope='{config.scope}' " - f"call_rate={rates['call_pickup_rate']:.1f}% " - f"lead_rate={rates['lead_pickup_rate']:.1f}%" + f"call_rate={rates['call_pickup_rate']}% " + f"lead_rate={rates['lead_pickup_rate']}%" ) await self._mark_alerted(config.redis_dedup_key, config.interval_seconds) else: logger.warning( - "PickupRateMonitor: Slack send failed – will retry on next cycle" + f"PickupRateMonitor: Slack send failed for scope='{config.scope}' " + "- will retry on next cycle" ) - # ----------------------------------------------------------------------- - # Configuration loading - # ----------------------------------------------------------------------- - - async def load_config(self) -> AlertConfig: - """Fetch runtime config from dynamic (Redis/DevCycle) getters.""" - enabled = await ENABLE_PICKUP_RATE_ALERT() - interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() - threshold_percent = await PICKUP_RATE_ALERT_THRESHOLD() - - return AlertConfig( - enabled=enabled, - interval_seconds=interval_seconds, - 
threshold_percent=threshold_percent, - alert_type="BOTH", - scope="global", - lookback_hours=max(1, interval_seconds // 3600), - ) - # ----------------------------------------------------------------------- # Threshold evaluation # ----------------------------------------------------------------------- @@ -217,9 +303,7 @@ def _is_threshold_breached( call_breached = rates["call_pickup_rate"] < threshold lead_breached = rates["lead_pickup_rate"] < threshold - # Only evaluate the relevant rate(s) depending on alert_type if alert_type == "CALL_BASED": - # Skip lead check when no call data, but still evaluate call rate if rates["calls_attempted"] == 0: return False return call_breached @@ -229,8 +313,7 @@ def _is_threshold_breached( return False return lead_breached - # Default: "BOTH" – alert if *either* rate is below threshold - # Guard against checking a rate with zero denominator + # Default: "BOTH" - alert if *either* rate is below threshold call_ok = rates["calls_attempted"] == 0 or not call_breached lead_ok = rates["total_leads"] == 0 or not lead_breached return not (call_ok and lead_ok) @@ -239,7 +322,7 @@ def _is_threshold_breached( # Redis dedup helpers # ----------------------------------------------------------------------- - async def _should_alert(self, redis_key: str) -> bool: + async def _should_alert(self, redis_key: str, interval_seconds: int) -> bool: """Return True when it is safe to send an alert (dedup TTL has expired). 
Fails open: if Redis is unavailable the alert is allowed to fire so @@ -247,7 +330,7 @@ async def _should_alert(self, redis_key: str) -> bool: """ if not is_redis_configured(): logger.warning( - "PickupRateMonitor: Redis not configured – skipping dedup check, " + "PickupRateMonitor: Redis not configured - skipping dedup check, " "proceeding with alert" ) return True @@ -258,7 +341,7 @@ async def _should_alert(self, redis_key: str) -> bool: return not key_exists except Exception as e: logger.warning( - f"PickupRateMonitor: Redis dedup check failed ({e}) – " + f"PickupRateMonitor: Redis dedup check failed ({e}) - " "proceeding with alert (fail-open)" ) return True diff --git a/app/services/pickup_rate/task.py b/app/services/pickup_rate/task.py index 858d6e94a..71f2c1271 100644 --- a/app/services/pickup_rate/task.py +++ b/app/services/pickup_rate/task.py @@ -1,55 +1,89 @@ """ Pickup Rate Alert Background Task Initialization -Registers the pickup_rate_monitor task with the BackgroundTaskScheduler. +Registers the pickup_rate_monitor tasks with the BackgroundTaskScheduler. Mirrors the pattern used by app/services/langfuse/tasks/task.py. + +Two tasks are registered when conditions are met: + - pickup_rate_monitor → global alert (gated by ENABLE_PICKUP_RATE_ALERT) + - pickup_rate_monitor_merchants → per-merchant alerts (merchants table config) """ -from app.core.config.dynamic import PICKUP_RATE_ALERT_INTERVAL_SECONDS +from __future__ import annotations + +from typing import TYPE_CHECKING + +from app.core.config.dynamic import ( + ENABLE_PICKUP_RATE_ALERT, + PICKUP_RATE_ALERT_INTERVAL_SECONDS, +) from app.core.config.static import SLACK_WEBHOOK_URL from app.core.logger import logger from app.services.pickup_rate.monitor import pickup_rate_monitor +if TYPE_CHECKING: + from app.core.background_tasks.scheduler import BackgroundTaskScheduler -async def initialize_pickup_rate_tasks(scheduler) -> bool: - """ - Register pickup rate alert task if required configuration is present. 
- The task is always registered when SLACK_WEBHOOK_URL is configured. - ENABLE_PICKUP_RATE_ALERT is re-read on each scheduler tick inside - check_and_alert(), so alerts can be enabled or disabled at runtime via - Redis/DevCycle without a restart. +async def initialize_pickup_rate_tasks(scheduler: "BackgroundTaskScheduler") -> bool: + """ + Register pickup rate alert tasks if all required configuration is present. - PICKUP_RATE_ALERT_INTERVAL_SECONDS is read at registration time and - determines the scheduler cadence. Changing it in Redis will not affect - an already-registered task until the process restarts. + Registers two independent tasks: + 1. pickup_rate_monitor - global alert, gated by ENABLE_PICKUP_RATE_ALERT + 2. pickup_rate_monitor_merchants - per-merchant alerts, always registered when + SLACK_WEBHOOK_URL is set (individual merchants + opt in via their pickup_rate_alert_enabled flag) Args: scheduler: BackgroundTaskScheduler instance to register the task with. Returns: - True if the task was registered, False if skipped. + True if at least one task was registered, False if all were skipped. 
""" if not SLACK_WEBHOOK_URL: logger.warning( - "PickupRateMonitor: SLACK_WEBHOOK_URL not configured – skipping task registration" + "PickupRateMonitor: SLACK_WEBHOOK_URL not configured - skipping all task registration" ) return False interval_seconds = await PICKUP_RATE_ALERT_INTERVAL_SECONDS() + registered_any = False + # --- Global alert task --- + enabled = await ENABLE_PICKUP_RATE_ALERT() + if enabled: + try: + scheduler.register_task( + name="pickup_rate_monitor", + func=pickup_rate_monitor.check_and_alert, + interval_seconds=interval_seconds, + ) + logger.info( + f"Registered pickup_rate_monitor (global) background task " + f"(interval={interval_seconds}s)" + ) + registered_any = True + except Exception as e: + logger.error(f"Failed to register pickup_rate_monitor task: {e}") + else: + logger.debug( + "PickupRateMonitor: ENABLE_PICKUP_RATE_ALERT is false - skipping global task" + ) + + # --- Per-merchant alert task --- try: scheduler.register_task( - name="pickup_rate_monitor", - func=pickup_rate_monitor.check_and_alert, + name="pickup_rate_monitor_merchants", + func=pickup_rate_monitor.check_all_merchants, interval_seconds=interval_seconds, ) logger.info( - f"Registered pickup_rate_monitor background task " + f"Registered pickup_rate_monitor_merchants (per-merchant) background task " f"(interval={interval_seconds}s)" ) - return True - + registered_any = True except Exception as e: - logger.error(f"Failed to register pickup_rate_monitor task: {e}") - return False + logger.error(f"Failed to register pickup_rate_monitor_merchants task: {e}") + + return registered_any