-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
777 lines (621 loc) · 33.6 KB
/
main.py
File metadata and controls
777 lines (621 loc) · 33.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
# %%
import time
import os
import re
import json
import warnings
import logging
# This hides the annoying Telemetry and Deprecation logs
logging.getLogger('chromadb').setLevel(logging.ERROR)
warnings.filterwarnings("ignore", category=DeprecationWarning)
import torch
# 1. Check the status and save it to a variable
cuda_available = torch.cuda.is_available()
# 2. Print the status
print(f"Is CUDA available? {cuda_available}")
# 3. Only ask for the GPU name if it actually exists
if cuda_available:
try:
print(f"GPU Name: {torch.cuda.get_device_name(0)}")
except Exception:
print("GPU found but name could not be retrieved.")
else:
print("Running in CPU mode. (Optimized for Python 3.13 stability)")
# Suppress Torch internal warnings
os.environ['TORCH_CPP_LOG_LEVEL'] = 'ERROR'
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
warnings.filterwarnings("ignore", category=UserWarning)
# Neutralize the path inspection error
# pylint: disable=no-member
torch.classes.__path__ = [] # type: ignore
# Disable Streamlit's file watcher (which often triggers the lag)
os.environ["STREAMLIT_SERVER_ENABLE_FILE_WATCHER"] = "false"
# Ensure UTF-8 is used even if not set in the terminal
if os.environ.get("PYTHONUTF8") != "1":
os.environ["PYTHONUTF8"] = "1"
# Force Offline Mode (Bypasses the 10054 Connection Errors)
os.environ["HF_HUB_OFFLINE"] = "1"
os.environ["TRANSFORMERS_OFFLINE"] = "1"
# Standardizing the HuggingFace cache to avoid path issues
if "HF_HOME" not in os.environ:
os.environ["HF_HOME"] = "C:/huggingface_cache"
# Optional: Silence the Symlinks warning you mentioned
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["HF_TOKEN"] = "" # This kills the bad character error, ensures no 'latin-1' encoding crashes
import json
import streamlit as st
from PIL import Image, ImageOps
import io # Streamlit's st.download_button requires raw bytes
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
# These are the ones that should finally stop being underlined
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# --- GUARDRAIL UTILITIES ---
def is_injection(text):
"""
Scans user input for potential prompt injection or adversarial attacks.
This security function performs a heuristic check against a library of
known 'jailbreak' patterns and system-override commands. It prevents
users from attempting to:
1. Hijack the model's 'Heritage Expert' persona.
2. Extract the internal system prompt or instructions.
3. Force the model to bypass character limits or safety guardrails.
4. Execute unintended code or logic shifts within the RAG pipeline.
Args:
text (str): The sanitized user input string (post-PII scrubbing).
Returns:
bool: True if the input matches a high-risk injection pattern,
False if the query appears to be a legitimate user request.
Notes:
This function uses a list of forbidden keywords and phrases (e.g.,
'Ignore all previous', 'System Prompt', 'developer mode'). It is a
first-line defense and should be updated as new injection
techniques emerge in the LLM landscape.
"""
# Checks for common prompt injection phrases.
black_list = ["ignore all instructions", "system prompt", "developer mode", "dan mode"] #"DAN mode" stands for "Do Anything Now."
return any(phrase in text.lower() for phrase in black_list)
def scrub_sensitive_info(text):
"""
Identifies and redacts Personal Identifiable Information (PII) from user input.
This function acts as a primary security guardrail by:
1. Detecting Singapore-specific NRIC/FIN patterns (e.g., S1234567A).
2. Identifying local 8-digit phone numbers and international formats.
3. Spotting common email address patterns.
4. Replacing sensitive strings with generic labels (e.g., [REDACTED_NRIC])
before the data reaches the LLM or logs.
Args:
text (str): The raw input string provided by the user in the chat interface.
Returns:
str: The 'sanitized' version of the text with all detected PII masked.
Notes:
This is a deterministic regex-based scrubber. While highly effective for
standard formats, it does not use NLP to detect PII in complex
conversational contexts (e.g., a home address written in prose).
"""
# 1. Scrub Singapore EP / FIN / NRIC
singapore_id_pattern = r'\b[FGMSThfgmst]\d{7}[A-Za-z]\b'
text = re.sub(singapore_id_pattern, r"[IDENTIFIER REDACTED]", text)
# 2. Scrub Emails
text = re.sub(r'\S+@\S+', r"[EMAIL REDACTED]", text)
# 3. Scrub Singapore Phone Numbers (8 digits starting with 8 or 9)
text = re.sub(r'\b[89]\d{7}\b', r"[PHONE REDACTED]", text)
return text
def log_image_feedback(path, score, duration):
"""Writes image feedback immediately to the telemetry file."""
status_msg = "LIKE (1)" if score == 1 else "DISLIKE (0)"
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
with open("telemetry_log.txt", "a", encoding="utf-8") as f:
f.write(f"{status_msg} | Image: {path} | Time: {timestamp} | Latency: {duration:.2f}s\n")
# --- Save and Load Functions ( To recognise returning users) ---
def load_chat_history(username):
"""
Retrieves the persisted chat session for a specific user from local storage.
This function facilitates session continuity by:
1. Locating the user-specific JSON file based on the provided username.
2. Validating the existence of the file to prevent file-not-found errors.
3. Parsing the JSON data back into a Python list of message dictionaries.
Args:
username (str): The unique identifier for the user, matching the
filename used during the save process.
Returns:
list: A list of message dictionaries (e.g., [{'role': 'user', ...}])
retrieved from the file. Returns an empty list [] if no history
file exists for the user.
Notes:
This function is typically called during the initial Streamlit page
load to populate 'st.session_state.messages'.
"""
# Loads existing clean history from a JSON file.
file_path = f"history_{username}.json"
if os.path.exists(file_path):
with open(file_path, "r") as f:
return json.load(f)
return []
def save_chat_history(username, messages, max_history=30):
"""
Persists the current chat session to a local JSON file.
This function manages the long-term memory of the application by:
1. Mapping the session data to a specific user file (e.g., 'nathan_history.json').
2. Implementing a 'Rolling Window' memory management strategy to keep
the file size performant.
3. Ensuring the data is stored in a structured JSON format for easy retrieval.
Args:
username (str): The unique identifier for the user, used as the filename.
messages (list): A list of dictionary objects containing the chat
history (e.g., [{'role': 'user', 'content': '...'}, ...]).
max_history (int, optional): The maximum number of messages to retain
before the oldest entries are pruned. Defaults to 30.
Returns:
bool: True if the file was written successfully, False otherwise.
Notes:
The function automatically creates the directory if it does not exist
and overwrites the previous history file with the updated, pruned list.
"""
# Saves only the last N messages (already scrubbed) to JSON.
file_path = f"history_{username}.json"
# Slice the list to keep only the most recent messages
# -max_history takes the last 30 items from the list
recent_messages = messages[-max_history:]
with open(file_path, "w") as f:
json.dump(recent_messages, f)
@st.cache_resource # Use Streamlit's cache so this only runs ONCE per app start
def get_vectorstore():
"""
Initializes and loads the Chroma vector database and embedding model.
This function is cached by Streamlit to ensure the embedding model and
database connection are only loaded into memory once per application session.
It performs a safety check to ensure the persisted database exists before
attempting to load it.
Returns:
langchain_community.vectorstores.Chroma: The initialized vectorstore
object ready for retrieval.
Raises:
SystemExit: If the 'persist_dir' directory is not found, stopping the
Streamlit app with an error message.
"""
# Runs once on the first load.
# On the next message, it returns the existing object in ~0.001 seconds.
local_model_path = r"C:\huggingface_cache\hub\models--sentence-transformers--all-MiniLM-L6-v2\snapshots\c9745ed1d9f207416be6d2e6f8de32d1f16199bf"
# 1. Configuration Constants
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PERSIST_DIR = os.path.join(BASE_DIR, "chroma_db_v3")
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
# 2. Initialize Embeddings (The "Translator")
embeddings = HuggingFaceEmbeddings(
model_name=local_model_path,
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs,
cache_folder=r"C:/huggingface_cache"
)
# 3. Safety check: Stop if the folder is missing
if not os.path.exists(PERSIST_DIR):
st.error("Vector database not found. Please run ingest.py first!")
st.stop()
# 4. Return the loaded Chroma instance
return Chroma(persist_directory=PERSIST_DIR,
embedding_function=embeddings,
collection_metadata={"hnsw:num_threads": 16} # Leveraging on my laptop's 22 threads
)
def log_download_telemetry(img_path, img_desc):
"""Logs the download event to both the text file and the JSON chat history."""
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
# 1. Update the flat telemetry log
with open("telemetry_log.txt", "a", encoding="utf-8") as f:
f.write(f"DOWNLOAD | User: {USER_NAME} | Image: {img_path} | Time: {timestamp}\n")
# 2. Update the session state so it persists in the JSON
if st.session_state.messages:
# Find the last message and attach download metadata
st.session_state.messages[-1]["download_confirmed"] = {
"image": img_desc,
"time": timestamp
}
# 3. Trigger your existing save function to update history_Nathan.json
save_chat_history(USER_NAME, st.session_state.messages)
def handle_chat_query(user_query, vectorstore):
"""
Orchestrates the RAG pipeline to generate a heritage-focused response.
This function manages the end-to-end inference process:
1. Initializes a connection to the local LM Studio server.
2. Retrieves relevant context from the vector database.
3. Executes a single-pass 'Researcher-Compliance' chain to generate a
formatted, two-paragraph response with citations.
4. Handles connection errors to the local LLM gracefully.
5. Displays the result and any associated historical images in the Streamlit UI.
Args:
user_query (str): The cleaned and validated text input from the user.
vectorstore (langchain_community.vectorstores.Chroma): The loaded
Chroma database instance used for similarity search.
Returns:
None: The function directly updates the Streamlit UI and session state.
Raises:
Exception: Catches and displays local server connection issues or
timeout errors within the internal try-except block.
"""
# 1. Setup the LLM
llm = ChatOpenAI(
base_url="http://localhost:1234/v1",
api_key="lm-studio",
model_name="meta-llama-3.1-8b-instruct",
temperature=0,
verbose=True
)
# 2. SLIDING WINDOW: Get only the last 5 messages from session state
# This keeps the 'buffer' small and the LLM focused.
contextual_history = st.session_state.messages[-5:]
# 3. Format history for the prompt
# Convert the list of dicts into a single string the LLM can read
history_str = "\n".join([f"{m['role'].capitalize()}: {m['content']}" for m in contextual_history])
# 2. Define the Researcher Chain
research_prompt = ChatPromptTemplate.from_template("""
You are an expert on Peranakan heritage.
Use the following pieces of retrieved context and chat history to answer the question.
If you don't know the answer, say that you don't know.
CHAT HISTORY: {chat_history}
CONTEXT: {context}
Crucial: At the end of your response, cite the source and page number found in the metadata of the context provided above.
Do NOT use default values like 'Page 24' unless it is explicitly in the context.
RESEARCH TASK: Explain the rituals related to "{input}" based ONLY on the context.
""")
research_chain = research_prompt | llm | StrOutputParser()
document_prompt = PromptTemplate(
input_variables=["page_content"], # Only content is strictly required now
template="Context Chunk:\n{page_content}\n(Metadata -> Source: {source}, Page: {page})",
# If 'source' or 'page' are missing in the DB, use these fallbacks:
default_values={"source": "A Baba Wedding", "page": "N/A"}
)
# 3. Define the Compliance Chain
compliance_prompt = ChatPromptTemplate.from_template("""
'You are a compliance editor. Polished the following draft for the Heritage Bot using the provided draft. '
DRAFT: {draft}
'Ensure it is exactly two paragraphs and cites [Source: 'A Baba Wedding', Page X]. '
'Use the 'page_number' from the RETRIEVED CONTEXT to replace 'Page X' in the final response. '
'Only answer to the user query, do not suggest anything else. '
Specific Instructions(ONLY apply to text draft) ONLY when user ask:
1. If, and only if asked about the author of the book, provide this answer: "Author is Cheo Kim Ban. He is a renown Peranakan."
2. If, and only if asked about the number of pages of this book, provide only this answer "This book has 110 pages."
3. If, and only if asked who wrote the Foreword, provide this answer: "Charles K.K.Chua wrote the foreword."
4. If, and only if asked about Author's Preface, go to the section "Author's Preface" writtened by 'Cheo Kim Boon, Kenneth Singapore 1983' and provide a summary. Do not mentioned displaying of image of Two Nyongyas.
5. If, and only if asked about the Message to Facsimile Edition, provided by Baba Peter Wee, 1st Vice President The Peranakan Association, go to the section "Message to Facsimile Edition" and provide a summary.
ONLY If the user asks for a 'pic', 'image', 'plate', 'photo', or 'visual', you are to provide ONLY ONE curated visual aid that best represents the ritual or query described in the book.:
1. Scan the metadata of the provided documents for 'file_path'.
2. If 'file_path' exists, your answer MUST start with: "I am displaying a visual representation of [Title] now."
3. Use the 'image_source' from metadata to provide proper credit (e.g., Peranakan Museum or AI Reconstruction).
4. Do NOT mention that these are "substitute" or "replacement" images; simply present them as the relevant visual aid.
5. Do NOT say "I can show you" or "the path is not provided." If the metadata exists in the context, the image IS available.
6. Do not discuss the availability of other images; focus only on the one being displayed.
7. Ensure the tone acknowledges these as heritage visuals (from archives, museum collections, or AI reconstructions) rather than original book plates.
8. If, and only if asked, provide the page number of the original plate found in the book.
"""
)
compliance_chain = compliance_prompt | llm | StrOutputParser()
# 4. Combine them into one "Super Chain"
# This is what you would call in handle_chat_query function
full_process_chain = (
{
"context": lambda x: retriever.invoke(x["input"]),
"input": RunnablePassthrough(),
"chat_history": lambda x: history_str # Pass the 5-message window here
}
| RunnablePassthrough.assign(draft=research_chain)
| compliance_chain # This final stage formats it to 2 paragraphs + citations
)
# 5. Start the status container
with st.status("Searching the archives...", expanded=False) as status:
try:
# Capture Start Time
start_time = time.time()
# Initialise source_docs for the image logic later:
source_docs = retriever.get_relevant_documents(user_query)
# --- INSERT THE DEBUG CODE HERE ---
print(f"\n--- BACKEND DEBUG START ---")
print(f"Query: {user_query}")
print(f"Documents Found: {len(source_docs)}")
for i, doc in enumerate(source_docs):
print(f"Doc {i} Metadata: {doc.metadata}")
# This helps you see if 'VISUAL AID' tags are actually present
print(f"Doc {i} Content Snippet: {doc.page_content[:100]}...")
print(f"--- BACKEND DEBUG END ---\n")
# ----------------------------------
# INVOKE the Super Chain
# Wrap the query in a dict so the chain can access x["input"]
answer = full_process_chain.invoke({"input": user_query})
# Capture End Time
end_time = time.time()
duration = end_time - start_time
with open("performance_logs.txt", "a", encoding="utf-8") as f:
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
f.write(f"[{timestamp}] User: {USER_NAME} | Latency: {duration:.2f}s | Query: {user_query[:50]}...\n")
status.update(label=f"Research complete in {duration:.2f}s!", state="complete")
except Exception as e:
# Handle the error gracefully
st.error(f"Error: {str(e)}")
return
# --- DISPLAY LOGIC HERE ---
with st.chat_message("assistant"):
image_keywords = ["show", "picture", "image", "plate", "photo", "look like"]
asked_for_pic = any(word in user_query.lower() for word in image_keywords)
start_time = time.time()
final_response = answer
if asked_for_pic:
image_found = False
# 1. DEFINE YOUR ABSOLUTE BASE
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Use BASE_DIR to build all other paths
PERSIST_DIR = os.path.join(BASE_DIR, "chroma_db_v3")
IMAGE_DIR = os.path.join(BASE_DIR, "data1")
DICTIONARY_PATH = os.path.join(BASE_DIR, "data_dictionary.json")
# 2. Load the Master Data Dictionary
try:
dict_path = os.path.join(BASE_DIR, "data_dictionary.json")
with open(DICTIONARY_PATH, "r", encoding="utf-8") as f:
master_dict = json.load(f) # This is a DICT, not a list
except Exception as e:
print(f"DEBUG: Could not load JSON: {e}")
master_dict = {}
# 3. Iterate through results from the Vectorstore
for doc in source_docs:
raw_rel_path = doc.metadata.get("file_path", "")
if raw_rel_path:
# 4. CONSTRUCT THE ABSOLUTE PATH
raw_rel_path = raw_rel_path.replace("data/", "data1/")
clean_rel_path = os.path.normpath(raw_rel_path)
full_img_path = os.path.join(BASE_DIR, clean_rel_path)
if os.path.exists(full_img_path):
raw_img = Image.open(full_img_path)
fixed_img = ImageOps.exif_transpose(raw_img)
# Iterate through .values() because it's a dict ---
image_entry = next(
(item for item in master_dict.values()
if isinstance(item, dict) and item.get("file_path") == raw_rel_path),
{}
)
# 5. PULL METADATA
# Use data from the JSON entry if found, otherwise fallback to DB metadata
description = image_entry.get("description", doc.metadata.get("title", "Archival Image"))
ref_source = image_entry.get("reference_source", "A Baba Wedding")
page_num = image_entry.get("page_number", doc.metadata.get("page_number", "N/A"))
license_info = image_entry.get("license", "Singapore Copy Right Fair Dealing 2021")
final_caption = f"{description} | Source: {ref_source}, Page {page_num} | {license_info}"
# 6. UI DISPLAY
st.image(fixed_img, caption=final_caption, width=400)
# 6a. Add Download Button
# Convert the PIL image to bytes for the download button
img_byte_arr = io.BytesIO()
# Save it as JPEG depending on needs
fixed_img.save(img_byte_arr, format='JPEG')
img_data = img_byte_arr.getvalue()
# 6b. Download Button with Telemetry
# In Streamlit, if a button is clicked, it returns True for that rerun
if st.download_button(
label="📥 Download This Image",
data=img_data,
file_name=f"{description.replace(' ', '_')}.jpg",
mime="image/jpg",
key=f"dl_{raw_rel_path, description}" # Unique key for Streamlit widgets
):
# This block runs when the user clicks the button
with open("telemetry_log.txt", "a", encoding="utf-8") as f:
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
f.write(f"DOWNLOAD | User: {USER_NAME} | Image: {raw_rel_path} | Time: {timestamp}\n")
# 7. TELEMETRICS (Using built-in feedback for automatic shading)
feedback_key = f"img_feedback_{raw_rel_path}"
# We use 'on_change' to trigger the function the moment the user clicks
st.feedback(
"thumbs",
key=feedback_key,
on_change=lambda: log_image_feedback(
raw_rel_path,
st.session_state[feedback_key],
duration
)
)
if feedback is not None:
# feedback is 1 for Up (Thumbs Up), 0 for Down (Thumbs Down)
status_msg = "LIKE" if feedback == 1 else "DISLIKE"
with open("telemetry_log.txt", "a") as f:
f.write(f"{status_msg} | Image: {raw_rel_path} | Time: {time.ctime()} | Duration: {duration:.2f}s\n")
# 8. SAVE TO SESSION STATE
st.session_state.messages.append({
"role": "assistant",
"content": f"Found image: {description}",
"images": [{"path": full_img_path, "caption": final_caption}]
})
image_found = True
# Attach the image data directly to the message object we will save later
persistent_image_list = [{"path": full_img_path, "caption": final_caption}]
break
# Create the message dictionary
ans_msg = {"role": "assistant", "content": final_response}
# This structure matches the "Display Chat History Loop" at the bottom
if image_found:
ans_msg["images"] = [
{
"path": raw_rel_path,
"caption": final_caption,
"description": description
}
]
st.session_state.messages.append(ans_msg)
# 9. Capture end time after the process finishes
end_time = time.time()
duration = end_time - start_time
if not image_found:
st.warning("I found the information, but the corresponding archival image is missing from the folder.")
# CASE B: User asked for info (Text Only)
else:
# 1. Start timer for the text response
end_time = time.time()
# 2. Stream the data to the UI for the "live" feel
def stream_data():
for word in answer.split(" "):
yield word + " "
time.sleep(0.04)
full_response = st.write_stream(stream_data)
# 3. Calculate final duration
end_time = time.time()
duration = end_time - start_time
# 4. Format the content with the Duration prefix (matching the image logic intent)
formatted_content = f"⏱️ Duration {duration:.2f}s\n\n{full_response}"
# Save ONLY the text info to history
st.session_state.messages.append({
"role": "assistant",
"content": formatted_content
})
# --- CRITICAL: Stop here if it was a text request ---
save_chat_history(USER_NAME, st.session_state.messages)
st.rerun()
# Initialize the vectorstore globally
vectorstore = get_vectorstore()
retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
# --- Peranakan Heritage Header CSS ---
st.markdown(
"""
<style>
.block-container {
padding-top: 12rem !important;
padding-bottom: 5rem !important;
}
/* 1. Define the Animation (The 'Instructions' for the pulse) */
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.2); } /* Grows by 20% */
100% { transform: scale(1); }
}
.fixed-header {
position: fixed;
top: 2.875rem;
left: 0;
width: 100%;
/* Updated to Turquoise and Pink Gradient */
background: linear-gradient(135deg, #40E0D0 0%, #FFB6C1 100%);
z-index: 999;
padding: 20px 0;
border-bottom: 5px double #D4AF37; /* Keeping the Gold border */
text-align: center;
box-shadow: 0px 4px 10px rgba(0,0,0,0.1);
}
.header-title {
color: #FFFFFF !important; /* White text looks better on bright colors */
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
font-weight: 800;
margin-bottom: 0px;
}
.header-subtitle {
color: #F8F8F8 !important;
font-style: italic;
font-weight: 400;
}
/* 2. Apply the Animation to the Lantern class */
.lantern {
font-size: 1.5em;
display: inline-block;
vertical-align: middle;
padding: 0 10px;
filter: drop-shadow(0 0 5px rgba(255, 255, 255, 0.6));
/* This line connects the keyframes to the emoji */
animation: pulse 2s infinite ease-in-out;
}
</style>
<div class="fixed-header">
<h1 class="header-title">BABA-NYONYA HERITAGE BOT <span class="lantern">🏮</span></h1>
<div class="header-subtitle">Insights from "A Baba Wedding" by Cheo Kim Ban</div>
</div>
""",
unsafe_allow_html=True
)
# --- 1. Update and define the User Identifier Initialization ---
USER_NAME = "Nathan" # This matches the filename history_Nathan.json
# --- 2. Initialize or Load Chat Memory ---
if "messages" not in st.session_state:
# Try to load existing clean history
existing_history = load_chat_history(USER_NAME)
# If history exists, use it; otherwise, start an empty list
if existing_history:
st.session_state.messages = existing_history
else:
st.session_state.messages = []
# --- Updated Display Chat History Loop ---
for i, message in enumerate(st.session_state.messages):
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Display Sources if they exist in history
if "sources" in message:
with st.expander("View Research Sources"):
for src in message["sources"]:
st.write(f"Ref: Page {src['page']} | {src['text']}...")
# Display images if they exist
if "images" in message:
for img_info in message["images"]:
# 1. Get the path and force the 'data1' correction
raw_saved_path = img_info["path"]
corrected_history_path = raw_saved_path.replace("data/", "data1/")
# 2. Re-verify the absolute path
full_path = os.path.normpath(os.path.join(os.getcwd(), corrected_history_path))
if os.path.exists(full_path):
img = Image.open(full_path)
img_to_show = ImageOps.exif_transpose(img)
# Display the image
st.image(img_to_show, caption=img_info["caption"], width=300)
# --- 2. ADD PERSISTENT DOWNLOAD BUTTON ---
# Convert the displayed image to bytes
buf = io.BytesIO()
# Use PNG for high quality, or JPEG if you prefer
img_to_show.save(buf, format="PNG")
img_bytes = buf.getvalue()
st.download_button(
label="📥 Download Image",
data=img_bytes,
file_name=f"heritage_image_{i}.png",
mime="image/png",
key=f"dl_btn_{i}_{corrected_history_path}", # Unique key for each image in history
on_click=log_download_telemetry, # Uses your telemetry function
args=(corrected_history_path, img_info["caption"])
)
else:
st.warning(f"Note: Image from previous chat session not found at {corrected_history_path}")
# ADD FEEDBACK BUTTONS FOR ASSISTANT RESPONSES
if message["role"] == "assistant":
# Check if feedback already exists for this message
existing_feedback = message.get("feedback", None)
# Using the built-in thumbs feedback component
feedback = st.feedback(
"thumbs",
key=f"feedback_{i}",
disabled=existing_feedback is not None,
)
# If user clicks a button and we haven't recorded it yet
if feedback is not None and existing_feedback is None:
st.session_state.messages[i]["feedback"] = feedback # Index based to ensure that every set of buttons is unique to that specific message in the conversation
save_chat_history(USER_NAME, st.session_state.messages) # Note: 1 is usually Thumbs Up, 0 is Thumbs Down
st.rerun()
# User Input
if prompt := st.chat_input("Ask about Baba wedding rituals..."):
# STEP 1: Scrub the input immediately
clean_prompt = scrub_sensitive_info(prompt)
# STEP 2: Check length guardrail on the cleaned text
if len(clean_prompt) > 500: # Prevent 'stolen tokens' too many tokens from long input text
st.error(f"Your message is too long ({len(prompt)} characters). Please keep it under 500 characters.")
elif is_injection(clean_prompt):
st.warning("⚠️ Access Denied: This query contains restricted system instructions.")
else:
# STEP 3: If all guardrails pass, display the user message with a typing effect and store the REDACTED version
with st.chat_message("user"):
def stream_user_text():
for word in clean_prompt.split(" "):
yield word + " "
time.sleep(0.02) # Faster speed for user input (0.02s)
st.write_stream(stream_user_text)
# STEP 4: Add to session state (standard behavior)
st.session_state.messages.append({"role": "user", "content": clean_prompt})
# STEP 5: Pass the safe text to the RAG logic
handle_chat_query(clean_prompt, vectorstore)
# STEP: Save immediately after the reponse
save_chat_history(USER_NAME, st.session_state.messages)
# %%