diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/welcome-experience.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/welcome-experience.tsx
index f41e4440d..a4f987efb 100644
--- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/welcome-experience.tsx
+++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/welcome-experience.tsx
@@ -27,7 +27,7 @@ type WelcomeExperienceProps = {
 };
 
 const WELCOME_MESSAGE = `Welcome to Ambient AI! Please select a workflow or type a message to get started.`;
-const SETUP_MESSAGE = `Great! Give me a moment to get set up`;
+const SETUP_MESSAGE = `Great! Give me a moment to get set up.`;
 
 export function WelcomeExperience({
   ootbWorkflows,
diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py
index 31e5bc8c4..3822c2532 100644
--- a/components/runners/claude-code-runner/main.py
+++ b/components/runners/claude-code-runner/main.py
@@ -2,6 +2,7 @@
 AG-UI Server entry point for Claude Code runner.
 Implements the official AG-UI server pattern.
 """
+
 import asyncio
 import os
 import json
@@ -26,6 +27,7 @@
 # Flexible input model that matches what our frontend actually sends
 class RunnerInput(BaseModel):
     """Input model for runner with optional AG-UI fields."""
+
     threadId: Optional[str] = None
     thread_id: Optional[str] = None  # Support both camelCase and snake_case
     runId: Optional[str] = None
@@ -35,28 +37,30 @@ class RunnerInput(BaseModel):
     messages: List[Dict[str, Any]]
     state: Optional[Dict[str, Any]] = None
     tools: Optional[List[Any]] = None
-    context: Optional[Union[List[Any], Dict[str, Any]]] = None  # Accept both list and dict, convert to list
+    context: Optional[Union[List[Any], Dict[str, Any]]] = (
+        None  # Accept both list and dict, convert to list
+    )
     forwardedProps: Optional[Dict[str, Any]] = None
     environment: Optional[Dict[str, str]] = None
     metadata: Optional[Dict[str, Any]] = None
-    
+
     def to_run_agent_input(self) -> RunAgentInput:
         """Convert to official RunAgentInput model."""
         import uuid
-        
+
         # Normalize field names (prefer camelCase for AG-UI)
         thread_id = self.threadId or self.thread_id
         run_id = self.runId or self.run_id
        parent_run_id = self.parentRunId or self.parent_run_id
-        
+
         # Generate runId if not provided
         if not run_id:
             run_id = str(uuid.uuid4())
             logger.info(f"Generated run_id: {run_id}")
-        
+
         # Context should be a list, not a dict
         context_list = self.context if isinstance(self.context, list) else []
-        
+
         return RunAgentInput(
             thread_id=thread_id,
             run_id=run_id,
@@ -68,6 +72,7 @@ def to_run_agent_input(self) -> RunAgentInput:
             forwarded_props=self.forwardedProps or {},
         )
 
+
 # Global context and adapter
 context: Optional[RunnerContext] = None
 adapter = None  # Will be ClaudeCodeAdapter after initialization
@@ -77,118 +82,133 @@ def to_run_agent_input(self) -> RunAgentInput:
 async def lifespan(app: FastAPI):
     """Initialize and cleanup application resources."""
     global context, adapter
-    
+
     # Import adapter here to avoid circular imports
     from adapter import ClaudeCodeAdapter
-    
+
     # Initialize context from environment
     session_id = os.getenv("SESSION_ID", "unknown")
     workspace_path = os.getenv("WORKSPACE_PATH", "/workspace")
-    
+
     logger.info(f"Initializing AG-UI server for session {session_id}")
-    
+
     context = RunnerContext(
         session_id=session_id,
         workspace_path=workspace_path,
     )
-    
+
     adapter = ClaudeCodeAdapter()
     adapter.context = context
-    
+
     logger.info("Adapter initialized - fresh client will be created for each run")
-    
+
     # Check if this is a resume session via IS_RESUME env var
     # This is set by the operator when restarting a stopped/completed/failed session
     is_resume = os.getenv("IS_RESUME", "").strip().lower() == "true"
     if is_resume:
         logger.info("IS_RESUME=true - this is a resumed session")
-    
+
     # INITIAL_PROMPT is no longer auto-executed on startup
     # User must explicitly send the first message to start the conversation
     # Workflow greetings are still triggered when a workflow is activated
     initial_prompt = os.getenv("INITIAL_PROMPT", "").strip()
     if initial_prompt:
-        logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} chars) but not auto-executing (user will send first message)")
-    
+        logger.info(
+            f"INITIAL_PROMPT detected ({len(initial_prompt)} chars) but not auto-executing (user will send first message)"
+        )
+
     logger.info(f"AG-UI server ready for session {session_id}")
-    
+
     yield
-    
+
     # Cleanup
     logger.info("Shutting down AG-UI server...")
 
 
 async def auto_execute_initial_prompt(prompt: str, session_id: str):
     """Auto-execute INITIAL_PROMPT by POSTing to backend after short delay.
-    
+
     The delay gives the runner service time to register in DNS.
     Backend has retry logic to handle if Service DNS isn't ready yet,
     so this can be short.
-    
+
     Only called for fresh sessions (no hydrated state in .claude/).
     """
     import uuid
     import aiohttp
-    
+
     # Configurable delay (default 1s, was 3s)
     # Backend has retry logic, so we don't need to wait long
     delay_seconds = float(os.getenv("INITIAL_PROMPT_DELAY_SECONDS", "1"))
-    logger.info(f"Waiting {delay_seconds}s before auto-executing INITIAL_PROMPT (allow Service DNS to propagate)...")
+    logger.info(
+        f"Waiting {delay_seconds}s before auto-executing INITIAL_PROMPT (allow Service DNS to propagate)..."
+    )
     await asyncio.sleep(delay_seconds)
-    
+
     logger.info("Auto-executing INITIAL_PROMPT via backend POST...")
-    
+
     # Get backend URL from environment
     backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/")
-    project_name = os.getenv("PROJECT_NAME", "").strip() or os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip()
-    
+    project_name = (
+        os.getenv("PROJECT_NAME", "").strip()
+        or os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip()
+    )
+
     if not backend_url or not project_name:
-        logger.error("Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set")
+        logger.error(
+            "Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set"
+        )
         return
-    
+
     # BACKEND_API_URL already includes /api suffix from operator
-    url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run"
+    url = (
+        f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run"
+    )
     logger.info(f"Auto-execution URL: {url}")
-    
+
     payload = {
         "threadId": session_id,
         "runId": str(uuid.uuid4()),
-        "messages": [{
-            "id": str(uuid.uuid4()),
-            "role": "user",
-            "content": prompt,
-            "metadata": {
-                "hidden": True,
-                "autoSent": True,
-                "source": "runner_initial_prompt"
+        "messages": [
+            {
+                "id": str(uuid.uuid4()),
+                "role": "user",
+                "content": prompt,
+                "metadata": {
+                    "hidden": True,
+                    "autoSent": True,
+                    "source": "runner_initial_prompt",
+                },
             }
-        }]
+        ],
     }
-    
+
     # Get BOT_TOKEN for auth
     bot_token = os.getenv("BOT_TOKEN", "").strip()
     headers = {"Content-Type": "application/json"}
     if bot_token:
         headers["Authorization"] = f"Bearer {bot_token}"
-    
+
     try:
         async with aiohttp.ClientSession() as session:
-            async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
+            async with session.post(
+                url,
+                json=payload,
+                headers=headers,
+                timeout=aiohttp.ClientTimeout(total=30),
+            ) as resp:
                 if resp.status == 200:
                     result = await resp.json()
                     logger.info(f"INITIAL_PROMPT auto-execution started: {result}")
                 else:
                     error_text = await resp.text()
-                    logger.warning(f"INITIAL_PROMPT failed with status {resp.status}: {error_text[:200]}")
+                    logger.warning(
+                        f"INITIAL_PROMPT failed with status {resp.status}: {error_text[:200]}"
+                    )
     except Exception as e:
         logger.warning(f"INITIAL_PROMPT auto-execution error (backend will retry): {e}")
 
-
-app = FastAPI(
-    title="Claude Code AG-UI Server",
-    version="0.2.0",
-    lifespan=lifespan
-)
+app = FastAPI(title="Claude Code AG-UI Server", version="0.2.0", lifespan=lifespan)
 
 
 # Track if adapter has been initialized
@@ -201,41 +221,45 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str):
 async def run_agent(input_data: RunnerInput, request: Request):
     """
     AG-UI compatible run endpoint.
-    
+
     Accepts flexible input with thread_id, run_id, messages.
     Optional fields: state, tools, context, forwardedProps.
     Returns SSE stream of AG-UI events.
     """
     global _adapter_initialized
-    
+
     if not adapter:
         raise HTTPException(status_code=503, detail="Adapter not initialized")
-    
+
     # Convert to official RunAgentInput
     run_agent_input = input_data.to_run_agent_input()
-    
+
     # Get Accept header for encoder
     accept_header = request.headers.get("accept", "text/event-stream")
     encoder = EventEncoder(accept=accept_header)
-    
-    logger.info(f"Processing run: thread_id={run_agent_input.thread_id}, run_id={run_agent_input.run_id}")
-    
+
+    logger.info(
+        f"Processing run: thread_id={run_agent_input.thread_id}, run_id={run_agent_input.run_id}"
+    )
+
     async def event_generator():
         """Generate AG-UI events from adapter."""
         global _adapter_initialized
-        
+
         try:
             logger.info("Event generator started")
-            
+
             # Initialize adapter on first run
             if not _adapter_initialized:
-                logger.info("First run - initializing adapter with workspace preparation")
+                logger.info(
+                    "First run - initializing adapter with workspace preparation"
+                )
                 await adapter.initialize(context)
                 logger.info("Adapter initialization complete")
                 _adapter_initialized = True
-            
+
             logger.info("Starting adapter.process_run()...")
-            
+
             # Process the run (creates fresh client each time)
             async for event in adapter.process_run(run_agent_input):
                 logger.debug(f"Yielding run event: {event.type}")
@@ -245,21 +269,22 @@ async def event_generator():
             logger.error(f"Error in event generator: {e}")
             # Yield error event
             from ag_ui.core import RunErrorEvent, EventType
+
             error_event = RunErrorEvent(
                 type=EventType.RUN_ERROR,
                 thread_id=run_agent_input.thread_id or context.session_id,
                 run_id=run_agent_input.run_id or "unknown",
-                message=str(e)
+                message=str(e),
             )
             yield encoder.encode(error_event)
-    
+
     return StreamingResponse(
         event_generator(),
         media_type=encoder.get_content_type(),
         headers={
             "Cache-Control": "no-cache",
             "X-Accel-Buffering": "no",
-        }
+        },
     )
 
 
@@ -267,19 +292,19 @@ async def event_generator():
 async def interrupt_run():
     """
     Interrupt the current Claude SDK execution.
-    
+
     Sends interrupt signal to Claude subprocess to stop mid-execution.
     See: https://platform.claude.com/docs/en/agent-sdk/python#methods
     """
     if not adapter:
         raise HTTPException(status_code=503, detail="Adapter not initialized")
-    
+
     logger.info("Interrupt request received")
-    
+
     try:
         # Call adapter's interrupt method which signals the active Claude SDK client
         await adapter.interrupt()
-        
+
         return {"message": "Interrupt signal sent to Claude SDK"}
     except Exception as e:
         logger.error(f"Interrupt failed: {e}")
@@ -288,6 +313,7 @@ async def interrupt_run():
 
 class FeedbackEvent(BaseModel):
     """AG-UI META event for user feedback (thumbs up/down)."""
+
     type: str  # "META"
     metaType: str  # "thumbs_up" or "thumbs_down"
     payload: Dict[str, Any]
@@ -299,20 +325,24 @@ class FeedbackEvent(BaseModel):
 async def handle_feedback(event: FeedbackEvent):
     """
     Handle user feedback META events and send to Langfuse.
-    
+
     This endpoint receives thumbs up/down feedback from the frontend
     (via backend) and logs it to Langfuse for observability tracking.
-    
+
     See: https://docs.ag-ui.com/drafts/meta-events#user-feedback
     """
-    logger.info(f"Feedback received: {event.metaType} from {event.payload.get('userId', 'unknown')}")
-    
+    logger.info(
+        f"Feedback received: {event.metaType} from {event.payload.get('userId', 'unknown')}"
+    )
+
     if event.type != "META":
         raise HTTPException(status_code=400, detail="Expected META event type")
-    
+
     if event.metaType not in ("thumbs_up", "thumbs_down"):
-        raise HTTPException(status_code=400, detail="metaType must be 'thumbs_up' or 'thumbs_down'")
-    
+        raise HTTPException(
+            status_code=400, detail="metaType must be 'thumbs_up' or 'thumbs_down'"
+        )
+
     try:
         # Extract payload fields
         payload = event.payload
@@ -320,17 +350,19 @@ async def handle_feedback(event: FeedbackEvent):
         project_name = payload.get("projectName", "")
         session_name = payload.get("sessionName", "")
         message_id = payload.get("messageId", "")
-        trace_id = payload.get("traceId", "")  # Langfuse trace ID for specific turn association
+        trace_id = payload.get(
+            "traceId", ""
+        )  # Langfuse trace ID for specific turn association
         comment = payload.get("comment", "")
         reason = payload.get("reason", "")
         workflow = payload.get("workflow", "")
         context_str = payload.get("context", "")
         include_transcript = payload.get("includeTranscript", False)
         transcript = payload.get("transcript", [])
-        
+
         # Map metaType to boolean value (True = positive, False = negative)
         value = True if event.metaType == "thumbs_up" else False
-        
+
         # Build comment string with context
         comment_parts = []
         if comment:
@@ -345,27 +377,31 @@ async def handle_feedback(event: FeedbackEvent):
                 for m in transcript
             )
             comment_parts.append(f"\nFull Transcript:\n{transcript_text}")
-        
+
         feedback_comment = "\n".join(comment_parts) if comment_parts else None
-        
+
         # Send to Langfuse if configured
-        langfuse_enabled = os.getenv("LANGFUSE_ENABLED", "").strip().lower() in ("1", "true", "yes")
-        
+        langfuse_enabled = os.getenv("LANGFUSE_ENABLED", "").strip().lower() in (
+            "1",
+            "true",
+            "yes",
+        )
+
         if langfuse_enabled:
             try:
                 from langfuse import Langfuse
-                
+
                 public_key = os.getenv("LANGFUSE_PUBLIC_KEY", "").strip()
                 secret_key = os.getenv("LANGFUSE_SECRET_KEY", "").strip()
                 host = os.getenv("LANGFUSE_HOST", "").strip()
-                
+
                 if public_key and secret_key and host:
                     langfuse = Langfuse(
                         public_key=public_key,
                         secret_key=secret_key,
                         host=host,
                     )
-                    
+
                     # Build metadata for structured filtering in Langfuse UI
                     metadata = {
                         "project": project_name,
@@ -377,7 +413,7 @@ async def handle_feedback(event: FeedbackEvent):
                         metadata["workflow"] = workflow
                     if message_id:
                         metadata["messageId"] = message_id
-                    
+
                     # Create score directly using create_score() API
                     # Prefer trace_id (specific turn) over session_id (whole session)
                     # Langfuse expects trace_id OR session_id, not both
@@ -389,15 +425,19 @@ async def handle_feedback(event: FeedbackEvent):
                         comment=feedback_comment,
                         metadata=metadata,
                     )
-                    
+
                     # Flush immediately to ensure feedback is sent
                     langfuse.flush()
-                    
+
                     # Log success after flush completes
                     if trace_id:
-                        logger.info(f"Langfuse: Feedback score sent successfully (trace_id={trace_id}, value={value})")
+                        logger.info(
+                            f"Langfuse: Feedback score sent successfully (trace_id={trace_id}, value={value})"
+                        )
                     else:
-                        logger.info(f"Langfuse: Feedback score sent successfully (session={session_name}, value={value})")
+                        logger.info(
+                            f"Langfuse: Feedback score sent successfully (session={session_name}, value={value})"
+                        )
                 else:
                     logger.warning("Langfuse enabled but missing credentials")
             except ImportError:
@@ -405,14 +445,16 @@ async def handle_feedback(event: FeedbackEvent):
             except Exception as e:
                 logger.error(f"Failed to send feedback to Langfuse: {e}", exc_info=True)
         else:
-            logger.info("Langfuse not enabled - feedback logged but not sent to Langfuse")
-        
+            logger.info(
+                "Langfuse not enabled - feedback logged but not sent to Langfuse"
+            )
+
         return {
             "message": "Feedback received",
             "metaType": event.metaType,
             "recorded": langfuse_enabled,
         }
-    
+
     except Exception as e:
         logger.error(f"Error processing feedback: {e}")
         raise HTTPException(status_code=500, detail=str(e))
@@ -421,19 +463,21 @@ async def handle_feedback(event: FeedbackEvent):
 def _check_mcp_authentication(server_name: str) -> tuple[bool | None, str | None]:
     """
     Check if credentials are available for known MCP servers.
-    
+
     Returns:
         Tuple of (is_authenticated, auth_message)
         Returns (None, None) for servers we don't know how to check
     """
     from pathlib import Path
-    
+
     # Google Workspace MCP - we know how to check this
     if server_name == "google-workspace":
         # Check mounted secret location first, then workspace copy
         secret_path = Path("/app/.google_workspace_mcp/credentials/credentials.json")
-        workspace_path = Path("/workspace/.google_workspace_mcp/credentials/credentials.json")
-        
+        workspace_path = Path(
+            "/workspace/.google_workspace_mcp/credentials/credentials.json"
+        )
+
         for cred_path in [workspace_path, secret_path]:
             if cred_path.exists():
                 try:
@@ -442,19 +486,19 @@ def _check_mcp_authentication(server_name: str) -> tuple[bool | None, str | None
                 except OSError:
                     pass
         return False, "Google OAuth not configured - authenticate via Integrations page"
-    
+
     # Jira/Atlassian MCP - we know how to check this
     if server_name in ("mcp-atlassian", "jira"):
         jira_url = os.getenv("JIRA_URL", "").strip()
         jira_token = os.getenv("JIRA_API_TOKEN", "").strip()
-        
+
         if jira_url and jira_token:
             return True, "Jira credentials configured"
         elif jira_url:
             return False, "Jira URL set but API token missing"
         else:
             return False, "Jira not configured - set credentials in Workspace Settings"
-    
+
     # For all other servers (webfetch, unknown) - don't claim to know auth status
     return None, None
 
@@ -464,127 +508,138 @@ async def get_mcp_status():
     """
     Returns MCP servers configured for this session with authentication status.
     Goes straight to the source - uses adapter's _load_mcp_config() method.
-    
+
     For known integrations (Google, Jira), also checks if credentials are present.
     """
     try:
         global adapter
-        
+
         if not adapter:
             return {
                 "servers": [],
                 "totalCount": 0,
-                "message": "Adapter not initialized yet"
+                "message": "Adapter not initialized yet",
             }
-        
+
         mcp_servers_list = []
-        
+
         # Get the working directory (same logic as adapter uses)
-        workspace_path = adapter.context.workspace_path if adapter.context else "/workspace"
-        
-        active_workflow_url = os.getenv('ACTIVE_WORKFLOW_GIT_URL', '').strip()
+        workspace_path = (
+            adapter.context.workspace_path if adapter.context else "/workspace"
+        )
+
+        active_workflow_url = os.getenv("ACTIVE_WORKFLOW_GIT_URL", "").strip()
         cwd_path = workspace_path
-        
+
         if active_workflow_url:
             workflow_name = active_workflow_url.split("/")[-1].removesuffix(".git")
             workflow_path = os.path.join(workspace_path, "workflows", workflow_name)
             if os.path.exists(workflow_path):
                 cwd_path = workflow_path
-        
+
         # Use adapter's method to load MCP config (same as it does during runs)
         mcp_config = adapter._load_mcp_config(cwd_path)
         logger.info(f"MCP config: {mcp_config}")
-        
+
         if mcp_config:
             for server_name, server_config in mcp_config.items():
                 # Check authentication status for known servers (Google, Jira)
                 is_authenticated, auth_message = _check_mcp_authentication(server_name)
-                
+
                 # Platform servers are built-in (webfetch), workflow servers come from config
                 is_platform = server_name == "webfetch"
-                
+
                 server_info = {
                     "name": server_name,
-                    "displayName": server_name.replace('-', ' ').replace('_', ' ').title(),
+                    "displayName": server_name.replace("-", " ")
+                    .replace("_", " ")
+                    .title(),
                     "status": "configured",
                     "command": server_config.get("command", ""),
-                    "source": "platform" if is_platform else "workflow"
+                    "source": "platform" if is_platform else "workflow",
                 }
-                
+
                 # Only include auth fields for servers we know how to check
                 if is_authenticated is not None:
                     server_info["authenticated"] = is_authenticated
                     server_info["authMessage"] = auth_message
-                
+
                 mcp_servers_list.append(server_info)
-        
+
         return {
             "servers": mcp_servers_list,
             "totalCount": len(mcp_servers_list),
-            "note": "Status shows 'configured' - check 'authenticated' field for credential status"
+            "note": "Status shows 'configured' - check 'authenticated' field for credential status",
        }
-    
+
     except Exception as e:
         logger.error(f"Failed to get MCP status: {e}", exc_info=True)
-        return {
-            "servers": [],
-            "totalCount": 0,
-            "error": str(e)
-        }
+        return {"servers": [], "totalCount": 0, "error": str(e)}
 
 
-async def clone_workflow_at_runtime(git_url: str, branch: str, subpath: str) -> tuple[bool, str]:
+async def clone_workflow_at_runtime(
+    git_url: str, branch: str, subpath: str
+) -> tuple[bool, str]:
     """
     Clone a workflow repository at runtime.
-    
+
     This mirrors the logic in hydrate.sh but runs when workflows
     are changed after the pod has started.
-    
+
     Returns:
         (success, workflow_dir_path) tuple
     """
     import tempfile
     import shutil
     from pathlib import Path
-    
+
     if not git_url:
         return False, ""
-    
+
     # Derive workflow name from URL
     workflow_name = git_url.split("/")[-1].removesuffix(".git")
     workspace_path = os.getenv("WORKSPACE_PATH", "/workspace")
     workflow_final = Path(workspace_path) / "workflows" / workflow_name
-    
+
     logger.info(f"Cloning workflow '{workflow_name}' from {git_url}@{branch}")
     if subpath:
         logger.info(f"  Subpath: {subpath}")
-    
+
     # Create temp directory for clone
     temp_dir = Path(tempfile.mkdtemp(prefix="workflow-clone-"))
-    
+
     try:
         # Build git clone command with optional auth token
         github_token = os.getenv("GITHUB_TOKEN", "").strip()
         gitlab_token = os.getenv("GITLAB_TOKEN", "").strip()
-        
+
         # Determine which token to use based on URL
         clone_url = git_url
         if github_token and "github" in git_url.lower():
-            clone_url = git_url.replace("https://", f"https://x-access-token:{github_token}@")
+            clone_url = git_url.replace(
+                "https://", f"https://x-access-token:{github_token}@"
+            )
             logger.info("Using GITHUB_TOKEN for workflow authentication")
         elif gitlab_token and "gitlab" in git_url.lower():
             clone_url = git_url.replace("https://", f"https://oauth2:{gitlab_token}@")
             logger.info("Using GITLAB_TOKEN for workflow authentication")
-        
+
         # Clone the repository
         process = await asyncio.create_subprocess_exec(
-            "git", "clone", "--branch", branch, "--single-branch", "--depth", "1",
-            clone_url, str(temp_dir),
+            "git",
+            "clone",
+            "--branch",
+            branch,
+            "--single-branch",
+            "--depth",
+            "1",
+            clone_url,
+            str(temp_dir),
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE
+            stderr=asyncio.subprocess.PIPE,
         )
         stdout, stderr = await process.communicate()
-        
+
         if process.returncode != 0:
             # Redact tokens from error message
             error_msg = stderr.decode()
@@ -594,9 +649,9 @@ async def clone_workflow_at_runtime(git_url: str, branch: str, subpath: str) ->
                 error_msg = error_msg.replace(gitlab_token, "***REDACTED***")
             logger.error(f"Failed to clone workflow: {error_msg}")
             return False, ""
-        
+
         logger.info("Clone successful, processing...")
-        
+
         # Handle subpath extraction
         if subpath:
             subpath_full = temp_dir / subpath
@@ -619,10 +674,10 @@ async def clone_workflow_at_runtime(git_url: str, branch: str, subpath: str) ->
         if workflow_final.exists():
             shutil.rmtree(workflow_final)
         shutil.move(str(temp_dir), str(workflow_final))
-        
+
         logger.info(f"Workflow '{workflow_name}' ready at {workflow_final}")
         return True, str(workflow_final)
-    
+
     except Exception as e:
         logger.error(f"Error cloning workflow: {e}")
         return False, ""
@@ -636,19 +691,19 @@ async def clone_workflow_at_runtime(git_url: str, branch: str, subpath: str) ->
 async def change_workflow(request: Request):
     """
     Change active workflow - triggers Claude SDK client restart and new greeting.
-    
+
     Accepts: {"gitUrl": "...", "branch": "...", "path": "..."}
     """
     global _adapter_initialized
-    
+
     if not adapter:
         raise HTTPException(status_code=503, detail="Adapter not initialized")
-    
+
     body = await request.json()
     git_url = (body.get("gitUrl") or "").strip()
     branch = (body.get("branch") or "main").strip() or "main"
     path = (body.get("path") or "").strip()
-    
+
     logger.info(f"Workflow change request: {git_url}@{branch} (path: {path})")
 
     async with _workflow_change_lock:
@@ -662,14 +717,23 @@ async def change_workflow(request: Request):
             and current_path == path
         ):
             logger.info("Workflow unchanged; skipping reinit and greeting")
-            return {"message": "Workflow already active", "gitUrl": git_url, "branch": branch, "path": path}
+            return {
+                "message": "Workflow already active",
+                "gitUrl": git_url,
+                "branch": branch,
+                "path": path,
+            }
 
         # Clone the workflow repository at runtime
         # This is needed because the init container only runs once at pod startup
         if git_url:
-            success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path)
+            success, workflow_path = await clone_workflow_at_runtime(
+                git_url, branch, path
+            )
             if not success:
-                logger.warning("Failed to clone workflow, will use default workflow directory")
+                logger.warning(
+                    "Failed to clone workflow, will use default workflow directory"
+                )
 
         # Update environment variables
         os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url
@@ -686,7 +750,12 @@ async def change_workflow(request: Request):
         # This runs in background via backend POST
         asyncio.create_task(trigger_workflow_greeting(git_url, branch, path))
 
-    return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path}
+    return {
+        "message": "Workflow updated",
+        "gitUrl": git_url,
+        "branch": branch,
+        "path": path,
+    }
 
 
 async def get_default_branch(repo_path: str) -> str:
@@ -706,9 +775,13 @@ async def get_default_branch(repo_path: str) -> str:
     """
     # Method 1: symbolic-ref (fast but may not be set)
     process = await asyncio.create_subprocess_exec(
-        "git", "-C", str(repo_path), "symbolic-ref", "refs/remotes/origin/HEAD",
+        "git",
+        "-C",
+        str(repo_path),
+        "symbolic-ref",
+        "refs/remotes/origin/HEAD",
         stdout=asyncio.subprocess.PIPE,
-        stderr=asyncio.subprocess.PIPE
+        stderr=asyncio.subprocess.PIPE,
     )
     stdout, stderr = await process.communicate()
     if process.returncode == 0:
@@ -720,9 +793,14 @@ async def get_default_branch(repo_path: str) -> str:
 
     # Method 2: remote show origin (more reliable)
     process = await asyncio.create_subprocess_exec(
-        "git", "-C", str(repo_path), "remote", "show", "origin",
+        "git",
+        "-C",
+        str(repo_path),
+        "remote",
+        "show",
+        "origin",
         stdout=asyncio.subprocess.PIPE,
-        stderr=asyncio.subprocess.PIPE
+        stderr=asyncio.subprocess.PIPE,
     )
     stdout, stderr = await process.communicate()
     if process.returncode == 0:
@@ -737,9 +815,14 @@ async def get_default_branch(repo_path: str) -> str:
     # Method 3: Try common default branch names
     for candidate in ["main", "master", "develop"]:
         process = await asyncio.create_subprocess_exec(
-            "git", "-C", str(repo_path), "rev-parse", "--verify", f"origin/{candidate}",
+            "git",
+            "-C",
+            str(repo_path),
+            "rev-parse",
+            "--verify",
+            f"origin/{candidate}",
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE
+            stderr=asyncio.subprocess.PIPE,
         )
         await process.communicate()
         if process.returncode == 0:
@@ -751,7 +834,9 @@ async def get_default_branch(repo_path: str) -> str:
     return "main"
 
 
-async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[bool, str, bool]:
+async def clone_repo_at_runtime(
+    git_url: str, branch: str, name: str
+) -> tuple[bool, str, bool]:
     """
     Clone a repository at runtime or add a new branch to existing repo.
 
@@ -787,7 +872,9 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
     # Generate unique branch name if not specified (only if user didn't provide one)
     # IMPORTANT: Keep in sync with backend (sessions.go) and frontend (add-context-modal.tsx)
     if not branch or branch.strip() == "":
-        session_id = os.getenv("AGENTIC_SESSION_NAME", "").strip() or os.getenv("SESSION_ID", "unknown")
+        session_id = os.getenv("AGENTIC_SESSION_NAME", "").strip() or os.getenv(
+            "SESSION_ID", "unknown"
+        )
         branch = f"ambient/{session_id}"
         logger.info(f"No branch specified, auto-generated: {branch}")
 
@@ -802,7 +889,9 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
     gitlab_token = os.getenv("GITLAB_TOKEN", "").strip()
     clone_url = git_url
     if github_token and "github" in git_url.lower():
-        clone_url = git_url.replace("https://", f"https://x-access-token:{github_token}@")
+        clone_url = git_url.replace(
+            "https://", f"https://x-access-token:{github_token}@"
+        )
         logger.info("Using GITHUB_TOKEN for authentication")
     elif gitlab_token and "gitlab" in git_url.lower():
         clone_url = git_url.replace("https://", f"https://oauth2:{gitlab_token}@")
@@ -810,21 +899,31 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
 
     # Case 1: Repo already exists - add new branch
     if repo_final.exists():
-        logger.info(f"Repo '{name}' already exists at {repo_final}, adding branch '{branch}'")
+        logger.info(
+            f"Repo '{name}' already exists at {repo_final}, adding branch '{branch}'"
+        )
         try:
             # Fetch latest refs
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_final), "fetch", "origin",
+                "git",
+                "-C",
+                str(repo_final),
+                "fetch",
+                "origin",
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             await process.communicate()
 
             # Try to checkout the branch
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_final), "checkout", branch,
+                "git",
+                "-C",
+                str(repo_final),
+                "checkout",
+                branch,
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             stdout, stderr = await process.communicate()
 
@@ -835,9 +934,15 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
             # Branch doesn't exist locally, try to checkout from remote
             logger.info(f"Branch '{branch}' not found locally, trying origin/{branch}")
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_final), "checkout", "-b", branch, f"origin/{branch}",
+                "git",
+                "-C",
+                str(repo_final),
+                "checkout",
+                "-b",
+                branch,
+                f"origin/{branch}",
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             stdout, stderr = await process.communicate()
 
@@ -846,24 +951,35 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
                 return True, str(repo_final), False  # Already existed, not newly cloned
 
             # Branch doesn't exist remotely, create from default branch
-            logger.info(f"Branch '{branch}' not found on remote, creating from default branch")
+            logger.info(
+                f"Branch '{branch}' not found on remote, creating from default branch"
+            )
 
             # Get default branch using robust detection
             default_branch = await get_default_branch(str(repo_final))
 
             # Checkout default branch first
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_final), "checkout", default_branch,
+                "git",
+                "-C",
+                str(repo_final),
+                "checkout",
+                default_branch,
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             await process.communicate()
 
             # Create new branch from default
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_final), "checkout", "-b", branch,
+                "git",
+                "-C",
+                str(repo_final),
+                "checkout",
+                "-b",
+                branch,
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             stdout, stderr = await process.communicate()
 
@@ -888,10 +1004,12 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
     # Clone without --single-branch to support multi-branch workflows
     # No --depth=1 to allow full branch operations
     process = await asyncio.create_subprocess_exec(
-        "git", "clone",
-        clone_url, str(temp_dir),
+        "git",
+        "clone",
+        clone_url,
+        str(temp_dir),
         stdout=asyncio.subprocess.PIPE,
-        stderr=asyncio.subprocess.PIPE
+        stderr=asyncio.subprocess.PIPE,
     )
     stdout, stderr = await process.communicate()
 
@@ -908,9 +1026,13 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
 
     # Try to checkout requested/auto-generated branch
     process = await asyncio.create_subprocess_exec(
-        "git", "-C", str(temp_dir), "checkout", branch,
+        "git",
+        "-C",
+        str(temp_dir),
+        "checkout",
+        branch,
         stdout=asyncio.subprocess.PIPE,
-        stderr=asyncio.subprocess.PIPE
+        stderr=asyncio.subprocess.PIPE,
     )
     stdout, stderr = await process.communicate()
 
@@ -918,13 +1040,17 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b
         # Branch doesn't exist, create it from default branch
         logger.info(f"Branch '{branch}' not found, creating from default branch")
         process = await asyncio.create_subprocess_exec(
-            "git", "-C", str(temp_dir), "checkout", "-b", branch,
+            "git",
+            "-C",
+            str(temp_dir),
+            "checkout",
+            "-b",
+            branch,
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE
+            stderr=asyncio.subprocess.PIPE,
         )
         await process.communicate()
 
-
     # Move to final location
     logger.info("Moving to final location...")
     repo_final.parent.mkdir(parents=True, exist_ok=True)
@@ -945,50 +1071,65 @@ async def trigger_workflow_greeting(git_url: str, branch: str, path: str):
     """Trigger workflow greeting after workflow change."""
     import uuid
     import aiohttp
-    
+
     # Wait a moment for workflow to be cloned/initialized
     await asyncio.sleep(3)
-    
+
     logger.info("Triggering workflow greeting...")
-    
+
     try:
         backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/")
         project_name = os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip()
         session_id = context.session_id if context else "unknown"
-        
+
         if not backend_url or not project_name:
-            logger.error("Cannot trigger workflow greeting: BACKEND_API_URL or PROJECT_NAME not set")
+            logger.error(
+                "Cannot trigger workflow greeting: BACKEND_API_URL or PROJECT_NAME not set"
+            )
             return
-        
+
         url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run"
-        
+
         # Extract workflow name for greeting
         workflow_name = git_url.split("/")[-1].removesuffix(".git")
         if path:
             workflow_name = path.split("/")[-1]
-        
-        greeting = f"Greet the user and explain that the {workflow_name} workflow is now active. Briefly describe what this workflow helps with based on the systemPrompt in ambient.json. Keep it concise and friendly."
-        
+
+        # Use proper nomenclature for SpecKit workflows
+        display_name = (
+            "SpecKit"
+            if "speckit" in workflow_name.lower() or "spec-kit" in workflow_name.lower()
+            else workflow_name
+        )
+
+        # Create appropriate greeting based on workflow type
+        if "speckit" in workflow_name.lower() or "spec-kit" in workflow_name.lower():
+            greeting = f"Greet the user and explain that the {display_name} workflow is now active. Briefly describe what this workflow helps with based on the systemPrompt in ambient.json. Mention that they can use commands like /speckit.specify to create specifications, /speckit.plan for implementation planning, /speckit.tasks for task generation, and /speckit.implement to execute the plan. Keep it concise and friendly."
+        else:
+            greeting = f"Greet the user and explain that the {display_name} workflow is now active. Briefly describe what this workflow helps with based on the systemPrompt in ambient.json. Keep it concise and friendly."
+
         payload = {
             "threadId": session_id,
             "runId": str(uuid.uuid4()),
-            "messages": [{
-                "id": str(uuid.uuid4()),
-                "role": "user",
-                "content": greeting,
-                "metadata": {
-                    "hidden": True,
-                    "autoSent": True,
-                    "source": "workflow_activation"
+            "messages": [
+                {
+                    "id": str(uuid.uuid4()),
+                    "role": "user",
+                    "content": greeting,
+                    "metadata": {
+                        "hidden": True,
+                        "autoSent": True,
+                        "source": "workflow_activation",
+                    },
                 }
-            }]
+            ],
         }
-        
+
         bot_token = os.getenv("BOT_TOKEN", "").strip()
         headers = {"Content-Type": "application/json"}
         if bot_token:
             headers["Authorization"] = f"Bearer {bot_token}"
-        
+
         async with aiohttp.ClientSession() as session:
             async with session.post(url, json=payload, headers=headers) as resp:
                 if resp.status == 200:
@@ -996,8 +1137,10 @@ async def trigger_workflow_greeting(git_url: str, branch: str, path: str):
                     logger.info(f"Workflow greeting started: {result}")
                 else:
                     error_text = await resp.text()
-                    logger.error(f"Workflow greeting failed: {resp.status} - {error_text}")
-    
+                    logger.error(
+                        f"Workflow greeting failed: {resp.status} - {error_text}"
+                    )
+
     except Exception as e:
         logger.error(f"Failed to trigger workflow greeting: {e}")
 
@@ -1006,32 +1149,36 @@ async def trigger_workflow_greeting(git_url: str, branch: str, path: str):
 async def add_repo(request: Request):
     """
     Add repository - clones repo and triggers Claude SDK client restart.
-    
+
     Accepts: {"url": "...", "branch": "...", "name": "..."}
     """
     global _adapter_initialized
-    
+
     if not adapter:
         raise HTTPException(status_code=503, detail="Adapter not initialized")
-    
+
     body = await request.json()
     url = body.get("url", "")
     branch = body.get("branch", "main")
     name = body.get("name", "")
-    
+
     logger.info(f"Add repo request: url={url}, branch={branch}, name={name}")
-    
+
     if not url:
         raise HTTPException(status_code=400, detail="Repository URL is required")
-    
+
     # Derive name from URL if not provided
     if not name:
         name = url.split("/")[-1].removesuffix(".git")
-    
+
     # Clone the repository at runtime
-    success, repo_path, was_newly_cloned = await clone_repo_at_runtime(url, branch, name)
+    success, repo_path, was_newly_cloned = await clone_repo_at_runtime(
+        url, branch, name
+    )
     if not success:
-        raise HTTPException(status_code=500, detail=f"Failed to clone repository: {url}")
+        raise HTTPException(
+            status_code=500, detail=f"Failed to clone repository: {url}"
+        )
 
     # Only update state and trigger notification if repo was newly cloned
     # This prevents duplicate notifications when both backend and operator call this endpoint
@@ -1044,13 +1191,7 @@ async def add_repo(request: Request):
             repos = []
 
         # Add new repo
-        repos.append({
-            "name": name,
-            "input": {
-                "url": url,
-                "branch": branch
-            }
-        })
+        repos.append({"name": name, "input": {"url": url, "branch": branch}})
 
         os.environ["REPOS_JSON"] = json.dumps(repos)
 
@@ -1058,59 +1199,72 @@ async def add_repo(request: Request):
         _adapter_initialized = False
         adapter._first_run = True
 
-        logger.info(f"Repo '{name}' added and cloned, adapter will reinitialize on next run")
+        logger.info(
+            f"Repo '{name}' added and cloned, adapter will reinitialize on next run"
+        )
 
         # Trigger a notification to Claude about the new repository
         asyncio.create_task(trigger_repo_added_notification(name, url))
     else:
-        logger.info(f"Repo '{name}' already existed, skipping notification (idempotent call)")
+        logger.info(
+            f"Repo '{name}' already existed, skipping notification (idempotent call)"
+        )
 
-    return {"message": "Repository added", "name": name, "path": repo_path, "newly_cloned": was_newly_cloned}
+    return {
+        "message": "Repository added",
+        "name": name,
+        "path": repo_path,
+        "newly_cloned": was_newly_cloned,
+    }
 
 
 async def trigger_repo_added_notification(repo_name: str, repo_url: str):
     """Notify Claude that a repository has been added."""
     import uuid
     import aiohttp
-    
+
     # Wait a moment for repo to be fully ready
     await asyncio.sleep(1)
-    
+
     logger.info(f"Triggering repo added notification for: {repo_name}")
-    
+
     try:
         backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/")
         project_name = os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip()
         session_id = context.session_id if context else "unknown"
-        
+
         if not backend_url or not project_name:
-            logger.error("Cannot trigger repo notification: BACKEND_API_URL or PROJECT_NAME not set")
+            logger.error(
+                "Cannot trigger repo notification: BACKEND_API_URL or PROJECT_NAME not set"
+            )
             return
-        
+
         url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run"
-        
+
         notification = f"The repository '{repo_name}' has been added to your workspace. You can now access it at the path 'repos/{repo_name}/'. Please acknowledge this to the user and let them know you can now read and work with files in this repository."
-        
+
         payload = {
             "threadId": session_id,
             "runId": str(uuid.uuid4()),
-            "messages": [{
-                "id": str(uuid.uuid4()),
-                "role": "user",
-                "content": notification,
-                "metadata": {
-                    "hidden": True,
-                    "autoSent": True,
-                    "source": "repo_added"
+            "messages": [
+                {
+                    "id": str(uuid.uuid4()),
+                    "role": "user",
+                    "content": notification,
+                    "metadata": {
+                        "hidden": True,
+                        "autoSent": True,
+                        "source": "repo_added",
+                    },
                 }
-            }]
+            ],
         }
-        
+
         bot_token = os.getenv("BOT_TOKEN", "").strip()
         headers = {"Content-Type": "application/json"}
         if bot_token:
             headers["Authorization"] = f"Bearer {bot_token}"
-        
+
         async with aiohttp.ClientSession() as session:
             async with session.post(url, json=payload, headers=headers) as resp:
                 if resp.status == 200:
@@ -1118,8 +1272,10 @@ async def trigger_repo_added_notification(repo_name: str, repo_url: str):
                     logger.info(f"Repo notification sent: {result}")
                 else:
                     error_text = await resp.text()
-                    logger.error(f"Repo notification failed: {resp.status} - {error_text}")
-    
+                    logger.error(
+                        f"Repo notification failed: {resp.status} - {error_text}"
+                    )
+
     except Exception as e:
         logger.error(f"Failed to trigger repo notification: {e}")
 
@@ -1153,7 +1309,9 @@ async def remove_repo(request: Request):
             logger.info(f"Deleted repository directory: {repo_path}")
         except Exception as e:
             logger.error(f"Failed to delete repository directory {repo_path}: {e}")
-            raise HTTPException(status_code=500, detail=f"Failed to delete repository: {e}")
+            raise HTTPException(
+                status_code=500, detail=f"Failed to delete repository: {e}"
+            )
     else:
         logger.warning(f"Repository directory not found: {repo_path}")
 
@@ -1214,45 +1372,67 @@ async def get_repos_status():
 
             # Get remote URL
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_path), "config", "--get", "remote.origin.url",
+                "git",
+                "-C",
+                str(repo_path),
+                "config",
+                "--get",
+                "remote.origin.url",
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             stdout, stderr = await process.communicate()
             repo_url = stdout.decode().strip() if process.returncode == 0 else ""
 
             # Strip any embedded tokens from URL before returning (security)
             # Remove patterns like: https://x-access-token:TOKEN@github.com -> https://github.com
-            repo_url = re.sub(r'https://[^:]+:[^@]+@', 'https://', repo_url)
+            repo_url = re.sub(r"https://[^:]+:[^@]+@", "https://", repo_url)
 
             # Get current active branch
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_path), "rev-parse", "--abbrev-ref", "HEAD",
+                "git",
+                "-C",
+                str(repo_path),
+                "rev-parse",
+                "--abbrev-ref",
+                "HEAD",
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
            )
             stdout, stderr = await process.communicate()
-            current_branch = stdout.decode().strip() if process.returncode == 0 else "unknown"
+            current_branch = (
+                stdout.decode().strip() if process.returncode == 0 else "unknown"
+            )
 
             # Get all local branches
             process = await asyncio.create_subprocess_exec(
-                "git", "-C", str(repo_path), "branch", "--format=%(refname:short)",
+                "git",
+                "-C",
+                str(repo_path),
+                "branch",
+                "--format=%(refname:short)",
                 stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                stderr=asyncio.subprocess.PIPE,
             )
             stdout, stderr = await process.communicate()
-            branches = [b.strip() for b in stdout.decode().split("\n") if b.strip()] if process.returncode == 0 else []
+            branches = (
+                [b.strip() for b in stdout.decode().split("\n") if b.strip()]
+                if process.returncode == 0
+                else []
+            )
 
             # Get default branch using robust detection
             default_branch = await get_default_branch(str(repo_path))
 
-            repos_status.append({
-                "url": repo_url,
-                "name": repo_name,
-                "branches": branches,
-                "currentActiveBranch": current_branch,
-                "defaultBranch": default_branch,
-            })
+            repos_status.append(
+                {
+                    "url": repo_url,
+                    "name": repo_name,
+                    "branches": branches,
+                    "currentActiveBranch": current_branch,
+                    "defaultBranch": default_branch,
+                }
+            )
 
         except Exception as e:
             logger.error(f"Error getting status for repo {repo_path}: {e}")
@@ -1274,9 +1454,9 @@ def main():
     """Start the AG-UI server."""
     port = int(os.getenv("AGUI_PORT", "8000"))
     host = os.getenv("AGUI_HOST", "0.0.0.0")
-    
+
     logger.info(f"Starting Claude Code AG-UI server on {host}:{port}")
-    
+
     uvicorn.run(
         app,
         host=host,
@@ -1287,4 +1467,3 @@ def main():
 
 if __name__ == "__main__":
     main()
-