From 70b4c12a39d53ebf84408937c8c74a5190574359 Mon Sep 17 00:00:00 2001 From: Narsimha Reddy Date: Tue, 24 Feb 2026 17:49:47 +0530 Subject: [PATCH] (feat): add actions for langfuse evaluators - added new actions for evaluators - added new config for actions of evaluators - added a basic update outcome action for now - they will work with same threshold as the alerts --- .env.example | 1 + .../agents/breeze_buddy/agent/__init__.py | 3 + .../observability/tracing_setup.py | 15 + .../voice/agents/breeze_buddy/utils/traces.py | 40 + app/core/config/dynamic.py | 102 +++ .../breeze_buddy/lead_call_tracker.py | 41 + .../queries/breeze_buddy/lead_call_tracker.py | 48 +- .../langfuse/tasks/actions/__init__.py | 3 + .../langfuse/tasks/actions/actions.py | 715 ++++++++++++++++++ app/services/langfuse/tasks/actions/utils.py | 194 +++++ .../langfuse/tasks/score_monitor/score.py | 208 ++++- app/services/slack/alert.py | 42 +- docs/EVALUATOR_ACTIONS.md | 412 ++++++++++ 13 files changed, 1810 insertions(+), 14 deletions(-) create mode 100644 app/ai/voice/agents/breeze_buddy/utils/traces.py create mode 100644 app/services/langfuse/tasks/actions/__init__.py create mode 100644 app/services/langfuse/tasks/actions/actions.py create mode 100644 app/services/langfuse/tasks/actions/utils.py create mode 100644 docs/EVALUATOR_ACTIONS.md diff --git a/.env.example b/.env.example index 326301eea..9d45c7e34 100644 --- a/.env.example +++ b/.env.example @@ -181,6 +181,7 @@ REDIS_PORT=6379 REDIS_CLUSTER_NODES=redis1:6379,redis2:6380,redis3:6381 LANGFUSE_EVALUATORS="breeze buddy outcome correctness, latency evaluator" # name of the evaluator +EVALUATOR_ACTIONS='{"OUTCOME MISMATCH": {"action_type": "outcome_update", "action_config": {"outcome_key": "$.actual_outcome"}, "action_steps": {"update_in_db": true, "send_reporting_webhook": true, "cancel_retries": true}}}' SLACK_WEBHOOK_URL="" # slack webhook url to send alerts SLACK_TAG_USERS="@john.doe,@jane.smith,@dev.team" # comma-separated list of users 
to tag in Slack alerts diff --git a/app/ai/voice/agents/breeze_buddy/agent/__init__.py b/app/ai/voice/agents/breeze_buddy/agent/__init__.py index 4f9f0caf7..b2319c4f6 100644 --- a/app/ai/voice/agents/breeze_buddy/agent/__init__.py +++ b/app/ai/voice/agents/breeze_buddy/agent/__init__.py @@ -828,6 +828,9 @@ async def _run_with_tracing(self, runner: PipelineRunner) -> None: evaluator_config=( self.configurations.evaluator_config if self.configurations else None ), + execution_mode=( + self.lead.execution_mode.value if self.lead.execution_mode else None + ), ) try: with trace.use_span(self.root_span, end_on_exit=True): diff --git a/app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py b/app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py index d3d36890c..49a6daf86 100644 --- a/app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py +++ b/app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py @@ -10,6 +10,7 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from app.ai.voice.agents.breeze_buddy.template.context import TemplateContext +from app.ai.voice.agents.breeze_buddy.utils.traces import extract_possible_outcomes from app.core.config.static import ( BUDDY_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, BUDDY_OTEL_EXPORTER_OTLP_TRACES_HEADERS, @@ -79,6 +80,7 @@ def create_root_span( provider: str, template_type: str, evaluator_config: Optional[List[str]] = None, + execution_mode: Optional[str] = None, ) -> trace.Span: """ Create and configure the root tracing span with all conversation attributes. @@ -94,6 +96,7 @@ def create_root_span( template_type: Template type being used evaluator_config: List of evaluator names to add as Langfuse trace tags. If None/empty, "ALL_EVALS" tag is added so all evaluators run. + execution_mode: Execution mode of the call (e.g. "TELEPHONY", "TELEPHONY_TEST"). 
Returns: A span object that can be used with trace.use_span() context manager @@ -126,6 +129,8 @@ def create_root_span( "daily" if transport_type == "daily" else provider, ) span.set_attribute("template.type", template_type) + if execution_mode: + span.set_attribute("execution_mode", execution_mode) # Set evaluator tags for Langfuse trace filtering # Langfuse maps `langfuse.trace.tags` span attribute to trace-level tags @@ -203,6 +208,16 @@ def update_span_with_evaluation_data(context: TemplateContext) -> None: # Core evaluation data context.root_span.set_attribute("call_outcome", lead.outcome or "UNKNOWN") + # Extract and attach possible outcomes from the template flow definition + template = getattr(context.bot, "template", None) + if template and hasattr(template, "flow") and template.flow: + possible_outcomes = extract_possible_outcomes(template.flow) + if possible_outcomes: + context.root_span.set_attribute( + "possible_outcomes", + json.dumps(possible_outcomes), + ) + # Calculate call duration call_duration = None if lead.call_initiated_time: diff --git a/app/ai/voice/agents/breeze_buddy/utils/traces.py b/app/ai/voice/agents/breeze_buddy/utils/traces.py new file mode 100644 index 000000000..eee7a4232 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/utils/traces.py @@ -0,0 +1,40 @@ +""" +Tracing utility helpers for Breeze Buddy. +""" + +from typing import Any, Dict, List + + +def extract_possible_outcomes(flow: Dict[str, Any]) -> List[str]: + """ + Extract all possible outcome values from a template flow definition. + + Walks through all nodes → functions → hooks to find every + ``update_outcome_in_database`` hook with a static ``outcome`` field and + collects the unique values. + + Args: + flow: The raw template flow dict (``template.flow``). + + Returns: + Deduplicated list of outcome strings defined in the template. 
+ """ + outcomes: list[str] = [] + seen: set[str] = set() + + for node in flow.get("nodes", []): + for func in node.get("functions", []): + for hook in func.get("hooks", []): + if hook.get("name") != "update_outcome_in_database": + continue + expected_fields = hook.get("expected_fields", {}) + outcome_field = expected_fields.get("outcome", {}) + if outcome_field.get("source") == "static" and outcome_field.get( + "value" + ): + value = outcome_field["value"] + if value not in seen: + seen.add(value) + outcomes.append(value) + + return outcomes diff --git a/app/core/config/dynamic.py b/app/core/config/dynamic.py index 91afe3ef2..f7d942455 100644 --- a/app/core/config/dynamic.py +++ b/app/core/config/dynamic.py @@ -1,4 +1,5 @@ import json +from typing import Any from app.core.logger import logger from app.services.live_config.store import get_config @@ -280,6 +281,107 @@ async def LANGFUSE_EVALUATORS() -> dict[str, int]: return evaluators +async def EVALUATOR_ACTIONS() -> dict[str, Any]: + """ + Returns EVALUATOR_ACTIONS from Redis as a dict mapping evaluator names to action configs. + + Format: JSON string stored in Redis + { + "": { + "action_type": "outcome_update", + "action_config": { + "outcome": "VOICEMAIL", + "allowed_outcome_changes": {"BUSY": ["VOICEMAIL"]}, + "disallowed_outcome_changes": {"*": ["BUSY"]} + }, + "action_steps": { + "update_in_db": true, + "send_reporting_webhook": true, + "cancel_retries": true + } + }, + "": { + "action_type": "outcome_update", + "action_config": { + "outcome_key": "$.correct_outcome" + }, + "action_steps": { + "update_in_db": true, + "cancel_retries": true + } + } + } + + action_config options: + - outcome: Direct outcome value (e.g., "VOICEMAIL") - use for simple cases + - outcome_key: JSON path to extract from JSON at end of comment (e.g., "$.correct_outcome") + - allowed_outcome_changes: (optional) Dict of {current_outcome: [allowed_new_outcomes]} + Use "*" as key to allow a target from any current outcome. 
Deny (disallowed) takes precedence. + - disallowed_outcome_changes: (optional) Dict of {current_outcome: [disallowed_new_outcomes]} + Use "*" as key to disallow for all current outcomes (e.g., {"*": ["BUSY"]}) + + action_steps options: + - update_in_db: (default: true) Update the lead's outcome in the database + - send_reporting_webhook: (default: true) Send reporting webhook for outcome correction + - cancel_retries: (default: true) Cancel any pending retry leads + + Trigger logic: Action triggers when score < threshold (from LANGFUSE_EVALUATORS config) + """ + config_value = await get_config("EVALUATOR_ACTIONS", "", str) + if not config_value: + return {} + + try: + parsed = json.loads(config_value) + if not isinstance(parsed, dict): + logger.error( + f"EVALUATOR_ACTIONS config must be a JSON object, got {type(parsed).__name__}" + ) + return {} + + # Validate and decode each evaluator config entry + valid_actions: dict[str, Any] = {} + for evaluator_name, config in parsed.items(): + if not isinstance(config, dict): + logger.warning( + f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — config must be an object, got {type(config).__name__}" + ) + continue + + action_type = config.get("action_type") + if not action_type or not isinstance(action_type, str): + logger.warning( + f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — missing or invalid 'action_type'" + ) + continue + + action_config = config.get("action_config") + if action_config is not None and not isinstance(action_config, dict): + logger.warning( + f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — 'action_config' must be an object" + ) + continue + + action_steps = config.get("action_steps") + if action_steps is not None and not isinstance(action_steps, dict): + logger.warning( + f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — 'action_steps' must be an object" + ) + continue + + valid_actions[evaluator_name] = config + + if len(valid_actions) < len(parsed): + logger.info( + f"EVALUATOR_ACTIONS: 
{len(valid_actions)}/{len(parsed)} configs valid" + ) + + return valid_actions + except json.JSONDecodeError as e: + logger.error(f"Failed to parse EVALUATOR_ACTIONS config: {e}") + return {} + + # --- Noise Cancellation Configuration --- async def BB_NOISE_CANCELLATION_ENABLED() -> bool: """Returns BB_NOISE_CANCELLATION_ENABLED from Redis""" diff --git a/app/database/accessor/breeze_buddy/lead_call_tracker.py b/app/database/accessor/breeze_buddy/lead_call_tracker.py index 1c915c8ea..79ba2e158 100644 --- a/app/database/accessor/breeze_buddy/lead_call_tracker.py +++ b/app/database/accessor/breeze_buddy/lead_call_tracker.py @@ -14,6 +14,7 @@ abort_lead_by_id_query, acquire_lock_on_lead_by_id_query, append_metadata_field_query, + cancel_pending_retries_by_request_id_query, defer_lead_next_attempt_and_release_lock_query, get_all_lead_call_trackers_query, get_lead_based_analytics_query, @@ -625,6 +626,46 @@ async def handle_lead_abort( return None +async def cancel_pending_retries_by_request_id( + request_id: str, + reason: str = "outcome_corrected", +) -> int: + """ + Cancel all pending retry leads for a given request_id. + + Used when an outcome is corrected to a terminal state (e.g., BUSY -> CONFIRM) + to prevent scheduled retries from executing. 
+ + Args: + request_id: The order/request ID to cancel retries for + reason: Reason for cancellation (stored in meta_data) + + Returns: + Number of leads cancelled + """ + logger.info(f"Cancelling pending retries for request_id: {request_id}") + + try: + query_text, values = cancel_pending_retries_by_request_id_query( + request_id, reason + ) + result = await run_parameterized_query(query_text, values) + cancelled_count = get_row_count(result) if result else 0 + + if cancelled_count > 0: + logger.info( + f"Cancelled {cancelled_count} pending retry leads for request_id: {request_id}" + ) + + return cancelled_count + + except Exception as e: + logger.error( + f"Error cancelling pending retries for request_id {request_id}: {e}" + ) + return 0 + + async def update_lead_payload( lead_id: str, payload_updates: Dict[str, Any] ) -> Optional[LeadCallTracker]: diff --git a/app/database/queries/breeze_buddy/lead_call_tracker.py b/app/database/queries/breeze_buddy/lead_call_tracker.py index 91e4a6978..85bf3f8cb 100644 --- a/app/database/queries/breeze_buddy/lead_call_tracker.py +++ b/app/database/queries/breeze_buddy/lead_call_tracker.py @@ -3,7 +3,7 @@ """ import json -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple from app.schemas import CallDirection, ExecutionMode, LeadCallStatus @@ -582,6 +582,52 @@ def abort_lead_by_id_query( return text, values +def cancel_pending_retries_by_request_id_query( + request_id: str, + reason: str = "outcome_corrected", +) -> Tuple[str, List[Any]]: + """ + Generate query to cancel all pending retry leads for a given request_id. + + Used when an outcome is corrected to a terminal state (e.g., BUSY -> CONFIRM) + to prevent scheduled retries from executing. 
+ + Args: + request_id: The order/request ID to cancel retries for + reason: Reason for cancellation (stored in meta_data) + + Returns: + Tuple of (query_text, values) + """ + text = f""" + UPDATE "{LEAD_CALL_TRACKER_TABLE}" + SET + "status" = $1, + "outcome" = $2, + "updated_at" = NOW(), + "meta_data" = COALESCE("meta_data", '{{}}')::jsonb || $3::jsonb + WHERE + "request_id" = $4 + AND "status" IN ($5, $6) + RETURNING *; + """ + + metadata = { + "cancelled_at": datetime.now(timezone.utc).isoformat(), + "cancel_reason": reason, + } + + values = [ + LeadCallStatus.FINISHED.value, + "CANCELLED_BY_OUTCOME_CORRECTION", + json.dumps(metadata), + request_id, + LeadCallStatus.BACKLOG.value, + LeadCallStatus.RETRY.value, + ] + return text, values + + def update_lead_payload_query( lead_id: str, payload_updates: Dict[str, Any] ) -> Tuple[str, List[Any]]: diff --git a/app/services/langfuse/tasks/actions/__init__.py b/app/services/langfuse/tasks/actions/__init__.py new file mode 100644 index 000000000..a7045c2b9 --- /dev/null +++ b/app/services/langfuse/tasks/actions/__init__.py @@ -0,0 +1,3 @@ +from app.services.langfuse.tasks.actions.actions import ActionExecutor, ActionResult + +__all__ = ["ActionExecutor", "ActionResult"] diff --git a/app/services/langfuse/tasks/actions/actions.py b/app/services/langfuse/tasks/actions/actions.py new file mode 100644 index 000000000..f9d74bbd9 --- /dev/null +++ b/app/services/langfuse/tasks/actions/actions.py @@ -0,0 +1,715 @@ +""" +Evaluator Action Handlers + +This module provides functionality to execute actions based on evaluator results. +When an evaluator returns a score below threshold, configured actions can be triggered +to update outcomes in the database, cancel retries, and send reporting webhooks. 
+""" + +import json +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, Optional + +import aiohttp + +from app.core.config.static import ORDER_CONFIRMATION_WEBHOOK_SECRET_KEY +from app.core.logger import logger +from app.core.logger.context import ( + clear_log_context, + set_log_context, + update_log_context, +) +from app.core.security.sha import calculate_hmac_sha256 +from app.database.accessor.breeze_buddy.lead_call_tracker import ( + cancel_pending_retries_by_request_id, + get_lead_by_call_id, + update_lead_call_completion_details, +) +from app.services.langfuse.tasks.actions.utils import ( + extract_field, + extract_json_from_end, +) + +# Status icons for Slack alerts - module-level constant +STATUS_ICONS = { + "SUCCESS": "✅", + "SKIPPED": "⏭️", + "FAILED": "❌", + "ERROR": "⚠️", +} + + +@dataclass +class ActionResult: + """Result of an action execution with detailed status for each step""" + + success: bool + db_update: Optional[str] = None # "SUCCESS" | "SKIPPED" | "FAILED" | "ERROR" + cancel_retries: Optional[str] = None # "SUCCESS" | "SKIPPED" | "ERROR" + reporting_webhook: Optional[str] = ( + None # "SUCCESS" | "SKIPPED" | "FAILED" | "ERROR" + ) + error_message: Optional[str] = None + outcome_change: Optional[str] = None # e.g., "BUSY -> CONFIRM" + + canceled_count: Optional[int] = None # Number of retries cancelled + lead_id: Optional[str] = None # Lead ID for alerting + + # Generic step results for extensibility + step_results: Optional[Dict[str, str]] = ( + None # {"step_name": "SUCCESS"|"SKIPPED"|"ERROR"} + ) + + def to_slack_status(self) -> str: + """ + Generate a formatted status string for Slack alerts. + + Each step appears on a new line with a subtle status icon. + Step names are used directly from action_steps keys (snake_case). 
+ """ + parts = [] + + # Use generic step_results if available (extensible) + if self.step_results: + for step_name, status in self.step_results.items(): + icon = STATUS_ICONS.get(status, "?") + parts.append(f"{step_name}: {icon}") + + # Fallback to legacy fields for backward compatibility + else: + # Map internal field names to display names + step_map = { + "db_update": ("update_in_db", self.db_update), + "cancel_retries": ("cancel_retries", self.cancel_retries), + "reporting_webhook": ( + "send_reporting_webhook", + self.reporting_webhook, + ), + } + + for display_name, status in step_map.values(): + if status: + icon = STATUS_ICONS.get(status, "?") + parts.append(f"{display_name}: {icon}") + + status_str = "\n".join(parts) if parts else "No actions" + + # Add outcome change if available + if self.outcome_change: + status_str += f"\n*Outcome:* {self.outcome_change}" + + # Add error message if available + if self.error_message: + status_str += f"\n*Error:* {self.error_message}" + + return status_str + + +class ActionType(str, Enum): + """Supported action types for evaluators""" + + OUTCOME_UPDATE = "outcome_update" + + +class OutcomeUpdateAction: + """Handler for outcome update actions""" + + def __init__(self, config: Dict[str, Any]): + """ + Initialize outcome update action. 
+ + Args: + config: Full evaluator action config with structure: + { + "action_type": "outcome_update", + "action_config": { + "outcome": "VOICEMAIL", + "outcome_key": "$.actual_outcome", + "allowed_outcome_changes": {"BUSY": ["VOICEMAIL"]}, + "disallowed_outcome_changes": {"*": ["BUSY"]} + }, + "action_steps": { + "update_in_db": true, + "send_reporting_webhook": true, + "cancel_retries": true + } + } + """ + self.config = config + + # Extract action_config values (behavior control) + action_config = config.get("action_config", {}) + self.outcome = action_config.get("outcome") + self.outcome_key = action_config.get("outcome_key") + self.allowed_outcome_changes = action_config.get("allowed_outcome_changes", {}) + self.disallowed_outcome_changes = action_config.get( + "disallowed_outcome_changes", {} + ) + + # Extract action_steps (execution steps) + action_steps = config.get("action_steps", {}) + self.update_in_db = action_steps.get("update_in_db", True) + self.send_reporting_webhook = action_steps.get("send_reporting_webhook", True) + self.cancel_retries = action_steps.get("cancel_retries", True) + + # Store all action_steps for generic step tracking + self.action_steps = action_steps + + async def execute( + self, + call_sid: str, + score: Dict[str, Any], + current_lead: Optional[Any] = None, + ) -> ActionResult: + """ + Execute the outcome update action. 
+ + All logs automatically include: call_sid, lead_id, request_id, + current_outcome, new_outcome, conversation_id (if available) + + Args: + call_sid: The call SID to update + score: The evaluator score dict + current_lead: Optional pre-fetched lead (to avoid N+1 queries) + + Returns: + ActionResult with detailed status for each step + """ + result = ActionResult(success=False) + + if not call_sid: + logger.error("[EVALUATOR_ACTION] Cannot execute: call_sid is None") + result.error_message = "call_sid is None" + return result + + # Set initial log context + set_log_context(call_sid=call_sid) + + try: + # Get current lead if not provided + if current_lead is None: + try: + current_lead = await get_lead_by_call_id(call_sid) + except Exception as e: + logger.error(f"[EVALUATOR_ACTION] Failed to fetch lead: {e}") + result.error_message = f"Failed to fetch lead: {e}" + result.db_update = "ERROR" + return result + + if not current_lead: + logger.error("[EVALUATOR_ACTION] No lead found") + result.error_message = "No lead found" + result.db_update = "ERROR" + return result + + # Store lead_id for alerting + result.lead_id = str(current_lead.id) + + # Update log context with all lead details + lead_meta = getattr(current_lead, "metaData", None) or {} + conversation_id = lead_meta.get("conversation_id") + + update_log_context( + lead_id=str(current_lead.id), + request_id=current_lead.request_id, + current_outcome=current_lead.outcome, + ) + + if conversation_id: + update_log_context(conversation_id=conversation_id) + + # Determine new outcome + new_outcome = self._extract_outcome(score) + if not new_outcome: + logger.error( + "[EVALUATOR_ACTION] FAILED - Could not extract outcome from score" + ) + result.db_update = "FAILED" + result.error_message = "Could not extract outcome from score" + self._populate_step_results(result) + return result + + # Skip if outcome isn't actually changing + if new_outcome == current_lead.outcome: + logger.info( + f"[EVALUATOR_ACTION] SKIPPED - 
outcome already is '{new_outcome}', no change needed" + ) + result.db_update = "SKIPPED" + result.cancel_retries = "SKIPPED" + result.reporting_webhook = "SKIPPED" + result.error_message = "Outcome unchanged" + self._populate_step_results(result) + return result + + result.outcome_change = f"{current_lead.outcome} -> {new_outcome}" + update_log_context(new_outcome=new_outcome) + + # Check if this transition is allowed (deny wins if both are configured) + if not self._is_allowed_transition(current_lead.outcome, new_outcome): + logger.info( + f"[EVALUATOR_ACTION] SKIPPED - transition {current_lead.outcome} -> {new_outcome} is not in allowed list" + ) + result.db_update = "SKIPPED" + result.cancel_retries = "SKIPPED" + result.reporting_webhook = "SKIPPED" + result.error_message = ( + f"Transition {current_lead.outcome} -> {new_outcome} is not allowed" + ) + self._populate_step_results(result) + return result + + # Check if this transition is disallowed (deny takes precedence) + if self._is_disallowed_transition(current_lead.outcome, new_outcome): + logger.info( + f"[EVALUATOR_ACTION] SKIPPED - transition {current_lead.outcome} -> {new_outcome} is disallowed" + ) + result.db_update = "SKIPPED" + result.cancel_retries = "SKIPPED" + result.reporting_webhook = "SKIPPED" + result.error_message = ( + f"Transition {current_lead.outcome} -> {new_outcome} is disallowed" + ) + self._populate_step_results(result) + return result + + # Step 1: Update the lead in database + if self.update_in_db: + try: + # Merge existing meta_data with correction details to preserve prior keys + existing_meta_data = current_lead.metaData or {} + merged_meta_data = dict(existing_meta_data) + merged_meta_data.update( + { + "outcome_corrected_by": "evaluator_action", + "evaluator_name": score.get("name"), + "previous_outcome": current_lead.outcome, + "correction_timestamp": datetime.now( + timezone.utc + ).isoformat(), + } + ) + db_result = await update_lead_call_completion_details( + 
id=current_lead.id, + outcome=new_outcome, + meta_data=merged_meta_data, + ) + + if db_result: + logger.info( + f"[EVALUATOR_ACTION] DB_UPDATE SUCCESS | {result.outcome_change}" + ) + result.db_update = "SUCCESS" + else: + logger.error( + "[EVALUATOR_ACTION] DB_UPDATE FAILED - update returned None" + ) + result.db_update = "FAILED" + result.error_message = "DB update returned no result" + self._populate_step_results(result) + return result + + except Exception as e: + logger.error(f"[EVALUATOR_ACTION] DB_UPDATE ERROR: {e}") + result.db_update = "ERROR" + result.error_message = f"DB update error: {e}" + self._populate_step_results(result) + return result + else: + logger.info("[EVALUATOR_ACTION] DB_UPDATE SKIPPED - update_in_db=False") + result.db_update = "SKIPPED" + + # Step 2: Cancel pending retries if enabled and request_id exists + if self.cancel_retries: + if current_lead.request_id: + try: + cancelled = await cancel_pending_retries_by_request_id( + request_id=current_lead.request_id, + reason=f"outcome_corrected_to_{new_outcome}", + ) + result.canceled_count = cancelled + if cancelled > 0: + logger.info( + f"[EVALUATOR_ACTION] CANCEL_RETRIES SUCCESS - Cancelled {cancelled} pending retries" + ) + else: + logger.info( + "[EVALUATOR_ACTION] CANCEL_RETRIES SUCCESS - No pending retries to cancel" + ) + result.cancel_retries = "SUCCESS" + + except Exception as e: + logger.error(f"[EVALUATOR_ACTION] CANCEL_RETRIES ERROR: {e}") + result.cancel_retries = "ERROR" + # Don't return - continue to webhook + else: + logger.info( + "[EVALUATOR_ACTION] CANCEL_RETRIES SKIPPED - no request_id" + ) + result.cancel_retries = "SKIPPED" + else: + logger.info( + "[EVALUATOR_ACTION] CANCEL_RETRIES SKIPPED - cancel_retries=False" + ) + result.cancel_retries = "SKIPPED" + + # Step 3: Send reporting webhook if enabled + if self.send_reporting_webhook: + try: + _, webhook_status = await self._send_reporting_webhook( + call_sid=call_sid, + new_outcome=new_outcome, + 
current_lead=current_lead, + evaluator_name=score.get("name"), + ) + result.reporting_webhook = webhook_status + if webhook_status == "SUCCESS": + logger.info( + f"[EVALUATOR_ACTION] REPORTING_WEBHOOK SUCCESS | outcome={new_outcome}" + ) + except Exception as e: + logger.error(f"[EVALUATOR_ACTION] REPORTING_WEBHOOK ERROR: {e}") + result.reporting_webhook = "ERROR" + else: + logger.info( + "[EVALUATOR_ACTION] REPORTING_WEBHOOK SKIPPED - send_reporting_webhook=False" + ) + result.reporting_webhook = "SKIPPED" + + # Determine overall success + # DB update is critical - must succeed or be skipped + # Cancel retries and webhook are non-critical - any non-None status is acceptable + result.success = ( + result.db_update in ("SUCCESS", "SKIPPED") + and result.cancel_retries in ("SUCCESS", "SKIPPED", "FAILED", "ERROR") + and result.reporting_webhook + in ("SUCCESS", "SKIPPED", "FAILED", "ERROR") + ) + + # Populate step_results from legacy fields based on action_steps keys + self._populate_step_results(result) + + return result + + finally: + # ALWAYS clear log context when done + clear_log_context() + + def _populate_step_results(self, result: ActionResult) -> None: + """ + Populate step_results dict from legacy fields based on action_steps keys. + + This maps the action_steps config keys (e.g., "update_in_db") + to their execution results (e.g., result.db_update). 
+ + Args: + result: ActionResult to populate step_results in + """ + # Map action_steps keys to legacy result fields + step_mapping = { + "update_in_db": result.db_update, + "send_reporting_webhook": result.reporting_webhook, + "cancel_retries": result.cancel_retries, + } + + # Build step_results from action_steps that were executed + result.step_results = {} + for step_key in self.action_steps: + if step_key in step_mapping: + status = step_mapping[step_key] + if status: # Only include steps that have a status + result.step_results[step_key] = status + + def _extract_outcome(self, score: Dict[str, Any]) -> Optional[str]: + """ + Extract outcome value from score based on configuration. + + Args: + score: The evaluator score dict + + Returns: + Extracted outcome value or None + """ + # Direct outcome specified in config + if self.outcome: + return self.outcome + + # Extract from comment using JSON path + if self.outcome_key: + comment = score.get("comment", "") + if not comment: + logger.warning("No comment field in score to extract outcome from") + return None + + # Extract JSON from end of comment + json_data = extract_json_from_end(comment) + if not json_data: + logger.warning( + f"Could not extract JSON from comment: {comment[:100]}..." + ) + return None + + # Extract the specific field + outcome_value = extract_field(json_data, self.outcome_key) + if not outcome_value: + logger.warning( + f"Could not extract field '{self.outcome_key}' from JSON: {json_data}" + ) + return None + + return outcome_value + + return None + + def _is_allowed_transition( + self, current_outcome: str | None, new_outcome: str + ) -> bool: + """ + Check if outcome transition is allowed. + + Supports "*" wildcard to allow a target from any current outcome. + Returns True if no restrictions are configured (all transitions allowed). 
async def _send_reporting_webhook(
    self,
    call_sid: str,
    new_outcome: str,
    current_lead: Any,
    evaluator_name: Optional[str] = None,
) -> tuple[bool, str]:
    """
    Send the reporting webhook that announces an outcome correction.

    The target URL is read from ``current_lead.payload["reporting_webhook_url"]``.
    The body is serialized once and that exact string is both signed and
    posted, so the ``checksum`` header always matches the bytes on the wire.

    Args:
        call_sid: The call SID, sent as ``callSid``
        new_outcome: The corrected outcome, sent as ``outcome``
        current_lead: The lead object; ``payload``, ``request_id``,
            ``attempt_count`` and ``outcome`` are read via ``getattr``
        evaluator_name: Name of the evaluator that triggered this

    Returns:
        Tuple of (success, status) where status is one of:
        - "SUCCESS": Endpoint answered with HTTP 200
        - "SKIPPED": No reporting_webhook_url in lead payload
        - "FAILED": Endpoint answered with any other status
        - "ERROR": Network or unexpected error
    """
    # Get reporting_webhook_url from lead payload
    lead_payload = getattr(current_lead, "payload", None) or {}
    reporting_webhook_url = lead_payload.get("reporting_webhook_url")

    if not reporting_webhook_url:
        logger.info(
            "[EVALUATOR_ACTION] REPORTING_WEBHOOK SKIPPED - No reporting_webhook_url in lead payload"
        )
        return (False, "SKIPPED")

    # Build payload matching existing webhook format
    payload = {
        "callSid": call_sid,
        "outcome": new_outcome,
        "orderId": getattr(current_lead, "request_id", None),
        "attemptCount": getattr(current_lead, "attempt_count", 0) or 0,
        "cancellationReason": None,
        "failureReason": None,
        "updatedAddress": None,
        "transcription": None,
        "callDuration": None,
        # Additional metadata for tracking outcome corrections
        "evaluatorName": evaluator_name,
        "correctedBy": "evaluator_action",
        "previousOutcome": getattr(current_lead, "outcome", None),
    }

    # Serialize once: this exact string is signed AND sent.
    payload_str = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
    checksum = calculate_hmac_sha256(
        payload_str, ORDER_CONFIRMATION_WEBHOOK_SECRET_KEY
    )

    headers = {"Content-Type": "application/json"}
    if checksum:
        headers["checksum"] = checksum

    try:
        timeout = aiohttp.ClientTimeout(total=10.0)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Use data=payload_str (not json=payload) to ensure the body
            # matches what was used for HMAC checksum computation.
            # The response is used as a context manager so the status and
            # body are read while the connection is still held, and the
            # connection is released as soon as we are done with it.
            async with session.post(
                reporting_webhook_url,
                data=payload_str,
                headers=headers,
            ) as response:
                if response.status == 200:
                    return (True, "SUCCESS")

                response_text = await response.text()
                logger.error(
                    f"[EVALUATOR_ACTION] REPORTING_WEBHOOK FAILED - "
                    f"status={response.status} response={response_text[:200]}"
                )
                return (False, "FAILED")

    except aiohttp.ClientError as e:
        logger.error(
            f"[EVALUATOR_ACTION] REPORTING_WEBHOOK ERROR - ClientError: {e}"
        )
        return (False, "ERROR")
    except Exception as e:
        # Includes the timeout raised by ClientTimeout; reported as ERROR.
        logger.error(
            f"[EVALUATOR_ACTION] REPORTING_WEBHOOK ERROR - Unexpected: {e}"
        )
        return (False, "ERROR")
# Matches either a complete quoted string (single or double quotes, with
# backslash escapes) or an unquoted identifier key directly after "{" or ",".
# Strings are listed first so key-like text inside a value is never rewritten.
_STRING_OR_BARE_KEY = re.compile(
    r'"(?:\\.|[^"\\])*"'
    r"|'(?:\\.|[^'\\])*'"
    r"|(?<=[{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:",
    re.DOTALL,
)


def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
    """
    Extract the LAST top-level JSON object from comment text.

    Candidates are brace-balanced ``{...}`` spans of any nesting depth.
    A first pass ignores braces that sit inside double-quoted strings; if
    nothing from that pass parses, a second pass ignores quoting entirely
    (covers single-quoted objects whose values contain a stray ``"``).
    Each candidate is parsed leniently, see ``_try_parse_json_flexible``.

    Args:
        comment: Full comment text that may contain JSON

    Returns:
        Parsed JSON dict if found, None otherwise
    """
    if not comment:
        return None

    tried = set()
    for respect_strings in (True, False):
        # Walk candidates from the end so the last object in the text wins.
        for candidate in reversed(_balanced_object_spans(comment, respect_strings)):
            if candidate in tried:
                continue
            tried.add(candidate)
            result = _try_parse_json_flexible(candidate)
            if result is not None:
                return result

    if not tried:
        return None

    logger.warning(
        f"Found {len(tried)} JSON-like patterns but none parsed successfully"
    )
    return None


def _balanced_object_spans(text: str, respect_strings: bool) -> list:
    """
    Return the outermost brace-balanced ``{...}`` substrings, in text order.

    Args:
        text: Text to scan
        respect_strings: When True, braces inside double-quoted strings that
            appear within an open object are not counted

    Returns:
        List of substrings; an unmatched ``{`` or ``}`` is ignored
    """
    spans = []  # (start, end) of outermost balanced spans found so far
    open_positions = []  # indexes of "{" still waiting for their "}"
    in_string = False
    i = 0
    n = len(text)

    while i < n:
        ch = text[i]
        if in_string:
            if ch == "\\":
                i += 2  # skip the escaped character, e.g. \" or \\
                continue
            if ch == '"':
                in_string = False
        elif ch == '"' and respect_strings and open_positions:
            # Only track strings inside an object: quotes in surrounding
            # prose must not hide a later object.
            in_string = True
        elif ch == "{":
            open_positions.append(i)
        elif ch == "}" and open_positions:
            start = open_positions.pop()
            # Spans opened after `start` are nested in this one: drop them.
            while spans and spans[-1][0] > start:
                spans.pop()
            spans.append((start, i + 1))
        i += 1

    return [text[start:end] for start, end in spans]


def _try_parse_json_flexible(json_str: str) -> Optional[Dict[str, Any]]:
    """
    Try multiple parsing strategies to handle various JSON formats.

    Strategies (in order of preference):
    1. Standard JSON (double quotes)
    2. Quote unquoted keys, then parse
    3. Convert single quotes to double quotes, then parse
    4. Do both transformations (handles single quotes + unquoted keys)

    Args:
        json_str: String that looks like a JSON object

    Returns:
        Parsed dict if successful, None otherwise (including when the text
        parses to something that is not a JSON object)
    """
    strategies = (
        lambda s: s,
        _quote_unquoted_keys,
        _convert_single_to_double_quotes,
        lambda s: _convert_single_to_double_quotes(_quote_unquoted_keys(s)),
    )

    for transform in strategies:
        try:
            parsed = json.loads(transform(json_str))
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(parsed, dict):
            return parsed

    return None


def _convert_single_to_double_quotes(s: str) -> str:
    """
    Convert single-quoted strings to double-quoted for JSON compatibility.

    Inside a single-quoted string ``\\'`` becomes a plain apostrophe and a
    bare ``"`` is escaped so the result stays one valid JSON string.
    Double-quoted strings are copied verbatim. Backslash escapes are consumed
    as pairs, so ``\\\\`` or ``\\"`` never opens or closes a string by accident.

    Args:
        s: String that may contain single-quoted values

    Returns:
        String with single quotes converted to double quotes
    """
    result = []
    quote = None  # delimiter of the string we are inside, or None
    i = 0
    n = len(s)

    while i < n:
        char = s[i]

        if quote is None:
            if char == "'":
                quote = "'"
                result.append('"')
            else:
                if char == '"':
                    quote = '"'
                result.append(char)
        elif char == "\\" and i + 1 < n:
            following = s[i + 1]
            if quote == "'" and following == "'":
                # Escaped single quote \' -> just ' (apostrophe in double-quoted string)
                result.append("'")
            else:
                # Keep every other escape pair (\\, \", \n, ...) untouched
                result.append(char + following)
            i += 2
            continue
        elif char == quote:
            # End of the current string; always emit a double quote
            quote = None
            result.append('"')
        elif char == '"':
            # Bare double quote inside a single-quoted string must be escaped
            result.append('\\"')
        else:
            result.append(char)
        i += 1

    return "".join(result)


def _quote_unquoted_keys(s: str) -> str:
    """
    Add quotes to unquoted keys in JSON-like strings.

    Converts JavaScript-style object literals to valid JSON.
    Example: {name: "value"} -> {"name": "value"}

    Quoted strings (single or double) are skipped, so text such as
    ``"hello, world: x"`` inside a value is left alone.

    Args:
        s: String that may contain unquoted keys

    Returns:
        String with quoted keys
    """

    def _replace(match):
        key = match.group(1)
        if key is None:
            # A quoted string matched: copy it through unchanged
            return match.group(0)
        return f'"{key}":'

    return _STRING_OR_BARE_KEY.sub(_replace, s)
+ + Args: + data: Parsed JSON dict + json_path: Path like "$.actual_outcome" + + Returns: + Extracted value as string, or None if not found + """ + if not data or not json_path: + return None + + # Simple path: $.actual_outcome -> actual_outcome + field = json_path.replace("$.", "") + value = data.get(field) + + # Coerce to string if value exists, return None otherwise + if value is None: + return None + return str(value) diff --git a/app/services/langfuse/tasks/score_monitor/score.py b/app/services/langfuse/tasks/score_monitor/score.py index 7399b529b..035ec8c71 100644 --- a/app/services/langfuse/tasks/score_monitor/score.py +++ b/app/services/langfuse/tasks/score_monitor/score.py @@ -12,6 +12,7 @@ from app.core.config.dynamic import ( DAILY_SUMMARY_HOUR, + EVALUATOR_ACTIONS, LANGFUSE_EVALUATORS, ) from app.core.config.static import LANGFUSE_BASEURL @@ -23,6 +24,7 @@ update_langfuse_scores, ) from app.services.langfuse.client import langfuse_readonly_client +from app.services.langfuse.tasks.actions import ActionExecutor, ActionResult from app.services.langfuse.trace import fetch_trace from app.services.redis import get_redis_service, is_redis_configured from app.services.slack.alert import slack_alert @@ -255,6 +257,7 @@ async def send_score_alert( score: Dict[str, Any], trace_details: Optional[Dict[str, Any]] = None, include_tags: bool = True, + action_result: Optional["ActionResult"] = None, ) -> bool: """ Send a Slack alert for a score below threshold (failure). @@ -267,6 +270,7 @@ async def send_score_alert( include_tags: Whether to include @mentions in the Slack message. Defaults to True. Set to False to suppress tagging (e.g., after the first alert in a batch to reduce notification noise). 
async def _process_evaluator_action(
    self,
    executor: Optional[ActionExecutor],
    action_configs: dict[str, Any],
    evaluator_name: str,
    score: Dict[str, Any],
    call_sid: Optional[str],
    evaluators_config: dict[str, int],
    call_sid_to_lead: Dict[str, Any],
) -> Optional[ActionResult]:
    """
    Run the action configured for an evaluator, if any, for one score.

    Nothing happens unless there is an executor, a config entry for
    ``evaluator_name``, and ``executor.should_trigger`` accepts the score
    against the evaluator's threshold (5 when not configured). After a
    successful action the lead is fetched again and written back into
    ``call_sid_to_lead`` so later iterations see the updated row.

    Returns:
        ActionResult if action was attempted, None if skipped. An exception
        raised while executing is logged and reported as a failed ActionResult.
    """
    if not (executor and action_configs):
        return None

    evaluator_action_config = action_configs.get(evaluator_name)
    if not evaluator_action_config:
        return None

    if not executor.should_trigger(score, evaluators_config.get(evaluator_name, 5)):
        return None

    try:
        # Use the pre-fetched lead when we have a call_sid to look it up by
        cached_lead = None
        if call_sid:
            cached_lead = call_sid_to_lead.get(call_sid)

        outcome = await executor.execute_action(
            action_type=evaluator_action_config.get("action_type", "outcome_update"),
            action_config=evaluator_action_config,
            call_sid=call_sid,
            score=score,
            current_lead=cached_lead,
        )

        # Refresh the cache only when the action reports success
        if outcome and outcome.success and call_sid:
            try:
                refreshed = await get_lead_by_call_id(call_sid)
            except Exception as e:
                logger.error(f"Error re-fetching lead for call_sid {call_sid}: {e}")
            else:
                if refreshed:
                    call_sid_to_lead[call_sid] = refreshed

        return outcome

    except Exception as action_error:
        logger.error(
            f"Error executing action for evaluator '{evaluator_name}': {action_error}"
        )
        return ActionResult(success=False, error_message=str(action_error))
@@ -862,6 +935,7 @@ async def check_and_alert(self) -> None: # ==================================================================== trace_details_cache: Dict[str, Dict[str, Any]] = {} trace_to_call_sid: Dict[str, str] = {} + test_trace_ids: set[str] = set() for trace_id in scores_by_trace.keys(): try: @@ -878,6 +952,10 @@ async def check_and_alert(self) -> None: call_sid = attributes.get("call_sid") if call_sid: trace_to_call_sid[trace_id] = call_sid + # Track test calls to skip evaluator actions + execution_mode = attributes.get("execution_mode") + if execution_mode and execution_mode.endswith("_TEST"): + test_trace_ids.add(trace_id) except Exception as e: logger.error(f"Error fetching trace details for {trace_id}: {e}") @@ -914,24 +992,102 @@ async def check_and_alert(self) -> None: f"{len(failing_scores_by_evaluator)} evaluators, sending Slack alerts..." ) + # Get action configs for processing before alerts + action_configs = await EVALUATOR_ACTIONS() + executor = ActionExecutor() if action_configs else None + + # Pre-fetch leads ONLY for call_sids that have failing scores with matching action configs + call_sid_to_lead: Dict[str, Any] = {} + if executor and action_configs: + # Collect call_sids that need pre-fetching (failing scores with action configs) + call_sids_to_fetch: set[str] = set() + for evaluator_name, failing_scores in failing_scores_by_evaluator.items(): + if action_configs.get(evaluator_name): + for score in failing_scores: + trace_id = score.get("trace_id") + call_sid = trace_to_call_sid.get(trace_id) if trace_id else None + if call_sid and trace_id not in test_trace_ids: + call_sids_to_fetch.add(call_sid) + + # Fetch only the relevant leads + for call_sid in call_sids_to_fetch: + try: + lead = await get_lead_by_call_id(call_sid) + if lead: + call_sid_to_lead[call_sid] = lead + except Exception as e: + logger.error(f"Error fetching lead for call_sid {call_sid}: {e}") + # Send individual alerts for each failing score using cached trace details # 
Only tag @mentions on the first alert per check cycle to reduce Slack noise + # Collect action results keyed by call_sid for DB storage after the loop + action_results_by_call_sid: Dict[str, Dict[str, Any]] = {} is_first_alert = True for evaluator_name, failing_scores in failing_scores_by_evaluator.items(): for score in failing_scores: try: trace_id = score.get("trace_id") + call_sid = trace_to_call_sid.get(trace_id) if trace_id else None # Use cached trace details instead of fetching again trace_details = ( trace_details_cache.get(trace_id) if trace_id else None ) - # Send Slack alert (only tag users on the first alert) + # Process action BEFORE sending alert + # Skip actions for test calls (TELEPHONY_TEST, DAILY_TEST) + if trace_id in test_trace_ids: + logger.info( + f"[EVALUATOR_ACTION] SKIPPED - test call " + f"(trace_id={trace_id})" + ) + action_result = None + else: + action_result = await self._process_evaluator_action( + executor=executor, + action_configs=action_configs, + evaluator_name=evaluator_name, + score=score, + call_sid=call_sid, + evaluators_config=evaluators_config, + call_sid_to_lead=call_sid_to_lead, + ) + + # Collect action result for DB storage + if action_result and call_sid: + action_cfg = action_configs.get(evaluator_name, {}) + action_results_by_call_sid[call_sid] = { + "evaluator_name": evaluator_name, + "action_type": action_cfg.get( + "action_type", "outcome_update" + ), + "outcome_change": action_result.outcome_change, + "success": action_result.success, + "steps": action_result.step_results + or { + k: v + for k, v in [ + ("update_in_db", action_result.db_update), + ( + "cancel_retries", + action_result.cancel_retries, + ), + ( + "send_reporting_webhook", + action_result.reporting_webhook, + ), + ] + if v is not None + }, + "error_message": action_result.error_message, + } + + # Send Slack alert with action status (only tag users on the first alert) try: await self.send_score_alert( evaluator_name=evaluator_name, score=score, 
async def _store_action_results(
    self,
    action_results_by_call_sid: Dict[str, Dict[str, Any]],
) -> None:
    """
    Persist action results next to the evaluator scores of each lead.

    For every call_sid the lead is loaded, the action data is placed under
    the ``"action_results"`` key of its ``langfuse_scores`` and the dict is
    written back. Leads that cannot be found, or that already carry
    ``"action_results"``, are skipped. A failure for one call_sid is logged
    and does not stop the remaining ones.
    """
    stored_count = 0

    for call_sid in action_results_by_call_sid:
        action_data = action_results_by_call_sid[call_sid]
        try:
            lead = await get_lead_by_call_id(call_sid)
            if lead:
                # Same dict object as on the lead when it is non-empty
                existing_scores = lead.langfuse_scores or {}
                if "action_results" not in existing_scores:
                    existing_scores["action_results"] = action_data

                    await update_langfuse_scores(call_sid, existing_scores)
                    stored_count += 1
                    logger.info(
                        f"Stored action results for call_sid: {call_sid}, "
                        f"success: {action_data.get('success')}, "
                        f"outcome_change: {action_data.get('outcome_change')}"
                    )
                else:
                    logger.info(
                        f"Skipping action results for call_sid: {call_sid}: already stored"
                    )
            else:
                logger.warning(
                    f"Cannot store action results: lead not found for call_sid {call_sid}"
                )

        except Exception as e:
            logger.error(
                f"Error storing action results for call_sid {call_sid}: {e}"
            )

    logger.info(
        f"Stored action results for {stored_count}/{len(action_results_by_call_sid)} leads"
    )
action_status: Optional[str] = None, ) -> bool: """ Generic function to send Slack alerts with customizable content. @@ -41,6 +42,7 @@ async def send( fallback_text: Optional fallback text for notifications (defaults to title) include_tags: Whether to include @mention tags (default True). Set to False to suppress tagging and reduce Slack notification noise. + action_status: Optional action status string with multiline content Returns: True if sent successfully, False otherwise @@ -62,19 +64,37 @@ async def send( } ] - # Add fields section if provided + # Add fields section if provided (excluding action_status which is handled separately) if fields: - # Slack fields are displayed in 2 columns, so we group them - field_items = [] - for field in fields: - field_items.append( - { - "type": "mrkdwn", - "text": f"*{field.get('name', '')}:*\n{field.get('value', '')}", - } - ) + # Filter out action_status from fields if present + filtered_fields = [ + f for f in fields if f.get("name") != "Action Status" + ] + + if filtered_fields: + # Slack fields are displayed in 2 columns, so we group them + field_items = [] + for field in filtered_fields: + field_items.append( + { + "type": "mrkdwn", + "text": f"*{field.get('name', '')}:*\n{field.get('value', '')}", + } + ) + + blocks.append({"type": "section", "fields": field_items}) - blocks.append({"type": "section", "fields": field_items}) + # Add action status as a separate section if provided (full width below) + if action_status: + blocks.append( + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": f"*Action Status:*\n{action_status}", + }, + } + ) # Add custom sections if provided if sections: diff --git a/docs/EVALUATOR_ACTIONS.md b/docs/EVALUATOR_ACTIONS.md new file mode 100644 index 000000000..423718ee9 --- /dev/null +++ b/docs/EVALUATOR_ACTIONS.md @@ -0,0 +1,412 @@ +# Evaluator Actions + +Auto-correct call outcomes when Langfuse evaluators detect issues. 
+ +## Problem + +When a call completes, the voice agent records an outcome (e.g., `BUSY`, `NO_ANSWER`). Sometimes this is incorrect: +- Voicemail misdetection: Recorded as `BUSY`, actually `VOICEMAIL` +- Outcome mismatch: Recorded as `CANCEL`, actually `CONFIRM` + +This causes: +- Unnecessary retries (calling a customer who already confirmed) +- Wrong Shopify tags +- Poor customer experience + +## What It Does + +When an evaluator score is below threshold: +1. **Update DB** - Correct the outcome in `lead_call_tracker` +2. **Cancel Retries** - Stop pending calls for this request_id (any outcome change) +3. **Send Webhook** - Notify Nautilus to update Shopify tags +4. **Send Slack Alert** - Notify team with detailed action status + +## Flow + +``` +Call completes → Voice Agent records outcome → Webhook to Nautilus (initial) + ↓ + Langfuse evaluates transcript + ↓ + Score < threshold? + ↓ Yes + Check EVALUATOR_ACTIONS config + ↓ + Execute OutcomeUpdateAction: + 1. Update DB outcome + 2. Cancel pending retries (if request_id exists) + 3. 
Webhook to Nautilus (correction) + ↓ + Send Slack alert with status +``` + +## Configuration + +### Redis: `EVALUATOR_ACTIONS` + +The config is split into two parts: +- **`action_config`**: Controls behavior (what outcome to set, extraction rules) +- **`action_steps`**: Controls which steps to execute (flags for each step) + +```json +{ + "VOICEMAIL DETECTOR": { + "action_type": "outcome_update", + "action_config": { + "outcome": "VOICEMAIL", + "disallowed_outcome_changes": {"*": ["BUSY"]} + }, + "action_steps": { + "update_in_db": true, + "send_reporting_webhook": true, + "cancel_retries": true + } + }, + "OUTCOME CORRECTNESS": { + "action_type": "outcome_update", + "action_config": { + "outcome_key": "$.actual_outcome" + }, + "action_steps": { + "update_in_db": true, + "send_reporting_webhook": true, + "cancel_retries": true + } + } +} +``` + +### Redis: `LANGFUSE_EVALUATORS` +```json +{"VOICEMAIL DETECTOR": 5, "OUTCOME CORRECTNESS": 5} +``` + +### Action Config Options + +| Section | Option | Description | +|---------|--------|-------------| +| `action_config` | `outcome` | Direct outcome value (e.g., `"VOICEMAIL"`) | +| `action_config` | `outcome_key` | JSON path to extract from comment (e.g., `"$.actual_outcome"`) | +| `action_config` | `allowed_outcome_changes` | Permit transitions. `"*": ["CONFIRM"]` allows any→CONFIRM | +| `action_config` | `disallowed_outcome_changes` | Block transitions. 
`"*": ["BUSY"]` blocks any change TO BUSY | +| `action_steps` | `update_in_db` | Update outcome in database (default: true) | +| `action_steps` | `cancel_retries` | Cancel pending retries for this lead (default: true) | +| `action_steps` | `send_reporting_webhook` | Send webhook to Nautilus (default: true) | + +### Lead Payload Requirement + +Lead must have `reporting_webhook_url` in payload: +```json +{ + "reporting_webhook_url": "https://nautilus.example.com/apps/breeze-buddy/webhooks/clairvoyance", + "customer_mobile_number": "+919876543210" +} +``` + +## Configuration Details + +### Step Names Used in `action_steps` + +The step names from `action_steps` keys are used directly in Slack alerts: +- `update_in_db` - Updates the outcome in database +- `send_reporting_webhook` - Sends webhook to Nautilus +- `cancel_retries` - Cancels pending retry calls + +### Outcome Extraction from Comment + +When using `outcome_key`, the system: +1. Gets `score["comment"]` (full text from evaluator) +2. Finds JSON object `{...}` at the end using regex +3. Parses JSON and extracts the field specified in `outcome_key` + +**Example evaluator response:** +``` +The recorded outcome was CANCEL but based on the conversation analysis, the customer actually confirmed the order. 
{"recorded_outcome": "CANCEL","actual_outcome":"CONFIRM"} +``` + +**Config to extract `actual_outcome`:** +```json +{ + "action_config": { + "outcome_key": "$.actual_outcome" + } +} +``` + +## Files Changed + +| File | Changes | +|------|---------| +| `tasks/actions/actions.py` | `OutcomeUpdateAction`, `ActionResult`, `ActionExecutor` with step tracking | +| `tasks/actions/utils.py` | JSON extraction and parsing helpers | +| `score_monitor/score.py` | Integrated action execution into ScoreMonitor, Slack alert with action_status | +| `slack/alert.py` | Added `action_status` parameter for full-width step status display | +| `lead_call_tracker.py` (queries) | `cancel_pending_retries_by_request_id_query` | +| `lead_call_tracker.py` (accessor) | `cancel_pending_retries_by_request_id` | +| `dynamic.py` | `EVALUATOR_ACTIONS()` config getter | + +### Log Context Usage + +Actions use `app/core/logger/context.py` for automatic context injection: + +```python +from app.core.logger.context import set_log_context, update_log_context, clear_log_context + +# At start of action +set_log_context(call_sid=call_sid) + +# When lead is found +update_log_context(lead_id=str(lead.id), request_id=lead.request_id, current_outcome=lead.outcome) + +# At end of action +clear_log_context() +``` + +All logs within the action automatically include these fields in JSON output. 
+ +## ActionResult Status + +Each step returns: `SUCCESS` | `SKIPPED` | `FAILED` | `ERROR` + +### Status Icons + +| Status | Icon | Used For | +|--------|------|----------| +| `SUCCESS` | ✅ | Step completed successfully | +| `SKIPPED` | ⏭️ | Step not executed (disabled or precondition failed) | +| `FAILED` | ❌ | Step failed with error | +| `ERROR` | ⚠️ | Unexpected error occurred | + +### ActionResult Fields + +```python +@dataclass +class ActionResult: + success: bool # Overall action success + db_update: Optional[str] # DB step status + cancel_retries: Optional[str] # Retry cancellation status + reporting_webhook: Optional[str] # Webhook step status + error_message: Optional[str] # Error details if any + outcome_change: Optional[str] # "OLD -> NEW" format + canceled_count: Optional[int] # Number of retries cancelled + lead_id: Optional[str] # Lead ID for alerting + step_results: Optional[Dict[str, str]] # Raw step statuses keyed by step name +``` + +### Step Status Legend + +| Step | SUCCESS | SKIPPED | FAILED/ERROR | +|------|---------|---------|--------------| +| `update_in_db` | DB updated | Precondition failed or disabled | Exception | +| `cancel_retries` | Cancelled or none existed | No request_id or disabled | Exception | +| `send_reporting_webhook` | 200 OK | No URL or disabled | Non-200 or exception | + +## Webhook Details + +- **URL**: From `lead.payload.reporting_webhook_url` (no new env vars) +- **HMAC**: Uses existing `ORDER_CONFIRMATION_WEBHOOK_SECRET_KEY` +- **Signature**: Base64-encoded SHA256 HMAC + +### Webhook Payload +```json +{ + "callSid": "CAabc123", + "outcome": "VOICEMAIL", + "orderId": "ORDER-123", + "attemptCount": 1, + "evaluatorName": "VOICEMAIL DETECTOR", + "correctedBy": "evaluator_action", + "previousOutcome": "BUSY" +} +``` + +## Slack Alert Format + +``` +🔴 Breeze Buddy - VOICEMAIL DETECTOR + +Score: 0 (BELOW THRESHOLD) Timestamp: 2026-02-27 10:30:00 +Lead ID: lead-abc123 Trace ID: `trace-xyz` + Call SID: `CAabc123...` + 
Recording: Listen + +Action Status: +update_in_db: ✅ +send_reporting_webhook: ✅ +cancel_retries: ✅ +Outcome: BUSY -> VOICEMAIL +``` + +### Alert Layout Details + +1. **Header**: Red circle emoji + evaluator name +2. **Fields** (2-column layout): + - Left: Score, Lead ID + - Right: Timestamp, Trace ID, Call SID, Recording link +3. **Action Status** (full-width section below fields): + - Each step on a new line with status icon + - Uses raw `action_steps` keys (snake_case) + - Shows outcome change at bottom +4. **User Mentions**: Configurable via `SLACK_TAG_USERS` env variable + +### `ActionResult.to_slack_status()` Method + +```python +def to_slack_status(self) -> str: + """Generate formatted status for Slack alerts""" + parts = [] + if self.step_results: + for step_name, status in self.step_results.items(): + icon = self.STATUS_ICONS.get(status, "?") + parts.append(f"{step_name}: {icon}") + status_str = "\n".join(parts) if parts else "No actions" + if self.outcome_change: + status_str += f"\n*Outcome:* {self.outcome_change}" + return status_str +``` + +## Logging + +Uses log context (`app/core/logger/context.py`) - all logs automatically include: +- `call_sid` - Call identifier +- `lead_id` - Lead database ID +- `request_id` - Order/request ID +- `conversation_id` - Conversation/trace ID (when available) +- `current_outcome` - Original outcome +- `new_outcome` - Corrected outcome (when available) + +```bash +# View all evaluator action logs +grep "call_sid" /var/log/clairvoyance/app.log | grep "EVALUATOR_ACTION" + +# Or filter by conversation_id +grep "conversation_id.*Customer-Shop" /var/log/clairvoyance/app.log +``` + +JSON log output example: +```json +{ + "timestamp": "2026-02-26T10:30:00Z", + "level": "INFO", + "message": "[EVALUATOR_ACTION] DB_UPDATE SUCCESS", + "call_sid": "CAabc123", + "lead_id": "uuid-xxx", + "request_id": "ORDER-123", + "conversation_id": "Customer-Shop-2026-02-26_10-30-00", + "current_outcome": "BUSY", + "new_outcome": "VOICEMAIL" +} 
+``` + +Context is managed with `try/finally`: +```python +set_log_context(call_sid=call_sid) +try: + update_log_context(lead_id=..., request_id=..., current_outcome=...) + if conversation_id: + update_log_context(conversation_id=conversation_id) + # ... action logic ... +finally: + clear_log_context() # ALWAYS called +``` + +## Nautilus Integration + +Nautilus handles evaluator correction webhooks at `src/routes/apps/breeze-buddy/webhooks/clairvoyance/+server.ts`. + +When `correctedBy === 'evaluator_action'` is present in the webhook payload, Nautilus: +1. Updates the workflow outcome and metadata (regardless of workflow status — works on completed workflows too) +2. Replaces Shopify tags (removes old outcome tag, adds new one) +3. Adds an order note about the correction +4. Returns early (no re-billing) + +## Example Scenarios + +### Voicemail Detection +- **Initial**: Call recorded as `BUSY`, 3 retries scheduled +- **Evaluator**: Score 0, detects voicemail +- **Action**: DB updated to `VOICEMAIL`, 3 retries cancelled, webhook sent + +### Outcome Mismatch +- **Initial**: Call recorded as `CANCEL`, 2 retries scheduled +- **Evaluator**: Score 2, comment says `{"actual_outcome": "CONFIRM"}` +- **Action**: DB updated to `CONFIRM`, 2 retries cancelled, webhook sent + +## Test Calls + +Evaluator actions are automatically skipped for test calls (`TELEPHONY_TEST`, `DAILY_TEST`).
+ +- `execution_mode` is set as an OTEL span attribute during the call +- The score monitor reads it from the Langfuse trace metadata before processing actions +- If the trace belongs to a test call, the entire action (DB update, retry cancellation, webhook) is skipped +- Slack alerts and score storage still fire for test calls (monitoring visibility) + +## Rollout Checklist + +### Clairvoyance ✅ +- [x] Implement `OutcomeUpdateAction` with log context +- [x] Add `ActionResult` dataclass with `step_results` dict +- [x] Add `STATUS_ICONS` mapping (✅, ⏭️, ❌, ⚠️) +- [x] Add `to_slack_status()` method for formatted output +- [x] Add `lead_id` field to ActionResult for alerting +- [x] Integrate into ScoreMonitor with action execution +- [x] Add `action_status` parameter to Slack alert +- [x] Implement 2-column fields + full-width Action Status layout +- [x] Use raw `action_steps` keys (snake_case) in alerts +- [x] Add Redis config with `action_config` + `action_steps` format +- [ ] Commit and deploy +- [ ] Verify logs via `grep "EVALUATOR_ACTION" /var/log/clairvoyance/app.log` + +### Nautilus ✅ +- [x] Add handler for `correctedBy === 'evaluator_action'` +- [x] Implement tag replacement logic +- [x] Add order note about correction +- [x] Test end-to-end + +## Testing + +### Test Scripts + +| Script | Purpose | +|--------|---------| +| `scripts/test/demo_langfuse_server.py` | Mock Langfuse server for testing (port 8072) | +| `scripts/test/evaluator_actions_test_data.sql` | Insert test leads for E2E testing | +| `scripts/test/setup_evaluator_test_redis.sh` | Configure Redis for testing | + +### Running E2E Test + +```bash +# 1. Start demo Langfuse server +python scripts/test/demo_langfuse_server.py + +# 2. Configure Redis (in another terminal) +./scripts/test/setup_evaluator_test_redis.sh + +# 3. Insert test data +psql -d clairvoyance -f scripts/test/evaluator_actions_test_data.sql + +# 4. Start the application +uvicorn app.main:app --reload + +# 5.
Verify results: +# - Lead outcome: CANCEL -> CONFIRM +# - Retry leads: Cancelled with CANCELLED_BY_OUTCOME_CORRECTION +# - Webhook: Received at demo server /webhook endpoint +# - Slack alert: Sent with Lead ID and Action Status +``` + +## Commit + +```bash +git commit -m "feat: add evaluator-triggered actions for outcome corrections + +- Add OutcomeUpdateAction handler with DB update, retry cancellation, webhook +- Add ActionResult dataclass with step_results dict for detailed tracking +- Add STATUS_ICONS mapping (✅, ⏭️, ❌, ⚠️) and to_slack_status() method +- Add lead_id field to ActionResult for Slack alert inclusion +- Split config into action_config (behavior) and action_steps (execution flags) +- Integrate action execution into ScoreMonitor polling loop +- Add action_status parameter to Slack alert (full-width section below fields) +- Use raw action_steps keys (snake_case) in alert display +- Add cancel_pending_retries_by_request_id query and accessor +- Add EVALUATOR_ACTIONS dynamic config from Redis" +```