generated from rajeev-sr/Project-Template-for-AI-Devlopment
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
74 lines (64 loc) · 2.36 KB
/
main.py
File metadata and controls
74 lines (64 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from fastapi import FastAPI,HTTPException
from pydantic import BaseModel
from typing import Optional,List,Dict,Any
from app.services.graph_service import execute_graph
from app.celery_worker import process_document
from celery.result import AsyncResult
from datetime import datetime
import random
import string
# Application object; the route decorators below register handlers on it.
app = FastAPI(title="Insurance Claim Analysis", description="API for Insurance Claim Analysis")
@app.get("/")
def health_check():
    """Return a fixed payload confirming that the server is up."""
    payload = {"message": "server is running"}
    return payload
class ProcessRequest(BaseModel):
    """Request body accepted by ``POST /process``."""

    # jobId is not supplied by the client; the /process handler generates it.
    # Reference to the document to process. The handler's validation message
    # describes it as a URL, but no format check is performed on this model.
    documents: str
    # Questions handed to the processing task together with the document.
    questions: List[str]
class Answer(BaseModel):
    """Structured answer: a decision, its details, a justification and clauses.

    NOTE(review): no endpoint in this file references this model; presumably
    it mirrors one item of the worker's result -- confirm before relying on it.
    """

    decision: str
    # Free-form mapping; keys and value types are not constrained here.
    details: Dict[str, Any]
    justification: str
    clauses: List[str]
class ProcessResponse(BaseModel):
    """Response body of ``POST /process``."""

    # Server-generated job identifier passed to the processing task.
    jobId: str
    # Id of the queued Celery task; this is the value ``GET /status/{task_id}`` takes.
    task_id: str
class StatusResponse(BaseModel):
    """Response body of ``GET /status/{task_id}``."""

    # Echo of the task id that was queried.
    task_id: str
    # One of "SUCCESS", "FAILURE" or "PENDING" as set by the status handler.
    status: str
    # Task return value on success, ``{"error": ...}`` on failure, None while pending.
    result: Optional[Any]
@app.post("/process", response_model=ProcessResponse)
async def process_document_and_query(request: ProcessRequest):
    """Queue a document-processing task and return the ids used to track it.

    A job id of the form ``YYYYMMDD-HHMMSS-xxxxxx`` (local timestamp plus six
    random lowercase/digit characters) is generated and handed to the
    ``process_document`` Celery task together with the request payload.

    Args:
        request: Validated request body holding the document reference and
            the list of questions.

    Returns:
        A mapping with ``jobId`` and the Celery ``task_id``; the latter is the
        value expected by ``GET /status/{task_id}``.

    Raises:
        HTTPException: 400 when ``documents`` or ``questions`` is empty,
            500 when the task could not be enqueued.
    """
    # Pydantic guarantees the fields exist; this rejects empty values.
    if not (request.documents and request.questions):
        raise HTTPException(
            status_code=400,
            detail="documents (url) and a list of questions are required.",
        )

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    # NOTE(review): assumes the job id is only a correlation label, not a
    # secret; switch to the `secrets` module if it is ever used for access.
    random_suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    job_id = f"{timestamp}-{random_suffix}"

    # Only the enqueue call is guarded: the task itself runs in the worker, so
    # failures here are broker/serialization problems, not processing errors.
    try:
        task = process_document.delay(job_id, request.documents, request.questions)
    except Exception as e:
        print(f"An error occurred while queueing the task for jobId {job_id}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"An internal error occurred: {str(e)}",
        ) from e

    return {"jobId": job_id, "task_id": task.id}
@app.get("/status/{task_id}", response_model=StatusResponse)
def get_job_status(task_id: str):
    """Report the state of a task previously queued through ``POST /process``.

    Args:
        task_id: Celery task id returned by ``POST /process``.

    Returns:
        A mapping with ``task_id``, ``status`` and ``result``:

        * ``"SUCCESS"`` with the task's return value,
        * ``"FAILURE"`` with ``{"error": <stringified task info>}`` for any
          other finished state,
        * ``"PENDING"`` with ``None`` while the task has not finished.

    Note:
        An ``AsyncResult`` is only a handle onto the result backend and is
        always truthy, so it cannot reveal whether the id was ever issued.
        Celery reports unknown ids as pending, hence they yield ``"PENDING"``
        here rather than a 404.
    """
    task_result = AsyncResult(task_id, app=process_document.app)

    if not task_result.ready():
        return {"task_id": task_id, "status": "PENDING", "result": None}

    if task_result.successful():
        return {
            "task_id": task_id,
            "status": "SUCCESS",
            "result": task_result.get(),
        }

    # Finished but not successful (failed or revoked): surface the stored info.
    return {
        "task_id": task_id,
        "status": "FAILURE",
        "result": {"error": str(task_result.info)},
    }