-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
234 lines (200 loc) · 7.65 KB
/
api.py
File metadata and controls
234 lines (200 loc) · 7.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Intrabot-Ai/api.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
from typing import List, Optional
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from retrieval import search
from rbac import filter_hits_by_role
from gemini_client import generate_response
import os
app = FastAPI(title="Intrabot-AI - Retrieval API")
# Enable CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount static files
app.mount("/static", StaticFiles(directory="frontend"), name="static")
class QueryIn(BaseModel):
query: str
top_k: int = 4
role: str = "employee"
@app.get("/")
def home():
"""Redirect to the frontend interface"""
try:
frontend_path = os.path.join("frontend", "index.html")
if os.path.exists(frontend_path):
return FileResponse(frontend_path)
else:
return {"status": "ok", "msg": "Intrabot-AI retrieval API. Frontend available at /static/index.html"}
except Exception as e:
return {"status": "ok", "msg": "Intrabot-AI retrieval API. Use POST /query"}
@app.get("/app")
def app_redirect():
"""Alternative route to access the frontend"""
frontend_path = os.path.join("frontend", "index.html")
return FileResponse(frontend_path)
class ChatMessage(BaseModel):
role: str # 'user' or 'assistant'
content: str
class ChatRequest(BaseModel):
messages: List[ChatMessage]
role: str = "employee"
class AnswerRequest(BaseModel):
query: str
top_k: int = 4
role: str = "employee"
use_llm: bool = True
@app.post("/query")
async def do_query(payload: QueryIn):
try:
# Run vector search
res = search(payload.query, top_k=payload.top_k)
docs = res.get("documents", [[]])[0]
metas = res.get("metadatas", [[]])[0]
dists = res.get("distances", [[]])[0]
hits = []
for i, doc in enumerate(docs):
meta = metas[i] if i < len(metas) else {}
dist = dists[i] if i < len(dists) else None
hits.append({
"source": meta.get("source"),
"text": doc,
"distance": float(dist) if dist is not None else None
})
# Apply RBAC filter
filtered_hits = filter_hits_by_role(hits, payload.role)
# Extract context from search results
context = "\n".join([hit["text"] for hit in filtered_hits])
# Generate response using Gemini
ai_response = generate_response(query=payload.query, context=context)
return {
"query": payload.query,
"role": payload.role,
"hits": filtered_hits,
"response": ai_response
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/answer")
async def answer_endpoint(payload: AnswerRequest):
"""Legacy endpoint for frontend compatibility"""
try:
# Run vector search
res = search(payload.query, top_k=payload.top_k)
docs = res.get("documents", [[]])[0]
metas = res.get("metadatas", [[]])[0]
dists = res.get("distances", [[]])[0]
hits = []
sources = []
for i, doc in enumerate(docs):
meta = metas[i] if i < len(metas) else {}
dist = dists[i] if i < len(dists) else None
source = meta.get("source", "unknown")
hits.append({
"source": source,
"text": doc,
"distance": float(dist) if dist is not None else None
})
if source not in sources:
sources.append(source)
# Apply RBAC filter
filtered_hits = filter_hits_by_role(hits, payload.role)
# Extract context from search results
context = "\n".join([hit["text"] for hit in filtered_hits])
# Generate response using Gemini if requested
if payload.use_llm and context:
try:
ai_response = generate_response(query=payload.query, context=context)
# If response starts with "Error", fallback to basic response
if ai_response.startswith("Error"):
ai_response = f"Based on the available information: {context[:200]}..."
except Exception:
ai_response = f"Based on the available information: {context[:200]}..."
else:
ai_response = f"Found {len(filtered_hits)} relevant document(s) for your query."
return {
"answer": ai_response,
"sources": sources,
"hits": filtered_hits,
"source": "online" if payload.use_llm else "offline"
}
except Exception as e:
return {
"answer": "I'm sorry, I encountered an error while processing your request. Please try again.",
"sources": [],
"hits": [],
"source": "error"
}
@app.get("/actions/leave_request")
async def leave_request_steps():
"""Provide leave request steps"""
return {
"steps": [
"Log into the HR portal using your employee credentials",
"Navigate to the 'Leave Management' section",
"Click on 'Submit New Leave Request'",
"Select the type of leave (Annual, Sick, Personal, etc.)",
"Choose your leave start and end dates",
"Enter a brief reason for your leave",
"Upload any required supporting documents",
"Submit the request for manager approval",
"Check your email for approval notifications"
]
}
@app.get("/financial-news")
async def get_financial_news():
"""Provide sample financial news"""
return {
"news": [
{
"title": "Market Update: Tech Stocks Rally",
"summary": "Technology sector shows strong performance with AI companies leading gains",
"category": "markets",
"timestamp": "2 hours ago"
},
{
"title": "Federal Reserve Policy Decision",
"summary": "Central bank maintains current interest rate amid economic stability",
"category": "policy",
"timestamp": "4 hours ago"
},
{
"title": "Quarterly Earnings Preview",
"summary": "Major corporations prepare to release Q4 earnings reports",
"category": "earnings",
"timestamp": "6 hours ago"
}
]
}
@app.post("/chat")
async def chat_endpoint(chat_request: ChatRequest):
try:
# Format conversation history for the model
conversation = "\n".join(
[f"{msg.role}: {msg.content}" for msg in chat_request.messages]
)
# Get the latest user message
user_message = next(
(msg.content for msg in reversed(chat_request.messages) if msg.role == "user"),
""
)
if not user_message:
raise HTTPException(status_code=400, detail="No user message found in chat history")
# Generate response using Gemini with conversation history as context
ai_response = generate_response(
query=user_message,
context=f"Conversation history:\n{conversation}"
)
return {
"role": "assistant",
"content": ai_response
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))