From 960b02a01f5f8838fe7da016ee6abc4f2157eb86 Mon Sep 17 00:00:00 2001 From: Rico Furtado Date: Mon, 15 Jun 2026 15:31:20 -0400 Subject: [PATCH 1/2] fix: improve error handling and status updates for document processing and add OCR requirement check for image files --- src/models/processors.py | 66 +++++++++++++++++++++++++++--------- src/services/task_service.py | 28 +++++++++++++++ 2 files changed, 78 insertions(+), 16 deletions(-) diff --git a/src/models/processors.py b/src/models/processors.py index f73f53bce..2cabb1fa5 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -560,10 +560,16 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File **standard_kwargs, ) - file_task.status = TaskStatus.COMPLETED - file_task.result = result - file_task.updated_at = time.time() - upload_task.successful_files += 1 + if result.get("status") == "error": + file_task.status = TaskStatus.FAILED + file_task.error = result.get("error", "Failed to process document") + file_task.updated_at = time.time() + upload_task.failed_files += 1 + else: + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED @@ -844,6 +850,14 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File allowed_users=allowed_users, allowed_groups=allowed_groups, ) + # Langflow returns "success" even when no text was extracted + # (e.g. image files without OCR). Verify the document actually + # landed in OpenSearch before declaring success. + if not await self.check_document_exists(document.id, opensearch_client): + result = { + "status": "error", + "error": "No text content could be extracted from document", + } else: # Standard OpenRAG processing pipeline (process_document_standard) standard_kwargs: dict[str, Any] = {} @@ -892,10 +906,16 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File } ) - file_task.status = TaskStatus.COMPLETED - file_task.result = result - file_task.updated_at = time.time() - upload_task.successful_files += 1 + if result.get("status") == "error": + file_task.status = TaskStatus.FAILED + file_task.error = result.get("error", "Failed to process document") + file_task.updated_at = time.time() + upload_task.failed_files += 1 + else: + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED @@ -970,9 +990,14 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File ) result["path"] = f"s3://{self.bucket}/{item}" - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + if result.get("status") == "error": + file_task.status = TaskStatus.FAILED + file_task.error = result.get("error", "Failed to process document") + upload_task.failed_files += 1 + else: + file_task.status = TaskStatus.COMPLETED + file_task.result = result + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED @@ -1106,11 +1131,20 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File file_task=file_task, ) - # Update task with success - file_task.status = TaskStatus.COMPLETED - file_task.result = result - file_task.updated_at = time.time() - upload_task.successful_files += 1 + # Langflow returns "success" even when no text was extracted. + # Verify the document actually landed in OpenSearch. + file_hash = hash_id(item) + if not await self.check_document_exists(file_hash, opensearch_client): + file_task.status = TaskStatus.FAILED + file_task.error = "No text content could be extracted from document" + file_task.updated_at = time.time() + upload_task.failed_files += 1 + else: + # Update task with success + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: # Update task with failure diff --git a/src/services/task_service.py b/src/services/task_service.py index 521444669..05210aee5 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -64,11 +64,22 @@ ) +# Raster image formats that require OCR to produce any text content. +_OCR_REQUIRED_EXTENSIONS = frozenset({"bmp", "jpeg", "jpg", "png", "tiff", "webp"}) + + def _is_non_retryable_file_error(error: str) -> bool: lowered = error.lower() return any(marker in lowered for marker in _NON_RETRYABLE_FILE_ERROR_MARKERS) +def _is_ocr_required_file(filename: str) -> bool: + """Return True if the file is a raster image that requires OCR to produce text.""" + if not filename or "." not in filename: + return False + return filename.rsplit(".", 1)[-1].lower() in _OCR_REQUIRED_EXTENSIONS + + def _is_transient_connectivity_error(error: str) -> bool: lowered = error.lower() return any(marker in lowered for marker in _TRANSIENT_CONNECTIVITY_ERROR_MARKERS) @@ -958,6 +969,23 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: # First, check if the error is non-retryable based on common markers if _is_non_retryable_file_error(error): + # Image files that produced no text almost certainly failed because OCR + # is disabled — not because the file is corrupted. Give a targeted tip. + filename = file_task.filename or file_task.file_path or "" + if "no text content could be extracted" in error.lower() and _is_ocr_required_file( + filename + ): + return { + "component": "docling", + "failure_phase": "parsing", + "user_facing_message": ( + "No text content could be extracted from this image file. " + "Enable OCR in Settings > Knowledge Base and retry ingestion." + ), + "actionable_by": "RETRYABLE", + } + + if phase == IngestionPhase.LANGFLOW: component = "langflow" failure_phase = "unknown" From 795337d8eceb8f9aca925a4ec3cf31f65a97b002 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 15 Jun 2026 19:32:09 +0000 Subject: [PATCH 2/2] style: ruff autofix (auto) --- src/services/task_service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/services/task_service.py b/src/services/task_service.py index 05210aee5..1aaf11a04 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -969,7 +969,7 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: # First, check if the error is non-retryable based on common markers if _is_non_retryable_file_error(error): - # Image files that produced no text almost certainly failed because OCR + # Image files that produced no text almost certainly failed because OCR # is disabled — not because the file is corrupted. Give a targeted tip. filename = file_task.filename or file_task.file_path or "" if "no text content could be extracted" in error.lower() and _is_ocr_required_file( @@ -985,7 +985,6 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: "actionable_by": "RETRYABLE", } - if phase == IngestionPhase.LANGFLOW: component = "langflow" failure_phase = "unknown"