# ingest.py — ingest documents from docs/ into a VectorLiteDB vector database.
# (160 lines, 137 loc, 5.5 KB)
import os
import glob
import io
from typing import List
from sentence_transformers import SentenceTransformer
from vectorlitedb import VectorLiteDB
# Document parsing imports
import PyPDF2
from docx import Document
from pptx import Presentation
import openpyxl
# Path of the on-disk vector database; main() deletes and rebuilds it each run.
DB_PATH = "kb.db"
# SentenceTransformer model name; emits 384-dim embeddings, which must match
# the dimension passed to VectorLiteDB in main().
EMBED_MODEL = "all-MiniLM-L6-v2" # 384-dim
# Character length of each text chunk fed to the embedding model.
CHUNK_CHARS = 800 # ~200 words
def chunk_text(t: str, n: int) -> List[str]:
    """Split *t* into consecutive chunks of at most *n* characters.

    Returns an empty list when *t* is empty; the final chunk may be
    shorter than *n*.
    """
    if not t:
        return []
    return [t[start:start + n] for start in range(0, len(t), n)]
# Document parsing functions (same as in app.py)
def extract_text_from_pdf(content: bytes) -> str:
    """Extract plain text from PDF file bytes.

    Args:
        content: Raw bytes of a PDF file.

    Returns:
        The text of all pages joined with newlines, stripped of
        surrounding whitespace.

    Raises:
        Exception: If the bytes cannot be parsed as a PDF.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
        # extract_text() may return None for pages with no extractable
        # text; coalesce to "" so the join cannot fail. Joining also
        # avoids quadratic string concatenation on large documents.
        pages = [(page.extract_text() or "") for page in pdf_reader.pages]
        return "\n".join(pages).strip()
    except Exception as e:
        # Chain the original exception for easier debugging.
        raise Exception(f"Failed to parse PDF: {str(e)}") from e
def extract_text_from_docx(content: bytes) -> str:
    """Extract plain text from DOCX file bytes.

    Args:
        content: Raw bytes of a DOCX file.

    Returns:
        Paragraph texts joined with newlines, stripped of surrounding
        whitespace.

    Raises:
        Exception: If the bytes cannot be parsed as a DOCX document.
    """
    try:
        doc = Document(io.BytesIO(content))
        # join instead of repeated += (quadratic on large documents)
        return "\n".join(p.text for p in doc.paragraphs).strip()
    except Exception as e:
        # Chain the original exception for easier debugging.
        raise Exception(f"Failed to parse DOCX: {str(e)}") from e
def extract_text_from_pptx(content: bytes) -> str:
    """Extract plain text from PPTX file bytes.

    Args:
        content: Raw bytes of a PPTX file.

    Returns:
        The text of every text-bearing shape on every slide, joined with
        newlines and stripped of surrounding whitespace.

    Raises:
        Exception: If the bytes cannot be parsed as a PPTX presentation.
    """
    try:
        prs = Presentation(io.BytesIO(content))
        # Not every shape carries text (pictures, charts), hence hasattr.
        # join instead of repeated += (quadratic on large decks).
        texts = [
            shape.text
            for slide in prs.slides
            for shape in slide.shapes
            if hasattr(shape, "text")
        ]
        return "\n".join(texts).strip()
    except Exception as e:
        # Chain the original exception for easier debugging.
        raise Exception(f"Failed to parse PPTX: {str(e)}") from e
def extract_text_from_xlsx(content: bytes) -> str:
    """Extract plain text from XLSX file bytes.

    Each non-empty row becomes one line of space-joined cell values;
    rows whose cells are all None/blank are skipped.

    Args:
        content: Raw bytes of an XLSX workbook.

    Returns:
        All sheet rows joined with newlines, stripped of surrounding
        whitespace.

    Raises:
        Exception: If the bytes cannot be parsed as an XLSX workbook.
    """
    try:
        workbook = openpyxl.load_workbook(io.BytesIO(content))
        lines = []
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            for row in sheet.iter_rows(values_only=True):
                row_text = " ".join(str(cell) for cell in row if cell is not None)
                if row_text.strip():
                    lines.append(row_text)
        # join instead of repeated += (quadratic on large workbooks)
        return "\n".join(lines).strip()
    except Exception as e:
        # Chain the original exception for easier debugging.
        raise Exception(f"Failed to parse XLSX: {str(e)}") from e
def extract_text_from_file(file_path: str) -> str:
    """Extract text from a file, dispatching on its extension.

    Supported extensions: .txt and .md (decoded as UTF-8, undecodable
    bytes ignored), and .pdf/.docx/.pptx/.xlsx (delegated to the
    matching extract_text_from_* helper).

    Args:
        file_path: Path to the file on disk.

    Returns:
        The extracted text.

    Raises:
        Exception: If the extension is unsupported or parsing fails.
        OSError: If the file cannot be read.
    """
    # NOTE(review): split('.')[-1] yields the whole name for extensionless
    # files; they then fall through to the unsupported-type error, which
    # matches the original behavior.
    file_ext = file_path.lower().split('.')[-1]
    with open(file_path, 'rb') as f:
        content = f.read()
    # txt and md were duplicated branches — merge them.
    if file_ext in ('txt', 'md'):
        return content.decode('utf-8', errors='ignore')
    if file_ext == 'pdf':
        return extract_text_from_pdf(content)
    if file_ext == 'docx':
        return extract_text_from_docx(content)
    if file_ext == 'pptx':
        return extract_text_from_pptx(content)
    if file_ext == 'xlsx':
        return extract_text_from_xlsx(content)
    raise Exception(f"Unsupported file type: .{file_ext}")
def main() -> None:
    """Rebuild the vector database from all supported files in docs/.

    Deletes any existing DB, embeds each document in CHUNK_CHARS-sized
    chunks with the SentenceTransformer model, and inserts one vector
    per chunk into VectorLiteDB. Per-file errors are logged and skipped.
    """
    # For demo: rebuild each time
    if os.path.exists(DB_PATH):
        os.remove(DB_PATH)

    print("Loading embedding model...", EMBED_MODEL)
    model = SentenceTransformer(EMBED_MODEL)
    print("Opening VectorLiteDB...", DB_PATH)
    # dimension=384 must match EMBED_MODEL's output size.
    db = VectorLiteDB(DB_PATH, dimension=384, distance_metric="cosine")

    # Find all supported files
    patterns = ["docs/*.txt", "docs/*.md", "docs/*.pdf", "docs/*.docx", "docs/*.pptx", "docs/*.xlsx"]
    files = []
    for pattern in patterns:
        files.extend(glob.glob(pattern))

    # Skip .txt files that are just extracted-text copies of another
    # document (e.g. report.txt alongside report.pdf). Build the basename
    # set once instead of rescanning the whole file list per candidate
    # (was O(n^2)), and only swap the trailing '.txt' rather than the
    # first '.txt' occurrence anywhere in the name.
    basenames = {os.path.basename(f) for f in files}
    original_files = []
    for f in files:
        filename = os.path.basename(f)
        if filename.endswith('.txt') and any(
            filename[:-4] + ext in basenames
            for ext in ('.pdf', '.docx', '.pptx', '.xlsx')
        ):
            continue
        original_files.append(f)

    if not original_files:
        print("No docs found in docs/ — add supported files (.txt, .md, .pdf, .docx, .pptx, .xlsx) and re-run.")
        return

    total_chunks = 0
    for path in original_files:
        try:
            print(f"Processing {path}...")
            text = extract_text_from_file(path)
            if not text.strip():
                print(f"Warning: No text content found in {path}")
                continue
            chunks = chunk_text(text, CHUNK_CHARS)
            if not chunks:
                print(f"Warning: No chunks created from {path}")
                continue
            # Hoist per-file values out of the per-chunk loop.
            base = os.path.basename(path)
            file_type = base.lower().split('.')[-1]
            for idx, chunk in enumerate(chunks):
                vec = model.encode(chunk).tolist()  # 384 floats
                uid = f"{base}::{idx}"
                meta = {
                    "file": base,
                    "index": idx,
                    "chunk": chunk,
                    "file_type": file_type,
                }
                db.insert(id=uid, vector=vec, metadata=meta)
                total_chunks += 1
            print(f"Ingested {len(chunks)} chunks from {path}")
        except Exception as e:
            # Best-effort ingestion: log and move on to the next file.
            print(f"Error processing {path}: {str(e)}")
            continue

    print(f"Done. Total chunks: {total_chunks}; DB len: {len(db)}")
    print(f"DB file: {DB_PATH} (bytes: {os.path.getsize(DB_PATH)})")


if __name__ == "__main__":
    main()