-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb_utils.py
More file actions
321 lines (291 loc) · 10.3 KB
/
db_utils.py
File metadata and controls
321 lines (291 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv
from fastapi import UploadFile
import requests
import urllib.parse
from typing import Optional
load_dotenv() # This will load the variables from .env into the environment
# Database URL must be set in the environment
# Example: export DATABASE_URL="postgresql://<user>:<password>@68.183.184.3:5432/<dbname>"
DB_URL = os.getenv("DATABASE_URL")
if not DB_URL:
raise RuntimeError("Environment variable DATABASE_URL is not set")
def get_connection():
"""
Establish a new database connection using the DATABASE_URL.
"""
return psycopg2.connect(DB_URL)
def get_or_create_user(name: str, email: str = None) -> int:
"""
Find an existing user by name/email or create one if not found.
Returns the user_id.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT id FROM users WHERE name = %s AND email IS NOT DISTINCT FROM %s",
(name, email),
)
row = cur.fetchone()
if row:
return row[0]
cur.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
(name, email),
)
user_id = cur.fetchone()[0]
conn.commit()
return user_id
finally:
conn.close()
def get_user_id_by_email(email: str) -> int:
"""
Retrieve the user_id associated with the given email.
Returns None if no user is found.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT id FROM users WHERE email = %s", (email,))
row = cur.fetchone()
if row:
return row[0]
finally:
conn.close()
def create_conversation(user_id: int) -> int:
"""
Start a new conversation for the given user.
Returns the conversation_id.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO conversations (user_id, last_message_at) VALUES (%s, NOW()) RETURNING id",
(user_id,),
)
conversation_id = cur.fetchone()[0]
conn.commit()
return conversation_id
finally:
conn.close()
def insert_message(conversation_id: int, is_user: bool, content: str, file_urls: str = None) -> None:
"""
Insert a message into the messages table and update the conversation's last_message_at.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO messages (conversation_id, is_user, content, file_urls) VALUES (%s, %s, %s, %s)",
(conversation_id, is_user, content, file_urls),
)
# keep track of when the conversation last saw activity
cur.execute(
"UPDATE conversations SET last_message_at = NOW() WHERE id = %s",
(conversation_id,),
)
conn.commit()
finally:
conn.close()
def insert_reply_message(conversation_id: int, is_user: bool, content: str) -> None:
"""
Insert a message into the messages table and update the conversation's last_message_at.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO messages (conversation_id, is_user, content) VALUES (%s, %s, %s)",
(conversation_id, is_user, content),
)
# keep track of when the conversation last saw activity
cur.execute(
"UPDATE conversations SET last_message_at = NOW() WHERE id = %s",
(conversation_id,),
)
conn.commit()
finally:
conn.close()
def insert_record(conversation_id: int, question: str, answer: str, file_urls: str = None) -> int:
"""
Convenience function to record a QA interaction end-to-end:
1. Ensures the user exists (or creates them).
2. Creates a new conversation.
3. Inserts the user's question and agent's answer.
Returns:
The new conversation_id.
"""
# record the back-and-forth
insert_message(conversation_id, True, question, file_urls)
insert_message(conversation_id, False, answer, file_urls)
update_conversation_title(conversation_id, question)
return conversation_id
'''
if __name__ == "__main__":
record_qa(conversation_id=1, question="Hello AI?", answer="Hello! How can I help you today?")
'''
def update_conversation_title(conversation_id: int, question: str) -> None:
"""
Given a conversation_id and question, checks if the message is the first one in the conversation and if so,
updates the conversation title in the conversations table.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
# count the number of messages in the conversation
cur.execute("SELECT COUNT(*) FROM messages WHERE conversation_id = %s", (conversation_id,))
row = cur.fetchone()
print(row)
if row[0] == 2:
cur.execute("UPDATE conversations SET title = %s WHERE id = %s", (question, conversation_id))
conn.commit()
finally:
conn.close()
# ========= File Upload =========
FILE_SERVER_BASE = os.getenv("FILE_SERVER_BASE_URL", "https://api.nexiuslabs.com")
def upload_file(folder_path: str, file: UploadFile) -> str:
"""
Uploads a file to your file‐server.
folder_path: e.g. "/uploads" or "uploads"
file: the FastAPI UploadFile
Returns the full URL where the file now lives.
"""
# normalize folder_path
folder = folder_path.lstrip("/")
# build a full URL safely
upload_url = urllib.parse.urljoin(FILE_SERVER_BASE.rstrip("/") + "/", f"{folder}/{file.filename}")
# rewind and read
file.file.seek(0)
data = file.file.read()
resp = requests.put(upload_url, data=data, stream=True)
# raise on any HTTP error
resp.raise_for_status()
return upload_url
#------------------------ TASK ---------------------------
def get_tasks_by_user_id(user_id: int):
"""
Retrieve all tasks for a given user_id from the Tasks table.
Returns a list of dicts.
"""
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, user_id, mail_id, title, detail, due_at, is_done, created_at
FROM Tasks
WHERE user_id = %s
ORDER BY due_at ASC NULLS LAST, created_at DESC
""", (user_id,))
return cur.fetchall()
finally:
conn.close()
def insert_new_task(user_id: int, mail_id: str, title: str, detail: str, due_at: Optional[str] = None) -> int:
"""
Insert a new task into the Tasks table.
Returns the id of the new task.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
if due_at:
cur.execute("""
INSERT INTO Tasks (user_id, mail_id, title, detail, due_at, created_at)
VALUES (%s, %s, %s, %s, %s, NOW())
RETURNING id
""", (user_id, mail_id, title, detail, due_at))
else:
cur.execute("""
INSERT INTO Tasks (user_id, mail_id, title, detail, created_at)
VALUES (%s, %s, %s, %s, NOW())
RETURNING id
""", (user_id, mail_id, title, detail))
row = cur.fetchone()
conn.commit()
return row[0]
finally:
conn.close()
def update_task_status(task_id: int, is_done: bool) -> None:
"""
Update the is_done status of a task with given id.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
UPDATE Tasks
SET is_done = %s
WHERE id = %s
""", (is_done, task_id))
conn.commit()
finally:
conn.close()
def list_tasks_by_user_id(user_id: int):
"""
Retrieve all tasks for a given user_id from the Tasks table.
Returns a list of dicts.
"""
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, user_id, mail_id, title, detail, due_at, is_done, created_at
FROM Tasks
WHERE user_id = %s
ORDER BY due_at ASC NULLS LAST, created_at DESC
""", (user_id,))
return cur.fetchall()
finally:
conn.close()
#============ Email =============
def get_mail_id_by_task_id(task_id: int) -> int:
"""
Retrieve mail_id associated with a given task_id from the Tasks table.
Returns None if no task is found.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT mail_id FROM Tasks WHERE id = %s", (task_id,))
row = cur.fetchone()
if row:
return row[0]
finally:
conn.close()
#=========== Email Table ===============
def insert_email(user_id: int, mail_id: str, subject: str, body_summary: str, sender: str, body_detail: str) -> int:
"""
Insert a new email into the Emails table.
Returns the id of the new email.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO email_records (user_id, mail_id, subject, body_summary, sender, body_detail)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (user_id, mail_id, subject, body_summary, sender, body_detail))
row = cur.fetchone()
conn.commit()
return row[0]
finally:
conn.close()
def update_draft_reply(mail_id: str, ai_draft_reply: str) -> None:
"""
Update the draft reply and body detail of a mail record with given mail_id.
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
UPDATE email_records
SET ai_draft_reply = %s
WHERE mail_id = %s
""", (ai_draft_reply, mail_id))
conn.commit()
finally:
conn.close()