Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filter/src/builtins/http/ai/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ pub(crate) use schemas::{validate_identifier as validate_table_identifier, valid
pub use self::{
postgres::{PostgresResponseStore, SslMode},
sqlite::SqliteResponseStore,
trait_def::ResponseStore,
types::{ConversationRecord, ResponseRecord, StoreError},
trait_def::{ConversationItemStore, ResponseStore},
types::{ConversationItemRecord, ConversationRecord, ResponseRecord, StoreError},
};

// -----------------------------------------------------------------------------
Expand Down
287 changes: 284 additions & 3 deletions filter/src/builtins/http/ai/store/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use tracing::info;

use super::{
schemas::{TableNames, generate_ddl, validate_postgres_identifiers},
trait_def::ResponseStore,
types::{ConversationRecord, ResponseRecord, StoreError},
trait_def::{ConversationItemStore, ResponseStore},
types::{ConversationItemRecord, ConversationRecord, ResponseRecord, StoreError},
};

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -282,20 +282,275 @@ impl ResponseStore for PostgresResponseStore {
}

async fn delete_conversation(&self, tenant_id: &str, conversation_id: &str) -> Result<bool, StoreError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

if let Some(items_table) = &self.tables.items {
let items_sql = format!("DELETE FROM {items_table} WHERE tenant_id = $1 AND conversation_id = $2");
sqlx::query(&items_sql)
.bind(tenant_id)
.bind(conversation_id)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;
}

let sql = format!(
"DELETE FROM {} WHERE conversation_id = $1 AND tenant_id = $2",
self.tables.conversations
);

let result = sqlx::query(&sql)
.bind(conversation_id)
.bind(tenant_id)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

tx.commit().await.map_err(|e| StoreError::Database(e.to_string()))?;
Ok(result.rows_affected() > 0)
}
}

#[expect(
clippy::too_many_lines,
reason = "async_trait counts the store method group as one expansion"
)]
#[async_trait]
impl ConversationItemStore for PostgresResponseStore {
async fn create_conversation_items(&self, items: &[ConversationItemRecord]) -> Result<(), StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let mut tx = self
.pool
.begin()
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

for item in items {
let item_data =
serde_json::to_string(&item.item_data).map_err(|e| StoreError::Serialization(e.to_string()))?;

let sql = format!(
"INSERT INTO {table} \
(item_id, tenant_id, conversation_id, item_data, created_at, position) \
VALUES ($1, $2, $3, $4, $5, $6) \
ON CONFLICT(item_id, tenant_id) DO UPDATE SET \
conversation_id = EXCLUDED.conversation_id, \
item_data = EXCLUDED.item_data, \
created_at = EXCLUDED.created_at, \
position = EXCLUDED.position"
);

sqlx::query(&sql)
.bind(&item.item_id)
.bind(&item.tenant_id)
.bind(&item.conversation_id)
.bind(&item_data)
.bind(item.created_at)
.bind(item.position)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;
}

tx.commit().await.map_err(|e| StoreError::Database(e.to_string()))?;
Ok(())
}

async fn list_conversation_items(
&self,
tenant_id: &str,
conversation_id: &str,
after_item_id: Option<&str>,
limit: u32,
ascending: bool,
) -> Result<Vec<ConversationItemRecord>, StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let direction = if ascending { "ASC" } else { "DESC" };
let cursor_operator = if ascending { ">" } else { "<" };

let rows = if let Some(item_id) = after_item_id {
let Some(position) = self
.conversation_item_position(tenant_id, conversation_id, item_id)
.await?
else {
return Ok(Vec::new());
};
let sql = format!(
"SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \
FROM {table} \
WHERE tenant_id = $1 AND conversation_id = $2 \
AND (position {cursor_operator} $3 \
OR (position = $4 AND item_id {cursor_operator} $5)) \
ORDER BY position {direction}, item_id {direction} \
LIMIT $6"
);
sqlx::query(&sql)
.bind(tenant_id)
.bind(conversation_id)
.bind(position)
.bind(position)
.bind(item_id)
.bind(i64::from(limit))
.fetch_all(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?
} else {
let sql = format!(
"SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \
FROM {table} \
WHERE tenant_id = $1 AND conversation_id = $2 \
ORDER BY position {direction}, item_id {direction} \
LIMIT $3"
);
sqlx::query(&sql)
.bind(tenant_id)
.bind(conversation_id)
.bind(i64::from(limit))
.fetch_all(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?
};

rows.iter().map(row_to_conversation_item_record).collect()
}

async fn get_conversation_item(
&self,
tenant_id: &str,
conversation_id: &str,
item_id: &str,
) -> Result<Option<ConversationItemRecord>, StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let sql = format!(
"SELECT item_id, tenant_id, conversation_id, item_data, created_at, position \
FROM {table} \
WHERE item_id = $1 AND tenant_id = $2 AND conversation_id = $3"
);

let row = sqlx::query(&sql)
.bind(item_id)
.bind(tenant_id)
.bind(conversation_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

row.map(|r| row_to_conversation_item_record(&r)).transpose()
}

async fn delete_conversation_item(
&self,
tenant_id: &str,
conversation_id: &str,
item_id: &str,
) -> Result<bool, StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let sql = format!("DELETE FROM {table} WHERE item_id = $1 AND tenant_id = $2 AND conversation_id = $3");

let result = sqlx::query(&sql)
.bind(item_id)
.bind(tenant_id)
.bind(conversation_id)
.execute(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

Ok(result.rows_affected() > 0)
}

async fn conversation_item_position(
&self,
tenant_id: &str,
conversation_id: &str,
item_id: &str,
) -> Result<Option<i64>, StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let sql = format!(
"SELECT position FROM {table} \
WHERE item_id = $1 AND tenant_id = $2 AND conversation_id = $3"
);

let row = sqlx::query(&sql)
.bind(item_id)
.bind(tenant_id)
.bind(conversation_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

row.map(|r| r.try_get("position").map_err(|e| StoreError::Database(e.to_string())))
.transpose()
}

async fn max_item_position(&self, tenant_id: &str, conversation_id: &str) -> Result<i64, StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let sql = format!(
"SELECT COALESCE(MAX(position), 0) AS max_pos \
FROM {table} \
WHERE tenant_id = $1 AND conversation_id = $2"
);

let row = sqlx::query(&sql)
.bind(tenant_id)
.bind(conversation_id)
.fetch_one(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

row.try_get("max_pos").map_err(|e| StoreError::Database(e.to_string()))
}

async fn delete_conversation_items(&self, tenant_id: &str, conversation_id: &str) -> Result<(), StoreError> {
let table = self
.tables
.items
.as_deref()
.ok_or_else(|| StoreError::Unavailable("items table not configured".to_owned()))?;

let sql = format!("DELETE FROM {table} WHERE tenant_id = $1 AND conversation_id = $2");

sqlx::query(&sql)
.bind(tenant_id)
.bind(conversation_id)
.execute(&self.pool)
.await
.map_err(|e| StoreError::Database(e.to_string()))?;

Ok(())
}
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -328,6 +583,32 @@ fn row_to_response_record(row: &PgRow) -> Result<ResponseRecord, StoreError> {
})
}

/// Convert a sqlx row to a [`ConversationItemRecord`].
fn row_to_conversation_item_record(row: &PgRow) -> Result<ConversationItemRecord, StoreError> {
let item_data_json: String = row
.try_get("item_data")
.map_err(|e| StoreError::Database(e.to_string()))?;

Ok(ConversationItemRecord {
item_id: row
.try_get("item_id")
.map_err(|e| StoreError::Database(e.to_string()))?,
tenant_id: row
.try_get("tenant_id")
.map_err(|e| StoreError::Database(e.to_string()))?,
conversation_id: row
.try_get("conversation_id")
.map_err(|e| StoreError::Database(e.to_string()))?,
item_data: serde_json::from_str(&item_data_json).map_err(|e| StoreError::Serialization(e.to_string()))?,
created_at: row
.try_get("created_at")
.map_err(|e| StoreError::Database(e.to_string()))?,
position: row
.try_get("position")
.map_err(|e| StoreError::Database(e.to_string()))?,
})
}

/// Convert a sqlx row to a [`ConversationRecord`].
fn row_to_conversation_record(row: &PgRow) -> Result<ConversationRecord, StoreError> {
let messages_json: String = row
Expand Down
Loading
Loading