Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ logs
.claude/settings.local.json
.claude/memory/
CLAUDE.local.md
.claude/settings.json
.claude/settings.json

temp/
2 changes: 2 additions & 0 deletions app/ai/voice/agents/breeze_buddy/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(
self.stream_sid: Optional[str] = None
self.vad_analyzer: Optional[SileroVADAnalyzer] = None
self.transport: Any = None
self.room_url: Optional[str] = None # Daily room URL (Daily mode only)
self.lead: Optional[LeadCallTracker] = None
self.root_span: Any = None
self.flow_manager: Optional[FlowManager] = None
Expand Down Expand Up @@ -354,6 +355,7 @@ async def _setup_daily_transport(self, runner_args: RunnerArguments) -> None:

transport_params = get_transport_params(self.template, self.configurations)
self.transport = await create_transport(runner_args, transport_params)
self.room_url = getattr(runner_args, "room_url", None)

async def _setup_telephony_transport(self) -> bool:
"""Initialize transport for telephony mode. Returns False if setup fails."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
"""
Daily warm transfer handler.

Invoked when ``connect_to_live_agent`` fires inside a Daily-mode call. Dials
the configured human agent on PSTN via the lead's telephony provider, then
waits for an inbound bridge handler to confirm it has joined the Daily room.
On success, ends the AI conversation (AI bot leaves the room). On failure,
returns a graceful error so the LLM can inform the customer.

The audio bridging itself runs in
``app/api/routers/breeze_buddy/telephony/bridge.py`` — when the dialed agent
answers, the provider's answer webhook is intercepted (see
``answer/handlers.py``) and the WebSocket is routed to the bridge handler.
"""

import asyncio
from typing import Any, Dict, Optional

from pipecat.transports.daily.utils import (
DailyMeetingTokenParams,
DailyMeetingTokenProperties,
DailyRESTHelper,
)

from app.ai.voice.agents.breeze_buddy.handlers.internal.end_conversation import (
end_conversation,
)
from app.ai.voice.agents.breeze_buddy.template.context import TemplateContext
from app.ai.voice.agents.breeze_buddy.utils.bridge_flag import (
STATUS_FAILED,
STATUS_JOINED,
clear_bridge_flag,
get_bridge_flag,
set_bridge_flag,
update_bridge_status,
)
from app.core.config.static import (
BREEZE_BUDDY_DAILY_API_KEY,
BREEZE_BUDDY_DAILY_API_URL,
)
from app.core.logger import logger
from app.core.transport.http_client import create_aiohttp_session
from app.database.accessor import get_outbound_number_by_id

# How long to wait for the bridge handler to mark itself "joined" after the
# outbound agent call is initiated. Dial + ring + answer + Daily-join can
# realistically take ~10-25s; 45s gives headroom without leaving the customer
# hanging too long.
BRIDGE_JOIN_TIMEOUT_SECONDS = 45.0
BRIDGE_POLL_INTERVAL_SECONDS = 0.25


async def _mint_bridge_daily_token(room_url: str, aiohttp_session) -> Optional[str]:
"""Mint a fresh owner token for the bridge handler to join the Daily room."""
daily_rest = DailyRESTHelper(
daily_api_key=BREEZE_BUDDY_DAILY_API_KEY,
daily_api_url=BREEZE_BUDDY_DAILY_API_URL,
aiohttp_session=aiohttp_session,
)
try:
return await daily_rest.get_token(
room_url,
expiry_time=3600,
params=DailyMeetingTokenParams(properties=DailyMeetingTokenProperties()),
)
except Exception as e:
logger.error(f"Failed to mint bridge Daily token for {room_url}: {e}")
return None


async def _wait_for_bridge_status(
call_sid: str,
timeout_seconds: float = BRIDGE_JOIN_TIMEOUT_SECONDS,
) -> Optional[str]:
"""Poll the bridge flag until status leaves ``dialing`` or timeout.

Returns the terminal status (``joined`` / ``failed``) or None on timeout.
"""
deadline = asyncio.get_event_loop().time() + timeout_seconds
while asyncio.get_event_loop().time() < deadline:
flag = await get_bridge_flag(call_sid)
if flag is None:
# Flag was cleared by an error path — treat as failure.
return STATUS_FAILED
status = flag.get("status")
if status in (STATUS_JOINED, STATUS_FAILED):
return status
await asyncio.sleep(BRIDGE_POLL_INTERVAL_SECONDS)
return None


async def connect_to_live_agent_daily(
context: TemplateContext,
args: Dict[str, Any],
transition_to: Optional[str] = None,
) -> Dict[str, Any]:
"""Daily-mode warm transfer to a human agent on PSTN.

Mirrors the failure-mode shape of ``handlers.internal.warm_transfer.connect_to_live_agent``
so the LLM sees consistent error contracts across telephony and Daily modes.
"""
customer_call_sid = context.call_sid
logger.info(f"[DailyTransfer] Initiated for customer call {customer_call_sid}")

if not context.lead or not context.lead.outbound_number_id:
logger.error(
f"[DailyTransfer] No outbound_number_id on lead for {customer_call_sid}"
)
return {
"status": "failed",
"reason": "missing_outbound_number_id",
"message": "Outbound number not configured for this call",
}

if not context.room_url:
logger.error(f"[DailyTransfer] No room_url on bot for {customer_call_sid}")
return {
"status": "failed",
"reason": "missing_room_url",
"message": "Daily room URL unavailable",
}

configurations = getattr(context.bot, "configurations", None)
transfer_number = getattr(configurations, "transfer_number", None)
if not transfer_number:
logger.warning(
f"[DailyTransfer] No transfer_number configured for {customer_call_sid}"
)
return {
"status": "failed",
"reason": "transfer_number_not_configured",
"message": (
"Transfer number is not configured for this assistant. "
"Continuing with AI."
),
}

outbound_number_record = await get_outbound_number_by_id(
context.lead.outbound_number_id
)
if not outbound_number_record:
logger.error(
f"[DailyTransfer] outbound number not found "
f"({context.lead.outbound_number_id})"
)
return {
"status": "failed",
"reason": "outbound_number_not_found",
"message": "Outbound number configuration not found",
}

provider_enum = outbound_number_record.provider
provider_name = provider_enum.value.lower()

# Lazy import to avoid circulars (telephony/utils imports the agent module).
from app.ai.voice.agents.breeze_buddy.services.telephony.utils import (
get_voice_provider,
)

aiohttp_session = context.aiohttp_session or create_aiohttp_session()

# Mint a Daily owner token for the bridge to use when joining the room.
daily_token = await _mint_bridge_daily_token(context.room_url, aiohttp_session)
if not daily_token:
Comment on lines +159 to +164
return {
"status": "failed",
"reason": "daily_token_mint_failed",
"message": "Could not mint Daily token for bridge",
}

# Derive room_name from the URL (Daily URL = https://{team}.daily.co/{room})
room_name = context.room_url.rstrip("/").rsplit("/", 1)[-1]

voice_provider = get_voice_provider(provider_enum, aiohttp_session)

# Initiate outbound call to the agent. The provider's answer webhook
# (e.g. /plivo/answer) will detect the bridge flag and route the WS to
# the bridge handler instead of the AI bot.
try:
call_result = voice_provider.make_call(
customer_mobile_number=transfer_number,
outbound_number=outbound_number_record.number,
reseller_id=context.lead.reseller_id,
template_name=None,
)
except Exception as e:
logger.error(
f"[DailyTransfer] make_call exception for {customer_call_sid}: {e}",
exc_info=True,
)
return {
"status": "failed",
"reason": "make_call_exception",
"message": "Failed to initiate agent call",
"error": str(e),
}

if not call_result or call_result.get("status") != "call_initiated":
logger.error(
f"[DailyTransfer] make_call did not initiate for {customer_call_sid}: "
f"{call_result}"
)
return {
"status": "failed",
"reason": "make_call_failed",
"message": "Provider did not accept the agent call request",
}

agent_call_sid = call_result.get("sid")
if not agent_call_sid:
return {
"status": "failed",
"reason": "missing_agent_call_sid",
"message": "Provider returned no call sid",
}

# Set bridge flag keyed by the agent leg's call_sid. The provider's answer
# webhook will look this up to route the WS to the bridge handler.
await set_bridge_flag(
call_sid=agent_call_sid,
room_url=context.room_url,
room_name=room_name,
lead_id=str(context.lead.id),
provider=provider_name,
outbound_number=outbound_number_record.number,
agent_phone=transfer_number,
daily_token=daily_token,
)

logger.info(
f"[DailyTransfer] Dialing agent {transfer_number} via {provider_name}, "
f"agent_call_sid={agent_call_sid}, waiting for bridge join…"
)

# Wait for the bridge to join the Daily room (or fail).
terminal_status = await _wait_for_bridge_status(agent_call_sid)

if terminal_status != STATUS_JOINED:
# Failure or timeout — clear flag, leave AI conversation running so
# the LLM can apologise to the customer.
reason = (
"join_timeout"
if terminal_status is None
else (
(await get_bridge_flag(agent_call_sid) or {}).get("failure_reason")
or "bridge_failed"
)
)
await update_bridge_status(agent_call_sid, STATUS_FAILED, failure_reason=reason)
logger.warning(
f"[DailyTransfer] Bridge did not join for {agent_call_sid}: {reason}"
)
return {
"status": "failed",
"reason": reason,
"message": (
"Could not connect the human agent. Continuing with AI assistant."
),
}

# Bridge is live in the Daily room — record outcome and end AI conversation.
transfer_meta = {
"status": "success",
"conference_id": agent_call_sid, # No conference; reuse field for parity
"agent_phone_number": transfer_number,
"agent_call_id": agent_call_sid,
"via": "daily_bridge",
}
if context.lead and hasattr(context.lead, "metaData"):
if context.lead.metaData is None:
context.lead.metaData = {}
context.lead.metaData["transfer"] = transfer_meta

logger.info(
f"[DailyTransfer] Bridge joined for {agent_call_sid}; ending AI conversation"
)

await end_conversation(context, None)

# Bridge handler is responsible for clearing its own flag when both legs end.
# We only clear here on the failure path.
_ = clear_bridge_flag # Referenced for explicitness; bridge handler clears.

return {
"status": "success",
"conference_id": agent_call_sid,
"agent_call_id": agent_call_sid,
"message": "Successfully transferred to human agent",
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

from typing import Any, Dict, Optional

from app.ai.voice.agents.breeze_buddy.handlers.internal.daily_warm_transfer import (
connect_to_live_agent_daily,
)
from app.ai.voice.agents.breeze_buddy.handlers.internal.end_conversation import (
end_conversation,
)
Expand All @@ -18,7 +21,13 @@
from app.core.config.static import APP_BASE_URL
from app.core.logger import logger
from app.database.accessor import get_outbound_number_by_id
from app.schemas import CallProvider
from app.schemas import CallProvider, ExecutionMode

DAILY_EXECUTION_MODES = {
ExecutionMode.DAILY,
ExecutionMode.DAILY_TEST,
ExecutionMode.DAILY_STREAM,
}


async def connect_to_live_agent(
Expand All @@ -42,6 +51,13 @@ async def connect_to_live_agent(
"""
logger.info(f"Transfer called for {context.call_sid}")

# Daily-mode calls have no telephony leg to bridge into — delegate to the
# Daily-specific handler which dials the agent on PSTN and bridges audio
# into the Daily room via an in-process WebSocket forwarder.
lead = context.lead
if lead is not None and lead.execution_mode in DAILY_EXECUTION_MODES:
return await connect_to_live_agent_daily(context, args, transition_to)

# Fetch outbound number from database
if not context.lead or not context.lead.outbound_number_id:
logger.error(
Expand Down
Loading
Loading