Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions app/ai/voice/agents/breeze_buddy/managers/calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ async def _cleanup_stuck_leads():
if (
config
and locked_lead.call_direction == CallDirection.OUTBOUND
and locked_lead.execution_mode == ExecutionMode.TELEPHONY
and locked_lead.execution_mode
in (ExecutionMode.TELEPHONY, ExecutionMode.TELEPHONY_ALERT)
):
await _retry_call(locked_lead, config)

Expand Down Expand Up @@ -911,7 +912,8 @@ async def handle_call_completion(
if (
outcome in ["BUSY", "NO_ANSWER"]
and lead.call_direction == CallDirection.OUTBOUND
and lead.execution_mode == ExecutionMode.TELEPHONY
and lead.execution_mode
in (ExecutionMode.TELEPHONY, ExecutionMode.TELEPHONY_ALERT)
):
await _retry_call(lead, config, outcome)

Expand Down Expand Up @@ -989,9 +991,9 @@ async def handle_unanswered_calls(call_id: str):
)

# Only retry outbound telephony calls (including telephony alerts) - inbound and test calls should not be retried
if (
lead.call_direction == CallDirection.OUTBOUND
and lead.execution_mode == ExecutionMode.TELEPHONY
if lead.call_direction == CallDirection.OUTBOUND and lead.execution_mode in (
ExecutionMode.TELEPHONY,
ExecutionMode.TELEPHONY_ALERT,
):
await _retry_call(lead, config, "NO_ANSWER")

Expand Down
4 changes: 4 additions & 0 deletions app/api/routers/breeze_buddy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Pod health probes (1-pod-1-call isolation architecture)
from app.api.routers.breeze_buddy.agent_router.health import router as pod_router
from app.api.routers.breeze_buddy.alerts import router as alerts_router

# Modern RESTful routers
from app.api.routers.breeze_buddy.analytics import router as analytics_router
Expand Down Expand Up @@ -67,6 +68,9 @@
# Leads (call requests/trackers)
router.include_router(leads_router, prefix="", tags=["leads"])

# Alerts (voice alert firing for on-call notifications)
router.include_router(alerts_router, prefix="", tags=["alerts"])

# Telephony (webhook handlers for call providers)
router.include_router(telephony_router, prefix="", tags=["telephony"])

Expand Down
88 changes: 88 additions & 0 deletions app/api/routers/breeze_buddy/alerts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
Alert fire endpoint.

POST /alerts/fire -- receives an alert event, deduplicates via Redis,
looks up alert group phone numbers, and pushes one BB lead per phone number.

This is the single interface for all alert sources:
- OpenObserve webhook destinations
- Internal HealthMonitor background task
- Future polling-based checks (e.g. balance pollers)

Security:
- reseller_id comes from the JWT token (not request body) to prevent impersonation.
- Token must be scoped to exactly one reseller (reseller_ids must have a single entry).
- merchant_id (optional) comes from the body, validated against token scope.
"""

from fastapi import APIRouter, Depends, HTTPException, status

from app.api.routers.breeze_buddy.leads.rbac import validate_lead_access
from app.api.security.breeze_buddy.rbac_token import get_current_user_with_rbac
from app.schemas import AlertFireRequest, UserInfo, UserRole

from .handlers import fire_alert_handler

router = APIRouter()


def _resolve_reseller_id(current_user: UserInfo) -> str:
    """Return the one reseller_id the caller's token is scoped to.

    Both ALERT_SYSTEM and ADMIN tokens are held to the same rule: the token
    must carry exactly one concrete reseller_id. Admins are expected to create
    a dedicated alert_system user per reseller instead of firing alerts with
    their own token.

    Args:
        current_user: Authenticated user whose ``reseller_ids`` is inspected.

    Returns:
        The single reseller_id found on the token.

    Raises:
        HTTPException: 403 when ``reseller_ids`` is empty or missing, holds
            more than one entry, or holds only the ``"*"`` wildcard.
    """
    scoped_ids = current_user.reseller_ids

    # Happy path first: exactly one entry, and it is not the wildcard.
    has_single_concrete_scope = (
        bool(scoped_ids) and len(scoped_ids) == 1 and scoped_ids[0] != "*"
    )
    if has_single_concrete_scope:
        return scoped_ids[0]

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=(
            "Alert token must be scoped to exactly one reseller. "
            f"Got reseller_ids={scoped_ids}. "
            "Create a dedicated alert_system user with a single reseller_id."
        ),
    )


@router.post("/alerts/fire", status_code=status.HTTP_202_ACCEPTED)
async def fire_alert(
    req: AlertFireRequest,
    current_user: UserInfo = Depends(get_current_user_with_rbac),
):
    """
    Fire a voice alert (responds 202 Accepted).

    Auth: the caller's JWT role must be ADMIN or ALERT_SYSTEM, and the token
    must be scoped to exactly one reseller_id.

    This endpoint performs the access checks and then delegates to
    fire_alert_handler:
        1. Reject any role other than ADMIN / ALERT_SYSTEM (403).
        2. Take reseller_id from the token, never from the request body.
        3. Check the optional merchant_id from the body against the token scope.
        4. Hand over to fire_alert_handler, which deduplicates on alert_id,
           resolves the alert group, validates the template and call execution
           config, and pushes one lead per group member.
    """
    # Role gate
    role_permitted = current_user.role in (UserRole.ADMIN, UserRole.ALERT_SYSTEM)
    if not role_permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only ADMIN or ALERT_SYSTEM roles can fire alerts",
        )

    # Reseller comes from the token scope
    reseller_id = _resolve_reseller_id(current_user)

    # Merchant scope check, shared with the leads RBAC.
    # NOTE(review): called without await and its result is discarded, so it
    # presumably raises on violation -- confirm validate_lead_access is a
    # plain (non-async) function.
    validate_lead_access(
        current_user,
        reseller_id,
        req.merchant_id,
        operation="fire alert",
    )

    response = await fire_alert_handler(req, current_user, reseller_id)
    return response
243 changes: 243 additions & 0 deletions app/api/routers/breeze_buddy/alerts/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
"""
Business logic for alert firing.

Responsibilities:
1. Redis SETNX dedup on alert_id (fail-open: skip dedup on Redis error)
2. Resolve alert_group_name -> phone numbers from DB (scoped by reseller_id)
3. Validate template + call_execution_config exist
4. Push one lead per phone number via create_lead_call_tracker()
"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, List

from fastapi import HTTPException, status

from app.core.logger import logger
from app.database.accessor import (
create_lead_call_tracker,
get_alert_group_by_name,
get_call_execution_config_by_merchant_id,
get_template_by_merchant,
)
from app.schemas import ExecutionMode, LeadCallStatus, UserInfo
from app.services.redis.client import get_redis_service

if TYPE_CHECKING:
from app.schemas.breeze_buddy.alerts import AlertFireRequest


def _mask_phone(phone: str) -> str:
"""Mask phone number for logging -- show only last 4 digits."""
if len(phone) <= 4:
return "****"
return f"***{phone[-4:]}"


async def _try_dedup_acquire(dedup_key: str, ttl: int) -> bool | None:
    """
    Try to claim the dedup key with a set-if-absent write that expires after ``ttl``.

    Args:
        dedup_key: Redis key identifying the alert within its dedup window.
        ttl: Expiry passed to the write as ``ex``.

    Returns:
        Whatever the Redis service's ``set`` call returns, which callers
        interpret as:
            True  -- key acquired (first fire in the window)
            False -- key already present (duplicate, suppress)
        or None when talking to Redis raised (fail-open: the caller skips
        dedup and proceeds with the alert).
    """
    try:
        client = await get_redis_service()
        # NOTE(review): some Redis clients report "key already exists" for an
        # NX write as None rather than False. If this service passes such a
        # value through, duplicates would be indistinguishable from a Redis
        # error and never suppressed -- confirm the wrapper returns a strict bool.
        outcome = await client.set(key=dedup_key, value="1", nx=True, ex=ttl)
    except Exception as exc:
        logger.error(f"Redis error during dedup acquire for '{dedup_key}': {exc}")
        return None
    return outcome


async def _try_dedup_release(dedup_key: str) -> None:
    """Delete the dedup key on a best-effort basis.

    Any exception raised while obtaining the Redis service or deleting the key
    is logged and swallowed, so callers can release unconditionally on their
    exit paths.
    """
    try:
        client = await get_redis_service()
        await client.delete(dedup_key)
    except Exception as exc:
        logger.error(f"Redis error during dedup release for '{dedup_key}': {exc}")


async def _load_alert_members(
    req: AlertFireRequest, reseller_id: str
) -> List[Dict[str, str]]:
    """Return the members of the requested alert group; raise 404 if the group is missing."""
    group = await get_alert_group_by_name(req.alert_group_name, reseller_id)
    if not group:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=(
                f"Alert group '{req.alert_group_name}' not found "
                f"for reseller '{reseller_id}'"
            ),
        )
    return group.get("members", [])


async def _load_validated_template(req: AlertFireRequest, reseller_id: str) -> Any:
    """Return the requested template; raise 404 if it or its call execution config is missing."""
    template = await get_template_by_merchant(
        reseller_id, req.merchant_id, req.template
    )
    if not template:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=(
                f"Template '{req.template}' not found for reseller '{reseller_id}'"
            ),
        )

    configs = await get_call_execution_config_by_merchant_id(
        reseller_id, req.merchant_id
    )
    config = next((c for c in (configs or []) if c.template == req.template), None)
    if not config:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=(f"Call execution config not found for template '{req.template}'"),
        )
    return template


async def _queue_member_leads(
    req: AlertFireRequest,
    reseller_id: str,
    template: Any,
    members: List[Dict[str, str]],
) -> tuple:
    """Create one lead per member with a phone number; return (queued_leads, failed)."""
    queued_leads: List[Dict[str, str]] = []
    failed: List[Dict[str, str]] = []

    for member in members:
        masked = "none"
        # The whole per-member body is guarded so that one malformed member is
        # reported in `failed` instead of aborting the remaining members.
        try:
            phone = member.get("phone")
            name = member.get("name", "")

            if not phone:
                logger.warning(
                    f"Skipping alert group member {name!r} — no phone number"
                )
                continue

            masked = _mask_phone(phone)
            lead_id = str(uuid.uuid4())
            request_id = f"alert-{req.alert_id}-{lead_id[:8]}"

            # extra_payload merged first so reserved keys always win
            payload: Dict[str, Any] = {
                **(req.extra_payload or {}),
                "customer_mobile_number": phone,
                "customer_name": name,
                "alert_message": req.alert_message,
                "alert_id": req.alert_id,
            }

            lead = await create_lead_call_tracker(
                id=lead_id,
                reseller_id=reseller_id,
                template=req.template,
                template_id=str(template.id),
                merchant_id=req.merchant_id,
                next_attempt_at=datetime.now(timezone.utc),  # immediate
                payload=payload,
                attempt_count=0,
                meta_data={
                    "alert_id": req.alert_id,
                    "alert_group": req.alert_group_name,
                },
                request_id=request_id,
                execution_mode=ExecutionMode.TELEPHONY_ALERT,
                status=LeadCallStatus.BACKLOG,
            )

            if lead:
                queued_leads.append(
                    {
                        "lead_id": lead_id,
                        "phone": masked,
                        "name": name,
                    }
                )
                logger.info(
                    f"Alert lead queued: {lead_id} -> {masked} (alert={req.alert_id})"
                )
            else:
                failed.append(
                    {
                        "phone": masked,
                        "reason": "create_lead_call_tracker returned None",
                    }
                )
        except Exception as e:
            logger.error(
                f"Failed to queue alert lead for {masked}: {e}",
                exc_info=True,
            )
            failed.append({"phone": masked, "reason": str(e)})

    return queued_leads, failed


async def fire_alert_handler(
    req: AlertFireRequest, current_user: UserInfo, reseller_id: str
) -> Dict[str, Any]:
    """
    Core alert firing logic.

    Flow:
        1. Dedup on (reseller_id, alert_id) when dedup_ttl_seconds > 0. A Redis
           error is fail-open: the alert proceeds without holding a dedup key.
        2. Resolve the alert group's members for this reseller.
        3. Validate that the template and its call execution config exist.
        4. Create one lead per member that has a phone number.

    The dedup key acquired in step 1 is released on every exit where no lead
    was queued, so a retry of the same alert is not suppressed: missing group,
    template or config; empty group; every member failing; and any unexpected
    exception raised by the lookups in steps 2-3.

    Args:
        req: AlertFireRequest with alert parameters (no reseller_id)
        current_user: Authenticated user info (used for audit logging)
        reseller_id: Resolved from the JWT token by the router

    Returns:
        Dict with ``status`` ("deduplicated", "no_members", "queued" or
        "all_failed"), ``alert_id`` and, depending on the status, the group
        name, reseller_id, ``leads_queued`` and per-member ``leads`` /
        ``failed`` lists.

    Raises:
        HTTPException: 404 when the alert group, the template, or the call
            execution config for the template is not found.
    """
    dedup_key = f"bb:alert:dedup:{reseller_id}:{req.alert_id}"
    dedup_acquired = False

    logger.info(
        f"Alert fire request from user={current_user.username} "
        f"role={current_user.role.value} reseller={reseller_id} "
        f"alert_id={req.alert_id}"
    )

    # -- Step 1: Dedup (fail-open on Redis error) --------------------
    if req.dedup_ttl_seconds > 0:
        result = await _try_dedup_acquire(dedup_key, req.dedup_ttl_seconds)
        if result is False:
            logger.info(
                f"Alert '{req.alert_id}' deduplicated (TTL key exists in Redis)"
            )
            return {
                "status": "deduplicated",
                "alert_id": req.alert_id,
                "message": (
                    f"Alert suppressed — already fired within "
                    f"{req.dedup_ttl_seconds}s window"
                ),
            }
        # Anything other than True (e.g. None on Redis error) means we hold no
        # key: proceed without dedup and never attempt a release.
        dedup_acquired = result is True

    # -- Steps 2-3: Resolve group, validate template + config ---------
    # Nothing has been queued while these lookups run, so on ANY failure
    # (404 or an unexpected error) the dedup key is released before re-raising.
    template = None
    try:
        members = await _load_alert_members(req, reseller_id)
        if members:
            template = await _load_validated_template(req, reseller_id)
    except Exception:
        if dedup_acquired:
            await _try_dedup_release(dedup_key)
        raise

    if not members:
        if dedup_acquired:
            await _try_dedup_release(dedup_key)
        logger.warning(
            f"Alert group '{req.alert_group_name}' has no members — no calls fired"
        )
        return {
            "status": "no_members",
            "alert_id": req.alert_id,
            "alert_group_name": req.alert_group_name,
            "leads_queued": 0,
        }

    # -- Step 4: Push one lead per member -----------------------------
    queued_leads, failed = await _queue_member_leads(
        req, reseller_id, template, members
    )

    # Release dedup key if no leads were successfully queued
    if not queued_leads and dedup_acquired:
        await _try_dedup_release(dedup_key)

    return {
        "status": "queued" if queued_leads else "all_failed",
        "alert_id": req.alert_id,
        "alert_group_name": req.alert_group_name,
        "reseller_id": reseller_id,
        "leads_queued": len(queued_leads),
        "leads": queued_leads,
        "failed": failed,
    }
Comment on lines +235 to +243
Loading
Loading