1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ and this project adheres to
- 🐛(fix) add prevent_url_hallucination instruction to ConversationAgent
- ✨(projects) handle project files for RAG search
- ✨(banner) configurable banner with level, title, content and start/end
- ✨(back) de-index collections after inactivity

### Changed

6 changes: 5 additions & 1 deletion Makefile
@@ -240,7 +240,11 @@ back-i18n-generate: ## create the .pot files used for i18n

shell: ## connect to database shell
	@$(MANAGE) shell #_plus
-.PHONY: dbshell
+.PHONY: shell

deindex_inactive_collections: ## run the deindex_inactive_collections management command
	@$(MANAGE) deindex_inactive_collections
.PHONY: deindex_inactive_collections

# -- Database

86 changes: 80 additions & 6 deletions docs/attachments.md
@@ -20,6 +20,9 @@ Both share the same model, same storage, same RAG backend, and the same retrieva
- [Project RAG collection](#project-rag-collection)
- [Markdown companion attachment](#markdown-companion-attachment)
- [Deletion lifecycle](#deletion-lifecycle)
- [RAG Collection Lifecycle](#rag-collection-lifecycle)
- [De-indexing inactive conversations](#de-indexing-inactive-conversations)
- [Transparent re-indexing on resume](#transparent-re-indexing-on-resume)
- [Security & Validation](#security--validation)
- [Malware Detection](#malware-detection)
- [Document Processing for LLMs](#document-processing-for-llms)
@@ -272,6 +275,76 @@ The trade-off accepted on every path: a transient backend hiccup may strand orph

---

## RAG Collection Lifecycle

Every conversation that has indexed text attachments owns a RAG collection in the vector store, identified by `ChatConversation.collection_id`. Long-lived deployments accumulate many idle collections that consume storage and quota. This section describes the two-phase lifecycle: scheduled de-indexing of inactive conversations, and transparent re-indexing when a user resumes one.

### De-indexing inactive conversations

The `deindex_inactive_collections` management command identifies conversations that have been inactive for more than `RAG_COLLECTION_INACTIVITY_DAYS` days and removes their vector store collection.


**What "inactive" means**: `ChatConversation.updated_at < now() - RAG_COLLECTION_INACTIVITY_DAYS days`. Because `reindex_conversation` writes `update_fields=["collection_id", "updated_at"]` on success, a recent re-index resets the inactivity clock — a conversation is not de-indexed again immediately after it was just re-indexed.
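
The cutoff comparison can be illustrated with a minimal sketch in plain Python. It only mirrors the inequality stated above; the management command itself is not shown in this change, so the helper name `is_inactive` and its signature are hypothetical, and the real command presumably expresses the same test as an ORM filter.

```python
from datetime import datetime, timedelta, timezone


def is_inactive(updated_at: datetime, inactivity_days: int, now: datetime) -> bool:
    """Return True when the last update is strictly older than the cutoff.

    Hypothetical helper: mirrors `updated_at < now() - RAG_COLLECTION_INACTIVITY_DAYS days`.
    """
    return updated_at < now - timedelta(days=inactivity_days)


now = datetime(2025, 6, 30, tzinfo=timezone.utc)
stale = datetime(2025, 5, 1, tzinfo=timezone.utc)
recent = datetime(2025, 6, 15, tzinfo=timezone.utc)

print(is_inactive(stale, 30, now))   # older than the 30-day cutoff
print(is_inactive(recent, 30, now))  # still inside the window
```

Because the comparison is strict, a conversation updated exactly at the cutoff instant is still considered active in this sketch.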

**Scheduling**: Run this as a periodic job. A Helm CronJob template is provided (`backend.deindexCronJob`) with `concurrencyPolicy: Forbid` to prevent overlapping runs.

**What is NOT de-indexed**: Project collections are managed separately (their lifecycle is tied to project/attachment delete). Only conversation collections controlled by `ChatConversation.collection_id` are affected.

### Transparent re-indexing on resume

When a user sends a message to a conversation whose `index_state` is `DEINDEXED` or `ERROR` but which has `READY` text attachments, the backend automatically rebuilds the collection before running the agent. This is handled by `reindex_conversation` in `chat/clients/conversation_reindexer.py`.

#### `reindex_conversation` — behaviour summary

An async generator that brings a conversation's RAG collection up to date before the agent runs. It emits a `conversation_resume`
tool-call/result pair so the UI can show progress.

**Claim (concurrency guard)**

Before doing any work it atomically sets `index_state = INDEXING` on the row, but only if the conversation is in a claimable state:

- `DEINDEXED` or `ERROR` → always claimable
- `INDEXING` with `updated_at` older than `REINDEX_CLAIM_TIMEOUT_SECONDS` → stale lock, also claimable

If the row is not updated (another process holds a fresh claim), the generator returns immediately with **no events**.
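
The claimability rule can be sketched as a pure predicate. This is an illustration only: the state names are written as plain strings (the actual `CollectionIndexState` values are an assumption), `is_claimable` is a hypothetical helper, and the real implementation evaluates the condition inside a single filtered `UPDATE` so that the check and the write are atomic.

```python
from datetime import datetime, timedelta, timezone

# Assumed string stand-ins for the CollectionIndexState members.
ALWAYS_CLAIMABLE = {"DEINDEXED", "ERROR"}


def is_claimable(
    index_state: str,
    updated_at: datetime,
    now: datetime,
    claim_timeout_seconds: int,
) -> bool:
    """Return True if a re-index worker may take the claim on this row."""
    if index_state in ALWAYS_CLAIMABLE:
        return True
    if index_state == "INDEXING":
        # A claim older than the timeout is treated as a stale lock.
        return updated_at < now - timedelta(seconds=claim_timeout_seconds)
    return False


now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
fresh_claim = now - timedelta(seconds=10)
stale_claim = now - timedelta(seconds=900)

print(is_claimable("INDEXING", fresh_claim, now, 300))
print(is_claimable("INDEXING", stale_claim, now, 300))
```

A predicate like this is not safe on its own under concurrency; it only documents which rows the atomic update is allowed to match.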

**Early exits (no events emitted)**

| Condition | New state |
|-----------|-----------|
| No READY attachments | `UNINDEXED` |
| All text attachments are already indexed or in-context | `INDEXED` (if a collection exists), otherwise `UNINDEXED` |

**Main path**

1. **Collection**: reuses `conversation.collection_id` if set (so partial-failure retries add only the missing docs to the existing collection). Creates a new collection otherwise; on creation failure → `ERROR`, error event, return.
2. **Per-attachment loop**: reads the file asynchronously (`asyncio.to_thread`), stores it in the document backend, marks `is_indexed = True`. Individual failures are caught and collected; the loop always continues.
3. **Final state transition**:
   - Zero failures → `index_state = INDEXED`, `collection_id` updated, `{state: "done"}`
   - Partial failure → `index_state = ERROR`, `collection_id` updated, `{state: "partial", failed_documents: [...]}`
   - Total failure → `index_state = ERROR`, `collection_id` **not** updated (collection is empty), `{state: "error"}`

`ERROR` always triggers a retry on the next request, and because successful attachments have `is_indexed = True`, only the failed ones are
attempted again.
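
The final state transition above can be summarised as a small pure function. This is a sketch under assumptions: `summarize_outcome` is a hypothetical name, states are plain strings rather than the `CollectionIndexState` enum, and the `collection_id` handling is left out because it depends on the document backend.

```python
def summarize_outcome(attempted: list[str], failed: list[str]) -> tuple[str, dict]:
    """Map per-attachment results to (index_state, tool result payload).

    `attempted` is assumed non-empty: the early exits handle the empty case.
    """
    if not failed:
        return "INDEXED", {"state": "done"}
    if len(failed) == len(attempted):
        return "ERROR", {"state": "error"}
    return "ERROR", {"state": "partial", "failed_documents": list(failed)}


print(summarize_outcome(["a.md", "b.md"], []))
print(summarize_outcome(["a.md", "b.md"], ["b.md"]))
print(summarize_outcome(["a.md", "b.md"], ["a.md", "b.md"]))
```

Both failure outcomes map to `ERROR`, which is what makes the next request retry; the payload is what lets the UI distinguish a partial failure from a total one.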

**What gets re-indexed**: Only attachments that are both READY **and** not already inlined as `full-context` in the current LLM context window. Small documents that fit the inlining budget are already readable by the model directly from the system prompt — putting them in the vector store too would be redundant. Only `tool_call_only` attachments (too large to inline) are re-indexed.

**Result states**:

| `result.state` | Meaning | User-visible outcome |
|---|---|---|
| `"done"` | All attachments re-indexed | Silent — loader disappears, conversation continues |
| `"partial"` | Some attachments indexed, some failed | Error modal listing failed filenames — user can re-upload them |
| `"error"` | Collection creation failed **or** all attachments failed | Error modal — RAG tools unavailable for this turn |

**Frontend**: While re-indexing is in progress, `ToolInvocationItem` renders a `ConversationResumeLoader` with a chat-bubble illustration and the copy "Picking up where you left off". Once the `ToolResultPart` arrives, the loader disappears. Errors surface via `setChatErrorModal`.

**Binary attachments** (PDF, images): never re-indexed — `reindex_conversation` only processes `text/*` content types. PDFs are sent directly to the LLM as document URLs; images as presigned `ImageUrl` objects. Neither needs a vector store entry.
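
Taken together, the selection rules (READY, `text/*`, not in-context, not yet indexed) amount to a single filter. The sketch below uses a hypothetical `Attachment` dataclass as a stand-in for the Django model and plain strings for the upload state, so it shows the rule rather than the real query.

```python
from dataclasses import dataclass


@dataclass
class Attachment:
    """Hypothetical stand-in for the attachment model; only the fields the rule needs."""

    id: str
    content_type: str
    upload_state: str
    is_indexed: bool = False


def select_for_reindex(attachments: list[Attachment], in_context_ids: set[str]) -> list[Attachment]:
    """Keep READY text attachments that are neither inlined nor already indexed."""
    return [
        a
        for a in attachments
        if a.upload_state == "READY"
        and a.content_type.startswith("text/")
        and a.id not in in_context_ids
        and not a.is_indexed
    ]


attachments = [
    Attachment("1", "text/markdown", "READY"),          # large text, needs indexing
    Attachment("2", "text/markdown", "READY"),          # small text, inlined in context
    Attachment("3", "application/pdf", "READY"),        # binary, sent to the LLM directly
    Attachment("4", "text/plain", "READY", True),       # already indexed on a previous try
    Attachment("5", "text/plain", "PENDING"),           # not READY yet
]
selected = select_for_reindex(attachments, in_context_ids={"2"})
print([a.id for a in selected])
```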

---

## Security & Validation

For now, the system is not intended to host user-uploaded files for public download.
@@ -423,26 +496,26 @@ Notes:

#### Inlining policy and FIFO eviction

-The decision of which documents are inlined as `full-context` vs left as `tool_call_only` is made by `chat/document_context_builder.py:build_document_context_instruction` on each turn:
+The decision of which documents are inlined as `full-context` vs left as `tool_call_only` is made by `chat/document_context_builder.py:build_documents_listing` on each turn (called via `_build_document_context_instruction` in `chat/clients/pydantic_ai.py`):

1. Compute the `document_budget` in tokens:
   ```text
   document_budget = max(int(model.max_token_context * DOCUMENT_CONTEXT_BUDGET_RATIO)
                         - DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS, 0)
   ```
-2. Iterate documents oldest-first. For each document:
+2. Load all text attachments from object storage **in parallel** (`asyncio.gather`). Attachments that fail to load are marked `tool_call_only` with their failure logged; other documents are not affected.
+3. Iterate documents oldest-first (`order_by("created_at", "id")`). For each document:
   - If its token count exceeds the whole budget alone → keep `tool_call_only`.
   - Otherwise, while adding it would overflow the budget, **evict the oldest currently-inlined document** (FIFO): demote it to `tool_call_only`, free its tokens.
   - Once it fits, mark it `full-context` and inline its content.
-3. Edge cases:
+4. Edge cases:
   - If the model has no `max_token_context` configured → all documents stay `tool_call_only` (warning logged).
   - If `DOCUMENT_CONTEXT_BUDGET_RATIO` is `0` → all documents stay `tool_call_only`.
-   - If reading an attachment from object storage fails → that document stays `tool_call_only` and the failure is logged; other documents are not affected.

Token estimation uses `tiktoken` with the `cl100k_base` encoding (GPT-4 tokenizer). For non-OpenAI models (Mistral, Llama, Anthropic) actual usage may run 5-15% higher; the security buffer absorbs that drift.
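
The budget formula and the FIFO eviction rule described above can be sketched in a few lines. This is an illustrative model, not the project's `document_context_builder` code: `plan_inlining` is a hypothetical function, token counts are passed in directly instead of being computed with `tiktoken`, and documents are assumed to arrive oldest-first.

```python
from collections import deque


def document_budget(max_token_context: int, ratio: float, buffer_tokens: int) -> int:
    """Token budget for inlined documents, floored at zero."""
    return max(int(max_token_context * ratio) - buffer_tokens, 0)


def plan_inlining(docs: list[tuple[str, int]], budget: int) -> dict[str, str]:
    """Assign each (name, token_count) pair a mode, evicting oldest-first on overflow."""
    modes: dict[str, str] = {}
    inlined: deque[tuple[str, int]] = deque()  # oldest inlined document on the left
    used = 0
    for name, tokens in docs:
        if budget <= 0 or tokens > budget:
            # Too large to ever fit (or inlining disabled): never inlined.
            modes[name] = "tool_call_only"
            continue
        while used + tokens > budget and inlined:
            # FIFO eviction: demote the oldest inlined document and free its tokens.
            old_name, old_tokens = inlined.popleft()
            modes[old_name] = "tool_call_only"
            used -= old_tokens
        modes[name] = "full-context"
        inlined.append((name, tokens))
        used += tokens
    return modes


budget = document_budget(max_token_context=8000, ratio=0.5, buffer_tokens=1000)
plan = plan_inlining([("a", 1500), ("b", 1200), ("c", 5000), ("d", 1000)], budget)
print(budget, plan)
```

In this run the budget is 3000 tokens: `c` never fits, and adding `d` evicts `a` (the oldest inlined document), leaving `b` and `d` inlined.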

-The assembled instruction is **cached** per turn keyed on:
-`conversation_id`, `user_id`, `model_hrid`, `model.max_token_context`, `DOCUMENT_CONTEXT_BUDGET_RATIO`, `DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS`, and a fingerprint of `(attachment.id, attachment.updated_at)` for every text attachment - **conversation and project text attachments both contribute to the fingerprint**. Any attachment add / remove / edit (including project files), or any settings change, invalidates the cache. TTL is 30 minutes (`CACHE_TIMEOUT`).
+The assembled listing is **cached** per turn (in `_build_documents_listing`, `pydantic_ai.py`) keyed on:
+`conversation_id`, `user_id`, `model_hrid`, `model.max_token_context`, `DOCUMENT_CONTEXT_BUDGET_RATIO`, `DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS`, and a fingerprint of `(attachment.id, attachment.updated_at)` for every text attachment; **conversation and project text attachments both contribute to the fingerprint**. Any attachment add / remove / edit (including project files), or any settings change, invalidates the cache. TTL is 30 minutes (`CACHE_TIMEOUT`).
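
One way to build such a key is to hash the listed ingredients together with an order-independent fingerprint of the attachments. The sketch below is an assumption about shape only: the key prefix, the use of SHA-256, and the function names are hypothetical, and the text above specifies only which values contribute to the key.

```python
import hashlib


def attachments_fingerprint(attachments: list[tuple[str, str]]) -> str:
    """Hash (attachment id, updated_at ISO string) pairs, independent of input order."""
    digest = hashlib.sha256()
    for attachment_id, updated_at in sorted(attachments):
        digest.update(f"{attachment_id}:{updated_at};".encode("utf-8"))
    return digest.hexdigest()


def listing_cache_key(
    conversation_id: str,
    user_id: str,
    model_hrid: str,
    max_token_context: int,
    budget_ratio: float,
    buffer_tokens: int,
    attachments: list[tuple[str, str]],
) -> str:
    """Combine every ingredient so that changing any of them yields a new key."""
    parts = [
        conversation_id,
        user_id,
        model_hrid,
        max_token_context,
        budget_ratio,
        buffer_tokens,
        attachments_fingerprint(attachments),
    ]
    raw = "|".join(str(part) for part in parts)
    return "documents_listing:" + hashlib.sha256(raw.encode("utf-8")).hexdigest()


files = [("att-1", "2025-01-01T10:00:00"), ("att-2", "2025-01-02T09:30:00")]
key = listing_cache_key("conv-1", "user-1", "model-x", 8000, 0.5, 1000, files)
same = listing_cache_key("conv-1", "user-1", "model-x", 8000, 0.5, 1000, list(reversed(files)))
edited = listing_cache_key(
    "conv-1", "user-1", "model-x", 8000, 0.5, 1000,
    [("att-1", "2025-01-01T10:00:00"), ("att-2", "2025-01-03T08:00:00")],
)
print(key == same, key == edited)
```

Including `updated_at` in the fingerprint is what lets an edit to a project file invalidate the cached listing without any explicit cache deletion.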

#### Targeted document operations (`document_id`)

@@ -524,6 +597,7 @@ A `READY` attachment whose `rag_document_id` is null (e.g. parse succeeded but t
| `PROJECT_IMAGES_MAX_COUNT` | `3` | Max image attachments per project. Enforced at upload-time. Bounds per-turn vision token cost - every project image is pinned to every turn alongside conversation-message images, and provider request-level image caps (Anthropic ~20/request) clip the trailing entries first. |
| `DOCUMENT_CONTEXT_BUDGET_RATIO` | `0.5` | Fraction of `model.max_token_context` reserved for inlined documents (0 disables full-context inlining; everything stays `tool_call_only`) |
| `DOCUMENT_CONTEXT_SECURITY_BUFFER_TOKENS` | `1000` | Tokens subtracted from the inlining budget to absorb tokenizer drift on non-OpenAI models |
| `RAG_COLLECTION_INACTIVITY_DAYS` | `30` | Conversations inactive for this many days have their RAG collection de-indexed by `deindex_inactive_collections`. Resets on re-index. |

#### RAG_FILES_ACCEPTED_FORMATS

175 changes: 175 additions & 0 deletions src/backend/chat/clients/conversation_reindexer.py
@@ -0,0 +1,175 @@
"""Standalone async generator for re-indexing a conversation's RAG collection."""

import asyncio
import logging
import uuid
from datetime import timedelta
from typing import AsyncGenerator

from django.conf import settings
from django.core.files.storage import default_storage
from django.db.models import Q
from django.utils import timezone
from django.utils.module_loading import import_string

from core.file_upload.enums import AttachmentStatus

from chat import models
from chat.enums import CollectionIndexState
from chat.vercel_ai_sdk.core import events_v4

logger = logging.getLogger(__name__)
document_store_backend = import_string(settings.RAG_DOCUMENT_SEARCH_BACKEND)


async def _read_attachment_bytes(key: str) -> bytes:
    def _read():
        with default_storage.open(key, "rb") as f:
            return f.read()

    return await asyncio.to_thread(_read)


# SonarCloud check failure on line 33: Refactor this function to reduce its
# Cognitive Complexity from 18 to the 15 allowed.
# See more on https://sonarcloud.io/project/issues?id=suitenumerique_conversations&issues=AZ5o5fBhESpfEprJNW45&open=AZ5o5fBhESpfEprJNW45&pullRequest=441
async def reindex_conversation(
    conversation: models.ChatConversation,
    in_context_ids: set[str],
) -> AsyncGenerator[events_v4.Event, None]:
    """
    Re-index READY attachments not already inlined in the context window.

    Only `tool_call_only` attachments (too large for context) need to be in the
    vector store; `full-context` attachments are already readable by the model.

    Emits a ToolCallPart/ToolResultPart pair so the UI shows progress.
    On collection creation failure: logs and returns without RAG (conversation continues).
    On individual attachment failure: logs and continues with remaining attachments.
    """
    timeout = timedelta(seconds=settings.REINDEX_CLAIM_TIMEOUT_SECONDS)
    claimed = await models.ChatConversation.objects.filter(
        Q(
            pk=conversation.pk,
            index_state__in=[
                CollectionIndexState.DEINDEXED,
                CollectionIndexState.ERROR,
            ],
        )
        | Q(
            pk=conversation.pk,
            index_state=CollectionIndexState.INDEXING,
            updated_at__lt=timezone.now() - timeout,
        )
    ).aupdate(index_state=CollectionIndexState.INDEXING, updated_at=timezone.now())
    if not claimed:
        return

    # Review comment (Collaborator): rag_document_id should be updated (it has
    # been added on ChatConversationAttachment since this code was written).
    ready_attachments = [
        attachment
        async for attachment in models.ChatConversationAttachment.objects.filter(
            conversation=conversation,
            upload_state=AttachmentStatus.READY,
        )
    ]

    if not ready_attachments:
        await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate(
            index_state=CollectionIndexState.UNINDEXED,
            updated_at=timezone.now(),
        )
        return

    text_attachments_to_reindex = [
        a
        for a in ready_attachments
        if a.content_type.startswith("text/")
        and str(a.id) not in in_context_ids
        and not a.is_indexed
    ]

    if not text_attachments_to_reindex:
        new_state = (
            CollectionIndexState.INDEXED
            if conversation.collection_id
            else CollectionIndexState.UNINDEXED
        )
        await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate(
            index_state=new_state,
            updated_at=timezone.now(),
        )
        return

    _tool_call_id = str(uuid.uuid4())
    yield events_v4.ToolCallPart(
        tool_call_id=_tool_call_id,
        tool_name="conversation_resume",
        args={},
    )

    # Reuse existing collection if available so partial-failure retries add only
    # the missing documents rather than rebuilding from scratch.
    existing_collection_id = conversation.collection_id
    document_store = document_store_backend(collection_id=existing_collection_id)
    if not existing_collection_id:
        try:
            await document_store.acreate_collection(
                name=f"conversation-{conversation.pk}",
            )
        except Exception:  # pylint: disable=broad-except
            logger.exception("Failed to create collection for conversation %s", conversation.pk)
            await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate(
                index_state=CollectionIndexState.ERROR,
                collection_id=None,
                updated_at=timezone.now(),
            )
            await models.ChatConversationAttachment.objects.filter(
                conversation=conversation,
            ).aupdate(is_indexed=False)
            yield events_v4.ToolResultPart(
                tool_call_id=_tool_call_id,
                result={"state": "error", "error": "Documents could not be re-indexed."},
            )
            return

    failed_documents = []
    for attachment in text_attachments_to_reindex:
        try:
            content = await _read_attachment_bytes(attachment.key)
            rag_document_id = await asyncio.to_thread(
                document_store.store_document,
                name=attachment.file_name.removesuffix(".md"),
                content=content.decode("utf-8"),
            )
            await models.ChatConversationAttachment.objects.filter(pk=attachment.pk).aupdate(
                is_indexed=True,
                rag_document_id=rag_document_id or None,
            )
        except Exception:  # pylint: disable=broad-except
            failed_documents.append(attachment.file_name)
            logger.exception(
                "Failed to re-index attachment %s for conversation %s",
                attachment.pk,
                conversation.pk,
            )

    any_failed = bool(failed_documents)
    all_failed = len(failed_documents) == len(text_attachments_to_reindex)

    update_fields = {
        "index_state": CollectionIndexState.ERROR if any_failed else CollectionIndexState.INDEXED,
        "updated_at": timezone.now(),
    }

    update_fields["collection_id"] = str(document_store.collection_id)
    if all_failed:
        result = {"state": "error", "error": "Documents could not be re-indexed."}
    else:
        result = (
            {"state": "partial", "failed_documents": failed_documents}
            if failed_documents
            else {"state": "done"}
        )

    await models.ChatConversation.objects.filter(pk=conversation.pk).aupdate(**update_fields)
    yield events_v4.ToolResultPart(
        tool_call_id=_tool_call_id,
        result=result,
    )