diff --git a/frontend/app/upload/[provider]/page.tsx b/frontend/app/upload/[provider]/page.tsx
index c964fef9c..0c2de0e70 100644
--- a/frontend/app/upload/[provider]/page.tsx
+++ b/frontend/app/upload/[provider]/page.tsx
@@ -29,11 +29,16 @@ import {
} from "@/components/ui/tooltip";
import { useTask } from "@/contexts/task-context";
import { useSessionIngestSettings } from "@/hooks/useSessionIngestSettings";
-import { duplicateCheck } from "@/lib/upload-utils";
// Connectors that sync entire buckets/repositories without a file picker
const DIRECT_SYNC_PROVIDERS = ["ibm_cos", "aws_s3"];
+interface ConnectorDuplicateCheckResponse { // JSON body returned by the connector duplicate check; every field is optional, so readers must supply fallbacks
+ duplicate_names?: string[]; // names of the files reported as duplicates
+ duplicate_count?: number; // number of duplicate files; duplicate_names.length is used only as a fallback when this is absent
+ non_duplicate_files?: CloudFile[]; // files that are not duplicates; these are what gets submitted when duplicates are skipped
+}
+
// ---------------------------------------------------------------------------
// Shared bucket view — used by both IBM COS and S3
// ---------------------------------------------------------------------------
@@ -419,6 +424,7 @@ export default function UploadProviderPage() {
allFiles: CloudFile[];
nonDuplicateFiles: CloudFile[];
duplicateNames: string[];
+ duplicateCount: number;
} | null>(null);
const isOverwriteConfirmedRef = useRef(false);
@@ -514,24 +520,27 @@ export default function UploadProviderPage() {
throw new Error(`Duplicate check failed: ${checkResponse.statusText}`);
}
- const checkData = await checkResponse.json();
+ const checkData =
+ (await checkResponse.json()) as ConnectorDuplicateCheckResponse;
const duplicateNames = checkData.duplicate_names || [];
- const totalFiles = checkData.total_files || 0;
+ const duplicateCount =
+ typeof checkData.duplicate_count === "number"
+ ? checkData.duplicate_count
+ : duplicateNames.length;
- if (duplicateNames.length === 0) {
+ if (duplicateCount === 0) {
submitSync(connector, selectedFiles, false);
return;
}
- // If all files are duplicates, we set nonDuplicateFiles to empty so it toasts "Nothing was synced" on skip
- const isAllDuplicate = duplicateNames.length === totalFiles;
- const nonDuplicateFiles = isAllDuplicate ? [] : selectedFiles;
+ const nonDuplicateFiles = checkData.non_duplicate_files || [];
setPendingSync({
connector,
allFiles: selectedFiles,
nonDuplicateFiles,
duplicateNames,
+ duplicateCount,
});
setDuplicateDialogOpen(true);
} catch (err) {
@@ -559,12 +568,12 @@ export default function UploadProviderPage() {
// "skip duplicates" branch.
isOverwriteConfirmedRef.current = false;
} else {
- const { connector, nonDuplicateFiles, duplicateNames } = pendingSync;
+ const { connector, nonDuplicateFiles, duplicateCount } = pendingSync;
if (nonDuplicateFiles.length > 0) {
submitSync(connector, nonDuplicateFiles, false);
} else {
toast.info(
- `All ${duplicateNames.length} selected file(s) already exist. Nothing was synced.`,
+ `All ${duplicateCount} selected file(s) already exist. Nothing was synced.`,
);
}
}
@@ -788,6 +797,7 @@ export default function UploadProviderPage() {
onOverwrite={handleOverwriteDuplicates}
isLoading={isIngesting}
duplicateNames={pendingSync?.duplicateNames}
+ duplicateCount={pendingSync?.duplicateCount}
/>
>
);
diff --git a/frontend/components/duplicate-handling-dialog.tsx b/frontend/components/duplicate-handling-dialog.tsx
index 76b5a84c9..7f5e747db 100644
--- a/frontend/components/duplicate-handling-dialog.tsx
+++ b/frontend/components/duplicate-handling-dialog.tsx
@@ -41,9 +41,12 @@ export const DuplicateHandlingDialog: React.FC<
};
const namesProvided = duplicateNames && duplicateNames.length > 0;
- const effectiveCount = namesProvided
- ? duplicateNames!.length
- : duplicateCount;
+ const effectiveCount =
+ typeof duplicateCount === "number"
+ ? duplicateCount
+ : namesProvided
+ ? duplicateNames!.length
+ : undefined;
const description =
typeof effectiveCount === "number"
@@ -79,8 +82,8 @@ export const DuplicateHandlingDialog: React.FC<
{namesProvided && (
- {visibleNames.map((name) => (
- -
+ {visibleNames.map((name, index) => (
+
-
{name}
))}
diff --git a/src/api/connectors.py b/src/api/connectors.py
index 7a6fa2134..a3af0dece 100644
--- a/src/api/connectors.py
+++ b/src/api/connectors.py
@@ -350,6 +350,138 @@ class ConnectorCheckDuplicatesBody(BaseModel):
selected_files: list[Any] | None = None
+def _connector_file_response(file_info: dict[str, Any], cleaned_name: str | None = None) -> dict:  # builds a new dict; file_info is only read
+ """Normalize connector file metadata into the upload page's CloudFile shape."""
+ response = {
+ "id": file_info.get("id"),
+ "name": cleaned_name or file_info.get("name", ""),  # cleaned_name wins when truthy; otherwise the raw name ("" only when the key is absent)
+ "mimeType": file_info.get("mimeType")  # first truthy spelling wins: mimeType, then mime_type, then mimetype
+ or file_info.get("mime_type")
+ or file_info.get("mimetype")
+ or "",
+ "isFolder": bool(file_info.get("isFolder", False)),
+ }
+ for source_key, target_key in (  # optional fields, copied in this order under their camelCase target key
+ ("size", "size"),
+ ("webUrl", "webUrl"),
+ ("url", "webUrl"),  # NOTE(review): applied after "webUrl", so a non-empty "url" overwrites it (unlike mimeType, where the first spelling wins) — confirm intended
+ ("webViewLink", "webViewLink"),
+ ("downloadUrl", "downloadUrl"),
+ ("download_url", "downloadUrl"),  # NOTE(review): likewise overwrites a non-empty "downloadUrl" — confirm intended
+ ):
+ value = file_info.get(source_key)
+ if value is not None and value != "":  # skip None and "" so missing metadata stays out of the response
+ response[target_key] = value
+ return response
+
+
+async def _expand_selected_connector_files(  # resolves a selection into file entries via connector.list_files(), each normalized by _connector_file_response
+ connector,
+ selected_files_raw: list[Any],
+) -> list[dict[str, Any]]:
+ file_ids = [f.get("id") for f in selected_files_raw if isinstance(f, dict) and f.get("id")]  # ids of dict entries with a truthy "id"; anything else is ignored
+ expanded_files_info: list[dict[str, Any]] = []
+
+ if file_ids and hasattr(connector, "cfg"):  # expansion needs at least one id and a connector that exposes cfg
+ original_file_ids = getattr(connector.cfg, "file_ids", None)  # remembered so the finally block can restore them
+ original_folder_ids = getattr(connector.cfg, "folder_ids", None)
+ try:
+ connector.cfg.file_ids = file_ids  # temporarily point the connector config at the selection
+ connector.cfg.folder_ids = None
+
+ result = await connector.list_files()  # NOTE(review): cfg stays overridden while this awaits — confirm the connector instance is not shared by concurrent requests
+ for f in result.get("files", []):
+ expanded_files_info.append(_connector_file_response(f))
+ except Exception as e:  # best-effort: log, then fall through to the raw-selection fallback below
+ logger.error("Failed to expand files in duplicate check", error=str(e))
+ finally:  # always restore the connector's original selection config
+ connector.cfg.file_ids = original_file_ids
+ connector.cfg.folder_ids = original_folder_ids
+
+ if not expanded_files_info:  # fallback when nothing was listed (skipped, failed, or empty): use the selected non-folder entries
+ for f in selected_files_raw:
+ if isinstance(f, dict) and not f.get("isFolder"):
+ expanded_files_info.append(_connector_file_response(f))
+
+ return expanded_files_info  # may be empty, e.g. a folder-only selection whose listing returned nothing
+
+
+async def _classify_connector_duplicates(  # shared by the duplicate-check endpoint and by connector_sync when replace_duplicates is false
+ connector,
+ selected_files_raw: list[Any],
+ session_manager,
+ user_id: str,
+ jwt_token: str | None,
+) -> dict[str, Any]:
+ """Expand connector selections and split them into duplicate/non-duplicate files."""
+ expanded_files_info = await _expand_selected_connector_files(connector, selected_files_raw)
+ if not expanded_files_info:  # nothing to classify: report zero files in every bucket
+ return {
+ "duplicate_names": [],
+ "duplicate_files": [],
+ "non_duplicate_files": [],
+ "duplicate_count": 0,
+ "total_files": 0,
+ }
+
+ from utils.file_utils import clean_connector_filename, get_filename_aliases
+
+ cleaned_files = []  # (normalized file dict, aliases) pairs, in expansion order
+ all_candidates = set()  # union of every alias, looked up in a single query
+ for file_info in expanded_files_info:
+ cleaned_name = clean_connector_filename(file_info["name"], file_info["mimeType"])
+ response_file = _connector_file_response(file_info, cleaned_name=cleaned_name)  # same entry, now carrying the cleaned name
+ aliases = get_filename_aliases(cleaned_name)
+ cleaned_files.append((response_file, aliases))
+ all_candidates.update(aliases)
+
+ if not all_candidates:  # no alias to look up: every file counts as non-duplicate
+ return {
+ "duplicate_names": [],
+ "duplicate_files": [],
+ "non_duplicate_files": [file_info for file_info, _ in cleaned_files],
+ "duplicate_count": 0,
+ "total_files": len(cleaned_files),
+ }
+
+ opensearch_client = session_manager.get_user_opensearch_client(user_id, jwt_token)
+ query_body = {
+ "size": 10000,  # at most 10000 hits are read. NOTE(review): if one filename can match many indexed documents, some existing names could be missed — confirm
+ "query": {"terms": {"filename": list(all_candidates)}},
+ "_source": ["filename"],  # only the filename field is needed from each hit
+ }
+
+ existing_filenames = set()
+ try:
+ response = await opensearch_client.search(index=get_index_name(), body=query_body)
+ hits = response.get("hits", {}).get("hits", [])
+ for hit in hits:
+ fn = hit.get("_source", {}).get("filename")
+ if fn:
+ existing_filenames.add(fn)
+ except Exception as search_err:
+ if "index_not_found_exception" not in str(search_err):  # a missing index leaves existing_filenames empty (no duplicates); any other error propagates
+ raise
+
+ duplicate_files = []
+ non_duplicate_files = []
+ duplicate_names = []
+ for file_info, aliases in cleaned_files:
+ if any(alias in existing_filenames for alias in aliases):  # duplicate when any alias of the file is already indexed
+ duplicate_files.append(file_info)
+ duplicate_names.append(file_info["name"])
+ else:
+ non_duplicate_files.append(file_info)
+
+ return {
+ "duplicate_names": list(dict.fromkeys(duplicate_names)),  # de-duplicated in first-seen order, so it can be shorter than duplicate_count
+ "duplicate_files": duplicate_files,
+ "non_duplicate_files": non_duplicate_files,
+ "duplicate_count": len(duplicate_files),  # counts files, not distinct names
+ "total_files": len(cleaned_files),
+ }
+
+
async def connector_check_duplicates(
connector_type: str,
body: ConnectorCheckDuplicatesBody,
@@ -401,93 +533,14 @@ async def connector_check_duplicates(
status_code=404,
)
- # Get list of file IDs to expand
- file_ids = [f.get("id") for f in selected_files_raw if f.get("id")]
-
- expanded_files_info = []
-
- # Expand files (handling folders) if possible
- if file_ids and hasattr(connector, "cfg"):
- original_file_ids = getattr(connector.cfg, "file_ids", None)
- original_folder_ids = getattr(connector.cfg, "folder_ids", None)
- try:
- connector.cfg.file_ids = file_ids
- connector.cfg.folder_ids = None
-
- result = await connector.list_files()
- expanded_files = result.get("files", [])
- for f in expanded_files:
- expanded_files_info.append(
- {
- "name": f.get("name", ""),
- "mimeType": f.get("mime_type") or f.get("mimeType") or "",
- }
- )
- except Exception as e:
- logger.error("Failed to expand files in duplicate check", error=str(e))
- finally:
- connector.cfg.file_ids = original_file_ids
- connector.cfg.folder_ids = original_folder_ids
-
- # If expansion returned nothing, fall back to non-folders in selected_files_raw
- if not expanded_files_info:
- for f in selected_files_raw:
- if not f.get("isFolder"):
- expanded_files_info.append(
- {"name": f.get("name", ""), "mimeType": f.get("mimeType") or ""}
- )
-
- if not expanded_files_info:
- return JSONResponse({"duplicate_names": []})
-
- # Process and clean names using clean_connector_filename
- from utils.file_utils import clean_connector_filename, get_filename_aliases
-
- cleaned_names = []
- for f in expanded_files_info:
- cleaned_name = clean_connector_filename(f["name"], f["mimeType"])
- cleaned_names.append(cleaned_name)
-
- # Build candidate filenames (including aliases like .txt / .md mapping)
- all_candidates = set()
- for name in cleaned_names:
- all_candidates.update(get_filename_aliases(name))
-
- if not all_candidates:
- return JSONResponse({"duplicate_names": []})
-
- # Query OpenSearch in one batch
- opensearch_client = session_manager.get_user_opensearch_client(user.user_id, jwt_token)
-
- query_body = {
- "size": 10000,
- "query": {"terms": {"filename": list(all_candidates)}},
- "_source": ["filename"],
- }
-
- existing_filenames = set()
- try:
- response = await opensearch_client.search(index=get_index_name(), body=query_body)
- hits = response.get("hits", {}).get("hits", [])
- for hit in hits:
- fn = hit.get("_source", {}).get("filename")
- if fn:
- existing_filenames.add(fn)
- except Exception as search_err:
- if "index_not_found_exception" in str(search_err):
- # Index doesn't exist yet, so no duplicates can exist
- return JSONResponse({"duplicate_names": [], "total_files": len(cleaned_names)})
- raise
-
- # Check which of the cleaned names have duplicates (based on existing_filenames matching any alias)
- duplicate_names = []
- for name in cleaned_names:
- aliases = get_filename_aliases(name)
- if any(alias in existing_filenames for alias in aliases):
- duplicate_names.append(name)
-
return JSONResponse(
- {"duplicate_names": list(set(duplicate_names)), "total_files": len(cleaned_names)}
+ await _classify_connector_duplicates(
+ connector=connector,
+ selected_files_raw=selected_files_raw,
+ session_manager=session_manager,
+ user_id=user.user_id,
+ jwt_token=jwt_token,
+ )
)
except Exception:
@@ -600,6 +653,34 @@ async def connector_sync(
# Explicit files selected (e.g., from file picker) - sync those specific files
from .documents import _ensure_index_exists
+ if not body.replace_duplicates and file_infos:
+ duplicate_check = await _classify_connector_duplicates(
+ connector=await connector_service.get_connector(
+ working_connection.connection_id
+ ),
+ selected_files_raw=file_infos,
+ session_manager=session_manager,
+ user_id=user.user_id,
+ jwt_token=jwt_token,
+ )
+ if duplicate_check["duplicate_count"] > 0:
+ file_infos = duplicate_check["non_duplicate_files"]
+ selected_files = [f["id"] for f in file_infos if f.get("id")]
+ if not selected_files:
+ return JSONResponse(
+ {
+ "status": "no_files",
+ "message": (
+ f"All {duplicate_check['duplicate_count']} selected file(s) "
+ "already exist. Nothing was synced."
+ ),
+ "duplicate_names": duplicate_check["duplicate_names"],
+ "duplicate_count": duplicate_check["duplicate_count"],
+ "total_files": duplicate_check["total_files"],
+ },
+ status_code=200,
+ )
+
await _ensure_index_exists(jwt_token)
task_id = await connector_service.sync_specific_files(
working_connection.connection_id,
diff --git a/src/connectors/service.py b/src/connectors/service.py
index debd26f61..d4afb1450 100644
--- a/src/connectors/service.py
+++ b/src/connectors/service.py
@@ -203,6 +203,7 @@ async def _update_connector_metadata(
owner_user_id: str,
connector_type: str,
jwt_token: str = None,
+ indexed_filename: str | None = None,
):
"""Update indexed chunks with connector-specific metadata"""
from utils.acl_utils import update_document_acl
@@ -273,7 +274,7 @@ async def _update_connector_metadata(
"params": {
"source_url": document.source_url,
"connector_type": connector_type,
- "filename": document.filename,
+ "filename": indexed_filename or document.filename,
"created_time": document.created_time.isoformat()
if document.created_time
else None,
@@ -571,7 +572,7 @@ async def sync_specific_files(
if all_infos:
original_filenames = {
f["id"]: clean_connector_filename(
- f["name"], f.get("mimeType") or f.get("mimetype", "")
+ f["name"], f.get("mimeType") or f.get("mime_type") or f.get("mimetype", "")
)
for f in all_infos
if "id" in f and "name" in f
diff --git a/src/models/processors.py b/src/models/processors.py
index 605fd9063..5b1b28189 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -715,8 +715,8 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
except (FileNotFoundError, ValueError) as e:
msg = str(e).lower()
if "not found" in msg or "404" in msg:
- # File gone at source — remove indexed chunks by document_id
- # (= connector file_id) so it stops appearing in search/chat.
+ # File gone at source — remove indexed chunks by connector file id
+ # so it stops appearing in search/chat.
# Filename rename (e.g. .txt → .md) is irrelevant here.
deleted_chunks = 0
try:
@@ -728,7 +728,10 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
)
)
deleted_chunks = await delete_chunks_by_document_ids(
- [file_id], opensearch_client, get_index_name()
+ [file_id],
+ opensearch_client,
+ get_index_name(),
+ field="connector_file_id",
)
except Exception as cleanup_err:
logger.error(
@@ -775,7 +778,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
self.user_id, self.jwt_token
)
- if await self.check_filename_exists(document.filename, opensearch_client):
+ if await self.check_filename_exists(file_task.filename, opensearch_client):
if not self.replace_duplicates:
file_task.status = TaskStatus.SKIPPED
file_task.error = None
@@ -788,12 +791,12 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
upload_task.successful_files += 1
return
await self.delete_document_by_filename(
- document.filename,
+ file_task.filename,
opensearch_client,
)
# Create temporary file from document content
- suffix = os.path.splitext(document.filename)[1]
+ suffix = os.path.splitext(file_task.filename)[1]
if not suffix:
suffix = get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
@@ -849,7 +852,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
# Ingest via unified Langflow pipeline (two-phase Docling + Langflow run)
langflow_filename, processed_mimetype = langflow_safe_filename_and_mimetype(
- document.filename, document.mimetype
+ file_task.filename, document.mimetype
)
file_tuple = (langflow_filename, document.content, processed_mimetype)
@@ -932,7 +935,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.user_id,
- original_filename=document.filename,
+ original_filename=file_task.filename,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
@@ -946,7 +949,11 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
# Update indexed chunks with connector-specific metadata
if result["status"] in ["indexed", "unchanged"]:
await self.connector_service._update_connector_metadata(
- document, self.user_id, connection.connector_type, self.jwt_token
+ document,
+ self.user_id,
+ connection.connector_type,
+ self.jwt_token,
+ indexed_filename=file_task.filename,
)
# Add connector-specific metadata
diff --git a/tests/unit/connectors/test_connector_file_type_validation.py b/tests/unit/connectors/test_connector_file_type_validation.py
index 12e48a154..b20befbb6 100644
--- a/tests/unit/connectors/test_connector_file_type_validation.py
+++ b/tests/unit/connectors/test_connector_file_type_validation.py
@@ -1,5 +1,7 @@
+import json
import sys
from pathlib import Path
+from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
@@ -83,8 +85,6 @@ async def test_connector_file_processor_fails_incompatible_file():
@pytest.mark.asyncio
async def test_connector_check_duplicates():
- import json
-
from fastapi.responses import JSONResponse
from api.connectors import ConnectorCheckDuplicatesBody, connector_check_duplicates
@@ -151,3 +151,205 @@ async def test_connector_check_duplicates():
assert "existing.pdf" in data["duplicate_names"]
assert "new_file.docx" not in data["duplicate_names"]
assert data["total_files"] == 2
+ assert data["duplicate_count"] == 1
+ assert data["duplicate_files"] == [
+ {
+ "id": "file-1",
+ "name": "existing.pdf",
+ "mimeType": "application/pdf",
+ "isFolder": False,
+ }
+ ]
+ assert data["non_duplicate_files"] == [
+ {
+ "id": "file-2",
+ "name": "new_file.docx",
+ "mimeType": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ "isFolder": False,
+ }
+ ]
+
+
+@pytest.mark.asyncio
+async def test_connector_sync_skip_duplicates_returns_no_files_when_all_selected_are_duplicates(  # skip mode: when every expanded file is already indexed, connector_sync answers "no_files" and starts no sync
+ monkeypatch,
+):
+ from api import connectors as connectors_api
+
+ monkeypatch.setattr(connectors_api.TelemetryClient, "send_event", AsyncMock())
+ monkeypatch.setattr(connectors_api, "_ensure_index_exists", AsyncMock(), raising=False)  # raising=False tolerates the attribute being absent; the sibling tests patch api.documents._ensure_index_exists instead
+
+ connector_service = MagicMock()
+ connection_manager = MagicMock()
+ connector_service.connection_manager = connection_manager
+
+ connection = MagicMock()
+ connection.connection_id = "conn-id"
+ connection.is_active = True
+ connection_manager.list_connections = AsyncMock(return_value=[connection])
+
+ connector = MagicMock()
+ connector.is_authenticated = True
+ connector.authenticate = AsyncMock(return_value=True)
+ connector.cfg = MagicMock()
+ connector.list_files = AsyncMock(  # the connector listing yields two files
+ return_value={
+ "files": [
+ {"id": "file-1", "name": "existing.pdf", "mimeType": "application/pdf"},
+ {
+ "id": "file-2",
+ "name": "existing.docx",
+ "mimeType": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ },
+ ]
+ }
+ )
+ connector_service.get_connector = AsyncMock(return_value=connector)
+ connector_service.sync_specific_files = AsyncMock(return_value="task-id")
+
+ session_manager = MagicMock()
+ opensearch_client = AsyncMock()
+ session_manager.get_user_opensearch_client = MagicMock(return_value=opensearch_client)
+ opensearch_client.search = AsyncMock(  # the index reports both filenames as already present
+ return_value={
+ "hits": {
+ "hits": [
+ {"_source": {"filename": "existing.pdf"}},
+ {"_source": {"filename": "existing.docx"}},
+ ]
+ }
+ }
+ )
+
+ response = await connectors_api.connector_sync(
+ connector_type="google_drive",
+ body=connectors_api.ConnectorSyncBody(
+ selected_files=[{"id": "folder-id", "name": "Folder", "isFolder": True}],  # only a folder is selected
+ replace_duplicates=False,
+ ),
+ connector_service=connector_service,
+ session_manager=session_manager,
+ user=SimpleNamespace(user_id="user-id", jwt_token="jwt-token"),
+ )
+
+ assert response.status_code == 200
+ data = json.loads(response.body.decode())
+ assert data["status"] == "no_files"
+ assert data["duplicate_count"] == 2  # both listed files counted as duplicates
+ assert "already exist" in data["message"]
+ connector_service.sync_specific_files.assert_not_awaited()  # no sync task is started
+
+
+@pytest.mark.asyncio
+async def test_connector_sync_skip_duplicates_submits_only_expanded_non_duplicates(monkeypatch):  # skip mode: only the expanded files that are not already indexed reach sync_specific_files
+ from api import connectors as connectors_api
+
+ monkeypatch.setattr(connectors_api.TelemetryClient, "send_event", AsyncMock())
+ monkeypatch.setattr("api.documents._ensure_index_exists", AsyncMock())
+
+ connector_service = MagicMock()
+ connection_manager = MagicMock()
+ connector_service.connection_manager = connection_manager
+
+ connection = MagicMock()
+ connection.connection_id = "conn-id"
+ connection.is_active = True
+ connection_manager.list_connections = AsyncMock(return_value=[connection])
+
+ connector = MagicMock()
+ connector.is_authenticated = True
+ connector.authenticate = AsyncMock(return_value=True)
+ connector.cfg = MagicMock()
+ connector.list_files = AsyncMock(  # the connector listing yields one indexed and one new file
+ return_value={
+ "files": [
+ {"id": "file-1", "name": "existing.pdf", "mimeType": "application/pdf"},
+ {
+ "id": "file-2",
+ "name": "new_file.docx",
+ "mimeType": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ },
+ ]
+ }
+ )
+ connector_service.get_connector = AsyncMock(return_value=connector)
+ connector_service.sync_specific_files = AsyncMock(return_value="task-id")
+
+ session_manager = MagicMock()
+ opensearch_client = AsyncMock()
+ session_manager.get_user_opensearch_client = MagicMock(return_value=opensearch_client)
+ opensearch_client.search = AsyncMock(  # only existing.pdf is reported as already indexed
+ return_value={"hits": {"hits": [{"_source": {"filename": "existing.pdf"}}]}}
+ )
+
+ response = await connectors_api.connector_sync(
+ connector_type="sharepoint",
+ body=connectors_api.ConnectorSyncBody(
+ selected_files=[{"id": "folder-id", "name": "Folder", "isFolder": True}],  # only a folder is selected
+ replace_duplicates=False,
+ ),
+ connector_service=connector_service,
+ session_manager=session_manager,
+ user=SimpleNamespace(user_id="user-id", jwt_token="jwt-token"),
+ )
+
+ assert response.status_code == 201
+ connector_service.sync_specific_files.assert_awaited_once()
+ args = connector_service.sync_specific_files.await_args.args
+ kwargs = connector_service.sync_specific_files.await_args.kwargs
+ assert args[2] == ["file-2"]  # third positional argument, presumably the ids to sync: the duplicate file-1 is gone
+ assert kwargs["file_infos"] == [  # file_infos holds the normalized non-duplicate entry only
+ {
+ "id": "file-2",
+ "name": "new_file.docx",
+ "mimeType": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ "isFolder": False,
+ }
+ ]
+
+
+@pytest.mark.asyncio
+async def test_connector_sync_does_not_report_all_duplicates_when_expansion_is_empty(monkeypatch):  # an empty expansion must not be mistaken for "everything is a duplicate": the original selection is synced
+ from api import connectors as connectors_api
+
+ monkeypatch.setattr(connectors_api.TelemetryClient, "send_event", AsyncMock())
+ monkeypatch.setattr("api.documents._ensure_index_exists", AsyncMock())
+
+ connector_service = MagicMock()
+ connection_manager = MagicMock()
+ connector_service.connection_manager = connection_manager
+
+ connection = MagicMock()
+ connection.connection_id = "conn-id"
+ connection.is_active = True
+ connection_manager.list_connections = AsyncMock(return_value=[connection])
+
+ connector = MagicMock()
+ connector.is_authenticated = True
+ connector.authenticate = AsyncMock(return_value=True)
+ connector.cfg = MagicMock()
+ connector.list_files = AsyncMock(return_value={"files": []})  # the listing returns nothing, and a folder-only selection has no non-folder fallback
+ connector_service.get_connector = AsyncMock(return_value=connector)
+ connector_service.sync_specific_files = AsyncMock(return_value="task-id")
+
+ session_manager = MagicMock()
+ opensearch_client = AsyncMock()
+ session_manager.get_user_opensearch_client = MagicMock(return_value=opensearch_client)
+
+ response = await connectors_api.connector_sync(
+ connector_type="google_drive",
+ body=connectors_api.ConnectorSyncBody(
+ selected_files=[{"id": "folder-id", "name": "Folder", "isFolder": True}],
+ replace_duplicates=False,
+ ),
+ connector_service=connector_service,
+ session_manager=session_manager,
+ user=SimpleNamespace(user_id="user-id", jwt_token="jwt-token"),
+ )
+
+ assert response.status_code == 201
+ connector_service.sync_specific_files.assert_awaited_once()
+ args = connector_service.sync_specific_files.await_args.args
+ kwargs = connector_service.sync_specific_files.await_args.kwargs
+ assert args[2] == ["folder-id"]  # the original selection id is passed through
+ assert kwargs["file_infos"] == [{"id": "folder-id", "name": "Folder", "isFolder": True}]  # original entry, not a normalized copy
diff --git a/tests/unit/test_connector_processor_filename_dedupe.py b/tests/unit/test_connector_processor_filename_dedupe.py
index b30e39f0b..f94522f8d 100644
--- a/tests/unit/test_connector_processor_filename_dedupe.py
+++ b/tests/unit/test_connector_processor_filename_dedupe.py
@@ -199,6 +199,39 @@ async def test_connector_processor_deletes_chunks_when_source_returns_404(monkey
assert query["terms"]["connector_file_id"] == ["file-id-1"]
+@pytest.mark.asyncio
+async def test_connector_processor_indexes_cleaned_filename(monkeypatch):
+ """MIME-enforced extensions must be indexed under the same name used for dedupe."""
+ monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", True)
+ processor = _build_connector_processor(replace_duplicates=False)
+ document = ConnectorDocument(
+ id="doc-id-1",
+ filename="My Report",  # the source name has no extension
+ mimetype="application/vnd.google-apps.document",  # the assertions below expect this to produce a ".pdf" name
+ content=b"%PDF-1.4 dummy",
+ source_url="https://example.google.com/file",
+ acl=DocumentACL(owner="user@example.com"),
+ modified_time=datetime.now(),
+ created_time=datetime.now(),
+ )
+ _wire_connector_processor(processor, document, filename_exists=False)
+
+ file_task = _make_file_task()
+ upload_task = _make_upload_task()
+
+ with patch.object(  # stub the standard processing step so its call arguments can be inspected
+ processor,
+ "process_document_standard",
+ new=AsyncMock(return_value={"status": "indexed", "id": "hash-1"}),
+ ) as mock_process:
+ await processor.process_item(upload_task, "file-id-1", file_task)
+
+ assert file_task.filename == "My Report.pdf"
+ assert mock_process.await_args.kwargs["original_filename"] == "My Report.pdf"  # processing receives the cleaned name, not document.filename
+ metadata_call = processor.connector_service._update_connector_metadata.await_args
+ assert metadata_call.kwargs["indexed_filename"] == "My Report.pdf"  # the metadata update is given the same cleaned name
+
+
@pytest.mark.asyncio
async def test_connector_processor_proceeds_when_filename_absent(monkeypatch):
monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", True)
@@ -244,15 +277,21 @@ def _wire_langflow_processor(
document: ConnectorDocument,
filename_exists: bool,
hash_exists: bool = False,
+ connector_id_exists: bool = False,
):
opensearch_client = AsyncMock()
async def mock_search(index, body, **kwargs):
- query_str = str(body)
- if "document_id" in query_str:
+ document_id = (
+ body.get("query", {}).get("term", {}).get("document_id")
+ if isinstance(body, dict)
+ else None
+ )
+ if document_id == document.id:
+ return _make_search_response(connector_id_exists)
+ if document_id:
return _make_search_response(hash_exists)
- else:
- return _make_search_response(filename_exists)
+ return _make_search_response(filename_exists)
opensearch_client.search = mock_search
opensearch_client.delete_by_query = AsyncMock(return_value={"deleted": 2})
@@ -300,12 +339,39 @@ async def test_langflow_connector_processor_skips_on_filename_collision(monkeypa
opensearch_client.delete_by_query.assert_not_called()
+@pytest.mark.asyncio
+async def test_langflow_connector_processor_uses_cleaned_filename(monkeypatch):  # Langflow path: the file handed to upload_and_ingest_file should carry the cleaned filename
+ monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False)
+ processor = _build_langflow_processor(replace_duplicates=False)
+ document = ConnectorDocument(
+ id="doc-id-1",
+ filename="My Report",  # the source name has no extension
+ mimetype="application/vnd.google-apps.document",  # the assertion below expects this to produce a ".pdf" name
+ content=b"%PDF-1.4 dummy",
+ source_url="https://example.google.com/file",
+ acl=DocumentACL(owner="user@example.com"),
+ modified_time=datetime.now(),
+ created_time=datetime.now(),
+ )
+ _wire_langflow_processor(processor, document, filename_exists=False, connector_id_exists=True)
+
+ file_task = _make_file_task()
+ upload_task = _make_upload_task()
+
+ await processor.process_item(upload_task, "file-id-1", file_task)
+
+ upload_call = processor.connector_service.langflow_service.upload_and_ingest_file.await_args
+ assert upload_call.kwargs["file_tuple"][0] == "My Report.pdf"  # the first element of file_tuple is the filename
+
+
@pytest.mark.asyncio
async def test_langflow_connector_processor_overwrites_when_replace_true(monkeypatch):
monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False)
processor = _build_langflow_processor(replace_duplicates=True)
document = _make_document()
- opensearch_client = _wire_langflow_processor(processor, document, filename_exists=True)
+ opensearch_client = _wire_langflow_processor(
+ processor, document, filename_exists=True, connector_id_exists=True
+ )
file_task = _make_file_task()
upload_task = _make_upload_task()