Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7679bd5
feat(storage): add conversation memory store substrate
zhoug9127 Apr 8, 2026
0996098
fix(memory): align header naming and trim redundant tests
zhoug9127 Apr 8, 2026
20bc468
docs(memory): mark request headers as placeholders
zhoug9127 Apr 8, 2026
7d32b91
perf(openai): avoid cloning headers for memory context
zhoug9127 Apr 8, 2026
2c035f5
fix: tighten memory writer validation and context initialization
zhoug9127 Apr 8, 2026
f25a193
fix(storage): gate hook+memory-writer only when ltm store is enabled
zhoug9127 Apr 8, 2026
c1641f6
fix(clippy): move openai context tests to end of file
zhoug9127 Apr 8, 2026
a7ff412
fix(memory): accept 'on' flag and add targeted API docs
zhoug9127 Apr 9, 2026
a18196b
test(config): satisfy nightly rustfmt import ordering
zhoug9127 Apr 9, 2026
6d963c7
docs(memory): clarify runtime validation and expand API docs
zhoug9127 Apr 9, 2026
3b3add5
docs(memory): raise API doc coverage for router and storage
zhoug9127 Apr 9, 2026
dfbc99e
fix(memory): expose header constants and preserve streaming context
zhoug9127 Apr 9, 2026
5239b32
test(memory): cover conversation memory writer behavior
zhoug9127 Apr 9, 2026
785eb68
merge: sync feat/conversation-memory-store-substrate with origin/main
zhoug9127 Apr 9, 2026
01c192f
fix(app_context): enforce memory writer invariants in builder
zhoug9127 Apr 9, 2026
2144f06
test(config): cover legacy memory_runtime serde default
zhoug9127 Apr 9, 2026
650599b
refactor(memory): dedupe writer validation and trim internal docs
zhoug9127 Apr 10, 2026
155b1b0
fix(memory): default missing runtime flags during serde
zhoug9127 Apr 10, 2026
fa3f2e0
test(memory): assert all inactive flags for missing headers
zhoug9127 Apr 10, 2026
6a1c95a
Merge branch 'main' into feat/conversation-memory-store-substrate
zhoug9127 Apr 10, 2026
4fb8d0c
Merge branch 'main' into feat/conversation-memory-store-substrate
zhoug9127 Apr 14, 2026
2953e2c
refactor(memory): address PR review comments
zhoug9127 Apr 14, 2026
cbae33d
fix(memory): fail fast for unsupported store config
zhoug9127 Apr 14, 2026
c82abb7
refactor(memory): centralize headers and dedupe context init
zhoug9127 Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 126 additions & 97 deletions crates/data_connector/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ use url::Url;

use crate::{
config::{HistoryBackend, OracleConfig, PostgresConfig, RedisConfig},
core::{ConversationItemStorage, ConversationStorage, ResponseStorage},
core::{
ConversationItemStorage, ConversationMemoryWriter, ConversationStorage, ResponseStorage,
},
hooked::{HookedConversationItemStorage, HookedConversationStorage, HookedResponseStorage},
hooks::StorageHook,
memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
memory::{
MemoryConversationItemStorage, MemoryConversationMemoryWriter, MemoryConversationStorage,
MemoryResponseStorage,
},
noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
Comment thread
zhoug9127 marked this conversation as resolved.
oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
postgres::{
Expand All @@ -22,13 +27,17 @@ use crate::{
},
};

/// Type alias for the storage tuple returned by factory functions.
/// This avoids clippy::type_complexity warnings while keeping Arc explicit.
pub type StorageTuple = (
Arc<dyn ResponseStorage>,
Arc<dyn ConversationStorage>,
Arc<dyn ConversationItemStorage>,
);
/// Complete storage handles returned by the factory, including optional memory writer.
pub struct StorageBundle {
/// Storage backend for responses.
pub response_storage: Arc<dyn ResponseStorage>,
/// Storage backend for conversations.
pub conversation_storage: Arc<dyn ConversationStorage>,
/// Storage backend for conversation items.
pub conversation_item_storage: Arc<dyn ConversationItemStorage>,
/// Optional writer used by long-term-memory store flows.
pub conversation_memory_writer: Option<Arc<dyn ConversationMemoryWriter>>,
}

/// Configuration for creating storage backends
pub struct StorageFactoryConfig<'a> {
Expand All @@ -42,33 +51,29 @@ pub struct StorageFactoryConfig<'a> {
pub hook: Option<Arc<dyn StorageHook>>,
}

/// Create all three storage backends based on configuration.
///
/// # Arguments
/// * `config` - Storage factory configuration
///
/// # Returns
/// Tuple of (response_storage, conversation_storage, conversation_item_storage)
/// Create all configured storage handles, including optional conversation memory writer.
///
/// # Errors
/// Returns error string if required configuration is missing or initialization fails
pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageTuple, String> {
let (resp, conv, items): StorageTuple = match config.backend {
pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageBundle, String> {
let bundle = match config.backend {
HistoryBackend::Memory => {
info!("Initializing data connector: Memory");
(
Arc::new(MemoryResponseStorage::new()),
Arc::new(MemoryConversationStorage::new()),
Arc::new(MemoryConversationItemStorage::new()),
)
StorageBundle {
response_storage: Arc::new(MemoryResponseStorage::new()),
conversation_storage: Arc::new(MemoryConversationStorage::new()),
conversation_item_storage: Arc::new(MemoryConversationItemStorage::new()),
conversation_memory_writer: Some(Arc::new(MemoryConversationMemoryWriter::new())),
}
}
HistoryBackend::None => {
info!("Initializing data connector: None (no persistence)");
(
Arc::new(NoOpResponseStorage::new()),
Arc::new(NoOpConversationStorage::new()),
Arc::new(NoOpConversationItemStorage::new()),
)
StorageBundle {
response_storage: Arc::new(NoOpResponseStorage::new()),
conversation_storage: Arc::new(NoOpConversationStorage::new()),
conversation_item_storage: Arc::new(NoOpConversationItemStorage::new()),
conversation_memory_writer: None,
}
}
HistoryBackend::Oracle => {
let oracle_cfg = config
Expand Down Expand Up @@ -140,18 +145,28 @@ pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageT
// Wrap backends in hooked storage when a hook is provided
if let Some(hook) = config.hook {
info!("Wrapping storage backends with hook");
Ok((
Arc::new(HookedResponseStorage::new(resp, hook.clone())),
Arc::new(HookedConversationStorage::new(conv, hook.clone())),
Arc::new(HookedConversationItemStorage::new(items, hook)),
))
Ok(StorageBundle {
response_storage: Arc::new(HookedResponseStorage::new(
bundle.response_storage,
hook.clone(),
)),
conversation_storage: Arc::new(HookedConversationStorage::new(
bundle.conversation_storage,
hook.clone(),
)),
conversation_item_storage: Arc::new(HookedConversationItemStorage::new(
bundle.conversation_item_storage,
hook,
)),
conversation_memory_writer: bundle.conversation_memory_writer,
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} else {
Ok((resp, conv, items))
Ok(bundle)
}
}

/// Create Oracle storage backends with a single shared connection pool.
fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageTuple, String> {
fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageBundle, String> {
use crate::oracle::OracleStore;

let store = OracleStore::new(
Expand All @@ -163,14 +178,15 @@ fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageTuple, Stri
],
)?;

Ok((
Arc::new(OracleResponseStorage::new(store.clone())),
Arc::new(OracleConversationStorage::new(store.clone())),
Arc::new(OracleConversationItemStorage::new(store)),
))
Ok(StorageBundle {
response_storage: Arc::new(OracleResponseStorage::new(store.clone())),
conversation_storage: Arc::new(OracleConversationStorage::new(store.clone())),
conversation_item_storage: Arc::new(OracleConversationItemStorage::new(store)),
conversation_memory_writer: None,
})
}

async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<StorageTuple, String> {
async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<StorageBundle, String> {
let store = PostgresStore::new(postgres_cfg.clone())?;
let postgres_resp = PostgresResponseStorage::new(store.clone())
.await
Expand All @@ -191,24 +207,26 @@ async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<Storag
store.ensure_response_indexes().await?;
}

Ok((
Arc::new(postgres_resp),
Arc::new(postgres_conv),
Arc::new(postgres_item),
))
Ok(StorageBundle {
response_storage: Arc::new(postgres_resp),
conversation_storage: Arc::new(postgres_conv),
conversation_item_storage: Arc::new(postgres_item),
conversation_memory_writer: None,
})
}

fn create_redis_storage(redis_cfg: &RedisConfig) -> Result<StorageTuple, String> {
fn create_redis_storage(redis_cfg: &RedisConfig) -> Result<StorageBundle, String> {
let store = RedisStore::new(redis_cfg.clone())?;
let redis_resp = RedisResponseStorage::new(store.clone());
let redis_conv = RedisConversationStorage::new(store.clone());
let redis_item = RedisConversationItemStorage::new(store);

Ok((
Arc::new(redis_resp),
Arc::new(redis_conv),
Arc::new(redis_item),
))
Ok(StorageBundle {
response_storage: Arc::new(redis_resp),
conversation_storage: Arc::new(redis_conv),
conversation_item_storage: Arc::new(redis_item),
conversation_memory_writer: None,
})
}

#[cfg(test)]
Expand All @@ -227,7 +245,12 @@ mod tests {
redis: None,
hook: None,
};
let (resp, conv, items) = create_storage(config).await.unwrap();
let bundle = create_storage(config).await.unwrap();
let (resp, conv, items) = (
bundle.response_storage,
bundle.conversation_storage,
bundle.conversation_item_storage,
);

// Verify they work end-to-end
let mut response = StoredResponse::new(None);
Expand Down Expand Up @@ -271,7 +294,8 @@ mod tests {
redis: None,
hook: None,
};
let (resp, conv, _items) = create_storage(config).await.unwrap();
let bundle = create_storage(config).await.unwrap();
let (resp, conv) = (bundle.response_storage, bundle.conversation_storage);

// NoOp storage should accept writes but return nothing on reads
let mut response = StoredResponse::new(None);
Expand All @@ -285,47 +309,83 @@ mod tests {
.is_none());
}

#[tokio::test]
async fn test_create_storage_memory_exposes_memory_writer() {
let bundle = create_storage(StorageFactoryConfig {
backend: &HistoryBackend::Memory,
oracle: None,
postgres: None,
redis: None,
hook: None,
})
.await
.unwrap();

assert!(bundle.conversation_memory_writer.is_some());
}

#[tokio::test]
async fn test_create_storage_none_does_not_expose_memory_writer() {
let bundle = create_storage(StorageFactoryConfig {
backend: &HistoryBackend::None,
oracle: None,
postgres: None,
redis: None,
hook: None,
})
.await
.unwrap();

assert!(bundle.conversation_memory_writer.is_none());
}

#[tokio::test]
async fn test_create_storage_oracle_missing_config() {
let config = StorageFactoryConfig {
let err = create_storage(StorageFactoryConfig {
backend: &HistoryBackend::Oracle,
oracle: None,
postgres: None,
redis: None,
hook: None,
};
let err = create_storage(config).await.err().expect("should fail");
})
.await
.err()
.expect("should fail");
assert!(err.contains("oracle configuration is required"));
}

#[tokio::test]
async fn test_create_storage_postgres_missing_config() {
let config = StorageFactoryConfig {
let err = create_storage(StorageFactoryConfig {
backend: &HistoryBackend::Postgres,
oracle: None,
postgres: None,
redis: None,
hook: None,
};
let err = create_storage(config).await.err().expect("should fail");
})
.await
.err()
.expect("should fail");
assert!(err.contains("Postgres configuration is required"));
}

#[tokio::test]
async fn test_create_storage_redis_missing_config() {
let config = StorageFactoryConfig {
let err = create_storage(StorageFactoryConfig {
backend: &HistoryBackend::Redis,
oracle: None,
postgres: None,
redis: None,
hook: None,
};
let err = create_storage(config).await.err().expect("should fail");
})
.await
.err()
.expect("should fail");
assert!(err.contains("Redis configuration is required"));
}

#[tokio::test]
async fn test_create_storage_with_hook() {
async fn test_create_storage_with_hook_keeps_memory_writer_for_runtime_gating() {
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -367,38 +427,7 @@ mod tests {
redis: None,
hook: Some(Arc::new(NoOpHook)),
};
let (resp, conv, items) = create_storage(config).await.unwrap();

// Verify hooked storage works end-to-end
let mut response = StoredResponse::new(None);
response.input = json!("hello");
let id = resp.store_response(response).await.unwrap();
assert!(resp.get_response(&id).await.unwrap().is_some());

let conversation = conv
.create_conversation(NewConversation {
id: None,
metadata: None,
})
.await
.unwrap();
assert!(conv
.get_conversation(&conversation.id)
.await
.unwrap()
.is_some());

let item = items
.create_item(NewConversationItem {
id: None,
response_id: None,
item_type: "message".to_string(),
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
})
.await
.unwrap();
assert!(items.get_item(&item.id).await.unwrap().is_some());
let bundle = create_storage(config).await.expect("should succeed");
assert!(bundle.conversation_memory_writer.is_some());
}
}
2 changes: 1 addition & 1 deletion crates/data_connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub use context::{
RequestContext,
};
// Re-export factory
pub use factory::{create_storage, StorageFactoryConfig};
pub use factory::{create_storage, StorageBundle, StorageFactoryConfig};
pub use hooks::{BeforeHookResult, ExtraColumns, HookError, StorageHook, StorageOperation};
// Re-export memory implementations for testing
pub use memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage};
Expand Down
Loading
Loading