From 5e4d531b624917574b621ef377e2b1db77cfd2cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Han?= Date: Mon, 22 Jun 2026 15:32:02 +0200 Subject: [PATCH 1/2] feat(filter): improve store filter init, scoping, and test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire ResponseStoreRegistry through server reload/watcher pipeline. Eagerly init store in on_request_body for StreamBuffer pre-read, replace init_store with get_or_init_store retry logic, dedup shared constants to responses/mod.rs, and update integration test assertions for the new messages schema. Signed-off-by: Sébastien Han --- .../builtins/http/ai/openai/responses/mod.rs | 10 + .../http/ai/openai/responses/store/filter.rs | 213 ++++++-- .../http/ai/openai/responses/store/tests.rs | 453 +++++++++++++++++- filter/src/builtins/http/ai/store/mod.rs | 6 + filter/src/builtins/http/ai/store/tests.rs | 20 + server/src/reload.rs | 81 +++- server/src/server.rs | 7 - server/src/watcher.rs | 25 +- .../suite/examples/openai_response_store.rs | 12 +- .../openai_response_store_postgres.rs | 11 +- tests/utils/src/proxy.rs | 1 + tls/src/client_auth.rs | 1 - 12 files changed, 748 insertions(+), 92 deletions(-) diff --git a/filter/src/builtins/http/ai/openai/responses/mod.rs b/filter/src/builtins/http/ai/openai/responses/mod.rs index 4053dc00..e6090bc5 100644 --- a/filter/src/builtins/http/ai/openai/responses/mod.rs +++ b/filter/src/builtins/http/ai/openai/responses/mod.rs @@ -67,6 +67,16 @@ use crate::{ /// Maximum length of a body-derived value promoted to headers or filter results. const MAX_PROMOTED_VALUE_LEN: usize = 256; +/// Default store name used when registering the response store in the +/// per-request registry. +pub(crate) const DEFAULT_STORE_NAME: &str = "default"; + +/// Metadata key for tenant isolation. +pub(crate) const TENANT_METADATA_KEY: &str = "responses.tenant_id"; + +/// Fallback tenant ID when no tenant metadata is present. 
+pub(crate) const DEFAULT_TENANT_ID: &str = "default"; + // ----------------------------------------------------------------------------- // ResponsesFormatFilter // ----------------------------------------------------------------------------- diff --git a/filter/src/builtins/http/ai/openai/responses/store/filter.rs b/filter/src/builtins/http/ai/openai/responses/store/filter.rs index 9b7e2381..70964726 100644 --- a/filter/src/builtins/http/ai/openai/responses/store/filter.rs +++ b/filter/src/builtins/http/ai/openai/responses/store/filter.rs @@ -10,9 +10,9 @@ //! The filter spans three phases, each refining the "should we //! persist?" decision as new information becomes available: //! -//! - **`on_request`**: reads classifier metadata to decide whether the request is persistable (POST, responses format, -//! store enabled, non-streaming). Lazily initializes the store backend. Sets `responses.skip_persist` metadata on -//! store init failure. +//! - **`on_request`**: reads classifier metadata to decide whether the request needs the store (persistable POST or +//! `previous_response_id`). Lazily initializes the store backend when needed. Sets `responses.skip_persist` metadata +//! on store init failure for requests that would otherwise persist. //! //! - **`on_response`**: re-checks skip conditions, then inspects the response status and content-type. Non-2xx or //! non-JSON responses set `responses.skip_persist` and bail early. @@ -29,11 +29,16 @@ //! intentional. Each phase learns something new (request metadata, //! response headers, body bytes), and early exit avoids wasted //! work (store init, body buffering, JSON parsing). Cross-phase -//! state is carried exclusively through string metadata in +//! control state is carried through string metadata in //! [`filter_metadata`], following the same pattern as the A2A -//! filter. +//! filter. The original request `input` is carried through typed +//! 
per-filter state because it can be arbitrary JSON and is not part +//! of the Responses API response object. When rehydrate populated +//! [`ResponsesState`], that state is used as the stored message +//! history. //! //! [`filter_metadata`]: crate::HttpFilterContext::filter_metadata +//! [`ResponsesState`]: super::super::state::ResponsesState use std::sync::Arc; @@ -45,6 +50,7 @@ use tokio::sync::OnceCell; use tracing::{debug, trace, warn}; use super::{ + super::{DEFAULT_STORE_NAME, DEFAULT_TENANT_ID, TENANT_METADATA_KEY, state::ResponsesState}, ListParams, Order, config::{ResponseStoreConfig, StorageBackend, revalidate_postgres_host, validate_config}, list_input_items, @@ -59,16 +65,6 @@ use crate::{ filter::{HttpFilter, HttpFilterContext}, }; -// ----------------------------------------------------------------------------- -// Constants -// ----------------------------------------------------------------------------- - -/// Default tenant identifier for single-tenant deployments. -const DEFAULT_TENANT_ID: &str = "default"; - -/// Metadata key for the per-request tenant identifier. -const TENANT_METADATA_KEY: &str = "responses.tenant_id"; - /// Persists non-streaming Responses API responses to the /// configured response store backend. /// @@ -256,26 +252,86 @@ impl ResponseStoreFilter { } // ----------------------------------------------------------------------------- -// ResponseCapture +// Request / Response Capture // ----------------------------------------------------------------------------- +/// Request-phase data needed when persisting the response. +struct ResponseStoreRequestState { + /// Original `input` value from the Responses API create request. + input: Value, +} + /// Fields extracted from the response JSON for the store record. struct ResponseCapture { - /// Echoed input items from the response. + /// Original request input used by rehydration. input: Value, - /// Model output items. + /// Full message history used by rehydration. 
messages: Value, } impl ResponseCapture { - /// Extract input and output from a Responses API response object. - fn from_response_json(json: &Value) -> Self { - Self { - input: json.get("input").cloned().unwrap_or(Value::Null), - messages: json.get("output").cloned().unwrap_or(Value::Null), + /// Extract stored input and output from a Responses API exchange. + fn from_response_json(json: &Value, request_input: Option, state_messages: Option>) -> Self { + let input = request_input + .or_else(|| json.get("input").cloned()) + .unwrap_or(Value::Null); + let output = json.get("output").cloned().unwrap_or(Value::Null); + let history_input = state_messages.map_or_else(|| input.clone(), Value::Array); + let messages = assemble_stored_messages(&history_input, &output); + + Self { input, messages } + } +} + +/// Extract the original Responses API request input from the buffered +/// create request body. +fn extract_request_input(body: &Option) -> Option { + let bytes = body.as_ref().filter(|b| !b.is_empty())?; + let json: Value = match serde_json::from_slice(bytes) { + Ok(v) => v, + Err(e) => { + trace!(error = %e, "response store: invalid request JSON"); + return None; + }, + }; + json.get("input").cloned() +} + +/// Build the stored conversation history from response input and output. +fn assemble_stored_messages(input: &Value, output: &Value) -> Value { + let mut messages = Vec::new(); + + append_stored_input_items(&mut messages, input.clone()); + + if !output.is_null() { + if let Some(items) = output.as_array() { + messages.extend(items.iter().cloned()); + } else { + messages.push(output.clone()); } } + + Value::Array(messages) +} + +/// Append stored response input as valid Responses API item params. 
+fn append_stored_input_items(messages: &mut Vec, input: Value) { + match input { + Value::Null => {}, + Value::String(text) => messages.push(user_message_item(&text)), + Value::Array(items) => messages.extend(items), + other => messages.push(other), + } +} + +/// Build a Responses API user message item from string input. +fn user_message_item(text: &str) -> Value { + serde_json::json!({ + "type": "message", + "role": "user", + "content": text + }) } // ----------------------------------------------------------------------------- @@ -295,6 +351,28 @@ pub(super) fn extract_response_id(path: &str) -> Option<&str> { } } +// ----------------------------------------------------------------------------- +// Registry Helpers +// ----------------------------------------------------------------------------- + +/// Publish the initialized store into the per-request registry so +/// downstream filters (rehydrate, compact, etc.) can read from it. +fn register_store_in_context(ctx: &HttpFilterContext<'_>, store: &Arc) { + let Some(registry) = ctx.response_stores else { + return; + }; + // Known limitation: the first default backend wins for this registry. + // That matches today's one-store-per-listener setup, but a future + // multi-store or live backend migration design should scope this by config. + if registry.get(DEFAULT_STORE_NAME).is_some() { + return; + } + let name: Arc = Arc::from(DEFAULT_STORE_NAME); + if let Err(e) = registry.register(&name, Arc::clone(store)) { + debug!(error = %e, "response store already registered"); + } +} + // ----------------------------------------------------------------------------- // Delete Response Helpers // ----------------------------------------------------------------------------- @@ -337,6 +415,24 @@ fn should_skip(ctx: &HttpFilterContext<'_>) -> bool { is_non_post_request(ctx) || is_non_responses_format(ctx) || is_store_disabled(ctx) || is_streaming_request(ctx) } +/// Check whether this request should initialize the store. 
+fn should_init_store_for_request(ctx: &HttpFilterContext<'_>) -> bool { + request_will_persist_response(ctx) || request_needs_rehydrate_store(ctx) +} + +/// Check whether this request can persist the eventual response. +fn request_will_persist_response(ctx: &HttpFilterContext<'_>) -> bool { + ctx.request.method == http::Method::POST + && is_responses_format(ctx) + && !is_store_disabled(ctx) + && !is_streaming_request(ctx) +} + +/// Check whether rehydrate needs the store before the request phase. +fn request_needs_rehydrate_store(ctx: &HttpFilterContext<'_>) -> bool { + ctx.request.method == http::Method::POST && is_responses_format(ctx) && has_previous_response_id(ctx) +} + /// Return whether the request method is not persistable. fn is_non_post_request(ctx: &HttpFilterContext<'_>) -> bool { let skip = ctx.request.method != http::Method::POST; @@ -349,13 +445,18 @@ fn is_non_post_request(ctx: &HttpFilterContext<'_>) -> bool { /// Return whether the request is not a Responses API request. fn is_non_responses_format(ctx: &HttpFilterContext<'_>) -> bool { let format = ctx.get_metadata("openai_responses_format.format"); - let skip = format != Some("openai_responses"); + let skip = !is_responses_format(ctx); if skip { trace!(format = ?format, "skipping non-responses format"); } skip } +/// Return whether the request is classified as a Responses API request. +fn is_responses_format(ctx: &HttpFilterContext<'_>) -> bool { + ctx.get_metadata("openai_responses_format.format") == Some("openai_responses") +} + /// Return whether the request explicitly disabled persistence. fn is_store_disabled(ctx: &HttpFilterContext<'_>) -> bool { let skip = ctx.get_metadata("openai_responses_format.store") == Some("false"); @@ -374,6 +475,11 @@ fn is_streaming_request(ctx: &HttpFilterContext<'_>) -> bool { skip } +/// Return whether the request references a previous response. 
+fn has_previous_response_id(ctx: &HttpFilterContext<'_>) -> bool { + ctx.get_metadata("openai_responses_format.has_previous_response_id") == Some("true") +} + /// Check whether persistence was skipped during the response phase. fn should_skip_persist(ctx: &HttpFilterContext<'_>) -> bool { should_skip(ctx) || ctx.get_metadata("responses.skip_persist") == Some("true") @@ -417,7 +523,12 @@ fn response_is_persistable(ctx: &mut HttpFilterContext<'_>) -> bool { /// Parse a response body into a [`ResponseRecord`], returning /// `None` for invalid JSON or missing required fields. -fn parse_response_record(bytes: &[u8], tenant_id: &str) -> Option { +fn parse_response_record( + bytes: &[u8], + tenant_id: &str, + request_input: Option, + state_messages: Option>, +) -> Option { let json: Value = match serde_json::from_slice(bytes) { Ok(v) => v, Err(e) => { @@ -435,7 +546,7 @@ fn parse_response_record(bytes: &[u8], tenant_id: &str) -> Option BodyAccess { + BodyAccess::ReadOnly + } + fn response_body_access(&self) -> BodyAccess { BodyAccess::ReadOnly } @@ -516,17 +631,48 @@ impl HttpFilter for ResponseStoreFilter { return Ok(FilterAction::Continue); } - if should_skip(ctx) { + if !should_init_store_for_request(ctx) { return Ok(FilterAction::Continue); } - if self.get_or_init_store().await.is_none() { - ctx.set_metadata("responses.skip_persist", "true"); + let will_persist = request_will_persist_response(ctx); + match &self.get_or_init_store().await { + Some(store) => register_store_in_context(ctx, store), + None if will_persist => ctx.set_metadata("responses.skip_persist", "true"), + None => {}, } Ok(FilterAction::Continue) } + /// Eagerly register the store during the body phase so + /// downstream filters running in `StreamBuffer` pre-read + /// (before `on_request`) can access it. 
+ async fn on_request_body( + &self, + ctx: &mut HttpFilterContext<'_>, + body: &mut Option, + end_of_stream: bool, + ) -> Result { + if !end_of_stream || ctx.request.method != http::Method::POST { + return Ok(FilterAction::Continue); + } + if !should_skip(ctx) + && let Some(input) = extract_request_input(body) + { + ctx.insert_filter_state(ResponseStoreRequestState { input }); + } + if should_init_store_for_request(ctx) { + let will_persist = request_will_persist_response(ctx); + match &self.get_or_init_store().await { + Some(store) => register_store_in_context(ctx, store), + None if will_persist => ctx.set_metadata("responses.skip_persist", "true"), + None => {}, + } + } + Ok(FilterAction::Continue) + } + async fn on_response(&self, ctx: &mut HttpFilterContext<'_>) -> Result { if should_skip_persist(ctx) { return Ok(FilterAction::Continue); @@ -572,7 +718,14 @@ impl HttpFilter for ResponseStoreFilter { .get_metadata(TENANT_METADATA_KEY) .unwrap_or(DEFAULT_TENANT_ID) .to_owned(); - let Some(record) = parse_response_record(bytes, &tenant_id) else { + let request_input = ctx + .remove_filter_state::() + .map(|state| state.input); + let state_messages = ctx + .extensions + .get::() + .map(|state| state.messages.clone()); + let Some(record) = parse_response_record(bytes, &tenant_id, request_input, state_messages) else { return Ok(FilterAction::Continue); }; diff --git a/filter/src/builtins/http/ai/openai/responses/store/tests.rs b/filter/src/builtins/http/ai/openai/responses/store/tests.rs index bf854da5..0168a14d 100644 --- a/filter/src/builtins/http/ai/openai/responses/store/tests.rs +++ b/filter/src/builtins/http/ai/openai/responses/store/tests.rs @@ -18,7 +18,7 @@ use super::{ use crate::{ FilterAction, FilterEntry, FilterPipeline, FilterRegistry, body::{BodyAccess, BodyMode}, - builtins::http::ai::store::{ResponseStore as _, SqliteResponseStore}, + builtins::http::ai::store::{ResponseRecord, ResponseStore as _, ResponseStoreRegistry, SqliteResponseStore}, 
factory::parse_filter_config, filter::{HttpFilter as _, HttpFilterContext}, }; @@ -214,6 +214,16 @@ fn response_body_access_is_read_only() { ); } +#[test] +fn request_body_access_is_read_only() { + let filter = make_filter(); + assert_eq!( + filter.request_body_access(), + BodyAccess::ReadOnly, + "request body access should be ReadOnly so the store is registered before rehydrate" + ); +} + #[test] fn response_body_mode_is_bounded_stream_buffer() { let filter = make_filter(); @@ -231,7 +241,7 @@ fn response_body_mode_is_bounded_stream_buffer() { // ----------------------------------------------------------------------------- #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn on_request_skips_when_format_metadata_absent() { +async fn on_request_does_not_initialize_store_without_format_metadata() { let filter = make_filter(); let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); let mut ctx = crate::test_utils::make_filter_context(&req); @@ -239,16 +249,16 @@ async fn on_request_skips_when_format_metadata_absent() { let action = filter.on_request(&mut ctx).await.unwrap(); assert!( matches!(action, FilterAction::Continue), - "should skip when format metadata is absent" + "should continue when format metadata is absent" ); assert!( filter.store.get().is_none(), - "store should not be initialized when skipped" + "store should not initialize before request classification is available" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn on_request_skips_when_format_is_openai_chat_completions() { +async fn on_request_does_not_initialize_store_for_non_responses_format() { let filter = make_filter(); let req = crate::test_utils::make_request(http::Method::POST, "/v1/chat/completions"); let mut ctx = crate::test_utils::make_filter_context(&req); @@ -257,16 +267,16 @@ async fn on_request_skips_when_format_is_openai_chat_completions() { let action = filter.on_request(&mut ctx).await.unwrap(); assert!( 
matches!(action, FilterAction::Continue), - "should skip when format is openai_chat_completions" + "should continue for non-responses format" ); assert!( filter.store.get().is_none(), - "store should not be initialized for non-responses format" + "store should not initialize for non-Responses traffic" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn on_request_skips_when_store_is_false() { +async fn on_request_does_not_initialize_store_when_store_is_false() { let filter = make_filter(); let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); let mut ctx = crate::test_utils::make_filter_context(&req); @@ -276,16 +286,16 @@ async fn on_request_skips_when_store_is_false() { let action = filter.on_request(&mut ctx).await.unwrap(); assert!( matches!(action, FilterAction::Continue), - "should skip when store is false" + "should continue when store is false" ); assert!( filter.store.get().is_none(), - "store should not be initialized when store=false" + "store should not initialize when persistence and rehydrate are both unnecessary" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn on_request_skips_when_stream_is_true() { +async fn on_request_does_not_initialize_store_for_streaming_without_previous_response() { let filter = make_filter(); let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); let mut ctx = crate::test_utils::make_filter_context(&req); @@ -295,11 +305,11 @@ async fn on_request_skips_when_stream_is_true() { let action = filter.on_request(&mut ctx).await.unwrap(); assert!( matches!(action, FilterAction::Continue), - "should skip when stream is true" + "should continue for streaming requests" ); assert!( filter.store.get().is_none(), - "store should not be initialized for streaming requests" + "store should not initialize for streaming requests unless rehydrate needs it" ); } @@ -357,6 +367,109 @@ async fn 
on_request_initializes_store_for_openai_responses_format() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_request_initializes_store_for_previous_response_id_even_when_store_false() { + let filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + ctx.set_metadata("openai_responses_format.store", "false"); + ctx.set_metadata("openai_responses_format.has_previous_response_id", "true"); + + let action = filter.on_request(&mut ctx).await.unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "should continue after initializing the store for rehydrate" + ); + assert!( + filter.store.get().and_then(Option::as_ref).is_some(), + "store should initialize when previous_response_id requires rehydrate" + ); +} + +// ----------------------------------------------------------------------------- +// on_request Registry +// ----------------------------------------------------------------------------- + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_request_registers_store_in_response_stores() { + let filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + let registry = ResponseStoreRegistry::new(); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + + let action = filter.on_request(&mut ctx).await.unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "should continue after registering store" + ); + assert!( + registry.get("default").is_some(), + "store should be registered as 'default' in response_stores" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_request_skips_registration_when_no_registry() { + let 
filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + + let action = filter.on_request(&mut ctx).await.unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "should continue even without registry" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_request_body_registers_store_for_previous_response_id_even_when_store_false() { + let filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + let registry = ResponseStoreRegistry::new(); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + ctx.set_metadata("openai_responses_format.store", "false"); + ctx.set_metadata("openai_responses_format.has_previous_response_id", "true"); + let mut body = Some(Bytes::from_static( + br#"{"model":"gpt-4.1","input":"Hi","store":false,"previous_response_id":"resp_prev"}"#, + )); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "request body phase should continue after registering the store" + ); + assert!( + registry.get("default").is_some(), + "store should register so rehydrate can fetch previous_response_id" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_request_body_does_not_initialize_store_for_store_false_without_previous_response() { + let filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + ctx.set_metadata("openai_responses_format.store", "false"); + let mut body 
= Some(Bytes::from_static(br#"{"model":"gpt-4.1","input":"Hi","store":false}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "request body phase should continue for store=false without rehydrate" + ); + assert!( + filter.store.get().is_none(), + "store should not initialize when neither persistence nor rehydrate needs it" + ); +} + // ----------------------------------------------------------------------------- // on_response // ----------------------------------------------------------------------------- @@ -748,8 +861,125 @@ async fn on_response_body_persists_valid_response() { "persisted input should be extracted from the response" ); assert_eq!( - record.messages, body_json["output"], - "persisted messages should be extracted from the response output" + record.messages, + json!([ + {"role": "user", "content": "Hello"}, + {"type": "message", "content": "Hello"} + ]), + "persisted messages should preserve input before output for rehydration" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_response_body_persists_string_input_as_message_item() { + let filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + + drop(filter.on_request(&mut ctx).await.unwrap()); + + let store_opt = filter.store.get().expect("store OnceCell should be initialized"); + assert!(store_opt.is_some(), "store should be initialized"); + + let body_json = json!({ + "id": "resp_string_input", + "created_at": 1_719_900_000, + "model": "gpt-4.1", + "status": "completed", + "input": "Hello", + "output": [{"type": "message", "content": "Hi"}] + }); + let mut body = Some(Bytes::from(serde_json::to_vec(&body_json).unwrap())); + + let action = filter.on_response_body(&mut ctx, &mut body, 
true).unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "should continue after spawning persist task" + ); + + let store = store_opt.as_ref().unwrap(); + let record = store + .get_response("default", "resp_string_input") + .await + .expect("get_response should succeed") + .expect("record should exist after persist"); + + assert_eq!( + record.input, body_json["input"], + "persisted input should preserve the response input" + ); + assert_eq!( + record.messages, + json!([ + {"type": "message", "role": "user", "content": "Hello"}, + {"type": "message", "content": "Hi"} + ]), + "persisted messages should normalize string input before output" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn on_response_body_uses_request_input_when_response_omits_input() { + let filter = make_filter(); + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + ctx.current_filter_id = Some(7); + + let request_input = json!([{"role": "user", "content": "Captured request input"}]); + let request_json = json!({ + "model": "gpt-4.1", + "input": request_input + }); + let mut request_body = Some(Bytes::from(serde_json::to_vec(&request_json).unwrap())); + let request_action = filter.on_request_body(&mut ctx, &mut request_body, true).await.unwrap(); + assert!( + matches!(request_action, FilterAction::Continue), + "request body phase should capture input and continue" + ); + + let store_opt = filter.store.get().expect("store OnceCell should be initialized"); + assert!(store_opt.is_some(), "store should be initialized"); + + let response_json = json!({ + "id": "resp_no_echoed_input", + "created_at": 1_719_900_000, + "model": "gpt-4.1", + "status": "completed", + "output": [{"type": "message", "content": "Stored output"}] + }); + let mut response_body = 
Some(Bytes::from(serde_json::to_vec(&response_json).unwrap())); + + ctx.current_filter_id = Some(7); + let response_action = filter.on_response_body(&mut ctx, &mut response_body, true).unwrap(); + assert!( + matches!(response_action, FilterAction::Continue), + "response body phase should persist and continue" + ); + + let store = store_opt.as_ref().unwrap(); + let record = store + .get_response("default", "resp_no_echoed_input") + .await + .expect("get_response should succeed") + .expect("record should exist after persist"); + + assert_eq!( + record.response_object, response_json, + "stored response object should remain the backend response" + ); + assert_eq!( + record.input, request_input, + "stored input should come from the original request" + ); + assert_eq!( + record.messages, + json!([ + {"role": "user", "content": "Captured request input"}, + {"type": "message", "content": "Stored output"} + ]), + "stored messages should combine request input with response output" ); } @@ -815,8 +1045,7 @@ async fn pipeline_persists_after_format_request_body_classification() { "created_at": 1_719_900_000, "model": "gpt-4.1", "status": "completed", - "input": [{"role": "user", "content": "Hello"}], - "output": [] + "output": [{"type": "message", "content": "Hi"}] }); let mut response_body = Some(Bytes::from(serde_json::to_vec(&response_json).unwrap())); let response_body_action = pipeline @@ -836,7 +1065,193 @@ async fn pipeline_persists_after_format_request_body_classification() { .unwrap() .expect("pipeline should persist the response after body classification"); assert_eq!(record.response_object, response_json); - assert_eq!(record.input, response_json["input"]); + assert_eq!(record.input, request_json["input"]); + assert_eq!( + record.messages, + json!([ + {"role": "user", "content": "Hello"}, + {"type": "message", "content": "Hi"} + ]) + ); + + drop(store); + drop(pipeline); + cleanup_sqlite_file(&db_path); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 
2)] +async fn pipeline_non_responses_post_does_not_open_sqlite_store() { + let (db_url, db_path) = temp_sqlite_url("pipeline_non_responses_post_does_not_open_sqlite_store"); + + let mut entries: Vec = serde_yaml::from_str(&format!( + r#" +- filter: openai_responses_format +- filter: openai_response_store + backend: sqlite + database_url: "{db_url}" + responses_table: test_responses + conversations_table: test_conversations +"# + )) + .unwrap(); + let registry = FilterRegistry::with_builtins(); + let pipeline = FilterPipeline::build(&mut entries, ®istry).unwrap(); + + let req = crate::test_utils::make_request(http::Method::POST, "/v1/chat/completions"); + let mut ctx = crate::test_utils::make_filter_context(&req); + + let request_json = json!({ + "model": "gpt-4.1", + "messages": [{"role": "user", "content": "Hello"}] + }); + let mut request_body = Some(Bytes::from(serde_json::to_vec(&request_json).unwrap())); + let request_body_action = pipeline + .execute_http_request_body(&mut ctx, &mut request_body, true) + .await + .unwrap(); + assert!( + matches!(request_body_action, FilterAction::Release), + "format classifier should release the buffered request body" + ); + assert_eq!( + ctx.get_metadata("openai_responses_format.format"), + Some("openai_chat_completions"), + "format classifier should mark Chat Completions traffic" + ); + + let request_action = pipeline.execute_http_request(&mut ctx).await.unwrap(); + assert!( + matches!(request_action, FilterAction::Continue), + "request phase should continue without opening the response store" + ); + assert!( + !db_path.exists(), + "non-Responses POST should not create the SQLite response store file" + ); + + drop(pipeline); + cleanup_sqlite_file(&db_path); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pipeline_persists_rehydrated_messages_when_response_omits_input() { + let (db_url, db_path) = temp_sqlite_url("pipeline_persists_rehydrated_messages"); + let seeded_store = 
SqliteResponseStore::new(&db_url, "test_responses", "test_conversations", None) + .await + .unwrap(); + seeded_store + .upsert_response(&ResponseRecord { + id: "resp_prev".to_owned(), + tenant_id: "default".to_owned(), + created_at: 1_719_800_000, + model: "gpt-4.1".to_owned(), + response_object: json!({ + "id": "resp_prev", + "created_at": 1_719_800_000, + "model": "gpt-4.1", + "status": "completed", + "output": [{"type": "message", "role": "assistant", "content": "Hi"}] + }), + input: json!("Hello"), + messages: json!([ + {"type": "message", "role": "user", "content": "Hello"}, + {"type": "message", "role": "assistant", "content": "Hi"} + ]), + }) + .await + .unwrap(); + drop(seeded_store); + + let mut entries: Vec = serde_yaml::from_str(&format!( + r#" +- filter: openai_responses_format +- filter: openai_response_store + backend: sqlite + database_url: "{db_url}" + responses_table: test_responses + conversations_table: test_conversations +- filter: openai_responses_rehydrate +"# + )) + .unwrap(); + let registry = FilterRegistry::with_builtins(); + let mut pipeline = FilterPipeline::build(&mut entries, ®istry).unwrap(); + pipeline.set_response_stores(ResponseStoreRegistry::new()); + + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = pipeline.response_stores(); + + let request_json = json!({ + "model": "gpt-4.1", + "input": "What next?", + "previous_response_id": "resp_prev" + }); + let mut request_body = Some(Bytes::from(serde_json::to_vec(&request_json).unwrap())); + let request_body_action = pipeline + .execute_http_request_body(&mut ctx, &mut request_body, true) + .await + .unwrap(); + assert!( + matches!(request_body_action, FilterAction::Release), + "request body phase should classify, register the store, and rehydrate" + ); + + let request_action = pipeline.execute_http_request(&mut ctx).await.unwrap(); + assert!( + matches!(request_action, 
FilterAction::Continue), + "request phase should continue after pre-read rehydration" + ); + + let mut resp = crate::test_utils::make_response(); + resp.headers + .insert(http::header::CONTENT_TYPE, "application/json".parse().unwrap()); + ctx.response_header = Some(&mut resp); + let response_action = pipeline.execute_http_response(&mut ctx).await.unwrap(); + assert!( + matches!(response_action, FilterAction::Continue), + "response phase should arm persistence buffering" + ); + ctx.response_header = None; + + let response_json = json!({ + "id": "resp_next", + "created_at": 1_719_900_000, + "model": "gpt-4.1", + "status": "completed", + "output": [{"type": "message", "role": "assistant", "content": "Next answer"}] + }); + let mut response_body = Some(Bytes::from(serde_json::to_vec(&response_json).unwrap())); + let response_body_action = pipeline + .execute_http_response_body(&mut ctx, &mut response_body, true) + .unwrap(); + assert!( + matches!(response_body_action, FilterAction::Continue), + "response body phase should persist and continue" + ); + + let store = SqliteResponseStore::new(&db_url, "test_responses", "test_conversations", None) + .await + .unwrap(); + let record = store + .get_response("default", "resp_next") + .await + .unwrap() + .expect("pipeline should persist the rehydrated response"); + assert_eq!( + record.input, request_json["input"], + "stored input should remain the current request input" + ); + assert_eq!( + record.messages, + json!([ + {"type": "message", "role": "user", "content": "Hello"}, + {"type": "message", "role": "assistant", "content": "Hi"}, + {"type": "message", "role": "user", "content": "What next?"}, + {"type": "message", "role": "assistant", "content": "Next answer"} + ]), + "stored messages should preserve previous turns, current input, and output" + ); drop(store); drop(pipeline); @@ -2420,7 +2835,7 @@ async fn init_store_and_seed(filter: &ResponseStoreFilter, id: &str, tenant_id: .get_or_init(|| async { 
filter.build_store().await.ok() }) .await; let store = store_opt.as_ref().expect("store should be initialized"); - let record = crate::builtins::http::ai::store::ResponseRecord { + let record = ResponseRecord { id: id.to_owned(), tenant_id: tenant_id.to_owned(), created_at: 1000, diff --git a/filter/src/builtins/http/ai/store/mod.rs b/filter/src/builtins/http/ai/store/mod.rs index d52ba4cf..8feeb0e3 100644 --- a/filter/src/builtins/http/ai/store/mod.rs +++ b/filter/src/builtins/http/ai/store/mod.rs @@ -93,6 +93,12 @@ impl ResponseStoreRegistry { pub fn get(&self, name: &str) -> Option> { self.stores.get(name).map(|r| Arc::clone(r.value())) } + + /// Return whether two registry handles share the same backing storage. + #[must_use] + pub fn shares_storage_with(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.stores, &other.stores) + } } impl Default for ResponseStoreRegistry { diff --git a/filter/src/builtins/http/ai/store/tests.rs b/filter/src/builtins/http/ai/store/tests.rs index fa7dd324..8eb1632b 100644 --- a/filter/src/builtins/http/ai/store/tests.rs +++ b/filter/src/builtins/http/ai/store/tests.rs @@ -642,6 +642,26 @@ fn registry_default_is_empty() { ); } +#[test] +fn registry_clone_shares_storage() { + let registry = ResponseStoreRegistry::new(); + let cloned = registry.clone(); + assert!( + registry.shares_storage_with(&cloned), + "cloned registry handles should share backing storage" + ); +} + +#[test] +fn registry_new_has_independent_storage() { + let first = ResponseStoreRegistry::new(); + let second = ResponseStoreRegistry::new(); + assert!( + !first.shares_storage_with(&second), + "independent registries should not share backing storage" + ); +} + // ----------------------------------------------------------------------------- // PostgreSQL Backend (requires running instance, DATABASE_URL env var) // ----------------------------------------------------------------------------- diff --git a/server/src/reload.rs b/server/src/reload.rs index 
95f17408..1a7e7a46 100644 --- a/server/src/reload.rs +++ b/server/src/reload.rs @@ -44,7 +44,6 @@ pub(crate) fn reload_pipelines( live: &ListenerPipelines, health_shutdown: &Arc>, kv_stores: &praxis_core::kv::KvStoreRegistry, - #[cfg(feature = "ai-inference")] response_stores: &praxis_filter::ResponseStoreRegistry, ) -> Result<(), Box> { info!("building new pipelines from reloaded config"); @@ -54,6 +53,8 @@ pub(crate) fn reload_pipelines( } let health_registry = build_health_registry(&new_config.clusters); + #[cfg(feature = "ai-inference")] + let response_stores = praxis_filter::ResponseStoreRegistry::new(); let new_pipelines = match resolve_pipelines( new_config, @@ -61,7 +62,7 @@ pub(crate) fn reload_pipelines( &health_registry, kv_stores, #[cfg(feature = "ai-inference")] - response_stores, + &response_stores, ) { Ok(p) => p, Err(e) => { @@ -320,8 +321,6 @@ mod tests { &live, &shutdown, &empty_kv_stores(), - #[cfg(feature = "ai-inference")] - &empty_response_stores(), ); assert!(result.is_ok(), "valid reload should succeed"); @@ -329,6 +328,47 @@ mod tests { assert_ne!(old_ptr, new_ptr, "pipeline pointer should change after reload"); } + #[cfg(feature = "ai-inference")] + #[test] + fn reload_uses_fresh_response_store_registry() { + let config = response_store_config(); + let registry = FilterRegistry::with_builtins(); + let health_registry: HealthRegistry = Arc::new(HashMap::new()); + let initial_response_stores = empty_response_stores(); + let live = resolve_pipelines( + &config, + &registry, + &health_registry, + &empty_kv_stores(), + &initial_response_stores, + ) + .unwrap(); + let old_pipeline = live.get("web").unwrap().load(); + let old_pipeline_response_stores = old_pipeline + .response_stores() + .expect("initial pipeline should have response stores"); + assert!( + old_pipeline_response_stores.shares_storage_with(&initial_response_stores), + "initial pipeline should use the startup response-store registry" + ); + + let shutdown = 
Arc::new(Mutex::new(CancellationToken::new())); + reload_pipelines(&config, &config, &registry, &live, &shutdown, &empty_kv_stores()).unwrap(); + + let new_pipeline = live.get("web").unwrap().load(); + let new_pipeline_response_stores = new_pipeline + .response_stores() + .expect("reloaded pipeline should have response stores"); + assert!( + !new_pipeline_response_stores.shares_storage_with(&initial_response_stores), + "reloaded pipeline should not reuse the startup response-store registry" + ); + assert!( + !new_pipeline_response_stores.shares_storage_with(old_pipeline_response_stores), + "reloaded pipeline should not inherit stores registered by the old pipeline" + ); + } + #[test] fn invalid_filter_returns_err_old_pipeline_untouched() { let (live, old_config, registry, shutdown) = setup_live_pipelines(); @@ -355,8 +395,6 @@ filter_chains: &live, &shutdown, &empty_kv_stores(), - #[cfg(feature = "ai-inference")] - &empty_response_stores(), ); assert!(result.is_err(), "invalid filter should return Err"); @@ -377,8 +415,6 @@ filter_chains: &live, &shutdown, &empty_kv_stores(), - #[cfg(feature = "ai-inference")] - &empty_response_stores(), ) .unwrap(); @@ -401,8 +437,6 @@ filter_chains: &live, &shutdown, &empty_kv_stores(), - #[cfg(feature = "ai-inference")] - &empty_response_stores(), ) .unwrap(); @@ -440,8 +474,6 @@ filter_chains: &live, &shutdown, &empty_kv_stores(), - #[cfg(feature = "ai-inference")] - &empty_response_stores(), ); assert!( !old_token.is_cancelled(), @@ -478,8 +510,6 @@ filter_chains: &live, &shutdown, &empty_kv_stores(), - #[cfg(feature = "ai-inference")] - &empty_response_stores(), ); assert!(result.is_ok(), "reload with new listener should succeed"); assert!( @@ -633,6 +663,29 @@ filter_chains: .unwrap() } + /// Config containing a response store for reload registry tests. 
+ #[cfg(feature = "ai-inference")] + fn response_store_config() -> Config { + Config::from_yaml( + r#" +listeners: + - name: web + address: "127.0.0.1:8080" + filter_chains: [main] +filter_chains: + - name: main + filters: + - filter: openai_responses_format + - filter: openai_response_store + backend: sqlite + database_url: "sqlite::memory:" + responses_table: test_responses + conversations_table: test_conversations +"#, + ) + .unwrap() + } + /// Set up live pipelines, registry, and shutdown token for reload tests. fn setup_live_pipelines() -> (ListenerPipelines, Config, FilterRegistry, Arc>) { let config = valid_config(); diff --git a/server/src/server.rs b/server/src/server.rs index 13eebd30..1a29c548 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -107,9 +107,6 @@ struct ServerState { kv_stores: praxis_core::kv::KvStoreRegistry, /// Health check cancellation token. health_shutdown: Arc>, - /// Response store registry. - #[cfg(feature = "ai-inference")] - response_stores: praxis_filter::ResponseStoreRegistry, } /// Build filter pipelines, health checks, and registries. @@ -138,8 +135,6 @@ fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState health_registry, kv_stores, health_shutdown, - #[cfg(feature = "ai-inference")] - response_stores, } } @@ -187,8 +182,6 @@ fn spawn_watcher( kv_stores: state.kv_stores, pipelines: Arc::clone(&state.pipelines), registry: Arc::new(registry), - #[cfg(feature = "ai-inference")] - response_stores: state.response_stores, shutdown: CancellationToken::new(), }); Some(handle) diff --git a/server/src/watcher.rs b/server/src/watcher.rs index d6d87435..999f915c 100644 --- a/server/src/watcher.rs +++ b/server/src/watcher.rs @@ -31,6 +31,10 @@ use crate::reload::reload_pipelines; /// Debounce window for filesystem events. const DEBOUNCE_MS: u64 = 500; +/// Test-only startup wait for the background notify watcher. 
+#[cfg(test)] +const WATCHER_STARTUP_MS: u64 = 750; + // ----------------------------------------------------------------------------- // WatcherParams // ----------------------------------------------------------------------------- @@ -49,10 +53,6 @@ pub(crate) struct WatcherParams { /// KV store registry, preserved across reloads. pub(crate) kv_stores: praxis_core::kv::KvStoreRegistry, - /// Response store registry, preserved across reloads. - #[cfg(feature = "ai-inference")] - pub(crate) response_stores: praxis_filter::ResponseStoreRegistry, - /// Live pipeline storage, swapped atomically on reload. pub(crate) pipelines: Arc, @@ -121,8 +121,6 @@ async fn run_event_loop(rx: &mut mpsc::Receiver<()>, params: &WatcherParams) { &params.pipelines, &params.health_shutdown, &params.kv_stores, - #[cfg(feature = "ai-inference")] - &params.response_stores, ); } () = params.shutdown.cancelled() => { @@ -146,7 +144,6 @@ fn handle_reload( pipelines: &ListenerPipelines, health_shutdown: &Arc>, kv_stores: &praxis_core::kv::KvStoreRegistry, - #[cfg(feature = "ai-inference")] response_stores: &praxis_filter::ResponseStoreRegistry, ) { let content = match std::fs::read_to_string(config_path) { Ok(c) => c, @@ -179,8 +176,6 @@ fn handle_reload( pipelines, health_shutdown, kv_stores, - #[cfg(feature = "ai-inference")] - response_stores, ) { Ok(()) => { *current_config = new_config; @@ -315,8 +310,6 @@ mod tests { kv_stores: praxis_core::kv::KvStoreRegistry::new(), pipelines, registry, - #[cfg(feature = "ai-inference")] - response_stores: empty_response_stores(), shutdown: shutdown.clone(), }); @@ -358,12 +351,10 @@ mod tests { kv_stores: praxis_core::kv::KvStoreRegistry::new(), pipelines: Arc::clone(&pipelines), registry: Arc::clone(&registry), - #[cfg(feature = "ai-inference")] - response_stores: empty_response_stores(), shutdown: shutdown.clone(), }); - std::thread::sleep(Duration::from_millis(200)); + std::thread::sleep(Duration::from_millis(WATCHER_STARTUP_MS)); std::fs::write(&config_path, 
VALID_YAML_CHANGED).unwrap(); @@ -409,12 +400,10 @@ mod tests { kv_stores: praxis_core::kv::KvStoreRegistry::new(), pipelines: Arc::clone(&pipelines), registry: Arc::clone(&registry), - #[cfg(feature = "ai-inference")] - response_stores: empty_response_stores(), shutdown: shutdown.clone(), }); - std::thread::sleep(Duration::from_millis(200)); + std::thread::sleep(Duration::from_millis(WATCHER_STARTUP_MS)); std::fs::write(&config_path, "invalid: [[[yaml").unwrap(); @@ -489,8 +478,6 @@ mod tests { kv_stores, pipelines, registry, - #[cfg(feature = "ai-inference")] - response_stores: empty_response_stores(), shutdown: shutdown.clone(), }); diff --git a/tests/integration/tests/suite/examples/openai_response_store.rs b/tests/integration/tests/suite/examples/openai_response_store.rs index 5de4cbc6..2f656982 100644 --- a/tests/integration/tests/suite/examples/openai_response_store.rs +++ b/tests/integration/tests/suite/examples/openai_response_store.rs @@ -89,7 +89,17 @@ async fn response_store_persists_response_to_sqlite() { let messages: serde_json::Value = serde_json::from_str(&messages_raw).expect("messages column should be valid JSON"); let items = messages.as_array().expect("messages should be an array"); - assert_eq!(items.len(), 1, "messages should have one output item"); + assert_eq!( + items.len(), + 2, + "messages should include normalized input plus output for rehydration" + ); + assert_eq!( + items[0], + serde_json::json!({"type": "message", "role": "user", "content": "Hello"}), + "string input should be normalized as a message item" + ); + assert_eq!(items[1]["type"], "message", "output item should be preserved"); drop(proxy); cleanup_sqlite_files(&db_path); diff --git a/tests/integration/tests/suite/examples/openai_response_store_postgres.rs b/tests/integration/tests/suite/examples/openai_response_store_postgres.rs index 3b69275f..2053f3cc 100644 --- a/tests/integration/tests/suite/examples/openai_response_store_postgres.rs +++ 
b/tests/integration/tests/suite/examples/openai_response_store_postgres.rs @@ -109,7 +109,16 @@ async fn response_store_persists_response_to_postgres() { let messages: serde_json::Value = serde_json::from_str(&messages_raw).expect("messages column should be valid JSON"); let items = messages.as_array().expect("messages should be an array"); - assert_eq!(items.len(), 1, "messages should have one output item"); + assert_eq!( + items.len(), + 2, + "messages should include normalized input plus output for rehydration" + ); + assert_eq!( + items[0], + serde_json::json!({"type": "message", "role": "user", "content": "Hello from postgres"}), + "first message should be the normalized user input" + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/tests/utils/src/proxy.rs b/tests/utils/src/proxy.rs index e954aebd..4d287b9c 100644 --- a/tests/utils/src/proxy.rs +++ b/tests/utils/src/proxy.rs @@ -50,6 +50,7 @@ fn resolve_listener_pipeline(config: &Config, listener: &Listener, registry: &Fi config.insecure_options.allow_unbounded_body, ) .unwrap(); + pipeline.set_response_stores(praxis_filter::ResponseStoreRegistry::new()); Arc::new(pipeline) } diff --git a/tls/src/client_auth.rs b/tls/src/client_auth.rs index 0297c498..85f16a91 100644 --- a/tls/src/client_auth.rs +++ b/tls/src/client_auth.rs @@ -120,7 +120,6 @@ fn load_ca_root_store(ca_path: &str) -> Result { detail: e.to_string(), })?); - let certs: Vec<_> = rustls_pemfile::certs(&mut &ca_pem[..]) .collect::, _>>() .map_err(|e| TlsError::FileLoadError { From c3a6e2e553380d9567534e46e553941741ece261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Han?= Date: Mon, 22 Jun 2026 15:39:31 +0200 Subject: [PATCH 2/2] fix(test): remove rehydrate-dependent test from store infra MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pipeline_persists_rehydrated_messages_when_response_omits_input test depends on the openai_responses_rehydrate filter which lives 
in the rehydrate PR (#604). Move it there. Signed-off-by: Sébastien Han --- .../http/ai/openai/responses/store/tests.rs | 125 ------------------ 1 file changed, 125 deletions(-) diff --git a/filter/src/builtins/http/ai/openai/responses/store/tests.rs b/filter/src/builtins/http/ai/openai/responses/store/tests.rs index 0168a14d..fe2a32b5 100644 --- a/filter/src/builtins/http/ai/openai/responses/store/tests.rs +++ b/filter/src/builtins/http/ai/openai/responses/store/tests.rs @@ -1133,131 +1133,6 @@ async fn pipeline_non_responses_post_does_not_open_sqlite_store() { cleanup_sqlite_file(&db_path); } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn pipeline_persists_rehydrated_messages_when_response_omits_input() { - let (db_url, db_path) = temp_sqlite_url("pipeline_persists_rehydrated_messages"); - let seeded_store = SqliteResponseStore::new(&db_url, "test_responses", "test_conversations", None) - .await - .unwrap(); - seeded_store - .upsert_response(&ResponseRecord { - id: "resp_prev".to_owned(), - tenant_id: "default".to_owned(), - created_at: 1_719_800_000, - model: "gpt-4.1".to_owned(), - response_object: json!({ - "id": "resp_prev", - "created_at": 1_719_800_000, - "model": "gpt-4.1", - "status": "completed", - "output": [{"type": "message", "role": "assistant", "content": "Hi"}] - }), - input: json!("Hello"), - messages: json!([ - {"type": "message", "role": "user", "content": "Hello"}, - {"type": "message", "role": "assistant", "content": "Hi"} - ]), - }) - .await - .unwrap(); - drop(seeded_store); - - let mut entries: Vec = serde_yaml::from_str(&format!( - r#" -- filter: openai_responses_format -- filter: openai_response_store - backend: sqlite - database_url: "{db_url}" - responses_table: test_responses - conversations_table: test_conversations -- filter: openai_responses_rehydrate -"# - )) - .unwrap(); - let registry = FilterRegistry::with_builtins(); - let mut pipeline = FilterPipeline::build(&mut entries, &registry).unwrap(); - 
pipeline.set_response_stores(ResponseStoreRegistry::new()); - - let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); - let mut ctx = crate::test_utils::make_filter_context(&req); - ctx.response_stores = pipeline.response_stores(); - - let request_json = json!({ - "model": "gpt-4.1", - "input": "What next?", - "previous_response_id": "resp_prev" - }); - let mut request_body = Some(Bytes::from(serde_json::to_vec(&request_json).unwrap())); - let request_body_action = pipeline - .execute_http_request_body(&mut ctx, &mut request_body, true) - .await - .unwrap(); - assert!( - matches!(request_body_action, FilterAction::Release), - "request body phase should classify, register the store, and rehydrate" - ); - - let request_action = pipeline.execute_http_request(&mut ctx).await.unwrap(); - assert!( - matches!(request_action, FilterAction::Continue), - "request phase should continue after pre-read rehydration" - ); - - let mut resp = crate::test_utils::make_response(); - resp.headers - .insert(http::header::CONTENT_TYPE, "application/json".parse().unwrap()); - ctx.response_header = Some(&mut resp); - let response_action = pipeline.execute_http_response(&mut ctx).await.unwrap(); - assert!( - matches!(response_action, FilterAction::Continue), - "response phase should arm persistence buffering" - ); - ctx.response_header = None; - - let response_json = json!({ - "id": "resp_next", - "created_at": 1_719_900_000, - "model": "gpt-4.1", - "status": "completed", - "output": [{"type": "message", "role": "assistant", "content": "Next answer"}] - }); - let mut response_body = Some(Bytes::from(serde_json::to_vec(&response_json).unwrap())); - let response_body_action = pipeline - .execute_http_response_body(&mut ctx, &mut response_body, true) - .unwrap(); - assert!( - matches!(response_body_action, FilterAction::Continue), - "response body phase should persist and continue" - ); - - let store = SqliteResponseStore::new(&db_url, "test_responses", 
"test_conversations", None) - .await - .unwrap(); - let record = store - .get_response("default", "resp_next") - .await - .unwrap() - .expect("pipeline should persist the rehydrated response"); - assert_eq!( - record.input, request_json["input"], - "stored input should remain the current request input" - ); - assert_eq!( - record.messages, - json!([ - {"type": "message", "role": "user", "content": "Hello"}, - {"type": "message", "role": "assistant", "content": "Hi"}, - {"type": "message", "role": "user", "content": "What next?"}, - {"type": "message", "role": "assistant", "content": "Next answer"} - ]), - "stored messages should preserve previous turns, current input, and output" - ); - - drop(store); - drop(pipeline); - cleanup_sqlite_file(&db_path); -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn store_init_failure_is_permanent() { let yaml: serde_yaml::Value = serde_yaml::from_str(