Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion frontend/app/chat/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ function ChatPage() {
);

if (result.type === "task") {
addTask(result.taskId);
addTask(result.taskId, { source: "file" });
return { type: "task-queued" as const };
}

Expand Down
5 changes: 4 additions & 1 deletion frontend/app/connectors/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ export default function ConnectorsPage() {
if (response.status === 201) {
const taskId = result.task_id;
if (taskId) {
addTask(taskId, { connectorType: connector.type });
addTask(taskId, {
connectorType: connector.type,
source: "connector",
});
setSyncResult({
processed: 0,
total: selectedFiles.length,
Expand Down
22 changes: 21 additions & 1 deletion frontend/app/upload/[provider]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from "@/components/ui/tooltip";
import { useTask } from "@/contexts/task-context";
import { useSessionIngestSettings } from "@/hooks/useSessionIngestSettings";
import { trackProcessFailure, trackStartProcess } from "@/lib/analytics";
import { getConnectorDescriptor } from "@/lib/connectors/registry";

// CloudFile interface is now imported from the unified cloud picker
Expand Down Expand Up @@ -91,6 +92,14 @@ export default function UploadProviderPage() {
files: CloudFile[],
replaceDuplicates: boolean,
) => {
trackStartProcess({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "connector",
connector_type: connector.type,
total_files: files.length,
});
syncMutation.mutate(
{
connectorType: connector.type,
Expand All @@ -112,11 +121,22 @@ export default function UploadProviderPage() {
onSuccess: (result) => {
const taskIds = result.task_ids;
if (taskIds && taskIds.length > 0) {
addTask(taskIds[0], { connectorType: connector.type });
addTask(taskIds[0], {
connectorType: connector.type,
source: "connector",
});
router.push("/knowledge");
}
},
onError: (err) => {
trackProcessFailure({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "connector",
connector_type: connector.type,
resultValue: err instanceof Error ? err.message : "Sync failed",
});
toast.error(err instanceof Error ? err.message : "Sync failed");
},
},
Expand Down
27 changes: 25 additions & 2 deletions frontend/components/connectors/shared-bucket-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { FileBrowserDialog } from "@/components/file-browser-dialog";
import { Button } from "@/components/ui/button";
import { useAuth } from "@/contexts/auth-context";
import { useSessionIngestSettings } from "@/hooks/useSessionIngestSettings";
import { trackProcessFailure, trackStartProcess } from "@/lib/analytics";

export interface SharedBucketViewProps {
connector: any;
Expand All @@ -21,7 +22,10 @@ export interface SharedBucketViewProps {
onRefetch: () => void;
invalidateQueryKey: readonly unknown[];
syncMutation: ReturnType<typeof useSyncConnector>;
addTask: (id: string, options?: { connectorType?: string }) => void;
addTask: (
id: string,
options?: { connectorType?: string; source?: string },
) => void;
onBack: () => void;
onDone: () => void;
}
Expand Down Expand Up @@ -77,6 +81,14 @@ export function SharedBucketView({
toast.error("Could not start ingest", { description: chunkErr });
return;
}
trackStartProcess({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "connector",
connector_type: connector.type,
total_buckets: selectedBuckets.size,
});
syncMutation.mutate(
{
connectorType: connector.type,
Expand All @@ -91,13 +103,24 @@ export function SharedBucketView({
onSuccess: (result) => {
invalidate();
if (result.task_ids?.length) {
addTask(result.task_ids[0], { connectorType: connector.type });
addTask(result.task_ids[0], {
connectorType: connector.type,
source: "connector",
});
onDone();
} else {
toast.info("No files found in the selected buckets.");
}
},
onError: (err) => {
trackProcessFailure({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "connector",
connector_type: connector.type,
resultValue: err instanceof Error ? err.message : "Sync failed",
});
toast.error(err instanceof Error ? err.message : "Sync failed");
},
},
Expand Down
47 changes: 45 additions & 2 deletions frontend/components/knowledge-dropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { useAuth } from "@/contexts/auth-context";
import { useIsCloudBrand } from "@/contexts/brand-context";
import { useTask } from "@/contexts/task-context";
import { usePermissions } from "@/hooks/use-permissions";
import { trackProcessFailure, trackStartProcess } from "@/lib/analytics";
import {
getConnectorDescriptor,
getConnectorDescriptors,
Expand Down Expand Up @@ -333,11 +334,25 @@ export function KnowledgeDropdown() {

const uploadFile = async (file: File, replace: boolean) => {
setFileUploading(true);
trackStartProcess({

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1b) [Minor] "Started Process" event fires even when uploadFile HTTP request fails — orphaned start event

Problem

  • In knowledge-dropdown.tsx:uploadFile, trackStartProcess is called unconditionally before the try block
  • If uploadFileUtil throws (e.g., network failure, 4xx/5xx response), the catch block shows an error toast but no task is created server-side
  • Without a task, task-context.tsx never fires trackProcessSuccess or trackProcessFailure
  • Result: a "Started Process" analytics event has no corresponding "Ended Process" event — these orphaned events inflate apparent funnel drop-off and skew conversion metrics

Code References

  • frontend/lib/analytics.ts lines 79–81 (trackStartProcess)
  • frontend/contexts/task-context.tsx lines 438–463 (completion event paths)

Potential Solution

  • Move trackStartProcess into the try block, after a successful response from uploadFileUtil:
    const uploadFile = async (file: File, replace: boolean) => {
      setFileUploading(true);
      try {
        await uploadFileUtil(file, replace);
        trackStartProcess({
          processType: "Ingestion",
          process: "Document Upload",
          category: "Knowledge",
          source: "file",
          total_files: 1,
        });
        refetchTasks();
      } catch (error) { ... }
  • This ensures a "Started Process" event only fires when a task has been accepted by the server

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by adding a failure event (trackProcessFailure) in the catch blocks.

processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "file",
total_files: 1,
});

try {
await uploadFileUtil(file, replace);
refetchTasks();
} catch (error) {
trackProcessFailure({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "file",
resultValue: error instanceof Error ? error.message : "Unknown error",
});
// Dispatch event that chat context can listen to
// This avoids circular dependency issues
if (typeof window !== "undefined") {
Expand All @@ -359,6 +374,14 @@ export function KnowledgeDropdown() {
filesToUpload: File[],
replace: boolean,
) => {
trackStartProcess({

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1c) [Normal] Batch folder uploads produce a 1:N start-to-end event ratio

Problem

  • uploadFolderBatches fires a single trackStartProcess event once at entry
  • It then creates one task per batch via uploadFiles in a loop (line 389: addTask(result.taskId))
  • Each task completion independently fires trackProcessSuccess or trackProcessFailure via task-context.tsx
  • For a 100-file folder upload with uploadBatchSize = 10, this produces 1 start event and 10 end events — a 1:10 mismatch

Background Information

  • Funnel analysis tools (Segment included) assume a 1:1 start/end relationship per user action
  • A 1:N ratio causes the funnel to show an inflated completion rate (multiple end events per start)
  • It also makes aggregate metrics like total_files inconsistent: the start event reports total_files: 100, but each end event reports total_files: 10

Code References

  • frontend/contexts/task-context.tsx lines 421–463 (per-task completion tracking)

Potential Solution

  • Track a single logical start and single logical end by waiting for all batches to resolve, then emitting one summary end event from uploadFolderBatches rather than relying on per-task completion events
  • Alternatively, move the start event inside the batch loop so each batch has its own paired start/end:
    for (const batch of batches) {
      try {
        const result = await uploadFiles(batch, replace);
        trackStartProcess({ ..., total_files: batch.length });
        addTask(result.taskId);
      } catch (error) { ... }
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep this for now unless there are issues with the Amplitude dashboards.

processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "folder",
total_files: filesToUpload.length,
});

const batches: File[][] = [];
for (let i = 0; i < filesToUpload.length; i += uploadBatchSize) {
batches.push(filesToUpload.slice(i, i + uploadBatchSize));
Expand All @@ -371,8 +394,15 @@ export function KnowledgeDropdown() {
for (const batch of batches) {
try {
const result = await uploadFiles(batch, replace);
addTask(result.taskId);
addTask(result.taskId, { source: "folder" });
} catch (error) {
trackProcessFailure({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "folder",
resultValue: error instanceof Error ? error.message : "Unknown error",
});
console.error("[Folder Upload] Batch upload failed:", error);
toast.error("Batch upload failed", {
description: error instanceof Error ? error.message : "Unknown error",
Expand Down Expand Up @@ -577,6 +607,12 @@ export function KnowledgeDropdown() {

setFolderLoading(true);
setShowFolderDialog(false);
trackStartProcess({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "path",
});

try {
const response = await fetch("/api/upload_path", {
Expand All @@ -596,7 +632,7 @@ export function KnowledgeDropdown() {
throw new Error("No task ID received from server");
}

addTask(taskId);
addTask(taskId, { source: "path" });
setFolderPath("");
// Refetch tasks to show the new task
refetchTasks();
Expand All @@ -613,6 +649,13 @@ export function KnowledgeDropdown() {
}
}
} catch (error) {
trackProcessFailure({
processType: "Ingestion",
process: "Document Upload",
category: "Knowledge",
source: "path",
resultValue: error instanceof Error ? error.message : "Unknown error",
});
console.error("Folder upload error:", error);
} finally {
setFolderLoading(false);
Expand Down
48 changes: 37 additions & 11 deletions frontend/contexts/task-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ export interface TaskFile {
interface TaskContextType {
tasks: Task[];
files: TaskFile[];
addTask: (taskId: string, options?: { connectorType?: string }) => void;
addTask: (
taskId: string,
options?: { connectorType?: string; source?: string },
) => void;
addFiles: (files: Partial<TaskFile>[], taskId: string) => void;
/** Mark knowledge-table overlays as processing when a retry starts. */
markTaskFilesProcessing: (taskId: string, sourceUrls: string[]) => void;
Expand Down Expand Up @@ -100,17 +103,20 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const [isTaskDialogOpen, setIsTaskDialogOpen] = useState(false);
const previousTasksRef = useRef<Task[]>([]);
const taskConnectorTypesRef = useRef<Map<string, string>>(new Map());
const taskSourcesRef = useRef<Map<string, string>>(new Map());

const clearTaskConnectorType = useCallback((taskId: string) => {
const clearTaskMetadata = useCallback((taskId: string) => {
taskConnectorTypesRef.current.delete(taskId);
taskSourcesRef.current.delete(taskId);
}, []);

const clearTaskConnectorTypesWithoutOverlays = useCallback(
const clearTaskMetadataWithoutOverlays = useCallback(
(prevFiles: TaskFile[], nextFiles: TaskFile[]) => {
const nextTaskIds = new Set(nextFiles.map((file) => file.task_id));
for (const file of prevFiles) {
if (!nextTaskIds.has(file.task_id)) {
taskConnectorTypesRef.current.delete(file.task_id);
taskSourcesRef.current.delete(file.task_id);
}
}
},
Expand Down Expand Up @@ -157,7 +163,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
},
);

clearTaskConnectorType(variables.taskId);
clearTaskMetadata(variables.taskId);

// Update file to display as cancelled
setFiles((prevFiles) =>
Expand Down Expand Up @@ -248,7 +254,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const currentTaskIds = new Set(tasks.map((task) => task.task_id));
for (const previousTask of previousTasksRef.current) {
if (!currentTaskIds.has(previousTask.task_id)) {
clearTaskConnectorType(previousTask.task_id);
clearTaskMetadata(previousTask.task_id);
}
}

Expand Down Expand Up @@ -428,6 +434,16 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const failedFiles = getFailedFileCount(currentTask);
const isTotalFailure = failedFiles > 0 && successfulFiles === 0;

const firstFile = currentTask.files
? Object.values(currentTask.files)[0]
: undefined;
const embeddingModel = firstFile?.embedding_model;
const connectorType =
taskConnectorTypesRef.current.get(currentTask.task_id) || "local";
const source =
taskSourcesRef.current.get(currentTask.task_id) ||
(connectorType === "local" ? "file" : "connector");

if (isTotalFailure) {
trackProcessFailure({
processType: "Ingestion",
Expand All @@ -437,6 +453,9 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
total_files: currentTask.total_files,
failed_files: failedFiles,
duration_seconds: currentTask.duration_seconds,
embedding_model: embeddingModel,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1d) [Minor] source dimension missing from completion events — breaks source-level funnel analysis

Problem

  • All five trackStartProcess call sites include a source field ("file", "folder", "path", "connector")
  • Neither trackProcessSuccess nor trackProcessFailure in task-context.tsx includes a source field
  • This means it is impossible to segment completion rates by upload source (e.g., "what % of connector ingestions succeed vs file uploads?")

Code References

  • frontend/contexts/task-context.tsx lines 438–463 (trackProcessFailure / trackProcessSuccess calls)
  • frontend/components/knowledge-dropdown.tsx lines 337, 370, 596 (start events with source)

Potential Solution

  • Derive or propagate source alongside connector_type and include it in completion payloads:
    const source = connectorType === "local"
      ? "file"   // or use a separate ref to store the source per task_id
      : "connector";
    
    trackProcessSuccess({
      ...,
      connector_type: connectorType,
      source,
    });
  • For finer granularity ("file" vs "folder" vs "path"), store the source in a ref alongside taskConnectorTypesRef, similar to how connector type is stored

connector_type: connectorType,
source,
});
} else {
trackProcessSuccess({
Expand All @@ -448,6 +467,9 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
successful_files: successfulFiles,
failed_files: failedFiles,
duration_seconds: currentTask.duration_seconds,
embedding_model: embeddingModel,
connector_type: connectorType,
source,
});
}

Expand Down Expand Up @@ -541,7 +563,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
}
}

clearTaskConnectorType(currentTask.task_id);
clearTaskMetadata(currentTask.task_id);

setFiles((prevFiles) =>
prevFiles.filter((file) => {
Expand Down Expand Up @@ -578,7 +600,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
}
void refetchKnowledgeAfterTaskCompletion();
} else if (taskJustReachedTerminal) {
clearTaskConnectorType(currentTask.task_id);
clearTaskMetadata(currentTask.task_id);
}

if (
Expand Down Expand Up @@ -632,16 +654,20 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
tasks,
refetchSearch,
isOnboardingActive,
clearTaskConnectorType,
clearTaskMetadata,
queryClient,
]);

const addTask = useCallback(
(taskId: string, options?: { connectorType?: string }) => {
(taskId: string, options?: { connectorType?: string; source?: string }) => {
const connectorType = options?.connectorType?.trim();
if (connectorType) {
taskConnectorTypesRef.current.set(taskId, connectorType);
}
const source = options?.source?.trim();
if (source) {
taskSourcesRef.current.set(taskId, source);
}
// React Query will automatically handle polling when tasks are active
// Just trigger a refetch to get the latest data
setTimeout(() => {
Expand All @@ -656,11 +682,11 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const nextFiles = prevFiles.filter(
(file) => file.status !== "active" && file.status !== "failed",
);
clearTaskConnectorTypesWithoutOverlays(prevFiles, nextFiles);
clearTaskMetadataWithoutOverlays(prevFiles, nextFiles);
return nextFiles;
});
await refetchTasks();
}, [refetchTasks, clearTaskConnectorTypesWithoutOverlays]);
}, [refetchTasks, clearTaskMetadataWithoutOverlays]);

const cancelTask = useCallback(
async (taskId: string) => {
Expand Down
10 changes: 10 additions & 0 deletions frontend/lib/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ export const trackButton = <T = Record<string, unknown>>({
}: T & ButtonEventParams): void =>
track("Button Clicked", { action, ...rest } as Record<string, unknown>);

interface StartProcessParams {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1a) [Minor] source field is untyped — bypasses compile-time validation

Problem

  • StartProcessParams defines processType, process, and category but omits source
  • Every call site passes source as an extra generic prop (typed as T = Record<string, unknown>), which bypasses TypeScript compile-time checking
  • Valid values ("file", "folder", "path", "connector") are undocumented at the type level — a future caller could pass any arbitrary string with no warning

Code References

  • frontend/app/upload/[provider]/page.tsx line 100 (source: "connector")
  • frontend/components/connectors/shared-bucket-view.tsx line 85 (source: "connector")
  • frontend/components/knowledge-dropdown.tsx lines 341, 373, 600 (source: "file", "folder", "path")

Potential Solution

  • Add source as an optional typed union to StartProcessParams:
    interface StartProcessParams {
      processType: string;
      process?: string;
      category?: string;
      source?: "file" | "folder" | "path" | "connector";
    }
  • This follows the same pattern as EndProcessParams and makes valid values discoverable/enforced by the compiler

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this is for tracking only, type safety is pretty low priority.

processType: string;
process?: string;
category?: string;
}

/**
 * Records a "Started Process" analytics event.
 *
 * The caller's props are forwarded unchanged to `track` under the fixed event
 * name "Started Process". Besides the `StartProcessParams` fields, callers may
 * attach arbitrary extra properties through the generic parameter `T`.
 *
 * @param props - Event payload: the `StartProcessParams` fields plus any
 *   additional properties described by `T`.
 */
export const trackStartProcess = <T = Record<string, unknown>>(
  props: T & StartProcessParams,
): void => {
  // Widen to a plain property bag, the shape passed on to `track`.
  const payload = props as Record<string, unknown>;
  return track("Started Process", payload);
};

interface EndProcessParams {
processType: string;
process?: string;
Expand Down
Loading