-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
127 lines (107 loc) · 4.37 KB
/
worker.py
File metadata and controls
127 lines (107 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import os
import time
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.datamodel.base_models import InputFormat
from qdrant_client.models import Filter, FieldCondition, MatchValue, VectorParams, Distance
from config import (
qdrant_client,
QDRANT_COLLECTION,
MAX_RETRIES,
get_vector_store,
)
from database import SessionLocal
from models import IngestionJob, JobStatus
from redis_client import get_job_queue
from logger import get_logger
log = get_logger("worker")
def process_ingestion(job_id: str, file_path: str, filename: str, enable_ocr: bool = False):
    """
    Ingest one uploaded document into the vector store.

    Pipeline: file -> markdown (docling, optional OCR) -> chunking
    (RecursiveCharacterTextSplitter, 1500/300) -> Qdrant insert.
    The whole Qdrant collection is wiped before inserting, so only the
    newest document is ever in retrieval context; this also makes the
    task idempotent — retries can never produce duplicate vectors.

    On any failure the job is re-enqueued with exponential backoff
    until MAX_RETRIES is reached, after which it is marked FAILED.

    Args:
        job_id: Primary key of the IngestionJob row; also written into
            every chunk's metadata as ``document_id``.
        file_path: Local path of the uploaded file; removed from disk
            only after a successful ingest so retries keep their input.
        filename: Original upload name, stored as ``source`` metadata.
        enable_ocr: Run OCR during PDF conversion (slower, for scans).
    """
    db = SessionLocal()
    try:
        job = db.query(IngestionJob).filter(IngestionJob.id == job_id).first()
        if not job:
            log.error(f"Job {job_id} not found in database")
            return
        job.status = JobStatus.PROCESSING
        db.commit()
        # BUG FIX: previously logged the literal text "(unknown)" instead
        # of interpolating the filename.
        log.info(f"[{job_id}] PENDING -> PROCESSING | file={filename} | ocr={enable_ocr}")
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = enable_ocr
        pipeline_options.do_table_structure = True
        converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=pipeline_options
                )
            }
        )
        doc_converted = converter.convert(file_path)
        md_content = doc_converted.document.export_to_markdown()
        log.info(f"[{job_id}] OCR complete, markdown_length={len(md_content)}")
        langchain_doc = Document(
            page_content=md_content,
            metadata={"source": filename, "document_id": job_id},
        )
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1500,
            chunk_overlap=300,
        )
        splits = text_splitter.split_documents([langchain_doc])
        for doc in splits:
            doc.metadata["document_id"] = job_id
        log.info(f"[{job_id}] Chunked into {len(splits)} chunks")
        # Wipe the entire collection so only the new document is in context.
        # Best-effort: a failure here risks stale vectors, not a failed job.
        try:
            collection_info = qdrant_client.get_collection(QDRANT_COLLECTION)
            if collection_info.points_count > 0:
                qdrant_client.delete_collection(QDRANT_COLLECTION)
                qdrant_client.create_collection(
                    collection_name=QDRANT_COLLECTION,
                    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
                )
                log.info(f"[{job_id}] Cleared collection ({collection_info.points_count} points)")
        except Exception as e:
            log.warning(f"[{job_id}] Failed to clear collection: {e}")
        vector_store = get_vector_store()
        vector_store.add_documents(splits)
        log.info(f"[{job_id}] Added {len(splits)} chunks to Qdrant")
        # Delete the temp upload only after the insert succeeded, so a
        # retried job still has its input file available.
        if os.path.exists(file_path):
            os.remove(file_path)
        job.status = JobStatus.COMPLETED
        db.commit()
        log.info(f"[{job_id}] PROCESSING -> COMPLETED")
    except Exception as e:
        db.rollback()
        log.error(f"[{job_id}] Ingestion failed: {e}")
        # Re-query after rollback — the earlier `job` instance may be stale.
        job = db.query(IngestionJob).filter(IngestionJob.id == job_id).first()
        if job:
            job.retry_count += 1
            if job.retry_count < MAX_RETRIES:
                job.status = JobStatus.PENDING
                db.commit()
                delay = 2 ** job.retry_count  # exponential backoff: 2, 4, 8, ... seconds
                log.info(
                    f"[{job_id}] FAILED -> PENDING (retry "
                    f"{job.retry_count}/{MAX_RETRIES} in {delay}s)"
                )
                time.sleep(delay)
                queue = get_job_queue()
                # BUG FIX: forward enable_ocr so retries run with the same
                # pipeline settings as the original attempt (it was
                # previously dropped, silently resetting OCR to False).
                queue.enqueue(
                    process_ingestion, job_id, file_path, filename, enable_ocr
                )
            else:
                job.status = JobStatus.FAILED
                job.error_message = str(e)[:500]  # truncate to fit the DB column
                db.commit()
                log.error(
                    f"[{job_id}] PROCESSING -> FAILED | "
                    f"max retries exceeded"
                )
    finally:
        db.close()