-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbackend.py
More file actions
134 lines (121 loc) · 4.03 KB
/
backend.py
File metadata and controls
134 lines (121 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
import logging
import uuid
from fastapi import FastAPI, WebSocket, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Optional
from crewai import Agent, Task, Crew
def get_available_models():
"""Query Ollama for available models"""
from ollama import Client
client = Client(host='http://localhost:11434')
return [model['model'] for model in client.list()['models']]
app = FastAPI()
# Data models for incoming requests
class AgentConfig(BaseModel):
role: str
goal: str
backstory: str
model: str
class TaskConfig(BaseModel):
description: str
expected_output: str
agent_id: str
class CrewConfig(BaseModel):
name: str
agent_ids: List[str]
task_ids: List[str]
process: str = "sequential"
inputs: Dict[str, str] = {}
# State models and template stores
class ExecutionState(BaseModel):
status: str = "pending"
output: Optional[str] = None
error: Optional[str] = None
active_crews: Dict[str, ExecutionState] = {}
agent_templates: Dict[str, Agent] = {}
task_templates: Dict[str, Task] = {}
# Endpoint to fetch available models (queries Ollama)
@app.get("/models")
def get_models():
return get_available_models()
# Endpoint to create a new Agent
@app.post("/agents")
def create_agent(config: AgentConfig):
agent = Agent(
role=config.role,
goal=config.goal,
backstory=config.backstory,
llm=config.model
)
agent_id = str(uuid.uuid4())
agent_templates[agent_id] = agent
return {"id": agent_id}
# Endpoint to create a new Task; ensures the referenced Agent exists
@app.post("/tasks")
def create_task(task: TaskConfig):
if task.agent_id not in agent_templates:
raise HTTPException(status_code=404, detail="Agent not found")
agent = agent_templates[task.agent_id]
task_instance = Task(
description=task.description,
expected_output=task.expected_output,
agent=agent
)
task_id = str(uuid.uuid4())
task_templates[task_id] = task_instance
return {"id": task_id}
# Endpoint to execute a Crew created from Agents and Tasks
@app.post("/crews/execute")
async def execute_crew(crew: CrewConfig):
crew_id = str(uuid.uuid4())
active_crews[crew_id] = ExecutionState(status="running")
try:
agents = [agent_templates[a_id] for a_id in crew.agent_ids]
tasks = [task_templates[t_id] for t_id in crew.task_ids]
crew_instance = Crew(
agents=agents,
tasks=tasks,
process=crew.process,
verbose=True
)
result = await crew_instance.kickoff(inputs=crew.inputs)
active_crews[crew_id].status = "completed"
active_crews[crew_id].output = result
except Exception as e:
logging.exception("Error executing crew:")
active_crews[crew_id].status = "error"
active_crews[crew_id].error = str(e)
return {"crew_id": crew_id}
# Endpoint to check the status of a crew execution
@app.get("/crews/{crew_id}")
def get_crew_status(crew_id: str):
if crew := active_crews.get(crew_id):
return crew
raise HTTPException(status_code=404, detail="Crew not found")
# WebSocket endpoint for real-time crew updates
@app.websocket("/crew-updates/{crew_id}")
async def websocket_endpoint(websocket: WebSocket, crew_id: str):
await websocket.accept()
try:
while True:
state = active_crews.get(crew_id)
if state is None:
await websocket.send_json({"error": "Crew not found"})
break
await websocket.send_json({
"status": state.status,
"output": state.output,
"error": state.error
})
await asyncio.sleep(1)
except Exception as e:
logging.error("WebSocket error", exc_info=e)
await websocket.close()
@app.get("/api/models")
def get_models():
try:
models = get_available_models()
return {"models": models}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))