-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_loader.py
More file actions
157 lines (137 loc) · 6.44 KB
/
data_loader.py
File metadata and controls
157 lines (137 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import logging
from typing import List, Dict
from pdf_processor import PDFProcessor
from vector_store import VectorStore
from config import Config
# Import-time logging setup: request INFO-level output on the root logger and
# create the module-level logger that the DataLoader methods write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataLoader:
    """Coordinate loading of PDF documents into the vector store.

    The public methods report their outcome as plain dictionaries; unexpected
    exceptions are logged with their traceback and converted into an error
    dictionary instead of being propagated to the caller.
    """

    def __init__(self):
        """Create the configuration, PDF processor and vector store objects."""
        self.config = Config()
        self.pdf_processor = PDFProcessor()
        self.vector_store = VectorStore()

    def _pdf_directory_error(self) -> str:
        """Return why the configured PDF directory is unusable, or '' if it is usable."""
        pdf_directory = self.config.PDF_DIRECTORY
        if not os.path.exists(pdf_directory):
            return f"PDF directory {pdf_directory} does not exist"
        # A regular file at that path passes an exists() check but cannot be
        # listed or scanned for PDFs, so reject it explicitly.
        if not os.path.isdir(pdf_directory):
            return f"PDF directory {pdf_directory} is not a directory"
        return ""

    def load_documents(self, force_reload: bool = False) -> Dict:
        """Load all PDF documents from the configured directory into the vector store.

        Args:
            force_reload: When True, clear the existing collection and process
                the PDFs again even if the collection already holds chunks.

        Returns:
            A dict that always contains 'success', 'documents_processed' and
            'chunks_created'. Successful results carry a 'message' (plus
            'reused_existing' when the existing collection was kept, or
            'collection_info' after a fresh load); failures carry an 'error'
            string.
        """
        try:
            logger.info("Starting document loading process...")

            directory_error = self._pdf_directory_error()
            if directory_error:
                logger.error(directory_error)
                return {
                    'success': False,
                    'error': directory_error,
                    'documents_processed': 0,
                    'chunks_created': 0,
                }

            collection_info = self.vector_store.get_collection_info()

            if force_reload:
                logger.info("Force reload requested. Clearing existing collection...")
                self.vector_store.clear_collection()
                # Re-read after clearing so that a store that cannot be read
                # back is reported here rather than later during insertion.
                collection_info = self.vector_store.get_collection_info()

            existing_chunks = collection_info.get('document_count', 0)
            if existing_chunks > 0 and not force_reload:
                logger.info(f"Documents already loaded ({existing_chunks} chunks found)")
                return {
                    'success': True,
                    'message': f"Documents already loaded ({existing_chunks} chunks)",
                    'documents_processed': 0,
                    'chunks_created': existing_chunks,
                    'reused_existing': True,
                }

            logger.info("Processing PDF documents...")
            chunks = self.pdf_processor.process_pdf_directory()
            if not chunks:
                logger.warning("No chunks created from PDFs")
                return {
                    'success': False,
                    'error': "No PDF documents found or processed",
                    'documents_processed': 0,
                    'chunks_created': 0,
                }

            # Count distinct source files once, and before touching the store:
            # a chunk without 'source_file' then fails the load without leaving
            # half-reported data behind in the collection.
            documents_processed = len({chunk['source_file'] for chunk in chunks})
            chunk_count = len(chunks)

            logger.info(f"Adding {chunk_count} chunks to vector store...")
            if not self.vector_store.add_documents(chunks):
                return {
                    'success': False,
                    'error': "Failed to add documents to vector store",
                    'documents_processed': documents_processed,
                    'chunks_created': chunk_count,
                }

            updated_info = self.vector_store.get_collection_info()
            logger.info(f"Successfully loaded {chunk_count} chunks from PDFs")
            return {
                'success': True,
                'message': f"Successfully loaded {chunk_count} chunks",
                'documents_processed': documents_processed,
                'chunks_created': chunk_count,
                'collection_info': updated_info,
            }
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"Error loading documents: {str(e)}")
            return {
                'success': False,
                'error': str(e),
                'documents_processed': 0,
                'chunks_created': 0,
            }

    def get_processing_stats(self) -> Dict:
        """Get statistics about the document processing.

        Returns:
            A dict with 'total_chunks', 'collection_name', 'vector_db_path',
            'pdf_directory' and 'pdf_directory_exists'. If reading the
            collection fails, a dict with 'error' and 'total_chunks' set to 0.
        """
        try:
            collection_info = self.vector_store.get_collection_info()
            document_count = collection_info.get('document_count', 0)
            # Counting unique source files would require querying the
            # collection itself; only the basic collection info is reported.
            return {
                'total_chunks': document_count if document_count > 0 else 0,
                'collection_name': collection_info.get('collection_name', ''),
                'vector_db_path': collection_info.get('vector_db_path', ''),
                'pdf_directory': self.config.PDF_DIRECTORY,
                'pdf_directory_exists': os.path.exists(self.config.PDF_DIRECTORY),
            }
        except Exception as e:
            logger.exception(f"Error getting processing stats: {str(e)}")
            return {
                'error': str(e),
                'total_chunks': 0,
            }

    def validate_pdf_directory(self) -> Dict:
        """Validate the PDF directory and its contents.

        Returns:
            A dict with 'valid', 'pdf_files' (sorted names of regular files
            whose name ends in '.pdf', case-insensitively) and 'total_files'.
            Valid results also carry 'pdf_directory'; invalid ones carry
            'error'.
        """
        try:
            directory_error = self._pdf_directory_error()
            if directory_error:
                return {
                    'valid': False,
                    'error': directory_error,
                    'pdf_files': [],
                    'total_files': 0,
                }

            pdf_directory = self.config.PDF_DIRECTORY
            # Sort for a stable result (os.listdir order is arbitrary) and skip
            # sub-directories that merely happen to be named '*.pdf'.
            pdf_files = sorted(
                name
                for name in os.listdir(pdf_directory)
                if name.lower().endswith('.pdf')
                and os.path.isfile(os.path.join(pdf_directory, name))
            )
            return {
                'valid': True,
                'pdf_directory': pdf_directory,
                'pdf_files': pdf_files,
                'total_files': len(pdf_files),
            }
        except Exception as e:
            logger.exception(f"Error validating PDF directory: {str(e)}")
            return {
                'valid': False,
                'error': str(e),
                'pdf_files': [],
                'total_files': 0,
            }