diff --git a/CHANGELOG.md b/CHANGELOG.md index 79798ed5..b47f36e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to - 🐛(fix) add prevent_url_hallucination instruction to ConversationAgent - ✨(projects) handle project files for RAG search - ✨(banner) configurable banner with level, title, content and start/end +- ✨(back) de-index collections after inactivity ### Changed diff --git a/Makefile b/Makefile index d090cf2d..eef2dfc8 100644 --- a/Makefile +++ b/Makefile @@ -240,7 +240,11 @@ back-i18n-generate: ## create the .pot files used for i18n shell: ## connect to database shell @$(MANAGE) shell #_plus -.PHONY: dbshell +.PHONY: shell + +deindex_inactive_collections: ## run the deindex_inactive_collections management command + @$(MANAGE) deindex_inactive_collections +.PHONY: deindex_inactive_collections # -- Database diff --git a/docs/attachments.md b/docs/attachments.md index 1dd15eb0..4de8a89e 100644 --- a/docs/attachments.md +++ b/docs/attachments.md @@ -20,6 +20,9 @@ Both share the same model, same storage, same RAG backend, and the same retrieva - [Project RAG collection](#project-rag-collection) - [Markdown companion attachment](#markdown-companion-attachment) - [Deletion lifecycle](#deletion-lifecycle) +- [RAG Collection Lifecycle](#rag-collection-lifecycle) + - [De-indexing inactive conversations](#de-indexing-inactive-conversations) + - [Transparent re-indexing on resume](#transparent-re-indexing-on-resume) - [Security & Validation](#security--validation) - [Malware Detection](#malware-detection) - [Document Processing for LLMs](#document-processing-for-llms) @@ -272,6 +275,76 @@ The trade-off accepted on every path: a transient backend hiccup may strand orph --- +## RAG Collection Lifecycle + +Every conversation that has indexed text attachments owns a RAG collection in the vector store, identified by `ChatConversation.collection_id`. Long-lived deployments accumulate many idle collections that consume storage and quota. This section describes the two-phase lifecycle: scheduled de-indexing of inactive conversations, and transparent re-indexing when a user resumes one. + +### De-indexing inactive conversations + +The `deindex_inactive_collections` management command identifies conversations that have been inactive for more than `RAG_COLLECTION_INACTIVITY_DAYS` days and removes their vector store collection. + + +**What "inactive" means**: `ChatConversation.updated_at < now() - RAG_COLLECTION_INACTIVITY_DAYS days`. Because `reindex_conversation` writes `update_fields=["collection_id", "updated_at"]` on success, a recent re-index resets the inactivity clock — a conversation is not de-indexed again immediately after it was just re-indexed. + +**Scheduling**: Run this as a periodic job. A Helm CronJob template is provided (`backend.deindexCronJob`) with `concurrencyPolicy: Forbid` to prevent overlapping runs. + +**What is NOT de-indexed**: Project collections are managed separately (their lifecycle is tied to project/attachment delete). Only conversation collections controlled by `ChatConversation.collection_id` are affected. + +### Transparent re-indexing on resume + +When a user sends a message to a conversation whose `index_state` is `DEINDEXED` or `ERROR` but which has `READY` text attachments, the backend automatically rebuilds the collection before running the agent. This is handled by `reindex_conversation` in `chat/clients/conversation_reindexer.py`. + +#### `reindex_conversation` — behaviour summary + +An async generator that brings a conversation's RAG collection up to date before the agent runs. It emits a `conversation_resume` +tool-call/result pair so the UI can show progress. + +**Claim (concurrency guard)** + +Before doing any work it atomically sets `index_state = INDEXING` on the row, but only if the conversation is in a claimable state: + +- `DEINDEXED` or `ERROR` → always claimable +- `INDEXING` with `updated_at` older than `REINDEX_CLAIM_TIMEOUT_SECONDS` → stale lock, also claimable + +If the row is not updated (another process holds a fresh claim), the generator returns immediately with **no events**. + +**Early exits (no events emitted)** + +| Condition | New state | +|-----------|-----------| +| No READY attachments | `UNINDEXED` | +| All text attachments are already indexed or in-context | `INDEXED` (if collection exists) / `UNINDEXED` | + +**Main path** + +1. **Collection**: reuses `conversation.collection_id` if set (so partial-failure retries add only the missing docs to the existing +collection). Creates a new collection otherwise; on creation failure → `ERROR`, error event, return. +2. **Per-attachment loop**: reads the file asynchronously (`asyncio.to_thread`), stores it in the document backend, marks `is_indexed = +True`. Individual failures are caught and collected; the loop always continues. +3. **Final state transition**: + - Zero failures → `index_state = INDEXED`, `collection_id` updated, `{state: "done"}` + - Partial failure → `index_state = ERROR`, `collection_id` updated, `{state: "partial", failed_documents: [...]}` + - Total failure → `index_state = ERROR`, `collection_id` **not** updated (collection is empty), `{state: "error"}` + +`ERROR` always triggers a retry on the next request, and because successful attachments have `is_indexed = True`, only the failed ones are + attempted again. + +**What gets re-indexed**: Only attachments that are both READY **and** not already inlined as `full-context` in the current LLM context window. Small documents that fit the inlining budget are already readable by the model directly from the system prompt — putting them in the vector store too would be redundant. Only `tool_call_only` attachments (too large to inline) are re-indexed. + +**Error states**: + +| `result.state` | Meaning | User-visible outcome | +|---|---|---| +| `"done"` | All attachments re-indexed | Silent — loader disappears, conversation continues | +| `"partial"` | Some attachments indexed, some failed | Error modal listing failed filenames — user can re-upload them | +| `"error"` | Collection creation failed **or** all attachments failed | Error modal — RAG tools unavailable for this turn | + +**Frontend**: While re-indexing is in progress, `ToolInvocationItem` renders a `ConversationResumeLoader` with a chat-bubble illustration and the copy "Picking up where you left off". Once the `ToolResultPart` arrives, the loader disappears. Errors surface via `setChatErrorModal`. + +**Binary attachments** (PDF, images): never re-indexed — `reindex_conversation` only processes `text/*` content types. PDFs are sent directly to the LLM as document URLs; images as presigned `ImageUrl` objects. Neither needs a vector store entry. + +--- + ## Security & Validation For now, the system is not intended to host user-uploaded files for public download. @@ -423,26 +496,26 @@ Notes: #### Inlining policy and FIFO eviction -The decision of which documents are inlined as `full-context` vs left as `tool_call_only` is made by `chat/document_context_builder.py:build_document_context_instruction` on each turn: +The decision of which documents are inlined as `full-context` vs left as `tool_call_only` is made by `chat/document_context_builder.py:build_documents_listing` on each turn (called via `_build_document_context_instruction` in `chat/clients/pydantic_ai.py`): 1. Compute the `document_budget` in tokens: ```text document_budget = max(int(model.max_token_context * DOCUMENT_CONTEXT_BUDGET_RATIO) - DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS, 0) ``` -2. Iterate documents oldest-first. For each document: +2. Load all text attachments from object storage **in parallel** (`asyncio.gather`). Attachments that fail to load are marked `tool_call_only` with their failure logged; other documents are not affected. +3. Iterate documents oldest-first (`order_by("created_at", "id")`). For each document: - If its token count exceeds the whole budget alone → keep `tool_call_only`. - Otherwise, while adding it would overflow the budget, **evict the oldest currently-inlined document** (FIFO): demote it to `tool_call_only`, free its tokens. - Once it fits, mark it `full-context` and inline its content. -3. Edge cases: +4. Edge cases: - If the model has no `max_token_context` configured → all documents stay `tool_call_only` (warning logged). - If `DOCUMENT_CONTEXT_BUDGET_RATIO` is `0` → all documents stay `tool_call_only`. - - If reading an attachment from object storage fails → that document stays `tool_call_only` and the failure is logged; other documents are not affected. Token estimation uses `tiktoken` with the `cl100k_base` encoding (GPT-4 tokenizer). For non-OpenAI models (Mistral, Llama, Anthropic) actual usage may run 5-15% higher; the security buffer absorbs that drift. -The assembled instruction is **cached** per turn keyed on: -`conversation_id`, `user_id`, `model_hrid`, `model.max_token_context`, `DOCUMENT_CONTEXT_BUDGET_RATIO`, `DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS`, and a fingerprint of `(attachment.id, attachment.updated_at)` for every text attachment - **conversation and project text attachments both contribute to the fingerprint**. Any attachment add / remove / edit (including project files), or any settings change, invalidates the cache. TTL is 30 minutes (`CACHE_TIMEOUT`). +The assembled listing is **cached** per turn (in `_build_documents_listing`, `pydantic_ai.py`) keyed on: +`conversation_id`, `user_id`, `model_hrid`, `model.max_token_context`, `DOCUMENT_CONTEXT_BUDGET_RATIO`, `DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS`, and a fingerprint of `(attachment.id, attachment.updated_at)` for every text attachment — **conversation and project text attachments both contribute to the fingerprint**. Any attachment add / remove / edit (including project files), or any settings change, invalidates the cache. TTL is 30 minutes (`CACHE_TIMEOUT`). #### Targeted document operations (`document_id`) @@ -524,6 +597,7 @@ A `READY` attachment whose `rag_document_id` is null (e.g. parse succeeded but t | `PROJECT_IMAGES_MAX_COUNT` | `3` | Max image attachments per project. Enforced at upload-time. Bounds per-turn vision token cost - every project image is pinned to every turn alongside conversation-message images, and provider request-level image caps (Anthropic ~20/request) clip the trailing entries first. | | `DOCUMENT_CONTEXT_BUDGET_RATIO` | `0.5` | Fraction of `model.max_token_context` reserved for inlined documents (0 disables full-context inlining; everything stays `tool_call_only`) | | `DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS` | `1000` | Tokens subtracted from the inlining budget to absorb tokenizer drift on non-OpenAI models | +| `RAG_COLLECTION_INACTIVITY_DAYS` | `30` | Conversations inactive for this many days have their RAG collection de-indexed by `deindex_inactive_collections`. Resets on re-index. | #### RAG_FILES_ACCEPTED_FORMATS diff --git a/src/backend/chat/clients/conversation_reindexer.py b/src/backend/chat/clients/conversation_reindexer.py new file mode 100644 index 00000000..ae08b250 --- /dev/null +++ b/src/backend/chat/clients/conversation_reindexer.py @@ -0,0 +1,175 @@ +"""Standalone async generator for re-indexing a conversation's RAG collection.""" + +import asyncio +import logging +import uuid +from datetime import timedelta +from typing import AsyncGenerator + +from django.conf import settings +from django.core.files.storage import default_storage +from django.db.models import Q +from django.utils import timezone +from django.utils.module_loading import import_string + +from core.file_upload.enums import AttachmentStatus + +from chat import models +from chat.enums import CollectionIndexState +from chat.vercel_ai_sdk.core import events_v4 + +logger = logging.getLogger(__name__) +document_store_backend = import_string(settings.RAG_DOCUMENT_SEARCH_BACKEND) + + +async def _read_attachment_bytes(key: str) -> bytes: + def _read(): + with default_storage.open(key, "rb") as f: + return f.read() + + return await asyncio.to_thread(_read) + + +async def reindex_conversation( + conversation: models.ChatConversation, + in_context_ids: set[str], +) -> AsyncGenerator[events_v4.Event, None]: + """ + Re-index READY attachments not already inlined in the context window. + + Only `tool_call_only` attachments (too large for context) need to be in the + vector store; `full-context` attachments are already readable by the model. + + Emits a ToolCallPart/ToolResultPart pair so the UI shows progress. + On collection creation failure: logs and returns without RAG (conversation continues). + On individual attachment failure: logs and continues with remaining attachments. + """ + timeout = timedelta(seconds=settings.REINDEX_CLAIM_TIMEOUT_SECONDS) + claimed = await models.ChatConversation.objects.filter( + Q( + pk=conversation.pk, + index_state__in=[ + CollectionIndexState.DEINDEXED, + CollectionIndexState.ERROR, + ], + ) + | Q( + pk=conversation.pk, + index_state=CollectionIndexState.INDEXING, + updated_at__lt=timezone.now() - timeout, + ) + ).aupdate(index_state=CollectionIndexState.INDEXING, updated_at=timezone.now()) + if not claimed: + return + + ready_attachments = [ + attachment + async for attachment in models.ChatConversationAttachment.objects.filter( + conversation=conversation, + upload_state=AttachmentStatus.READY, + ) + ] + + if not ready_attachments: + await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate( + index_state=CollectionIndexState.UNINDEXED, + updated_at=timezone.now(), + ) + return + + text_attachments_to_reindex = [ + a + for a in ready_attachments + if a.content_type.startswith("text/") + and str(a.id) not in in_context_ids + and not a.is_indexed + ] + + if not text_attachments_to_reindex: + new_state = ( + CollectionIndexState.INDEXED + if conversation.collection_id + else CollectionIndexState.UNINDEXED + ) + await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate( + index_state=new_state, + updated_at=timezone.now(), + ) + return + + _tool_call_id = str(uuid.uuid4()) + yield events_v4.ToolCallPart( + tool_call_id=_tool_call_id, + tool_name="conversation_resume", + args={}, + ) + + # Reuse existing collection if available so partial-failure retries add only + # the missing documents rather than rebuilding from scratch. + existing_collection_id = conversation.collection_id + document_store = document_store_backend(collection_id=existing_collection_id) + if not existing_collection_id: + try: + await document_store.acreate_collection( + name=f"conversation-{conversation.pk}", + ) + except Exception: # pylint: disable=broad-except + logger.exception("Failed to create collection for conversation %s", conversation.pk) + await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate( + index_state=CollectionIndexState.ERROR, + collection_id=None, + updated_at=timezone.now(), + ) + await models.ChatConversationAttachment.objects.filter( + conversation=conversation, + ).aupdate(is_indexed=False) + yield events_v4.ToolResultPart( + tool_call_id=_tool_call_id, + result={"state": "error", "error": "Documents could not be re-indexed."}, + ) + return + + failed_documents = [] + for attachment in text_attachments_to_reindex: + try: + content = await _read_attachment_bytes(attachment.key) + rag_document_id = await asyncio.to_thread( + document_store.store_document, + name=attachment.file_name.removesuffix(".md"), + content=content.decode("utf-8"), + ) + await models.ChatConversationAttachment.objects.filter(pk=attachment.pk).aupdate( + is_indexed=True, + rag_document_id=rag_document_id or None, + ) + except Exception: # pylint: disable=broad-except + failed_documents.append(attachment.file_name) + logger.exception( + "Failed to re-index attachment %s for conversation %s", + attachment.pk, + conversation.pk, + ) + + any_failed = bool(failed_documents) + all_failed = len(failed_documents) == len(text_attachments_to_reindex) + + update_fields = { + "index_state": CollectionIndexState.ERROR if any_failed else CollectionIndexState.INDEXED, + "updated_at": timezone.now(), + } + + update_fields["collection_id"] = str(document_store.collection_id) + if all_failed: + result = {"state": "error", "error": "Documents could not be re-indexed."} + else: + result = ( + {"state": "partial", "failed_documents": failed_documents} + if failed_documents + else {"state": "done"} + ) + + await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate(**update_fields) + yield events_v4.ToolResultPart( + tool_call_id=_tool_call_id, + result=result, + ) diff --git a/src/backend/chat/clients/pydantic_ai.py b/src/backend/chat/clients/pydantic_ai.py index 517e47b5..91e103fa 100644 --- a/src/backend/chat/clients/pydantic_ai.py +++ b/src/backend/chat/clients/pydantic_ai.py @@ -138,6 +138,7 @@ UIMessage, ) from chat.clients.async_to_sync import convert_async_generator_to_sync +from chat.clients.conversation_reindexer import reindex_conversation from chat.clients.exceptions import StreamCancelException from chat.clients.pydantic_ui_message_converter import ( model_message_to_ui_message, @@ -150,7 +151,12 @@ StreamingState, ) from chat.constants import TEXT_MIME_PREFIX -from chat.document_context_builder import build_document_context_instruction +from chat.document_context_builder import ( + DocumentsListing, + build_documents_listing, + render_listing, +) +from chat.enums import CollectionIndexState from chat.mcp_servers import get_mcp_servers from chat.tools.descriptions import ( DOCUMENT_SUMMARIZE_PROJECT_TOOL_DESCRIPTION, @@ -415,7 +421,8 @@ async def _prepare_agent_run( - image_key_mapping: Maps signed URLs back to storage keys (for saving) - usage: Token usage dict (initialized to zero) - history: Validated message history for the agent - - conversation_has_documents: Whether RAG should be enabled + - conversation_has_own_documents: Whether this + conversation has its own (non-project) text attachments """ history = ModelMessagesTypeAdapter.validate_python(self.conversation.pydantic_messages) history = update_history_local_urls( @@ -461,12 +468,13 @@ async def _prepare_agent_run( usage = {"promptTokens": 0, "completionTokens": 0, "co2_impact": 0} - conversation_has_documents = self._is_document_upload_enabled and ( + conversation_has_own_documents = self._is_document_upload_enabled and ( bool(self.conversation.collection_id) or bool( await models.ChatConversationAttachment.objects.filter( conversation=self.conversation, content_type__startswith=TEXT_MIME_PREFIX, + upload_state=models.AttachmentStatus.READY, ).aexists() ) ) @@ -477,7 +485,7 @@ async def _prepare_agent_run( image_actions, usage, history, - conversation_has_documents, + conversation_has_own_documents, ) def _setup_web_search(self, force_web_search: bool) -> bool: @@ -502,7 +510,7 @@ def force_web_search_prompt() -> str: return True - async def _check_should_enable_rag(self, conversation_has_documents: bool) -> bool: + async def _check_should_enable_rag(self, conversation_has_own_documents: bool) -> bool: """Check if RAG should be enabled based on actually-indexed documents. The tool is registered when any of the following holds: @@ -528,7 +536,10 @@ async def _check_should_enable_rag(self, conversation_has_documents: bool) -> bo if not self._is_document_upload_enabled: return False - if conversation_has_documents: + if ( + conversation_has_own_documents + and self.conversation.index_state == CollectionIndexState.INDEXED + ): return True indexed_qs = models.ChatConversationAttachment.objects.filter( @@ -680,7 +691,7 @@ async def _parse_input_documents(self, documents: List[BinaryContent | DocumentU if key and rag_document_id: await models.ChatConversationAttachment.objects.filter( conversation_id=self.conversation.pk, key=key - ).aupdate(rag_document_id=rag_document_id) + ).aupdate(rag_document_id=rag_document_id, is_indexed=True) if not document.media_type.startswith(TEXT_MIME_PREFIX): md_attachment = await models.ChatConversationAttachment.objects.acreate( @@ -697,6 +708,9 @@ async def _parse_input_documents(self, documents: List[BinaryContent | DocumentU md_attachment.upload_state = models.AttachmentStatus.READY await md_attachment.asave(update_fields=["upload_state", "updated_at"]) + self.conversation.index_state = CollectionIndexState.INDEXED + await self.conversation.asave(update_fields=["index_state", "updated_at"]) + def _prepare_prompt( # noqa: PLR0912 # pylint: disable=too-many-branches self, message: UIMessage ) -> Tuple[str, List[BinaryContent | ImageUrl], List[BinaryContent]]: @@ -748,7 +762,7 @@ def _prepare_prompt( # noqa: PLR0912 # pylint: disable=too-many-branches return user_prompt[0], attachment_images, attachment_documents - async def _build_document_context_instruction(self) -> str: + async def _build_documents_listing(self) -> DocumentsListing | None: budget_ratio = settings.DOCUMENT_CONTEXT_BUDGET_RATIO security_buffer_tokens = settings.DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS # Restrict to READY uploads on both arms: PENDING / ANALYZING / @@ -772,7 +786,7 @@ async def _build_document_context_instruction(self) -> str: ) max_token_context = self.conversation_agent.configuration.max_token_context - # Cache the assembled instruction: building it requires reading every text + # Cache the DocumentsListing: building it requires reading every text # attachment from object storage and tokenizing it. The fingerprint includes # each attachment's updated_at (conversation + project) so any edit # invalidates the cache. @@ -795,7 +809,7 @@ async def _build_document_context_instruction(self) -> str: if cached is not None: return cached - instruction = await build_document_context_instruction( + listing = await build_documents_listing( conversation_id=str(self.conversation.id), text_attachments=text_attachments, project_text_attachments=project_text_attachments, @@ -804,8 +818,12 @@ async def _build_document_context_instruction(self) -> str: budget_ratio=budget_ratio, security_buffer_tokens=security_buffer_tokens, ) - await sync_to_async(cache.set)(cache_key, instruction, CACHE_TIMEOUT) - return instruction + await sync_to_async(cache.set)(cache_key, listing, CACHE_TIMEOUT) + return listing + + async def _build_document_context_instruction(self) -> str: + listing = await self._build_documents_listing() + return render_listing(listing) if listing is not None else "" def _setup_rag_tools(self, document_context_instruction: str = "") -> None: """Register RAG-related tools and instructions on the conversation agent.""" @@ -914,7 +932,7 @@ async def self_documentation(_ctx: RunContext) -> ToolReturn: async def _handle_input_documents( self, input_documents: List[BinaryContent | DocumentUrl], - conversation_has_documents: bool, + conversation_has_own_documents: bool, usage: Dict[str, Union[int, float]], ) -> AsyncGenerator[events_v4.Event | DocumentParsingResult, None]: """ @@ -942,7 +960,7 @@ async def _handle_input_documents( input_documents = [] if not input_documents: - yield DocumentParsingResult(success=True, has_documents=conversation_has_documents) + yield DocumentParsingResult(success=True, has_documents=conversation_has_own_documents) return _tool_call_id = str(uuid.uuid4()) @@ -972,7 +990,7 @@ async def _handle_input_documents( ), ) ) - yield DocumentParsingResult(success=False, has_documents=conversation_has_documents) + yield DocumentParsingResult(success=False, has_documents=conversation_has_own_documents) return yield events_v4.ToolResultPart(tool_call_id=_tool_call_id, result={"state": "done"}) yield DocumentParsingResult(success=True, has_documents=True) @@ -1184,7 +1202,7 @@ async def _finalize_conversation( # pylint: disable=too-many-arguments,too-many ), ) - async def _run_agent( # pylint: disable=too-many-locals + async def _run_agent( # pylint: disable=too-many-locals,too-many-branches,too-many-statements # noqa: PLR0912 self, messages: List[UIMessage], force_web_search: bool = False, @@ -1213,12 +1231,33 @@ async def _run_agent( # pylint: disable=too-many-locals image_actions, usage, history, - conversation_has_documents, + conversation_has_own_documents, ) = await self._prepare_agent_run(messages) + # Re-index conversation if it was de-indexed and has READY attachments + is_conversation_deindexed = bool( + self._is_document_upload_enabled + and self.conversation.index_state + in ( + CollectionIndexState.DEINDEXED, + CollectionIndexState.ERROR, + ) + and conversation_has_own_documents + ) + if is_conversation_deindexed: + listing = await self._build_documents_listing() + in_context_ids = ( + {doc.document_id for doc in listing.documents if doc.access == "full-context"} + if listing + else set() + ) + async for event in reindex_conversation(self.conversation, in_context_ids): + yield event + await self.conversation.arefresh_from_db(fields=["collection_id", "index_state"]) + doc_result = None async for item in self._handle_input_documents( - input_documents, conversation_has_documents, usage + input_documents, conversation_has_own_documents, usage ): if isinstance(item, DocumentParsingResult): doc_result = item @@ -1228,14 +1267,14 @@ async def _run_agent( # pylint: disable=too-many-locals if doc_result is None or not doc_result.success: return - conversation_has_documents = doc_result.has_documents + conversation_has_own_documents = doc_result.has_documents await self._agent_stop_streaming(force_cache_check=True) self._setup_self_documentation_tool() self._setup_web_search_tool() self._setup_web_search(force_web_search) - if await self._check_should_enable_rag(conversation_has_documents): + if await self._check_should_enable_rag(conversation_has_own_documents): document_context_instruction = await self._build_document_context_instruction() self._setup_rag_tools(document_context_instruction=document_context_instruction) diff --git a/src/backend/chat/document_context_builder.py b/src/backend/chat/document_context_builder.py index 98dc6c47..5a97f0c5 100644 --- a/src/backend/chat/document_context_builder.py +++ b/src/backend/chat/document_context_builder.py @@ -155,7 +155,7 @@ def _build_documents_listing( ) -def _render_listing(listing: DocumentsListing) -> str: +def render_listing(listing: DocumentsListing) -> str: """Serialize the listing as the JSON-prefixed instruction snippet. `project_documents=None` is dropped so the LLM doesn't see an empty/null @@ -260,7 +260,7 @@ def _project_placeholder_docs( ] -async def build_document_context_instruction( # noqa: PLR0913 # pylint: disable=too-many-arguments,too-many-locals +async def build_documents_listing( # noqa: PLR0913 # pylint: disable=too-many-arguments,too-many-locals *, conversation_id: str, text_attachments: Sequence[models.ChatConversationAttachment], @@ -269,9 +269,12 @@ async def build_document_context_instruction( # noqa: PLR0913 # pylint: disable budget_ratio: float, security_buffer_tokens: int, project_text_attachments: Sequence[models.ChatConversationAttachment] = (), -) -> str: +) -> DocumentsListing | None: """ - Build document instructions with a rolling full-context FIFO window. + Build the documents listing with a rolling full-context FIFO window. + + Returns None when there are no attachments to list. Use render_listing() to + serialise the result into the instruction string for the model. Rules: - Reserve a ratio of max model context for full document inclusion. @@ -282,7 +285,7 @@ async def build_document_context_instruction( # noqa: PLR0913 # pylint: disable for the inlining budget and have their own per-array `info` ordering. """ if not text_attachments and not project_text_attachments: - return "" + return None project_docs = _project_placeholder_docs(project_text_attachments) @@ -305,12 +308,10 @@ async def build_document_context_instruction( # noqa: PLR0913 # pylint: disable conversation_id, model_hrid, ) - return _render_listing( - _build_documents_listing( - docs=placeholder_docs, - force_tool_call_only=True, - project_docs=project_docs, - ) + return _build_documents_listing( + docs=placeholder_docs, + force_tool_call_only=True, + project_docs=project_docs, ) if budget_ratio == 0: @@ -320,12 +321,10 @@ async def build_document_context_instruction( # noqa: PLR0913 # pylint: disable conversation_id, model_hrid, ) - return _render_listing( - _build_documents_listing( - docs=placeholder_docs, - force_tool_call_only=True, - project_docs=project_docs, - ) + return _build_documents_listing( + docs=placeholder_docs, + force_tool_call_only=True, + project_docs=project_docs, ) document_budget = max(int(max_token_context * budget_ratio) - security_buffer_tokens, 0) @@ -390,6 +389,6 @@ async def _load_document( inlined_count, ) - return _render_listing( - _build_documents_listing(docs=docs, force_tool_call_only=False, project_docs=project_docs) + return _build_documents_listing( + docs=docs, force_tool_call_only=False, project_docs=project_docs ) diff --git a/src/backend/chat/enums.py b/src/backend/chat/enums.py new file mode 100644 index 00000000..5f3b23e9 --- /dev/null +++ b/src/backend/chat/enums.py @@ -0,0 +1,18 @@ +"""Chat conversation collection index state enums declaration.""" + +from enum import StrEnum + + +class CollectionIndexState(StrEnum): + """Defines the possible index states for a conversation's collection.""" + + UNINDEXED = "unindexed" # No collection; default for all new conversations + DEINDEXED = "deindexed" # Was indexed, then de-indexed by the inactivity command + INDEXING = "indexing" # Claim held, reindex in progress + INDEXED = "indexed" # Collection exists; collection_id holds real backend ID + ERROR = "error" # Last attempt failed; will retry on next request + + @classmethod + def choices(cls): + """Return a list of tuples for each enum member.""" + return [(member.value, member.name) for member in cls] diff --git a/src/backend/chat/management/__init__.py b/src/backend/chat/management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/chat/management/commands/__init__.py b/src/backend/chat/management/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/chat/management/commands/deindex_inactive_collections.py b/src/backend/chat/management/commands/deindex_inactive_collections.py new file mode 100644 index 00000000..2ed1c9bf --- /dev/null +++ b/src/backend/chat/management/commands/deindex_inactive_collections.py @@ -0,0 +1,114 @@ +"""Management command to de-index RAG collections from inactive conversations.""" + +import asyncio +import logging +from datetime import timedelta + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.utils import timezone +from django.utils.module_loading import import_string + +from chat.enums import CollectionIndexState +from chat.models import ChatConversation, ChatConversationAttachment + +logger = logging.getLogger(__name__) + +_PARALLEL = 10 + + +async def _deindex_chunk(chunk, backend_cls, threshold): + """Fire HTTP deletes in parallel; each task owns its own DB clear + restore.""" + + async def _delete_one(conv): + def _claim(): + # Conditional claim: only clear if the row still has our collection_id + # and is still inactive, so we don't race with a concurrent re-index. + claimed = ChatConversation.objects.filter( + pk=conv.pk, + collection_id=conv.collection_id, + updated_at__lt=threshold, + ).update(collection_id=None, index_state=CollectionIndexState.DEINDEXED) + if claimed: + ChatConversationAttachment.objects.filter(conversation_id=conv.pk).update( + is_indexed=False, rag_document_id=None + ) + return bool(claimed) + + if not await asyncio.to_thread(_claim): + return False # already claimed or re-indexed by another process + + backend = backend_cls(collection_id=conv.collection_id) + await backend.adelete_collection() + return True + + results = await asyncio.gather( + *[_delete_one(conv) for conv in chunk], + return_exceptions=True, + ) + + success = errors = 0 + for conv, result in zip(chunk, results, strict=True): + if isinstance(result, Exception): + # Conditional restore: only write back if the row is still NULL so we + # don't overwrite a collection_id set by a concurrent re-index. + await asyncio.to_thread( + ChatConversation.objects.filter(pk=conv.pk, collection_id__isnull=True).update, + collection_id=conv.collection_id, + index_state=CollectionIndexState.INDEXED, + ) + logger.error("Failed to de-index collection for conversation %s: %s", conv.pk, result) + errors += 1 + elif result: + logger.info( + "De-indexed collection %s for conversation %s", + conv.collection_id, + conv.pk, + ) + success += 1 + # result is False: another process already handled this row, skip + return success, errors + + +class Command(BaseCommand): + """De-index RAG collections from conversations inactive longer than the threshold.""" + + help = "De-index RAG collections from inactive conversations." + + def handle(self, *args, **options): + """Run de-indexing for inactive conversations up to DEINDEX_BATCH_SIZE.""" + threshold = timezone.now() - timedelta(days=settings.RAG_COLLECTION_INACTIVITY_DAYS) + queryset = ( + ChatConversation.objects.filter( + collection_id__isnull=False, + updated_at__lt=threshold, + ) + .only("id", "collection_id") + .iterator(chunk_size=100) + ) + backend_cls = import_string(settings.RAG_DOCUMENT_SEARCH_BACKEND) + batch_limit = settings.DEINDEX_MAX_PER_RUN + count_success = count_error = 0 + chunk = [] + + def flush(): + nonlocal count_success, count_error, chunk + s, e = asyncio.run(_deindex_chunk(chunk, backend_cls, threshold)) + count_success += s + count_error += e + chunk = [] + + for conversation in queryset: + if count_success + count_error + len(chunk) >= batch_limit: + self.stdout.write("Batch limit reached, stopping.") + break + chunk.append(conversation) + if len(chunk) == _PARALLEL: + flush() + + if chunk: + flush() + + self.stdout.write( + self.style.SUCCESS(f"De-indexed {count_success} collection(s). {count_error} error(s).") + ) diff --git a/src/backend/chat/migrations/0008_chatconversation_index_state_and_more.py b/src/backend/chat/migrations/0008_chatconversation_index_state_and_more.py new file mode 100644 index 00000000..69ae643c --- /dev/null +++ b/src/backend/chat/migrations/0008_chatconversation_index_state_and_more.py @@ -0,0 +1,38 @@ +# Generated by Django 5.2.13 on 2026-05-27 14:42 + +from django.db import migrations, models + +import chat.enums + + +class Migration(migrations.Migration): + dependencies = [ + ("chat", "0007_chatconversationattachment_project_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="chatconversation", + name="index_state", + field=models.CharField( + choices=[ + ("unindexed", "UNINDEXED"), + ("deindexed", "DEINDEXED"), + ("indexing", "INDEXING"), + ("indexed", "INDEXED"), + ("error", "ERROR"), + ], + default=chat.enums.CollectionIndexState["UNINDEXED"], + help_text="Current indexing state of this conversation's RAG collection", + max_length=20, + ), + ), + migrations.AddField( + model_name="chatconversationattachment", + name="is_indexed", + field=models.BooleanField( + default=False, + help_text="Whether this attachment has been indexed in the RAG backend", + ), + ), + ] diff --git a/src/backend/chat/models.py b/src/backend/chat/models.py index 8a40e5c2..bf271bda 100644 --- a/src/backend/chat/models.py +++ b/src/backend/chat/models.py @@ -11,6 +11,7 @@ from core.models import BaseModel from chat.ai_sdk_types import UIMessage +from chat.enums import CollectionIndexState User = get_user_model() @@ -155,6 +156,13 @@ class ChatConversation(BaseModel): help_text="Collection ID for the conversation, used for RAG document search", ) + index_state = models.CharField( + max_length=20, + choices=CollectionIndexState.choices(), + default=CollectionIndexState.UNINDEXED, + help_text="Current indexing state of this conversation's RAG collection", + ) + project = models.ForeignKey( ChatProject, related_name="conversations", @@ -241,6 +249,11 @@ class ChatConversationAttachment(BaseModel): ), ) + is_indexed = models.BooleanField( + default=False, + help_text="Whether this attachment has been indexed in the RAG backend", + ) + class Meta: # pylint: disable=missing-class-docstring constraints = [ models.CheckConstraint( diff --git a/src/backend/chat/tests/clients/pydantic_ai/test_check_should_enable_rag.py b/src/backend/chat/tests/clients/pydantic_ai/test_check_should_enable_rag.py index ade61f93..758a59a3 100644 --- a/src/backend/chat/tests/clients/pydantic_ai/test_check_should_enable_rag.py +++ b/src/backend/chat/tests/clients/pydantic_ai/test_check_should_enable_rag.py @@ -8,6 +8,7 @@ from core.file_upload.enums import AttachmentStatus from chat.clients.pydantic_ai import AIAgentService +from chat.enums import CollectionIndexState from chat.factories import ( ChatConversationAttachmentFactory, ChatConversationFactory, @@ -49,8 +50,15 @@ def test_returns_false_when_feature_disabled(feature_flags): def test_returns_true_when_conversation_has_documents_arg(): - """Caller-provided in-message-document signal is enough.""" - conversation = ChatConversationFactory() + """Caller-provided in-message-document signal enables RAG once documents are indexed. + + In practice _parse_input_documents always runs before this check and sets + index_state=INDEXED, so the conversation must reflect that post-parse state. + """ + conversation = ChatConversationFactory( + index_state=CollectionIndexState.INDEXED, + collection_id="col-abc", + ) assert _check(conversation, conversation_has_documents=True) is True diff --git a/src/backend/chat/tests/clients/pydantic_ai/test_reindex_conversation.py b/src/backend/chat/tests/clients/pydantic_ai/test_reindex_conversation.py new file mode 100644 index 00000000..8d2fded5 --- /dev/null +++ b/src/backend/chat/tests/clients/pydantic_ai/test_reindex_conversation.py @@ -0,0 +1,479 @@ +"""Tests for reindex_conversation.""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from core.file_upload.enums import AttachmentStatus + +from chat import models +from chat.clients.conversation_reindexer import reindex_conversation +from chat.enums import CollectionIndexState +from chat.factories import ChatConversationAttachmentFactory, ChatConversationFactory +from chat.vercel_ai_sdk.core import events_v4 + +# transaction=True is required so writes done via async threadpool connections +# (asave, aupdate) commit and are flushed via TRUNCATE between tests instead of +# leaking across them. +pytestmark = pytest.mark.django_db(transaction=True) + + +@pytest.mark.asyncio +async def test_reindex_emits_events_and_saves_collection_id(): + """Re-indexing emits ToolCallPart + ToolResultPart and saves collection_id.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + ) + + mock_store = MagicMock() + mock_store.collection_id = "new-456" + mock_store.acreate_collection = AsyncMock() + + mock_backend = MagicMock(return_value=mock_store) + + to_thread_calls = [] + + async def capture_to_thread(func, *args, **kwargs): + to_thread_calls.append(func) + + with ( + patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", side_effect=capture_to_thread), + ): + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + assert len(events) == 2 + assert isinstance(events[0], events_v4.ToolCallPart) + assert events[0].tool_name == "conversation_resume" + assert isinstance(events[1], events_v4.ToolResultPart) + assert events[1].result == {"state": "done"} + + # store_document (not parse_and_store_document) should be called for text/* attachments + assert len(to_thread_calls) == 2 + assert all(func == mock_store.store_document for func in to_thread_calls) + + await conversation.arefresh_from_db() + assert conversation.collection_id == "new-456" + + +@pytest.mark.asyncio +async def test_reindex_yields_nothing_when_no_ready_attachments(): + """No events yielded and collection_id stays None when no READY attachments.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.PENDING, + ) + + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + assert not events + assert conversation.collection_id is None + + +@pytest.mark.asyncio +async def test_reindex_yields_error_event_on_collection_creation_failure(): + """Error event is yielded when acreate_collection raises, collection_id stays None.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + ) + + mock_store = MagicMock() + mock_store.collection_id = None + mock_store.acreate_collection = AsyncMock(side_effect=RuntimeError("Albert down")) + mock_backend = MagicMock(return_value=mock_store) + + with patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend): + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + assert len(events) == 2 + assert isinstance(events[0], events_v4.ToolCallPart) + assert events[0].tool_name == "conversation_resume" + assert isinstance(events[1], events_v4.ToolResultPart) + assert events[1].result["state"] == "error" + + assert conversation.collection_id is None + + +@pytest.mark.asyncio +async def test_reindex_continues_on_individual_attachment_failure(): + """Failure on one attachment doesn't abort the loop; collection_id is saved.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="first.md", + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="second.md", + ) + + mock_store = MagicMock() + mock_store.collection_id = "col-789" + mock_store.acreate_collection = AsyncMock() + + mock_to_thread = AsyncMock(side_effect=[RuntimeError("store_document failed"), None]) + + mock_backend = MagicMock(return_value=mock_store) + + with ( + patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", side_effect=mock_to_thread), + ): + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + assert mock_to_thread.call_count == 2 # both attachments attempted + assert isinstance(events[-1], events_v4.ToolResultPart) + assert events[-1].result["state"] == "partial" + assert "first.md" in events[-1].result["failed_documents"] + + await conversation.arefresh_from_db() + assert conversation.collection_id == "col-789" + assert conversation.index_state == CollectionIndexState.ERROR + + +@pytest.mark.asyncio +async def test_reindex_skips_binary_attachments(): + """Binary attachments (PDF, etc.) are filtered out; only text/* attachments are stored.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="application/pdf", + file_name="doc.pdf", + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="doc.md", + ) + + mock_store = MagicMock() + mock_store.collection_id = "col-abc" + mock_store.acreate_collection = AsyncMock() + + mock_backend = MagicMock(return_value=mock_store) + + to_thread_calls = [] + + async def capture_to_thread(func, *args, **kwargs): + to_thread_calls.append(func) + + with ( + patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", side_effect=capture_to_thread), + ): + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + # Only the text/markdown attachment triggers store_document; PDF is skipped + assert len(to_thread_calls) == 1 + assert to_thread_calls[0] == mock_store.store_document + assert mock_store.parse_and_store_document.call_count == 0 + + assert isinstance(events[-1], events_v4.ToolResultPart) + assert events[-1].result == {"state": "done"} + + +@pytest.mark.asyncio +async def test_reindex_skips_in_context_attachments(): + """Attachments whose IDs are in in_context_ids (full-context) are not reindexed.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + att = await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + ) + + events = [] + async for event in reindex_conversation(conversation, in_context_ids={str(att.id)}): + events.append(event) + + assert not events + assert conversation.collection_id is None + + +@pytest.mark.asyncio +async def test_reindex_only_reindexes_out_of_context_attachments(): + """Only attachments absent from in_context_ids are stored.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + in_ctx_att = await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="small.md", + ) + _ = await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="large.md", + ) + + mock_store = MagicMock() + mock_store.collection_id = "col-xyz" + mock_store.acreate_collection = AsyncMock() + + mock_backend = MagicMock(return_value=mock_store) + + to_thread_calls = [] + + async def capture_to_thread(func, *args, **kwargs): + to_thread_calls.append(func) + + with ( + patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", side_effect=capture_to_thread), + ): + events = [] + async for event in reindex_conversation(conversation, in_context_ids={str(in_ctx_att.id)}): + events.append(event) + + # Only the out-of-context attachment is stored + assert len(to_thread_calls) == 1 + assert to_thread_calls[0] == mock_store.store_document + + assert isinstance(events[-1], events_v4.ToolResultPart) + assert events[-1].result == {"state": "done"} + + await conversation.arefresh_from_db() + assert conversation.collection_id == "col-xyz" + + +@pytest.mark.asyncio +async def test_concurrent_reindex_only_creates_one_collection(): + """Two concurrent resumes on the same de-indexed conversation create only one collection. + + asyncio.Barrier forces both coroutines to the claim step simultaneously so the + race window is deterministic. + """ + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + ) + + create_calls = 0 + + async def counting_create(*args, **kwargs): + nonlocal create_calls + create_calls += 1 + + mock_store = MagicMock() + mock_store.collection_id = "col-123" + mock_store.acreate_collection = counting_create + + barrier = asyncio.Barrier(2) + + async def attempt(): + await barrier.wait() # both tasks reach the claim step simultaneously + async for _ in reindex_conversation(conversation, set()): + pass + + with ( + patch( + "chat.clients.conversation_reindexer.document_store_backend", + MagicMock(return_value=mock_store), + ), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", new=AsyncMock()), + ): + await asyncio.gather(attempt(), attempt()) + + assert create_calls == 1 + await conversation.arefresh_from_db() + assert conversation.collection_id == "col-123" + + +@pytest.mark.asyncio +async def test_reindex_existing_and_new_attachment_creates_single_collection(): + """New file added on resume is indexed in the same collection as existing deindexed files.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + # Two existing attachments reset by the deindex command + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="existing1.md", + is_indexed=False, + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="existing2.md", + is_indexed=False, + ) + # New file added by the user when resuming (never indexed) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="new.md", + is_indexed=False, + ) + + mock_store = MagicMock() + mock_store.collection_id = "col-resume" + mock_store.acreate_collection = AsyncMock() + + mock_backend = MagicMock(return_value=mock_store) + + to_thread_calls = [] + + async def capture_to_thread(func, *args, **kwargs): + to_thread_calls.append(func) + + with ( + patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", side_effect=capture_to_thread), + ): + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + # Exactly one collection created for all three files + mock_store.acreate_collection.assert_called_once() + # All three attachments stored in that collection + assert len(to_thread_calls) == 3 + assert all(func == mock_store.store_document for func in to_thread_calls) + + assert isinstance(events[-1], events_v4.ToolResultPart) + assert events[-1].result == {"state": "done"} + + await conversation.arefresh_from_db() + assert conversation.collection_id == "col-resume" + assert conversation.index_state == CollectionIndexState.INDEXED + + +@pytest.mark.asyncio +async def test_reindex_yields_nothing_when_indexing_in_progress(): + """No events yielded when another process holds a fresh INDEXING claim.""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate( + index_state=CollectionIndexState.INDEXING, + ) + await conversation.arefresh_from_db() + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + ) + + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + assert not events + await conversation.arefresh_from_db() + assert conversation.index_state == CollectionIndexState.INDEXING + + +@pytest.mark.asyncio +async def test_reindex_all_failures_sets_error_and_saves_collection_id(): + """When all attachments fail, index_state=ERROR and collection_id is saved (for retry reuse).""" + conversation = await sync_to_async(ChatConversationFactory)( + collection_id=None, index_state=CollectionIndexState.DEINDEXED + ) + await sync_to_async(ChatConversationAttachmentFactory)( + conversation=conversation, + upload_state=AttachmentStatus.READY, + content_type="text/markdown", + file_name="fail.md", + ) + + mock_store = MagicMock() + mock_store.collection_id = "col-new" + mock_store.acreate_collection = AsyncMock() + mock_backend = MagicMock(return_value=mock_store) + + with ( + patch("chat.clients.conversation_reindexer.document_store_backend", mock_backend), + patch( + "chat.clients.conversation_reindexer._read_attachment_bytes", + new=AsyncMock(return_value=b"data"), + ), + patch("asyncio.to_thread", new=AsyncMock(side_effect=RuntimeError("backend down"))), + ): + events = [] + async for event in reindex_conversation(conversation, in_context_ids=set()): + events.append(event) + + assert len(events) == 2 + assert isinstance(events[-1], events_v4.ToolResultPart) + assert events[-1].result["state"] == "error" + + await conversation.arefresh_from_db() + assert conversation.index_state == CollectionIndexState.ERROR + assert conversation.collection_id == "col-new" diff --git a/src/backend/chat/tests/management/__init__.py b/src/backend/chat/tests/management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/chat/tests/management/commands/__init__.py b/src/backend/chat/tests/management/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/chat/tests/management/commands/test_deindex_inactive_collections.py b/src/backend/chat/tests/management/commands/test_deindex_inactive_collections.py new file mode 100644 index 00000000..f5db0362 --- /dev/null +++ b/src/backend/chat/tests/management/commands/test_deindex_inactive_collections.py @@ -0,0 +1,131 @@ +"""Tests for the deindex_inactive_collections management command.""" + +from datetime import timedelta +from unittest.mock import AsyncMock, MagicMock, patch + +from django.core.management import call_command +from django.utils import timezone + +import pytest + +from chat.enums import CollectionIndexState +from chat.factories import ChatConversationFactory +from chat.models import ChatConversation + + +def _make_backend_mock(**adelete_kwargs): + """Return a backend class mock whose instances have an awaitable adelete_collection.""" + mock_backend_cls = MagicMock() + mock_backend_cls.return_value.adelete_collection = AsyncMock(**adelete_kwargs) + return mock_backend_cls + + +@pytest.mark.django_db(transaction=True) +def test_deindexes_inactive_conversation(settings): + """Active collection on inactive conversation is deleted and collection_id nulled.""" + settings.RAG_COLLECTION_INACTIVITY_DAYS = 30 + conversation = ChatConversationFactory(collection_id="albert-123") + ChatConversation.objects.filter(pk=conversation.pk).update( + updated_at=timezone.now() - timedelta(days=31) + ) + + mock_backend_cls = _make_backend_mock() + with patch( + "chat.management.commands.deindex_inactive_collections.import_string", + return_value=mock_backend_cls, + ): + call_command("deindex_inactive_collections") + + mock_backend_cls.assert_called_once_with(collection_id="albert-123") + mock_backend_cls.return_value.adelete_collection.assert_called_once() + conversation.refresh_from_db() + assert conversation.collection_id is None + assert conversation.index_state == CollectionIndexState.DEINDEXED + + +@pytest.mark.django_db(transaction=True) +def test_skips_active_conversation(settings): + """Collection on a recently active conversation is not deleted.""" + settings.RAG_COLLECTION_INACTIVITY_DAYS = 30 + conversation = ChatConversationFactory(collection_id="albert-456") + # updated_at is auto_now so it is already recent — no override needed + + mock_backend_cls = _make_backend_mock() + with patch( + "chat.management.commands.deindex_inactive_collections.import_string", + return_value=mock_backend_cls, + ): + call_command("deindex_inactive_collections") + + mock_backend_cls.return_value.adelete_collection.assert_not_called() + conversation.refresh_from_db() + assert conversation.collection_id == "albert-456" + + +@pytest.mark.django_db(transaction=True) +def test_skips_conversation_without_collection(settings): + """Conversation without a collection_id is never processed.""" + settings.RAG_COLLECTION_INACTIVITY_DAYS = 30 + conversation = ChatConversationFactory(collection_id=None) + ChatConversation.objects.filter(pk=conversation.pk).update( + updated_at=timezone.now() - timedelta(days=31) + ) + + mock_backend_cls = _make_backend_mock() + with patch( + "chat.management.commands.deindex_inactive_collections.import_string", + return_value=mock_backend_cls, + ): + call_command("deindex_inactive_collections") + + mock_backend_cls.assert_not_called() + mock_backend_cls.return_value.adelete_collection.assert_not_called() + + +@pytest.mark.django_db(transaction=True) +def test_continues_on_error(settings): + """If the first conversation fails, the second one is still processed.""" + settings.RAG_COLLECTION_INACTIVITY_DAYS = 30 + + conv1 = ChatConversationFactory(collection_id="albert-fail") + conv2 = ChatConversationFactory(collection_id="albert-ok") + past = timezone.now() - timedelta(days=31) + ChatConversation.objects.filter(pk__in=[conv1.pk, conv2.pk]).update(updated_at=past) + + mock_backend_cls = _make_backend_mock(side_effect=[Exception("API error"), None]) + + with patch( + "chat.management.commands.deindex_inactive_collections.import_string", + return_value=mock_backend_cls, + ): + call_command("deindex_inactive_collections") + + assert mock_backend_cls.return_value.adelete_collection.call_count == 2 + + conv1.refresh_from_db() + conv2.refresh_from_db() + # conv1 failed, collection_id should remain set + assert conv1.collection_id == "albert-fail" + # conv2 succeeded, collection_id should be None + assert conv2.collection_id is None + + +@pytest.mark.django_db(transaction=True) +def test_does_not_update_updated_at(settings): + """Running the command must not change updated_at on the conversation.""" + settings.RAG_COLLECTION_INACTIVITY_DAYS = 30 + conversation = ChatConversationFactory(collection_id="albert-789") + past = timezone.now() - timedelta(days=31) + ChatConversation.objects.filter(pk=conversation.pk).update(updated_at=past) + conversation.refresh_from_db() + original_updated_at = conversation.updated_at + + mock_backend_cls = _make_backend_mock() + with patch( + "chat.management.commands.deindex_inactive_collections.import_string", + return_value=mock_backend_cls, + ): + call_command("deindex_inactive_collections") + + conversation.refresh_from_db() + assert conversation.updated_at == original_updated_at diff --git a/src/backend/chat/tests/test_document_context_builder.py b/src/backend/chat/tests/test_document_context_builder.py index 1ef2194c..8eb1f285 100644 --- a/src/backend/chat/tests/test_document_context_builder.py +++ b/src/backend/chat/tests/test_document_context_builder.py @@ -1,5 +1,5 @@ """ -Real-component tests for build_document_context_instruction. +Real-component tests for document_context_builder. These tests exercise the full builder logic with: - Real Django ORM (factory-created attachments) @@ -8,8 +8,8 @@ builder module so budget math is precise. """ +import dataclasses import datetime -import json from django.core.files.base import ContentFile from django.core.files.storage import default_storage @@ -18,7 +18,9 @@ from asgiref.sync import sync_to_async from chat.constants import ACCESS_FULL_CONTEXT, ACCESS_TOOL_CALL_ONLY -from chat.document_context_builder import build_document_context_instruction +from chat.document_context_builder import ( + build_documents_listing, +) from chat.factories import ChatConversationAttachmentFactory, ChatConversationFactory from chat.models import ChatConversationAttachment @@ -76,21 +78,19 @@ def _ts(offset_seconds: int): _amake_attachment = sync_to_async(_make_attachment) -def _parse_listing(instruction: str) -> dict: - """Extract and parse the JSON listing from the instruction string.""" - prefix = "List of documents attached to this conversation:\n" - assert prefix in instruction, f"missing prefix in: {instruction!r}" - return json.loads(instruction.split(prefix, 1)[1]) +def _to_dict(listing) -> dict: + """Convert a DocumentsListing to a plain dict for assertions.""" + return dataclasses.asdict(listing) async def _build(conversation, *, max_token_context=100, budget_ratio=0.5, security_buffer=0): - """Run build_document_context_instruction with real components.""" + """Run build_documents_listing with real components.""" text_attachments = await sync_to_async(list)( conversation.attachments.filter(content_type__startswith="text/").order_by( "created_at", "id" ) ) - return await build_document_context_instruction( + return await build_documents_listing( conversation_id=str(conversation.id), text_attachments=text_attachments, model_hrid="test-model", @@ -101,11 +101,11 @@ async def _build(conversation, *, max_token_context=100, budget_ratio=0.5, secur @pytest.mark.asyncio -async def test_empty_attachments_returns_empty_string(): - """Conversation with no text attachments returns an empty instruction.""" +async def test_no_attachments_returns_none(): + """Conversation with no text attachments returns None.""" conversation = await _acreate_conversation() - instruction = await _build(conversation) - assert instruction == "" + listing = await _build(conversation) + assert listing is None @pytest.mark.asyncio @@ -118,8 +118,7 @@ async def test_single_small_doc_is_inlined(): content="ten words " * 5, # 10 words ) - instruction = await _build(conversation) # budget = 50 tokens - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) # budget = 50 tokens assert listing["documents_order"] == "newest_to_oldest" assert len(listing["documents"]) == 1 @@ -142,8 +141,7 @@ async def test_oversized_doc_kept_tool_call_only(): content="word " * 60, # 60 > budget 50 ) - instruction = await _build(conversation) - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) doc = listing["documents"][0] assert doc["title"] == "big.md" @@ -165,8 +163,7 @@ async def test_multiple_small_docs_all_inlined(): conversation, file_name="doc-3.md", content="gamma " * 10, created_at=_ts(3) ) # newest - instruction = await _build(conversation) # budget 50, total 30 - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) # budget 50, total 30 docs = listing["documents"] # Newest first in the rendered listing. @@ -197,8 +194,7 @@ async def test_fifo_eviction_evicts_oldest_when_budget_overflows(): conversation, file_name="doc-3.md", content="gamma " * 25, created_at=_ts(3) ) # newest - instruction = await _build(conversation) # budget 50; 3 * 25 = 75 - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) # budget 50; 3 * 25 = 75 by_id = {d["document_id"]: d for d in listing["documents"]} # Oldest evicted, two newest inlined. @@ -222,8 +218,7 @@ async def test_oversized_and_small_docs_processed_independently(): ) a3 = await _amake_attachment(conversation, file_name="small-2.md", content="gamma " * 20) - instruction = await _build(conversation) # budget 50 - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) # budget 50 by_id = {d["document_id"]: d for d in listing["documents"]} # Smalls inline (20 + 20 = 40 <= 50). Huge stays tool_call_only and does not @@ -247,8 +242,7 @@ async def test_failed_storage_read_isolated_from_other_docs(): ) a3 = await _amake_attachment(conversation, file_name="ok-2.md", content="gamma " * 15) - instruction = await _build(conversation) # budget 50 - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) # budget 50 by_id = {d["document_id"]: d for d in listing["documents"]} # Failed read kept tool_call_only with sentinel content; never inlined. @@ -279,8 +273,7 @@ async def test_fifo_eviction_can_evict_multiple_oldest_for_one_new_doc(): conversation, file_name="doc-4.md", content="delta " * 40, created_at=_ts(4) ) # 40 tokens, newest - evicts a1, a2, a3 to fit (40 + 15 = 55 > 50) - instruction = await _build(conversation) # budget = 50 - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) # budget = 50 by_id = {d["document_id"]: d for d in listing["documents"]} # a1, a2, a3 all evicted to make room for a4. @@ -301,8 +294,7 @@ async def test_doc_exactly_filling_budget_is_inlined(): content="word " * 50, # exactly equal to budget=50 ) - instruction = await _build(conversation) - listing = _parse_listing(instruction) + listing = _to_dict(await _build(conversation)) doc = listing["documents"][0] assert doc["document_id"] == str(attachment.id) diff --git a/src/backend/chat/tests/views/chat/conversations/test_conversation_with_document_url.py b/src/backend/chat/tests/views/chat/conversations/test_conversation_with_document_url.py index 70be6a0b..7670d25b 100644 --- a/src/backend/chat/tests/views/chat/conversations/test_conversation_with_document_url.py +++ b/src/backend/chat/tests/views/chat/conversations/test_conversation_with_document_url.py @@ -884,7 +884,7 @@ def test_post_conversation_with_local_not_pdf_document_url( status=200, ) - chat_conversation = ChatConversationFactory(owner__language="en-us") + chat_conversation = ChatConversationFactory(owner__language="en-us", collection_id="123") api_client.force_authenticate(user=chat_conversation.owner) file_path = f"{chat_conversation.pk}/sample.pdf" diff --git a/src/backend/conversations/settings.py b/src/backend/conversations/settings.py index 7750df61..0c452d3c 100755 --- a/src/backend/conversations/settings.py +++ b/src/backend/conversations/settings.py @@ -776,6 +776,17 @@ class Base(BraveSettings, Configuration): environ_name="RAG_DOCUMENT_PARSER", environ_prefix=None, ) + RAG_COLLECTION_INACTIVITY_DAYS = values.PositiveIntegerValue( + default=30, + environ_name="RAG_COLLECTION_INACTIVITY_DAYS", + environ_prefix=None, + ) + REINDEX_CLAIM_TIMEOUT_SECONDS = values.PositiveIntegerValue(120, environ_prefix=None) + DEINDEX_MAX_PER_RUN = values.PositiveIntegerValue( + default=100, + environ_name="DEINDEX_MAX_PER_RUN", + environ_prefix=None, + ) SPECIFIC_RAG_DOCUMENT_SEARCH_TOOLS = values.DictValue( default={}, environ_name="SPECIFIC_RAG_DOCUMENT_SEARCH_TOOLS", diff --git a/src/frontend/apps/conversations/src/features/chat/assets/chat-bubbles-illustration.svg b/src/frontend/apps/conversations/src/features/chat/assets/chat-bubbles-illustration.svg new file mode 100644 index 00000000..d04fd39d --- /dev/null +++ b/src/frontend/apps/conversations/src/features/chat/assets/chat-bubbles-illustration.svg @@ -0,0 +1,112 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/frontend/apps/conversations/src/features/chat/components/Chat.tsx b/src/frontend/apps/conversations/src/features/chat/components/Chat.tsx index 0ea36506..06a22e51 100644 --- a/src/frontend/apps/conversations/src/features/chat/components/Chat.tsx +++ b/src/frontend/apps/conversations/src/features/chat/components/Chat.tsx @@ -120,7 +120,7 @@ export const Chat = ({ const chatContainerRef = useRef(null); const [chatErrorModal, setChatErrorModal] = useState<{ title: string; - message: string; + message: React.ReactNode; } | null>(null); const { setIsAtTop } = useScrollStore(); @@ -352,6 +352,50 @@ export const Chat = ({ } }, [messages]); + const lastResumeErrorMessageIdRef = useRef(null); + + // Show error modal when conversation resume fails to re-index some documents + useEffect(() => { + if (status === 'streaming' || status === 'submitted') return; + const lastAssistant = messages.filter((m) => m.role === 'assistant').pop(); + if (!lastAssistant?.parts) return; + if (lastAssistant.id === lastResumeErrorMessageIdRef.current) return; + const resumePart = lastAssistant.parts.find( + (p) => + p.type === 'tool-invocation' && + p.toolInvocation.toolName === 'conversation_resume' && + p.toolInvocation.state === 'result', + ); + if (!resumePart || resumePart.type !== 'tool-invocation') return; + if (resumePart.toolInvocation.state !== 'result') return; + const result = resumePart.toolInvocation.result as { + state: string; + failed_documents?: string[]; + error?: string; + }; + const isFullError = result?.state === 'error'; + const hasPartialFailures = Boolean(result?.failed_documents?.length); + if (!isFullError && !hasPartialFailures) return; + lastResumeErrorMessageIdRef.current = lastAssistant.id; + setChatErrorModal({ + title: t('Re-indexing Error'), + message: isFullError ? ( + + {result.error ?? t('Documents could not be re-indexed. Please try again.')} + + ) : ( + <> + {t('Some documents could not be re-indexed, please re-upload them:')} + + + ), + }); + }, [messages, status, t]); + // Clear "reading instructions" once streaming begins or on error, with minimum display time useEffect(() => { if (isReadingInstructions) { @@ -424,6 +468,27 @@ export const Chat = ({ // eslint-disable-next-line react-hooks/exhaustive-deps }, [status]); + // Scroll to bottom when conversation_resume tool appears so the illustration is fully visible + useEffect(() => { + const lastAssistant = messages.filter((m) => m.role === 'assistant').pop(); + const hasResumeTool = lastAssistant?.parts?.some( + (p) => + p.type === 'tool-invocation' && + p.toolInvocation.toolName === 'conversation_resume', + ); + if (hasResumeTool && chatContainerRef.current) { + requestAnimationFrame(() => { + if (chatContainerRef.current) { + chatContainerRef.current.scrollTo({ + top: chatContainerRef.current.scrollHeight, + behavior: 'smooth', + }); + } + }); + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [messages]); + // Synchronize conversationId state with prop when it changes (e.g., after navigation) useEffect(() => { setConversationId(initialConversationId); diff --git a/src/frontend/apps/conversations/src/features/chat/components/MessageItem.tsx b/src/frontend/apps/conversations/src/features/chat/components/MessageItem.tsx index 0bd1f5ad..d1722130 100644 --- a/src/frontend/apps/conversations/src/features/chat/components/MessageItem.tsx +++ b/src/frontend/apps/conversations/src/features/chat/components/MessageItem.tsx @@ -238,13 +238,17 @@ const MessageItemComponent: React.FC = ({ const hasNonDocumentParsingTool = React.useMemo(() => { return toolInvocationParts.some( - (part) => part.toolInvocation.toolName !== 'document_parsing', + (part) => + part.toolInvocation.toolName !== 'document_parsing' && + part.toolInvocation.toolName !== 'conversation_resume', ); }, [toolInvocationParts]); const activeToolInvocation = React.useMemo(() => { const tool = toolInvocationParts.find( - (part) => part.toolInvocation.toolName !== 'document_parsing', + (part) => + part.toolInvocation.toolName !== 'document_parsing' && + part.toolInvocation.toolName !== 'conversation_resume', ); return tool?.toolInvocation; }, [toolInvocationParts]); diff --git a/src/frontend/apps/conversations/src/features/chat/components/ToolInvocationItem.tsx b/src/frontend/apps/conversations/src/features/chat/components/ToolInvocationItem.tsx index 1f5a2319..34e63ec8 100644 --- a/src/frontend/apps/conversations/src/features/chat/components/ToolInvocationItem.tsx +++ b/src/frontend/apps/conversations/src/features/chat/components/ToolInvocationItem.tsx @@ -4,6 +4,43 @@ import { useTranslation } from 'react-i18next'; import { Box, Loader, Text } from '@/components'; +import ChatBubblesIllustration from '../assets/chat-bubbles-illustration.svg'; + +const ConversationResumeLoader = ({ t }: { t: (key: string) => string }) => { + return ( + + + + + {t('Picking up where you left off')} + + + {t( + 'Bringing this conversation and its documents back. This may take a moment longer than usual.', + )} + + + + ); +}; + interface ToolInvocationItemProps { toolInvocation: ToolInvocation; status?: string; @@ -17,6 +54,14 @@ export const ToolInvocationItem: React.FC = ({ }) => { const { t } = useTranslation(); + if (toolInvocation.toolName === 'conversation_resume') { + if (toolInvocation.state !== 'result') { + return ; + } + + return null; + } + if (toolInvocation.toolName === 'document_parsing') { if ( toolInvocation.state === 'partial-call' || diff --git a/src/frontend/apps/conversations/src/features/chat/components/__tests__/ToolInvocationItem.test.tsx b/src/frontend/apps/conversations/src/features/chat/components/__tests__/ToolInvocationItem.test.tsx new file mode 100644 index 00000000..17d48b59 --- /dev/null +++ b/src/frontend/apps/conversations/src/features/chat/components/__tests__/ToolInvocationItem.test.tsx @@ -0,0 +1,49 @@ +import { render, screen } from '@testing-library/react'; + +import { ToolInvocationItem } from '../ToolInvocationItem'; + +jest.mock('react-i18next', () => ({ + useTranslation: () => ({ t: (key: string) => key }), +})); + +describe('ToolInvocationItem', () => { + describe('conversation_resume', () => { + it('shows the resuming loader when state is call', () => { + render( + , + ); + + expect( + screen.getByText('Picking up where you left off'), + ).toBeInTheDocument(); + expect( + screen.getByText( + 'Bringing this conversation and its documents back. This may take a moment longer than usual.', + ), + ).toBeInTheDocument(); + }); + + it('renders nothing when state is result', () => { + const { container } = render( + , + ); + + expect(container).toBeEmptyDOMElement(); + }); + }); +}); diff --git a/src/helm/conversations/templates/backend_cronjob_deindex.yaml b/src/helm/conversations/templates/backend_cronjob_deindex.yaml new file mode 100644 index 00000000..9bb30cc4 --- /dev/null +++ b/src/helm/conversations/templates/backend_cronjob_deindex.yaml @@ -0,0 +1,128 @@ +{{- if .Values.backend.deindexCronJob.enabled }} +{{- $envVars := include "conversations.common.env" (list . .Values.backend) -}} +{{- $fullName := include "conversations.backend.fullname" . -}} +{{- $component := "backend" -}} +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ $fullName }}-deindex-collections + namespace: {{ .Release.Namespace | quote }} + labels: + {{- include "conversations.common.labels" (list . $component) | nindent 4 }} +spec: + schedule: {{ .Values.backend.deindexCronJob.schedule | quote }} + concurrencyPolicy: Forbid + jobTemplate: + spec: + ttlSecondsAfterFinished: {{ .Values.backend.jobs.ttlSecondsAfterFinished }} + backoffLimit: {{ .Values.backend.jobs.backoffLimit }} + template: + metadata: + annotations: + {{- with .Values.backend.podAnnotations }} + {{- toYaml . | nindent 12 }} + {{- end }} + labels: + {{- include "conversations.common.selectorLabels" (list . $component) | nindent 12 }} + spec: + {{- if $.Values.image.credentials }} + imagePullSecrets: + - name: {{ include "conversations.secret.dockerconfigjson.name" (dict "fullname" (include "conversations.fullname" .) "imageCredentials" $.Values.image.credentials) }} + {{- end }} + shareProcessNamespace: {{ .Values.backend.shareProcessNamespace }} + containers: + - name: {{ .Chart.Name }} + image: "{{ (.Values.backend.image | default dict).repository | default .Values.image.repository }}:{{ (.Values.backend.image | default dict).tag | default .Values.image.tag }}" + imagePullPolicy: {{ (.Values.backend.image | default dict).pullPolicy | default .Values.image.pullPolicy }} + command: + - python + - manage.py + - deindex_inactive_collections + env: + {{- if $envVars }} + {{- $envVars | indent 16 }} + {{- end }} + {{- with .Values.backend.securityContext }} + securityContext: + {{- toYaml . | nindent 16 }} + {{- end }} + {{- with .Values.backend.resources }} + resources: + {{- toYaml . | nindent 16 }} + {{- end }} + volumeMounts: + {{- range $index, $value := .Values.mountFiles }} + - name: "files-{{ $index }}" + mountPath: {{ $value.path }} + subPath: content + {{- end }} + {{- range $name, $volume := .Values.backend.persistence }} + - name: "{{ $name }}" + mountPath: "{{ $volume.mountPath }}" + {{- end }} + {{- range .Values.backend.extraVolumeMounts }} + - name: {{ .name }} + mountPath: {{ .mountPath }} + subPath: {{ .subPath | default "" }} + readOnly: {{ .readOnly }} + {{- end }} + {{- if .Values.backend.llmConfiguration.enabled }} + - name: llm-configuration + mountPath: {{ .Values.backend.llmConfiguration.mount_path }} + readOnly: true + {{- end }} + {{- with .Values.backend.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with .Values.backend.affinity }} + affinity: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with .Values.backend.tolerations }} + tolerations: + {{- toYaml . | nindent 12 }} + {{- end }} + restartPolicy: Never + volumes: + {{- range $index, $value := .Values.mountFiles }} + - name: "files-{{ $index }}" + configMap: + name: "{{ include "conversations.fullname" $ }}-files-{{ $index }}" + {{- end }} + {{- range $name, $volume := .Values.backend.persistence }} + - name: "{{ $name }}" + {{- if eq $volume.type "emptyDir" }} + emptyDir: {} + {{- else }} + persistentVolumeClaim: + claimName: "{{ $fullName }}-{{ $name }}" + {{- end }} + {{- end }} + {{- range .Values.backend.extraVolumes }} + - name: {{ .name }} + {{- if .existingClaim }} + persistentVolumeClaim: + claimName: {{ .existingClaim }} + {{- else if .hostPath }} + hostPath: + {{ toYaml .hostPath | nindent 16 }} + {{- else if .csi }} + csi: + {{- toYaml .csi | nindent 16 }} + {{- else if .configMap }} + configMap: + {{- toYaml .configMap | nindent 16 }} + {{- else if .emptyDir }} + emptyDir: + {{- toYaml .emptyDir | nindent 16 }} + {{- else }} + emptyDir: {} + {{- end }} + {{- end }} + {{- if .Values.backend.llmConfiguration.enabled }} + - name: llm-configuration + configMap: + name: conversations-llm-configuration + {{- end }} +{{- end }} diff --git a/src/helm/conversations/values.yaml b/src/helm/conversations/values.yaml index 1af35e1f..56bc8d71 100644 --- a/src/helm/conversations/values.yaml +++ b/src/helm/conversations/values.yaml @@ -207,6 +207,12 @@ backend: python manage.py createsuperuser --email $DJANGO_SUPERUSER_EMAIL --password $DJANGO_SUPERUSER_PASSWORD restartPolicy: Never + ## @param backend.deindexCronJob.enabled Whether to enable the de-index CronJob + ## @param backend.deindexCronJob.schedule Cron schedule for the de-index job + deindexCronJob: + enabled: false + schedule: "0 2 * * *" + ## @extra backend.job job dedicated to run a random management command, for example after a deployment ## @param backend.job.name The name to use to describe this job ## @param backend.job.command The management command to execute