Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ REDIS_PORT=6379
REDIS_CLUSTER_NODES=redis1:6379,redis2:6380,redis3:6381

LANGFUSE_EVALUATORS="breeze buddy outcome correctness, latency evaluator" # name of the evaluator
EVALUATOR_ACTIONS='{"OUTCOME MISMATCH": {"action_type": "outcome_update", "action_config": {"outcome_key": "$.actual_outcome"}, "action_steps": {"update_in_db": true, "send_reporting_webhook": true, "cancel_retries": true}}}'
SLACK_WEBHOOK_URL="" # slack webhook url to send alerts
SLACK_TAG_USERS="@john.doe,@jane.smith,@dev.team" # comma-separated list of users to tag in Slack alerts

Expand Down
3 changes: 3 additions & 0 deletions app/ai/voice/agents/breeze_buddy/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,9 @@ async def _run_with_tracing(self, runner: PipelineRunner) -> None:
evaluator_config=(
self.configurations.evaluator_config if self.configurations else None
),
execution_mode=(
self.lead.execution_mode.value if self.lead.execution_mode else None
),
)
try:
with trace.use_span(self.root_span, end_on_exit=True):
Expand Down
15 changes: 15 additions & 0 deletions app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from app.ai.voice.agents.breeze_buddy.template.context import TemplateContext
from app.ai.voice.agents.breeze_buddy.utils.traces import extract_possible_outcomes
from app.core.config.static import (
BUDDY_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
BUDDY_OTEL_EXPORTER_OTLP_TRACES_HEADERS,
Expand Down Expand Up @@ -79,6 +80,7 @@ def create_root_span(
provider: str,
template_type: str,
evaluator_config: Optional[List[str]] = None,
execution_mode: Optional[str] = None,
) -> trace.Span:
"""
Create and configure the root tracing span with all conversation attributes.
Expand All @@ -94,6 +96,7 @@ def create_root_span(
template_type: Template type being used
evaluator_config: List of evaluator names to add as Langfuse trace tags.
If None/empty, "ALL_EVALS" tag is added so all evaluators run.
execution_mode: Execution mode of the call (e.g. "TELEPHONY", "TELEPHONY_TEST").

Returns:
A span object that can be used with trace.use_span() context manager
Expand Down Expand Up @@ -126,6 +129,8 @@ def create_root_span(
"daily" if transport_type == "daily" else provider,
)
span.set_attribute("template.type", template_type)
if execution_mode:
span.set_attribute("execution_mode", execution_mode)

# Set evaluator tags for Langfuse trace filtering
# Langfuse maps `langfuse.trace.tags` span attribute to trace-level tags
Expand Down Expand Up @@ -203,6 +208,16 @@ def update_span_with_evaluation_data(context: TemplateContext) -> None:
# Core evaluation data
context.root_span.set_attribute("call_outcome", lead.outcome or "UNKNOWN")

# Extract and attach possible outcomes from the template flow definition
template = getattr(context.bot, "template", None)
if template and hasattr(template, "flow") and template.flow:
possible_outcomes = extract_possible_outcomes(template.flow)
if possible_outcomes:
context.root_span.set_attribute(
"possible_outcomes",
json.dumps(possible_outcomes),
)

# Calculate call duration
call_duration = None
if lead.call_initiated_time:
Expand Down
40 changes: 40 additions & 0 deletions app/ai/voice/agents/breeze_buddy/utils/traces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Tracing utility helpers for Breeze Buddy.
"""

from typing import Any, Dict, List


def extract_possible_outcomes(flow: Dict[str, Any]) -> List[str]:
"""
Extract all possible outcome values from a template flow definition.

Walks through all nodes → functions → hooks to find every
``update_outcome_in_database`` hook with a static ``outcome`` field and
collects the unique values.

Args:
flow: The raw template flow dict (``template.flow``).

Returns:
Deduplicated list of outcome strings defined in the template.
"""
outcomes: list[str] = []
seen: set[str] = set()

for node in flow.get("nodes", []):
for func in node.get("functions", []):
for hook in func.get("hooks", []):
if hook.get("name") != "update_outcome_in_database":
continue
expected_fields = hook.get("expected_fields", {})
outcome_field = expected_fields.get("outcome", {})
if outcome_field.get("source") == "static" and outcome_field.get(
"value"
):
value = outcome_field["value"]
if value not in seen:
seen.add(value)
outcomes.append(value)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return outcomes
102 changes: 102 additions & 0 deletions app/core/config/dynamic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from typing import Any

from app.core.logger import logger
from app.services.live_config.store import get_config
Expand Down Expand Up @@ -280,6 +281,107 @@ async def LANGFUSE_EVALUATORS() -> dict[str, int]:
return evaluators


async def EVALUATOR_ACTIONS() -> dict[str, Any]:
"""
Returns EVALUATOR_ACTIONS from Redis as a dict mapping evaluator names to action configs.

Format: JSON string stored in Redis
{
"<VOICEMAIL_EVALUATOR_NAME>": {
"action_type": "outcome_update",
"action_config": {
"outcome": "VOICEMAIL",
"allowed_outcome_changes": {"BUSY": ["VOICEMAIL"]},
"disallowed_outcome_changes": {"*": ["BUSY"]}
},
"action_steps": {
"update_in_db": true,
"send_reporting_webhook": true,
"cancel_retries": true
}
},
"<OUTCOME_CORRECTOR_NAME>": {
"action_type": "outcome_update",
"action_config": {
"outcome_key": "$.correct_outcome"
},
"action_steps": {
"update_in_db": true,
"cancel_retries": true
}
}
}

action_config options:
- outcome: Direct outcome value (e.g., "VOICEMAIL") - use for simple cases
- outcome_key: JSON path to extract from JSON at end of comment (e.g., "$.correct_outcome")
- allowed_outcome_changes: (optional) Dict of {current_outcome: [allowed_new_outcomes]}
Use "*" as key to allow a target from any current outcome. Deny (disallowed) takes precedence.
- disallowed_outcome_changes: (optional) Dict of {current_outcome: [disallowed_new_outcomes]}
Use "*" as key to disallow for all current outcomes (e.g., {"*": ["BUSY"]})

action_steps options:
- update_in_db: (default: true) Update the lead's outcome in the database
- send_reporting_webhook: (default: true) Send reporting webhook for outcome correction
- cancel_retries: (default: true) Cancel any pending retry leads

Trigger logic: Action triggers when score < threshold (from LANGFUSE_EVALUATORS config)
Comment thread
narsimhaReddyJuspay marked this conversation as resolved.
"""
config_value = await get_config("EVALUATOR_ACTIONS", "", str)
if not config_value:
return {}

try:
parsed = json.loads(config_value)
if not isinstance(parsed, dict):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decode here

logger.error(
f"EVALUATOR_ACTIONS config must be a JSON object, got {type(parsed).__name__}"
)
return {}

# Validate and decode each evaluator config entry
valid_actions: dict[str, Any] = {}
for evaluator_name, config in parsed.items():
if not isinstance(config, dict):
logger.warning(
f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — config must be an object, got {type(config).__name__}"
)
continue

action_type = config.get("action_type")
if not action_type or not isinstance(action_type, str):
logger.warning(
f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — missing or invalid 'action_type'"
)
continue

action_config = config.get("action_config")
if action_config is not None and not isinstance(action_config, dict):
logger.warning(
f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — 'action_config' must be an object"
)
continue

action_steps = config.get("action_steps")
if action_steps is not None and not isinstance(action_steps, dict):
logger.warning(
f"EVALUATOR_ACTIONS: skipping '{evaluator_name}' — 'action_steps' must be an object"
)
continue

valid_actions[evaluator_name] = config

if len(valid_actions) < len(parsed):
logger.info(
f"EVALUATOR_ACTIONS: {len(valid_actions)}/{len(parsed)} configs valid"
)

return valid_actions
except json.JSONDecodeError as e:
logger.error(f"Failed to parse EVALUATOR_ACTIONS config: {e}")
return {}
Comment thread
coderabbitai[bot] marked this conversation as resolved.


# --- Noise Cancellation Configuration ---
async def BB_NOISE_CANCELLATION_ENABLED() -> bool:
"""Returns BB_NOISE_CANCELLATION_ENABLED from Redis"""
Expand Down
41 changes: 41 additions & 0 deletions app/database/accessor/breeze_buddy/lead_call_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
abort_lead_by_id_query,
acquire_lock_on_lead_by_id_query,
append_metadata_field_query,
cancel_pending_retries_by_request_id_query,
defer_lead_next_attempt_and_release_lock_query,
get_all_lead_call_trackers_query,
get_lead_based_analytics_query,
Expand Down Expand Up @@ -625,6 +626,46 @@ async def handle_lead_abort(
return None


async def cancel_pending_retries_by_request_id(
request_id: str,
reason: str = "outcome_corrected",
) -> int:
"""
Cancel all pending retry leads for a given request_id.

Used when an outcome is corrected to a terminal state (e.g., BUSY -> CONFIRM)
to prevent scheduled retries from executing.

Args:
request_id: The order/request ID to cancel retries for
reason: Reason for cancellation (stored in meta_data)

Returns:
Number of leads cancelled
"""
logger.info(f"Cancelling pending retries for request_id: {request_id}")

try:
query_text, values = cancel_pending_retries_by_request_id_query(
request_id, reason
)
result = await run_parameterized_query(query_text, values)
cancelled_count = get_row_count(result) if result else 0

if cancelled_count > 0:
logger.info(
f"Cancelled {cancelled_count} pending retry leads for request_id: {request_id}"
)

return cancelled_count

except Exception as e:
logger.error(
f"Error cancelling pending retries for request_id {request_id}: {e}"
)
return 0


async def update_lead_payload(
lead_id: str, payload_updates: Dict[str, Any]
) -> Optional[LeadCallTracker]:
Expand Down
48 changes: 47 additions & 1 deletion app/database/queries/breeze_buddy/lead_call_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import json
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from app.schemas import CallDirection, ExecutionMode, LeadCallStatus
Expand Down Expand Up @@ -582,6 +582,52 @@ def abort_lead_by_id_query(
return text, values


def cancel_pending_retries_by_request_id_query(
request_id: str,
reason: str = "outcome_corrected",
) -> Tuple[str, List[Any]]:
"""
Generate query to cancel all pending retry leads for a given request_id.

Used when an outcome is corrected to a terminal state (e.g., BUSY -> CONFIRM)
to prevent scheduled retries from executing.

Args:
request_id: The order/request ID to cancel retries for
reason: Reason for cancellation (stored in meta_data)

Returns:
Tuple of (query_text, values)
"""
text = f"""
UPDATE "{LEAD_CALL_TRACKER_TABLE}"
SET
"status" = $1,
"outcome" = $2,
"updated_at" = NOW(),
"meta_data" = COALESCE("meta_data", '{{}}')::jsonb || $3::jsonb
WHERE
"request_id" = $4
AND "status" IN ($5, $6)
RETURNING *;
"""

metadata = {
"cancelled_at": datetime.now(timezone.utc).isoformat(),
"cancel_reason": reason,
}

values = [
LeadCallStatus.FINISHED.value,
"CANCELLED_BY_OUTCOME_CORRECTION",
json.dumps(metadata),
request_id,
LeadCallStatus.BACKLOG.value,
LeadCallStatus.RETRY.value,
]
return text, values


def update_lead_payload_query(
lead_id: str, payload_updates: Dict[str, Any]
) -> Tuple[str, List[Any]]:
Expand Down
3 changes: 3 additions & 0 deletions app/services/langfuse/tasks/actions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from app.services.langfuse.tasks.actions.actions import ActionExecutor, ActionResult

__all__ = ["ActionExecutor", "ActionResult"]
Loading
Loading