Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions frontend/app/upload/[provider]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ import {
} from "@/components/ui/tooltip";
import { useTask } from "@/contexts/task-context";
import { useSessionIngestSettings } from "@/hooks/useSessionIngestSettings";
import { duplicateCheck } from "@/lib/upload-utils";

// Connectors that sync entire buckets/repositories without a file picker
const DIRECT_SYNC_PROVIDERS = ["ibm_cos", "aws_s3"];

/**
 * JSON payload returned by the connector duplicate-check endpoint.
 *
 * Every field is optional so callers must default them defensively; the
 * visible backend builder always sets all five keys, but older responses
 * only carried `duplicate_names` (and sometimes `total_files`).
 */
interface ConnectorDuplicateCheckResponse {
  /** De-duplicated display names of the selected files that already exist. */
  duplicate_names?: string[];
  /** Number of selected files classified as duplicates (counted per file, not per unique name). */
  duplicate_count?: number;
  /** Selected files classified as duplicates, normalized to the CloudFile shape. */
  duplicate_files?: CloudFile[];
  /** Selected files that are safe to sync without overwriting anything. */
  non_duplicate_files?: CloudFile[];
  /** Total number of files considered after folder expansion. */
  total_files?: number;
}

// ---------------------------------------------------------------------------
// Shared bucket view — used by both IBM COS and S3
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -419,6 +424,7 @@ export default function UploadProviderPage() {
allFiles: CloudFile[];
nonDuplicateFiles: CloudFile[];
duplicateNames: string[];
duplicateCount: number;
} | null>(null);
const isOverwriteConfirmedRef = useRef(false);

Expand Down Expand Up @@ -514,24 +520,27 @@ export default function UploadProviderPage() {
throw new Error(`Duplicate check failed: ${checkResponse.statusText}`);
}

const checkData = await checkResponse.json();
const checkData =
(await checkResponse.json()) as ConnectorDuplicateCheckResponse;
const duplicateNames = checkData.duplicate_names || [];
const totalFiles = checkData.total_files || 0;
const duplicateCount =
typeof checkData.duplicate_count === "number"
? checkData.duplicate_count
: duplicateNames.length;

if (duplicateNames.length === 0) {
if (duplicateCount === 0) {
submitSync(connector, selectedFiles, false);
return;
}

// If all files are duplicates, we set nonDuplicateFiles to empty so it toasts "Nothing was synced" on skip
const isAllDuplicate = duplicateNames.length === totalFiles;
const nonDuplicateFiles = isAllDuplicate ? [] : selectedFiles;
const nonDuplicateFiles = checkData.non_duplicate_files || [];

setPendingSync({
connector,
allFiles: selectedFiles,
nonDuplicateFiles,
duplicateNames,
duplicateCount,
});
setDuplicateDialogOpen(true);
} catch (err) {
Expand Down Expand Up @@ -559,12 +568,12 @@ export default function UploadProviderPage() {
// "skip duplicates" branch.
isOverwriteConfirmedRef.current = false;
} else {
const { connector, nonDuplicateFiles, duplicateNames } = pendingSync;
const { connector, nonDuplicateFiles, duplicateCount } = pendingSync;
if (nonDuplicateFiles.length > 0) {
submitSync(connector, nonDuplicateFiles, false);
} else {
toast.info(
`All ${duplicateNames.length} selected file(s) already exist. Nothing was synced.`,
`All ${duplicateCount} selected file(s) already exist. Nothing was synced.`,
);
}
}
Expand Down Expand Up @@ -788,6 +797,7 @@ export default function UploadProviderPage() {
onOverwrite={handleOverwriteDuplicates}
isLoading={isIngesting}
duplicateNames={pendingSync?.duplicateNames}
duplicateCount={pendingSync?.duplicateCount}
/>
</>
);
Expand Down
13 changes: 8 additions & 5 deletions frontend/components/duplicate-handling-dialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ export const DuplicateHandlingDialog: React.FC<
};

const namesProvided = duplicateNames && duplicateNames.length > 0;
const effectiveCount = namesProvided
? duplicateNames!.length
: duplicateCount;
const effectiveCount =
typeof duplicateCount === "number"
? duplicateCount
: namesProvided
? duplicateNames!.length
: undefined;

const description =
typeof effectiveCount === "number"
Expand Down Expand Up @@ -79,8 +82,8 @@ export const DuplicateHandlingDialog: React.FC<

{namesProvided && (
<ul className="text-sm text-muted-foreground list-disc pl-5 space-y-0.5">
{visibleNames.map((name) => (
<li key={name} className="break-all">
{visibleNames.map((name, index) => (
<li key={`${name}-${index}`} className="break-all">
{name}
</li>
))}
Expand Down
253 changes: 167 additions & 86 deletions src/api/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,138 @@ class ConnectorCheckDuplicatesBody(BaseModel):
selected_files: list[Any] | None = None


def _connector_file_response(file_info: dict[str, Any], cleaned_name: str | None = None) -> dict:
"""Normalize connector file metadata into the upload page's CloudFile shape."""
response = {
"id": file_info.get("id"),
"name": cleaned_name or file_info.get("name", ""),
"mimeType": file_info.get("mimeType")
or file_info.get("mime_type")
or file_info.get("mimetype")
or "",
"isFolder": bool(file_info.get("isFolder", False)),
}
for source_key, target_key in (
("size", "size"),
("webUrl", "webUrl"),
("url", "webUrl"),
("webViewLink", "webViewLink"),
("downloadUrl", "downloadUrl"),
("download_url", "downloadUrl"),
):
value = file_info.get(source_key)
if value is not None and value != "":
response[target_key] = value
return response


async def _expand_selected_connector_files(
    connector,
    selected_files_raw: list[Any],
) -> list[dict[str, Any]]:
    """Resolve the user's raw selection into a flat list of normalized files.

    When the selection carries ids and the connector exposes a ``cfg``
    object, the connector is temporarily pointed at those ids and asked to
    list files; each listed entry is normalized with
    ``_connector_file_response``. A failure during that listing is logged and
    not re-raised. If nothing was produced that way, the non-folder dict
    entries of ``selected_files_raw`` are normalized and returned instead.

    Args:
        connector: Connector instance; only ``cfg`` and ``list_files()`` are
            touched here.
        selected_files_raw: Items sent by the client. Entries that are not
            dicts are ignored.

    Returns:
        Normalized file dicts (possibly empty).
    """
    requested_ids = [
        item.get("id")
        for item in selected_files_raw
        if isinstance(item, dict) and item.get("id")
    ]
    expanded: list[dict[str, Any]] = []

    can_expand = bool(requested_ids) and hasattr(connector, "cfg")
    if can_expand:
        # Remember the connector's current selection so it can be put back
        # regardless of how the listing goes.
        saved_file_ids = getattr(connector.cfg, "file_ids", None)
        saved_folder_ids = getattr(connector.cfg, "folder_ids", None)
        try:
            connector.cfg.file_ids = requested_ids
            connector.cfg.folder_ids = None

            listing = await connector.list_files()
            for listed in listing.get("files", []):
                expanded.append(_connector_file_response(listed))
        except Exception as e:
            logger.error("Failed to expand files in duplicate check", error=str(e))
        finally:
            connector.cfg.file_ids = saved_file_ids
            connector.cfg.folder_ids = saved_folder_ids

    if expanded:
        return expanded

    # Fallback: use the client-supplied entries directly, skipping folders.
    return [
        _connector_file_response(item)
        for item in selected_files_raw
        if isinstance(item, dict) and not item.get("isFolder")
    ]


async def _classify_connector_duplicates(
    connector,
    selected_files_raw: list[Any],
    session_manager,
    user_id: str,
    jwt_token: str | None,
) -> dict[str, Any]:
    """Expand connector selections and split them into duplicate/non-duplicate files.

    The selection is expanded via ``_expand_selected_connector_files``, each
    file name is cleaned with ``clean_connector_filename``, and the aliases
    from ``get_filename_aliases`` are looked up in one ``terms`` query on the
    ``filename`` field. A file counts as a duplicate when any of its aliases
    appears among the returned filenames.

    Args:
        connector: Connector used to expand the selection.
        selected_files_raw: Raw selection items sent by the client.
        session_manager: Provides ``get_user_opensearch_client``.
        user_id: Id passed to ``get_user_opensearch_client``.
        jwt_token: Token passed to ``get_user_opensearch_client``.

    Returns:
        Dict with ``duplicate_names`` (order-preserving, de-duplicated),
        ``duplicate_files``, ``non_duplicate_files``, ``duplicate_count``
        (number of duplicate files) and ``total_files``.

    Raises:
        Exception: Any search error whose text does not contain
            ``index_not_found_exception`` is re-raised.
    """
    expanded = await _expand_selected_connector_files(connector, selected_files_raw)
    if not expanded:
        return {
            "duplicate_names": [],
            "duplicate_files": [],
            "non_duplicate_files": [],
            "duplicate_count": 0,
            "total_files": 0,
        }

    from utils.file_utils import clean_connector_filename, get_filename_aliases

    # Pair every normalized file with the aliases it could be stored under,
    # while collecting the union of aliases for the single batched lookup.
    files_with_aliases = []
    candidate_names = set()
    for entry in expanded:
        display_name = clean_connector_filename(entry["name"], entry["mimeType"])
        normalized = _connector_file_response(entry, cleaned_name=display_name)
        name_aliases = get_filename_aliases(display_name)
        files_with_aliases.append((normalized, name_aliases))
        candidate_names.update(name_aliases)

    if not candidate_names:
        return {
            "duplicate_names": [],
            "duplicate_files": [],
            "non_duplicate_files": [normalized for normalized, _ in files_with_aliases],
            "duplicate_count": 0,
            "total_files": len(files_with_aliases),
        }

    client = session_manager.get_user_opensearch_client(user_id, jwt_token)
    # NOTE(review): the result window is capped at 10000 hits; if hits are
    # per-chunk rather than per-file, matches beyond the cap would be missed
    # - confirm against the index layout.
    search_body = {
        "size": 10000,
        "query": {"terms": {"filename": list(candidate_names)}},
        "_source": ["filename"],
    }

    indexed_names = set()
    try:
        search_result = await client.search(index=get_index_name(), body=search_body)
        for hit in search_result.get("hits", {}).get("hits", []):
            indexed = hit.get("_source", {}).get("filename")
            if indexed:
                indexed_names.add(indexed)
    except Exception as search_err:
        # A missing index is treated as "nothing indexed yet"; anything else
        # propagates to the caller.
        if "index_not_found_exception" not in str(search_err):
            raise

    duplicate_files = []
    non_duplicate_files = []
    for normalized, name_aliases in files_with_aliases:
        already_indexed = any(alias in indexed_names for alias in name_aliases)
        bucket = duplicate_files if already_indexed else non_duplicate_files
        bucket.append(normalized)

    duplicate_names = [dup["name"] for dup in duplicate_files]

    return {
        "duplicate_names": list(dict.fromkeys(duplicate_names)),
        "duplicate_files": duplicate_files,
        "non_duplicate_files": non_duplicate_files,
        "duplicate_count": len(duplicate_files),
        "total_files": len(files_with_aliases),
    }


async def connector_check_duplicates(
connector_type: str,
body: ConnectorCheckDuplicatesBody,
Expand Down Expand Up @@ -401,93 +533,14 @@ async def connector_check_duplicates(
status_code=404,
)

# Get list of file IDs to expand
file_ids = [f.get("id") for f in selected_files_raw if f.get("id")]

expanded_files_info = []

# Expand files (handling folders) if possible
if file_ids and hasattr(connector, "cfg"):
original_file_ids = getattr(connector.cfg, "file_ids", None)
original_folder_ids = getattr(connector.cfg, "folder_ids", None)
try:
connector.cfg.file_ids = file_ids
connector.cfg.folder_ids = None

result = await connector.list_files()
expanded_files = result.get("files", [])
for f in expanded_files:
expanded_files_info.append(
{
"name": f.get("name", ""),
"mimeType": f.get("mime_type") or f.get("mimeType") or "",
}
)
except Exception as e:
logger.error("Failed to expand files in duplicate check", error=str(e))
finally:
connector.cfg.file_ids = original_file_ids
connector.cfg.folder_ids = original_folder_ids

# If expansion returned nothing, fall back to non-folders in selected_files_raw
if not expanded_files_info:
for f in selected_files_raw:
if not f.get("isFolder"):
expanded_files_info.append(
{"name": f.get("name", ""), "mimeType": f.get("mimeType") or ""}
)

if not expanded_files_info:
return JSONResponse({"duplicate_names": []})

# Process and clean names using clean_connector_filename
from utils.file_utils import clean_connector_filename, get_filename_aliases

cleaned_names = []
for f in expanded_files_info:
cleaned_name = clean_connector_filename(f["name"], f["mimeType"])
cleaned_names.append(cleaned_name)

# Build candidate filenames (including aliases like .txt / .md mapping)
all_candidates = set()
for name in cleaned_names:
all_candidates.update(get_filename_aliases(name))

if not all_candidates:
return JSONResponse({"duplicate_names": []})

# Query OpenSearch in one batch
opensearch_client = session_manager.get_user_opensearch_client(user.user_id, jwt_token)

query_body = {
"size": 10000,
"query": {"terms": {"filename": list(all_candidates)}},
"_source": ["filename"],
}

existing_filenames = set()
try:
response = await opensearch_client.search(index=get_index_name(), body=query_body)
hits = response.get("hits", {}).get("hits", [])
for hit in hits:
fn = hit.get("_source", {}).get("filename")
if fn:
existing_filenames.add(fn)
except Exception as search_err:
if "index_not_found_exception" in str(search_err):
# Index doesn't exist yet, so no duplicates can exist
return JSONResponse({"duplicate_names": [], "total_files": len(cleaned_names)})
raise

# Check which of the cleaned names have duplicates (based on existing_filenames matching any alias)
duplicate_names = []
for name in cleaned_names:
aliases = get_filename_aliases(name)
if any(alias in existing_filenames for alias in aliases):
duplicate_names.append(name)

return JSONResponse(
{"duplicate_names": list(set(duplicate_names)), "total_files": len(cleaned_names)}
await _classify_connector_duplicates(
connector=connector,
selected_files_raw=selected_files_raw,
session_manager=session_manager,
user_id=user.user_id,
jwt_token=jwt_token,
)
)

except Exception:
Expand Down Expand Up @@ -600,6 +653,34 @@ async def connector_sync(
# Explicit files selected (e.g., from file picker) - sync those specific files
from .documents import _ensure_index_exists

if not body.replace_duplicates and file_infos:
duplicate_check = await _classify_connector_duplicates(
connector=await connector_service.get_connector(
working_connection.connection_id
),
selected_files_raw=file_infos,
session_manager=session_manager,
user_id=user.user_id,
jwt_token=jwt_token,
)
if duplicate_check["duplicate_count"] > 0:
file_infos = duplicate_check["non_duplicate_files"]
selected_files = [f["id"] for f in file_infos if f.get("id")]
if not selected_files:
return JSONResponse(
{
"status": "no_files",
"message": (
f"All {duplicate_check['duplicate_count']} selected file(s) "
"already exist. Nothing was synced."
),
"duplicate_names": duplicate_check["duplicate_names"],
"duplicate_count": duplicate_check["duplicate_count"],
"total_files": duplicate_check["total_files"],
},
status_code=200,
)

await _ensure_index_exists(jwt_token)
task_id = await connector_service.sync_specific_files(
working_connection.connection_id,
Expand Down
Loading
Loading