diff --git a/frontend/app/knowledge/page.tsx b/frontend/app/knowledge/page.tsx index f70811f1d..e82d51703 100644 --- a/frontend/app/knowledge/page.tsx +++ b/frontend/app/knowledge/page.tsx @@ -40,6 +40,7 @@ import { TooltipTrigger, } from "@/components/ui/tooltip"; import { useIsCloudBrand } from "@/contexts/brand-context"; +import { useKnowledgeIngestFocus } from "@/hooks/useKnowledgeIngestFocus"; import { getConnectorDescriptor } from "@/lib/connectors/registry"; import { buildKnowledgeTableRows, @@ -453,17 +454,13 @@ function SearchPage() { const gridRows = fileResults; const gridRef = useRef(null); + const { + gridRowsSelectionKey, + onKnowledgeGridReady, + onKnowledgeRowDataUpdated, + } = useKnowledgeIngestFocus(gridRef, gridRows, taskFiles); // Re-run only when row identity/status changes, not on every list poll reference. - const gridRowsSelectionKey = useMemo( - () => - gridRows - .map( - (row) => `${getKnowledgeFileIdentity(row)}:${row.status ?? "active"}`, - ) - .join("\0"), - [gridRows], - ); useEffect(() => { const api = gridRef.current?.api; @@ -1011,6 +1008,8 @@ function SearchPage() { } isRowSelectable={(params) => isDeletableKnowledgeRow(params.data)} domLayout="normal" + onGridReady={onKnowledgeGridReady} + onRowDataUpdated={onKnowledgeRowDataUpdated} onSelectionChanged={onSelectionChanged} pagination={pagination} paginationPageSize={paginationPageSize} @@ -1045,6 +1044,8 @@ function SearchPage() { } isRowSelectable={(params) => isDeletableKnowledgeRow(params.data)} domLayout="normal" + onGridReady={onKnowledgeGridReady} + onRowDataUpdated={onKnowledgeRowDataUpdated} onSelectionChanged={onSelectionChanged} pagination={pagination} paginationPageSize={paginationPageSize} diff --git a/frontend/app/upload/[provider]/page.tsx b/frontend/app/upload/[provider]/page.tsx index 8306192eb..fbfecc31a 100644 --- a/frontend/app/upload/[provider]/page.tsx +++ b/frontend/app/upload/[provider]/page.tsx @@ -19,6 +19,7 @@ import { import { useTask } from "@/contexts/task-context"; import { useSessionIngestSettings } from "@/hooks/useSessionIngestSettings"; import { getConnectorDescriptor } from "@/lib/connectors/registry"; +import { queueKnowledgeIngestFocusForCloudFiles } from "@/lib/knowledge-grid-pagination"; // CloudFile interface is now imported from the unified cloud picker @@ -91,6 +92,7 @@ export default function UploadProviderPage() { files: CloudFile[], replaceDuplicates: boolean, ) => { + queueKnowledgeIngestFocusForCloudFiles(files, replaceDuplicates); syncMutation.mutate( { connectorType: connector.type, diff --git a/frontend/components/AgGrid/registerAgGridModules.ts b/frontend/components/AgGrid/registerAgGridModules.ts index 486a76ba7..764919d22 100644 --- a/frontend/components/AgGrid/registerAgGridModules.ts +++ b/frontend/components/AgGrid/registerAgGridModules.ts @@ -1,5 +1,6 @@ import { CellStyleModule, + ClientSideRowModelApiModule, ClientSideRowModelModule, ColumnApiModule, ColumnAutoSizeModule, @@ -9,7 +10,9 @@ import { ModuleRegistry, PaginationModule, QuickFilterModule, + RowApiModule, RowSelectionModule, + ScrollApiModule, TextFilterModule, ValidationModule, } from "ag-grid-community"; @@ -24,6 +27,9 @@ ModuleRegistry.registerModules([ CellStyleModule, QuickFilterModule, ClientSideRowModelModule, + ClientSideRowModelApiModule, + RowApiModule, + ScrollApiModule, TextFilterModule, DateFilterModule, EventApiModule, diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index c8338e8de..d9a250c84 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -39,6 +39,7 @@ import { getConnectorDescriptor, getConnectorDescriptors, } from "@/lib/connectors/registry"; +import { dispatchKnowledgeIngestFocus } from "@/lib/knowledge-grid-pagination"; import { duplicateCheck, uploadFiles, @@ -335,6 +336,7 @@ export function KnowledgeDropdown() { setFileUploading(true); try { + dispatchKnowledgeIngestFocus(file.name, replace); await uploadFileUtil(file, replace); refetchTasks(); } catch (error) { diff --git a/frontend/contexts/task-context.tsx b/frontend/contexts/task-context.tsx index 8471e3743..d95588a86 100644 --- a/frontend/contexts/task-context.tsx +++ b/frontend/contexts/task-context.tsx @@ -116,6 +116,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { }, [], ); + const openTaskDialog = useCallback((taskId: string) => { setTaskDialogTaskId(taskId); setIsTaskDialogOpen(true); @@ -587,6 +588,8 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { !isTerminalFailedTask(previousTask) && isTerminalFailedTask(currentTask) ) { + clearTaskConnectorType(currentTask.task_id); + if (!isOnboardingActive) { selectTask(currentTask.task_id); setIsMenuOpen(true); diff --git a/frontend/hooks/useKnowledgeIngestFocus.ts b/frontend/hooks/useKnowledgeIngestFocus.ts new file mode 100644 index 000000000..39cc6755c --- /dev/null +++ b/frontend/hooks/useKnowledgeIngestFocus.ts @@ -0,0 +1,152 @@ +"use client"; + +import type { AgGridReact } from "ag-grid-react"; +import { type RefObject, useCallback, useEffect, useMemo, useRef } from "react"; +import type { File } from "@/app/api/queries/useGetSearchQuery"; +import type { TaskFile } from "@/contexts/task-context"; +import { + buildGridRowsSelectionKey, + collectNewIngestFocusIdentities, + collectProcessingFocusIdentities, + consumePersistedKnowledgeIngestFocus, + focusPendingIngestRows, + type IngestFocusMode, + inferIngestFocusMode, + ingestFocusModeFromReplace, + KNOWLEDGE_INGEST_FOCUS_EVENT, +} from "@/lib/knowledge-grid-pagination"; + +export function useKnowledgeIngestFocus( + gridRef: RefObject | null>, + gridRows: File[], + taskFiles: TaskFile[], +) { + const paginationSnapshotRef = useRef({ + initialized: false, + taskFiles: [] as TaskFile[], + gridRows: [] as File[], + }); + + const pendingFocusRef = useRef({ + identities: new Set(), + modes: new Map(), + }); + + const tryFocusPendingIngestRows = useCallback( + (rows: File[]) => { + const run = () => { + const api = gridRef.current?.api; + if (!api) { + return; + } + const pending = pendingFocusRef.current; + const resolved = focusPendingIngestRows( + api, + pending.identities, + rows, + pending.modes, + ); + for (const identity of resolved) { + pending.identities.delete(identity); + pending.modes.delete(identity); + } + }; + requestAnimationFrame(() => requestAnimationFrame(run)); + }, + [gridRef], + ); + + const queueIngestFocusIdentities = useCallback( + (identities: string[], mode?: IngestFocusMode, rows: File[] = gridRows) => { + if (identities.length === 0) { + return; + } + const pending = pendingFocusRef.current; + for (const identity of identities) { + pending.identities.add(identity); + pending.modes.set( + identity, + mode ?? + pending.modes.get(identity) ?? + inferIngestFocusMode(identity, rows), + ); + } + tryFocusPendingIngestRows(rows); + }, + [gridRows, tryFocusPendingIngestRows], + ); + + useEffect(() => { + const stored = consumePersistedKnowledgeIngestFocus(); + for (const target of stored) { + queueIngestFocusIdentities( + [target.filename], + ingestFocusModeFromReplace(target.replace), + ); + } + }, [queueIngestFocusIdentities]); + + useEffect(() => { + const handler = (event: Event) => { + const detail = ( + event as CustomEvent<{ filename: string; replace: boolean }> + ).detail; + if (!detail?.filename) { + return; + } + queueIngestFocusIdentities( + [detail.filename], + ingestFocusModeFromReplace(detail.replace), + ); + }; + window.addEventListener(KNOWLEDGE_INGEST_FOCUS_EVENT, handler); + return () => + window.removeEventListener(KNOWLEDGE_INGEST_FOCUS_EVENT, handler); + }, [queueIngestFocusIdentities]); + + useEffect(() => { + const snapshot = paginationSnapshotRef.current; + if (!snapshot.initialized) { + snapshot.taskFiles = taskFiles; + snapshot.gridRows = gridRows; + snapshot.initialized = true; + return; + } + + const fromTasks = collectNewIngestFocusIdentities( + snapshot.taskFiles, + taskFiles, + ); + const fromGrid = collectProcessingFocusIdentities( + snapshot.gridRows, + gridRows, + ); + snapshot.taskFiles = taskFiles; + snapshot.gridRows = gridRows; + + queueIngestFocusIdentities([...new Set([...fromTasks, ...fromGrid])]); + }, [taskFiles, gridRows, queueIngestFocusIdentities]); + + const gridRowsSelectionKey = useMemo( + () => buildGridRowsSelectionKey(gridRows), + [gridRows], + ); + + useEffect(() => { + tryFocusPendingIngestRows(gridRows); + }, [gridRows, gridRowsSelectionKey, tryFocusPendingIngestRows]); + + const onKnowledgeGridReady = useCallback(() => { + tryFocusPendingIngestRows(gridRows); + }, [gridRows, tryFocusPendingIngestRows]); + + const onKnowledgeRowDataUpdated = useCallback(() => { + tryFocusPendingIngestRows(gridRows); + }, [gridRows, tryFocusPendingIngestRows]); + + return { + gridRowsSelectionKey, + onKnowledgeGridReady, + onKnowledgeRowDataUpdated, + }; +} diff --git a/frontend/lib/knowledge-grid-pagination.ts b/frontend/lib/knowledge-grid-pagination.ts new file mode 100644 index 000000000..cfaa72860 --- /dev/null +++ b/frontend/lib/knowledge-grid-pagination.ts @@ -0,0 +1,474 @@ +import type { IRowNode } from "ag-grid-community"; +import type { AgGridReact } from "ag-grid-react"; +import type { File } from "@/app/api/queries/useGetSearchQuery"; +import type { TaskFile } from "@/contexts/task-context"; +import { + cleanConnectorFilename, + getKnowledgeFileAliasKeys, + getKnowledgeFileIdentity, +} from "@/lib/knowledge-table-state"; + +type GridApi = NonNullable["api"]>; + +type GridRowLike = { + filename?: string; + source_url?: string; + status?: string; +}; + +/** Stable key for row identity/status changes (avoids reacting to array reference churn). */ +export function buildGridRowsSelectionKey(rows: GridRowLike[]): string { + return rows + .map((row) => `${getKnowledgeFileIdentity(row)}:${row.status ?? "active"}`) + .join("\0"); +} + +export type IngestFocusMode = "existing" | "new"; + +/** + * Maps an upload overwrite flag to grid pagination focus mode. + * + * Intentionally `replace ? "existing" : "new"` — not inverted: + * - Overwrite: file is already indexed; jump to its current row (first match). + * - New ingest: processing overlay is appended after indexed rows (last page). + */ +export function ingestFocusModeFromReplace(replace: boolean): IngestFocusMode { + return replace ? "existing" : "new"; +} + +export const KNOWLEDGE_INGEST_FOCUS_EVENT = "knowledgeIngestFocus"; + +const INGEST_FOCUS_STORAGE_KEY = "openrag:knowledge-ingest-focus"; + +export type KnowledgeIngestFocusTarget = { + filename: string; + replace: boolean; +}; + +function readPersistedIngestFocusTargets(): KnowledgeIngestFocusTarget[] { + if (typeof window === "undefined") { + return []; + } + try { + const raw = sessionStorage.getItem(INGEST_FOCUS_STORAGE_KEY); + if (!raw) { + return []; + } + const parsed: unknown = JSON.parse(raw); + if (!Array.isArray(parsed)) { + return []; + } + return parsed.filter( + (item): item is KnowledgeIngestFocusTarget => + typeof item === "object" && + item !== null && + typeof (item as KnowledgeIngestFocusTarget).filename === "string" && + typeof (item as KnowledgeIngestFocusTarget).replace === "boolean", + ); + } catch { + return []; + } +} + +/** Persist focus targets across route changes (e.g. cloud upload → /knowledge). */ +export function persistKnowledgeIngestFocus( + targets: KnowledgeIngestFocusTarget[], +): void { + if (typeof window === "undefined" || targets.length === 0) { + return; + } + const merged = [...readPersistedIngestFocusTargets(), ...targets]; + sessionStorage.setItem(INGEST_FOCUS_STORAGE_KEY, JSON.stringify(merged)); +} + +export function consumePersistedKnowledgeIngestFocus(): KnowledgeIngestFocusTarget[] { + if (typeof window === "undefined") { + return []; + } + const targets = readPersistedIngestFocusTargets(); + sessionStorage.removeItem(INGEST_FOCUS_STORAGE_KEY); + return targets; +} + +export function dispatchKnowledgeIngestFocus( + filename: string, + replace: boolean, +): void { + if (typeof window === "undefined") { + return; + } + window.dispatchEvent( + new CustomEvent(KNOWLEDGE_INGEST_FOCUS_EVENT, { + detail: { filename, replace }, + }), + ); +} + +export function queueKnowledgeIngestFocusForCloudFiles( + files: Array<{ name: string; mimeType?: string }>, + replace: boolean, +): void { + if (files.length === 0) { + return; + } + persistKnowledgeIngestFocus( + files.map((file) => ({ + filename: cleanConnectorFilename(file.name, file.mimeType), + replace, + })), + ); +} + +type VisibleRowTarget = { + displayIndex: number; + node: IRowNode | null; +}; + +function rowMatchesIdentitySet( + row: GridRowLike, + identitySet: Set, +): boolean { + return getKnowledgeFileAliasKeys(row).some((key) => identitySet.has(key)); +} + +/** Infer focus mode from current grid rows (task-poll path). */ +export function inferIngestFocusMode( + identity: string, + rowData: GridRowLike[], +): IngestFocusMode { + const identitySet = new Set([identity]); + const hasMatch = rowData.some((row) => + rowMatchesIdentitySet(row, identitySet), + ); + return hasMatch ? "existing" : "new"; +} + +function hasActiveColumnSort(api: GridApi): boolean { + return api.getColumnState().some((column) => column.sort != null); +} + +function findRowDataIndex( + identitySet: Set, + rowData: GridRowLike[], + pick: "first" | "last", +): number { + let targetIndex = -1; + rowData.forEach((row, index) => { + if (!rowMatchesIdentitySet(row, identitySet)) { + return; + } + if (targetIndex < 0) { + targetIndex = index; + return; + } + if (pick === "first") { + targetIndex = Math.min(targetIndex, index); + } else { + targetIndex = Math.max(targetIndex, index); + } + }); + return targetIndex; +} + +/** Earliest or latest visible row (after filter/sort) matching any identity. */ +function findVisibleRowTarget( + api: GridApi, + identitySet: Set, + rowData: GridRowLike[] | undefined, + pick: "first" | "last", +): VisibleRowTarget | null { + let target: VisibleRowTarget | null = null; + + api.forEachNodeAfterFilterAndSort((node, index) => { + const data = node.data as GridRowLike | undefined; + const nodeId = node.id; + const matches = + (nodeId && identitySet.has(nodeId)) || + (data != null && rowMatchesIdentitySet(data, identitySet)); + if (!matches) { + return; + } + if (!target) { + target = { displayIndex: index, node }; + return; + } + if ( + pick === "first" + ? index < target.displayIndex + : index > target.displayIndex + ) { + target = { displayIndex: index, node }; + } + }); + + if (target) { + return target; + } + + for (const id of identitySet) { + const node = api.getRowNode(id); + if (!node) { + continue; + } + let displayIndex = -1; + api.forEachNodeAfterFilterAndSort((current, index) => { + if (current === node || current.id === id) { + displayIndex = index; + } + }); + if (displayIndex >= 0) { + return { displayIndex, node }; + } + } + + if (!rowData?.length || hasActiveColumnSort(api)) { + return null; + } + + const displayIndex = findRowDataIndex(identitySet, rowData, pick); + if (displayIndex < 0) { + return null; + } + + for (const id of identitySet) { + const node = api.getRowNode(id); + if (node) { + return { displayIndex, node }; + } + } + + return { displayIndex, node: null }; +} + +/** Map row alias hits back to the canonical pending ids that were queued. */ +function addResolvablePendingIdsForRowKeys( + found: Set, + identities: Set, + rowKeys: Iterable, +): void { + const keys = new Set(rowKeys); + for (const pendingId of identities) { + for (const alias of getKnowledgeFileAliasKeys({ + filename: pendingId, + source_url: pendingId, + })) { + if (keys.has(alias)) { + found.add(pendingId); + break; + } + } + } +} + +/** Identities from pending that appear in the grid model and/or current rowData. */ +function collectResolvableIdentities( + api: GridApi, + identities: Set, + rowData?: GridRowLike[], +): Set { + const found = new Set(); + + api.forEachNodeAfterFilterAndSort((node) => { + const data = node.data as GridRowLike | undefined; + const rowKeys = new Set(getKnowledgeFileAliasKeys(data)); + if (node.id) { + rowKeys.add(node.id); + } + addResolvablePendingIdsForRowKeys(found, identities, rowKeys); + }); + + if (!rowData?.length || hasActiveColumnSort(api)) { + return found; + } + + for (const row of rowData) { + addResolvablePendingIdsForRowKeys( + found, + identities, + getKnowledgeFileAliasKeys(row), + ); + } + + return found; +} + +function resolveFocusMode( + identitySet: Set, + modes: Map | undefined, +): IngestFocusMode { + for (const id of identitySet) { + const mode = modes?.get(id); + if (mode) { + return mode; + } + } + return "existing"; +} + +/** Identities of task overlays that just started ingesting or were retried. */ +export function collectNewIngestFocusIdentities( + previous: TaskFile[], + current: TaskFile[], +): string[] { + const prevByAlias = new Map(); + for (const file of previous) { + for (const key of getKnowledgeFileAliasKeys(file)) { + prevByAlias.set(key, file); + } + } + + const identities: string[] = []; + const seen = new Set(); + + for (const file of current) { + const keys = getKnowledgeFileAliasKeys(file); + const identity = getKnowledgeFileIdentity(file) || keys[0]; + if (!identity || seen.has(identity)) { + continue; + } + + const prev = keys.map((key) => prevByAlias.get(key)).find(Boolean); + if (!prev) { + if (file.status === "processing") { + identities.push(identity); + seen.add(identity); + } + continue; + } + if (prev.status !== "processing" && file.status === "processing") { + identities.push(identity); + seen.add(identity); + } + } + return identities; +} + +/** Identities of rendered rows that entered the processing state. */ +export function collectProcessingFocusIdentities( + previous: GridRowLike[], + current: GridRowLike[], +): string[] { + const prevStatusByAlias = new Map(); + for (const row of previous) { + const status = row.status ?? "active"; + for (const key of getKnowledgeFileAliasKeys(row)) { + prevStatusByAlias.set(key, status); + } + } + + const identities: string[] = []; + const seen = new Set(); + + for (const row of current) { + const status = row.status ?? "active"; + if (status !== "processing") { + continue; + } + const keys = getKnowledgeFileAliasKeys(row); + const identity = getKnowledgeFileIdentity(row) || keys[0]; + if (!identity || seen.has(identity)) { + continue; + } + const prevStatus = keys + .map((key) => prevStatusByAlias.get(key)) + .find((value) => value !== undefined); + if (prevStatus === undefined || prevStatus !== "processing") { + identities.push(identity); + seen.add(identity); + } + } + return identities; +} + +function scrollToRowTarget( + api: GridApi, + target: VisibleRowTarget, + afterPageChange = false, +): void { + const scroll = () => { + if (target.node) { + api.ensureNodeVisible(target.node, "middle"); + return; + } + api.ensureIndexVisible(target.displayIndex, "middle"); + }; + if (afterPageChange) { + requestAnimationFrame(() => requestAnimationFrame(scroll)); + } else { + requestAnimationFrame(scroll); + } +} + +/** Jump to the pagination page that contains the target ingest row. */ +export function goToGridRowIdentities( + api: GridApi, + identities: Iterable, + rowData?: GridRowLike[], + modes?: Map, +): boolean { + const identitySet = new Set(identities); + if (identitySet.size === 0) { + return false; + } + + const mode = resolveFocusMode(identitySet, modes); + const pick = mode === "new" ? "last" : "first"; + const target = findVisibleRowTarget(api, identitySet, rowData, pick); + + if (!target && mode === "new") { + api.paginationGoToLastPage(); + return true; + } + + if (!target) { + return false; + } + + let didChangePage = false; + const pageSize = api.paginationGetPageSize(); + if (pageSize && pageSize > 0) { + const targetPage = Math.floor(target.displayIndex / pageSize); + const currentPage = api.paginationGetCurrentPage(); + if (currentPage !== targetPage) { + api.paginationGoToPage(targetPage); + didChangePage = true; + } + } + + scrollToRowTarget(api, target, didChangePage); + return true; +} + +/** Focus pending ingest rows once they appear in the grid. Returns resolved identities. */ +export function focusPendingIngestRows( + api: GridApi, + pending: Set, + rowData?: GridRowLike[], + modes?: Map, +): string[] { + if (pending.size === 0) { + return []; + } + + const resolvable = collectResolvableIdentities(api, pending, rowData); + const hasExistingTarget = resolvable.size > 0; + const hasNewOnlyPending = [...pending].some( + (id) => modes?.get(id) === "new" && !resolvable.has(id), + ); + + if (!hasExistingTarget && hasNewOnlyPending) { + api.paginationGoToLastPage(); + return []; + } + + if (!hasExistingTarget) { + return []; + } + + const didJump = goToGridRowIdentities(api, pending, rowData, modes); + if (!didJump) { + return []; + } + + return [...pending].filter((id) => resolvable.has(id)); +} diff --git a/frontend/lib/knowledge-table-state.ts b/frontend/lib/knowledge-table-state.ts index af7faabde..03154e5a0 100644 --- a/frontend/lib/knowledge-table-state.ts +++ b/frontend/lib/knowledge-table-state.ts @@ -28,6 +28,92 @@ export function getKnowledgeFileIdentity(file?: { return ""; } +const MIME_TO_EXTENSION: Record = { + "application/pdf": ".pdf", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": + ".docx", + "application/msword": ".doc", + "application/vnd.openxmlformats-officedocument.presentationml.presentation": + ".pptx", + "application/vnd.ms-powerpoint": ".ppt", + "text/plain": ".txt", + "text/markdown": ".md", + "text/x-markdown": ".md", + "text/html": ".html", + "text/csv": ".csv", + "application/json": ".json", + "application/xml": ".xml", + "text/xml": ".xml", + "application/rtf": ".rtf", + "application/vnd.google-apps.document": ".pdf", + "application/vnd.google-apps.presentation": ".pdf", + "application/vnd.google-apps.spreadsheet": ".pdf", +}; + +/** Mirror backend `clean_connector_filename` for overlay/focus matching. */ +export function cleanConnectorFilename( + filename: string, + mimetype?: string, +): string { + const trimmed = filename.trim(); + if (!trimmed || !mimetype) { + return trimmed; + } + const suffix = MIME_TO_EXTENSION[mimetype]; + if (!suffix) { + return trimmed; + } + if (!trimmed.toLowerCase().endsWith(suffix.toLowerCase())) { + return trimmed + suffix; + } + return trimmed; +} + +/** Mirror backend `get_filename_aliases` so overlays match indexed connector names. */ +function getKnowledgeFilenameAliases(filename?: string): string[] { + const normalized = filename?.trim() ?? ""; + if (!normalized) { + return []; + } + const aliases = [normalized]; + const lower = normalized.toLowerCase(); + if (lower.endsWith(".txt")) { + aliases.push(`${normalized.slice(0, -4)}.md`); + } else if (lower.endsWith(".md")) { + aliases.push(`${normalized.slice(0, -3)}.txt`); + } + for (const name of [...aliases]) { + aliases.push(name.replace(/ /g, "_").replace(/\//g, "_")); + } + return [...new Set(aliases)]; +} + +function addFilenameAliasKeys(keys: Set, filename?: string): void { + for (const alias of getKnowledgeFilenameAliases(filename)) { + keys.add(alias); + } +} + +/** Lookup keys for matching task overlays to indexed rows (filename, path, basename). */ +export function getKnowledgeFileAliasKeys(file?: { + filename?: string; + source_url?: string; + mimetype?: string; +}): string[] { + const keys = new Set(); + addFilenameAliasKeys( + keys, + cleanConnectorFilename(file?.filename ?? "", file?.mimetype), + ); + const sourceUrl = file?.source_url?.trim(); + if (sourceUrl) { + keys.add(sourceUrl); + addFilenameAliasKeys(keys, sourceUrl.split("/").pop()); + } + addFilenameAliasKeys(keys, getKnowledgeFileIdentity(file)); + return [...keys]; +} + function isMeaningfulConnectorType(connectorType?: string): boolean { const normalized = connectorType?.trim(); return Boolean(normalized && normalized !== "local"); @@ -91,16 +177,61 @@ export function resolveKnowledgeRowConnectorType( return taskType?.trim() || backendType?.trim() || "local"; } +function taskOverlayPriority(status?: string): number { + switch (status) { + case "processing": + return 3; + case "failed": + return 2; + case "active": + return 1; + default: + return 0; + } +} + +function indexTaskFileOverlays( + taskFilesAsFiles: SearchFile[], +): Map { + const map = new Map(); + for (const file of taskFilesAsFiles) { + for (const key of getKnowledgeFileAliasKeys(file)) { + const existing = map.get(key); + if ( + !existing || + taskOverlayPriority(file.status) >= taskOverlayPriority(existing.status) + ) { + map.set(key, file); + } + } + } + return map; +} + +function lookupTaskFileOverlay( + map: Map, + file: SearchFile, +): SearchFile | undefined { + for (const key of getKnowledgeFileAliasKeys(file)) { + const match = map.get(key); + if (match) { + return match; + } + } + return undefined; +} + export function buildKnowledgeTableRows( searchData: SearchFile[], taskFiles: TaskFile[], hasActiveFilter = false, ): SearchFile[] { const taskFilesAsFiles: SearchFile[] = taskFiles.map((taskFile) => { - const normalizedFilename = - taskFile.filename?.trim() || - taskFile.source_url?.trim() || - "Untitled source"; + const rawFilename = + taskFile.filename?.trim() || taskFile.source_url?.trim() || ""; + const normalizedFilename = rawFilename + ? cleanConnectorFilename(rawFilename, taskFile.mimetype) + : "Untitled source"; return { filename: normalizedFilename, @@ -115,27 +246,32 @@ export function buildKnowledgeTableRows( }; }); - const taskFileMap = new Map( - taskFilesAsFiles.map((file) => [getKnowledgeFileIdentity(file), file]), - ); + const taskFileMap = indexTaskFileOverlays(taskFilesAsFiles); const backendFiles = searchData.map((file) => { if (file.connector_type === "openrag_docs") { return file; } - const taskFile = taskFileMap.get(getKnowledgeFileIdentity(file)); + const taskFile = lookupTaskFileOverlay(taskFileMap, file); if (taskFile) { const backendStatus = file.status ?? "active"; + const status = + taskFile.status === "processing" || taskFile.status === "failed" + ? taskFile.status + : backendStatus; return { ...file, - filename: taskFile.filename, - source_url: taskFile.source_url, - connector_type: resolveKnowledgeRowConnectorType( - file.connector_type, - taskFile.connector_type, - backendStatus, - ), - status: backendStatus, + // Indexed row identity: prefer backend fields so filename and source_url stay paired. + filename: file.filename || taskFile.filename, + source_url: file.source_url || taskFile.source_url, + connector_type: isMeaningfulConnectorType(file.connector_type) + ? file.connector_type!.trim() + : resolveKnowledgeRowConnectorType( + file.connector_type, + taskFile.connector_type, + status, + ), + status, error: taskFile.error, embedding_model: taskFile.embedding_model ?? file.embedding_model, embedding_dimensions: @@ -145,9 +281,12 @@ export function buildKnowledgeTableRows( return file; }); - const backendIdentities = new Set( - backendFiles.map((f) => getKnowledgeFileIdentity(f)), - ); + const backendIdentityKeys = new Set(); + for (const file of searchData) { + for (const key of getKnowledgeFileAliasKeys(file)) { + backendIdentityKeys.add(key); + } + } const filteredTaskFiles = taskFilesAsFiles.filter((taskFile) => { if ( @@ -159,8 +298,11 @@ export function buildKnowledgeTableRows( if (taskFile.connector_type === "openrag_docs") { return false; } - const identity = getKnowledgeFileIdentity(taskFile); - if (backendIdentities.has(identity)) { + if ( + getKnowledgeFileAliasKeys(taskFile).some((key) => + backendIdentityKeys.has(key), + ) + ) { return false; } // Keep "active" overlays until the index lists the file (task drops key before refetch). diff --git a/src/connectors/service.py b/src/connectors/service.py index ade3f9e91..1e3ed859f 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -421,6 +421,7 @@ async def sync_connector_files( ), models_service=self.models_service, replace_duplicates=replace_duplicates, + connector_type=connector.CONNECTOR_TYPE, ) # Use file IDs as items (no more fake file paths!) @@ -603,6 +604,7 @@ async def sync_specific_files( models_service=self.models_service, ingest_settings=ingest_settings, replace_duplicates=replace_duplicates, + connector_type=connector.CONNECTOR_TYPE, ) # Create custom task using TaskService diff --git a/src/models/processors.py b/src/models/processors.py index c619c46a8..d89811e8b 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -698,6 +698,7 @@ def __init__( models_service=None, ingest_settings: dict[str, Any] | None = None, replace_duplicates: bool = False, + connector_type: str | None = None, ): super().__init__( document_service=document_service, @@ -713,6 +714,7 @@ def __init__( self.owner_email = owner_email self.ingest_settings = ingest_settings self.replace_duplicates = replace_duplicates + self.connector_type = connector_type async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None: """Process a connector file using unified methods""" @@ -730,6 +732,8 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File if not connector or not connection: raise ValueError(f"Connection '{self.connection_id}' not found") + connector_type = self.connector_type or connection.connector_type + # Validate file extension early if filename is available VALID_EXTENSIONS = { "adoc", @@ -844,17 +848,20 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File # filename differs from the current one. If any were removed (a real # rename), force a re-ingest below so the file is re-indexed under # the new name instead of short-circuiting as "unchanged". + # Match against file_task.filename — the cleaned name the file is + # actually indexed under — so duplicate/rename detection lines up + # with how chunks are keyed. renamed = ( await self._delete_connector_chunks( document.id, opensearch_client, self.user_id, - keep_filenames=get_filename_aliases(document.filename), + keep_filenames=get_filename_aliases(file_task.filename), ) > 0 ) - if await self.check_filename_exists(document.filename, opensearch_client): + if await self.check_filename_exists(file_task.filename, opensearch_client): if not self.replace_duplicates: file_task.status = TaskStatus.SKIPPED file_task.error = None @@ -867,13 +874,13 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File upload_task.successful_files += 1 return await self.delete_document_by_filename( - document.filename, + file_task.filename, opensearch_client, owner_user_id=self.user_id, ) # Create temporary file from document content - suffix = os.path.splitext(document.filename)[1] + suffix = os.path.splitext(file_task.filename)[1] if not suffix: suffix = get_file_extension(document.mimetype) with auto_cleanup_tempfile(suffix=suffix) as tmp_path: @@ -929,7 +936,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File # Ingest via unified Langflow pipeline (two-phase Docling + Langflow run) langflow_filename, processed_mimetype = langflow_safe_filename_and_mimetype( - document.filename, document.mimetype + file_task.filename, document.mimetype ) file_tuple = (langflow_filename, document.content, processed_mimetype) @@ -962,7 +969,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File owner=self.user_id, owner_name=self.owner_name, owner_email=self.owner_email, - connector_type=connection.connector_type, + connector_type=connector_type, docling_polling_service=self.connector_service.task_service.docling_polling_service if self.connector_service.task_service else None, @@ -1007,12 +1014,12 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File file_path=tmp_path, file_hash=file_hash, owner_user_id=self.user_id, - original_filename=document.filename, + original_filename=file_task.filename, jwt_token=self.jwt_token, owner_name=self.owner_name, owner_email=self.owner_email, file_size=len(document.content), - connector_type=connection.connector_type, + connector_type=connector_type, acl=document.acl, connector_file_id=document.id, **standard_kwargs, @@ -1023,7 +1030,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File await self.connector_service._update_connector_metadata( document, self.user_id, - connection.connector_type, + connector_type, self.jwt_token, id_field="connector_file_id", ) diff --git a/tests/unit/test_connector_processor_filename_dedupe.py b/tests/unit/test_connector_processor_filename_dedupe.py index 37f527187..008cf1390 100644 --- a/tests/unit/test_connector_processor_filename_dedupe.py +++ b/tests/unit/test_connector_processor_filename_dedupe.py @@ -225,6 +225,62 @@ async def test_connector_processor_deletes_chunks_when_source_returns_404( assert fields["document_id"] == "file-id-1" +@pytest.mark.asyncio +async def test_connector_processor_indexes_cleaned_filename(monkeypatch): + """MIME-enforced extensions must be indexed under the same name used for dedupe.""" + monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", True) + processor = _build_connector_processor(replace_duplicates=False) + document = ConnectorDocument( + id="doc-id-1", + filename="My Report", + mimetype="application/vnd.google-apps.document", + content=b"%PDF-1.4 dummy", + source_url="https://example.google.com/file", + acl=DocumentACL(owner="user@example.com"), + modified_time=datetime.now(), + created_time=datetime.now(), + ) + _wire_connector_processor(processor, document, filename_exists=False) + + file_task = _make_file_task() + upload_task = _make_upload_task() + + with patch.object( + processor, + "process_document_standard", + new=AsyncMock(return_value={"status": "indexed", "id": "hash-1"}), + ) as mock_process: + await processor.process_item(upload_task, "file-id-1", file_task) + + assert file_task.filename == "My Report.pdf" + assert mock_process.await_args.kwargs["original_filename"] == "My Report.pdf" + + +@pytest.mark.asyncio +async def test_langflow_connector_processor_uses_cleaned_filename(monkeypatch): + monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False) + processor = _build_langflow_processor(replace_duplicates=False) + document = ConnectorDocument( + id="doc-id-1", + filename="My Report", + mimetype="application/vnd.google-apps.document", + content=b"%PDF-1.4 dummy", + source_url="https://example.google.com/file", + acl=DocumentACL(owner="user@example.com"), + modified_time=datetime.now(), + created_time=datetime.now(), + ) + _wire_langflow_processor(processor, document, filename_exists=False) + + file_task = _make_file_task() + upload_task = _make_upload_task() + + await processor.process_item(upload_task, "file-id-1", file_task) + + upload_call = processor.connector_service.langflow_service.upload_and_ingest_file.await_args + assert upload_call.kwargs["file_tuple"][0] == "My Report.pdf" + + @pytest.mark.asyncio async def test_connector_processor_proceeds_when_filename_absent(monkeypatch): monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", True)