Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions app/ai/voice/agents/breeze_buddy/utils/pipecat_log_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Loguru sink filter for known-harmless Pipecat transport log messages.

Pipecat's FastAPIWebsocketClient.disconnect() calls ws.close() during pipeline
teardown (triggered by EndFrame). When the remote client has already closed the
connection first, Starlette raises:

RuntimeError: Cannot call "send" once a close message has been sent.

Pipecat catches this and logs it at ERROR level from its own loguru logger.
This is a transport-layer race that is unreachable from application code —
the EndFrame teardown path (our shutdown) is identical in both the happy path
and the client-disconnect path. The error has zero impact on call flow, data
integrity, or cleanup.

The application's core logger (app/core/logger/__init__.py) integrates a check
in filter_spam_logs() that drops this specific record from all sinks.

This module provides the install sentinel and the string constants used by that
integration.
Comment on lines +15 to +19

Reference: pipecat v1.1.0 src/pipecat/transports/websocket/fastapi.py
async def disconnect(self):
...
try:
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception while closing the websocket: {e}")

Usage:
Call `install_pipecat_log_filter()` once at application startup,
before the first pipeline runs:

from app.ai.voice.agents.breeze_buddy.utils.pipecat_log_filter import (
install_pipecat_log_filter,
)
install_pipecat_log_filter()
"""

from loguru import logger

# The exact substrings Pipecat logs in FastAPIWebsocketClient.disconnect()
# Source: pipecat v1.1.0 src/pipecat/transports/websocket/fastapi.py
PIPECAT_WS_CLOSE_MARKER = "exception while closing the websocket"
STARLETTE_WS_CLOSE_ERROR = 'Cannot call "send" once a close message has been sent'

# Module-level sentinel for idempotency
_FILTER_INSTALLED = False


def install_pipecat_log_filter() -> None:
"""Activate the Pipecat WS close error suppression filter.

Safe to call multiple times — idempotent via module-level sentinel.

The filter is integrated into the application's filter_spam_logs()
in app/core/logger/__init__.py and suppresses the record from all
configured log sinks.

Call once at application startup before any pipeline runs.
"""
global _FILTER_INSTALLED
if _FILTER_INSTALLED:
return
_FILTER_INSTALLED = True
logger.debug(
"[pipecat_log_filter] Pipecat WS close error suppression filter active"
)
13 changes: 13 additions & 0 deletions app/core/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ def filter_spam_logs(record):

logger_name = record["name"]
message = record["message"]

# Suppress known-harmless Pipecat WS close error (transport-layer race on disconnect).
# Pipecat's FastAPIWebsocketClient.disconnect() calls ws.close() after EndFrame; when
# the remote client has already closed first, Starlette raises RuntimeError which Pipecat
# catches and logs at ERROR. Zero impact on call flow or data integrity.
# See: app/ai/voice/agents/breeze_buddy/utils/pipecat_log_filter.py
if (
logger_name.startswith("pipecat")
and "exception while closing the websocket" in message
and 'Cannot call "send" once a close message has been sent' in message
):
Comment on lines +119 to +123
Comment on lines +119 to +123
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Import and use constants from pipecat_log_filter.py to avoid string duplication.

The error message substrings are hardcoded here, but pipecat_log_filter.py defines PIPECAT_WS_CLOSE_MARKER and STARLETTE_WS_CLOSE_ERROR constants with identical values. Importing these constants ensures a single source of truth and prevents drift if the error messages need updating.

♻️ Refactor to use shared constants

At the top of the file, add the import:

 from app.core.config.static import ENVIRONMENT, PROD_LOG_LEVEL
 from app.core.logger.context import get_log_context
+from app.ai.voice.agents.breeze_buddy.utils.pipecat_log_filter import (
+    PIPECAT_WS_CLOSE_MARKER,
+    STARLETTE_WS_CLOSE_ERROR,
+)

Then update the filter condition:

         if (
             logger_name.startswith("pipecat")
-            and "exception while closing the websocket" in message
-            and 'Cannot call "send" once a close message has been sent' in message
+            and PIPECAT_WS_CLOSE_MARKER in message
+            and STARLETTE_WS_CLOSE_ERROR in message
         ):
             return False
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/core/logger/__init__.py` around lines 119 - 123, Replace the hardcoded
substrings in the websocket-close filter with the shared constants from
pipecat_log_filter.py: add an import for PIPECAT_WS_CLOSE_MARKER and
STARLETTE_WS_CLOSE_ERROR at the top of app/core/logger/__init__.py, then update
the conditional inside the if block to use PIPECAT_WS_CLOSE_MARKER and
STARLETTE_WS_CLOSE_ERROR instead of the literal strings (keeping the existing
logger_name.startswith("pipecat") check); this centralizes the message
definitions and prevents duplication/drift.

return False

return not (
logger_name.startswith("websockets")
or logger_name.startswith("daily_core")
Expand Down
7 changes: 7 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from app.ai.voice.agents.breeze_buddy.services.agent_router.client import (
close_smart_router_client,
)
from app.ai.voice.agents.breeze_buddy.utils.pipecat_log_filter import (
install_pipecat_log_filter,
)

# Database imports
from app.ai.voice.llm._pools import close_all_pools as close_llm_http_pools
Expand Down Expand Up @@ -98,6 +101,10 @@ async def lifespan(_app: FastAPI):
"""FastAPI lifespan manager that handles startup and shutdown tasks."""
logger.info("Application startup...")

# Suppress known-harmless Pipecat WS close error on client-initiated disconnect.
# Must be called before any pipeline runs.
install_pipecat_log_filter()
Comment on lines +104 to +106

# Initialize database and create tables if needed
try:
await init_db_pool()
Expand Down
Loading