diff --git a/src/models/processors.py b/src/models/processors.py index f73f53bce..2cabb1fa5 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -560,10 +560,16 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File **standard_kwargs, ) - file_task.status = TaskStatus.COMPLETED - file_task.result = result - file_task.updated_at = time.time() - upload_task.successful_files += 1 + if result.get("status") == "error": + file_task.status = TaskStatus.FAILED + file_task.error = result.get("error", "Failed to process document") + file_task.updated_at = time.time() + upload_task.failed_files += 1 + else: + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED @@ -844,6 +850,14 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File allowed_users=allowed_users, allowed_groups=allowed_groups, ) + # Langflow returns "success" even when no text was extracted + # (e.g. image files without OCR). Verify the document actually + # landed in OpenSearch before declaring success. + if not await self.check_document_exists(document.id, opensearch_client): + result = { + "status": "error", + "error": "No text content could be extracted from document", + } else: # Standard OpenRAG processing pipeline (process_document_standard) standard_kwargs: dict[str, Any] = {} @@ -892,10 +906,16 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File } ) - file_task.status = TaskStatus.COMPLETED - file_task.result = result - file_task.updated_at = time.time() - upload_task.successful_files += 1 + if result.get("status") == "error": + file_task.status = TaskStatus.FAILED + file_task.error = result.get("error", "Failed to process document") + file_task.updated_at = time.time() + upload_task.failed_files += 1 + else: + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED @@ -970,9 +990,14 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File ) result["path"] = f"s3://{self.bucket}/{item}" - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + if result.get("status") == "error": + file_task.status = TaskStatus.FAILED + file_task.error = result.get("error", "Failed to process document") + upload_task.failed_files += 1 + else: + file_task.status = TaskStatus.COMPLETED + file_task.result = result + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED @@ -1106,11 +1131,20 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File file_task=file_task, ) - # Update task with success - file_task.status = TaskStatus.COMPLETED - file_task.result = result - file_task.updated_at = time.time() - upload_task.successful_files += 1 + # Langflow returns "success" even when no text was extracted. + # Verify the document actually landed in OpenSearch. + file_hash = hash_id(item) + if not await self.check_document_exists(file_hash, opensearch_client): + file_task.status = TaskStatus.FAILED + file_task.error = "No text content could be extracted from document" + file_task.updated_at = time.time() + upload_task.failed_files += 1 + else: + # Update task with success + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: # Update task with failure diff --git a/src/services/task_service.py b/src/services/task_service.py index 521444669..1aaf11a04 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -64,11 +64,22 @@ ) +# Raster image formats that require OCR to produce any text content. +_OCR_REQUIRED_EXTENSIONS = frozenset({"bmp", "jpeg", "jpg", "png", "tiff", "webp"}) + + def _is_non_retryable_file_error(error: str) -> bool: lowered = error.lower() return any(marker in lowered for marker in _NON_RETRYABLE_FILE_ERROR_MARKERS) +def _is_ocr_required_file(filename: str) -> bool: + """Return True if the file is a raster image that requires OCR to produce text.""" + if not filename or "." not in filename: + return False + return filename.rsplit(".", 1)[-1].lower() in _OCR_REQUIRED_EXTENSIONS + + def _is_transient_connectivity_error(error: str) -> bool: lowered = error.lower() return any(marker in lowered for marker in _TRANSIENT_CONNECTIVITY_ERROR_MARKERS) @@ -958,6 +969,22 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: # First, check if the error is non-retryable based on common markers if _is_non_retryable_file_error(error): + # Image files that produced no text almost certainly failed because OCR + # is disabled — not because the file is corrupted. Give a targeted tip. + filename = file_task.filename or file_task.file_path or "" + if "no text content could be extracted" in error.lower() and _is_ocr_required_file( + filename + ): + return { + "component": "docling", + "failure_phase": "parsing", + "user_facing_message": ( + "No text content could be extracted from this image file. " + "Enable OCR in Settings > Knowledge Base and retry ingestion." + ), + "actionable_by": "RETRYABLE", + } + if phase == IngestionPhase.LANGFLOW: component = "langflow" failure_phase = "unknown"