Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 50 additions & 16 deletions src/models/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,16 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
**standard_kwargs,
)

file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
if result.get("status") == "error":
file_task.status = TaskStatus.FAILED
file_task.error = result.get("error", "Failed to process document")
file_task.updated_at = time.time()
upload_task.failed_files += 1
else:
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1

except Exception as e:
file_task.status = TaskStatus.FAILED
Expand Down Expand Up @@ -844,6 +850,14 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
allowed_users=allowed_users,
allowed_groups=allowed_groups,
)
# Langflow returns "success" even when no text was extracted
# (e.g. image files without OCR). Verify the document actually
# landed in OpenSearch before declaring success.
if not await self.check_document_exists(document.id, opensearch_client):
result = {
"status": "error",
"error": "No text content could be extracted from document",
}
else:
# Standard OpenRAG processing pipeline (process_document_standard)
standard_kwargs: dict[str, Any] = {}
Expand Down Expand Up @@ -892,10 +906,16 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
}
)

file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
if result.get("status") == "error":
file_task.status = TaskStatus.FAILED
file_task.error = result.get("error", "Failed to process document")
file_task.updated_at = time.time()
upload_task.failed_files += 1
else:
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1

except Exception as e:
file_task.status = TaskStatus.FAILED
Expand Down Expand Up @@ -970,9 +990,14 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
)

result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
if result.get("status") == "error":
file_task.status = TaskStatus.FAILED
file_task.error = result.get("error", "Failed to process document")
upload_task.failed_files += 1
else:
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1

except Exception as e:
file_task.status = TaskStatus.FAILED
Expand Down Expand Up @@ -1106,11 +1131,20 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
file_task=file_task,
)

# Update task with success
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
# Langflow returns "success" even when no text was extracted.
# Verify the document actually landed in OpenSearch.
file_hash = hash_id(item)
if not await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.FAILED
file_task.error = "No text content could be extracted from document"
file_task.updated_at = time.time()
upload_task.failed_files += 1
else:
# Update task with success
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1

except Exception as e:
# Update task with failure
Expand Down
27 changes: 27 additions & 0 deletions src/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,22 @@
)


# Raster image formats that require OCR to produce any text content.
_OCR_REQUIRED_EXTENSIONS = frozenset({"bmp", "jpeg", "jpg", "png", "tiff", "webp"})


def _is_non_retryable_file_error(error: str) -> bool:
    """Return True if *error* contains any non-retryable file-error marker.

    The error text is lowercased before matching; each marker from
    ``_NON_RETRYABLE_FILE_ERROR_MARKERS`` is tested as a substring as-is.
    """
    haystack = error.lower()
    for needle in _NON_RETRYABLE_FILE_ERROR_MARKERS:
        if needle in haystack:
            return True
    return False


def _is_ocr_required_file(filename: str) -> bool:
    """Return True if the file is a raster image that requires OCR to produce text.

    The decision is based only on the text after the last "." in *filename*,
    compared case-insensitively against ``_OCR_REQUIRED_EXTENSIONS``. An empty
    filename, or one without any ".", yields False.
    """
    # rpartition returns an empty separator when there is no "." at all,
    # which covers both the empty-name and the no-extension cases.
    _stem, dot, extension = (filename or "").rpartition(".")
    return bool(dot) and extension.lower() in _OCR_REQUIRED_EXTENSIONS


def _is_transient_connectivity_error(error: str) -> bool:
    """Return True if *error* contains any transient-connectivity marker.

    The error text is lowercased before matching; each marker from
    ``_TRANSIENT_CONNECTIVITY_ERROR_MARKERS`` is tested as a substring as-is.
    """
    normalized = error.lower()
    matches = (
        True
        for candidate in _TRANSIENT_CONNECTIVITY_ERROR_MARKERS
        if candidate in normalized
    )
    # next() stops at the first hit, mirroring any()'s short-circuit.
    return next(matches, False)
Expand Down Expand Up @@ -958,6 +969,22 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None:

# First, check if the error is non-retryable based on common markers
if _is_non_retryable_file_error(error):
# Image files that produced no text almost certainly failed because OCR
# is disabled — not because the file is corrupted. Give a targeted tip.
filename = file_task.filename or file_task.file_path or ""
if "no text content could be extracted" in error.lower() and _is_ocr_required_file(
filename
):
return {
"component": "docling",
"failure_phase": "parsing",
"user_facing_message": (
"No text content could be extracted from this image file. "
"Enable OCR in Settings > Knowledge Base and retry ingestion."
),
"actionable_by": "RETRYABLE",
}

if phase == IngestionPhase.LANGFLOW:
component = "langflow"
failure_phase = "unknown"
Expand Down
Loading