diff --git a/apps/api/src/modules/conversation/mutation.ts b/apps/api/src/modules/conversation/mutation.ts index 6c1a9e1..7491bd6 100644 --- a/apps/api/src/modules/conversation/mutation.ts +++ b/apps/api/src/modules/conversation/mutation.ts @@ -41,6 +41,32 @@ builder.mutationField('deleteConversation', (t) => id: t.arg.string({ required: true }), }, resolve: async (_parent, args, ctx) => { + // 1. Find all external sources for this conversation + const sources = + await ctx.services.externalSourceService.findByConversationId(args.id); + + // 2. Delete vectors for each source from Pinecone + for (const source of sources) { + try { + await ctx.vectorService.deleteVectors( + source.id, + args.id, // conversationId is the namespace for conversation-linked sources + args.id + ); + } catch (error) { + console.error( + `Failed to delete vectors for source ${source.id}:`, + error + ); + // Continue with other sources even if one fails + } + } + + // 3. Delete all external sources for the conversation + // This will cascade delete their chunks + await ctx.services.externalSourceService.deleteByConversationId(args.id); + + // 4. Delete the conversation itself await ctx.services.conversationService.delete(args.id); return true; }, diff --git a/apps/api/src/modules/externalSource/index.ts b/apps/api/src/modules/externalSource/index.ts index 04b44d1..1a1527b 100644 --- a/apps/api/src/modules/externalSource/index.ts +++ b/apps/api/src/modules/externalSource/index.ts @@ -1,2 +1,3 @@ import './type'; import './query'; +import './mutation'; diff --git a/apps/api/src/modules/externalSource/mutation.ts b/apps/api/src/modules/externalSource/mutation.ts new file mode 100644 index 0000000..ee3c5b7 --- /dev/null +++ b/apps/api/src/modules/externalSource/mutation.ts @@ -0,0 +1,149 @@ +import { builder } from 'config/builder'; + +// ---- Delete external source +builder.mutationField('deleteExternalSource', (t) => + t.field({ + type: 'Boolean', + nullable: false, + args: { + id: t.arg.string({ required: true }), + }, + resolve: async (_parent, args, ctx) => { + try { + // Get the source first to determine the vector namespace + const source = await ctx.services.externalSourceService.findById( + args.id + ); + if (!source) { + throw new Error(`External source ${args.id} not found`); + } + + // TODO: Add ownership verification once user auth is implemented + // if (source.conversation?.userId !== ctx.user.id) { + // throw new Error('Not authorized to delete this source'); + // } + + // Delete vectors from Pinecone first + // Namespace is conversationId for conversation-linked sources, or sourceId for standalone + const vectorNamespace = source.conversationId || args.id; + await ctx.vectorService.deleteVectors( + args.id, + vectorNamespace, + source.conversationId ?? undefined + ); + + // Delete the source and its chunks from the database + await ctx.services.externalSourceService.deleteWithChunks(args.id); + + // TODO: Implement R2 file cleanup - currently leaves orphaned files + // Options: + // 1. Delete synchronously here: ctx.r2.deleteFile(metadata?.fileKey) + // 2. Create an outbox event for async cleanup + // 3. Implement a scheduled cleanup job to remove orphaned R2 files + // Tracked in: [add tracking issue URL when created] + + return true; + } catch (error) { + console.error(`Failed to delete external source ${args.id}:`, error); + throw new Error( + `Failed to delete external source: ${error instanceof Error ? error.message : 'Unknown error'}` + ); + } + }, + }) +); + +// ---- Re-ingest external source +/** + * Re-ingest an external source by triggering the processing pipeline again + * This clears existing chunks and re-processes the file from R2 storage + */ +builder.mutationField('reIngestSource', (t) => + t.prismaField({ + type: 'ExternalSource', + nullable: false, + args: { + id: t.arg.string({ required: true }), + }, + resolve: async (_query, _parent, args, ctx) => { + try { + // Get the source to verify it exists and get file info + const source = await ctx.services.externalSourceService.findById( + args.id + ); + if (!source) { + throw new Error(`External source ${args.id} not found`); + } + + // Get metadata for file info + const metadata = source.metadata as Record | null; + const fileKey = metadata?.fileKey as string | undefined; + + if (!fileKey) { + throw new Error( + 'Cannot re-ingest: source does not have a stored file' + ); + } + + // Delete existing vectors from Pinecone first (external service, not transactional) + // Namespace is conversationId for conversation-linked sources, or sourceId for standalone + // Note: If Pinecone deletion fails, we throw before DB changes, maintaining consistency. + // If Pinecone succeeds but DB fails, vectors are orphaned (acceptable - will be overwritten on retry). + const vectorNamespace = source.conversationId || args.id; + await ctx.vectorService.deleteVectors( + args.id, + vectorNamespace, + source.conversationId ?? undefined + ); + + // Wrap DB operations in a transaction to ensure atomicity: + // - Delete chunks + // - Update source metadata + // - Create outbox event + // This prevents the source from being in an inconsistent state if any DB operation fails. + const updatedSource = await ctx.prisma.$transaction(async (tx: any) => { + // Delete existing chunks (they will be re-created during processing) + await tx.chunk.deleteMany({ + where: { externalSourceId: args.id }, + }); + + // Update source metadata to mark as pending re-ingest + const updated = await tx.externalSource.update({ + where: { id: args.id }, + data: { + metadata: { + ...metadata, + processingFailed: false, + processingError: null, + reIngestRequestedAt: new Date().toISOString(), + status: 'pending', + }, + }, + include: { conversation: true }, + }); + + // Create outbox event for async re-processing + await tx.outbox.create({ + data: { + type: 'SourceReIngestRequested', + payload: { + externalSourceId: args.id, + fileKey, + conversationId: source.conversationId, + }, + }, + }); + + return updated; + }); + + return updatedSource; + } catch (error) { + console.error(`Failed to re-ingest external source ${args.id}:`, error); + throw new Error( + `Failed to re-ingest source: ${error instanceof Error ? error.message : 'Unknown error'}` + ); + } + }, + }) +); diff --git a/apps/api/src/modules/externalSource/query.ts b/apps/api/src/modules/externalSource/query.ts index 66d792d..81b5f17 100644 --- a/apps/api/src/modules/externalSource/query.ts +++ b/apps/api/src/modules/externalSource/query.ts @@ -1,4 +1,5 @@ import { builder } from 'config/builder'; +import { ExternalSourceType } from '@overchat/database/services'; builder.queryField('externalSources', (t) => { return t.prismaField({ @@ -14,3 +15,36 @@ builder.queryField('externalSources', (t) => { }, }); }); + +builder.queryField('externalSourcesAll', (t) => { + return t.prismaField({ + type: ['ExternalSource'], + nullable: false, + args: { + limit: t.arg.int({ required: false, defaultValue: 50 }), + offset: t.arg.int({ required: false, defaultValue: 0 }), + type: t.arg({ type: 'ExternalSourceType', required: false }), + }, + resolve: async (query, _root, args, ctx, _info) => { + return ctx.services.externalSourceService.findAll({ + limit: args.limit ?? 50, + offset: args.offset ?? 0, + type: args.type as ExternalSourceType | undefined, + includeChunks: true, + }); + }, + }); +}); + +builder.queryField('externalSource', (t) => { + return t.prismaField({ + type: 'ExternalSource', + nullable: true, + args: { + id: t.arg.string({ required: true }), + }, + resolve: async (query, _root, args, ctx, _info) => { + return ctx.services.externalSourceService.findByIdWithChunks(args.id); + }, + }); +}); diff --git a/apps/api/src/modules/externalSource/type.ts b/apps/api/src/modules/externalSource/type.ts index f401119..d3a49c2 100644 --- a/apps/api/src/modules/externalSource/type.ts +++ b/apps/api/src/modules/externalSource/type.ts @@ -3,7 +3,7 @@ import { builder } from 'config/builder'; builder.prismaObject('ExternalSource', { fields: (t) => ({ id: t.exposeID('id'), - conversationId: t.exposeString('conversationId'), + conversationId: t.exposeString('conversationId', { nullable: true }), type: t.exposeString('type'), vectorId: t.exposeString('vectorId', { nullable: true }), sourceUrl: t.exposeString('sourceUrl', { nullable: true }), diff --git a/apps/api/src/plugins/r2.ts b/apps/api/src/plugins/r2.ts index 73aedc7..485cdb3 100644 --- a/apps/api/src/plugins/r2.ts +++ b/apps/api/src/plugins/r2.ts @@ -81,13 +81,7 @@ export const r2Plugin = fp( return { success: false, error: 'No file provided' }; } - const conversationId = data.fields?.conversationId?.value; - - if (!conversationId) { - console.log('❌ conversationId validation failed'); - reply.code(400); - return { success: false, error: 'conversationId is required' }; - } + const conversationId = data.fields?.conversationId?.value || undefined; // Convert stream to buffer const buffer = await data.toBuffer(); @@ -160,16 +154,17 @@ export const r2Plugin = fp( try { const { conversationId, filename, contentType } = request.body; - if (!conversationId || !filename || !contentType) { + if (!filename || !contentType) { reply.code(400); return { success: false, - error: 'conversationId, filename, and contentType are required', + error: 'filename and contentType are required', }; } - // Generate R2 key - const key = r2Service.generateKey(conversationId, filename); + // Generate R2 key - use 'standalone' folder for sources without conversation + const folder = conversationId || 'standalone'; + const key = r2Service.generateKey(folder, filename); // Get presigned URL const presignedData = await r2Service.getPresignedUploadUrl( @@ -221,12 +216,11 @@ export const r2Plugin = fp( lowConfidenceThreshold, } = request.body; - if (!key || !conversationId || !filename || !contentType) { + if (!key || !filename || !contentType) { reply.code(400); return { success: false, - error: - 'key, conversationId, filename, and contentType are required', + error: 'key, filename, and contentType are required', }; } @@ -287,13 +281,8 @@ export const r2Plugin = fp( // Extract form fields from the first file (they should be the same for all files) const firstFile = files[0]; - const conversationId = firstFile.fields?.conversationId?.value; - console.log('conversationId', conversationId); - - if (!conversationId) { - reply.code(400); - return { success: false, error: 'conversationId is required' }; - } + const conversationId = + firstFile.fields?.conversationId?.value || undefined; // Process all files const analyze = asBool(firstFile.fields?.analyze?.value); diff --git a/packages/database/prisma/migrations/20251130081107_make_external_source_conversation_optional/migration.sql b/packages/database/prisma/migrations/20251130081107_make_external_source_conversation_optional/migration.sql new file mode 100644 index 0000000..7f88295 --- /dev/null +++ b/packages/database/prisma/migrations/20251130081107_make_external_source_conversation_optional/migration.sql @@ -0,0 +1,17 @@ +-- DropForeignKey +ALTER TABLE "ExternalSource" DROP CONSTRAINT "ExternalSource_conversationId_fkey"; + +-- AlterTable +ALTER TABLE "ExternalSource" ALTER COLUMN "conversationId" DROP NOT NULL; + +-- CreateIndex +CREATE INDEX "ExternalSource_conversationId_idx" ON "ExternalSource"("conversationId"); + +-- CreateIndex +CREATE INDEX "ExternalSource_type_idx" ON "ExternalSource"("type"); + +-- CreateIndex +CREATE INDEX "ExternalSource_createdAt_idx" ON "ExternalSource"("createdAt"); + +-- AddForeignKey +ALTER TABLE "ExternalSource" ADD CONSTRAINT "ExternalSource_conversationId_fkey" FOREIGN KEY ("conversationId") REFERENCES "Conversation"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/packages/database/prisma/schema.prisma b/packages/database/prisma/schema.prisma index 955650f..a619a84 100644 --- a/packages/database/prisma/schema.prisma +++ b/packages/database/prisma/schema.prisma @@ -120,10 +120,10 @@ enum ExternalSourceType { } model ExternalSource { - id String @id @default(cuid()) - conversationId String + id String @id @default(cuid()) + conversationId String? type ExternalSourceType - conversation Conversation @relation(fields: [conversationId], references: [id]) + conversation Conversation? @relation(fields: [conversationId], references: [id]) vectorId String? /// @zod.string.url({ message: "Must be a valid URL" }) @@ -134,6 +134,10 @@ model ExternalSource { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt chunks Chunk[] + + @@index([conversationId]) + @@index([type]) + @@index([createdAt]) } model Chunk { diff --git a/packages/database/src/services/__tests__/external-source.service.test.ts b/packages/database/src/services/__tests__/external-source.service.test.ts index 12d48cb..da4cfd0 100644 --- a/packages/database/src/services/__tests__/external-source.service.test.ts +++ b/packages/database/src/services/__tests__/external-source.service.test.ts @@ -5,22 +5,32 @@ import { createPrismaMock, resetPrismaMock } from './test-utils.js'; const BASE_SOURCE = { id: 'ckpr2ap7e000001w9det0d3bm', conversationId: 'conv-1', - type: 'DOC', + type: 'PDF', sourceUrl: 'https://example.com/doc', summary: 'Summary', + vectorId: null, metadata: { processingFailed: false }, createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), }; +const STANDALONE_SOURCE = { + ...BASE_SOURCE, + id: 'ckpr2ap7e000002w9det0d3bm', + conversationId: null, +}; + describe('ExternalSourceService', () => { let prisma: ReturnType< typeof createPrismaMock<{ externalSource: readonly [ 'findMany', + 'findUnique', 'update', + 'delete', 'deleteMany', - 'updateMany' + 'updateMany', + 'count' ]; }> >; @@ -30,9 +40,12 @@ describe('ExternalSourceService', () => { prisma = createPrismaMock({ externalSource: [ 'findMany', + 'findUnique', 'update', + 'delete', 'deleteMany', 'updateMany', + 'count', ] as const, }); @@ -176,4 +189,165 @@ describe('ExternalSourceService', () => { ); expect(result).toEqual({ count: 3 }); }); + + describe('findAll', () => { + it('returns all sources with default pagination', async () => { + prisma.externalSource.findMany.mockResolvedValueOnce([ + BASE_SOURCE, + STANDALONE_SOURCE, + ]); + + const results = await service.findAll(); + + expect(prisma.externalSource.findMany).toHaveBeenCalledWith({ + where: {}, + take: 50, + skip: 0, + orderBy: { createdAt: 'desc' }, + include: { + conversation: true, + }, + }); + expect(results).toHaveLength(2); + }); + + it('filters by type when provided', async () => { + prisma.externalSource.findMany.mockResolvedValueOnce([BASE_SOURCE]); + + await service.findAll({ type: 'PDF' }); + + expect(prisma.externalSource.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + where: { type: 'PDF' }, + }) + ); + }); + + it('includes chunks with limit when requested', async () => { + prisma.externalSource.findMany.mockResolvedValueOnce([ + { ...BASE_SOURCE, chunks: [] }, + ]); + + await service.findAll({ includeChunks: true, chunkLimit: 50 }); + + expect(prisma.externalSource.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + include: { + conversation: true, + chunks: { + take: 50, + orderBy: { chunkIndex: 'asc' }, + }, + }, + }) + ); + }); + + it('uses custom pagination', async () => { + prisma.externalSource.findMany.mockResolvedValueOnce([]); + + await service.findAll({ limit: 10, offset: 20 }); + + expect(prisma.externalSource.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + take: 10, + skip: 20, + }) + ); + }); + }); + + describe('findByIdWithChunks', () => { + it('returns source with chunks ordered by index', async () => { + const sourceWithChunks = { + ...BASE_SOURCE, + chunks: [ + { id: 'chunk-1', chunkIndex: 0 }, + { id: 'chunk-2', chunkIndex: 1 }, + ], + }; + prisma.externalSource.findUnique.mockResolvedValueOnce(sourceWithChunks); + + const result = await service.findByIdWithChunks(BASE_SOURCE.id); + + expect(prisma.externalSource.findUnique).toHaveBeenCalledWith({ + where: { id: BASE_SOURCE.id }, + include: { + conversation: true, + chunks: { + orderBy: { chunkIndex: 'asc' }, + }, + }, + }); + expect(result).toBeDefined(); + }); + + it('returns null when source not found', async () => { + prisma.externalSource.findUnique.mockResolvedValueOnce(null); + + const result = await service.findByIdWithChunks('nonexistent'); + + expect(result).toBeNull(); + }); + }); + + describe('deleteWithChunks', () => { + it('deletes source and returns the deleted source', async () => { + const findByIdSpy = vi + .spyOn(service, 'findById') + .mockResolvedValueOnce(BASE_SOURCE as any); + const deleteSpy = vi + .spyOn(service, 'delete') + .mockResolvedValueOnce(BASE_SOURCE as any); + + const result = await service.deleteWithChunks(BASE_SOURCE.id); + + expect(findByIdSpy).toHaveBeenCalledWith(BASE_SOURCE.id); + expect(deleteSpy).toHaveBeenCalledWith(BASE_SOURCE.id); + expect(result).toEqual(BASE_SOURCE); + }); + + it('throws error when source not found', async () => { + vi.spyOn(service, 'findById').mockResolvedValueOnce(null); + + await expect(service.deleteWithChunks('nonexistent')).rejects.toThrow( + 'ExternalSource with id nonexistent not found' + ); + }); + }); + + describe('countAll', () => { + it('counts all sources', async () => { + const countSpy = vi + .spyOn(service, 'count') + .mockResolvedValueOnce(10); + + const result = await service.countAll(); + + expect(countSpy).toHaveBeenCalledWith({}); + expect(result).toBe(10); + }); + + it('counts sources by type', async () => { + const countSpy = vi + .spyOn(service, 'count') + .mockResolvedValueOnce(5); + + const result = await service.countAll('PDF'); + + expect(countSpy).toHaveBeenCalledWith({ type: 'PDF' }); + expect(result).toBe(5); + }); + }); + + describe('standalone sources (conversationId is null)', () => { + it('findAll returns standalone sources', async () => { + prisma.externalSource.findMany.mockResolvedValueOnce([STANDALONE_SOURCE]); + + const results = await service.findAll(); + + expect(results).toHaveLength(1); + expect(results[0].conversationId).toBeNull(); + }); + }); }); diff --git a/packages/database/src/services/external-source.ts b/packages/database/src/services/external-source.ts index 8679c26..3e87ee8 100644 --- a/packages/database/src/services/external-source.ts +++ b/packages/database/src/services/external-source.ts @@ -9,6 +9,17 @@ import { } from '@overchat/shared'; import { z } from 'zod'; +export type ExternalSourceType = z.infer; + +export interface FindAllOptions { + limit?: number; + offset?: number; + type?: ExternalSourceType; + includeChunks?: boolean; + /** Limit number of chunks per source when includeChunks is true (default: 100) */ + chunkLimit?: number; +} + export class ExternalSourceService extends BaseService< ExternalSource, z.infer, @@ -23,6 +34,86 @@ export class ExternalSourceService extends BaseService< conversation: true, }; + /** + * Find all external sources with pagination and optional type filter + * Used for the Sources page to list all standalone sources + */ + async findAll(options: FindAllOptions = {}): Promise { + const { + limit = 50, + offset = 0, + type, + includeChunks = false, + chunkLimit = 100, + } = options; + + const where: any = {}; + if (type) { + where.type = type; + } + + const results = await (this.prisma as any).externalSource.findMany({ + where, + take: limit, + skip: offset, + orderBy: { createdAt: 'desc' }, + include: { + conversation: true, + ...(includeChunks && { + chunks: { + take: chunkLimit, + orderBy: { chunkIndex: 'asc' }, + }, + }), + }, + }); + + return results.map((r: any) => this.resultSchema.parse(r)); + } + + /** + * Find a single external source by ID with its chunks + */ + async findByIdWithChunks(id: string): Promise { + const result = await (this.prisma as any).externalSource.findUnique({ + where: { id }, + include: { + conversation: true, + chunks: { + orderBy: { chunkIndex: 'asc' }, + }, + }, + }); + return result ? this.resultSchema.parse(result) : null; + } + + /** + * Delete an external source and its associated chunks + * Note: Chunks are deleted via CASCADE in the schema + */ + async deleteWithChunks(id: string): Promise { + // First get the source to return it + const source = await this.findById(id); + if (!source) { + throw new Error(`ExternalSource with id ${id} not found`); + } + + // Delete the source (chunks will be cascade deleted) + await this.delete(id); + return source; + } + + /** + * Count all external sources with optional type filter + */ + async countAll(type?: ExternalSourceType): Promise { + const where: z.infer = {}; + if (type) { + where.type = type; + } + return this.count(where); + } + async findByConversationId( conversationId: string ): Promise { diff --git a/packages/database/src/services/index.ts b/packages/database/src/services/index.ts index d5e8862..251b4c7 100644 --- a/packages/database/src/services/index.ts +++ b/packages/database/src/services/index.ts @@ -4,7 +4,11 @@ export { ConversationService } from './conversation.js'; export { ConversationAnalysisService } from './conversation-analysis.js'; export { ConversationTargetService } from './conversation-target.js'; export { ConnectorMetricsService } from './connector-metrics.js'; -export { ExternalSourceService } from './external-source.js'; +export { + ExternalSourceService, + type ExternalSourceType, + type FindAllOptions, +} from './external-source.js'; export { IntentService } from './intent.js'; export { MessageService } from './message.js'; export { VectorService } from './vector.js'; diff --git a/packages/graphql/schema.graphql b/packages/graphql/schema.graphql index 4de30c8..b42d27a 100644 --- a/packages/graphql/schema.graphql +++ b/packages/graphql/schema.graphql @@ -397,11 +397,13 @@ type Mutation { createConversation(projectRef: String, service: String, title: String!): Conversation deactivateConnector(conversationTargetId: String!): Boolean deleteConversation(id: String!): Boolean + deleteExternalSource(id: String!): Boolean! disconnectSlackWorkspace(workspaceId: String!): SlackDisconnectResponse executeIntent(intentId: String!, platform: String): ExecuteIntentResult prepareSecretRotation(projectRef: String!, service: String!): SecretRotationPreparePayload processAllSlackEvents(forceAnalysis: Boolean, limit: Int): ProcessSlackEventsResponse processSlackEvents(channelId: String!, forceAnalysis: Boolean, limit: Int, workspaceId: String!): ProcessSlackEventsResponse + reIngestSource(id: String!): ExternalSource! registerGitHubConnector(conversationId: String!, installationId: String!, owner: String!, repo: String!, setDefault: Boolean = true): RegisterGitHubConnectorPayload registerLinearConnector(conversationId: String!, description: String, setDefault: Boolean = true, teamId: String!, workspaceId: String!): RegisterLinearConnectorPayload registerNotionConnector(conversationId: String!, setDefault: Boolean = true, sources: [NotionSourceInput!], workspaceId: String!): RegisterNotionConnectorPayload @@ -466,7 +468,9 @@ type Query { conversation(id: String!): Conversation conversationAnalyses(conversationId: String!, limit: Int = 10, offset: Int = 0): [ConversationAnalysis!] conversations(limit: Int = 20, offset: Int = 0): [Conversation!] + externalSource(id: String!): ExternalSource externalSources(conversationId: String!): [ExternalSource!] + externalSourcesAll(limit: Int = 50, offset: Int = 0, type: ExternalSourceType): [ExternalSource!]! intent(id: String!): Intent intents(analysisId: String, conversationId: String, isActive: Boolean, limit: Int = 50, maxConfidence: Float, minConfidence: Float, offset: Int = 0, reviewState: IntentReviewState, type: IntentType): [Intent!] latestConversationAnalysis(conversationId: String!): ConversationAnalysis diff --git a/packages/graphql/src/generated/hooks.ts b/packages/graphql/src/generated/hooks.ts index 5556998..4c672c0 100644 --- a/packages/graphql/src/generated/hooks.ts +++ b/packages/graphql/src/generated/hooks.ts @@ -442,11 +442,13 @@ export type Mutation = { createConversation?: Maybe; deactivateConnector?: Maybe; deleteConversation?: Maybe; + deleteExternalSource: Scalars['Boolean']['output']; disconnectSlackWorkspace?: Maybe; executeIntent?: Maybe; prepareSecretRotation?: Maybe; processAllSlackEvents?: Maybe; processSlackEvents?: Maybe; + reIngestSource: ExternalSource; registerGitHubConnector?: Maybe; registerLinearConnector?: Maybe; registerNotionConnector?: Maybe; @@ -517,6 +519,11 @@ export type MutationDeleteConversationArgs = { }; +export type MutationDeleteExternalSourceArgs = { + id: Scalars['String']['input']; +}; + + export type MutationDisconnectSlackWorkspaceArgs = { workspaceId: Scalars['String']['input']; }; @@ -548,6 +555,11 @@ export type MutationProcessSlackEventsArgs = { }; +export type MutationReIngestSourceArgs = { + id: Scalars['String']['input']; +}; + + export type MutationRegisterGitHubConnectorArgs = { conversationId: Scalars['String']['input']; installationId: Scalars['String']['input']; @@ -672,7 +684,9 @@ export type Query = { conversation?: Maybe; conversationAnalyses?: Maybe>; conversations?: Maybe>; + externalSource?: Maybe; externalSources?: Maybe>; + externalSourcesAll: Array; intent?: Maybe; intents?: Maybe>; latestConversationAnalysis?: Maybe; @@ -709,11 +723,23 @@ export type QueryConversationsArgs = { }; +export type QueryExternalSourceArgs = { + id: Scalars['String']['input']; +}; + + export type QueryExternalSourcesArgs = { conversationId: Scalars['String']['input']; }; +export type QueryExternalSourcesAllArgs = { + limit?: InputMaybe; + offset?: InputMaybe; + type?: InputMaybe; +}; + + export type QueryIntentArgs = { id: Scalars['String']['input']; }; diff --git a/packages/graphql/src/generated/sdk.ts b/packages/graphql/src/generated/sdk.ts index 57eae22..5b2c439 100644 --- a/packages/graphql/src/generated/sdk.ts +++ b/packages/graphql/src/generated/sdk.ts @@ -434,11 +434,13 @@ export type Mutation = { createConversation?: Maybe; deactivateConnector?: Maybe; deleteConversation?: Maybe; + deleteExternalSource: Scalars['Boolean']['output']; disconnectSlackWorkspace?: Maybe; executeIntent?: Maybe; prepareSecretRotation?: Maybe; processAllSlackEvents?: Maybe; processSlackEvents?: Maybe; + reIngestSource: ExternalSource; registerGitHubConnector?: Maybe; registerLinearConnector?: Maybe; registerNotionConnector?: Maybe; @@ -509,6 +511,11 @@ export type MutationDeleteConversationArgs = { }; +export type MutationDeleteExternalSourceArgs = { + id: Scalars['String']['input']; +}; + + export type MutationDisconnectSlackWorkspaceArgs = { workspaceId: Scalars['String']['input']; }; @@ -540,6 +547,11 @@ export type MutationProcessSlackEventsArgs = { }; +export type MutationReIngestSourceArgs = { + id: Scalars['String']['input']; +}; + + export type MutationRegisterGitHubConnectorArgs = { conversationId: Scalars['String']['input']; installationId: Scalars['String']['input']; @@ -664,7 +676,9 @@ export type Query = { conversation?: Maybe; conversationAnalyses?: Maybe>; conversations?: Maybe>; + externalSource?: Maybe; externalSources?: Maybe>; + externalSourcesAll: Array; intent?: Maybe; intents?: Maybe>; latestConversationAnalysis?: Maybe; @@ -701,11 +715,23 @@ export type QueryConversationsArgs = { }; +export type QueryExternalSourceArgs = { + id: Scalars['String']['input']; +}; + + export type QueryExternalSourcesArgs = { conversationId: Scalars['String']['input']; }; +export type QueryExternalSourcesAllArgs = { + limit?: InputMaybe; + offset?: InputMaybe; + type?: InputMaybe; +}; + + export type QueryIntentArgs = { id: Scalars['String']['input']; }; diff --git a/packages/graphql/src/generated/types.ts b/packages/graphql/src/generated/types.ts index 2882e88..cfa6df8 100644 --- a/packages/graphql/src/generated/types.ts +++ b/packages/graphql/src/generated/types.ts @@ -431,11 +431,13 @@ export type Mutation = { createConversation: Maybe; deactivateConnector: Maybe; deleteConversation: Maybe; + deleteExternalSource: Scalars['Boolean']['output']; disconnectSlackWorkspace: Maybe; executeIntent: Maybe; prepareSecretRotation: Maybe; processAllSlackEvents: Maybe; processSlackEvents: Maybe; + reIngestSource: ExternalSource; registerGitHubConnector: Maybe; registerLinearConnector: Maybe; registerNotionConnector: Maybe; @@ -506,6 +508,11 @@ export type MutationDeleteConversationArgs = { }; +export type MutationDeleteExternalSourceArgs = { + id: Scalars['String']['input']; +}; + + export type MutationDisconnectSlackWorkspaceArgs = { workspaceId: Scalars['String']['input']; }; @@ -537,6 +544,11 @@ export type MutationProcessSlackEventsArgs = { }; +export type MutationReIngestSourceArgs = { + id: Scalars['String']['input']; +}; + + export type MutationRegisterGitHubConnectorArgs = { conversationId: Scalars['String']['input']; installationId: Scalars['String']['input']; @@ -661,7 +673,9 @@ export type Query = { conversation: Maybe; conversationAnalyses: Maybe>; conversations: Maybe>; + externalSource: Maybe; externalSources: Maybe>; + externalSourcesAll: Array; intent: Maybe; intents: Maybe>; latestConversationAnalysis: Maybe; @@ -698,11 +712,23 @@ export type QueryConversationsArgs = { }; +export type QueryExternalSourceArgs = { + id: Scalars['String']['input']; +}; + + export type QueryExternalSourcesArgs = { conversationId: Scalars['String']['input']; }; +export type QueryExternalSourcesAllArgs = { + limit?: InputMaybe; + offset?: InputMaybe; + type: InputMaybe; +}; + + export type QueryIntentArgs = { id: Scalars['String']['input']; }; diff --git a/packages/shared/src/services-api/index.ts b/packages/shared/src/services-api/index.ts index ec15a45..bf5f5e9 100644 --- a/packages/shared/src/services-api/index.ts +++ b/packages/shared/src/services-api/index.ts @@ -236,6 +236,12 @@ export interface IVectorService { externalSourceId: string ): Promise; + deleteVectors( + externalSourceId: string, + namespace?: string, + conversationId?: string + ): Promise; + storeTicketEmbedding( embeddingId: string, embedding: number[], @@ -272,6 +278,15 @@ export interface IIntentSimilarityService { }>; } +export interface FindAllOptions { + limit?: number; + offset?: number; + type?: z.infer; + includeChunks?: boolean; + /** Limit number of chunks per source when includeChunks is true (default: 100) */ + chunkLimit?: number; +} + export interface IExternalSourceService extends IBaseService< ExternalSource, @@ -279,6 +294,10 @@ export interface IExternalSourceService z.infer, z.infer > { + findAll(options?: FindAllOptions): Promise; + findByIdWithChunks(id: string): Promise; + deleteWithChunks(id: string): Promise; + countAll(type?: z.infer): Promise; findByConversationId(conversationId: string): Promise; findByType( type: z.infer, @@ -308,6 +327,9 @@ export interface IChunkService ): Promise; getChunksByIds(chunkIds: string[]): Promise; getChunkIdsByExternalSource(externalSourceId: string): Promise; + deleteMany( + where: z.infer + ): Promise<{ count: number }>; } export interface IConversationService @@ -593,6 +615,11 @@ export interface OutboxEvent { } export interface IOutboxService { + /** + * Create a generic outbox event + */ + create(data: { type: string; payload: OutboxEventPayload }): Promise; + /** * Create a MessageCreated outbox event */ diff --git a/packages/shared/src/types/prisma/index.ts b/packages/shared/src/types/prisma/index.ts index 6a1c3a1..c0813c8 100644 --- a/packages/shared/src/types/prisma/index.ts +++ b/packages/shared/src/types/prisma/index.ts @@ -257,7 +257,7 @@ export type SlackInstallation = z.infer export const ExternalSourceSchema = z.object({ type: ExternalSourceTypeSchema, id: z.string().cuid(), - conversationId: z.string(), + conversationId: z.string().nullable(), vectorId: z.string().nullable(), sourceUrl: z.string().url({ message: "Must be a valid URL" }).nullable(), summary: z.string().nullable(), @@ -2034,7 +2034,7 @@ export const ExternalSourceWhereInputSchema: z.ZodType ExternalSourceWhereInputSchema).array().optional(), NOT: z.union([ z.lazy(() => ExternalSourceWhereInputSchema),z.lazy(() => ExternalSourceWhereInputSchema).array() ]).optional(), id: z.union([ z.lazy(() => StringFilterSchema),z.string() ]).optional(), - conversationId: z.union([ z.lazy(() => StringFilterSchema),z.string() ]).optional(), + conversationId: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), type: z.union([ z.lazy(() => EnumExternalSourceTypeFilterSchema),z.lazy(() => ExternalSourceTypeSchema) ]).optional(), vectorId: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), sourceUrl: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), @@ -2042,13 +2042,13 @@ export const ExternalSourceWhereInputSchema: z.ZodType JsonNullableFilterSchema).optional(), createdAt: z.union([ z.lazy(() => DateTimeFilterSchema),z.coerce.date() ]).optional(), updatedAt: z.union([ z.lazy(() => DateTimeFilterSchema),z.coerce.date() ]).optional(), - conversation: z.union([ z.lazy(() => ConversationRelationFilterSchema),z.lazy(() => ConversationWhereInputSchema) ]).optional(), + conversation: z.union([ z.lazy(() => ConversationNullableRelationFilterSchema),z.lazy(() => ConversationWhereInputSchema) ]).optional().nullable(), chunks: z.lazy(() => ChunkListRelationFilterSchema).optional() }).strict(); export const ExternalSourceOrderByWithRelationInputSchema: z.ZodType = z.object({ id: z.lazy(() => SortOrderSchema).optional(), - conversationId: z.lazy(() => SortOrderSchema).optional(), + conversationId: z.union([ z.lazy(() => SortOrderSchema),z.lazy(() => SortOrderInputSchema) ]).optional(), type: z.lazy(() => SortOrderSchema).optional(), vectorId: z.union([ z.lazy(() => SortOrderSchema),z.lazy(() => SortOrderInputSchema) ]).optional(), sourceUrl: z.union([ z.lazy(() => SortOrderSchema),z.lazy(() => SortOrderInputSchema) ]).optional(), @@ -2068,7 +2068,7 @@ export const ExternalSourceWhereUniqueInputSchema: z.ZodType ExternalSourceWhereInputSchema),z.lazy(() => ExternalSourceWhereInputSchema).array() ]).optional(), OR: z.lazy(() => ExternalSourceWhereInputSchema).array().optional(), NOT: z.union([ z.lazy(() => ExternalSourceWhereInputSchema),z.lazy(() => ExternalSourceWhereInputSchema).array() ]).optional(), - conversationId: z.union([ z.lazy(() => StringFilterSchema),z.string() ]).optional(), + conversationId: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), type: z.union([ z.lazy(() => EnumExternalSourceTypeFilterSchema),z.lazy(() => ExternalSourceTypeSchema) ]).optional(), vectorId: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), sourceUrl: z.union([ z.lazy(() => StringNullableFilterSchema),z.string().url({ message: "Must be a valid URL" }) ]).optional().nullable(), @@ -2076,13 +2076,13 @@ export const ExternalSourceWhereUniqueInputSchema: z.ZodType JsonNullableFilterSchema).optional(), createdAt: z.union([ z.lazy(() => DateTimeFilterSchema),z.coerce.date() ]).optional(), updatedAt: z.union([ z.lazy(() => DateTimeFilterSchema),z.coerce.date() ]).optional(), - conversation: z.union([ z.lazy(() => ConversationRelationFilterSchema),z.lazy(() => ConversationWhereInputSchema) ]).optional(), + conversation: z.union([ z.lazy(() => ConversationNullableRelationFilterSchema),z.lazy(() => ConversationWhereInputSchema) ]).optional().nullable(), chunks: z.lazy(() => ChunkListRelationFilterSchema).optional() }).strict()); export const ExternalSourceOrderByWithAggregationInputSchema: z.ZodType = z.object({ id: z.lazy(() => SortOrderSchema).optional(), - conversationId: z.lazy(() => SortOrderSchema).optional(), + conversationId: z.union([ z.lazy(() => SortOrderSchema),z.lazy(() => SortOrderInputSchema) ]).optional(), type: z.lazy(() => SortOrderSchema).optional(), vectorId: z.union([ z.lazy(() => SortOrderSchema),z.lazy(() => SortOrderInputSchema) ]).optional(), sourceUrl: z.union([ z.lazy(() => SortOrderSchema),z.lazy(() => SortOrderInputSchema) ]).optional(), @@ -2100,7 +2100,7 @@ export const ExternalSourceScalarWhereWithAggregatesInputSchema: z.ZodType ExternalSourceScalarWhereWithAggregatesInputSchema).array().optional(), NOT: z.union([ z.lazy(() => ExternalSourceScalarWhereWithAggregatesInputSchema),z.lazy(() => ExternalSourceScalarWhereWithAggregatesInputSchema).array() ]).optional(), id: z.union([ z.lazy(() => StringWithAggregatesFilterSchema),z.string() ]).optional(), - conversationId: z.union([ z.lazy(() => StringWithAggregatesFilterSchema),z.string() ]).optional(), + conversationId: z.union([ z.lazy(() => StringNullableWithAggregatesFilterSchema),z.string() ]).optional().nullable(), type: z.union([ z.lazy(() => EnumExternalSourceTypeWithAggregatesFilterSchema),z.lazy(() => ExternalSourceTypeSchema) ]).optional(), vectorId: z.union([ z.lazy(() => StringNullableWithAggregatesFilterSchema),z.string() ]).optional().nullable(), sourceUrl: z.union([ z.lazy(() => StringNullableWithAggregatesFilterSchema),z.string() ]).optional().nullable(), @@ -4735,13 +4735,13 @@ export const ExternalSourceCreateInputSchema: z.ZodType NullableJsonNullValueInputSchema),InputJsonValueSchema ]).optional(), createdAt: z.coerce.date().optional(), updatedAt: z.coerce.date().optional(), - conversation: z.lazy(() => ConversationCreateNestedOneWithoutExternalDocsInputSchema), + conversation: z.lazy(() => ConversationCreateNestedOneWithoutExternalDocsInputSchema).optional(), chunks: z.lazy(() => ChunkCreateNestedManyWithoutExternalSourceInputSchema).optional() }).strict(); export const ExternalSourceUncheckedCreateInputSchema: z.ZodType = z.object({ id: z.string().cuid().optional(), - conversationId: z.string(), + conversationId: z.string().optional().nullable(), type: z.lazy(() => ExternalSourceTypeSchema), vectorId: z.string().optional().nullable(), sourceUrl: z.string().url({ message: "Must be a valid URL" }).optional().nullable(), @@ -4761,13 +4761,13 @@ export const ExternalSourceUpdateInputSchema: z.ZodType NullableJsonNullValueInputSchema),InputJsonValueSchema ]).optional(), createdAt: z.union([ z.coerce.date(),z.lazy(() => DateTimeFieldUpdateOperationsInputSchema) ]).optional(), updatedAt: z.union([ z.coerce.date(),z.lazy(() => DateTimeFieldUpdateOperationsInputSchema) ]).optional(), - conversation: z.lazy(() => ConversationUpdateOneRequiredWithoutExternalDocsNestedInputSchema).optional(), + conversation: z.lazy(() => ConversationUpdateOneWithoutExternalDocsNestedInputSchema).optional(), chunks: z.lazy(() => ChunkUpdateManyWithoutExternalSourceNestedInputSchema).optional() }).strict(); export const ExternalSourceUncheckedUpdateInputSchema: z.ZodType = z.object({ id: z.union([ z.string().cuid(),z.lazy(() => StringFieldUpdateOperationsInputSchema) ]).optional(), - conversationId: z.union([ z.string(),z.lazy(() => StringFieldUpdateOperationsInputSchema) ]).optional(), + conversationId: z.union([ z.string(),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), type: z.union([ z.lazy(() => ExternalSourceTypeSchema),z.lazy(() => EnumExternalSourceTypeFieldUpdateOperationsInputSchema) ]).optional(), vectorId: z.union([ z.string(),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), sourceUrl: z.union([ z.string().url({ message: "Must be a valid URL" }),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), @@ -4780,7 +4780,7 @@ export const ExternalSourceUncheckedUpdateInputSchema: z.ZodType = z.object({ id: z.string().cuid().optional(), - conversationId: z.string(), + conversationId: z.string().optional().nullable(), type: z.lazy(() => ExternalSourceTypeSchema), vectorId: z.string().optional().nullable(), sourceUrl: z.string().url({ message: "Must be a valid URL" }).optional().nullable(), @@ -4803,7 +4803,7 @@ export const ExternalSourceUpdateManyMutationInputSchema: z.ZodType = z.object({ id: z.union([ z.string().cuid(),z.lazy(() => StringFieldUpdateOperationsInputSchema) ]).optional(), - conversationId: z.union([ z.string(),z.lazy(() => StringFieldUpdateOperationsInputSchema) ]).optional(), + conversationId: z.union([ z.string(),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), type: z.union([ z.lazy(() => ExternalSourceTypeSchema),z.lazy(() => EnumExternalSourceTypeFieldUpdateOperationsInputSchema) ]).optional(), vectorId: z.union([ z.string(),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), sourceUrl: z.union([ z.string().url({ message: "Must be a valid URL" }),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), @@ -7440,6 +7440,11 @@ export const EnumExternalSourceTypeFilterSchema: z.ZodType ExternalSourceTypeSchema),z.lazy(() => NestedEnumExternalSourceTypeFilterSchema) ]).optional(), }).strict(); +export const ConversationNullableRelationFilterSchema: z.ZodType = z.object({ + is: z.lazy(() => ConversationWhereInputSchema).optional().nullable(), + isNot: z.lazy(() => ConversationWhereInputSchema).optional().nullable() +}).strict(); + export const ChunkListRelationFilterSchema: z.ZodType = z.object({ every: z.lazy(() => ChunkWhereInputSchema).optional(), some: z.lazy(() => ChunkWhereInputSchema).optional(), @@ -9507,10 +9512,12 @@ export const EnumExternalSourceTypeFieldUpdateOperationsInputSchema: z.ZodType

ExternalSourceTypeSchema).optional() }).strict(); -export const ConversationUpdateOneRequiredWithoutExternalDocsNestedInputSchema: z.ZodType = z.object({ +export const ConversationUpdateOneWithoutExternalDocsNestedInputSchema: z.ZodType = z.object({ create: z.union([ z.lazy(() => ConversationCreateWithoutExternalDocsInputSchema),z.lazy(() => ConversationUncheckedCreateWithoutExternalDocsInputSchema) ]).optional(), connectOrCreate: z.lazy(() => ConversationCreateOrConnectWithoutExternalDocsInputSchema).optional(), upsert: z.lazy(() => ConversationUpsertWithoutExternalDocsInputSchema).optional(), + disconnect: z.union([ z.boolean(),z.lazy(() => ConversationWhereInputSchema) ]).optional(), + delete: z.union([ z.boolean(),z.lazy(() => ConversationWhereInputSchema) ]).optional(), connect: z.lazy(() => ConversationWhereUniqueInputSchema).optional(), update: z.union([ z.lazy(() => ConversationUpdateToOneWithWhereWithoutExternalDocsInputSchema),z.lazy(() => ConversationUpdateWithoutExternalDocsInputSchema),z.lazy(() => ConversationUncheckedUpdateWithoutExternalDocsInputSchema) ]).optional(), }).strict(); @@ -11463,7 +11470,7 @@ export const ExternalSourceScalarWhereInputSchema: z.ZodType ExternalSourceScalarWhereInputSchema).array().optional(), NOT: z.union([ z.lazy(() => ExternalSourceScalarWhereInputSchema),z.lazy(() => ExternalSourceScalarWhereInputSchema).array() ]).optional(), id: z.union([ z.lazy(() => StringFilterSchema),z.string() ]).optional(), - conversationId: z.union([ z.lazy(() => StringFilterSchema),z.string() ]).optional(), + conversationId: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), type: z.union([ z.lazy(() => EnumExternalSourceTypeFilterSchema),z.lazy(() => ExternalSourceTypeSchema) ]).optional(), vectorId: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), sourceUrl: z.union([ z.lazy(() => StringNullableFilterSchema),z.string() ]).optional().nullable(), @@ -12529,12 +12536,12 @@ export const ExternalSourceCreateWithoutChunksInputSchema: z.ZodType NullableJsonNullValueInputSchema),InputJsonValueSchema ]).optional(), createdAt: z.coerce.date().optional(), updatedAt: z.coerce.date().optional(), - conversation: z.lazy(() => ConversationCreateNestedOneWithoutExternalDocsInputSchema) + conversation: z.lazy(() => ConversationCreateNestedOneWithoutExternalDocsInputSchema).optional() }).strict(); export const ExternalSourceUncheckedCreateWithoutChunksInputSchema: z.ZodType = z.object({ id: z.string().cuid().optional(), - conversationId: z.string(), + conversationId: z.string().optional().nullable(), type: z.lazy(() => ExternalSourceTypeSchema), vectorId: z.string().optional().nullable(), sourceUrl: z.string().url({ message: "Must be a valid URL" }).optional().nullable(), @@ -12619,12 +12626,12 @@ export const ExternalSourceUpdateWithoutChunksInputSchema: z.ZodType NullableJsonNullValueInputSchema),InputJsonValueSchema ]).optional(), createdAt: z.union([ z.coerce.date(),z.lazy(() => DateTimeFieldUpdateOperationsInputSchema) ]).optional(), updatedAt: z.union([ z.coerce.date(),z.lazy(() => DateTimeFieldUpdateOperationsInputSchema) ]).optional(), - conversation: z.lazy(() => ConversationUpdateOneRequiredWithoutExternalDocsNestedInputSchema).optional() + conversation: z.lazy(() => ConversationUpdateOneWithoutExternalDocsNestedInputSchema).optional() }).strict(); export const ExternalSourceUncheckedUpdateWithoutChunksInputSchema: z.ZodType = z.object({ id: z.union([ z.string().cuid(),z.lazy(() => StringFieldUpdateOperationsInputSchema) ]).optional(), - conversationId: z.union([ z.string(),z.lazy(() => StringFieldUpdateOperationsInputSchema) ]).optional(), + conversationId: z.union([ z.string(),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), type: z.union([ z.lazy(() => ExternalSourceTypeSchema),z.lazy(() => EnumExternalSourceTypeFieldUpdateOperationsInputSchema) ]).optional(), vectorId: z.union([ z.string(),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), sourceUrl: z.union([ z.string().url({ message: "Must be a valid URL" }),z.lazy(() => NullableStringFieldUpdateOperationsInputSchema) ]).optional().nullable(), diff --git a/packages/uploads/src/services/r2UploadHandler.ts b/packages/uploads/src/services/r2UploadHandler.ts index 7c3a698..d3153e8 100644 --- a/packages/uploads/src/services/r2UploadHandler.ts +++ b/packages/uploads/src/services/r2UploadHandler.ts @@ -82,7 +82,7 @@ export class R2UploadHandler { file: Buffer, originalFilename: string, mimeType: string, - conversationId: string, + conversationId?: string, // Now optional - sources can be standalone opts?: { analyze?: boolean; generateSpec?: boolean; @@ -101,50 +101,63 @@ export class R2UploadHandler { }); try { - // 2. Check if this document (by docHash) already exists for the conversation - const existing = await this.services.externalSourceService.findMany( - { - conversationId, - metadata: { - path: ['docHash'], - equals: docHash, + // 2. Check if this document (by docHash) already exists within the same conversation + // Only perform deduplication when conversationId is provided to prevent cross-conversation data leakage + if (conversationId) { + const existing = await this.services.externalSourceService.findMany( + { + conversationId, + metadata: { + path: ['docHash'], + equals: docHash, + }, }, - } as any, - { take: 1 } - ); + { take: 1 } + ); - if (existing.length > 0) { - const found = existing[0] as any; - if (found?.metadata?.processingFailed === true) { - console.log( - `⚠️ Found previous failed processing for this docHash; reprocessing now.` - ); - } else { - console.log( - `🟡 Duplicate upload detected (docHash match). Reusing ExternalSource ${found.id}` - ); - return { - ...found, - analysisId: undefined, - specId: undefined, - docHash, - wasIdempotent: true, - metrics: { - intentsPersistedCount: 0, - specLinksCreated: 0, - specTicketsLinked: 0, - }, - } as any; + if (existing.length > 0) { + const found = existing[0] as any; + if (found?.metadata?.processingFailed === true) { + console.log( + `⚠️ Found previous failed processing for this docHash; reprocessing now.` + ); + } else { + console.log( + `🟡 Duplicate upload detected (docHash match). Reusing ExternalSource ${found.id}` + ); + return { + ...found, + analysisId: undefined, + specId: undefined, + docHash, + wasIdempotent: true, + metrics: { + intentsPersistedCount: 0, + specLinksCreated: 0, + specTicketsLinked: 0, + }, + } as any; + } } } // 3. Generate unique key and upload to R2 - const key = this.generateKey(conversationId, originalFilename); + // Use 'standalone' folder for sources without a conversation + const folder = conversationId || 'standalone'; + const key = this.generateKey(folder, originalFilename); const fileUrl = this.getFileUrl(key); console.log(`📁 Uploading to R2: ${key}`); // Upload to R2 using AWS SDK Upload + const metadata: Record = { + originalFilename, + uploadedAt: new Date().toISOString(), + }; + if (conversationId) { + metadata.conversationId = conversationId; + } + const upload = new Upload({ client: this.s3Client, params: { @@ -152,11 +165,7 @@ export class R2UploadHandler { Key: key, Body: file, ContentType: mimeType, - Metadata: { - conversationId, - originalFilename, - uploadedAt: new Date().toISOString(), - }, + Metadata: metadata, }, }); @@ -180,18 +189,24 @@ export class R2UploadHandler { docHash, }; - // 5. Create external source - let externalSource: any = - await this.services.externalSourceService.create({ - conversation: { - connect: { - id: conversationId, - }, + // 5. Create external source - conversation connection is optional + const createData: any = { + type: getSourceTypeFromMimeType(mimeType), + metadata: fileMetadata, + sourceUrl: fileUrl, + }; + + // Only connect to conversation if conversationId is provided + if (conversationId) { + createData.conversation = { + connect: { + id: conversationId, }, - type: getSourceTypeFromMimeType(mimeType), - metadata: fileMetadata, - sourceUrl: fileUrl, - }); + }; + } + + let externalSource: any = + await this.services.externalSourceService.create(createData); // 6. Create chunks console.log(`🤖 Processing document with AI package...`); @@ -214,7 +229,10 @@ export class R2UploadHandler { result.chunks ); - // 8. Save vectors + // 8. Save vectors - use externalSource.id as namespace if no conversationId + // Note: Standalone sources use externalSource.id as the vector namespace. + // If a feature to link standalone sources to conversations is added later, + // vectors would need to be migrated to the new namespace (conversationId). await this.services.vectorService.storeVectors( dbChunks.map((dbChunk: any, index: number) => ({ id: dbChunk.id, @@ -223,26 +241,29 @@ export class R2UploadHandler { metadata: dbChunk.metadata, embedding: result.chunks[index]?.embedding || [], })), - conversationId, + conversationId || externalSource.id, // Use source ID as namespace for standalone sources externalSource.id ); - // 9. Emit ChunkUpserted events for each chunk - for (const chunk of dbChunks) { - await this.services.outboxService.createChunkUpsertedEvent( - chunk.id, - conversationId - ); + // 9. Emit ChunkUpserted events for each chunk (only if tied to a conversation) + if (conversationId) { + for (const chunk of dbChunks) { + await this.services.outboxService.createChunkUpsertedEvent( + chunk.id, + conversationId + ); + } } - // 10. Optional: analyze and/or generate spec + // 10. Optional: analyze and/or generate spec (only if tied to a conversation) let analysisId: string | undefined; let specId: string | undefined; let intentsPersistedCount = 0; let specLinksCreated = 0; let specTicketsLinked = 0; - if (opts?.analyze || opts?.generateSpec) { + // Analysis requires a conversation context + if (conversationId && (opts?.analyze || opts?.generateSpec)) { const result = await this.services.specGeneration.analyzeDocumentAndGenerateSpec( dbChunks, @@ -341,7 +362,7 @@ export class R2UploadHandler { filename: string; mimeType: string; }>, - conversationId: string + conversationId?: string // Now optional - sources can be standalone ) { console.log(`🔄 Processing ${files.length} files in bulk...`); @@ -380,7 +401,7 @@ export class R2UploadHandler { */ async uploadAndProcessFromKey( key: string, - conversationId: string, + conversationId: string | undefined, // Now optional - sources can be standalone filename: string, contentType: string, opts?: { @@ -394,46 +415,49 @@ export class R2UploadHandler { // 1) Stream/download from R2 using key -> Buffer const buffer = await this.getObjectAsBuffer(key); - // 2) Compute doc hash and short-circuit if duplicate + // 2) Compute doc hash and short-circuit if duplicate within the same conversation const docHash = generateUploadDocHash(buffer, { filename, firstBytes: 1024, fileSize: buffer.length, }); - const existing = await this.services.externalSourceService.findMany( - { - conversationId, - metadata: { - path: ['docHash'], - equals: docHash, + // Only perform deduplication when conversationId is provided to prevent cross-conversation data leakage + if (conversationId) { + const existing = await this.services.externalSourceService.findMany( + { + conversationId, + metadata: { + path: ['docHash'], + equals: docHash, + }, }, - } as any, - { take: 1 } - ); + { take: 1 } + ); - if (existing.length > 0) { - const found = existing[0] as any; - if (found?.metadata?.processingFailed === true) { - console.log( - `⚠️ Found previous failed processing for this docHash (presigned); reprocessing.` - ); - } else { - console.log( - `🟡 Duplicate presigned upload detected (docHash match). Reusing ExternalSource ${found.id}` - ); - return { - ...found, - analysisId: undefined, - specId: undefined, - docHash, - wasIdempotent: true, - metrics: { - intentsPersistedCount: 0, - specLinksCreated: 0, - specTicketsLinked: 0, - }, - } as any; + if (existing.length > 0) { + const found = existing[0] as any; + if (found?.metadata?.processingFailed === true) { + console.log( + `⚠️ Found previous failed processing for this docHash (presigned); reprocessing.` + ); + } else { + console.log( + `🟡 Duplicate presigned upload detected (docHash match). Reusing ExternalSource ${found.id}` + ); + return { + ...found, + analysisId: undefined, + specId: undefined, + docHash, + wasIdempotent: true, + metrics: { + intentsPersistedCount: 0, + specLinksCreated: 0, + specTicketsLinked: 0, + }, + } as any; + } } }