-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathstreamlit_app.py
More file actions
126 lines (99 loc) · 3.83 KB
/
Copy pathstreamlit_app.py
File metadata and controls
126 lines (99 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import asyncio
from pathlib import Path
import time
import streamlit as st
import inngest
from dotenv import load_dotenv
import os
import requests
# Load environment variables from a local .env file (e.g. INNGEST_API_BASE,
# read further below) into os.environ before anything consults them.
load_dotenv()

# Page configuration is issued before any other st.* command, as Streamlit
# expects page config to come first.
st.set_page_config(
    page_title="RAG Ingest PDF",
    layout="centered",
    page_icon="📄",
)
@st.cache_resource
def get_inngest_client() -> inngest.Inngest:
    """Return the shared Inngest client used to send events from this app.

    ``st.cache_resource`` keeps one instance across Streamlit reruns instead of
    constructing a new client every time the script re-executes.
    """
    client = inngest.Inngest(
        is_production=False,  # dev mode (local Inngest dev server), not production
        app_id="rag_app",
    )
    return client
def save_uploaded_pdf(file) -> Path:
    """Persist an uploaded file under ``./uploads`` and return its path.

    Args:
        file: The uploaded file object. It must expose ``name`` (the
            client-supplied file name) and ``getbuffer()`` (its bytes), as
            Streamlit's ``UploadedFile`` does.

    Returns:
        Path of the written file, relative to the current working directory.
        An existing file with the same name is overwritten.

    Raises:
        ValueError: If the supplied file name has no usable base name.
    """
    uploads_dir = Path("uploads")
    uploads_dir.mkdir(parents=True, exist_ok=True)
    # The file name is controlled by the client, so keep only its final
    # component: names like "../../app.py" or "/etc/passwd" must not escape
    # the uploads directory. Backslashes are normalized first so
    # Windows-style paths are reduced to their base name as well.
    safe_name = Path(file.name.replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        raise ValueError(f"Invalid upload file name: {file.name!r}")
    file_path = uploads_dir / safe_name
    file_path.write_bytes(file.getbuffer())
    return file_path
async def send_rag_ingest_event(pdf_path: Path) -> None:
    """Send a ``rag/ingest_pdf`` event asking the backend to ingest ``pdf_path``.

    The event payload carries the absolute path of the saved PDF and its bare
    file name as ``source_id``. Only a path is sent, not the file bytes.
    NOTE(review): this presumably requires the worker handling the event to
    see the same filesystem; confirm for non-local deployments.
    """
    inngest_client = get_inngest_client()
    payload = {
        "pdf_path": str(pdf_path.resolve()),
        "source_id": pdf_path.name,
    }
    ingest_event = inngest.Event(name="rag/ingest_pdf", data=payload)
    await inngest_client.send(ingest_event)
st.title("Upload a PDF to Ingest")
uploaded = st.file_uploader("Choose a PDF", type=["pdf"], accept_multiple_files=False)

if uploaded is not None:
    # Streamlit re-executes this whole script on every interaction (e.g. each
    # submit of the question form below), and the uploader keeps returning the
    # same file. Without this guard every rerun would re-save the PDF and
    # re-send the ingest event. Handled uploads are tracked per session.
    if "ingested_uploads" not in st.session_state:
        st.session_state["ingested_uploads"] = {}
    ingested_uploads = st.session_state["ingested_uploads"]
    # Prefer Streamlit's per-upload ID when the installed version provides it;
    # otherwise fall back to name + size as a best-effort identity.
    upload_key = getattr(uploaded, "file_id", None) or f"{uploaded.name}:{uploaded.size}"
    if upload_key not in ingested_uploads:
        with st.spinner("Uploading and triggering ingestion..."):
            path = save_uploaded_pdf(uploaded)
            # Kick off the event and block until the send completes
            asyncio.run(send_rag_ingest_event(path))
            # Small pause for user feedback continuity
            time.sleep(0.3)
        # Record only after a successful send, so a failed attempt is retried
        # on the next rerun.
        ingested_uploads[upload_key] = path.name
    st.success(f"Triggered ingestion for: {ingested_uploads[upload_key]}")
    st.caption("You can upload another PDF if you like.")

st.divider()
st.title("Ask a question about your PDFs")
async def send_rag_query_event(question: str, top_k: int) -> str:
    """Send a ``rag/query_pdf_ai`` event and return the sent event's ID.

    Args:
        question: The user's question, forwarded as ``data["question"]``.
        top_k: Number of chunks to retrieve, forwarded as ``data["top_k"]``.

    Returns:
        The ID of the sent event. Callers use it to look up the function run
        the event triggered.

    Raises:
        RuntimeError: If the client reports no event ID for the sent event.
    """
    client = get_inngest_client()
    event_ids = await client.send(
        inngest.Event(
            name="rag/query_pdf_ai",
            data={
                "question": question,
                "top_k": top_k,
            },
        )
    )
    # send() reports one ID per event sent; exactly one event was sent here.
    if not event_ids:
        raise RuntimeError("Inngest returned no event ID for rag/query_pdf_ai")
    return event_ids[0]
def _inngest_api_base() -> str:
    """Return the base URL of the Inngest REST API, without a trailing slash.

    Reads ``INNGEST_API_BASE`` from the environment. Falls back to the local
    dev-server default when the variable is unset or empty. Any trailing slash
    is stripped so that appending ``/events/...`` never yields a double slash.
    """
    # Local dev server default; configurable via env
    base = os.getenv("INNGEST_API_BASE") or "http://127.0.0.1:8288/v1"
    return base.rstrip("/")
def fetch_runs(event_id: str, timeout: float = 10.0) -> list[dict]:
    """Return the function runs the Inngest API reports for ``event_id``.

    Args:
        event_id: ID of a previously sent event.
        timeout: Seconds to wait for the HTTP response, passed to requests.
            Without a timeout requests can block indefinitely, which would
            also defeat the overall deadline enforced by wait_for_run_output.

    Returns:
        The list under the response's ``"data"`` key, or ``[]`` when that key
        is absent or null.

    Raises:
        requests.HTTPError: On a non-2xx response.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    url = f"{_inngest_api_base()}/events/{event_id}/runs"
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()
    return data.get("data") or []
def wait_for_run_output(event_id: str, timeout_s: float = 120.0, poll_interval_s: float = 0.5) -> dict:
    """Poll the Inngest API until the run triggered by ``event_id`` finishes.

    Only the first run reported for the event is inspected.

    Args:
        event_id: ID of the event whose run output is wanted.
        timeout_s: Maximum total seconds to keep polling.
        poll_interval_s: Seconds to sleep between polls.

    Returns:
        The run's ``"output"`` value, or ``{}`` when it is missing or empty.

    Raises:
        RuntimeError: If the run ends with status "Failed" or "Cancelled".
        TimeoutError: If no terminal status is seen within ``timeout_s``.
    """
    # time.monotonic() is unaffected by wall-clock adjustments, which could
    # otherwise shorten or stretch the timeout.
    deadline = time.monotonic() + timeout_s
    last_status = None
    while True:
        runs = fetch_runs(event_id)
        if runs:
            run = runs[0]
            status = run.get("status")
            # Keep the last non-empty status for the timeout message.
            last_status = status or last_status
            if status in ("Completed", "Succeeded", "Success", "Finished"):
                return run.get("output") or {}
            if status in ("Failed", "Cancelled"):
                raise RuntimeError(f"Function run {status}")
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Timed out waiting for run output (last status: {last_status})")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))
with st.form("rag_query_form"):
    question = st.text_input("Your question")
    top_k = st.number_input("How many chunks to retrieve", min_value=1, max_value=20, value=5, step=1)
    submitted = st.form_submit_button("Ask")

# Form widget values reach the script when "Ask" is pressed; the answer is
# computed and rendered below the form on that rerun.
if submitted and question.strip():
    try:
        with st.spinner("Sending event and generating answer..."):
            # Send the query event to Inngest. The returned event ID identifies
            # the function run that will produce the answer.
            event_id = asyncio.run(send_rag_query_event(question.strip(), int(top_k)))
            # Block while polling the local Inngest API for that run's output
            output = wait_for_run_output(event_id)
    except (TimeoutError, RuntimeError, requests.RequestException) as exc:
        # Show polling failures (timeout, failed/cancelled run, HTTP errors)
        # in the page instead of as a raw traceback.
        st.error(f"Could not get an answer: {exc}")
    else:
        answer = output.get("answer", "")
        sources = output.get("sources", [])
        st.subheader("Answer")
        st.write(answer or "(No answer)")
        if sources:
            st.caption("Sources")
            for s in sources:
                st.write(f"- {s}")
elif submitted:
    st.warning("Please enter a question before asking.")