diff --git a/.gitignore b/.gitignore index 0cf35fa54..67db36ba1 100644 --- a/.gitignore +++ b/.gitignore @@ -54,4 +54,6 @@ logs .claude/settings.local.json .claude/memory/ CLAUDE.local.md -.claude/settings.json \ No newline at end of file +.claude/settings.json + +temp/ \ No newline at end of file diff --git a/app/ai/voice/agents/breeze_buddy/agent/__init__.py b/app/ai/voice/agents/breeze_buddy/agent/__init__.py index c08d512bb..510974294 100644 --- a/app/ai/voice/agents/breeze_buddy/agent/__init__.py +++ b/app/ai/voice/agents/breeze_buddy/agent/__init__.py @@ -135,6 +135,7 @@ def __init__( self.stream_sid: Optional[str] = None self.vad_analyzer: Optional[SileroVADAnalyzer] = None self.transport: Any = None + self.room_url: Optional[str] = None # Daily room URL (Daily mode only) self.lead: Optional[LeadCallTracker] = None self.root_span: Any = None self.flow_manager: Optional[FlowManager] = None @@ -354,6 +355,7 @@ async def _setup_daily_transport(self, runner_args: RunnerArguments) -> None: transport_params = get_transport_params(self.template, self.configurations) self.transport = await create_transport(runner_args, transport_params) + self.room_url = getattr(runner_args, "room_url", None) async def _setup_telephony_transport(self) -> bool: """Initialize transport for telephony mode. Returns False if setup fails.""" diff --git a/app/ai/voice/agents/breeze_buddy/handlers/internal/daily_warm_transfer.py b/app/ai/voice/agents/breeze_buddy/handlers/internal/daily_warm_transfer.py new file mode 100644 index 000000000..16c760b74 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/handlers/internal/daily_warm_transfer.py @@ -0,0 +1,289 @@ +""" +Daily warm transfer handler. + +Invoked when ``connect_to_live_agent`` fires inside a Daily-mode call. Dials +the configured human agent on PSTN via the lead's telephony provider, then +waits for an inbound bridge handler to confirm it has joined the Daily room. +On success, ends the AI conversation (AI bot leaves the room). 
On failure, +returns a graceful error so the LLM can inform the customer. + +The audio bridging itself runs in +``app/api/routers/breeze_buddy/telephony/bridge.py`` — when the dialed agent +answers, the provider's answer webhook is intercepted (see +``answer/handlers.py``) and the WebSocket is routed to the bridge handler. +""" + +import asyncio +from typing import Any, Dict, Optional + +from pipecat.transports.daily.utils import ( + DailyMeetingTokenParams, + DailyMeetingTokenProperties, + DailyRESTHelper, +) + +from app.ai.voice.agents.breeze_buddy.handlers.internal.end_conversation import ( + end_conversation, +) +from app.ai.voice.agents.breeze_buddy.template.context import TemplateContext +from app.ai.voice.agents.breeze_buddy.utils.bridge_flag import ( + STATUS_FAILED, + STATUS_JOINED, + clear_bridge_flag, + get_bridge_flag, + set_bridge_flag, + update_bridge_status, +) +from app.core.config.static import ( + BREEZE_BUDDY_DAILY_API_KEY, + BREEZE_BUDDY_DAILY_API_URL, +) +from app.core.logger import logger +from app.core.transport.http_client import create_aiohttp_session +from app.database.accessor import get_outbound_number_by_id + +# How long to wait for the bridge handler to mark itself "joined" after the +# outbound agent call is initiated. Dial + ring + answer + Daily-join can +# realistically take ~10-25s; 45s gives headroom without leaving the customer +# hanging too long. 
+BRIDGE_JOIN_TIMEOUT_SECONDS = 45.0
+BRIDGE_POLL_INTERVAL_SECONDS = 0.25
+
+
+def _failed(reason: str, message: str, **extra: Any) -> Dict[str, Any]:
+    """Build the failure payload returned to the LLM for a transfer attempt."""
+    return {"status": "failed", "reason": reason, "message": message, **extra}
+
+
+async def _mint_bridge_daily_token(room_url: str, aiohttp_session) -> Optional[str]:
+    """Mint a fresh owner token for the bridge handler to join the Daily room.
+
+    Args:
+        room_url: Daily room URL the token is requested for.
+        aiohttp_session: HTTP session handed to ``DailyRESTHelper``.
+
+    Returns:
+        The token, or None when the Daily REST call raises (the error is
+        logged, not propagated).
+    """
+    daily_rest = DailyRESTHelper(
+        daily_api_key=BREEZE_BUDDY_DAILY_API_KEY,
+        daily_api_url=BREEZE_BUDDY_DAILY_API_URL,
+        aiohttp_session=aiohttp_session,
+    )
+    try:
+        return await daily_rest.get_token(
+            room_url,
+            expiry_time=3600,
+            params=DailyMeetingTokenParams(properties=DailyMeetingTokenProperties()),
+        )
+    except Exception as e:
+        logger.error(f"Failed to mint bridge Daily token for {room_url}: {e}")
+        return None
+
+
+async def _wait_for_bridge_status(
+    call_sid: str,
+    timeout_seconds: float = BRIDGE_JOIN_TIMEOUT_SECONDS,
+) -> Optional[str]:
+    """Poll the bridge flag until status leaves ``dialing`` or timeout.
+
+    Returns the terminal status (``joined`` / ``failed``) or None on timeout.
+    A missing flag is reported as ``failed``.
+    """
+    # We are always inside a running loop here; get_running_loop() is the
+    # supported way to reach its monotonic clock from a coroutine.
+    loop = asyncio.get_running_loop()
+    deadline = loop.time() + timeout_seconds
+    while loop.time() < deadline:
+        flag = await get_bridge_flag(call_sid)
+        if flag is None:
+            # Flag was cleared by an error path — treat as failure.
+            return STATUS_FAILED
+        status = flag.get("status")
+        if status in (STATUS_JOINED, STATUS_FAILED):
+            return status
+        await asyncio.sleep(BRIDGE_POLL_INTERVAL_SECONDS)
+    return None
+
+
+async def connect_to_live_agent_daily(
+    context: TemplateContext,
+    args: Dict[str, Any],
+    transition_to: Optional[str] = None,
+) -> Dict[str, Any]:
+    """Daily-mode warm transfer to a human agent on PSTN.
+
+    Mirrors the failure-mode shape of ``handlers.internal.warm_transfer.connect_to_live_agent``
+    so the LLM sees consistent error contracts across telephony and Daily modes.
+
+    Validates the lead, room URL, transfer number and outbound number, then
+    dials the agent and waits for the bridge to join the Daily room.
+
+    Args:
+        context: Template context of the running Daily call.
+        args: Function-call arguments from the LLM (not read here).
+        transition_to: Optional flow transition target (not read here).
+
+    Returns:
+        A dict with ``status`` of ``"success"`` or ``"failed"``; failures also
+        carry a machine-readable ``reason`` and a ``message``.
+    """
+    customer_call_sid = context.call_sid
+    logger.info(f"[DailyTransfer] Initiated for customer call {customer_call_sid}")
+
+    if not context.lead or not context.lead.outbound_number_id:
+        logger.error(
+            f"[DailyTransfer] No outbound_number_id on lead for {customer_call_sid}"
+        )
+        return _failed(
+            "missing_outbound_number_id",
+            "Outbound number not configured for this call",
+        )
+
+    if not context.room_url:
+        logger.error(f"[DailyTransfer] No room_url on bot for {customer_call_sid}")
+        return _failed("missing_room_url", "Daily room URL unavailable")
+
+    configurations = getattr(context.bot, "configurations", None)
+    transfer_number = getattr(configurations, "transfer_number", None)
+    if not transfer_number:
+        logger.warning(
+            f"[DailyTransfer] No transfer_number configured for {customer_call_sid}"
+        )
+        return _failed(
+            "transfer_number_not_configured",
+            "Transfer number is not configured for this assistant. "
+            "Continuing with AI.",
+        )
+
+    outbound_number_record = await get_outbound_number_by_id(
+        context.lead.outbound_number_id
+    )
+    if not outbound_number_record:
+        logger.error(
+            f"[DailyTransfer] outbound number not found "
+            f"({context.lead.outbound_number_id})"
+        )
+        return _failed(
+            "outbound_number_not_found",
+            "Outbound number configuration not found",
+        )
+
+    # Reuse the call's session when it has one. A fallback session is private
+    # to this attempt and must be closed here, otherwise it leaks.
+    # NOTE(review): assumes create_aiohttp_session() returns a new session per
+    # call (not a shared singleton) — confirm before relying on the close().
+    shared_session = context.aiohttp_session
+    aiohttp_session = shared_session or create_aiohttp_session()
+    try:
+        return await _dial_agent_and_await_bridge(
+            context,
+            customer_call_sid,
+            transfer_number,
+            outbound_number_record,
+            aiohttp_session,
+        )
+    finally:
+        if not shared_session:
+            await aiohttp_session.close()
+
+
+async def _dial_agent_and_await_bridge(
+    context: TemplateContext,
+    customer_call_sid: Any,
+    transfer_number: str,
+    outbound_number_record: Any,
+    aiohttp_session: Any,
+) -> Dict[str, Any]:
+    """Dial the agent, publish the bridge flag and wait for the bridge to join."""
+    provider_enum = outbound_number_record.provider
+    provider_name = provider_enum.value.lower()
+
+    # Lazy import to avoid circulars (telephony/utils imports the agent module).
+    from app.ai.voice.agents.breeze_buddy.services.telephony.utils import (
+        get_voice_provider,
+    )
+
+    # Mint a Daily owner token for the bridge to use when joining the room.
+    daily_token = await _mint_bridge_daily_token(context.room_url, aiohttp_session)
+    if not daily_token:
+        return _failed(
+            "daily_token_mint_failed", "Could not mint Daily token for bridge"
+        )
+
+    # Derive room_name from the URL (Daily URL = https://{team}.daily.co/{room})
+    room_name = context.room_url.rstrip("/").rsplit("/", 1)[-1]
+
+    voice_provider = get_voice_provider(provider_enum, aiohttp_session)
+
+    # Initiate outbound call to the agent. The provider's answer webhook
+    # (e.g. /plivo/answer) will detect the bridge flag and route the WS to
+    # the bridge handler instead of the AI bot.
+    try:
+        call_result = voice_provider.make_call(
+            customer_mobile_number=transfer_number,
+            outbound_number=outbound_number_record.number,
+            reseller_id=context.lead.reseller_id,
+            template_name=None,
+        )
+    except Exception as e:
+        logger.error(
+            f"[DailyTransfer] make_call exception for {customer_call_sid}: {e}",
+            exc_info=True,
+        )
+        return _failed(
+            "make_call_exception", "Failed to initiate agent call", error=str(e)
+        )
+
+    if not call_result or call_result.get("status") != "call_initiated":
+        logger.error(
+            f"[DailyTransfer] make_call did not initiate for {customer_call_sid}: "
+            f"{call_result}"
+        )
+        return _failed(
+            "make_call_failed", "Provider did not accept the agent call request"
+        )
+
+    agent_call_sid = call_result.get("sid")
+    if not agent_call_sid:
+        return _failed("missing_agent_call_sid", "Provider returned no call sid")
+
+    # Set bridge flag keyed by the agent leg's call_sid. The provider's answer
+    # webhook will look this up to route the WS to the bridge handler.
+    # The key is the provider-issued sid, so the flag can only be written
+    # after make_call has returned.
+    await set_bridge_flag(
+        call_sid=agent_call_sid,
+        room_url=context.room_url,
+        room_name=room_name,
+        lead_id=str(context.lead.id),
+        provider=provider_name,
+        outbound_number=outbound_number_record.number,
+        agent_phone=transfer_number,
+        daily_token=daily_token,
+    )
+
+    logger.info(
+        f"[DailyTransfer] Dialing agent {transfer_number} via {provider_name}, "
+        f"agent_call_sid={agent_call_sid}, waiting for bridge join…"
+    )
+
+    # Wait for the bridge to join the Daily room (or fail).
+    terminal_status = await _wait_for_bridge_status(agent_call_sid)
+
+    if terminal_status != STATUS_JOINED:
+        # Failure or timeout — leave the AI conversation running so the LLM
+        # can apologise to the customer.
+        if terminal_status is None:
+            reason = "join_timeout"
+        else:
+            flag = await get_bridge_flag(agent_call_sid) or {}
+            reason = flag.get("failure_reason") or "bridge_failed"
+        await update_bridge_status(agent_call_sid, STATUS_FAILED, failure_reason=reason)
+        # Delete the flag: it holds the Daily owner token, and while it exists
+        # the answer webhook would still route a late answer into the room.
+        await clear_bridge_flag(agent_call_sid)
+        logger.warning(
+            f"[DailyTransfer] Bridge did not join for {agent_call_sid}: {reason}"
+        )
+        return _failed(
+            reason,
+            "Could not connect the human agent. Continuing with AI assistant.",
+        )
+
+    # Bridge is live in the Daily room — record outcome and end AI conversation.
+    transfer_meta = {
+        "status": "success",
+        "conference_id": agent_call_sid,  # No conference; reuse field for parity
+        "agent_phone_number": transfer_number,
+        "agent_call_id": agent_call_sid,
+        "via": "daily_bridge",
+    }
+    if context.lead and hasattr(context.lead, "metaData"):
+        if context.lead.metaData is None:
+            context.lead.metaData = {}
+        context.lead.metaData["transfer"] = transfer_meta
+
+    logger.info(
+        f"[DailyTransfer] Bridge joined for {agent_call_sid}; ending AI conversation"
+    )
+
+    await end_conversation(context, None)
+
+    # On success the flag is left in place; the bridge handler clears it when
+    # the bridge ends. It is only cleared here on the failure path above.
+    return {
+        "status": "success",
+        "conference_id": agent_call_sid,
+        "agent_call_id": agent_call_sid,
+        "message": "Successfully transferred to human agent",
+    }
diff --git a/app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py b/app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py index 09f8fe898..3dfde2d02 100644 --- a/app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py +++ b/app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py @@ -7,6 +7,9 @@ from typing import Any, Dict, Optional +from app.ai.voice.agents.breeze_buddy.handlers.internal.daily_warm_transfer import ( + connect_to_live_agent_daily, +) from app.ai.voice.agents.breeze_buddy.handlers.internal.end_conversation import ( end_conversation, ) @@ -18,7 +21,13 @@ from app.core.config.static import APP_BASE_URL from app.core.logger import logger from app.database.accessor import get_outbound_number_by_id -from app.schemas import CallProvider +from app.schemas import CallProvider, ExecutionMode + +DAILY_EXECUTION_MODES = { + ExecutionMode.DAILY, + ExecutionMode.DAILY_TEST, + ExecutionMode.DAILY_STREAM, +} async def connect_to_live_agent( @@ -42,6 +51,13 @@ async def connect_to_live_agent( """ logger.info(f"Transfer called for {context.call_sid}") + # Daily-mode calls have no telephony leg to bridge into — delegate to the + # Daily-specific handler which dials the agent on PSTN and bridges audio + # into the Daily room via an in-process WebSocket forwarder. 
+ lead = context.lead + if lead is not None and lead.execution_mode in DAILY_EXECUTION_MODES: + return await connect_to_live_agent_daily(context, args, transition_to) + # Fetch outbound number from database if not context.lead or not context.lead.outbound_number_id: logger.error( diff --git a/app/ai/voice/agents/breeze_buddy/services/daily/transfer_bridge.py b/app/ai/voice/agents/breeze_buddy/services/daily/transfer_bridge.py new file mode 100644 index 000000000..1fc9c1471 --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/services/daily/transfer_bridge.py @@ -0,0 +1,190 @@ +""" +In-process audio bridge between a telephony WebSocket leg and a Daily room. + +Used by the Daily warm-transfer flow: when the dialed agent picks up, the +telephony provider opens a WebSocket to the bridge endpoint. This module +builds a Pipecat dual-transport pipeline that forwards audio in both +directions with no STT/LLM/TTS in the path. Sample-rate conversion is +delegated to Pipecat's transport layer (input transport emits frames at its +native rate; output transport resamples to its own configured rate). + +V1 supports Plivo only. Exotel works through the same dispatcher path and +can be added by wiring its serializer + provider hangup; Twilio requires a +separate dial path (its `make_call` does not route through `/answer`). 
+"""
+
+import asyncio
+from typing import Optional
+
+from fastapi import WebSocket
+from pipecat.frames.frames import (
+    CancelFrame,
+    EndFrame,
+    Frame,
+    InputAudioRawFrame,
+    OutputAudioRawFrame,
+    StartFrame,
+)
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.serializers.plivo import PlivoFrameSerializer
+from pipecat.transports.daily.transport import DailyParams, DailyTransport
+from pipecat.transports.websocket.fastapi import (
+    FastAPIWebsocketParams,
+    FastAPIWebsocketTransport,
+)
+
+from app.ai.voice.agents.breeze_buddy.template.vad import TELEPHONY_SAMPLE_RATE
+from app.ai.voice.agents.breeze_buddy.utils.bridge_flag import (
+    STATUS_DISCONNECTED,
+    STATUS_FAILED,
+    STATUS_JOINED,
+    clear_bridge_flag,
+    update_bridge_status,
+)
+from app.core.config.static import PLIVO_AUTH_ID, PLIVO_AUTH_TOKEN
+from app.core.logger import logger
+
+DAILY_BRIDGE_SAMPLE_RATE = 24000
+BRIDGE_BOT_NAME = "transfer-bridge"
+
+
+class _AudioForwarder(FrameProcessor):
+    """Re-emits incoming InputAudioRawFrame as OutputAudioRawFrame so it can
+    be sent out on a different transport's output side.
+
+    Pipeline lifecycle frames (start / end / cancel) are passed through
+    unchanged, because the downstream output transport only sees what this
+    processor pushes. Every other frame is dropped so the bridge stays
+    pure-audio.
+    """
+
+    # Frames the downstream processor must receive to start and shut down.
+    _LIFECYCLE_FRAMES = (StartFrame, EndFrame, CancelFrame)
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        await super().process_frame(frame, direction)
+        # Audio is tested first: it is the hot path, and it must be converted
+        # rather than forwarded as-is.
+        if isinstance(frame, InputAudioRawFrame):
+            await self.push_frame(
+                OutputAudioRawFrame(
+                    audio=frame.audio,
+                    sample_rate=frame.sample_rate,
+                    num_channels=frame.num_channels,
+                ),
+                direction,
+            )
+        elif isinstance(frame, self._LIFECYCLE_FRAMES):
+            await self.push_frame(frame, direction)
+
+
+def _build_telephony_serializer(provider: str, stream_id: str, call_id: str):
+    """Pipecat serializer for the dialed agent leg.
+
+    Raises:
+        NotImplementedError: for any provider other than ``"plivo"``.
+    """
+    if provider == "plivo":
+        return PlivoFrameSerializer(
+            stream_id=stream_id,
+            call_id=call_id,
+            auth_id=PLIVO_AUTH_ID,
+            auth_token=PLIVO_AUTH_TOKEN,
+        )
+    raise NotImplementedError(
+        f"Bridge serializer not implemented for provider '{provider}'. "
+        "V1 supports Plivo only."
+    )
+
+
+async def run_bridge(
+    websocket: WebSocket,
+    provider: str,
+    stream_id: str,
+    call_id: str,
+    room_url: str,
+    daily_token: str,
+) -> None:
+    """Build and run the bridge pipeline until either leg disconnects.
+
+    Updates the Redis bridge flag as the bridge transitions through joined →
+    disconnected (or failed) so the AI bot's wait loop can react. The flag is
+    always deleted before this coroutine returns or raises.
+
+    Args:
+        websocket: the accepted FastAPI WebSocket from the telephony provider.
+        provider: lowercase provider name ("plivo").
+        stream_id: provider stream id parsed from the WS handshake.
+        call_id: provider call sid (= bridge flag key).
+        room_url: Daily room URL the AI bot is in.
+        daily_token: fresh owner token minted by the Daily handler.
+
+    Raises:
+        NotImplementedError: if ``provider`` has no bridge serializer.
+        Exception: whatever a pipeline run raises, after the flag has been
+            marked failed.
+    """
+    serializer = _build_telephony_serializer(provider, stream_id, call_id)
+
+    telephony_transport = FastAPIWebsocketTransport(
+        websocket=websocket,
+        params=FastAPIWebsocketParams(
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            audio_in_sample_rate=TELEPHONY_SAMPLE_RATE,
+            audio_out_sample_rate=TELEPHONY_SAMPLE_RATE,
+            add_wav_header=False,
+            serializer=serializer,
+        ),
+    )
+
+    daily_transport = DailyTransport(
+        room_url=room_url,
+        token=daily_token,
+        bot_name=BRIDGE_BOT_NAME,
+        params=DailyParams(
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            audio_in_sample_rate=DAILY_BRIDGE_SAMPLE_RATE,
+            audio_out_sample_rate=DAILY_BRIDGE_SAMPLE_RATE,
+        ),
+    )
+
+    # Two unidirectional pipelines so both transports can be hooked up at once.
+    # tel → daily and daily → tel each have their own forwarder so the
+    # InputAudioRawFrame produced by the input side is converted into the
+    # OutputAudioRawFrame the output side expects.
+    tel_to_daily_task = PipelineTask(
+        Pipeline(
+            [
+                telephony_transport.input(),
+                _AudioForwarder(),
+                daily_transport.output(),
+            ]
+        )
+    )
+    daily_to_tel_task = PipelineTask(
+        Pipeline(
+            [
+                daily_transport.input(),
+                _AudioForwarder(),
+                telephony_transport.output(),
+            ]
+        )
+    )
+
+    runner = PipelineRunner(handle_sigint=False)
+
+    # Mark the bridge as joined immediately *before* the pipelines start
+    # running. The AI bot is polling for this status and can begin tearing
+    # down the moment the bridge is ready to relay audio. If a run then
+    # fails, the flag is flipped to failed below.
+    await update_bridge_status(call_id, STATUS_JOINED)
+    logger.info(f"[BridgeRun] Starting bridge for call {call_id}, room {room_url}")
+
+    failure: Optional[BaseException] = None
+    # Each direction runs as its own asyncio task so we can react as soon as
+    # the first one finishes instead of waiting for both.
+    runs = {
+        asyncio.ensure_future(runner.run(tel_to_daily_task)): tel_to_daily_task,
+        asyncio.ensure_future(runner.run(daily_to_tel_task)): daily_to_tel_task,
+    }
+    try:
+        finished, _ = await asyncio.wait(runs, return_when=asyncio.FIRST_COMPLETED)
+        for run in finished:
+            run.result()  # re-raises the direction's exception, if any
+    except Exception as e:
+        failure = e
+        logger.error(
+            f"[BridgeRun] Bridge runtime exception for call {call_id}: {e}",
+            exc_info=True,
+        )
+        await update_bridge_status(call_id, STATUS_FAILED, failure_reason=str(e))
+        raise
+    finally:
+        try:
+            # One direction ending means the bridge is over: cancel the
+            # survivor so the bridge bot leaves the room and this handler
+            # returns.
+            for run, task in runs.items():
+                if not run.done():
+                    await task.cancel()
+            await asyncio.gather(*runs, return_exceptions=True)
+        except Exception as teardown_err:
+            logger.warning(
+                f"[BridgeRun] Teardown error for call {call_id}: {teardown_err}"
+            )
+        finally:
+            # Keep a recorded failure; otherwise mark as disconnected so any
+            # waiters know it's over, then clear the flag.
+            if failure is None:
+                await update_bridge_status(call_id, STATUS_DISCONNECTED)
+            await clear_bridge_flag(call_id)
+            logger.info(f"[BridgeRun] Bridge ended for call {call_id}")
+
+
+__all__ = ["run_bridge"]
diff --git a/app/ai/voice/agents/breeze_buddy/template/context.py b/app/ai/voice/agents/breeze_buddy/template/context.py index b8bd1fad6..df95e7c58 100644 --- a/app/ai/voice/agents/breeze_buddy/template/context.py +++ b/app/ai/voice/agents/breeze_buddy/template/context.py @@ -138,6 +138,11 @@ def telephony_service(self): """Get telephony service instance (TwilioProvider)""" return getattr(self.bot, "telephony_service", None) + @property + def room_url(self): + """Daily room URL (only set in Daily mode).""" + return getattr(self.bot, "room_url", None) + @property def end_conversation_callbacks(self): """Get end conversation callbacks""" diff --git a/app/ai/voice/agents/breeze_buddy/utils/bridge_flag.py b/app/ai/voice/agents/breeze_buddy/utils/bridge_flag.py new file mode 100644 index 000000000..cf92aa4ac --- /dev/null +++ b/app/ai/voice/agents/breeze_buddy/utils/bridge_flag.py @@ -0,0 +1,113 @@ +""" +Redis state for the Daily warm-transfer telephony bridge. + +The AI bot (running in a Daily room) writes a bridge flag keyed by the dialed +agent leg's call_sid. 
When the provider's answer webhook fires for that leg,
+the dispatcher looks up the flag and routes the WebSocket to the bridge
+handler instead of the standard AI bot path. The handler updates the flag
+status as it joins / fails so the AI bot can decide whether to hand off or
+inform the LLM that the transfer failed.
+"""
+
+import json
+import time
+from typing import Any, Dict, Optional
+
+from app.core.logger import logger
+from app.services.redis.client import get_redis_service
+
+BRIDGE_FLAG_PREFIX = "bridge:"
+BRIDGE_FLAG_TTL_SECONDS = 7200  # 2h, matches transfer flag
+
+STATUS_DIALING = "dialing"
+STATUS_JOINED = "joined"
+STATUS_FAILED = "failed"
+STATUS_DISCONNECTED = "disconnected"
+
+
+def _key(call_sid: str) -> str:
+    """Return the Redis key under which the flag for ``call_sid`` is stored."""
+    return f"{BRIDGE_FLAG_PREFIX}{call_sid}"
+
+
+async def set_bridge_flag(
+    call_sid: str,
+    room_url: str,
+    room_name: str,
+    lead_id: str,
+    provider: str,
+    outbound_number: str,
+    agent_phone: str,
+    daily_token: str,
+    ttl_seconds: int = BRIDGE_FLAG_TTL_SECONDS,
+) -> bool:
+    """Write the initial bridge flag in `dialing` status.
+
+    The payload is stored as JSON and expires after ``ttl_seconds``.
+
+    Returns:
+        The result of the Redis write; a falsy result is logged as an error.
+    """
+    payload = {
+        "room_url": room_url,
+        "room_name": room_name,
+        "lead_id": lead_id,
+        "provider": provider,
+        "outbound_number": outbound_number,
+        "agent_phone": agent_phone,
+        "daily_token": daily_token,
+        "status": STATUS_DIALING,
+        "failure_reason": None,
+        "created_at": time.time(),
+    }
+    redis = await get_redis_service()
+    ok = await redis.setex(_key(call_sid), json.dumps(payload), ttl_seconds)
+    if ok:
+        logger.info(f"[BRIDGE REDIS] Set flag for call {call_sid} (room={room_name})")
+    else:
+        logger.error(f"[BRIDGE REDIS] Failed to set flag for call {call_sid}")
+    return ok
+
+
+async def get_bridge_flag(call_sid: str) -> Optional[Dict[str, Any]]:
+    """Return bridge flag payload or None.
+
+    None is returned both when no value is stored and when the stored value
+    is not valid JSON (the latter is logged as an error).
+    """
+    redis = await get_redis_service()
+    raw = await redis.get(_key(call_sid))
+    if not raw:
+        return None
+    try:
+        return json.loads(raw)
+    except json.JSONDecodeError as e:
+        logger.error(f"[BRIDGE REDIS] Invalid JSON for call {call_sid}: {e}")
+        return None
+
+
+async def update_bridge_status(
+    call_sid: str,
+    status: str,
+    failure_reason: Optional[str] = None,
+) -> bool:
+    """Update status (and optional failure_reason) on an existing flag.
+
+    Read-modify-write — race tolerable because only the bridge handler and
+    failure paths write status, and they don't run concurrently for the same
+    call_sid. Each successful update also restarts the flag's expiry at
+    ``BRIDGE_FLAG_TTL_SECONDS``.
+
+    Returns:
+        False when no flag exists for ``call_sid``; otherwise the result of
+        the Redis write.
+    """
+    flag = await get_bridge_flag(call_sid)
+    if not flag:
+        logger.warning(
+            f"[BRIDGE REDIS] update_bridge_status: no flag for call {call_sid}"
+        )
+        return False
+
+    flag["status"] = status
+    if failure_reason is not None:
+        flag["failure_reason"] = failure_reason
+
+    redis = await get_redis_service()
+    ok = await redis.setex(_key(call_sid), json.dumps(flag), BRIDGE_FLAG_TTL_SECONDS)
+    if ok:
+        logger.info(
+            f"[BRIDGE REDIS] Updated call {call_sid} status={status} "
+            f"reason={failure_reason}"
+        )
+    else:
+        # Log the failed write, as set_bridge_flag does, so a lost status
+        # change is visible.
+        logger.error(
+            f"[BRIDGE REDIS] Failed to update call {call_sid} to status={status}"
+        )
+    return ok
+
+
+async def clear_bridge_flag(call_sid: str) -> bool:
+    """Delete the bridge flag and return the result of the Redis delete."""
+    redis = await get_redis_service()
+    return await redis.delete(_key(call_sid))
diff --git a/app/api/routers/breeze_buddy/telephony/__init__.py b/app/api/routers/breeze_buddy/telephony/__init__.py index 446dbbf0c..622936a47 100644 --- a/app/api/routers/breeze_buddy/telephony/__init__.py +++ b/app/api/routers/breeze_buddy/telephony/__init__.py @@ -11,6 +11,7 @@ from fastapi import APIRouter from .answer import router as answer_router +from .bridge import router as bridge_router from .callbacks import router as callbacks_router router = APIRouter() @@ -20,3 +21,6 @@ # Include answer router (unified call answering endpoint) router.include_router(answer_router, prefix="", tags=["telephony-answer"]) + +# Include bridge router (Daily warm-transfer agent leg WebSocket) +router.include_router(bridge_router, prefix="", tags=["telephony-bridge"]) diff --git a/app/api/routers/breeze_buddy/telephony/answer/handlers.py b/app/api/routers/breeze_buddy/telephony/answer/handlers.py 
index b575afa4c..1237f2355 100644 --- a/app/api/routers/breeze_buddy/telephony/answer/handlers.py +++ b/app/api/routers/breeze_buddy/telephony/answer/handlers.py @@ -53,6 +53,7 @@ start_call_recording, ) from app.ai.voice.agents.breeze_buddy.template.types import TTSConfig +from app.ai.voice.agents.breeze_buddy.utils.bridge_flag import get_bridge_flag from app.core.config.dynamic import ( BB_NOISE_CANCELLATION_ENABLED, BB_NOISE_CANCELLATION_LEVEL, @@ -532,6 +533,21 @@ async def handle_provider_answer(request: Request, provider: str) -> Response: logger.error(f"[{tag}] Missing call ID") return _error_response(provider, "Missing call identifier", 400) + # Daily warm-transfer bridge: when the AI bot dials a human agent, it + # writes a bridge flag keyed by the agent leg's call_sid. Detect that + # here and route the WS to the bridge endpoint instead of the AI bot. + # Skips pod allocation, IVR, inbound policy — none apply to the bridge. + bridge_flag = await get_bridge_flag(call_id) + if bridge_flag: + ws_base = APP_BASE_URL.replace("https://", "wss://").replace("http://", "ws://") + bridge_ws_url = f"{ws_base}/agent/voice/breeze-buddy/{provider}/bridge/v2" + logger.info( + f"[{tag}] Bridge flag found for {call_id}; routing WS to {bridge_ws_url}" + ) + if provider == "exotel": + return _build_json_response(bridge_ws_url) + return await _build_xml_response(bridge_ws_url) + # Plivo-specific: start recording if provider == "plivo": try: diff --git a/app/api/routers/breeze_buddy/telephony/bridge.py b/app/api/routers/breeze_buddy/telephony/bridge.py new file mode 100644 index 000000000..6776115b0 --- /dev/null +++ b/app/api/routers/breeze_buddy/telephony/bridge.py @@ -0,0 +1,111 @@ +""" +WebSocket endpoint for the Daily warm-transfer bridge. + +When a Daily-mode lead transfers to a human agent, the AI bot dials the +agent through the lead's telephony provider and writes a Redis bridge flag +keyed by the agent leg's call_sid. 
The provider's answer webhook (see
+``answer/handlers.py``) detects that flag and points the provider's
+``<Stream>`` here instead of the standard AI bot WebSocket.
+
+This endpoint accepts the WebSocket, identifies the call, looks up the
+flag, and runs an in-process audio bridge between the telephony WS and the
+Daily room. No AI bot is started for this leg.
+"""
+
+from fastapi import APIRouter, WebSocket
+from pipecat.runner.utils import parse_telephony_websocket
+from starlette.websockets import WebSocketDisconnect
+
+from app.ai.voice.agents.breeze_buddy.services.daily.transfer_bridge import (
+    run_bridge,
+)
+from app.ai.voice.agents.breeze_buddy.utils.bridge_flag import (
+    STATUS_FAILED,
+    get_bridge_flag,
+    update_bridge_status,
+)
+from app.core.logger import logger
+
+router = APIRouter()
+
+
+@router.websocket("/{service_provider}/bridge/v2")
+async def telephony_bridge_handler(service_provider: str, websocket: WebSocket):
+    """Run the Daily warm-transfer audio bridge for the dialed agent leg.
+
+    Accepts the socket, reads the call and stream ids from the provider
+    handshake, validates the Redis bridge flag for that call, and hands the
+    socket to ``run_bridge``. Every rejection closes the socket with a 4xxx
+    code; rejections that happen after the flag was found also mark it failed.
+
+    Args:
+        service_provider: provider path segment; compared case-insensitively
+            with the provider recorded in the flag.
+        websocket: the provider's incoming WebSocket connection.
+    """
+    provider_name = service_provider.lower()
+    logger.info(f"[Bridge WS] Connection received for provider {provider_name}")
+
+    await websocket.accept()
+
+    try:
+        _transport_type, call_data = await parse_telephony_websocket(websocket)
+    except Exception as e:
+        logger.error(f"[Bridge WS] parse_telephony_websocket failed: {e}")
+        await websocket.close(code=4000, reason="Bridge: cannot parse stream")
+        return
+
+    call_id = call_data.get("call_id")
+    stream_id = call_data.get("stream_id")
+    if not call_id or not stream_id:
+        logger.error(f"[Bridge WS] Missing call_id/stream_id in handshake: {call_data}")
+        await websocket.close(code=4000, reason="Bridge: missing call identifiers")
+        return
+
+    flag = await get_bridge_flag(call_id)
+    if not flag:
+        logger.error(f"[Bridge WS] No bridge flag for call {call_id}; rejecting")
+        await websocket.close(code=4004, reason="Bridge: no flag for call")
+        return
+
+    expected_provider = flag.get("provider")
+    if expected_provider and expected_provider != provider_name:
+        logger.error(
+            f"[Bridge WS] Provider mismatch for call {call_id}: "
+            f"flag={expected_provider} ws={provider_name}"
+        )
+        await update_bridge_status(
+            call_id, STATUS_FAILED, failure_reason="provider_mismatch"
+        )
+        await websocket.close(code=4003, reason="Bridge: provider mismatch")
+        return
+
+    room_url = flag.get("room_url")
+    daily_token = flag.get("daily_token")
+    if not room_url or not daily_token:
+        logger.error(f"[Bridge WS] Flag for {call_id} missing room_url/daily_token")
+        await update_bridge_status(
+            call_id, STATUS_FAILED, failure_reason="missing_daily_credentials"
+        )
+        await websocket.close(code=4000, reason="Bridge: missing Daily credentials")
+        return
+
+    logger.info(
+        f"[Bridge WS] Starting bridge for call {call_id} "
+        f"(stream={stream_id}, room={room_url})"
+    )
+
+    try:
+        await run_bridge(
+            websocket=websocket,
+            provider=provider_name,
+            stream_id=stream_id,
+            call_id=call_id,
+            room_url=room_url,
+            daily_token=daily_token,
+        )
+    except WebSocketDisconnect:
+        logger.info(f"[Bridge WS] Client disconnected for call {call_id}")
+    except Exception as e:
+        logger.error(
+            f"[Bridge WS] Bridge run failed for call {call_id}: {e}",
+            exc_info=True,
+        )
+        # The bridge can raise before it has reported any status. Mark the
+        # flag failed so the AI bot's wait loop stops instead of running to
+        # its timeout. update_bridge_status returns False without writing
+        # when the flag no longer exists.
+        try:
+            await update_bridge_status(
+                call_id, STATUS_FAILED, failure_reason="bridge_run_failed"
+            )
+        except Exception as flag_err:
+            logger.warning(
+                f"[Bridge WS] Could not mark flag failed for {call_id}: {flag_err}"
+            )
+        try:
+            if websocket.client_state.name != "DISCONNECTED":
+                await websocket.close(code=1011, reason="Bridge: internal error")
+        except Exception as close_err:
+            logger.warning(
+                f"[Bridge WS] Could not close socket for {call_id}: {close_err}"
+            )
+    finally:
+        logger.info(f"[Bridge WS] Connection closed for call {call_id}")
diff --git a/docs/breeze_buddy/daily_warm_transfer.md b/docs/breeze_buddy/daily_warm_transfer.md new file mode 100644 index 000000000..e8c1804a8 --- /dev/null +++ b/docs/breeze_buddy/daily_warm_transfer.md @@ -0,0 +1,135 @@ +# Daily Warm Transfer (Telephony Bridge) + +Design doc for warm-transferring a Daily (web/mobile) call to a human agent reachable only by phone (PSTN). + +## Status + +Design — not yet implemented. 
+ +## Problem + +Today, `connect_to_live_agent` ([handlers/internal/warm_transfer.py](../../app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py)) only works when the original customer call is itself a telephony call (Twilio / Plivo / Exotel). It uses each provider's conference / native-transfer API to bridge the customer's existing phone leg with a freshly-dialed agent leg. + +For Daily-mode leads (`ExecutionMode.DAILY*`), the customer is in a WebRTC room — there is no phone leg to bridge into. The function silently has no path for them. + +## Goal + +When the LLM in a Daily call invokes `connect_to_live_agent`: + +1. Dial the configured human agent on PSTN via the lead's existing telephony provider. +2. Bridge that phone leg into the Daily room so customer (in Daily) and agent (on phone) talk live. +3. AI bot leaves the room as soon as the bridge is live. No whisper / coaching. + +## Why a bridge bot, not Daily PSTN dial-out + +Daily.co supports native PSTN dial-out (`dialOut` API), which would add the agent's phone as a Daily participant directly with no bridge. We are **not** taking that path because: + +- Reuses existing Twilio / Plivo / Exotel outbound infrastructure, billing, outbound-number selection per reseller. +- Keeps `lead.metaData.transfer` payload shape identical to the telephony warm-transfer flow → analytics queries don't fork. +- No dependency on Daily PSTN credits / SIP trunk provisioning. + +The trade-off: we own the audio resampling and one extra Pipecat task per transfer. + +## Why in-process, not subprocess + +Originally planned as a subprocess (mirroring the AI bot's process model). Pivoted to **in-process**: the provider's WebSocket lands in the FastAPI server, so handling it in a new WS endpoint is direct. A subprocess would require shuttling audio frames over IPC (Redis pub-sub or unix socket) — adds latency, complexity, and a new failure surface for marginal isolation benefit. 
Crash isolation is per-handler: an exception in one bridge only kills that one transfer. + +## Architecture + +The bridge runs inside the FastAPI server as a WebSocket handler. When the dialed agent picks up, the telephony provider opens a WS to `/agent/voice/breeze-buddy/{provider}/bridge/ws?call_sid=…`. That handler reads the Redis bridge flag, joins the existing Daily room as a Pipecat client, and runs a forwarding pipeline: telephony WS frames → decode/resample → Daily; Daily frames → resample/encode → telephony WS. Two stateful frame processors hold the `audioop.ratecv` state per direction. + +## Sequence + +| Step | Actor | Action | +|------|-------|--------| +| 1 | LLM (in AI Bot) | Calls `connect_to_live_agent` function | +| 2 | AI Bot | Resolves `outbound_number` from lead, picks provider | +| 3 | AI Bot | `set_bridge_flag(agent_call_sid_placeholder, room_url, room_name, lead_id, provider)` in Redis | +| 4 | AI Bot | `provider.make_call(agent_phone, outbound_number)` → returns `agent_call_sid` | +| 5 | AI Bot | Updates Redis flag key from placeholder to real `agent_call_sid` | +| 6 | AI Bot | Waits (with 45s timeout) for `bridge:{agent_call_sid}.status == "joined"` | +| 7 | Provider | Phone rings; agent picks up; provider POSTs answer webhook | +| 8 | Webhook dispatcher | Reads `bridge:{call_sid}` flag → returns a stream response (`<Stream>` XML for Twilio/Plivo) whose URL points at the bridge WS endpoint | +| 9 | Bridge WS handler | Provider opens WS; handler joins Daily room R with bot token, builds forwarding pipeline | +| 10 | Bridge WS handler | Sets `bridge:{call_sid}.status = "joined"` | +| 11 | AI Bot | Sees joined status → `end_conversation`, leaves Daily room | +| 12 | Bridge WS handler | Pumps audio in both directions until either leg ends | +| 13 | Bridge WS handler | On either leg ending: hangup the other; `update_lead_call_completion_details` + write `lead.metaData.transfer` | + +## Audio Bridging + +| Direction | Source | Step 1 | Step 2 | Sink | +|-----------|--------|----------|--------|------| +| Customer → Agent | Daily 24kHz 
PCM | `audioop.ratecv 24000 → 8000` (stateful) | `lin2ulaw` | Telephony WS μ-law | +| Agent → Customer | Telephony WS μ-law | `ulaw2lin` | `audioop.ratecv 8000 → 24000` (stateful) | Daily 24kHz PCM | + +**State-keeping is critical**: `audioop.ratecv` returns a state tuple that must be passed back into the next call to avoid resampling clicks/discontinuities. Two `FrameProcessor` subclasses, one per direction, each hold their own `state` attribute. + +Primitives already exist in [utils/common.py:70-112](../../app/ai/voice/agents/breeze_buddy/utils/common.py#L70-L112) (used today for non-realtime greeting audio); we wrap them in stateful realtime processors. + +## Redis Bridge Flag + +Key: `bridge:{call_sid}` (the *agent leg's* call_sid, set after `make_call` returns) + +```json +{ + "room_url": "https://breezebuddy.daily.co/abc123", + "room_name": "abc123", + "daily_token": "<Daily meeting token the bridge uses to join the room>", + "lead_id": "lead_xyz", + "provider": "twilio", + "outbound_number": "+1...", + "agent_phone": "+91...", + "status": "dialing | joined | failed", + "failure_reason": "no_answer | busy | timeout | crash | provider_mismatch | missing_daily_credentials | null", + "created_at": 1736300000.0 +} +``` + +TTL: 2h (matches existing `transfer:{call_sid}` flag). + +## Provider differences + +The dispatcher-detect approach (preferred — see plan) means **no provider API changes**: each provider's existing inbound webhook entrypoint already handles `<Stream>` for AI calls; we add a flag check at the top to switch to the bridge stream URL when present. + +| Provider | Webhook entry | Notes | +|----------|---------------|-------| +| Twilio | `POST /v2/twilio/callback/{template}` | Returns TwiML `<Connect><Stream url="…">` | +| Plivo | `POST /v2/plivo/answer/{template}` | Returns Plivo XML `<Stream>` | +| Exotel | Applet-driven webhook | Stream URL returned via applet response | + +The bridge WebSocket endpoint must speak each provider's wire format (Twilio's JSON/base64 media messages vs Plivo's vs Exotel's) — Pipecat's telephony transports already abstract this. 
The bridge handler picks the transport based on the `{provider}` path parameter. + +## Failure Modes + +| Scenario | Detection | Recovery | +|----------|-----------|----------| +| Agent doesn't answer (no_answer / busy / failed) | Provider status callback fires before `bridge.status = joined` | Set `bridge.status = failed` and record `failure_reason`. AI bot poll/sub sees this → tells LLM transfer failed → conversation continues | +| Answer timeout (45s) | AI bot wait expires | Provider hangup API on `agent_call_sid`; flag → failed; LLM informed | +| Customer leaves Daily mid-dial | Daily participant-left event in bridge handler | AI bot cancels outbound via provider hangup; bridge exits cleanly | +| Bridge handler exception pre-join | `bridge.status` never flips to joined; AI bot timeout | Same as answer timeout | +| Telephony WS drops mid-call | WS disconnect event in handler | Tear down Daily side; do **not** reconnect (PSTN won't reconnect cleanly); write transfer status `disconnected` | +| Agent hangs up normally | Telephony provider status callback `completed` | Bridge handler leaves Daily; lead marked complete | + +## What gets shipped + +See plan (author-local path, not checked into the repo): `/Users/amreet.khuntia/.claude/plans/no-i-want-a-parsed-walrus.md`. + +New code: + +- `app/ai/voice/agents/breeze_buddy/utils/bridge_flag.py` — Redis helpers for `bridge:{call_sid}`. +- `app/ai/voice/agents/breeze_buddy/handlers/internal/daily_warm_transfer.py` — handler invoked by `connect_to_live_agent` for Daily mode. +- `app/ai/voice/agents/breeze_buddy/services/daily/transfer_bridge.py` — async pipeline builder + run loop. Standalone coroutine called from the WS handler. +- `app/api/routers/breeze_buddy/telephony/bridge.py` — new WebSocket endpoint `/{provider}/bridge/ws` that the dialed agent leg connects to. 
+ +Modified: + +- [app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py](../../app/ai/voice/agents/breeze_buddy/handlers/internal/warm_transfer.py) — branch on `execution_mode` at the top of `connect_to_live_agent`. +- [app/api/routers/breeze_buddy/telephony/answer/handlers.py](../../app/api/routers/breeze_buddy/telephony/answer/handlers.py) — bridge-flag check at the top of `handle_provider_answer`; if set, return WS URL pointing at the bridge endpoint. +- [app/api/routers/breeze_buddy/telephony/__init__.py](../../app/api/routers/breeze_buddy/telephony/__init__.py) — register bridge router. + +## Out of Scope + +- Whisper / coaching audio to agent before bridging in customer. +- AI bot staying in room as silent transcription observer. +- Per-leg recordings (Daily room cloud recording captures the full bridged audio). +- Native Daily PSTN dial-out — alternative architecture, deferred.