diff --git a/filter/src/builtins/http/ai/store/mod.rs b/filter/src/builtins/http/ai/store/mod.rs index 8feeb0e3..d621de78 100644 --- a/filter/src/builtins/http/ai/store/mod.rs +++ b/filter/src/builtins/http/ai/store/mod.rs @@ -40,8 +40,8 @@ pub(crate) use schemas::{validate_identifier as validate_table_identifier, valid pub use self::{ postgres::{PostgresResponseStore, SslMode}, sqlite::SqliteResponseStore, - trait_def::ResponseStore, - types::{ConversationRecord, ResponseRecord, StoreError}, + trait_def::{ConversationItemStore, ResponseStore}, + types::{ConversationItemRecord, ConversationRecord, ResponseRecord, StoreError}, }; // ----------------------------------------------------------------------------- diff --git a/filter/src/builtins/http/ai/store/postgres.rs b/filter/src/builtins/http/ai/store/postgres.rs index 425e56fc..57031f0e 100644 --- a/filter/src/builtins/http/ai/store/postgres.rs +++ b/filter/src/builtins/http/ai/store/postgres.rs @@ -15,8 +15,8 @@ use tracing::info; use super::{ schemas::{TableNames, generate_ddl, validate_postgres_identifiers}, - trait_def::ResponseStore, - types::{ConversationRecord, ResponseRecord, StoreError}, + trait_def::{ConversationItemStore, ResponseStore}, + types::{ConversationItemRecord, ConversationRecord, ResponseRecord, StoreError}, }; // ----------------------------------------------------------------------------- @@ -282,20 +282,275 @@ impl ResponseStore for PostgresResponseStore { } async fn delete_conversation(&self, tenant_id: &str, conversation_id: &str) -> Result { + let mut tx = self + .pool + .begin() + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + if let Some(items_table) = &self.tables.items { + let items_sql = format!("DELETE FROM {items_table} WHERE tenant_id = $1 AND conversation_id = $2"); + sqlx::query(&items_sql) + .bind(tenant_id) + .bind(conversation_id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + } + let sql = format!( "DELETE FROM {} WHERE conversation_id = $1 AND tenant_id = $2", self.tables.conversations ); - let result = sqlx::query(&sql) .bind(conversation_id) .bind(tenant_id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + tx.commit().await.map_err(|e| StoreError::Database(e.to_string()))?; + Ok(result.rows_affected() > 0) + } +} + +#[expect( + clippy::too_many_lines, + reason = "async_trait counts the store method group as one expansion" +)] +#[async_trait] +impl ConversationItemStore for PostgresResponseStore { + async fn create_conversation_items(&self, items: &[ConversationItemRecord]) -> Result<(), StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let mut tx = self + .pool + .begin() + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + for item in items { + let item_data = + serde_json::to_string(&item.item_data).map_err(|e| StoreError::Serialization(e.to_string()))?; + + let sql = format!( + "INSERT INTO {table} \ + (item_id, tenant_id, conversation_id, item_data, created_at, position) \ + VALUES ($1, $2, $3, $4, $5, $6) \ + ON CONFLICT(item_id, tenant_id) DO UPDATE SET \ + conversation_id = EXCLUDED.conversation_id, \ + item_data = EXCLUDED.item_data, \ + created_at = EXCLUDED.created_at, \ + position = EXCLUDED.position" + ); + + sqlx::query(&sql) + .bind(&item.item_id) + .bind(&item.tenant_id) + .bind(&item.conversation_id) + .bind(&item_data) + .bind(item.created_at) + .bind(item.position) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + } + + tx.commit().await.map_err(|e| StoreError::Database(e.to_string()))?; + Ok(()) + } + + async fn list_conversation_items( + &self, + tenant_id: &str, + conversation_id: &str, + after_item_id: Option<&str>, + limit: u32, + ascending: bool, + ) -> Result, StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let direction = if ascending { "ASC" } else { "DESC" }; + let cursor_operator = if ascending { ">" } else { "<" }; + + let rows = if let Some(item_id) = after_item_id { + let Some(position) = self + .conversation_item_position(tenant_id, conversation_id, item_id) + .await? + else { + return Ok(Vec::new()); + }; + let sql = format!( + "SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \ + FROM {table} \ + WHERE tenant_id = $1 AND conversation_id = $2 \ + AND (position {cursor_operator} $3 \ + OR (position = $4 AND item_id {cursor_operator} $5)) \ + ORDER BY position {direction}, item_id {direction} \ + LIMIT $6" + ); + sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .bind(position) + .bind(position) + .bind(item_id) + .bind(i64::from(limit)) + .fetch_all(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))? + } else { + let sql = format!( + "SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \ + FROM {table} \ + WHERE tenant_id = $1 AND conversation_id = $2 \ + ORDER BY position {direction}, item_id {direction} \ + LIMIT $3" + ); + sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .bind(i64::from(limit)) + .fetch_all(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))? + }; + + rows.iter().map(row_to_conversation_item_record).collect() + } + + async fn get_conversation_item( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result, StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!( + "SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \ + FROM {table} \ + WHERE item_id = $1 AND tenant_id = $2 AND conversation_id = $3" + ); + + let row = sqlx::query(&sql) + .bind(item_id) + .bind(tenant_id) + .bind(conversation_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + row.map(|r| row_to_conversation_item_record(&r)).transpose() + } + + async fn delete_conversation_item( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!("DELETE FROM {table} WHERE item_id = $1 AND tenant_id = $2 AND conversation_id = $3"); + + let result = sqlx::query(&sql) + .bind(item_id) + .bind(tenant_id) + .bind(conversation_id) .execute(&self.pool) .await .map_err(|e| StoreError::Database(e.to_string()))?; Ok(result.rows_affected() > 0) } + + async fn conversation_item_position( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result, StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!( + "SELECT position FROM {table} \ + WHERE item_id = $1 AND tenant_id = $2 AND conversation_id = $3" + ); + + let row = sqlx::query(&sql) + .bind(item_id) + .bind(tenant_id) + .bind(conversation_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + row.map(|r| r.try_get("position").map_err(|e| StoreError::Database(e.to_string()))) + .transpose() + } + + async fn max_item_position(&self, tenant_id: &str, conversation_id: &str) -> Result { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!( + "SELECT COALESCE(MAX(position), 0) AS max_pos \ + FROM {table} \ + WHERE tenant_id = $1 AND conversation_id = $2" + ); + + let row = sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .fetch_one(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + row.try_get("max_pos").map_err(|e| StoreError::Database(e.to_string())) + } + + async fn delete_conversation_items(&self, tenant_id: &str, conversation_id: &str) -> Result<(), StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!("DELETE FROM {table} WHERE tenant_id = $1 AND conversation_id = $2"); + + sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .execute(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + Ok(()) + } } // ----------------------------------------------------------------------------- @@ -328,6 +583,32 @@ fn row_to_response_record(row: &PgRow) -> Result { }) } +/// Convert a sqlx row to a [`ConversationItemRecord`]. +fn row_to_conversation_item_record(row: &PgRow) -> Result { + let item_data_json: String = row + .try_get("item_data") + .map_err(|e| StoreError::Database(e.to_string()))?; + + Ok(ConversationItemRecord { + item_id: row + .try_get("item_id") + .map_err(|e| StoreError::Database(e.to_string()))?, + tenant_id: row + .try_get("tenant_id") + .map_err(|e| StoreError::Database(e.to_string()))?, + conversation_id: row + .try_get("conversation_id") + .map_err(|e| StoreError::Database(e.to_string()))?, + item_data: serde_json::from_str(&item_data_json).map_err(|e| StoreError::Serialization(e.to_string()))?, + created_at: row + .try_get("created_at") + .map_err(|e| StoreError::Database(e.to_string()))?, + position: row + .try_get("position") + .map_err(|e| StoreError::Database(e.to_string()))?, + }) +} + /// Convert a sqlx row to a [`ConversationRecord`]. fn row_to_conversation_record(row: &PgRow) -> Result { let messages_json: String = row diff --git a/filter/src/builtins/http/ai/store/sqlite.rs b/filter/src/builtins/http/ai/store/sqlite.rs index 1e503c5b..35d11601 100644 --- a/filter/src/builtins/http/ai/store/sqlite.rs +++ b/filter/src/builtins/http/ai/store/sqlite.rs @@ -12,8 +12,8 @@ use tracing::info; use super::{ schemas::{TableNames, generate_ddl}, - trait_def::ResponseStore, - types::{ConversationRecord, ResponseRecord, StoreError}, + trait_def::{ConversationItemStore, ResponseStore}, + types::{ConversationItemRecord, ConversationRecord, ResponseRecord, StoreError}, }; // ----------------------------------------------------------------------------- @@ -221,20 +221,275 @@ impl ResponseStore for SqliteResponseStore { } async fn delete_conversation(&self, tenant_id: &str, conversation_id: &str) -> Result { + let mut tx = self + .pool + .begin() + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + if let Some(items_table) = &self.tables.items { + let items_sql = format!("DELETE FROM {items_table} WHERE tenant_id = ? AND conversation_id = ?"); + sqlx::query(&items_sql) + .bind(tenant_id) + .bind(conversation_id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + } + let sql = format!( "DELETE FROM {} WHERE conversation_id = ? AND tenant_id = ?", self.tables.conversations ); - let result = sqlx::query(&sql) .bind(conversation_id) .bind(tenant_id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + tx.commit().await.map_err(|e| StoreError::Database(e.to_string()))?; + Ok(result.rows_affected() > 0) + } +} + +#[expect( + clippy::too_many_lines, + reason = "async_trait counts the store method group as one expansion" +)] +#[async_trait] +impl ConversationItemStore for SqliteResponseStore { + async fn create_conversation_items(&self, items: &[ConversationItemRecord]) -> Result<(), StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let mut tx = self + .pool + .begin() + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + for item in items { + let item_data = + serde_json::to_string(&item.item_data).map_err(|e| StoreError::Serialization(e.to_string()))?; + + let sql = format!( + "INSERT INTO {table} \ + (item_id, tenant_id, conversation_id, item_data, created_at, position) \ + VALUES (?, ?, ?, ?, ?, ?) \ + ON CONFLICT(item_id, tenant_id) DO UPDATE SET \ + conversation_id = excluded.conversation_id, \ + item_data = excluded.item_data, \ + created_at = excluded.created_at, \ + position = excluded.position" + ); + + sqlx::query(&sql) + .bind(&item.item_id) + .bind(&item.tenant_id) + .bind(&item.conversation_id) + .bind(&item_data) + .bind(item.created_at) + .bind(item.position) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + } + + tx.commit().await.map_err(|e| StoreError::Database(e.to_string()))?; + Ok(()) + } + + async fn list_conversation_items( + &self, + tenant_id: &str, + conversation_id: &str, + after_item_id: Option<&str>, + limit: u32, + ascending: bool, + ) -> Result, StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let direction = if ascending { "ASC" } else { "DESC" }; + let cursor_operator = if ascending { ">" } else { "<" }; + + let rows = if let Some(item_id) = after_item_id { + let Some(position) = self + .conversation_item_position(tenant_id, conversation_id, item_id) + .await? + else { + return Ok(Vec::new()); + }; + let sql = format!( + "SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \ + FROM {table} \ + WHERE tenant_id = ? AND conversation_id = ? \ + AND (position {cursor_operator} ? \ + OR (position = ? AND item_id {cursor_operator} ?)) \ + ORDER BY position {direction}, item_id {direction} \ + LIMIT ?" + ); + sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .bind(position) + .bind(position) + .bind(item_id) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))? + } else { + let sql = format!( + "SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \ + FROM {table} \ + WHERE tenant_id = ? AND conversation_id = ? \ + ORDER BY position {direction}, item_id {direction} \ + LIMIT ?" + ); + sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))? + }; + + rows.iter().map(row_to_conversation_item_record).collect() + } + + async fn get_conversation_item( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result, StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!( + "SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \ + FROM {table} \ + WHERE item_id = ? AND tenant_id = ? AND conversation_id = ?" + ); + + let row = sqlx::query(&sql) + .bind(item_id) + .bind(tenant_id) + .bind(conversation_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + row.map(|r| row_to_conversation_item_record(&r)).transpose() + } + + async fn delete_conversation_item( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!("DELETE FROM {table} WHERE item_id = ? AND tenant_id = ? AND conversation_id = ?"); + + let result = sqlx::query(&sql) + .bind(item_id) + .bind(tenant_id) + .bind(conversation_id) .execute(&self.pool) .await .map_err(|e| StoreError::Database(e.to_string()))?; Ok(result.rows_affected() > 0) } + + async fn conversation_item_position( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result, StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!( + "SELECT position FROM {table} \ + WHERE item_id = ? AND tenant_id = ? AND conversation_id = ?" + ); + + let row = sqlx::query(&sql) + .bind(item_id) + .bind(tenant_id) + .bind(conversation_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + row.map(|r| r.try_get("position").map_err(|e| StoreError::Database(e.to_string()))) + .transpose() + } + + async fn max_item_position(&self, tenant_id: &str, conversation_id: &str) -> Result { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!( + "SELECT COALESCE(MAX(position), 0) AS max_pos \ + FROM {table} \ + WHERE tenant_id = ? AND conversation_id = ?" + ); + + let row = sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .fetch_one(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + row.try_get("max_pos").map_err(|e| StoreError::Database(e.to_string())) + } + + async fn delete_conversation_items(&self, tenant_id: &str, conversation_id: &str) -> Result<(), StoreError> { + let table = self + .tables + .items + .as_deref() + .ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?; + + let sql = format!("DELETE FROM {table} WHERE tenant_id = ? AND conversation_id = ?"); + + sqlx::query(&sql) + .bind(tenant_id) + .bind(conversation_id) + .execute(&self.pool) + .await + .map_err(|e| StoreError::Database(e.to_string()))?; + + Ok(()) + } } // ----------------------------------------------------------------------------- @@ -267,6 +522,32 @@ fn row_to_response_record(row: &sqlx::sqlite::SqliteRow) -> Result Result { + let item_data_json: String = row + .try_get("item_data") + .map_err(|e| StoreError::Database(e.to_string()))?; + + Ok(ConversationItemRecord { + item_id: row + .try_get("item_id") + .map_err(|e| StoreError::Database(e.to_string()))?, + tenant_id: row + .try_get("tenant_id") + .map_err(|e| StoreError::Database(e.to_string()))?, + conversation_id: row + .try_get("conversation_id") + .map_err(|e| StoreError::Database(e.to_string()))?, + item_data: serde_json::from_str(&item_data_json).map_err(|e| StoreError::Serialization(e.to_string()))?, + created_at: row + .try_get("created_at") + .map_err(|e| StoreError::Database(e.to_string()))?, + position: row + .try_get("position") + .map_err(|e| StoreError::Database(e.to_string()))?, + }) +} + /// Convert a sqlx row to a [`ConversationRecord`]. fn row_to_conversation_record(row: &sqlx::sqlite::SqliteRow) -> Result { let messages_json: String = row diff --git a/filter/src/builtins/http/ai/store/tests.rs b/filter/src/builtins/http/ai/store/tests.rs index 8eb1632b..dfaaf291 100644 --- a/filter/src/builtins/http/ai/store/tests.rs +++ b/filter/src/builtins/http/ai/store/tests.rs @@ -8,8 +8,9 @@ use std::sync::Arc; use serde_json::json; use super::{ - ConversationRecord, PostgresResponseStore, ResponseRecord, ResponseStoreRegistry, SqliteResponseStore, SslMode, - StoreError, trait_def::ResponseStore, + ConversationItemRecord, ConversationRecord, PostgresResponseStore, ResponseRecord, ResponseStoreRegistry, + SqliteResponseStore, SslMode, StoreError, + trait_def::{ConversationItemStore as _, ResponseStore}, }; use crate::builtins::http::ai::openai::responses::store::{ListParams, Order, list_input_items}; @@ -592,6 +593,367 @@ async fn delete_conversation_tenant_isolation() { ); } +// ----------------------------------------------------------------------------- +// Conversation Item CRUD (SQLite) +// ----------------------------------------------------------------------------- + +#[tokio::test] +async fn conversation_items_paginate_ascending_and_descending() { + let store = make_store_with_items().await; + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 1), + make_conversation_item("item_2", "tenant_a", "conv_1", 2), + make_conversation_item("item_3", "tenant_a", "conv_1", 3), + make_conversation_item("item_4", "tenant_a", "conv_1", 4), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let asc = store + .list_conversation_items("tenant_a", "conv_1", None, 2, true) + .await + .expect("ascending list should succeed"); + assert_item_ids(&asc, &["item_1", "item_2"]); + + let asc_page2 = store + .list_conversation_items("tenant_a", "conv_1", Some("item_2"), 2, true) + .await + .expect("ascending page 2 should succeed"); + assert_item_ids(&asc_page2, &["item_3", "item_4"]); + + let desc = store + .list_conversation_items("tenant_a", "conv_1", None, 2, false) + .await + .expect("descending list should succeed"); + assert_item_ids(&desc, &["item_4", "item_3"]); + + let desc_page2 = store + .list_conversation_items("tenant_a", "conv_1", Some("item_3"), 2, false) + .await + .expect("descending page 2 should succeed"); + assert_item_ids(&desc_page2, &["item_2", "item_1"]); +} + +#[tokio::test] +async fn conversation_items_paginate_duplicate_positions() { + let store = make_store_with_items().await; + let items = [ + make_conversation_item("item_a", "tenant_a", "conv_1", 1), + make_conversation_item("item_b", "tenant_a", "conv_1", 1), + make_conversation_item("item_c", "tenant_a", "conv_1", 1), + make_conversation_item("item_d", "tenant_a", "conv_1", 2), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let page1 = store + .list_conversation_items("tenant_a", "conv_1", None, 2, true) + .await + .expect("page 1 should succeed"); + assert_item_ids(&page1, &["item_a", "item_b"]); + + let page2 = store + .list_conversation_items("tenant_a", "conv_1", Some("item_b"), 2, true) + .await + .expect("page 2 should succeed"); + assert_item_ids(&page2, &["item_c", "item_d"]); +} + +#[tokio::test] +async fn conversation_item_single_ops_scope_to_conversation() { + let store = make_store_with_items().await; + let item_conv1 = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + let item_conv2 = make_conversation_item("item_2", "tenant_a", "conv_2", 1); + store + .create_conversation_items(&[item_conv1, item_conv2]) + .await + .expect("item insert should succeed"); + + let get_wrong_conv = store + .get_conversation_item("tenant_a", "conv_2", "item_1") + .await + .expect("get should succeed"); + assert!(get_wrong_conv.is_none(), "item_1 should not be visible in conv_2"); + + let delete_wrong_conv = store + .delete_conversation_item("tenant_a", "conv_2", "item_1") + .await + .expect("delete should succeed"); + assert!(!delete_wrong_conv, "deleting item_1 from conv_2 should return false"); + + let still_exists = store + .get_conversation_item("tenant_a", "conv_1", "item_1") + .await + .expect("get should succeed"); + assert!(still_exists.is_some(), "item_1 should still exist in conv_1"); +} + +#[tokio::test] +async fn max_item_position_returns_zero_when_empty() { + let store = make_store_with_items().await; + let max = store + .max_item_position("tenant_a", "conv_1") + .await + .expect("max_item_position should succeed"); + assert_eq!(max, 0, "empty conversation should have max position 0"); +} + +#[tokio::test] +async fn max_item_position_returns_highest() { + let store = make_store_with_items().await; + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 5), + make_conversation_item("item_2", "tenant_a", "conv_1", 10), + make_conversation_item("item_3", "tenant_a", "conv_1", 3), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let max = store + .max_item_position("tenant_a", "conv_1") + .await + .expect("max_item_position should succeed"); + assert_eq!(max, 10, "max position should be 10"); +} + +#[tokio::test] +async fn delete_conversation_items_removes_all() { + let store = make_store_with_items().await; + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 1), + make_conversation_item("item_2", "tenant_a", "conv_1", 2), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + store + .delete_conversation_items("tenant_a", "conv_1") + .await + .expect("delete_conversation_items should succeed"); + + let remaining = store + .list_conversation_items("tenant_a", "conv_1", None, 100, true) + .await + .expect("list should succeed"); + assert!(remaining.is_empty(), "all items should be deleted"); +} + +#[tokio::test] +async fn conversation_item_tenant_isolation() { + let store = make_store_with_items().await; + let item = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + store + .create_conversation_items(&[item]) + .await + .expect("item insert should succeed"); + + let cross_tenant = store + .get_conversation_item("tenant_b", "conv_1", "item_1") + .await + .expect("cross-tenant get should succeed"); + assert!(cross_tenant.is_none(), "tenant_b should not see tenant_a items"); + + let cross_tenant_list = store + .list_conversation_items("tenant_b", "conv_1", None, 100, true) + .await + .expect("cross-tenant list should succeed"); + assert!(cross_tenant_list.is_empty(), "tenant_b should see no items"); +} + +#[tokio::test] +async fn conversation_item_upsert_updates_existing() { + let store = make_store_with_items().await; + let original = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + let updated = ConversationItemRecord { + item_data: json!({"type": "message", "role": "assistant", "content": "updated"}), + created_at: 2000, + position: 2, + ..make_conversation_item("item_1", "tenant_a", "conv_1", 1) + }; + + store + .create_conversation_items(&[original]) + .await + .expect("initial item insert should succeed"); + store + .create_conversation_items(&[updated]) + .await + .expect("duplicate item insert should upsert"); + + let fetched = store + .get_conversation_item("tenant_a", "conv_1", "item_1") + .await + .expect("get should succeed") + .expect("item should exist after upsert"); + + assert_eq!(fetched.position, 2, "upsert should update position"); + assert_eq!(fetched.created_at, 2000, "upsert should update created_at"); + assert_eq!( + fetched.item_data, + json!({"type": "message", "role": "assistant", "content": "updated"}), + "upsert should update item data" + ); +} + +#[tokio::test] +async fn get_conversation_item_returns_all_fields() { + let store = make_store_with_items().await; + let item = ConversationItemRecord { + item_id: "item_99".to_owned(), + tenant_id: "tenant_a".to_owned(), + conversation_id: "conv_1".to_owned(), + item_data: json!({"type": "function_call", "name": "search"}), + created_at: 5000, + position: 42, + }; + store + .create_conversation_items(&[item]) + .await + .expect("item insert should succeed"); + + let fetched = store + .get_conversation_item("tenant_a", "conv_1", "item_99") + .await + .expect("get should succeed") + .expect("item should exist"); + + assert_eq!(fetched.item_id, "item_99", "item_id should match"); + assert_eq!(fetched.tenant_id, "tenant_a", "tenant_id should match"); + assert_eq!(fetched.conversation_id, "conv_1", "conversation_id should match"); + assert_eq!( + fetched.item_data, + json!({"type": "function_call", "name": "search"}), + "item_data should round-trip" + ); + assert_eq!(fetched.created_at, 5000, "created_at should match"); + assert_eq!(fetched.position, 42, "position should match"); +} + +#[tokio::test] +async fn list_conversation_items_nonexistent_cursor_returns_empty() { + let store = make_store_with_items().await; + let item = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + store + .create_conversation_items(&[item]) + .await + .expect("item insert should succeed"); + + let result = store + .list_conversation_items("tenant_a", "conv_1", Some("nonexistent"), 10, true) + .await + .expect("list with nonexistent cursor should succeed"); + + assert!(result.is_empty(), "nonexistent cursor item should return empty list"); +} + +#[tokio::test] +async fn delete_conversation_cascades_to_items() { + let store = make_store_with_items().await; + let conv = ConversationRecord { + conversation_id: "conv_1".to_owned(), + tenant_id: "tenant_a".to_owned(), + created_at: 1000, + metadata: json!({}), + messages: json!([]), + }; + store + .upsert_conversation(&conv) + .await + .expect("conversation upsert should succeed"); + + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 1), + make_conversation_item("item_2", "tenant_a", "conv_1", 2), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let deleted = store + .delete_conversation("tenant_a", "conv_1") + .await + .expect("delete_conversation should succeed"); + assert!(deleted, "conversation should have been deleted"); + + let remaining = store + .list_conversation_items("tenant_a", "conv_1", None, 100, true) + .await + .expect("list should succeed"); + assert!( + remaining.is_empty(), + "items should be cascade-deleted with conversation" + ); +} + +#[tokio::test] +async fn conversation_item_methods_fail_without_items_table() { + let store = make_store().await; + let item = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + + let err = store.create_conversation_items(&[item]).await.unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "create should return Unavailable" + ); + + let err = store + .list_conversation_items("tenant_a", "conv_1", None, 10, true) + .await + .unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "list should return Unavailable" + ); + + let err = store + .get_conversation_item("tenant_a", "conv_1", "item_1") + .await + .unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "get should return Unavailable" + ); + + let err = store + .delete_conversation_item("tenant_a", "conv_1", "item_1") + .await + .unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "delete_item should return Unavailable" + ); + + let err = store + .conversation_item_position("tenant_a", "conv_1", "item_1") + .await + .unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "position should return Unavailable" + ); + + let err = store.max_item_position("tenant_a", "conv_1").await.unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "max_position should return Unavailable" + ); + + let err = store.delete_conversation_items("tenant_a", "conv_1").await.unwrap_err(); + assert!( + matches!(err, StoreError::Unavailable(_)), + "delete_items should return Unavailable" + ); +} + // ----------------------------------------------------------------------------- // Registry // ----------------------------------------------------------------------------- @@ -1054,6 +1416,318 @@ async fn pg_conversation_tenant_isolation() { assert!(result.is_none(), "tenant_b should not see tenant_a conversation"); } +// ----------------------------------------------------------------------------- +// Conversation Item CRUD (PostgreSQL) +// ----------------------------------------------------------------------------- + +#[tokio::test] +#[ignore] +async fn pg_conversation_items_paginate_ascending_and_descending() { + let store = make_pg_store_with_items().await; + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 1), + make_conversation_item("item_2", "tenant_a", "conv_1", 2), + make_conversation_item("item_3", "tenant_a", "conv_1", 3), + make_conversation_item("item_4", "tenant_a", "conv_1", 4), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let asc = store + .list_conversation_items("tenant_a", "conv_1", None, 2, true) + .await + .expect("ascending list should succeed"); + assert_item_ids(&asc, &["item_1", "item_2"]); + + let asc_page2 = store + .list_conversation_items("tenant_a", "conv_1", Some("item_2"), 2, true) + .await + .expect("ascending page 2 should succeed"); + assert_item_ids(&asc_page2, &["item_3", "item_4"]); + + let desc = store + .list_conversation_items("tenant_a", "conv_1", None, 2, false) + .await + .expect("descending list should succeed"); + assert_item_ids(&desc, &["item_4", "item_3"]); + + let desc_page2 = store + .list_conversation_items("tenant_a", "conv_1", Some("item_3"), 2, false) + .await + .expect("descending page 2 should succeed"); + assert_item_ids(&desc_page2, &["item_2", "item_1"]); +} + +#[tokio::test] +#[ignore] +async fn pg_conversation_items_paginate_duplicate_positions() { + let store = make_pg_store_with_items().await; + let items = [ + make_conversation_item("item_a", "tenant_a", "conv_1", 1), + make_conversation_item("item_b", "tenant_a", "conv_1", 1), + make_conversation_item("item_c", "tenant_a", "conv_1", 1), + make_conversation_item("item_d", "tenant_a", "conv_1", 2), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let page1 = store + .list_conversation_items("tenant_a", "conv_1", None, 2, true) + .await + .expect("page 1 should succeed"); + assert_item_ids(&page1, &["item_a", "item_b"]); + + let page2 = store + .list_conversation_items("tenant_a", "conv_1", Some("item_b"), 2, true) + .await + .expect("page 2 should succeed"); + assert_item_ids(&page2, &["item_c", "item_d"]); +} + +#[tokio::test] +#[ignore] +async fn pg_conversation_item_single_ops_scope_to_conversation() { + let store = make_pg_store_with_items().await; + let item_conv1 = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + let item_conv2 = make_conversation_item("item_2", "tenant_a", "conv_2", 1); + store + .create_conversation_items(&[item_conv1, item_conv2]) + .await + .expect("item insert should succeed"); + + let get_wrong_conv = store + .get_conversation_item("tenant_a", "conv_2", "item_1") + .await + .expect("get should succeed"); + assert!(get_wrong_conv.is_none(), "item_1 should not be visible in conv_2"); + + let delete_wrong_conv = store + .delete_conversation_item("tenant_a", "conv_2", "item_1") + .await + .expect("delete should succeed"); + assert!(!delete_wrong_conv, "deleting item_1 from conv_2 should return false"); + + let still_exists = store + .get_conversation_item("tenant_a", "conv_1", "item_1") + .await + .expect("get should succeed"); + assert!(still_exists.is_some(), "item_1 should still exist in conv_1"); +} + +#[tokio::test] +#[ignore] +async fn pg_max_item_position_returns_zero_when_empty() { + let store = make_pg_store_with_items().await; + let max = store + .max_item_position("tenant_a", "conv_1") + .await + .expect("max_item_position should succeed"); + assert_eq!(max, 0, "empty conversation should have max position 0"); +} + +#[tokio::test] +#[ignore] +async fn pg_max_item_position_returns_highest() { + let store = make_pg_store_with_items().await; + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 5), + make_conversation_item("item_2", "tenant_a", "conv_1", 10), + make_conversation_item("item_3", "tenant_a", "conv_1", 3), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let max = store + .max_item_position("tenant_a", "conv_1") + .await + .expect("max_item_position should succeed"); + assert_eq!(max, 10, "max position should be 10"); +} + +#[tokio::test] +#[ignore] +async fn pg_delete_conversation_items_removes_all() { + let store = make_pg_store_with_items().await; + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 1), + make_conversation_item("item_2", "tenant_a", "conv_1", 2), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + store + .delete_conversation_items("tenant_a", "conv_1") + .await + .expect("delete_conversation_items should succeed"); + + let remaining = store + .list_conversation_items("tenant_a", "conv_1", None, 100, true) + .await + .expect("list should succeed"); + assert!(remaining.is_empty(), "all items should be deleted"); +} + +#[tokio::test] +#[ignore] +async fn pg_conversation_item_tenant_isolation() { + let store = make_pg_store_with_items().await; + let item = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + store + .create_conversation_items(&[item]) + .await + .expect("item insert should succeed"); + + let cross_tenant = store + .get_conversation_item("tenant_b", "conv_1", "item_1") + .await + .expect("cross-tenant get should succeed"); + assert!(cross_tenant.is_none(), "tenant_b should not see tenant_a items"); + + let cross_tenant_list = store + .list_conversation_items("tenant_b", "conv_1", None, 100, true) + .await + .expect("cross-tenant list should succeed"); + assert!(cross_tenant_list.is_empty(), "tenant_b should see no items"); +} + +#[tokio::test] +#[ignore] +async fn pg_conversation_item_upsert_updates_existing() { + let store = make_pg_store_with_items().await; + let original = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + let updated = ConversationItemRecord { + item_data: json!({"type": "message", "role": "assistant", "content": "updated"}), + created_at: 2000, + position: 2, + ..make_conversation_item("item_1", "tenant_a", "conv_1", 1) + }; + + store + .create_conversation_items(&[original]) + .await + .expect("initial item insert should succeed"); + store + .create_conversation_items(&[updated]) + .await + .expect("duplicate item insert should upsert"); + + let fetched = store + .get_conversation_item("tenant_a", "conv_1", "item_1") + .await + .expect("get should succeed") + .expect("item should exist after upsert"); + + assert_eq!(fetched.position, 2, "upsert should update position"); + assert_eq!(fetched.created_at, 2000, "upsert should update created_at"); + assert_eq!( + fetched.item_data, + json!({"type": "message", "role": "assistant", "content": "updated"}), + "upsert should update item data" + ); +} + +#[tokio::test] +#[ignore] +async fn pg_get_conversation_item_returns_all_fields() { + let store = make_pg_store_with_items().await; + let item = ConversationItemRecord { + item_id: "item_99".to_owned(), + tenant_id: "tenant_a".to_owned(), + conversation_id: "conv_1".to_owned(), + item_data: json!({"type": "function_call", "name": "search"}), + created_at: 5000, + position: 42, + }; + store + .create_conversation_items(&[item]) + .await + .expect("item insert should succeed"); + + let fetched = store + .get_conversation_item("tenant_a", "conv_1", "item_99") + .await + .expect("get should succeed") + .expect("item should exist"); + + assert_eq!(fetched.item_id, "item_99", "item_id should match"); + assert_eq!(fetched.tenant_id, "tenant_a", "tenant_id should match"); + assert_eq!(fetched.conversation_id, "conv_1", "conversation_id should match"); + assert_eq!( + fetched.item_data, + json!({"type": "function_call", "name": "search"}), + "item_data should round-trip" + ); + assert_eq!(fetched.created_at, 5000, "created_at should match"); + assert_eq!(fetched.position, 42, "position should match"); +} + +#[tokio::test] +#[ignore] +async fn pg_list_conversation_items_nonexistent_cursor_returns_empty() { + let store = make_pg_store_with_items().await; + let item = make_conversation_item("item_1", "tenant_a", "conv_1", 1); + store + .create_conversation_items(&[item]) + .await + .expect("item insert should succeed"); + + let result = store + .list_conversation_items("tenant_a", "conv_1", Some("nonexistent"), 10, true) + .await + .expect("list with nonexistent cursor should succeed"); + + assert!(result.is_empty(), "nonexistent cursor item should return empty list"); +} + +#[tokio::test] +#[ignore] +async fn pg_delete_conversation_cascades_to_items() { + let store = make_pg_store_with_items().await; + let conv = ConversationRecord { + conversation_id: "conv_1".to_owned(), + tenant_id: "tenant_a".to_owned(), + created_at: 1000, + metadata: json!({}), + messages: json!([]), + }; + store + .upsert_conversation(&conv) + .await + .expect("conversation upsert should succeed"); + + let items = [ + make_conversation_item("item_1", "tenant_a", "conv_1", 1), + make_conversation_item("item_2", "tenant_a", "conv_1", 2), + ]; + store + .create_conversation_items(&items) + .await + .expect("item insert should succeed"); + + let deleted = store + .delete_conversation("tenant_a", "conv_1") + .await + .expect("delete_conversation should succeed"); + assert!(deleted, "conversation should have been deleted"); + + let remaining = store + .list_conversation_items("tenant_a", "conv_1", None, 100, true) + .await + .expect("list should succeed"); + assert!( + remaining.is_empty(), + "items should be cascade-deleted with conversation" + ); +} + // ----------------------------------------------------------------------------- // Test Utilities // ----------------------------------------------------------------------------- @@ -1064,6 +1738,56 @@ async fn make_store() -> SqliteResponseStore { .expect("store creation should succeed") } +async fn make_store_with_items() -> SqliteResponseStore { + SqliteResponseStore::new( + "sqlite::memory:", + "test_responses", + "test_conversation_messages", + Some("test_conversation_items"), + ) + .await + .expect("store creation should succeed") +} + +async fn make_pg_store_with_items() -> PostgresResponseStore { + let url = pg_database_url(); + let suffix = pg_unique_suffix(); + let responses_table = format!("test_responses_{suffix}"); + let conversations_table = format!("test_conversations_{suffix}"); + let items_table = format!("test_conversation_items_{suffix}"); + PostgresResponseStore::new( + &url, + &responses_table, + &conversations_table, + Some(&items_table), + Some(SslMode::Disable), + None, + ) + .await + .expect("postgres store creation should succeed") +} + +fn make_conversation_item( + item_id: &str, + tenant_id: &str, + conversation_id: &str, + position: i64, +) -> ConversationItemRecord { + ConversationItemRecord { + item_id: item_id.to_owned(), + tenant_id: tenant_id.to_owned(), + conversation_id: conversation_id.to_owned(), + item_data: json!({"type": "message", "role": "user", "content": "test"}), + created_at: 1000, + position, + } +} + +fn assert_item_ids(items: &[ConversationItemRecord], expected: &[&str]) { + let ids: Vec<&str> = items.iter().map(|i| i.item_id.as_str()).collect(); + assert_eq!(ids, expected, "item IDs should match expected order"); +} + fn make_response_record(id: &str, tenant_id: &str, created_at: i64) -> ResponseRecord { ResponseRecord { id: id.to_owned(), diff --git a/filter/src/builtins/http/ai/store/trait_def.rs b/filter/src/builtins/http/ai/store/trait_def.rs index 773d68b4..5b69c31b 100644 --- a/filter/src/builtins/http/ai/store/trait_def.rs +++ b/filter/src/builtins/http/ai/store/trait_def.rs @@ -1,11 +1,12 @@ // SPDX-License-Identifier: MIT // Copyright (c) 2026 Praxis Contributors -//! The [`ResponseStore`] async trait for response persistence. +//! The [`ResponseStore`] and [`ConversationItemStore`] async traits +//! for response and conversation item persistence. use async_trait::async_trait; -use super::types::{ConversationRecord, ResponseRecord, StoreError}; +use super::types::{ConversationItemRecord, ConversationRecord, ResponseRecord, StoreError}; // ----------------------------------------------------------------------------- // ResponseStore Trait @@ -83,3 +84,119 @@ pub trait ResponseStore: Send + Sync { /// Returns [`StoreError`] if the database operation fails. async fn delete_conversation(&self, tenant_id: &str, conversation_id: &str) -> Result; } + +// ----------------------------------------------------------------------------- +// ConversationItemStore Trait +// ----------------------------------------------------------------------------- + +/// Async persistence layer for conversation item records. +/// +/// Provides CRUD operations for individual items within a +/// conversation. Every query is tenant- and conversation-scoped. +/// Implementors must also implement [`ResponseStore`] since they +/// share the same backing database and connection pool. +#[async_trait] +#[cfg_attr(not(test), expect(dead_code, reason = "used by conversations filter in #623"))] +pub trait ConversationItemStore: Send + Sync { + /// Insert one or more conversation items. + /// + /// Items are inserted individually. Duplicate `item_id` + + /// `tenant_id` pairs are upserted. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + async fn create_conversation_items(&self, items: &[ConversationItemRecord]) -> Result<(), StoreError>; + + /// List items for a conversation with cursor-based pagination. + /// + /// Returns items ordered by `(position, item_id)`. When + /// `after_item_id` is `Some`, only items whose ordering key + /// compares past the cursor item are returned. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + #[expect( + clippy::too_many_arguments, + reason = "pagination query keeps scope and cursor fields explicit" + )] + async fn list_conversation_items( + &self, + tenant_id: &str, + conversation_id: &str, + after_item_id: Option<&str>, + limit: u32, + ascending: bool, + ) -> Result, StoreError>; + + /// Retrieve a single item by ID, scoped to tenant and + /// conversation. + /// + /// Returns `None` if the item does not exist or belongs to a + /// different tenant or conversation. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + async fn get_conversation_item( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result, StoreError>; + + /// Delete a single item by ID, scoped to tenant and + /// conversation. + /// + /// Returns `true` if an item was deleted, `false` if no matching + /// item existed. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + async fn delete_conversation_item( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result; + + /// Look up the position of a specific item. + /// + /// Returns `None` if the item does not exist in the given + /// tenant and conversation scope. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + async fn conversation_item_position( + &self, + tenant_id: &str, + conversation_id: &str, + item_id: &str, + ) -> Result, StoreError>; + + /// Return the maximum item position for a conversation. + /// + /// Returns `0` if the conversation has no items. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + async fn max_item_position(&self, tenant_id: &str, conversation_id: &str) -> Result; + + /// Delete all items belonging to a conversation. + /// + /// # Errors + /// + /// Returns [`StoreError`] if the items table is not configured + /// or a database operation fails. + async fn delete_conversation_items(&self, tenant_id: &str, conversation_id: &str) -> Result<(), StoreError>; +} diff --git a/filter/src/builtins/http/ai/store/types.rs b/filter/src/builtins/http/ai/store/types.rs index 944f6c7a..ffc6ceba 100644 --- a/filter/src/builtins/http/ai/store/types.rs +++ b/filter/src/builtins/http/ai/store/types.rs @@ -78,7 +78,7 @@ pub struct ConversationRecord { /// Items are the individual entries within a conversation (messages, /// tool calls, tool outputs, etc.). Stored as opaque JSON blobs with /// a monotonic position for ordering. -#[expect(dead_code, reason = "used by ConversationItemStore in #631")] +#[derive(Debug)] pub struct ConversationItemRecord { /// Unique item ID (e.g., `"item_abc123"`). pub item_id: String,