diff --git a/docs/filters/http/ai/openai_responses_rehydrate.md b/docs/filters/http/ai/openai_responses_rehydrate.md new file mode 100644 index 00000000..1119ca9a --- /dev/null +++ b/docs/filters/http/ai/openai_responses_rehydrate.md @@ -0,0 +1,18 @@ + + + +# `openai_responses_rehydrate` + +Validates `previous_response_id` by fetching the stored response, confirming its status is `"completed"`, and populating `ResponsesState` with the full conversation history (stored turns + current input). + +Requires Cargo feature: `ai-inference`. + +## Configuration Notes + +The request body is **not** modified; downstream filters read from `ResponsesState.messages` instead. + +## Example + +```yaml +filter: openai_responses_rehydrate +``` diff --git a/docs/filters/reference.md b/docs/filters/reference.md index fb89bb4c..7843458f 100644 --- a/docs/filters/reference.md +++ b/docs/filters/reference.md @@ -22,6 +22,7 @@ Built-in filters organized by protocol and category. | [`openai_response_store`](http/ai/openai_response_store.md) | `ai-inference` | Persists non-streaming Responses API responses to the configured response store backend. | | [`openai_responses_format`](http/ai/openai_responses_format.md) | `ai-inference` | Classifies AI API request bodies and promotes routing facts to headers, metadata, and filter results without mutating the body. | | [`openai_responses_model_rewrite`](http/ai/openai_responses_model_rewrite.md) | `ai-inference` | Rewrites the `model` field in Responses API request bodies. | +| [`openai_responses_rehydrate`](http/ai/openai_responses_rehydrate.md) | `ai-inference` | Validates `previous_response_id` by fetching the stored response, confirming its status is `"completed"`, and populating `ResponsesState` with the full conversation history (stored turns + current input). | | [`openai_responses_validate`](http/ai/openai_responses_validate.md) | `ai-inference` | Validates and enriches Responses API requests. | | [`prompt_enrich`](http/ai/prompt_enrich.md) | `ai-inference` | Injects statically configured messages into the `messages` array of OpenAI-compatible chat completion request bodies. | | [`token_usage_headers`](http/ai/token_usage_headers.md) | - | Injects `Praxis-Token-Input`, `Praxis-Token-Output`, and `Praxis-Token-Total` headers into downstream responses when token usage data is present in [`filter_metadata`]. | diff --git a/examples/README.md b/examples/README.md index 0a8cb8f6..74dc481e 100644 --- a/examples/README.md +++ b/examples/README.md @@ -36,6 +36,7 @@ page. | [format-routing.yaml](configs/ai/openai/responses/format-routing.yaml) | Routes AI API traffic by detected body format | | [full-flow.yaml](configs/ai/openai/responses/full-flow.yaml) | Combines format classification, request validation, and backend routing into a single pipeline | | [model-rewrite.yaml](configs/ai/openai/responses/model-rewrite.yaml) | Rewrites or injects the top-level `model` field in Responses API request bodies before forwarding to the inference backend | +| [rehydrate.yaml](configs/ai/openai/responses/rehydrate.yaml) | Validates `previous_response_id` by fetching the stored response, confirming its status is completed, and promoting the ID to filter metadata | | [request-validate.yaml](configs/ai/openai/responses/request-validate.yaml) | Validates Responses API requests and rejects invalid parameter combinations | | [response-store.yaml](configs/ai/openai/responses/response-store.yaml) | Persists non-streaming Responses API responses to a database and serves stored data via GET endpoints and handles DELETE /v1/responses/{id} locally | | [responses-routing.yaml](configs/ai/openai/responses/responses-routing.yaml) | Routes Responses API traffic by detected mode | diff --git a/examples/configs/ai/openai/responses/full-flow.yaml b/examples/configs/ai/openai/responses/full-flow.yaml index aa63abf7..31111734 100644 --- a/examples/configs/ai/openai/responses/full-flow.yaml +++ b/examples/configs/ai/openai/responses/full-flow.yaml @@ -13,13 +13,19 @@ # and conversation IDs, and writes responses.* metadata for # downstream filters. Skips non-Responses requests gracefully. # -# 3. The router sends all valid Responses API create requests to +# 3. openai_response_store persists non-streaming responses to +# SQLite and registers the store backend so downstream filters +# (rehydrate, compact) can read from it. +# +# 4. openai_responses_rehydrate loads conversation context from +# previous_response_id by fetching stored responses and +# prepending their message history to the current request. +# Passthrough for requests without previous_response_id. +# +# 5. The router sends all valid Responses API create requests to # the same inference backend. The promoted mode remains available # as headers/metadata for downstream filters; it does not change -# backend selection in this example. Once #354 orchestration -# filters land, stateful requests could use a separate filter -# chain with response_store, rehydrate, tool_parse, etc. before -# reaching the same inference backend. +# backend selection in this example. # # A request is stateful when any of these hold: # - previous_response_id is set @@ -75,8 +81,8 @@ filter_chains: database_url: "sqlite://responses.db?mode=rwc" responses_table: openai_responses conversations_table: openai_conversations + - filter: openai_responses_rehydrate # Future #354 orchestration filters would follow here: - # - filter: rehydrate # - filter: compact # - filter: tool_parse # - filter: responses_proxy diff --git a/examples/configs/ai/openai/responses/rehydrate.yaml b/examples/configs/ai/openai/responses/rehydrate.yaml new file mode 100644 index 00000000..76d8ce05 --- /dev/null +++ b/examples/configs/ai/openai/responses/rehydrate.yaml @@ -0,0 +1,63 @@ +# Rehydrate +# +# Validates `previous_response_id` by fetching the stored +# response, confirming its status is completed, and +# promoting the ID to filter metadata. Body construction +# from the resolved conversation is handled downstream by +# the `responses_proxy` filter. +# +# Pipeline ordering: +# 1. openai_responses_format — classifies body, promotes metadata +# 2. openai_responses_validate — validates parameters, generates IDs +# 3. openai_response_store — initializes store, registers in ctx +# 4. openai_responses_rehydrate — validates previous response, sets metadata +# +# The response_store filter must appear before openai_responses_rehydrate so its +# store backend is registered in `ctx.response_stores` during +# `on_request`. +# +# Example requests: +# +# # First turn (stored) +# curl -X POST http://localhost:8080/v1/responses \ +# -H "Content-Type: application/json" \ +# -d '{"model":"gpt-4.1","input":"Hello"}' +# +# # Second turn (rehydrated — uses previous_response_id) +# curl -X POST http://localhost:8080/v1/responses \ +# -H "Content-Type: application/json" \ +# -d '{"model":"gpt-4.1","input":"What next?","previous_response_id":"resp_abc"}' +# +# Requires the ai-inference feature: +# cargo build -p praxis --features ai-inference + +listeners: + - name: ai-gateway + address: "127.0.0.1:8080" + filter_chains: [responses-pipeline] + +filter_chains: + - name: responses-pipeline + filters: + - filter: openai_responses_format + + - filter: openai_responses_validate + + - filter: openai_response_store + backend: sqlite + database_url: "sqlite://responses.db?mode=rwc" + responses_table: openai_responses + conversations_table: openai_conversations + + - filter: openai_responses_rehydrate + + - filter: router + routes: + - path: "/v1/responses" + cluster: "inference-backend" + + - filter: load_balancer + clusters: + - name: "inference-backend" + endpoints: + - "127.0.0.1:8000" diff --git a/filter/src/builtins/http/ai/mod.rs b/filter/src/builtins/http/ai/mod.rs index 48bc0f83..de49b5d2 100644 --- a/filter/src/builtins/http/ai/mod.rs +++ b/filter/src/builtins/http/ai/mod.rs @@ -49,6 +49,8 @@ pub use openai::ModelRewriteFilter; #[cfg(feature = "ai-inference")] pub use openai::OpenaiResponsesValidateFilter; #[cfg(feature = "ai-inference")] +pub use openai::RehydrateFilter; +#[cfg(feature = "ai-inference")] pub use openai::ResponseStoreFilter; #[cfg(feature = "ai-inference")] pub use openai::ResponsesFormatFilter; diff --git a/filter/src/builtins/http/ai/openai/mod.rs b/filter/src/builtins/http/ai/openai/mod.rs index 75e41876..fee54e13 100644 --- a/filter/src/builtins/http/ai/openai/mod.rs +++ b/filter/src/builtins/http/ai/openai/mod.rs @@ -9,4 +9,6 @@ pub(crate) mod responses; pub use responses::ModelRewriteFilter; #[cfg(feature = "ai-inference")] pub use responses::OpenaiResponsesValidateFilter; +#[cfg(feature = "ai-inference")] +pub use responses::RehydrateFilter; pub use responses::{ResponseStoreFilter, ResponsesFormatFilter}; diff --git a/filter/src/builtins/http/ai/openai/responses/mod.rs b/filter/src/builtins/http/ai/openai/responses/mod.rs index c49a9116..c3074296 100644 --- a/filter/src/builtins/http/ai/openai/responses/mod.rs +++ b/filter/src/builtins/http/ai/openai/responses/mod.rs @@ -421,8 +421,12 @@ fn promote_boolean_results( Ok(()) } +#[cfg(feature = "ai-inference")] +pub(crate) mod rehydrate; #[cfg(feature = "ai-inference")] pub(crate) mod validate; +#[cfg(feature = "ai-inference")] +pub use rehydrate::RehydrateFilter; #[cfg(feature = "ai-inference")] pub use validate::OpenaiResponsesValidateFilter; diff --git a/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs b/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs new file mode 100644 index 00000000..4eac8671 --- /dev/null +++ b/filter/src/builtins/http/ai/openai/responses/rehydrate/mod.rs @@ -0,0 +1,315 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 Praxis Contributors + +//! Rehydrate filter: validates `previous_response_id` by +//! fetching the stored response, confirming its status is +//! `"completed"`, and populating [`ResponsesState`] with the +//! full conversation history (stored turns + current input). +//! +//! The request body is **not** modified; downstream filters +//! read from `ResponsesState.messages` instead. +//! +//! [`ResponsesState`]: super::state::ResponsesState + +use async_trait::async_trait; +use bytes::Bytes; +use serde_json::Value; +use tracing::{debug, warn}; + +use super::state::ResponsesState; +use crate::{ + FilterAction, FilterError, Rejection, + body::{BodyAccess, BodyMode, limits::MAX_JSON_BODY_BYTES}, + builtins::http::ai::store::ResponseRecord, + factory::parse_filter_config, + filter::{HttpFilter, HttpFilterContext}, +}; + +// ----------------------------------------------------------------------------- +// Constants +// ----------------------------------------------------------------------------- + +/// Default store name to look up in the registry. +const DEFAULT_STORE_NAME: &str = "default"; + +/// Metadata key for the tenant identifier. +const TENANT_METADATA_KEY: &str = "responses.tenant_id"; + +/// Default tenant identifier. +const DEFAULT_TENANT_ID: &str = "default"; + +// ----------------------------------------------------------------------------- +// RehydrateFilter +// ----------------------------------------------------------------------------- + +/// Validates `previous_response_id` by fetching the stored +/// response, confirming its status is `"completed"`, and +/// populating `ResponsesState` with the full conversation +/// history (stored turns + current input). +/// +/// The request body is **not** modified; downstream filters +/// read from `ResponsesState.messages` instead. +/// +/// # YAML +/// +/// ```yaml +/// filter: openai_responses_rehydrate +/// ``` +pub struct RehydrateFilter; + +impl RehydrateFilter { + /// Create a filter from YAML config. + /// + /// This filter has no configuration fields. + /// + /// # Errors + /// + /// Returns [`FilterError`] if the YAML config contains unknown fields. + pub fn from_config(config: &serde_yaml::Value) -> Result, FilterError> { + let empty = serde_yaml::Value::Mapping(serde_yaml::Mapping::new()); + let cfg = if config.is_null() { &empty } else { config }; + let _validated: RehydrateConfig = parse_filter_config("openai_responses_rehydrate", cfg)?; + Ok(Box::new(Self)) + } +} + +/// Empty YAML configuration for [`RehydrateFilter`]. +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +#[expect( + clippy::empty_structs_with_brackets, + reason = "serde cannot deserialize a map into a unit struct" +)] +struct RehydrateConfig {} + +#[async_trait] +impl HttpFilter for RehydrateFilter { + fn name(&self) -> &'static str { + "openai_responses_rehydrate" + } + + fn request_body_access(&self) -> BodyAccess { + BodyAccess::ReadOnly + } + + /// `StreamBuffer` so the protocol layer assembles the complete + /// request body before delivering it at end-of-stream. + fn request_body_mode(&self) -> BodyMode { + BodyMode::StreamBuffer { + max_bytes: Some(MAX_JSON_BODY_BYTES), + } + } + + async fn on_request(&self, _ctx: &mut HttpFilterContext<'_>) -> Result { + Ok(FilterAction::Continue) + } + + async fn on_request_body( + &self, + ctx: &mut HttpFilterContext<'_>, + body: &mut Option, + end_of_stream: bool, + ) -> Result { + if !end_of_stream { + return Ok(FilterAction::Continue); + } + + if ctx.request.method != http::Method::POST { + return Ok(FilterAction::Continue); + } + + if is_responses_cancel_path(ctx.request.uri.path()) { + return Ok(FilterAction::Release); + } + + if ctx.get_metadata("openai_responses_format.format") != Some("openai_responses") { + return Ok(FilterAction::Release); + } + + validate_previous_response(ctx, body).await + } +} + +/// Return whether this request targets the body-less Responses cancel endpoint. +fn is_responses_cancel_path(path: &str) -> bool { + let path = path.strip_suffix('/').filter(|p| !p.is_empty()).unwrap_or(path); + let mut segments = path.split('/'); + + matches!( + ( + segments.next(), + segments.next(), + segments.next(), + segments.next(), + segments.next(), + segments.next(), + ), + (Some(""), Some("v1"), Some("responses"), Some(response_id), Some("cancel"), None) + if !response_id.is_empty() + ) +} + +/// Parse body, fetch stored response, validate status, +/// populate [`ResponsesState`], and promote metadata. +async fn validate_previous_response( + ctx: &mut HttpFilterContext<'_>, + body: &Option, +) -> Result { + let Some(bytes) = body.as_ref() else { + return Ok(FilterAction::Release); + }; + + let (parsed_body, prev_id) = match parse_body_and_extract_id(bytes) { + Ok((body, Some(id))) => (body, id), + Ok((_, None)) => return Ok(FilterAction::Release), + Err(action) => return Ok(action), + }; + + let tenant_id = ctx + .get_metadata(TENANT_METADATA_KEY) + .unwrap_or(DEFAULT_TENANT_ID) + .to_owned(); + + let record = match fetch_previous_response(ctx, &tenant_id, &prev_id).await { + Ok(r) => r, + Err(action) => return Ok(action), + }; + + if let Err(action) = validate_response_status(&record) { + return Ok(action); + } + + let mut state = ResponsesState::from_request_body(parsed_body); + // TODO(#697): enforce a max rehydrated history size before cloning stored messages. + let stored_messages = record.messages.as_array().cloned().unwrap_or_default(); + state.messages.splice(0..0, stored_messages); + ctx.extensions.insert(state); + + debug!(previous_response_id = %prev_id, "previous response validated, state populated"); + ctx.set_metadata("responses.previous_response_id", prev_id); + + Ok(FilterAction::Release) +} + +/// Parse the request body and extract `previous_response_id`. +/// +/// Returns the parsed body alongside the optional ID so callers +/// can reuse it for [`ResponsesState`] construction. +fn parse_body_and_extract_id(bytes: &[u8]) -> Result<(Value, Option), FilterAction> { + let parsed: Value = serde_json::from_slice(bytes).map_err(|e| { + debug!(error = %e, "rehydrate: invalid request JSON"); + reject_invalid(&format!("invalid request body: {e}")) + })?; + + let id = match parsed.get("previous_response_id") { + None | Some(Value::Null) => None, + Some(Value::String(s)) => Some(s.clone()), + Some(_) => return Err(reject_invalid("previous_response_id must be a string")), + }; + + Ok((parsed, id)) +} + +// ----------------------------------------------------------------------------- +// Fetch & Validate +// ----------------------------------------------------------------------------- + +/// Fetch the previous response record from the store. +async fn fetch_previous_response( + ctx: &HttpFilterContext<'_>, + tenant_id: &str, + prev_id: &str, +) -> Result { + let registry = ctx.response_stores.ok_or_else(|| { + warn!("rehydrate: response store registry not available"); + reject_server_error("response store is not available") + })?; + + let store = registry.get(DEFAULT_STORE_NAME).ok_or_else(|| { + warn!("rehydrate: default response store not registered"); + reject_server_error("response store is not available") + })?; + + let record = store.get_response(tenant_id, prev_id).await.map_err(|e| { + warn!(error = %e, "rehydrate: failed to fetch previous response"); + reject_server_error("failed to fetch previous response") + })?; + + record.ok_or_else(|| { + debug!(id = %prev_id, "rehydrate: previous response not found"); + reject_invalid(&format!("response '{prev_id}' not found")) + }) +} + +/// Validate that the stored response has status `"completed"`. +fn validate_response_status(record: &ResponseRecord) -> Result<(), FilterAction> { + let status = record + .response_object + .get("status") + .and_then(Value::as_str) + .unwrap_or("unknown"); + + if status != "completed" { + return Err(reject_invalid(&format!( + "cannot continue from response with status '{status}'" + ))); + } + + Ok(()) +} + +// ----------------------------------------------------------------------------- +// Rejection Helpers +// ----------------------------------------------------------------------------- + +/// Build a 400 rejection with a JSON error body. +fn reject_invalid(message: &str) -> FilterAction { + let body = serde_json::json!({ + "error": { + "message": message, + "type": "invalid_request_error" + } + }) + .to_string(); + + FilterAction::Reject( + Rejection::status(400) + .with_header("content-type", "application/json") + .with_body(Bytes::from(body)), + ) +} + +/// Build a 500 rejection with a JSON error body. +fn reject_server_error(message: &str) -> FilterAction { + let body = serde_json::json!({ + "error": { + "message": message, + "type": "server_error" + } + }) + .to_string(); + + FilterAction::Reject( + Rejection::status(500) + .with_header("content-type", "application/json") + .with_body(Bytes::from(body)), + ) +} + +// ----------------------------------------------------------------------------- +// Tests +// ----------------------------------------------------------------------------- + +#[cfg(test)] +#[expect(clippy::allow_attributes, reason = "blanket test suppressions")] +#[allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic, + clippy::needless_raw_strings, + clippy::needless_raw_string_hashes, + clippy::too_many_lines, + reason = "tests" +)] +mod tests; diff --git a/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs b/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs new file mode 100644 index 00000000..9b7e4fcf --- /dev/null +++ b/filter/src/builtins/http/ai/openai/responses/rehydrate/tests.rs @@ -0,0 +1,594 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 Praxis Contributors + +use std::sync::Arc; + +use bytes::Bytes; +use serde_json::json; + +use super::*; +use crate::{ + FilterAction, FilterEntry, FilterPipeline, FilterRegistry, + builtins::http::ai::store::{ + ConversationRecord, ResponseRecord, ResponseStore, ResponseStoreRegistry, SqliteResponseStore, StoreError, + }, +}; + +// ----------------------------------------------------------------------------- +// from_config +// ----------------------------------------------------------------------------- + +#[test] +fn from_config_succeeds() { + let filter = RehydrateFilter::from_config(&serde_yaml::Value::Null).unwrap(); + assert_eq!( + filter.name(), + "openai_responses_rehydrate", + "filter name should match convention" + ); +} + +#[test] +fn unknown_field_rejected() { + let yaml: serde_yaml::Value = serde_yaml::from_str("unexpected: true").unwrap(); + let result = RehydrateFilter::from_config(&yaml); + assert!( + result.is_err(), + "unknown fields should be rejected by deny_unknown_fields" + ); +} + +#[test] +fn body_access_is_read_only() { + let filter = RehydrateFilter; + assert_eq!( + filter.request_body_access(), + BodyAccess::ReadOnly, + "filter should use read-only body access" + ); +} + +// ----------------------------------------------------------------------------- +// Bypass +// ----------------------------------------------------------------------------- + +#[tokio::test] +async fn skips_non_post_request() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::GET, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + let mut body = Some(Bytes::from(r#"{"input":"test"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!(matches!(action, FilterAction::Continue), "non-POST should continue"); +} + +#[tokio::test] +async fn skips_non_responses_format() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/chat/completions"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_chat_completions"); + let mut body = Some(Bytes::from(r#"{"messages":[]}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Release), + "non-responses format should release" + ); +} + +#[tokio::test] +async fn continues_on_non_end_of_stream() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + let mut body = Some(Bytes::from(r#"{"input":"partial"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, false).await.unwrap(); + assert!( + matches!(action, FilterAction::Continue), + "non-end-of-stream should continue" + ); +} + +#[tokio::test] +async fn skips_cancel_request_without_parsing_empty_body() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses/resp_123/cancel"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::new()); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Release), + "cancel request should bypass rehydrate even with an empty stream-buffer body" + ); + assert_eq!(body.as_ref().unwrap().len(), 0, "empty body should stay unchanged"); + assert!( + ctx.extensions.get::().is_none(), + "ResponsesState should not be set for cancel requests" + ); +} + +// ----------------------------------------------------------------------------- +// Passthrough +// ----------------------------------------------------------------------------- + +#[tokio::test] +async fn passthrough_when_no_previous_response_id() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let original = r#"{"model":"gpt-4.1","input":"Hello"}"#; + let mut body = Some(Bytes::from(original)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Release), + "should release when no previous_response_id" + ); + assert_eq!( + body.as_ref().unwrap().as_ref(), + original.as_bytes(), + "body should be unchanged" + ); + assert!( + ctx.extensions.get::().is_none(), + "ResponsesState should not be set without previous_response_id" + ); +} + +#[tokio::test] +async fn passthrough_when_previous_response_id_is_null() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from( + r#"{"model":"gpt-4.1","input":"Hello","previous_response_id":null}"#, + )); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Release), + "should release when previous_response_id is null" + ); +} + +// ----------------------------------------------------------------------------- +// Validation + Metadata +// ----------------------------------------------------------------------------- + +#[tokio::test] +async fn validates_previous_response_and_sets_metadata() { + let messages = json!([ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there"} + ]); + let store = MockStore::with_completed_response("resp_prev", json!("Hello"), messages); + let registry = setup_registry(store); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let original = r#"{"model":"gpt-4.1","input":"What next?","previous_response_id":"resp_prev"}"#; + let mut body = Some(Bytes::from(original)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + assert!( + matches!(action, FilterAction::Release), + "should release after validation" + ); + + assert_eq!( + body.as_ref().unwrap().as_ref(), + original.as_bytes(), + "body should not be modified" + ); + assert_eq!( + ctx.get_metadata("responses.previous_response_id"), + Some("resp_prev"), + "should set previous_response_id metadata" + ); + + let state = ctx + .extensions + .get::() + .expect("ResponsesState should be populated"); + assert_eq!( + state.messages.len(), + 3, + "messages should contain 2 stored + 1 current input" + ); + assert_eq!(state.messages[0]["role"], "user", "first stored message"); + assert_eq!(state.messages[1]["role"], "assistant", "second stored message"); + assert_eq!( + state.messages[2]["content"], "What next?", + "current input should be last" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pipeline_validates_during_cold_request_body_pre_read() { + let (db_url, db_path) = temp_sqlite_url("rehydrate_cold_pre_read"); + let seeded_store = SqliteResponseStore::new(&db_url, "test_responses", "test_conversations", None) + .await + .unwrap(); + seeded_store + .upsert_response(&ResponseRecord { + id: "resp_prev".to_owned(), + tenant_id: "default".to_owned(), + created_at: 1000, + model: "gpt-4.1".to_owned(), + response_object: json!({ + "id": "resp_prev", + "status": "completed", + "output": [{"type": "message", "role": "assistant", "content": "Hi"}] + }), + input: json!("Hello"), + messages: json!([ + {"type": "message", "role": "user", "content": "Hello"}, + {"type": "message", "role": "assistant", "content": "Hi"} + ]), + }) + .await + .unwrap(); + drop(seeded_store); + + let mut entries: Vec = serde_yaml::from_str(&format!( + r#" +- filter: openai_responses_format +- filter: openai_response_store + backend: sqlite + database_url: "{db_url}" + responses_table: test_responses + conversations_table: test_conversations +- filter: openai_responses_rehydrate +"# + )) + .unwrap(); + let registry = FilterRegistry::with_builtins(); + let mut pipeline = FilterPipeline::build(&mut entries, ®istry).unwrap(); + pipeline.set_response_stores(ResponseStoreRegistry::new()); + + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = pipeline.response_stores(); + + drop(pipeline.execute_http_request(&mut ctx).await.unwrap()); + + let original = r#"{"model":"gpt-4.1","input":"What next?","previous_response_id":"resp_prev"}"#; + let mut body = Some(Bytes::from(original)); + + let action = pipeline + .execute_http_request_body(&mut ctx, &mut body, true) + .await + .unwrap(); + assert!( + matches!(action, FilterAction::Release), + "on_request should register store so rehydrate finds it in on_request_body" + ); + + assert_eq!( + body.as_ref().unwrap().as_ref(), + original.as_bytes(), + "body should not be modified by rehydrate filter" + ); + assert_eq!( + ctx.get_metadata("responses.previous_response_id"), + Some("resp_prev"), + "previous_response_id should be promoted to metadata" + ); + + let state = ctx + .extensions + .get::() + .expect("ResponsesState should be populated in pipeline"); + assert_eq!( + state.messages.len(), + 3, + "messages should contain 2 stored + 1 current input" + ); + assert_eq!(state.messages[0]["role"], "user", "first stored message"); + assert_eq!(state.messages[1]["role"], "assistant", "second stored message"); + assert_eq!( + state.messages[2]["content"], "What next?", + "current input should be last" + ); + + drop(pipeline); + cleanup_sqlite_file(&db_path); +} + +// ----------------------------------------------------------------------------- +// Rejections +// ----------------------------------------------------------------------------- + +#[tokio::test] +async fn rejects_when_previous_response_not_found() { + let store = MockStore::empty(); + let registry = setup_registry(store); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_missing"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 400, "should reject with 400"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_when_status_not_completed() { + let store = MockStore::with_status("resp_123", "in_progress"); + let registry = setup_registry(store); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_123"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 400, "should reject non-completed status"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_when_status_incomplete() { + let store = MockStore::with_status("resp_123", "incomplete"); + let registry = setup_registry(store); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_123"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 400, "should reject incomplete status"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_when_status_failed() { + let store = MockStore::with_status("resp_123", "failed"); + let registry = setup_registry(store); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_123"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 400, "should reject failed status"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_when_store_unavailable() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_123"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 500, "should reject with 500 when store unavailable"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_when_store_not_registered() { + let registry = ResponseStoreRegistry::new(); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_123"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 500, "should reject with 500 when store not registered"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_invalid_json_body() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from("not json")); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 400, "should reject invalid JSON"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_non_string_previous_response_id() { + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":123}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 400, "should reject non-string previous_response_id"), + other => panic!("expected Reject, got {other:?}"), + } +} + +#[tokio::test] +async fn rejects_when_store_fetch_fails() { + let store = MockStore::failing(); + let registry = setup_registry(store); + + let filter = RehydrateFilter; + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = Some(®istry); + ctx.set_metadata("openai_responses_format.format", "openai_responses"); + let mut body = Some(Bytes::from(r#"{"input":"Hi","previous_response_id":"resp_123"}"#)); + + let action = filter.on_request_body(&mut ctx, &mut body, true).await.unwrap(); + match action { + FilterAction::Reject(r) => assert_eq!(r.status, 500, "should reject with 500 on store error"), + other => panic!("expected Reject, got {other:?}"), + } +} + +// ----------------------------------------------------------------------------- +// Test Utilities +// ----------------------------------------------------------------------------- + +struct MockStore { + records: std::collections::HashMap, + should_fail: bool, +} + +impl MockStore { + fn with_completed_response(id: &str, input: Value, messages: Value) -> Self { + let mut records = std::collections::HashMap::new(); + records.insert( + id.to_owned(), + ResponseRecord { + id: id.to_owned(), + tenant_id: "default".to_owned(), + created_at: 1000, + model: "gpt-4.1".to_owned(), + response_object: json!({ + "id": id, + "status": "completed", + "output": [{"type": "message", "content": [{"type": "output_text", "text": "Hi"}]}] + }), + input, + messages, + }, + ); + Self { + records, + should_fail: false, + } + } + + fn with_status(id: &str, status: &str) -> Self { + let mut records = std::collections::HashMap::new(); + records.insert( + id.to_owned(), + ResponseRecord { + id: id.to_owned(), + tenant_id: "default".to_owned(), + created_at: 1000, + model: "gpt-4.1".to_owned(), + response_object: json!({"id": id, "status": status}), + input: json!("Hello"), + messages: json!([]), + }, + ); + Self { + records, + should_fail: false, + } + } + + fn empty() -> Self { + Self { + records: std::collections::HashMap::new(), + should_fail: false, + } + } + + fn failing() -> Self { + Self { + records: std::collections::HashMap::new(), + should_fail: true, + } + } +} + +#[async_trait::async_trait] +impl ResponseStore for MockStore { + async fn upsert_response(&self, _record: &ResponseRecord) -> Result<(), StoreError> { + Ok(()) + } + + async fn get_response(&self, _tenant_id: &str, id: &str) -> Result, StoreError> { + if self.should_fail { + return Err(StoreError::Unavailable("mock failure".to_owned())); + } + Ok(self.records.get(id).cloned()) + } + + async fn delete_response(&self, _tenant_id: &str, _id: &str) -> Result { + Ok(false) + } + + async fn upsert_conversation(&self, _record: &ConversationRecord) -> Result<(), StoreError> { + Ok(()) + } + + async fn get_conversation( + &self, + _tenant_id: &str, + _conversation_id: &str, + ) -> Result, StoreError> { + Ok(None) + } + + async fn delete_conversation(&self, _tenant_id: &str, _conversation_id: &str) -> Result { + Ok(false) + } +} + +fn setup_registry(store: MockStore) -> ResponseStoreRegistry { + let registry = ResponseStoreRegistry::new(); + let name: Arc = Arc::from("default"); + registry.register(&name, Arc::new(store)).unwrap(); + registry +} + +fn temp_sqlite_url(test_name: &str) -> (String, std::path::PathBuf) { + use std::time::{SystemTime, UNIX_EPOCH}; + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after epoch") + .as_nanos(); + let db_path = std::env::temp_dir().join(format!("praxis_{test_name}_{}_{}.db", std::process::id(), nanos)); + (format!("sqlite://{}?mode=rwc", db_path.display()), db_path) +} + +fn cleanup_sqlite_file(db_path: &std::path::Path) { + drop(std::fs::remove_file(db_path)); + drop(std::fs::remove_file(format!("{}-shm", db_path.display()))); + drop(std::fs::remove_file(format!("{}-wal", db_path.display()))); +} diff --git a/filter/src/builtins/http/ai/openai/responses/store/filter.rs b/filter/src/builtins/http/ai/openai/responses/store/filter.rs index 37bb9904..400d344a 100644 --- a/filter/src/builtins/http/ai/openai/responses/store/filter.rs +++ b/filter/src/builtins/http/ai/openai/responses/store/filter.rs @@ -361,9 +361,10 @@ fn register_store_in_context(ctx: &HttpFilterContext<'_>, store: &Arc = serde_yaml::from_str(&format!( + r#" +- filter: openai_responses_format +- filter: openai_response_store + backend: sqlite + database_url: "{db_url}" + responses_table: test_responses + conversations_table: test_conversations +- filter: openai_responses_rehydrate +"# + )) + .unwrap(); + let registry = FilterRegistry::with_builtins(); + let mut pipeline = FilterPipeline::build(&mut entries, ®istry).unwrap(); + pipeline.set_response_stores(ResponseStoreRegistry::new()); + + let req = crate::test_utils::make_request(http::Method::POST, "/v1/responses"); + let mut ctx = crate::test_utils::make_filter_context(&req); + ctx.response_stores = pipeline.response_stores(); + + let request_json = json!({ + "model": "gpt-4.1", + "input": "What next?", + "previous_response_id": "resp_prev" + }); + let mut request_body = Some(Bytes::from(serde_json::to_vec(&request_json).unwrap())); + let request_body_action = pipeline + .execute_http_request_body(&mut ctx, &mut request_body, true) + .await + .unwrap(); + assert!( + matches!(request_body_action, FilterAction::Release), + "request body phase should classify, register the store, and rehydrate" + ); + + let request_action = pipeline.execute_http_request(&mut ctx).await.unwrap(); + assert!( + matches!(request_action, FilterAction::Continue), + "request phase should continue after pre-read rehydration" + ); + + let mut resp = crate::test_utils::make_response(); + resp.headers + .insert(http::header::CONTENT_TYPE, "application/json".parse().unwrap()); + ctx.response_header = Some(&mut resp); + let response_action = pipeline.execute_http_response(&mut ctx).await.unwrap(); + assert!( + matches!(response_action, FilterAction::Continue), + "response phase should arm persistence buffering" + ); + ctx.response_header = None; + + let response_json = json!({ + "id": "resp_next", + "created_at": 1_719_900_000, + "model": "gpt-4.1", + "status": "completed", + "output": [{"type": "message", "role": "assistant", "content": "Next answer"}] + }); + let mut response_body = Some(Bytes::from(serde_json::to_vec(&response_json).unwrap())); + let response_body_action = pipeline + .execute_http_response_body(&mut ctx, &mut response_body, true) + .unwrap(); + assert!( + matches!(response_body_action, FilterAction::Continue), + "response body phase should persist and continue" + ); + + let store = SqliteResponseStore::new(&db_url, "test_responses", "test_conversations", None) + .await + .unwrap(); + let record = store + .get_response("default", "resp_next") + .await + .unwrap() + .expect("pipeline should persist the rehydrated response"); + assert_eq!( + record.input, request_json["input"], + "stored input should remain the current request input" + ); + assert_eq!( + record.messages, + json!([ + {"type": "message", "role": "user", "content": "Hello"}, + {"type": "message", "role": "assistant", "content": "Hi"}, + {"type": "message", "role": "user", "content": "What next?"}, + {"type": "message", "role": "assistant", "content": "Next answer"} + ]), + "stored messages should preserve previous turns, current input, and output" + ); + + drop(store); + drop(pipeline); + cleanup_sqlite_file(&db_path); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn store_init_failure_is_permanent() { let yaml: serde_yaml::Value = serde_yaml::from_str( diff --git a/filter/src/builtins/http/ai/store/types.rs b/filter/src/builtins/http/ai/store/types.rs index 944f6c7a..675435b2 100644 --- a/filter/src/builtins/http/ai/store/types.rs +++ b/filter/src/builtins/http/ai/store/types.rs @@ -15,7 +15,7 @@ use std::fmt; /// messages used for multi-turn conversation rehydration. JSON /// columns use [`serde_json::Value`] — the store is intentionally /// schema-agnostic about their contents. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ResponseRecord { /// Unique response ID (e.g., `"resp_abc123"`). pub id: String, diff --git a/filter/src/builtins/http/mod.rs b/filter/src/builtins/http/mod.rs index f1622522..6e53b844 100644 --- a/filter/src/builtins/http/mod.rs +++ b/filter/src/builtins/http/mod.rs @@ -32,6 +32,8 @@ pub use ai::OpenaiResponsesValidateFilter; #[cfg(feature = "ai-inference")] pub use ai::PromptEnrichFilter; #[cfg(feature = "ai-inference")] +pub use ai::RehydrateFilter; +#[cfg(feature = "ai-inference")] pub use ai::ResponseStoreFilter; #[cfg(feature = "ai-inference")] pub use ai::ResponseStoreRegistry; diff --git a/filter/src/builtins/mod.rs b/filter/src/builtins/mod.rs index 08b8525e..a7df1861 100644 --- a/filter/src/builtins/mod.rs +++ b/filter/src/builtins/mod.rs @@ -27,6 +27,8 @@ pub use http::OpenaiResponsesValidateFilter; #[cfg(feature = "ai-inference")] pub use http::PromptEnrichFilter; #[cfg(feature = "ai-inference")] +pub use http::RehydrateFilter; +#[cfg(feature = "ai-inference")] pub use http::ResponseStoreFilter; #[cfg(feature = "ai-inference")] pub use http::ResponseStoreRegistry; diff --git a/filter/src/lib.rs b/filter/src/lib.rs index d1d6a0e8..450a7a29 100644 --- a/filter/src/lib.rs +++ b/filter/src/lib.rs @@ -42,6 +42,8 @@ pub use builtins::OpenaiResponsesValidateFilter; #[cfg(feature = "ai-inference")] pub use builtins::PromptEnrichFilter; #[cfg(feature = "ai-inference")] +pub use builtins::RehydrateFilter; +#[cfg(feature = "ai-inference")] pub use builtins::ResponseStoreRegistry; #[cfg(feature = "ai-inference")] pub use builtins::ResponsesFormatFilter; diff --git a/filter/src/registry.rs b/filter/src/registry.rs index 392995d3..38f5a861 100644 --- a/filter/src/registry.rs +++ b/filter/src/registry.rs @@ -216,6 +216,12 @@ fn register_http_builtins(factories: &mut HashMap) { "openai_response_store", crate::builtins::ResponseStoreFilter::from_config, ); + #[cfg(feature = "ai-inference")] + register_http( + factories, + "openai_responses_rehydrate", + crate::builtins::RehydrateFilter::from_config, + ); } /// Register a single HTTP filter factory by name. @@ -383,6 +389,11 @@ mod tests { names.contains(&"openai_response_store"), "response_store should be registered" ); + #[cfg(feature = "ai-inference")] + assert!( + names.contains(&"openai_responses_rehydrate"), + "openai_responses_rehydrate should be registered" + ); } #[test] diff --git a/tests/integration/Cargo.toml b/tests/integration/Cargo.toml index c983a611..79d892e4 100644 --- a/tests/integration/Cargo.toml +++ b/tests/integration/Cargo.toml @@ -9,7 +9,7 @@ publish = false [features] default = ["ai-inference"] -ai-inference = ["praxis-filter/ai-inference"] +ai-inference = ["praxis-filter/ai-inference", "praxis-test-utils/ai-inference"] no-mac-cert-rotation-tests = [] # We use this to disable testing cert rotation on macOS [lints] diff --git a/tests/integration/tests/suite/examples/mod.rs b/tests/integration/tests/suite/examples/mod.rs index 43a73515..e466b534 100644 --- a/tests/integration/tests/suite/examples/mod.rs +++ b/tests/integration/tests/suite/examples/mod.rs @@ -53,6 +53,8 @@ mod prompt_enrichment; mod protocols; mod redirect; #[cfg(feature = "ai-inference")] +mod rehydrate; +#[cfg(feature = "ai-inference")] mod responses_routing; mod round_robin; mod session_affinity; diff --git a/tests/integration/tests/suite/examples/rehydrate.rs b/tests/integration/tests/suite/examples/rehydrate.rs new file mode 100644 index 00000000..946b7ab7 --- /dev/null +++ b/tests/integration/tests/suite/examples/rehydrate.rs @@ -0,0 +1,141 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 Praxis Contributors + +//! Functional tests for the `rehydrate` example config. + +use std::collections::HashMap; + +use praxis_test_utils::{ + Backend, example_config_path, free_port, http_send, json_post, parse_body, parse_status, patch_yaml, + start_echo_backend, start_proxy, +}; + +// ----------------------------------------------------------------------------- +// Constants +// ----------------------------------------------------------------------------- + +/// Backend response for the first turn — stored by response_store. +const FIRST_RESPONSE_JSON: &str = r#"{"id":"resp_first","created_at":1000,"model":"gpt-4.1","object":"response","status":"completed","input":"Hello","output":[{"type":"message","content":[{"type":"output_text","text":"Hi there"}]}]}"#; + +// ----------------------------------------------------------------------------- +// Tests +// ----------------------------------------------------------------------------- + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn rehydrate_validates_previous_response_and_passes_body_through() { + let backend_guard = Backend::fixed(FIRST_RESPONSE_JSON) + .header("content-type", "application/json") + .start_with_shutdown(); + let proxy_port = free_port(); + + let (db_url, db_path) = temp_sqlite_url("rehydrate"); + let yaml = std::fs::read_to_string(example_config_path("ai/openai/responses/rehydrate.yaml")) + .expect("example config should exist"); + let patched = patch_yaml( + &yaml.replace("sqlite://responses.db?mode=rwc", &db_url), + proxy_port, + &HashMap::from([("127.0.0.1:8000", backend_guard.port())]), + ); + let config = praxis_core::config::Config::from_yaml(&patched).expect("patched config should parse"); + let proxy = start_proxy(&config); + + let raw = http_send( + proxy.addr(), + &json_post("/v1/responses", r#"{"model":"gpt-4.1","input":"Hello"}"#), + ); + assert_eq!(parse_status(&raw), 200, "first request should succeed"); + assert_eq!( + parse_body(&raw), + FIRST_RESPONSE_JSON, + "first response body should match backend" + ); + + drop(backend_guard); + + let backend_guard2 = start_echo_backend(); + let patched2 = patch_yaml( + &yaml.replace("sqlite://responses.db?mode=rwc", &db_url), + proxy_port, + &HashMap::from([("127.0.0.1:8000", backend_guard2.port())]), + ); + let config2 = praxis_core::config::Config::from_yaml(&patched2).expect("second patched config should parse"); + drop(proxy); + + let proxy2 = start_proxy(&config2); + + let raw2 = http_send( + proxy2.addr(), + &json_post( + "/v1/responses", + r#"{"model":"gpt-4.1","input":"What next?","previous_response_id":"resp_first"}"#, + ), + ); + let status2 = parse_status(&raw2); + let body2 = parse_body(&raw2); + assert_eq!( + status2, 200, + "second request with previous_response_id should succeed (validation passed), body: {body2}" + ); + let echoed: serde_json::Value = serde_json::from_str(&body2).expect("echoed request should be valid JSON"); + assert_eq!( + echoed["input"], "What next?", + "body should pass through unchanged — input stays as original string" + ); + assert_eq!( + echoed["previous_response_id"], "resp_first", + "body should pass through unchanged — previous_response_id preserved" + ); + + drop(proxy2); + cleanup_sqlite_files(&db_path); +} + +#[test] +fn rehydrate_passes_through_non_responses_traffic() { + let backend_guard = Backend::fixed("fallback") + .header("content-type", "text/plain") + .start_with_shutdown(); + let proxy_port = free_port(); + + let yaml = std::fs::read_to_string(example_config_path("ai/openai/responses/rehydrate.yaml")) + .expect("example config should exist"); + let patched = patch_yaml( + &yaml.replace("sqlite://responses.db?mode=rwc", "sqlite::memory:"), + proxy_port, + &HashMap::from([("127.0.0.1:8000", backend_guard.port())]), + ); + let config = praxis_core::config::Config::from_yaml(&patched).expect("patched config should parse"); + let proxy = start_proxy(&config); + + let raw = http_send( + proxy.addr(), + &json_post( + "/v1/responses", + r#"{"model":"gpt-4","messages":[{"role":"user","content":"Hi"}]}"#, + ), + ); + + assert_eq!(parse_status(&raw), 200, "non-Responses body should pass through"); +} + +// ----------------------------------------------------------------------------- +// Test Utilities +// ----------------------------------------------------------------------------- + +/// Generate a unique file-backed SQLite URL for test isolation. +fn temp_sqlite_url(test_name: &str) -> (String, std::path::PathBuf) { + use std::time::{SystemTime, UNIX_EPOCH}; + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after epoch") + .as_nanos(); + let db_path = std::env::temp_dir().join(format!("praxis_integ_{test_name}_{}_{nanos}.db", std::process::id())); + (format!("sqlite://{}?mode=rwc", db_path.display()), db_path) +} + +/// Remove a SQLite database file and its WAL/SHM companions. +fn cleanup_sqlite_files(db_path: &std::path::Path) { + drop(std::fs::remove_file(db_path)); + drop(std::fs::remove_file(format!("{}-shm", db_path.display()))); + drop(std::fs::remove_file(format!("{}-wal", db_path.display()))); +} diff --git a/tests/utils/Cargo.toml b/tests/utils/Cargo.toml index f9f56f07..46a621dd 100644 --- a/tests/utils/Cargo.toml +++ b/tests/utils/Cargo.toml @@ -32,3 +32,6 @@ tracing = { workspace = true } futures = { workspace = true } tokio-rustls = { workspace = true } tokio-tungstenite = { workspace = true } + +[features] +ai-inference = ["praxis-filter/ai-inference"]