Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions apps/api/src/modules/conversation/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,32 @@ builder.mutationField('deleteConversation', (t) =>
id: t.arg.string({ required: true }),
},
resolve: async (_parent, args, ctx) => {
// 1. Find all external sources for this conversation
const sources =
await ctx.services.externalSourceService.findByConversationId(args.id);

// 2. Delete vectors for each source from Pinecone
for (const source of sources) {
try {
await ctx.vectorService.deleteVectors(
source.id,
args.id, // conversationId is the namespace for conversation-linked sources
args.id
);
} catch (error) {
console.error(
`Failed to delete vectors for source ${source.id}:`,
error
);
// Continue with other sources even if one fails
}
}

// 3. Delete all external sources for the conversation
// This will cascade delete their chunks
await ctx.services.externalSourceService.deleteByConversationId(args.id);

// 4. Delete the conversation itself
await ctx.services.conversationService.delete(args.id);
return true;
},
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/modules/externalSource/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
import './type';
import './query';
import './mutation';
149 changes: 149 additions & 0 deletions apps/api/src/modules/externalSource/mutation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import { builder } from 'config/builder';

// ---- Delete external source
/**
 * Mutation `deleteExternalSource`: removes a single external source.
 *
 * Order of operations:
 *   1. Load the source via `externalSourceService.findById`; fail if missing.
 *   2. Delete its vectors through `ctx.vectorService.deleteVectors`.
 *   3. Delete the source row and its chunks via `deleteWithChunks`.
 *
 * Returns `true` on success. Every failure (including "not found") is logged
 * and rethrown as `Error("Failed to delete external source: <reason>")`.
 */
builder.mutationField('deleteExternalSource', (t) =>
  t.field({
    type: 'Boolean',
    nullable: false,
    args: {
      id: t.arg.string({ required: true }),
    },
    resolve: async (_parent, args, ctx) => {
      try {
        // The source is needed up front: its conversationId decides the vector namespace.
        const source = await ctx.services.externalSourceService.findById(
          args.id
        );
        if (!source) {
          throw new Error(`External source ${args.id} not found`);
        }

        // TODO: Add ownership verification once user auth is implemented
        // if (source.conversation?.userId !== ctx.user.id) {
        //   throw new Error('Not authorized to delete this source');
        // }

        // Vectors are removed before the database rows, so a failure here
        // aborts the mutation with the database untouched.
        // Namespace: the conversationId when the source is linked to a
        // conversation, otherwise the source's own id (standalone source).
        const vectorNamespace = source.conversationId || args.id;
        await ctx.vectorService.deleteVectors(
          args.id,
          vectorNamespace,
          source.conversationId ?? undefined
        );

        // Remove the source together with its chunks from the database.
        await ctx.services.externalSourceService.deleteWithChunks(args.id);

        // TODO: Implement R2 file cleanup - currently leaves orphaned files
        // Options:
        // 1. Delete synchronously here: ctx.r2.deleteFile(metadata?.fileKey)
        // 2. Create an outbox event for async cleanup
        // 3. Implement a scheduled cleanup job to remove orphaned R2 files
        // Tracked in: [add tracking issue URL when created]

        return true;
      } catch (error) {
        // Log the raw error for operators, then surface a uniform message to the client.
        console.error(`Failed to delete external source ${args.id}:`, error);
        throw new Error(
          `Failed to delete external source: ${error instanceof Error ? error.message : 'Unknown error'}`
        );
      }
    },
  })
);

// ---- Re-ingest external source
/**
 * Mutation `reIngestSource`: queues an existing external source for another
 * run of the processing pipeline.
 *
 * Flow:
 *   1. Load the source and require a `fileKey` inside its metadata.
 *   2. Delete the source's vectors via `ctx.vectorService.deleteVectors`.
 *   3. Inside one Prisma transaction: delete the source's chunks, rewrite its
 *      metadata as pending, and insert a `SourceReIngestRequested` outbox row.
 *
 * Resolves to the updated source (with `conversation` included). Any failure
 * is logged and rethrown as `Error("Failed to re-ingest source: <reason>")`.
 */
builder.mutationField('reIngestSource', (t) =>
  t.prismaField({
    type: 'ExternalSource',
    nullable: false,
    args: {
      id: t.arg.string({ required: true }),
    },
    resolve: async (_query, _parent, args, ctx) => {
      try {
        // Existence check; the record also carries the file info used below.
        const existing = await ctx.services.externalSourceService.findById(
          args.id
        );
        if (!existing) {
          throw new Error(`External source ${args.id} not found`);
        }

        // The stored file key lives in the free-form metadata blob.
        const storedMetadata = existing.metadata as Record<
          string,
          unknown
        > | null;
        const fileKey = storedMetadata?.fileKey as string | undefined;
        if (!fileKey) {
          throw new Error(
            'Cannot re-ingest: source does not have a stored file'
          );
        }

        // The vector store is an external service and cannot join the DB transaction,
        // so it is cleared first: a failure here throws before any DB write happens.
        // If this succeeds and the transaction below fails, the vectors are already
        // gone while the DB chunks remain; retrying the mutation repeats both steps.
        // Namespace: conversationId for linked sources, the source id for standalone ones.
        const namespace = existing.conversationId || args.id;
        const linkedConversationId = existing.conversationId ?? undefined;
        await ctx.vectorService.deleteVectors(
          args.id,
          namespace,
          linkedConversationId
        );

        // Chunk removal, metadata reset and outbox insert commit or roll back together.
        // NOTE(review): `tx` is typed `any`; a concrete transaction-client type would
        // restore type checking here — confirm which client type ctx.prisma exposes.
        return await ctx.prisma.$transaction(async (tx: any) => {
          // Old chunks go away; processing re-creates them.
          await tx.chunk.deleteMany({
            where: { externalSourceId: args.id },
          });

          // Keep every existing metadata key, clear failure markers, flag as pending.
          const pendingMetadata = {
            ...storedMetadata,
            processingFailed: false,
            processingError: null,
            reIngestRequestedAt: new Date().toISOString(),
            status: 'pending',
          };
          const refreshed = await tx.externalSource.update({
            where: { id: args.id },
            data: { metadata: pendingMetadata },
            include: { conversation: true },
          });

          // Outbox row that asks for asynchronous re-processing.
          const outboxPayload = {
            externalSourceId: args.id,
            fileKey,
            conversationId: existing.conversationId,
          };
          await tx.outbox.create({
            data: {
              type: 'SourceReIngestRequested',
              payload: outboxPayload,
            },
          });

          return refreshed;
        });
      } catch (error) {
        console.error(`Failed to re-ingest external source ${args.id}:`, error);
        const reason = error instanceof Error ? error.message : 'Unknown error';
        throw new Error(`Failed to re-ingest source: ${reason}`);
      }
    },
  })
);
34 changes: 34 additions & 0 deletions apps/api/src/modules/externalSource/query.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { builder } from 'config/builder';
import { ExternalSourceType } from '@overchat/database/services';
Comment thread
carterax marked this conversation as resolved.

builder.queryField('externalSources', (t) => {
return t.prismaField({
Expand All @@ -14,3 +15,36 @@ builder.queryField('externalSources', (t) => {
},
});
});

/**
 * Query `externalSourcesAll`: lists external sources with offset pagination
 * and an optional type filter.
 *
 * Args: `limit` (default 50), `offset` (default 0), `type` (optional).
 * Delegates to `externalSourceService.findAll` with `includeChunks: true`.
 */
builder.queryField('externalSourcesAll', (t) =>
  t.prismaField({
    type: ['ExternalSource'],
    nullable: false,
    args: {
      limit: t.arg.int({ required: false, defaultValue: 50 }),
      offset: t.arg.int({ required: false, defaultValue: 0 }),
      type: t.arg({ type: 'ExternalSourceType', required: false }),
    },
    // NOTE(review): the first resolver argument (the Pothos-generated Prisma query)
    // is not forwarded to the service — confirm this is intentional.
    resolve: async (_query, _root, args, ctx, _info) => {
      const { limit, offset, type: sourceType } = args;
      // The fallbacks repeat the schema defaults for callers that pass an explicit null.
      const pageSize = limit ?? 50;
      const skip = offset ?? 0;
      return ctx.services.externalSourceService.findAll({
        limit: pageSize,
        offset: skip,
        type: sourceType as ExternalSourceType | undefined,
        includeChunks: true,
      });
    },
  })
);

/**
 * Query `externalSource`: fetches one external source by id.
 *
 * The field is nullable; the result of
 * `externalSourceService.findByIdWithChunks` is returned as-is.
 */
builder.queryField('externalSource', (t) =>
  t.prismaField({
    type: 'ExternalSource',
    nullable: true,
    args: {
      id: t.arg.string({ required: true }),
    },
    // NOTE(review): the Pothos-generated Prisma query (first argument) is not
    // forwarded to the service — confirm this is intentional.
    resolve: async (_query, _root, { id }, ctx, _info) =>
      ctx.services.externalSourceService.findByIdWithChunks(id),
  })
);
2 changes: 1 addition & 1 deletion apps/api/src/modules/externalSource/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { builder } from 'config/builder';
builder.prismaObject('ExternalSource', {
fields: (t) => ({
id: t.exposeID('id'),
conversationId: t.exposeString('conversationId'),
conversationId: t.exposeString('conversationId', { nullable: true }),
type: t.exposeString('type'),
vectorId: t.exposeString('vectorId', { nullable: true }),
sourceUrl: t.exposeString('sourceUrl', { nullable: true }),
Expand Down
31 changes: 10 additions & 21 deletions apps/api/src/plugins/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,7 @@ export const r2Plugin = fp(
return { success: false, error: 'No file provided' };
}

const conversationId = data.fields?.conversationId?.value;

if (!conversationId) {
console.log('❌ conversationId validation failed');
reply.code(400);
return { success: false, error: 'conversationId is required' };
}
const conversationId = data.fields?.conversationId?.value || undefined;

// Convert stream to buffer
const buffer = await data.toBuffer();
Expand Down Expand Up @@ -160,16 +154,17 @@ export const r2Plugin = fp(
try {
const { conversationId, filename, contentType } = request.body;

if (!conversationId || !filename || !contentType) {
if (!filename || !contentType) {
reply.code(400);
return {
success: false,
error: 'conversationId, filename, and contentType are required',
error: 'filename and contentType are required',
};
}

// Generate R2 key
const key = r2Service.generateKey(conversationId, filename);
// Generate R2 key - use 'standalone' folder for sources without conversation
const folder = conversationId || 'standalone';
const key = r2Service.generateKey(folder, filename);

// Get presigned URL
const presignedData = await r2Service.getPresignedUploadUrl(
Expand Down Expand Up @@ -221,12 +216,11 @@ export const r2Plugin = fp(
lowConfidenceThreshold,
} = request.body;

if (!key || !conversationId || !filename || !contentType) {
if (!key || !filename || !contentType) {
reply.code(400);
return {
success: false,
error:
'key, conversationId, filename, and contentType are required',
error: 'key, filename, and contentType are required',
};
}

Expand Down Expand Up @@ -287,13 +281,8 @@ export const r2Plugin = fp(

// Extract form fields from the first file (they should be the same for all files)
const firstFile = files[0];
const conversationId = firstFile.fields?.conversationId?.value;
console.log('conversationId', conversationId);

if (!conversationId) {
reply.code(400);
return { success: false, error: 'conversationId is required' };
}
const conversationId =
firstFile.fields?.conversationId?.value || undefined;

// Process all files
const analyze = asBool(firstFile.fields?.analyze?.value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Migration: make "ExternalSource"."conversationId" optional, add lookup indexes,
-- and re-create the foreign key to "Conversation" with ON DELETE SET NULL.

-- DropForeignKey
-- The existing constraint is removed here and re-added at the end of this file
-- with the new referential actions.
ALTER TABLE "ExternalSource" DROP CONSTRAINT "ExternalSource_conversationId_fkey";

-- AlterTable
-- Allow NULL in "conversationId" (a source no longer has to reference a conversation).
ALTER TABLE "ExternalSource" ALTER COLUMN "conversationId" DROP NOT NULL;

-- CreateIndex
-- Index on "conversationId".
CREATE INDEX "ExternalSource_conversationId_idx" ON "ExternalSource"("conversationId");

-- CreateIndex
-- Index on "type".
CREATE INDEX "ExternalSource_type_idx" ON "ExternalSource"("type");

-- CreateIndex
-- Index on "createdAt".
CREATE INDEX "ExternalSource_createdAt_idx" ON "ExternalSource"("createdAt");

-- AddForeignKey
-- Re-create the constraint: deleting a Conversation sets "conversationId" to NULL
-- on its sources (the rows are kept); updates to "Conversation"."id" cascade.
ALTER TABLE "ExternalSource" ADD CONSTRAINT "ExternalSource_conversationId_fkey" FOREIGN KEY ("conversationId") REFERENCES "Conversation"("id") ON DELETE SET NULL ON UPDATE CASCADE;
Comment thread
carterax marked this conversation as resolved.
10 changes: 7 additions & 3 deletions packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ enum ExternalSourceType {
}

model ExternalSource {
id String @id @default(cuid())
conversationId String
id String @id @default(cuid())
conversationId String?
type ExternalSourceType
conversation Conversation @relation(fields: [conversationId], references: [id])
conversation Conversation? @relation(fields: [conversationId], references: [id])

vectorId String?
/// @zod.string.url({ message: "Must be a valid URL" })
Expand All @@ -134,6 +134,10 @@ model ExternalSource {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
chunks Chunk[]

@@index([conversationId])
@@index([type])
@@index([createdAt])
}

model Chunk {
Expand Down
Loading
Loading