-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_handler.py
More file actions
105 lines (85 loc) · 3.19 KB
/
lambda_handler.py
File metadata and controls
105 lines (85 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
AWS Lambda handler for the Student Loan RAG Chatbot.
Deployment: Lambda container image via ECR.
Trigger: API Gateway HTTP POST /chat
Request body (JSON):
{"message": "What types of student loans are available?"}
Response body (JSON):
{
"response": "...",
"sources": ["...", "..."],
"confidence": 0.82,
"retrieval_scores": [0.91, 0.88, ...]
}
"""
import json
import os
import logging
# Per-container state. Module globals are created once when the container
# loads this file and are reused by warm invocations, so the agent cached
# here is not rebuilt on every request. The slot starts empty; _get_agent()
# fills it lazily on the first request that needs it.
_agent = None

# Use the root logger and make sure INFO-level records are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def _get_agent():
    """Return the shared RAG agent, constructing it on first use.

    The agent is cached in the module-level ``_agent`` global, so every call
    after the first one in the same process returns the same instance without
    re-running the setup below.
    """
    global _agent

    # Fast path: an earlier call in this process already built the agent.
    if _agent is not None:
        return _agent

    logger.info("Cold start: initializing RAG agent")

    # Project imports are deferred to this point so they are only executed
    # when an agent actually has to be built.
    from src.llm import LLMChat
    from src.vector_db import VectorStore
    from src.rag_agent import RAGAgent

    chat_model = LLMChat()
    # Index name is configurable through the environment, with a fixed default.
    index_name = os.environ.get("PINECONE_INDEX_NAME", "student-loans")
    vector_store = VectorStore(index_name=index_name)

    _agent = RAGAgent(vector_store=vector_store, llm=chat_model)
    logger.info("RAG agent initialized successfully")
    return _agent
def handler(event, context):
    """
    AWS Lambda entry point.

    Validates the incoming API Gateway event, runs the RAG pipeline on the
    user's message, and wraps the outcome in a proxy response with CORS
    headers.

    Args:
        event: API Gateway proxy event dict. The method is read from the
            top-level ``httpMethod`` key or, if absent, from
            ``requestContext.http.method`` (HTTP API payload v2). The body may
            be plain JSON text or base64-encoded (``isBase64Encoded``).
        context: Lambda context object (request ID, timeout, etc.). May be
            ``None`` when the function is invoked outside Lambda.

    Returns:
        API Gateway proxy response dict:
          * 200 with an empty body for an ``OPTIONS`` preflight,
          * 400 with ``{"error": ...}`` for an unparseable body, a missing or
            empty ``message``, or a message longer than 2000 characters,
          * 200 with the JSON-encoded agent result on success,
          * 500 with ``{"error": "Internal server error"}`` if the pipeline
            raises.
    """
    # Function-scope import, matching the deferred-import style used by
    # _get_agent(); keeps this block self-contained.
    import base64

    # REST/v1 events expose the method and path at the top level; HTTP API v2
    # events nest the method under requestContext.http and use "rawPath".
    http_info = (event.get("requestContext") or {}).get("http") or {}
    method = event.get("httpMethod") or http_info.get("method")
    path = event.get("path") or event.get("rawPath") or "/"

    logger.info("Request received: %s", json.dumps({
        # getattr: tolerate a missing context (e.g. local invocation).
        "requestId": getattr(context, "aws_request_id", None),
        "path": path,
        "httpMethod": method or "UNKNOWN",
    }))

    # Handle CORS preflight
    if method == "OPTIONS":
        return _cors_response(200, "")

    # Parse request body
    try:
        raw_body = event.get("body")
        if raw_body and event.get("isBase64Encoded"):
            raw_body = base64.b64decode(raw_body).decode("utf-8")
        payload = json.loads(raw_body or "{}")
        message = payload.get("message", "").strip()
    except (ValueError, TypeError, AttributeError) as e:
        # ValueError covers json.JSONDecodeError, binascii.Error and
        # UnicodeDecodeError; TypeError covers a body that is not text (for
        # example an already-parsed dict); AttributeError covers a JSON value
        # that is not an object or a "message" that is not a string.
        logger.warning("Bad request body: %s", e)
        return _cors_response(400, json.dumps({"error": "Invalid JSON body"}))

    if not message:
        return _cors_response(400, json.dumps({"error": "Missing 'message' field"}))
    if len(message) > 2000:
        return _cors_response(400, json.dumps({"error": "Message too long (max 2000 chars)"}))

    # Run RAG pipeline
    try:
        agent = _get_agent()
        result = agent.answer(message)
        logger.info("Response generated. confidence=%.3f sources=%d",
                    result["confidence"], len(result["sources"]))
        return _cors_response(200, json.dumps(result))
    except Exception as e:
        # Top-level boundary: log the traceback and return a generic error so
        # internal details are not leaked to the client.
        logger.exception("RAG pipeline error: %s", e)
        return _cors_response(500, json.dumps({"error": "Internal server error"}))
def _cors_response(status_code: int, body: str) -> dict:
    """Build an API Gateway proxy response that carries CORS headers.

    Args:
        status_code: HTTP status code to report.
        body: Response body, passed through unchanged.

    Returns:
        Dict with ``statusCode``, ``headers`` and ``body`` keys. The headers
        declare a JSON content type, allow any origin, and allow the
        ``Content-Type`` header with the ``POST`` and ``OPTIONS`` methods.
    """
    cors_headers = {
        "Content-Type": "application/json",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Content-Type",
        "Access-Control-Allow-Methods": "POST, OPTIONS",
    }
    response = {"statusCode": status_code}
    response["headers"] = cors_headers
    response["body"] = body
    return response