From e25efdadb243a0cd41989b9c8b00b0bee928e597 Mon Sep 17 00:00:00 2001 From: "harsh.tiwari" Date: Wed, 15 Apr 2026 07:43:04 +0530 Subject: [PATCH] feat: add voice alert notification endpoint and DB layer Standalone alerting service for firing voice calls to on-call groups. - POST /alerts/fire endpoint with Redis SETNX deduplication - alert_groups DB table (migration 023) + queries + accessors - ALERT_SYSTEM role added to UserRole enum for JWT auth - Fully parameterized via request body (no config dependencies) - Reuses existing BB template/lead machinery end-to-end --- .../agents/breeze_buddy/managers/calls.py | 12 +- app/api/routers/breeze_buddy/__init__.py | 4 + .../routers/breeze_buddy/alerts/__init__.py | 88 +++++++ .../routers/breeze_buddy/alerts/handlers.py | 243 ++++++++++++++++++ app/api/routers/breeze_buddy/auth/__init__.py | 11 +- app/api/routers/breeze_buddy/auth/handlers.py | 18 +- app/database/accessor/__init__.py | 6 + .../accessor/breeze_buddy/alert_groups.py | 78 ++++++ .../025_create_alert_groups_table.sql | 16 ++ ...t_system_role_and_telephony_alert_mode.sql | 17 ++ .../queries/breeze_buddy/alert_groups.py | 35 +++ .../queries/breeze_buddy/analytics.py | 2 +- .../queries/breeze_buddy/lead_call_tracker.py | 4 +- app/schemas/__init__.py | 3 + app/schemas/breeze_buddy/__init__.py | 3 + app/schemas/breeze_buddy/alerts.py | 67 +++++ app/schemas/breeze_buddy/auth.py | 1 + app/schemas/breeze_buddy/core.py | 1 + 18 files changed, 587 insertions(+), 22 deletions(-) create mode 100644 app/api/routers/breeze_buddy/alerts/__init__.py create mode 100644 app/api/routers/breeze_buddy/alerts/handlers.py create mode 100644 app/database/accessor/breeze_buddy/alert_groups.py create mode 100644 app/database/migrations/025_create_alert_groups_table.sql create mode 100644 app/database/migrations/026_add_alert_system_role_and_telephony_alert_mode.sql create mode 100644 app/database/queries/breeze_buddy/alert_groups.py create mode 100644 app/schemas/breeze_buddy/alerts.py diff --git a/app/ai/voice/agents/breeze_buddy/managers/calls.py b/app/ai/voice/agents/breeze_buddy/managers/calls.py index 9d2eb5b64..1f7d4a63c 100644 --- a/app/ai/voice/agents/breeze_buddy/managers/calls.py +++ b/app/ai/voice/agents/breeze_buddy/managers/calls.py @@ -403,7 +403,8 @@ async def _cleanup_stuck_leads(): if ( config and locked_lead.call_direction == CallDirection.OUTBOUND - and locked_lead.execution_mode == ExecutionMode.TELEPHONY + and locked_lead.execution_mode + in (ExecutionMode.TELEPHONY, ExecutionMode.TELEPHONY_ALERT) ): await _retry_call(locked_lead, config) @@ -911,7 +912,8 @@ async def handle_call_completion( if ( outcome in ["BUSY", "NO_ANSWER"] and lead.call_direction == CallDirection.OUTBOUND - and lead.execution_mode == ExecutionMode.TELEPHONY + and lead.execution_mode + in (ExecutionMode.TELEPHONY, ExecutionMode.TELEPHONY_ALERT) ): await _retry_call(lead, config, outcome) @@ -989,9 +991,9 @@ async def handle_unanswered_calls(call_id: str): ) # Only retry outbound telephony calls - inbound and test calls should not be retried - if ( - lead.call_direction == CallDirection.OUTBOUND - and lead.execution_mode == ExecutionMode.TELEPHONY + if lead.call_direction == CallDirection.OUTBOUND and lead.execution_mode in ( + ExecutionMode.TELEPHONY, + ExecutionMode.TELEPHONY_ALERT, ): await _retry_call(lead, config, "NO_ANSWER") diff --git a/app/api/routers/breeze_buddy/__init__.py b/app/api/routers/breeze_buddy/__init__.py index e072acd4f..e85db706d 100644 --- a/app/api/routers/breeze_buddy/__init__.py +++ b/app/api/routers/breeze_buddy/__init__.py @@ -2,6 +2,7 @@ # Pod health probes (1-pod-1-call isolation architecture) from app.api.routers.breeze_buddy.agent_router.health import router as pod_router +from app.api.routers.breeze_buddy.alerts import router as alerts_router # Modern RESTful routers from app.api.routers.breeze_buddy.analytics import router as analytics_router @@ -67,6 +68,9 @@ # Leads (call requests/trackers) router.include_router(leads_router, prefix="", tags=["leads"]) +# Alerts (voice alert firing for on-call notifications) +router.include_router(alerts_router, prefix="", tags=["alerts"]) + # Telephony (webhook handlers for call providers) router.include_router(telephony_router, prefix="", tags=["telephony"]) diff --git a/app/api/routers/breeze_buddy/alerts/__init__.py b/app/api/routers/breeze_buddy/alerts/__init__.py new file mode 100644 index 000000000..6668641e3 --- /dev/null +++ b/app/api/routers/breeze_buddy/alerts/__init__.py @@ -0,0 +1,88 @@ +""" +Alert fire endpoint. + +POST /alerts/fire -- receives an alert event, deduplicates via Redis, +looks up alert group phone numbers, and pushes one BB lead per phone number. + +This is the single interface for all alert sources: +- OpenObserve webhook destinations +- Internal HealthMonitor background task +- Future polling-based checks (e.g. balance pollers) + +Security: +- reseller_id comes from the JWT token (not request body) to prevent impersonation. +- Token must be scoped to exactly one reseller (reseller_ids must have a single entry). +- merchant_id (optional) comes from the body, validated against token scope. +""" + +from fastapi import APIRouter, Depends, HTTPException, status + +from app.api.routers.breeze_buddy.leads.rbac import validate_lead_access +from app.api.security.breeze_buddy.rbac_token import get_current_user_with_rbac +from app.schemas import AlertFireRequest, UserInfo, UserRole + +from .handlers import fire_alert_handler + +router = APIRouter() + + +def _resolve_reseller_id(current_user: UserInfo) -> str: + """Extract the single reseller_id from the token. + + ALERT_SYSTEM tokens must be scoped to exactly one reseller. + ADMIN tokens must also be scoped (admins should create a dedicated + alert_system user per reseller rather than using their own token). + + Raises: + HTTPException 403 if token has no reseller_ids, multiple, or wildcard. + """ + ids = current_user.reseller_ids + if not ids or len(ids) != 1 or ids[0] == "*": + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=( + "Alert token must be scoped to exactly one reseller. " + f"Got reseller_ids={ids}. " + "Create a dedicated alert_system user with a single reseller_id." + ), + ) + return ids[0] + + +@router.post("/alerts/fire", status_code=status.HTTP_202_ACCEPTED) +async def fire_alert( + req: AlertFireRequest, + current_user: UserInfo = Depends(get_current_user_with_rbac), +): + """ + Fire a voice alert. + + Flow: + 1. Validate role (ADMIN or ALERT_SYSTEM only) + 2. Extract reseller_id from JWT token (must be single-scoped) + 3. Validate merchant_id (if provided) against token scope + 4. Deduplicate on alert_id via Redis SETNX with dedup_ttl_seconds TTL + 5. Look up alert_group_name in alert_groups table for this reseller + 6. Validate template + call execution config exist (fail fast) + 7. Push one BB lead per group member via create_lead_call_tracker() + 8. Existing process_backlog_leads() cron picks up leads and fires calls + + Auth: ADMIN or ALERT_SYSTEM JWT role required. Token must be scoped + to exactly one reseller_id. + """ + # Step 1: Role gate + if current_user.role not in (UserRole.ADMIN, UserRole.ALERT_SYSTEM): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only ADMIN or ALERT_SYSTEM roles can fire alerts", + ) + + # Step 2: Extract reseller_id from token + reseller_id = _resolve_reseller_id(current_user) + + # Step 3: Validate merchant_id scope (reuses existing leads RBAC) + validate_lead_access( + current_user, reseller_id, req.merchant_id, operation="fire alert" + ) + + return await fire_alert_handler(req, current_user, reseller_id) diff --git a/app/api/routers/breeze_buddy/alerts/handlers.py b/app/api/routers/breeze_buddy/alerts/handlers.py new file mode 100644 index 000000000..fc47bcc93 --- /dev/null +++ b/app/api/routers/breeze_buddy/alerts/handlers.py @@ -0,0 +1,243 @@ +""" +Business logic for alert firing. + +Responsibilities: +1. Redis SETNX dedup on alert_id (fail-open: skip dedup on Redis error) +2. Resolve alert_group_name -> phone numbers from DB (scoped by reseller_id) +3. Validate template + call_execution_config exist +4. Push one lead per phone number via create_lead_call_tracker() +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any, Dict, List + +from fastapi import HTTPException, status + +from app.core.logger import logger +from app.database.accessor import ( + create_lead_call_tracker, + get_alert_group_by_name, + get_call_execution_config_by_merchant_id, + get_template_by_merchant, +) +from app.schemas import ExecutionMode, LeadCallStatus, UserInfo +from app.services.redis.client import get_redis_service + +if TYPE_CHECKING: + from app.schemas.breeze_buddy.alerts import AlertFireRequest + + +def _mask_phone(phone: str) -> str: + """Mask phone number for logging -- show only last 4 digits.""" + if len(phone) <= 4: + return "****" + return f"***{phone[-4:]}" + + +async def _try_dedup_acquire(dedup_key: str, ttl: int) -> bool | None: + """ + Attempt Redis SETNX for dedup. + + Returns: + True -- key acquired (first fire in window) + False -- key exists (duplicate, suppress) + None -- Redis error (fail-open: skip dedup, proceed with alert) + """ + try: + redis = await get_redis_service() + acquired = await redis.set(key=dedup_key, value="1", nx=True, ex=ttl) + return acquired + except Exception as e: + logger.error(f"Redis error during dedup acquire for '{dedup_key}': {e}") + return None + + +async def _try_dedup_release(dedup_key: str) -> None: + """Best-effort release of dedup key. Failures are logged, not raised.""" + try: + redis = await get_redis_service() + await redis.delete(dedup_key) + except Exception as e: + logger.error(f"Redis error during dedup release for '{dedup_key}': {e}") + + +async def fire_alert_handler( + req: AlertFireRequest, current_user: UserInfo, reseller_id: str +) -> Dict[str, Any]: + """ + Core alert firing logic. + + Args: + req: AlertFireRequest with alert parameters (no reseller_id) + current_user: Authenticated user info (used for audit logging) + reseller_id: Resolved from the JWT token by the router + + Returns: + Dict with status, alert_id, leads_queued, and per-member results + """ + dedup_key = f"bb:alert:dedup:{reseller_id}:{req.alert_id}" + dedup_acquired = False + + logger.info( + f"Alert fire request from user={current_user.username} " + f"role={current_user.role.value} reseller={reseller_id} " + f"alert_id={req.alert_id}" + ) + + # -- Step 1: Dedup (fail-open on Redis error) -------------------- + if req.dedup_ttl_seconds > 0: + result = await _try_dedup_acquire(dedup_key, req.dedup_ttl_seconds) + if result is False: + logger.info( + f"Alert '{req.alert_id}' deduplicated (TTL key exists in Redis)" + ) + return { + "status": "deduplicated", + "alert_id": req.alert_id, + "message": ( + f"Alert suppressed — already fired within " + f"{req.dedup_ttl_seconds}s window" + ), + } + if result is True: + dedup_acquired = True + # result is None => Redis error, proceed without dedup + + # -- Step 2: Resolve alert group (scoped by reseller) ------------- + group = await get_alert_group_by_name(req.alert_group_name, reseller_id) + if not group: + if dedup_acquired: + await _try_dedup_release(dedup_key) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=( + f"Alert group '{req.alert_group_name}' not found " + f"for reseller '{reseller_id}'" + ), + ) + + members: List[Dict[str, str]] = group.get("members", []) + if not members: + if dedup_acquired: + await _try_dedup_release(dedup_key) + logger.warning( + f"Alert group '{req.alert_group_name}' has no members — no calls fired" + ) + return { + "status": "no_members", + "alert_id": req.alert_id, + "alert_group_name": req.alert_group_name, + "leads_queued": 0, + } + + # -- Step 3: Validate template + config (fail fast) --------------- + template = await get_template_by_merchant( + reseller_id, req.merchant_id, req.template + ) + if not template: + if dedup_acquired: + await _try_dedup_release(dedup_key) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=( + f"Template '{req.template}' not found for reseller '{reseller_id}'" + ), + ) + + configs = await get_call_execution_config_by_merchant_id( + reseller_id, req.merchant_id + ) + config = next((c for c in (configs or []) if c.template == req.template), None) + if not config: + if dedup_acquired: + await _try_dedup_release(dedup_key) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=(f"Call execution config not found for template '{req.template}'"), + ) + + # -- Step 4: Push one lead per member ----------------------------- + queued_leads: List[Dict[str, str]] = [] + failed: List[Dict[str, str]] = [] + + for member in members: + phone = member.get("phone") + name = member.get("name", "") + masked = _mask_phone(phone) if phone else "none" + + if not phone: + logger.warning(f"Skipping alert group member {name!r} — no phone number") + continue + + lead_id = str(uuid.uuid4()) + request_id = f"alert-{req.alert_id}-{lead_id[:8]}" + + # extra_payload merged first so reserved keys always win + payload: Dict[str, Any] = { + **(req.extra_payload or {}), + "customer_mobile_number": phone, + "customer_name": name, + "alert_message": req.alert_message, + "alert_id": req.alert_id, + } + + try: + lead = await create_lead_call_tracker( + id=lead_id, + reseller_id=reseller_id, + template=req.template, + template_id=str(template.id), + merchant_id=req.merchant_id, + next_attempt_at=datetime.now(timezone.utc), # immediate + payload=payload, + attempt_count=0, + meta_data={ + "alert_id": req.alert_id, + "alert_group": req.alert_group_name, + }, + request_id=request_id, + execution_mode=ExecutionMode.TELEPHONY_ALERT, + status=LeadCallStatus.BACKLOG, + ) + + if lead: + queued_leads.append( + { + "lead_id": lead_id, + "phone": masked, + "name": name, + } + ) + logger.info( + f"Alert lead queued: {lead_id} -> {masked} (alert={req.alert_id})" + ) + else: + failed.append( + { + "phone": masked, + "reason": "create_lead_call_tracker returned None", + } + ) + except Exception as e: + logger.error( + f"Failed to queue alert lead for {masked}: {e}", + exc_info=True, + ) + failed.append({"phone": masked, "reason": str(e)}) + + # Release dedup key if no leads were successfully queued + if not queued_leads and dedup_acquired: + await _try_dedup_release(dedup_key) + + return { + "status": "queued" if queued_leads else "all_failed", + "alert_id": req.alert_id, + "alert_group_name": req.alert_group_name, + "reseller_id": reseller_id, + "leads_queued": len(queued_leads), + "leads": queued_leads, + "failed": failed, + } diff --git a/app/api/routers/breeze_buddy/auth/__init__.py b/app/api/routers/breeze_buddy/auth/__init__.py index 99d58b958..ad0a1eb84 100644 --- a/app/api/routers/breeze_buddy/auth/__init__.py +++ b/app/api/routers/breeze_buddy/auth/__init__.py @@ -5,7 +5,7 @@ Endpoints: - POST /login - Authenticate user and get JWT token -- POST /auth/s2s/token - Generate long-lived S2S token (admin only) +- POST /auth/s2s/token - Generate long-lived S2S token (admin / alert_system) - GET /auth/me - Get current user info from token - POST /auth/logout - Logout (client-side token removal) - GET /logout - Legacy logout endpoint (deprecated) @@ -72,11 +72,11 @@ async def generate_s2s_token(request: S2STokenRequest): """ Generate long-lived token for Server-to-Server (S2S) authentication. - This endpoint allows admin users to generate long-lived JWT tokens - (up to 365 days) for automated integrations and S2S communication. + This endpoint allows admin and alert_system users to generate long-lived JWT + tokens (up to 365 days) for automated integrations and S2S communication. Requirements: - - Only admin users can generate S2S tokens (security restriction) + - Only admin and alert_system users can generate S2S tokens - Maximum token lifetime: 365 days - Token should be stored securely by the caller @@ -85,6 +85,7 @@ async def generate_s2s_token(request: S2STokenRequest): - Automated scripts and cron jobs - Third-party platform integrations - CI/CD pipelines + - Voice alert notification systems Request Body: { @@ -112,7 +113,7 @@ async def generate_s2s_token(request: S2STokenRequest): Security: - Returns 401 if credentials are invalid or account is inactive - - Returns 403 if user is not an admin + - Returns 403 if user is not admin or alert_system """ return await generate_s2s_token_handler(request) diff --git a/app/api/routers/breeze_buddy/auth/handlers.py b/app/api/routers/breeze_buddy/auth/handlers.py index 501fcdc7b..7130c1cfe 100644 --- a/app/api/routers/breeze_buddy/auth/handlers.py +++ b/app/api/routers/breeze_buddy/auth/handlers.py @@ -181,11 +181,11 @@ async def generate_s2s_token_handler(request: S2STokenRequest) -> S2STokenRespon """ Generate long-lived token for Server-to-Server (S2S) authentication. - Allows admin users to generate long-lived JWT tokens (up to 365 days) - for automated integrations and S2S communication. + Allows admin and alert_system users to generate long-lived JWT tokens + (up to 365 days) for automated integrations and S2S communication. Security restrictions: - - Only admin users can generate S2S tokens + - Only admin and alert_system users can generate S2S tokens - Maximum token lifetime: 365 days Args: @@ -196,7 +196,7 @@ async def generate_s2s_token_handler(request: S2STokenRequest) -> S2STokenRespon Raises: HTTPException: 401 if credentials are invalid or account is inactive - HTTPException: 403 if user is not an admin + HTTPException: 403 if user is not admin or alert_system """ # Authenticate user user = await get_user_by_username(request.username) @@ -223,14 +223,14 @@ async def generate_s2s_token_handler(request: S2STokenRequest) -> S2STokenRespon detail="Account is inactive. Please contact administrator.", ) - # Security restriction: Only admins can generate S2S tokens - if user.role != UserRole.ADMIN: + # Security restriction: Only admins and alert_system users can generate S2S tokens + if user.role not in (UserRole.ADMIN, UserRole.ALERT_SYSTEM): logger.warning( - f"S2S token request denied: non-admin user - {request.username} (role: {user.role})" + f"S2S token request denied: unauthorized role - {request.username} (role: {user.role})" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Only admin users can generate S2S tokens. Please contact your administrator.", + detail="Only admin and alert_system users can generate S2S tokens. Please contact your administrator.", ) # Generate long-lived token @@ -240,7 +240,7 @@ async def generate_s2s_token_handler(request: S2STokenRequest) -> S2STokenRespon resolved_reseller_ids = user.reseller_ids resolved_merchant_ids = user.merchant_ids - if user.role != UserRole.ADMIN: + if user.role not in (UserRole.ADMIN, UserRole.ALERT_SYSTEM): temp_user_info = UserInfo( id=user.id, username=user.username, diff --git a/app/database/accessor/__init__.py b/app/database/accessor/__init__.py index b8379bd8a..298174ea4 100644 --- a/app/database/accessor/__init__.py +++ b/app/database/accessor/__init__.py @@ -3,6 +3,10 @@ This module exports all database accessor functions. """ +from .breeze_buddy.alert_groups import ( + get_alert_group_by_name, + upsert_alert_group, +) from .breeze_buddy.blacklisted_numbers import ( add_blacklisted_number, check_blacklisted_number, @@ -64,6 +68,8 @@ ) __all__ = [ + "get_alert_group_by_name", + "upsert_alert_group", "is_number_blacklisted", "add_blacklisted_number", "remove_blacklisted_number", diff --git a/app/database/accessor/breeze_buddy/alert_groups.py b/app/database/accessor/breeze_buddy/alert_groups.py new file mode 100644 index 000000000..645d9a99e --- /dev/null +++ b/app/database/accessor/breeze_buddy/alert_groups.py @@ -0,0 +1,78 @@ +""" +Database accessor functions for alert_groups table. +""" + +import json +from typing import Any, Dict, List, Optional + +from app.core.logger import logger +from app.database.queries import run_parameterized_query +from app.database.queries.breeze_buddy.alert_groups import ( + get_alert_group_by_name_query, + upsert_alert_group_query, +) + + +def _decode_alert_group(row) -> Dict[str, Any]: + """Decode an asyncpg Record into a dict.""" + members = row["members"] + if isinstance(members, str): + members = json.loads(members) + return { + "id": str(row["id"]), + "name": row["name"], + "reseller_id": row["reseller_id"], + "members": members, + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + +async def get_alert_group_by_name( + name: str, reseller_id: str +) -> Optional[Dict[str, Any]]: + """ + Fetch alert group by name, scoped to a reseller. + + Returns: + Dict with keys: id, name, reseller_id, members (list of {name, phone}), + created_at, updated_at. None if not found. + """ + try: + text, values = get_alert_group_by_name_query(name, reseller_id) + result = await run_parameterized_query(text, values) + if not result: + return None + return _decode_alert_group(result[0]) + except Exception as e: + logger.error( + f"Error fetching alert group '{name}' for reseller '{reseller_id}': {e}" + ) + raise + + +async def upsert_alert_group( + name: str, reseller_id: str, members: List[Dict[str, str]] +) -> Optional[Dict[str, Any]]: + """ + Create or update an alert group scoped to a reseller. + + Args: + name: Group name (unique per reseller) + reseller_id: Owning reseller identifier + members: List of dicts with 'name' and 'phone' keys + + Returns: + The created/updated alert group dict, or None on failure. + """ + try: + text, values = upsert_alert_group_query(name, reseller_id, members) + result = await run_parameterized_query(text, values) + if not result: + return None + return _decode_alert_group(result[0]) + except Exception as e: + logger.error( + f"Error upserting alert group '{name}' for reseller '{reseller_id}': {e}" + ) + raise diff --git a/app/database/migrations/025_create_alert_groups_table.sql b/app/database/migrations/025_create_alert_groups_table.sql new file mode 100644 index 000000000..f745a2c46 --- /dev/null +++ b/app/database/migrations/025_create_alert_groups_table.sql @@ -0,0 +1,16 @@ +-- 023_create_alert_groups_table.sql +-- Alert groups: named sets of phone numbers to call when an alert fires. +-- Scoped by reseller_id so each reseller manages its own groups. +-- members is a JSONB array: [{"name": "Alice", "phone": "+919876543210"}, ...] + +CREATE TABLE IF NOT EXISTS alert_groups ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + reseller_id TEXT NOT NULL, + members JSONB NOT NULL DEFAULT '[]'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (name, reseller_id) +); + +CREATE INDEX IF NOT EXISTS idx_alert_groups_reseller_id ON alert_groups (reseller_id); diff --git a/app/database/migrations/026_add_alert_system_role_and_telephony_alert_mode.sql b/app/database/migrations/026_add_alert_system_role_and_telephony_alert_mode.sql new file mode 100644 index 000000000..d662910f2 --- /dev/null +++ b/app/database/migrations/026_add_alert_system_role_and_telephony_alert_mode.sql @@ -0,0 +1,17 @@ +-- 025_add_alert_system_role_and_telephony_alert_mode.sql +-- 1. Add 'alert_system' to the users role CHECK constraint. +-- 2. Add 'TELEPHONY_ALERT' to the lead_call_tracker execution_mode CHECK constraint. + +BEGIN; + +-- Users role: add 'alert_system' +ALTER TABLE users DROP CONSTRAINT IF EXISTS users_role_check; +ALTER TABLE users ADD CONSTRAINT users_role_check + CHECK (role IN ('admin', 'reseller', 'merchant', 'user', 'alert_system')); + +-- Execution mode: add 'TELEPHONY_ALERT' +ALTER TABLE lead_call_tracker DROP CONSTRAINT IF EXISTS lead_call_tracker_execution_mode_check; +ALTER TABLE lead_call_tracker ADD CONSTRAINT lead_call_tracker_execution_mode_check + CHECK (execution_mode IN ('TELEPHONY', 'TELEPHONY_TEST', 'DAILY', 'DAILY_TEST', 'DAILY_STREAM', 'TELEPHONY_ALERT')); + +COMMIT; diff --git a/app/database/queries/breeze_buddy/alert_groups.py b/app/database/queries/breeze_buddy/alert_groups.py new file mode 100644 index 000000000..5cc2690b4 --- /dev/null +++ b/app/database/queries/breeze_buddy/alert_groups.py @@ -0,0 +1,35 @@ +""" +Database query functions for alert_groups table. +""" + +import json +from typing import Any, List, Tuple + +ALERT_GROUPS_TABLE = "alert_groups" + + +def get_alert_group_by_name_query(name: str, reseller_id: str) -> Tuple[str, List[Any]]: + """Get alert group by name scoped to a reseller.""" + text = f""" + SELECT "id", "name", "reseller_id", "members", "created_at", "updated_at" + FROM "{ALERT_GROUPS_TABLE}" + WHERE "name" = $1 AND "reseller_id" = $2; + """ + values: List[Any] = [name, reseller_id] + return text, values + + +def upsert_alert_group_query( + name: str, reseller_id: str, members: list +) -> Tuple[str, List[Any]]: + """Create or update an alert group scoped to a reseller.""" + text = f""" + INSERT INTO "{ALERT_GROUPS_TABLE}" ("name", "reseller_id", "members", "updated_at") + VALUES ($1, $2, $3::jsonb, NOW()) + ON CONFLICT ("name", "reseller_id") DO UPDATE + SET "members" = EXCLUDED."members", + "updated_at" = NOW() + RETURNING "id", "name", "reseller_id", "members", "created_at", "updated_at"; + """ + values: List[Any] = [name, reseller_id, json.dumps(members)] + return text, values diff --git a/app/database/queries/breeze_buddy/analytics.py b/app/database/queries/breeze_buddy/analytics.py index 853eb841e..08d72cc39 100644 --- a/app/database/queries/breeze_buddy/analytics.py +++ b/app/database/queries/breeze_buddy/analytics.py @@ -111,7 +111,7 @@ def build_analytics_where_clause( # Filter by execution_mode to exclude test calls from analytics if filter_execution_mode: - conditions.append("lct.execution_mode = 'TELEPHONY'") + conditions.append("lct.execution_mode IN ('TELEPHONY', 'TELEPHONY_ALERT')") # Date range filters - convert IST to UTC before passing to DB if "date_from" in filters and filters["date_from"]: diff --git a/app/database/queries/breeze_buddy/lead_call_tracker.py b/app/database/queries/breeze_buddy/lead_call_tracker.py index 34eba680d..689dde0bf 100644 --- a/app/database/queries/breeze_buddy/lead_call_tracker.py +++ b/app/database/queries/breeze_buddy/lead_call_tracker.py @@ -110,14 +110,14 @@ def get_leads_based_on_status_and_next_attempt_query( ) -> Tuple[str, List[Any]]: """ Generate query to select leads based on status and next attempt time. - Selects TELEPHONY and TELEPHONY_TEST execution_mode leads for cron processing. + Selects TELEPHONY, TELEPHONY_TEST, and TELEPHONY_ALERT execution_mode leads for cron processing. """ text = f""" SELECT * FROM "{LEAD_CALL_TRACKER_TABLE}" WHERE "status" = $1 AND "next_attempt_at" <= $2 AND "is_locked" = FALSE - AND "execution_mode" IN ('TELEPHONY', 'TELEPHONY_TEST'); + AND "execution_mode" IN ('TELEPHONY', 'TELEPHONY_TEST', 'TELEPHONY_ALERT'); """ values = [status.value, time] return text, values diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py index f3f21b9c8..68962909a 100644 --- a/app/schemas/__init__.py +++ b/app/schemas/__init__.py @@ -14,6 +14,7 @@ AutomaticVoiceTTSServiceConfig, AutomaticVoiceUserConnectRequest, ) +from app.schemas.breeze_buddy.alerts import AlertFireRequest from app.schemas.breeze_buddy.analytics import ( AnalyticsFilters, AnalyticsOptions, @@ -82,6 +83,8 @@ ) __all__ = [ + # Alerts + "AlertFireRequest", # Auth "AuthTokenData", "LoginRequest", diff --git a/app/schemas/breeze_buddy/__init__.py b/app/schemas/breeze_buddy/__init__.py index 169223919..8048d311d 100644 --- a/app/schemas/breeze_buddy/__init__.py +++ b/app/schemas/breeze_buddy/__init__.py @@ -1,5 +1,6 @@ """Breeze Buddy agent schemas.""" +from app.schemas.breeze_buddy.alerts import AlertFireRequest from app.schemas.breeze_buddy.analytics import ( AnalyticsFilters, AnalyticsOptions, @@ -62,6 +63,8 @@ from app.schemas.breeze_buddy.users import UserUpdate as UserAccountUpdate __all__ = [ + # Alerts + "AlertFireRequest", # Auth "AuthTokenData", "LoginRequest", diff --git a/app/schemas/breeze_buddy/alerts.py b/app/schemas/breeze_buddy/alerts.py new file mode 100644 index 000000000..1cacd62d8 --- /dev/null +++ b/app/schemas/breeze_buddy/alerts.py @@ -0,0 +1,67 @@ +"""Alert-related request/response schemas.""" + +from typing import Any, Dict, Optional + +from pydantic import BaseModel, Field + + +class AlertFireRequest(BaseModel): + """Request body for POST /alerts/fire. + + Note: reseller_id is NOT in the body — it comes from the JWT token claims. + This prevents reseller impersonation. The token must be scoped to exactly + one reseller (reseller_ids must have a single entry, not ["*"]). + """ + + # Dedup key — must be unique per alert scenario. + # OpenObserve: use {alert_name} template variable (e.g. "stt-degraded") + # Internal: use a descriptive ID (e.g. "stt-degraded-") + alert_id: str = Field( + ..., + min_length=1, + description="Unique alert identifier for deduplication", + examples=["stt-degraded", "tts-elevenlabs-down"], + ) + + # Which group of people to call + alert_group_name: str = Field( + ..., + min_length=1, + description="Name of the alert group (must exist in alert_groups table for this reseller)", + examples=["platform-oncall"], + ) + + # Human-readable message spoken via TTS + # The caller builds this string — the endpoint just passes it through + alert_message: str = Field( + ..., + min_length=1, + description="Alert message spoken via TTS on the call", + examples=["STT degraded: 47 errors in 5 minutes"], + ) + + # BB template config — fully reuses existing template machinery + # merchant_id is optional and validated against the token's merchant_ids scope + merchant_id: Optional[str] = Field( + None, + description="Merchant ID (optional). Validated against JWT token scope.", + ) + template: str = Field( + ..., + min_length=1, + description="BB template name (must exist in templates table)", + examples=["alert-voice-notification"], + ) + + # How long (seconds) to suppress duplicate alerts for this alert_id + dedup_ttl_seconds: int = Field( + default=300, + ge=0, + description="Dedup window in seconds. 0 = no dedup.", + ) + + # Extra payload fields merged into the lead payload + extra_payload: Optional[Dict[str, Any]] = Field( + default=None, + description="Additional fields to include in the lead payload", + ) diff --git a/app/schemas/breeze_buddy/auth.py b/app/schemas/breeze_buddy/auth.py index f3a3e9b17..571cea92f 100644 --- a/app/schemas/breeze_buddy/auth.py +++ b/app/schemas/breeze_buddy/auth.py @@ -23,6 +23,7 @@ class UserRole(str, Enum): RESELLER = "reseller" MERCHANT = "merchant" USER = "user" # Renamed from SHOP + ALERT_SYSTEM = "alert_system" class Permission(str, Enum): diff --git a/app/schemas/breeze_buddy/core.py b/app/schemas/breeze_buddy/core.py index ee053c1da..e2daba2b7 100644 --- a/app/schemas/breeze_buddy/core.py +++ b/app/schemas/breeze_buddy/core.py @@ -44,6 +44,7 @@ class ExecutionMode(str, Enum): TELEPHONY = "TELEPHONY" # Production telephony calls TELEPHONY_TEST = "TELEPHONY_TEST" # Test telephony calls + TELEPHONY_ALERT = "TELEPHONY_ALERT" # Alert-triggered telephony calls DAILY = "DAILY" # Production Daily (web) calls DAILY_TEST = "DAILY_TEST" # Test Daily (web) calls DAILY_STREAM = "DAILY_STREAM" # Daily STT/TTS-only (no LLM, client-driven)