From 0c702d441c72fe0ae5c27327a332fd3554f67e11 Mon Sep 17 00:00:00 2001 From: Anirudh Reddy Malgari Date: Thu, 23 Apr 2026 17:34:22 -0700 Subject: [PATCH 1/3] loads mcp app for get_context --- backend/src/analytics_agent/agent/graph.py | 15 +- .../src/analytics_agent/agent/mcp_app_tool.py | 37 ++ .../src/analytics_agent/agent/streaming.py | 61 +- backend/src/analytics_agent/api/__init__.py | 3 +- backend/src/analytics_agent/api/chat.py | 68 ++- backend/src/analytics_agent/api/mcp_apps.py | 279 +++++++++ .../analytics_agent/context/mcp_platform.py | 8 + .../analytics_agent/context/mcp_resources.py | 240 ++++++++ backend/src/analytics_agent/context/mcp_ui.py | 220 +++++++ backend/src/analytics_agent/db/repository.py | 3 + .../src/analytics_agent/engines/mcp/engine.py | 9 + backend/src/analytics_agent/prompts/system.py | 7 +- .../analytics_agent/prompts/system_prompt.md | 4 +- frontend/src/api/mcpApp.ts | 62 ++ frontend/src/api/stream.ts | 6 +- frontend/src/components/Chat/ChatView.tsx | 1 + frontend/src/components/Chat/MessageList.tsx | 85 +-- .../Chat/messages/AgentWorkBlock.tsx | 11 + .../Chat/messages/MCPAppMessage.tsx | 123 ++++ .../Chat/messages/SelectionChip.tsx | 39 ++ frontend/src/lib/buildUiMessages.ts | 1 + frontend/src/lib/groupMessages.ts | 2 +- frontend/src/lib/mcpApps/MCPAppFrame.tsx | 137 +++++ frontend/src/lib/mcpApps/protocol.ts | 184 ++++++ frontend/src/lib/mcpApps/useMcpAppBridge.ts | 542 ++++++++++++++++++ frontend/src/types/index.ts | 18 + .../unit/test_business_context_suppression.py | 131 +++++ 27 files changed, 2245 insertions(+), 51 deletions(-) create mode 100644 backend/src/analytics_agent/agent/mcp_app_tool.py create mode 100644 backend/src/analytics_agent/api/mcp_apps.py create mode 100644 backend/src/analytics_agent/context/mcp_resources.py create mode 100644 backend/src/analytics_agent/context/mcp_ui.py create mode 100644 frontend/src/api/mcpApp.ts create mode 100644 frontend/src/components/Chat/messages/MCPAppMessage.tsx create mode 100644 frontend/src/components/Chat/messages/SelectionChip.tsx create mode 100644 frontend/src/lib/mcpApps/MCPAppFrame.tsx create mode 100644 frontend/src/lib/mcpApps/protocol.ts create mode 100644 frontend/src/lib/mcpApps/useMcpAppBridge.ts create mode 100644 tests/unit/test_business_context_suppression.py diff --git a/backend/src/analytics_agent/agent/graph.py b/backend/src/analytics_agent/agent/graph.py index 3f48ca1..55e40a1 100644 --- a/backend/src/analytics_agent/agent/graph.py +++ b/backend/src/analytics_agent/agent/graph.py @@ -43,6 +43,7 @@ def build_graph( enabled_mutations: set[str] | None = None, context_tools: list | None = None, # pre-built from DB context platforms at request time engine_tools: list | None = None, # pre-built for MCP data sources (bypasses QueryEngine) + suppress_business_context_skill: bool = False, ): from analytics_agent.agent.chart_generator import chart_node from analytics_agent.engines.factory import get_registry @@ -64,7 +65,10 @@ def build_graph( # Always-on skills (context search etc.) + opt-in write-back skills from analytics_agent.skills.loader import build_always_on_skill_tools, build_skill_tools - skill_tools = build_always_on_skill_tools() + build_skill_tools(enabled_mutations or set()) + always_on_skills = build_always_on_skill_tools() + if suppress_business_context_skill: + always_on_skills = [t for t in always_on_skills if t.name != "search_business_context"] + skill_tools = always_on_skills + build_skill_tools(enabled_mutations or set()) # Engine tools — MCP data sources supply pre-built tools; native engines use QueryEngine if engine_tools is not None: @@ -87,12 +91,17 @@ def build_graph( ) system_prompt = system_prompt_override.format(engine_name=engine_name) - system_prompt += get_search_business_context_section() + if not suppress_business_context_skill: + system_prompt += get_search_business_context_section() system_prompt += get_improve_context_prompt_section() if enabled_mutations: system_prompt += get_skill_system_prompt_section(enabled_mutations) else: - system_prompt = build_system_prompt(engine_name, enabled_skills=enabled_mutations) + system_prompt = build_system_prompt( + engine_name, + enabled_skills=enabled_mutations, + include_business_context=not suppress_business_context_skill, + ) # Enable per-tool error handling so validation errors (e.g. hallucinated # arguments like filter= on get_entities) are returned as tool messages diff --git a/backend/src/analytics_agent/agent/mcp_app_tool.py b/backend/src/analytics_agent/agent/mcp_app_tool.py new file mode 100644 index 0000000..d18833a --- /dev/null +++ b/backend/src/analytics_agent/agent/mcp_app_tool.py @@ -0,0 +1,37 @@ +"""Side-channel for MCP App tool results. + +Mirrors the _pending_charts pattern in chart_tool.py: +- The wrapped tool returns a short marker string (MCP_APP_READY:). +- The actual structured payload lives here, keyed by app_id. +- streaming.py pops from this dict when it sees the marker in on_tool_end. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class PendingApp: + app_id: str + connection_key: str + server_name: str + tool_name: str + tool_input: dict + # Structured CallToolResult content (list of content blocks), preserved so + # the frontend can forward it verbatim as `ui/notifications/tool-result` + # params per the MCP Apps spec. + tool_result: Any + resource_uri: str + csp: str | None = None + permissions: list[str] = field(default_factory=list) + # Tool names scoped to this app's connection that the iframe is allowed to + # call via the Phase 2 tool-proxy endpoint. Populated at wrap time from the + # full tool list for the originating connection_key. Persisted in the MCP_APP + # SSE payload so the endpoint can rehydrate it from the DB row. + allowed_tools: list[str] = field(default_factory=list) + + +# Keyed by app_id; popped once streaming.py emits the MCP_APP SSE event. +_pending_apps: dict[str, PendingApp] = {} diff --git a/backend/src/analytics_agent/agent/streaming.py b/backend/src/analytics_agent/agent/streaming.py index da01197..d2ff6c6 100644 --- a/backend/src/analytics_agent/agent/streaming.py +++ b/backend/src/analytics_agent/agent/streaming.py @@ -68,6 +68,14 @@ async def stream_graph_events( final_state: dict[str, Any] = {} chart_emitted = False # guard against double-emitting CHART + # Track active tool runs by run_id -> tool_name so we can detect when a + # wrapped tool (e.g. MCP UI wrapper) invokes an inner tool of the same name. + # LangGraph v2 fires on_tool_start/on_tool_end for both the outer StructuredTool + # and the inner delegated tool, which would otherwise double-record TOOL_CALL + # and emit a redundant TOOL_RESULT alongside the MCP_APP bubble. + active_tool_runs: dict[str, str] = {} + suppressed_tool_runs: set[str] = set() + try: from analytics_agent.config import settings as _settings @@ -78,6 +86,7 @@ async def stream_graph_events( data: dict[str, Any] = event.get("data", {}) name: str = event.get("name", "") run_id: str = event.get("run_id", "") + parent_ids: list[str] = event.get("parent_ids", []) or [] node: str = event.get("metadata", {}).get("langgraph_node", "") # ── TEXT ── @@ -88,12 +97,16 @@ async def stream_graph_events( if getattr(chunk, "tool_call_chunks", None): continue content = chunk.content if hasattr(chunk, "content") else "" + # Each chunk gets its own unique message_id. All other event + # types already do this (uuid4). Sharing run_id across chunks + # collides on the Message.id primary key in persistence and + # silently drops every chunk after the first (see chat.py). if isinstance(content, str) and content: final_text_parts.append(content) yield { "event": "TEXT", "conversation_id": conversation_id, - "message_id": run_id, + "message_id": str(uuid.uuid4()), "payload": {"text": content}, } elif isinstance(content, list): @@ -105,13 +118,19 @@ async def stream_graph_events( yield { "event": "TEXT", "conversation_id": conversation_id, - "message_id": run_id, + "message_id": str(uuid.uuid4()), "payload": {"text": text}, } # ── TOOL_CALL ── elif event_type == "on_tool_start": tool_input = data.get("input", {}) + # Suppress inner tool calls nested under a same-named outer tool + # (MCP UI wrapper pattern) so the bubble/history isn't duplicated. + if any(active_tool_runs.get(pid) == name for pid in parent_ids): + suppressed_tool_runs.add(run_id) + continue + active_tool_runs[run_id] = name if name == "execute_sql": pending_sql[run_id] = tool_input.get("sql", "") # create_chart renders as a CHART event — don't show a tool call bubble @@ -126,6 +145,11 @@ async def stream_graph_events( # ── TOOL_ERROR (unhandled exception from tool) ── elif event_type == "on_tool_error": + if run_id in suppressed_tool_runs: + suppressed_tool_runs.discard(run_id) + active_tool_runs.pop(run_id, None) + pending_sql.pop(run_id, None) + continue error_msg = str(data.get("error", "Tool failed")) yield { "event": "TOOL_RESULT", @@ -138,9 +162,16 @@ async def stream_graph_events( }, } pending_sql.pop(run_id, None) + active_tool_runs.pop(run_id, None) # ── SQL / TOOL_RESULT / CHART ── elif event_type == "on_tool_end": + if run_id in suppressed_tool_runs: + suppressed_tool_runs.discard(run_id) + active_tool_runs.pop(run_id, None) + pending_sql.pop(run_id, None) + continue + active_tool_runs.pop(run_id, None) output = data.get("output", "") if hasattr(output, "content"): output = output.content @@ -184,6 +215,32 @@ async def stream_graph_events( "is_error": True, }, } + elif output_str.startswith("MCP_APP_READY:"): + # Pop the PendingApp from the side-channel and emit an MCP_APP event. + # HTML is never included in the SSE payload — the frontend fetches + # it via GET .../mcp-app/{message_id}/ui (prefetch warms the cache). + app_id = output_str.split(":", 1)[1].split()[0].strip() + from analytics_agent.agent.mcp_app_tool import _pending_apps + + pending_app = _pending_apps.pop(app_id, None) + if pending_app: + yield { + "event": "MCP_APP", + "conversation_id": conversation_id, + "message_id": str(uuid.uuid4()), + "payload": { + "app_id": app_id, + "connection_key": pending_app.connection_key, + "server_name": pending_app.server_name, + "tool_name": pending_app.tool_name, + "tool_input": pending_app.tool_input, + "tool_result": pending_app.tool_result, + "resource_uri": pending_app.resource_uri, + "csp": pending_app.csp, + "permissions": pending_app.permissions, + "allowed_tools": pending_app.allowed_tools, + }, + } elif name == "create_chart": # Fetch chart spec from side-channel (tool returns only a short marker) if output_str.startswith("CHART_READY:"): diff --git a/backend/src/analytics_agent/api/__init__.py b/backend/src/analytics_agent/api/__init__.py index c96a525..fdd148f 100644 --- a/backend/src/analytics_agent/api/__init__.py +++ b/backend/src/analytics_agent/api/__init__.py @@ -2,11 +2,12 @@ from fastapi import APIRouter -from analytics_agent.api import chat, conversations, oauth, settings +from analytics_agent.api import chat, conversations, mcp_apps, oauth, settings api_router = APIRouter() api_router.include_router(conversations.router) api_router.include_router(chat.router) +api_router.include_router(mcp_apps.router) api_router.include_router(settings.router) api_router.include_router(oauth.router) diff --git a/backend/src/analytics_agent/api/chat.py b/backend/src/analytics_agent/api/chat.py index d4dad54..ace4c1f 100644 --- a/backend/src/analytics_agent/api/chat.py +++ b/backend/src/analytics_agent/api/chat.py @@ -74,6 +74,16 @@ async def _compute_quality_background(conv_id: str, factory) -> None: class ChatMessageRequest(BaseModel): text: str + # Phase 2: MCP App iframe-originated turns include these for auditing and + # frontend rendering (source="mcp_app" rows render as SelectionChip, not a + # user bubble; origin_message_id anchors the chip under its selector iframe). + source: str | None = None # e.g. "mcp_app" + app_id: str | None = None + origin_message_id: str | None = None + # Short human-readable label shown on the SelectionChip; `text` may be a + # richer agent-facing message (e.g. includes URN + instruction to prevent + # redundant disambiguation tool calls). + display_text: str | None = None async def _persist_message( @@ -83,9 +93,10 @@ async def _persist_message( role: str, payload: dict, sequence: int, + message_id: str | None = None, ) -> None: msg = Message( - id=str(uuid.uuid4()), + id=message_id or str(uuid.uuid4()), conversation_id=conversation_id, event_type=event_type, role=role, @@ -103,6 +114,10 @@ async def _run_and_broadcast( user_text: str, engine_name: str, keepalive_interval: int, + source: str | None = None, + app_id: str | None = None, + origin_message_id: str | None = None, + display_text: str | None = None, ) -> None: """ Background task: runs the full agent pipeline independently of the HTTP @@ -123,8 +138,17 @@ def _broadcast(evt: dict) -> None: msg_repo = MessageRepo(session) sequence = await msg_repo.next_sequence(conversation_id) + user_payload: dict = {"text": user_text} + if source: + user_payload["source"] = source + if app_id: + user_payload["app_id"] = app_id + if origin_message_id: + user_payload["origin_message_id"] = origin_message_id + if display_text: + user_payload["display_text"] = display_text await _persist_message( - session, conversation_id, "TEXT", "user", {"text": user_text}, sequence + session, conversation_id, "TEXT", "user", user_payload, sequence ) await session.commit() sequence += 1 @@ -238,6 +262,10 @@ def _broadcast(evt: dict) -> None: try: context_tools: list = [] include_mutations = bool(enabled_mutations) + suppress_business_context_skill = False + + # Each platform owns its disabled_tools and include_mutations state. + # build_platform() reads from DB config; get_tools() filters internally. for row in all_cp_rows: platform = build_platform( row, @@ -248,6 +276,12 @@ def _broadcast(evt: dict) -> None: continue tools = await platform.get_tools() context_tools.extend(tools) + # If a datahub-mcp server advertises `get_context`, it supersedes + # the packaged `search_business_context` skill. Check post-filter + # so an operator who explicitly disabled `get_context` keeps the + # packaged skill active. + if row.type == "datahub-mcp" and any(t.name == "get_context" for t in tools): + suppress_business_context_skill = True logger.info( "Total context_tools=%d for conversation %s", @@ -268,6 +302,7 @@ def _broadcast(evt: dict) -> None: enabled_mutations=enabled_mutations, context_tools=context_tools, engine_tools=engine_tools, + suppress_business_context_skill=suppress_business_context_skill, ) except Exception as exc: for _evt in cast( @@ -310,7 +345,7 @@ def _broadcast(evt: dict) -> None: history=history, ): if evt.get("event") not in (None, "KEEPALIVE"): - with contextlib.suppress(Exception): + try: await _persist_message( session, conversation_id, @@ -318,9 +353,24 @@ def _broadcast(evt: dict) -> None: "assistant", evt.get("payload", {}), sequence, + message_id=evt.get("message_id"), ) await session.commit() sequence += 1 + except Exception as persist_exc: + # A failed commit (e.g. duplicate PK) leaves the async + # session in a doomed transaction — every subsequent + # commit on it would also fail until we rollback. + # Without this, one bad insert silently drops every + # later event in the same turn (incl. COMPLETE). + logger.warning( + "Persist failed for %s event in conv %s: %s", + evt.get("event"), + conversation_id, + persist_exc, + ) + with contextlib.suppress(Exception): + await session.rollback() if evt.get("event") == "TOOL_RESULT": tool_name = evt.get("payload", {}).get("tool_name", "") @@ -405,6 +455,14 @@ async def send_message( if not body.text.strip(): raise HTTPException(status_code=422, detail="Message text cannot be empty") + if body.source: + logger.info( + "Message in conv %s sourced from %s (app_id=%s)", + conversation_id, + body.source, + body.app_id, + ) + stream = ConvStream(task=None) _active_streams[conversation_id] = stream stream.task = asyncio.create_task( @@ -414,6 +472,10 @@ async def send_message( body.text.strip(), conv.engine_name, settings.sse_keepalive_interval, + source=body.source, + app_id=body.app_id, + origin_message_id=body.origin_message_id, + display_text=body.display_text, ) ) diff --git a/backend/src/analytics_agent/api/mcp_apps.py b/backend/src/analytics_agent/api/mcp_apps.py new file mode 100644 index 0000000..619fcd2 --- /dev/null +++ b/backend/src/analytics_agent/api/mcp_apps.py @@ -0,0 +1,279 @@ +"""MCP App endpoints. + +Phase 1: GET /api/conversations/{conversation_id}/mcp-app/{message_id}/ui + +Resolves the persisted {connection_key, resource_uri} from the MCP_APP message +row, fetches HTML via MCPResourceClient (cache-first, live-server fallback), and +returns {html, csp, permissions}. + +Returns HTTP 404 when both the cache and the live MCP server are unavailable. + +Phase 2: POST /api/conversations/{conversation_id}/mcp-app/{app_id}/tool-call + +Scoped MCP tool proxy. The iframe sends {tool_name, arguments}; this endpoint: + 1. Loads the PendingApp from the in-memory side-channel (fast path, live + during streaming) or rehydrates from the persisted MCP_APP message row. + 2. Validates that (connection_key, tool_name) is in the per-app allow-list + so the iframe can never reach engine tools, other MCP servers, or + create_chart. + 3. Dispatches tools/call on the originating MCP client session. + 4. Returns the CallToolResult as JSON. +""" + +from __future__ import annotations + +import logging +from typing import Any + +import orjson +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from analytics_agent.db.base import get_session +from analytics_agent.db.repository import ConversationRepo, MessageRepo + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/conversations", tags=["mcp-apps"]) + + +# ── Phase 2 helpers ─────────────────────────────────────────────────────────── + + +class _AppContext: + """Minimal info needed to authorise and dispatch a scoped tool call.""" + + def __init__(self, connection_key: str, server_name: str, allowed_tools: list[str]) -> None: + self.connection_key = connection_key + self.server_name = server_name + self.allowed_tools = allowed_tools + + +async def _load_app_context( + conversation_id: str, + app_id: str, + session: AsyncSession, +) -> _AppContext: + """Return app context from the in-memory side-channel or the DB row. + + Prefers the in-memory PendingApp (available while streaming is active or + before it was popped by on_tool_end). Falls back to scanning the + conversation's MCP_APP message rows when the in-memory entry is gone + (replay / post-stream calls). + + Raises HTTPException 404 if neither source has a record for this app_id. + """ + from analytics_agent.agent.mcp_app_tool import _pending_apps + + pending = _pending_apps.get(app_id) + if pending is not None: + return _AppContext( + connection_key=pending.connection_key, + server_name=pending.server_name, + allowed_tools=pending.allowed_tools, + ) + + # Rehydrate from persisted message rows. + msg_repo = MessageRepo(session) + messages = await msg_repo.list_for_conversation(conversation_id) + for msg in messages: + if msg.event_type != "MCP_APP" or not msg.payload: + continue + try: + payload: dict = orjson.loads(msg.payload) + except Exception: + continue + if payload.get("app_id") == app_id: + connection_key = payload.get("connection_key", "") + server_name = payload.get("server_name", "") + allowed_tools: list[str] = payload.get("allowed_tools") or [] + if not connection_key: + break + return _AppContext( + connection_key=connection_key, + server_name=server_name, + allowed_tools=allowed_tools, + ) + + raise HTTPException(status_code=404, detail=f"MCP App not found: app_id={app_id!r}") + + +async def _dispatch_tool_call( + connection_key: str, + server_name: str, + tool_name: str, + arguments: dict[str, Any], +) -> dict: + """Call tool_name on the originating MCP client; return the CallToolResult dict.""" + from analytics_agent.context.mcp_resources import _mcp_clients + + if connection_key not in _mcp_clients: + raise HTTPException( + status_code=503, + detail=f"MCP client for connection {connection_key!r} is not available — " + "the server may have restarted. Reload the page to reconnect.", + ) + + client, _sname = _mcp_clients[connection_key] + if server_name and _sname != server_name: + # server_name mismatch — use the one from the registry to be safe + logger.warning( + "server_name mismatch for connection_key=%s: stored=%s registry=%s", + connection_key, + server_name, + _sname, + ) + server_name = _sname + + try: + async with client.session(server_name) as mcp_session: + result = await mcp_session.call_tool(tool_name, arguments) + except Exception as exc: + logger.error( + "MCP tool call failed: connection=%s server=%s tool=%s: %s", + connection_key, + server_name, + tool_name, + exc, + ) + raise HTTPException( + status_code=502, + detail=f"MCP tool call failed: {exc}", + ) from exc + + # Serialise the result into a plain dict (content blocks list). + content: list[dict] = [] + for block in result.content or []: + if hasattr(block, "model_dump"): + content.append(block.model_dump()) + elif isinstance(block, dict): + content.append(block) + else: + content.append({"type": "text", "text": str(block)}) + + return { + "content": content, + "isError": getattr(result, "isError", False) or False, + } + + +class ToolCallRequest(BaseModel): + tool_name: str + arguments: dict[str, Any] = {} + + +# ── Endpoints ───────────────────────────────────────────────────────────────── + + +@router.get("/{conversation_id}/mcp-app/{message_id}/ui") +async def get_mcp_app_ui( + conversation_id: str, + message_id: str, + session: AsyncSession = Depends(get_session), +): + """Return the HTML, CSP, and permissions for a persisted MCP App message. + + Cache-first: serves from disk cache within TTL. Falls back to a live + resources/read call when the cache is stale or missing. Returns 404 with + a structured placeholder when both the cache and the live server are + unavailable. + """ + conv_repo = ConversationRepo(session) + conv = await conv_repo.get(conversation_id) + if not conv: + raise HTTPException(status_code=404, detail="Conversation not found") + + msg_repo = MessageRepo(session) + message = await msg_repo.get_by_id(message_id) + + if message is None or message.conversation_id != conversation_id: + raise HTTPException(status_code=404, detail="Message not found") + + if message.event_type != "MCP_APP": + raise HTTPException(status_code=404, detail="Message is not an MCP App event") + + try: + payload: dict = orjson.loads(message.payload) + except Exception: + raise HTTPException(status_code=500, detail="Corrupt message payload") + + connection_key: str | None = payload.get("connection_key") + resource_uri: str | None = payload.get("resource_uri") + + if not connection_key or not resource_uri: + raise HTTPException( + status_code=404, + detail="Message payload missing connection_key or resource_uri", + ) + + from analytics_agent.context.mcp_resources import resource_client + + try: + result = await resource_client.read_ui_resource(connection_key, resource_uri) + except RuntimeError as exc: + logger.warning( + "MCP app UI unavailable for message_id=%s connection_key=%s uri=%s: %s", + message_id, + connection_key, + resource_uri, + exc, + ) + raise HTTPException( + status_code=404, + detail={ + "message": "MCP app HTML unavailable — server offline and no cache", + "connection_key": connection_key, + "resource_uri": resource_uri, + }, + ) + + return { + "html": result["html"], + "csp": result.get("csp"), + "permissions": result.get("permissions", []), + "is_stale": result.get("is_stale", False), + } + + +@router.post("/{conversation_id}/mcp-app/{app_id}/tool-call") +async def mcp_app_tool_call( + conversation_id: str, + app_id: str, + body: ToolCallRequest, + session: AsyncSession = Depends(get_session), +): + """Phase 2: scoped MCP tool proxy for iframe apps. + + Validates the requested (connection_key, tool_name) pair against the + per-app allow-list, then dispatches tools/call on the originating MCP + client session. Never routes to engine tools, other connections, or + create_chart. + """ + conv_repo = ConversationRepo(session) + conv = await conv_repo.get(conversation_id) + if not conv: + raise HTTPException(status_code=404, detail="Conversation not found") + + app_ctx = await _load_app_context(conversation_id, app_id, session) + + # Security: ensure the requested tool is in the per-app allow-list. + if app_ctx.allowed_tools and body.tool_name not in app_ctx.allowed_tools: + logger.warning( + "Blocked tool call from iframe: app_id=%s tool=%s allowed=%s", + app_id, + body.tool_name, + app_ctx.allowed_tools, + ) + raise HTTPException( + status_code=403, + detail=f"Tool {body.tool_name!r} is not in the allow-list for this app.", + ) + + result = await _dispatch_tool_call( + connection_key=app_ctx.connection_key, + server_name=app_ctx.server_name, + tool_name=body.tool_name, + arguments=body.arguments, + ) + return result diff --git a/backend/src/analytics_agent/context/mcp_platform.py b/backend/src/analytics_agent/context/mcp_platform.py index 81223bc..eb3d192 100644 --- a/backend/src/analytics_agent/context/mcp_platform.py +++ b/backend/src/analytics_agent/context/mcp_platform.py @@ -110,6 +110,14 @@ async def get_tools(self) -> list[BaseTool]: client = MultiServerMCPClient({self.name: conn}) # type: ignore[dict-item] all_tools = await client.get_tools() + + from analytics_agent.context.mcp_ui import wrap_tools_with_ui_resources + + connection_key = f"ctx:{self.name}" + all_tools = await wrap_tools_with_ui_resources( + connection_key, client, self.name, all_tools + ) + self._tools_cache = all_tools result = [t for t in all_tools if t.name not in self.disabled_tools] logger.info( diff --git a/backend/src/analytics_agent/context/mcp_resources.py b/backend/src/analytics_agent/context/mcp_resources.py new file mode 100644 index 0000000..8634f04 --- /dev/null +++ b/backend/src/analytics_agent/context/mcp_resources.py @@ -0,0 +1,240 @@ +"""MCPResourceClient — reads ui:// resources from MCP servers. + +Reuses the MultiServerMCPClient instances in the _mcp_clients registry so we +open a fresh session (per resources/read call) without duplicating connection +config or spinning up a second stdio process. + +Disk cache layout:: + + ./data/mcp-app-cache/.html + ./data/mcp-app-cache/.meta.json + +The .meta.json sidecar holds {etag, last_modified, fetched_at, content_hash}. +Cache policy: + - Within TTL (default 1 h): serve from disk without contacting the server. + - Past TTL: re-validate using resources/read; compare content_hash if the + server doesn't return ETag/Last-Modified. + - Server offline past TTL: return stale cache (HTTP 200) with + is_stale=True; 404 only when both cache and live server are unavailable. +""" + +from __future__ import annotations + +import hashlib +import logging +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import orjson + +if TYPE_CHECKING: + from langchain_mcp_adapters.client import MultiServerMCPClient + +logger = logging.getLogger(__name__) + +# Process-local registry populated by mcp_ui.register_mcp_client(). +# Maps connection_key -> (MultiServerMCPClient, server_name). +_mcp_clients: dict[str, tuple[Any, str]] = {} + +# Default disk-cache TTL in seconds (1 hour). +_DEFAULT_TTL = 3600 + +_CACHE_DIR: Path | None = None + + +def _get_cache_dir() -> Path: + global _CACHE_DIR # noqa: PLW0603 + if _CACHE_DIR is None: + from analytics_agent.config import settings + + db_path = Path(settings.database_url.split("///")[-1]) + data_dir = db_path.parent if db_path.parent.exists() else Path("data") + _CACHE_DIR = data_dir / "mcp-app-cache" + _CACHE_DIR.mkdir(parents=True, exist_ok=True) + return _CACHE_DIR + + +def _cache_key(connection_key: str, uri: str) -> str: + return hashlib.sha256(f"{connection_key}\x00{uri}".encode()).hexdigest() + + +def _html_path(key: str) -> Path: + return _get_cache_dir() / f"{key}.html" + + +def _meta_path(key: str) -> Path: + return _get_cache_dir() / f"{key}.meta.json" + + +def _read_cache(key: str) -> tuple[str | None, dict]: + """Return (html, meta) from disk; html is None on miss.""" + html_file = _html_path(key) + meta_file = _meta_path(key) + if not html_file.exists() or not meta_file.exists(): + return None, {} + try: + html = html_file.read_text(encoding="utf-8") + meta = orjson.loads(meta_file.read_bytes()) + return html, meta + except Exception: + return None, {} + + +def _write_cache(key: str, html: str, meta: dict) -> None: + try: + _html_path(key).write_text(html, encoding="utf-8") + _meta_path(key).write_bytes(orjson.dumps(meta)) + except Exception: + logger.warning("MCP app cache write failed for key=%s", key) + + +def _is_fresh(meta: dict, ttl: int) -> bool: + fetched_at = meta.get("fetched_at", 0) + return (time.time() - fetched_at) < ttl + + +class MCPResourceClient: + """Reads ui:// resources from MCP servers with disk caching. + + Instantiate once per process; the underlying MultiServerMCPClient instances + are shared via the _mcp_clients registry. + """ + + def __init__(self, ttl: int = _DEFAULT_TTL) -> None: + self.ttl = ttl + + async def read_ui_resource( + self, + connection_key: str, + uri: str, + *, + use_cache: bool = True, + ) -> dict: + """Fetch a ui:// resource. + + Returns a dict with keys: html, csp, permissions, etag, last_modified, + is_stale (True when falling back to a stale cache entry). + + Raises RuntimeError if both cache and live server are unavailable. + """ + key = _cache_key(connection_key, uri) + cached_html, cached_meta = _read_cache(key) + + if use_cache and cached_html is not None and _is_fresh(cached_meta, self.ttl): + logger.debug("MCP app cache hit (fresh) for uri=%s", uri) + return { + "html": cached_html, + "csp": cached_meta.get("csp"), + "permissions": cached_meta.get("permissions", []), + "etag": cached_meta.get("etag"), + "last_modified": cached_meta.get("last_modified"), + "is_stale": False, + } + + # Need to fetch / re-validate from the server. + try: + result = await self._fetch_from_server(connection_key, uri, cached_meta) + except Exception as exc: + if cached_html is not None: + logger.warning( + "MCP server unavailable for uri=%s; serving stale cache: %s", + uri, + exc, + ) + return { + "html": cached_html, + "csp": cached_meta.get("csp"), + "permissions": cached_meta.get("permissions", []), + "etag": cached_meta.get("etag"), + "last_modified": cached_meta.get("last_modified"), + "is_stale": True, + } + raise RuntimeError( + f"MCP resource unavailable and no cache for uri={uri!r}: {exc}" + ) from exc + + if result is None: + # Server said "not modified" (304-style). + assert cached_html is not None + new_meta = {**cached_meta, "fetched_at": time.time()} + _write_cache(key, cached_html, new_meta) + return { + "html": cached_html, + "csp": cached_meta.get("csp"), + "permissions": cached_meta.get("permissions", []), + "etag": cached_meta.get("etag"), + "last_modified": cached_meta.get("last_modified"), + "is_stale": False, + } + + html, new_meta = result + _write_cache(key, html, new_meta) + return { + "html": html, + "csp": new_meta.get("csp"), + "permissions": new_meta.get("permissions", []), + "etag": new_meta.get("etag"), + "last_modified": new_meta.get("last_modified"), + "is_stale": False, + } + + async def _fetch_from_server( + self, + connection_key: str, + uri: str, + cached_meta: dict, + ) -> tuple[str, dict] | None: + """Fetch resource from the MCP server. + + Returns (html, new_meta) on success, None if content is unchanged. + Raises on failure. + """ + if connection_key not in _mcp_clients: + raise RuntimeError( + f"No MCP client registered for connection_key={connection_key!r}" + ) + + client, server_name = _mcp_clients[connection_key] + + from pydantic import AnyUrl + + async with client.session(server_name) as session: + result = await session.read_resource(AnyUrl(uri)) + + if not result.contents: + raise RuntimeError(f"Empty resource response for uri={uri!r}") + + first = result.contents[0] + if not hasattr(first, "text"): + raise RuntimeError( + f"Resource {uri!r} returned non-text content (type={type(first).__name__})" + ) + html: str = first.text + + # Compare content hash for change detection when server doesn't send ETags. + content_hash = hashlib.sha256(html.encode()).hexdigest() + if cached_meta.get("content_hash") == content_hash: + return None # unchanged + + new_meta: dict = { + "fetched_at": time.time(), + "content_hash": content_hash, + "etag": None, + "last_modified": None, + "csp": None, + "permissions": [], + } + + # Extract optional csp/permissions from resource metadata (if exposed). + meta_obj = getattr(result, "meta", None) or {} + if isinstance(meta_obj, dict): + ui = meta_obj.get("ui") or {} + new_meta["csp"] = ui.get("csp") + new_meta["permissions"] = ui.get("permissions") or [] + + return html, new_meta + + +# Module-level singleton shared across call sites. +resource_client = MCPResourceClient() diff --git a/backend/src/analytics_agent/context/mcp_ui.py b/backend/src/analytics_agent/context/mcp_ui.py new file mode 100644 index 0000000..f1912b6 --- /dev/null +++ b/backend/src/analytics_agent/context/mcp_ui.py @@ -0,0 +1,220 @@ +"""MCP UI helpers — client registry + tool wrapping. + +Two responsibilities: +1. _mcp_clients registry: populated when a platform/engine is built; provides + the MultiServerMCPClient reference that MCPResourceClient uses for + resources/read calls. +2. wrap_tools_with_ui_resources(): called from both MCPContextPlatform.get_tools() + and MCPQueryEngine.get_tools_async(). For every tool that advertises a + _meta.ui.resourceUri, it: + a. Kicks off an asyncio.create_task() prefetch of the resource into the disk + cache so the cache is warm by the time the agent calls the tool. + b. Replaces the tool with a wrapper that, after the underlying MCP tool/call + returns, stuffs {tool_result, connection_key, tool_name, tool_input, + resource_uri, csp, permissions} into _pending_apps and returns the short + marker MCP_APP_READY:. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from typing import TYPE_CHECKING, Any + +import orjson + +if TYPE_CHECKING: + from langchain_core.tools import BaseTool + +logger = logging.getLogger(__name__) + + +_DEFAULT_UI_AGENT_INSTRUCTIONS = ( + "An interactive UI card has been rendered inline in the chat for the user to " + "act on. Do not restate the options, do not call any other tools, and do not " + "answer the user's question yet. Stop and wait for the user's next message." +) + + +def _extract_agent_instructions(content_blocks: list[dict]) -> str | None: + """Return the `_agent_instructions` string from any JSON-bearing text block. + + MCP servers that render UI can embed `_agent_instructions` in their tool + output so the host model knows to stop and wait for user interaction. Our + wrapper previously swallowed the entire payload by returning only a marker, + which meant the LLM never saw this directive and would happily call more + tools. Surface it back into the tool's return value so it lands in the + ToolMessage the agent reads. + """ + for block in content_blocks: + if not isinstance(block, dict) or block.get("type") != "text": + continue + text = block.get("text", "") + if not isinstance(text, str) or "_agent_instructions" not in text: + continue + try: + data = orjson.loads(text) + except Exception: + continue + if isinstance(data, dict): + instructions = data.get("_agent_instructions") + if isinstance(instructions, str) and instructions.strip(): + return instructions.strip() + return None + + +def register_mcp_client( + connection_key: str, + client: Any, + server_name: str, +) -> None: + """Register a MultiServerMCPClient in the shared registry (mcp_resources._mcp_clients).""" + from analytics_agent.context.mcp_resources import _mcp_clients + + _mcp_clients[connection_key] = (client, server_name) + logger.debug("Registered MCP client for connection_key=%s", connection_key) + + +def _get_ui_meta(tool: BaseTool) -> dict: + """Extract _meta.ui from a LangChain tool's metadata (may be empty).""" + meta = (tool.metadata or {}).get("_meta") or {} + if isinstance(meta, dict): + return meta.get("ui") or {} + return {} + + +async def _prefetch_resource(connection_key: str, uri: str) -> None: + """Best-effort prefetch of a ui:// resource into the disk cache.""" + try: + from analytics_agent.context.mcp_resources import resource_client + + await resource_client.read_ui_resource(connection_key, uri, use_cache=False) + logger.debug("Prefetched MCP ui resource: connection=%s uri=%s", connection_key, uri) + except Exception as exc: + logger.warning( + "MCP ui resource prefetch failed (connection=%s uri=%s): %s", + connection_key, + uri, + exc, + ) + + +def _make_content_blocks(raw: Any) -> list[dict]: + """Coerce an ainvoke() return value into standard MCP content blocks. + + The MCP Apps spec requires `ui/notifications/tool-result` params to be a + CallToolResult (`{content: [{type, text, ...}, ...]}`). Preserve structured + content so the iframe app can parse it; wrap plain strings as a single + text block. + """ + if isinstance(raw, list): + blocks: list[dict] = [] + for block in raw: + if isinstance(block, dict): + blocks.append(block) + elif isinstance(block, str): + blocks.append({"type": "text", "text": block}) + return blocks + if isinstance(raw, str): + return [{"type": "text", "text": raw}] + return [{"type": "text", "text": str(raw)}] + + +def _wrap_tool( + original: BaseTool, + connection_key: str, + server_name: str, + resource_uri: str, + csp: str | None, + permissions: list[str], + allowed_tools: list[str], +) -> BaseTool: + """Return a new StructuredTool that calls original then stuffs _pending_apps.""" + from langchain_core.tools import StructuredTool + + from analytics_agent.agent.mcp_app_tool import PendingApp, _pending_apps + + tool_name = original.name + + async def _wrapper(**kwargs: Any) -> str: + raw = await original.ainvoke(kwargs) + content_blocks = _make_content_blocks(raw) + + app_id = str(uuid.uuid4()) + _pending_apps[app_id] = PendingApp( + app_id=app_id, + connection_key=connection_key, + server_name=server_name, + tool_name=tool_name, + tool_input=dict(kwargs), + tool_result=content_blocks, + resource_uri=resource_uri, + csp=csp, + permissions=list(permissions), + allowed_tools=list(allowed_tools), + ) + # Preserve the server-provided `_agent_instructions` (or fall back to a + # sensible default) so the LLM actually receives a directive to stop + # and wait for the user. streaming.py still parses the leading + # `MCP_APP_READY:` marker on the first line. + instructions = ( + _extract_agent_instructions(content_blocks) + or _DEFAULT_UI_AGENT_INSTRUCTIONS + ) + return f"MCP_APP_READY:{app_id} ({tool_name})\n{instructions}" + + return StructuredTool( + name=original.name, + description=original.description, + args_schema=original.args_schema, + coroutine=_wrapper, + metadata=original.metadata, + ) + + +async def wrap_tools_with_ui_resources( + connection_key: str, + client: Any, + server_name: str, + tools: list[BaseTool], +) -> list[BaseTool]: + """Register the client and wrap any UI-bearing tools. + + For each tool whose MCP descriptor carries _meta.ui.resourceUri: + - Kick off an asyncio.create_task() prefetch into the disk cache. + - Replace the tool with a wrapper that emits MCP_APP_READY:. + + Tools without a resourceUri are returned unchanged. + """ + register_mcp_client(connection_key, client, server_name) + + # Collect all tool names on this connection for the Phase 2 allow-list. + all_tool_names = [t.name for t in tools] + + result: list[BaseTool] = [] + for tool in tools: + ui_meta = _get_ui_meta(tool) + resource_uri: str | None = ui_meta.get("resourceUri") + if not resource_uri: + result.append(tool) + continue + + csp: str | None = ui_meta.get("csp") + permissions: list[str] = ui_meta.get("permissions") or [] + + # Fire-and-forget prefetch so the cache is warm before the agent calls the tool. + asyncio.create_task( + _prefetch_resource(connection_key, resource_uri), + name=f"mcp-prefetch-{connection_key}-{tool.name}", + ) + + wrapped = _wrap_tool( + tool, connection_key, server_name, resource_uri, csp, permissions, all_tool_names + ) + result.append(wrapped) + logger.info( + "MCP tool '%s' wrapped with UI resource: %s", tool.name, resource_uri + ) + + return result diff --git a/backend/src/analytics_agent/db/repository.py b/backend/src/analytics_agent/db/repository.py index 3487f5f..9439b36 100644 --- a/backend/src/analytics_agent/db/repository.py +++ b/backend/src/analytics_agent/db/repository.py @@ -90,6 +90,9 @@ async def create(self, message: Message) -> Message: await self._session.commit() return message + async def get_by_id(self, message_id: str) -> Message | None: + return await self._session.get(Message, message_id) + async def list_for_conversation(self, conversation_id: str) -> list[Message]: result = await self._session.execute( select(Message) diff --git a/backend/src/analytics_agent/engines/mcp/engine.py b/backend/src/analytics_agent/engines/mcp/engine.py index b0ab318..9ba25a4 100644 --- a/backend/src/analytics_agent/engines/mcp/engine.py +++ b/backend/src/analytics_agent/engines/mcp/engine.py @@ -70,6 +70,15 @@ async def get_tools_async(self) -> list[BaseTool]: client = MultiServerMCPClient({"engine": conn}) # type: ignore[dict-item] tools = await client.get_tools() + + from analytics_agent.context.mcp_ui import wrap_tools_with_ui_resources + + # Use a stable key derived from the MCP config so multiple MCP engines + # can coexist in the same process without registry collisions. + engine_label = self._mcp_cfg.get("name") or self._mcp_cfg.get("url", "engine") + connection_key = f"engine:{engine_label}" + tools = await wrap_tools_with_ui_resources(connection_key, client, "engine", tools) + logger.info("MCP engine provided %d tools", len(tools)) return tools diff --git a/backend/src/analytics_agent/prompts/system.py b/backend/src/analytics_agent/prompts/system.py index c925791..7c194fe 100644 --- a/backend/src/analytics_agent/prompts/system.py +++ b/backend/src/analytics_agent/prompts/system.py @@ -11,6 +11,7 @@ def get_prompt_template() -> str: def build_system_prompt( engine_name: str, enabled_skills: set[str] | None = None, + include_business_context: bool = True, ) -> str: from analytics_agent.skills.loader import ( get_improve_context_prompt_section, @@ -21,8 +22,10 @@ def build_system_prompt( today = date.today().strftime("%B %d, %Y") base = get_prompt_template().format(engine_name=engine_name, today=today) - # Always inject always-on meta-skills - base = base + get_search_business_context_section() + # Always inject always-on meta-skills. `search_business_context` is skipped + # when a richer `get_context` tool is in play (see build_graph). + if include_business_context: + base = base + get_search_business_context_section() base = base + get_improve_context_prompt_section() if enabled_skills: diff --git a/backend/src/analytics_agent/prompts/system_prompt.md b/backend/src/analytics_agent/prompts/system_prompt.md index 02e6af7..2927fe8 100644 --- a/backend/src/analytics_agent/prompts/system_prompt.md +++ b/backend/src/analytics_agent/prompts/system_prompt.md @@ -11,7 +11,7 @@ Your goal is to answer the user's data questions by: ## Available tool groups **DataHub (catalog & context)** -- search_documents: search business documentation, metric definitions, and domain knowledge — USE THIS FIRST +- search_documents: search business documentation, metric definitions, and domain knowledge - grep_documents: search document content for specific terms or patterns - search: find datasets, dashboards, and other data assets by keyword - get_entities: get detailed metadata (schema, description, owners) for specific assets — takes a LIST OF URNs only (no filters); call search() first to get URNs, then pass them here @@ -124,7 +124,7 @@ me about whether I'm querying the right table?* ## Workflow -For every data question, follow this order: +For every data question, follow this order only if "get_context" tool is unavailable: **Step 1 — Understand the business context first** Always call search_documents before doing anything else. Search for: diff --git a/frontend/src/api/mcpApp.ts b/frontend/src/api/mcpApp.ts new file mode 100644 index 0000000..ec8e667 --- /dev/null +++ b/frontend/src/api/mcpApp.ts @@ -0,0 +1,62 @@ +const BASE = "/api"; + +export interface McpAppUi { + html: string; + csp: string | null; + permissions: string[]; +} + +/** + * Fetch the HTML for an MCP App by message ID. + * + * The backend serves this from the disk cache (warmed by the prefetch + * at tool-discovery time) or falls back to a live resources/read. + * Returns null if both cache and server are unavailable (404). + */ +export async function fetchMcpAppUi( + conversationId: string, + messageId: string +): Promise { + const res = await fetch( + `${BASE}/conversations/${conversationId}/mcp-app/${messageId}/ui` + ); + if (res.status === 404) return null; + if (!res.ok) throw new Error(`Failed to fetch MCP App UI: ${res.status}`); + return res.json(); +} + +export interface McpToolCallResult { + content: Array<{ type: string; text?: string; [k: string]: unknown }>; + isError?: boolean; +} + +/** + * Phase 2: scoped MCP tool proxy. + * + * Called by useMcpAppBridge when the iframe sends a `tools/call` JSON-RPC + * request. The backend validates the (connection_key, tool_name) allow-list + * before dispatching to the originating MCP server. + * + * Throws on network / HTTP errors; the hook re-raises as a JSON-RPC error + * reply to the iframe. + */ +export async function callMcpAppTool( + conversationId: string, + appId: string, + toolName: string, + args: Record = {} +): Promise { + const res = await fetch( + `${BASE}/conversations/${conversationId}/mcp-app/${appId}/tool-call`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ tool_name: toolName, arguments: args }), + } + ); + if (!res.ok) { + const detail = await res.text().catch(() => res.statusText); + throw new Error(`MCP tool call failed (${res.status}): ${detail}`); + } + return res.json(); +} diff --git a/frontend/src/api/stream.ts b/frontend/src/api/stream.ts index 9bda768..e103f3d 100644 --- a/frontend/src/api/stream.ts +++ b/frontend/src/api/stream.ts @@ -33,12 +33,14 @@ export async function* reattachStream( export async function* streamMessage( conversationId: string, text: string, - signal?: AbortSignal + signal?: AbortSignal, + /** Extra body fields forwarded verbatim (e.g. source, app_id for auditing). */ + extraBody?: Record ): AsyncIterator { const res = await fetch(`/api/conversations/${conversationId}/messages`, { method: "POST", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ text }), + body: JSON.stringify({ text, ...extraBody }), signal, }); diff --git a/frontend/src/components/Chat/ChatView.tsx b/frontend/src/components/Chat/ChatView.tsx index f911085..f58f11a 100644 --- a/frontend/src/components/Chat/ChatView.tsx +++ b/frontend/src/components/Chat/ChatView.tsx @@ -351,6 +351,7 @@ export function ChatView() { { if (chartErrorRetried.current || isStreaming) return; diff --git a/frontend/src/components/Chat/MessageList.tsx b/frontend/src/components/Chat/MessageList.tsx index 072f438..175a6e5 100644 --- a/frontend/src/components/Chat/MessageList.tsx +++ b/frontend/src/components/Chat/MessageList.tsx @@ -2,16 +2,18 @@ import { Fragment, useEffect, useRef } from "react"; import type { UIMessage } from "@/types"; import { TextMessage } from "./messages/TextMessage"; import { AgentWorkBlock } from "./messages/AgentWorkBlock"; +import { SelectionChip, type SelectionChipPayload } from "./messages/SelectionChip"; import { groupIntoTurns } from "@/lib/groupMessages"; interface Props { messages: UIMessage[]; isStreaming?: boolean; + conversationId?: string; showReasoning?: boolean; onChartError?: (error: string) => void; } -export function MessageList({ messages, isStreaming = false, showReasoning = true, onChartError }: Props) { +export function MessageList({ messages, isStreaming = false, conversationId, showReasoning = true, onChartError }: Props) { const bottomRef = useRef(null); useEffect(() => { @@ -30,42 +32,55 @@ export function MessageList({ messages, isStreaming = false, showReasoning = tru return (
- {groups.map((group) => ( - - {/* User message */} - {group.userMsg && ( -
- -
- )} - - {/* Agent work block — only shown when there are intermediate steps */} - {group.workMsgs.length > 0 && ( - - )} + {groups.map((group) => { + // MCP App iframe-originated user turns render as a compact selection + // chip (flavor C) — not a full user bubble. + const isMcpAppSelection = + group.userMsg?.event_type === "TEXT" && + (group.userMsg.payload as Record).source === "mcp_app"; + return ( + + {/* User message */} + {group.userMsg && isMcpAppSelection && ( +
+ +
+ )} + {group.userMsg && !isMcpAppSelection && ( +
+ +
+ )} - {/* Final visible response */} - {group.finalMsg && (group.finalMsg.payload as { text?: string }).text?.trim() && ( -
- 0 && ( + -
- )} -
- ))} + )} + + {/* Final visible response */} + {group.finalMsg && (group.finalMsg.payload as { text?: string }).text?.trim() && ( +
+ +
+ )} +
+ ); + })}
); diff --git a/frontend/src/components/Chat/messages/AgentWorkBlock.tsx b/frontend/src/components/Chat/messages/AgentWorkBlock.tsx index 23f8c98..20534cf 100644 --- a/frontend/src/components/Chat/messages/AgentWorkBlock.tsx +++ b/frontend/src/components/Chat/messages/AgentWorkBlock.tsx @@ -7,6 +7,8 @@ import { ToolCallMessage, ToolResultMessage } from "./ToolCallMessage"; import { SqlMessage } from "./SqlMessage"; import { ChartMessage } from "./ChartMessage"; import { ErrorMessage } from "./ErrorMessage"; +import { MCPAppMessage } from "./MCPAppMessage"; +import type { MCPAppPayload } from "@/types"; function fmt(n: number): string { if (n < 1000) return String(n); @@ -19,6 +21,7 @@ interface Props { turnUsage?: TurnUsage; isStreaming?: boolean; showReasoning?: boolean; + conversationId?: string; onChartError?: (error: string) => void; } @@ -27,6 +30,7 @@ export function AgentWorkBlock({ turnUsage, isStreaming = false, showReasoning = true, + conversationId, onChartError, }: Props) { const [expanded, setExpanded] = useState(isStreaming); @@ -197,6 +201,13 @@ export function AgentWorkBlock({ {msg.event_type === "CHART" && ( )} + {msg.event_type === "MCP_APP" && conversationId && ( + + )} {msg.event_type === "ERROR" && ( )} diff --git a/frontend/src/components/Chat/messages/MCPAppMessage.tsx b/frontend/src/components/Chat/messages/MCPAppMessage.tsx new file mode 100644 index 0000000..e5f7056 --- /dev/null +++ b/frontend/src/components/Chat/messages/MCPAppMessage.tsx @@ -0,0 +1,123 @@ +/** + * MCPAppMessage — adapter between an MCP_APP SSE payload and MCPAppFrame. + * + * On mount, always fetches HTML via GET .../mcp-app/{message_id}/ui. + * The prefetched disk cache on the backend makes this fast for first render; + * the same code path handles replay when the conversation is reloaded. + * + * Shows a skeleton while the fetch is in flight, a graceful placeholder on + * 404 (both cache and server unavailable), and the sandboxed iframe once ready. + */ + +import { useEffect, useState } from "react"; +import { Loader2, AppWindow } from "lucide-react"; +import type { MCPAppPayload } from "@/types"; +import { fetchMcpAppUi } from "@/api/mcpApp"; +import { MCPAppFrame } from "@/lib/mcpApps/MCPAppFrame"; + +interface Props { + messageId: string; + conversationId: string; + payload: MCPAppPayload; +} + +type FetchState = + | { status: "loading" } + | { status: "ready"; html: string; csp: string | null; permissions: string[] } + | { status: "unavailable" } + | { status: "error"; message: string }; + +export function MCPAppMessage({ messageId, conversationId, payload }: Props) { + const [fetchState, setFetchState] = useState({ status: "loading" }); + const [appReady, setAppReady] = useState(false); + + useEffect(() => { + let cancelled = false; + setFetchState({ status: "loading" }); + setAppReady(false); + + fetchMcpAppUi(conversationId, messageId) + .then((ui) => { + if (cancelled) return; + if (ui === null) { + setFetchState({ status: "unavailable" }); + } else { + setFetchState({ + status: "ready", + html: ui.html, + csp: ui.csp, + permissions: ui.permissions, + }); + } + }) + .catch((err: unknown) => { + if (cancelled) return; + setFetchState({ + status: "error", + message: err instanceof Error ? err.message : String(err), + }); + }); + + return () => { + cancelled = true; + }; + }, [conversationId, messageId]); + + return ( +
+ {/* Header strip */} +
+ + {payload.tool_name} + {payload.server_name && ( + {payload.server_name} + )} +
+ + {/* Content area */} + {fetchState.status === "loading" && ( +
+ + Loading app… +
+ )} + + {fetchState.status === "unavailable" && ( +
+

App unavailable

+

+ The MCP server is offline and no cached version is available. +

+
+ )} + + {fetchState.status === "error" && ( +
+ {fetchState.message} +
+ )} + + {fetchState.status === "ready" && ( +
+ {!appReady && ( +
+ +
+ )} + setAppReady(true)} + className="w-full border-0 block" + conversationId={conversationId} + appId={payload.app_id} + originMessageId={messageId} + /> +
+ )} +
+ ); +} diff --git a/frontend/src/components/Chat/messages/SelectionChip.tsx b/frontend/src/components/Chat/messages/SelectionChip.tsx new file mode 100644 index 0000000..11127bd --- /dev/null +++ b/frontend/src/components/Chat/messages/SelectionChip.tsx @@ -0,0 +1,39 @@ +/** + * SelectionChip — compact pill rendered when an MCP App iframe posts a + * `ui/message` on behalf of the user (flavor C). + * + * Appears anchored under the selector iframe that originated the turn + * (referenced by origin_message_id in the persisted payload) instead of + * rendering a full user-bubble. Keeps the transcript auditable without a + * phantom "I picked X" user turn cluttering the conversation. + */ + +import { MousePointerClick } from "lucide-react"; + +export interface SelectionChipPayload { + /** Agent-facing text (may be verbose to prevent LLM re-disambiguation). */ + text: string; + /** Optional short label for display; falls back to `text` if absent. */ + display_text?: string; + source: "mcp_app"; + app_id?: string; + origin_message_id?: string; +} + +interface Props { + payload: SelectionChipPayload; +} + +export function SelectionChip({ payload }: Props) { + const label = payload.display_text?.trim() || payload.text; + return ( +
+ + {label} +
+ ); +} diff --git a/frontend/src/lib/buildUiMessages.ts b/frontend/src/lib/buildUiMessages.ts index 3de3426..88be52c 100644 --- a/frontend/src/lib/buildUiMessages.ts +++ b/frontend/src/lib/buildUiMessages.ts @@ -91,6 +91,7 @@ export function buildUiMessages(records: MessageRecord[]): { case "TOOL_RESULT": case "SQL": case "CHART": + case "MCP_APP": case "ERROR": result.push({ id: m.id, event_type: m.event_type, role: "assistant", payload: m.payload, created_at: m.created_at }); break; diff --git a/frontend/src/lib/groupMessages.ts b/frontend/src/lib/groupMessages.ts index aa3a76f..ef34836 100644 --- a/frontend/src/lib/groupMessages.ts +++ b/frontend/src/lib/groupMessages.ts @@ -1,6 +1,6 @@ import type { UIMessage, TurnUsage } from "@/types"; -const WORK_TYPES = new Set(["TOOL_CALL", "TOOL_RESULT", "SQL", "CHART", "ERROR", "THINKING"]); +const WORK_TYPES = new Set(["TOOL_CALL", "TOOL_RESULT", "SQL", "CHART", "MCP_APP", "ERROR", "THINKING"]); export interface TurnGroup { key: string; diff --git a/frontend/src/lib/mcpApps/MCPAppFrame.tsx b/frontend/src/lib/mcpApps/MCPAppFrame.tsx new file mode 100644 index 0000000..c87d4bd --- /dev/null +++ b/frontend/src/lib/mcpApps/MCPAppFrame.tsx @@ -0,0 +1,137 @@ +/** + * MCPAppFrame — sandboxed iframe wrapper for MCP App HTML. + * + * Security posture: + * - sandbox="allow-scripts" (no allow-same-origin, no allow-forms, etc.) + * - csp attribute on the iframe element (W3C, Chromium-enforced before any + * script runs; stronger than meta CSP because it supports frame-ancestors). + * - When csp is provided and the browser doesn't support the iframe csp + * attribute (non-Chromium), we inject a into + * the HTML before setting srcDoc as a documented weaker fallback. + * + * Wires up useMcpAppBridge so the app receives ui/initialize ack + + * ui/toolResult push as soon as it loads. + * + * Height: starts at MIN_FRAME_HEIGHT_PX and grows via + * `ui/notifications/size-changed` from the app. We never shrink below + * the min, and we ignore non-finite or non-positive values from the + * app (defensive — a buggy app shouldn't collapse the frame to 0). + */ + +import { useCallback, useEffect, useRef, useState } from "react"; +import { useMcpAppBridge } from "./useMcpAppBridge"; + +const MIN_FRAME_HEIGHT_PX = 120; + +export interface MCPAppFrameProps { + /** Raw HTML to render inside the iframe. */ + html: string; + /** Tool input arguments (what the agent called the tool with). */ + toolInput: Record; + /** Tool result to push to the app after ui/initialize. */ + toolResult: unknown; + /** CSP string from the MCP server's _meta.ui.csp field. */ + csp?: string | null; + /** permissions from the MCP server's _meta.ui.permissions field. */ + permissions?: string[]; + /** Called when the app sends ui/notifications/initialized. */ + onReady?: () => void; + className?: string; + /** + * Phase 2: conversation the frame lives in. Passed to useMcpAppBridge so + * tools/call requests can be routed to the correct backend endpoint. + */ + conversationId?: string; + /** + * Phase 2: the app_id for this MCP App instance — used as the path segment + * in POST .../mcp-app/{app_id}/tool-call. + */ + appId?: string; + /** + * Phase 2: the message ID of the MCP_APP message that created this frame. + * Forwarded to useMcpAppBridge so `ui/message` posts include it as + * `origin_message_id`, enabling the SelectionChip to be anchored under this + * selector on replay. + */ + originMessageId?: string; +} + +/** Returns true if the browser supports the iframe `csp` attribute. */ +function supportsCspAttribute(): boolean { + return "csp" in HTMLIFrameElement.prototype; +} + +/** Inject a as a documented weaker fallback. */ +function injectMetaCsp(html: string, csp: string): string { + const metaTag = ``; + if (/]*>/i.test(html)) { + return html.replace(/(]*>)/i, `$1\n ${metaTag}`); + } + // No — prepend before everything + return `${metaTag}\n${html}`; +} + +export function MCPAppFrame({ + html, + toolInput, + toolResult, + csp, + onReady, + className, + conversationId, + appId, + originMessageId, +}: MCPAppFrameProps) { + const iframeRef = useRef(null); + const [iframeWindow, setIframeWindow] = useState(null); + const [frameHeight, setFrameHeight] = useState(MIN_FRAME_HEIGHT_PX); + + // Resolve srcDoc: inject meta CSP fallback for non-Chromium if needed + const srcDoc = (() => { + if (csp && !supportsCspAttribute()) { + return injectMetaCsp(html, csp); + } + return html; + })(); + + const handleSizeChanged = useCallback((height: number) => { + setFrameHeight(Math.max(MIN_FRAME_HEIGHT_PX, Math.ceil(height))); + }, []); + + useMcpAppBridge({ + iframeWindow, + toolInput, + toolResult, + onReady, + onSizeChanged: handleSizeChanged, + conversationId, + appId, + originMessageId, + }); + + const handleLoad = useCallback(() => { + setIframeWindow(iframeRef.current?.contentWindow ?? null); + }, []); + + useEffect(() => { + setIframeWindow(null); + setFrameHeight(MIN_FRAME_HEIGHT_PX); + }, [srcDoc]); + + return ( +