Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions filter/src/builtins/http/ai/openai/responses/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ use crate::{
/// Maximum length of a body-derived value promoted to headers or filter results.
const MAX_PROMOTED_VALUE_LEN: usize = 256;

/// Default store name used when registering the response store in the
/// per-request registry.
pub(crate) const DEFAULT_STORE_NAME: &str = "default";

/// Metadata key for tenant isolation.
pub(crate) const TENANT_METADATA_KEY: &str = "responses.tenant_id";

/// Fallback tenant ID when no tenant metadata is present.
pub(crate) const DEFAULT_TENANT_ID: &str = "default";

// -----------------------------------------------------------------------------
// ResponsesFormatFilter
// -----------------------------------------------------------------------------
Expand Down
213 changes: 183 additions & 30 deletions filter/src/builtins/http/ai/openai/responses/store/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
//! The filter spans three phases, each refining the "should we
//! persist?" decision as new information becomes available:
//!
//! - **`on_request`**: reads classifier metadata to decide whether the request is persistable (POST, responses format,
//! store enabled, non-streaming). Lazily initializes the store backend. Sets `responses.skip_persist` metadata on
//! store init failure.
//! - **`on_request`**: reads classifier metadata to decide whether the request needs the store (persistable POST or
//! `previous_response_id`). Lazily initializes the store backend when needed. Sets `responses.skip_persist` metadata
//! on store init failure for requests that would otherwise persist.
//!
//! - **`on_response`**: re-checks skip conditions, then inspects the response status and content-type. Non-2xx or
//! non-JSON responses set `responses.skip_persist` and bail early.
Expand All @@ -29,11 +29,16 @@
//! intentional. Each phase learns something new (request metadata,
//! response headers, body bytes), and early exit avoids wasted
//! work (store init, body buffering, JSON parsing). Cross-phase
//! state is carried exclusively through string metadata in
//! control state is carried through string metadata in
//! [`filter_metadata`], following the same pattern as the A2A
//! filter.
//! filter. The original request `input` is carried through typed
//! per-filter state because it can be arbitrary JSON and is not part
//! of the Responses API response object. When rehydrate populated
//! [`ResponsesState`], that state is used as the stored message
//! history.
//!
//! [`filter_metadata`]: crate::HttpFilterContext::filter_metadata
//! [`ResponsesState`]: super::super::state::ResponsesState

use std::sync::Arc;

Expand All @@ -45,6 +50,7 @@ use tokio::sync::OnceCell;
use tracing::{debug, trace, warn};

use super::{
super::{DEFAULT_STORE_NAME, DEFAULT_TENANT_ID, TENANT_METADATA_KEY, state::ResponsesState},
ListParams, Order,
config::{ResponseStoreConfig, StorageBackend, revalidate_postgres_host, validate_config},
list_input_items,
Expand All @@ -59,16 +65,6 @@ use crate::{
filter::{HttpFilter, HttpFilterContext},
};

// -----------------------------------------------------------------------------
// Constants
// -----------------------------------------------------------------------------

/// Default tenant identifier for single-tenant deployments.
const DEFAULT_TENANT_ID: &str = "default";

/// Metadata key for the per-request tenant identifier.
const TENANT_METADATA_KEY: &str = "responses.tenant_id";

/// Persists non-streaming Responses API responses to the
/// configured response store backend.
///
Expand Down Expand Up @@ -256,26 +252,86 @@ impl ResponseStoreFilter {
}

// -----------------------------------------------------------------------------
// ResponseCapture
// Request / Response Capture
// -----------------------------------------------------------------------------

/// Request-phase data needed when persisting the response.
struct ResponseStoreRequestState {
/// Original `input` value from the Responses API create request.
input: Value,
}

/// Fields extracted from the response JSON for the store record.
struct ResponseCapture {
/// Echoed input items from the response.
/// Original request input used by rehydration.
input: Value,

/// Model output items.
/// Full message history used by rehydration.
messages: Value,
}

impl ResponseCapture {
/// Extract input and output from a Responses API response object.
fn from_response_json(json: &Value) -> Self {
Self {
input: json.get("input").cloned().unwrap_or(Value::Null),
messages: json.get("output").cloned().unwrap_or(Value::Null),
/// Extract stored input and output from a Responses API exchange.
fn from_response_json(json: &Value, request_input: Option<Value>, state_messages: Option<Vec<Value>>) -> Self {
let input = request_input
.or_else(|| json.get("input").cloned())
.unwrap_or(Value::Null);
let output = json.get("output").cloned().unwrap_or(Value::Null);
let history_input = state_messages.map_or_else(|| input.clone(), Value::Array);
let messages = assemble_stored_messages(&history_input, &output);

Self { input, messages }
}
}

/// Extract the original Responses API request input from the buffered
/// create request body.
fn extract_request_input(body: &Option<Bytes>) -> Option<Value> {
let bytes = body.as_ref().filter(|b| !b.is_empty())?;
let json: Value = match serde_json::from_slice(bytes) {
Ok(v) => v,
Err(e) => {
trace!(error = %e, "response store: invalid request JSON");
return None;
},
};
json.get("input").cloned()
}

/// Build the stored conversation history from response input and output.
fn assemble_stored_messages(input: &Value, output: &Value) -> Value {
let mut messages = Vec::new();

append_stored_input_items(&mut messages, input.clone());

if !output.is_null() {
if let Some(items) = output.as_array() {
messages.extend(items.iter().cloned());
} else {
messages.push(output.clone());
}
}

Value::Array(messages)
}

/// Append stored response input as valid Responses API item params.
fn append_stored_input_items(messages: &mut Vec<Value>, input: Value) {
match input {
Value::Null => {},
Value::String(text) => messages.push(user_message_item(&text)),
Value::Array(items) => messages.extend(items),
other => messages.push(other),
}
}

/// Build a Responses API user message item from string input.
fn user_message_item(text: &str) -> Value {
serde_json::json!({
"type": "message",
"role": "user",
"content": text
})
}

// -----------------------------------------------------------------------------
Expand All @@ -295,6 +351,28 @@ pub(super) fn extract_response_id(path: &str) -> Option<&str> {
}
}

// -----------------------------------------------------------------------------
// Registry Helpers
// -----------------------------------------------------------------------------

/// Publish the initialized store into the per-request registry so
/// downstream filters (rehydrate, compact, etc.) can read from it.
fn register_store_in_context(ctx: &HttpFilterContext<'_>, store: &Arc<dyn ResponseStore>) {
let Some(registry) = ctx.response_stores else {
return;
};
// Known limitation: the first default backend wins for this registry.
// That matches today's one-store-per-listener setup, but a future
// multi-store or live backend migration design should scope this by config.
if registry.get(DEFAULT_STORE_NAME).is_some() {
return;
}
let name: Arc<str> = Arc::from(DEFAULT_STORE_NAME);
Comment thread
leseb marked this conversation as resolved.
if let Err(e) = registry.register(&name, Arc::clone(store)) {
debug!(error = %e, "response store already registered");
}
}

// -----------------------------------------------------------------------------
// Delete Response Helpers
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -337,6 +415,24 @@ fn should_skip(ctx: &HttpFilterContext<'_>) -> bool {
is_non_post_request(ctx) || is_non_responses_format(ctx) || is_store_disabled(ctx) || is_streaming_request(ctx)
}

/// Check whether this request should initialize the store.
fn should_init_store_for_request(ctx: &HttpFilterContext<'_>) -> bool {
request_will_persist_response(ctx) || request_needs_rehydrate_store(ctx)
}

/// Check whether this request can persist the eventual response.
fn request_will_persist_response(ctx: &HttpFilterContext<'_>) -> bool {
ctx.request.method == http::Method::POST
&& is_responses_format(ctx)
&& !is_store_disabled(ctx)
&& !is_streaming_request(ctx)
}

/// Check whether rehydrate needs the store before the request phase.
fn request_needs_rehydrate_store(ctx: &HttpFilterContext<'_>) -> bool {
ctx.request.method == http::Method::POST && is_responses_format(ctx) && has_previous_response_id(ctx)
}

/// Return whether the request method is not persistable.
fn is_non_post_request(ctx: &HttpFilterContext<'_>) -> bool {
let skip = ctx.request.method != http::Method::POST;
Expand All @@ -349,13 +445,18 @@ fn is_non_post_request(ctx: &HttpFilterContext<'_>) -> bool {
/// Return whether the request is not a Responses API request.
fn is_non_responses_format(ctx: &HttpFilterContext<'_>) -> bool {
let format = ctx.get_metadata("openai_responses_format.format");
let skip = format != Some("openai_responses");
let skip = !is_responses_format(ctx);
if skip {
trace!(format = ?format, "skipping non-responses format");
}
skip
}

/// Return whether the request is classified as a Responses API request.
fn is_responses_format(ctx: &HttpFilterContext<'_>) -> bool {
ctx.get_metadata("openai_responses_format.format") == Some("openai_responses")
}

/// Return whether the request explicitly disabled persistence.
fn is_store_disabled(ctx: &HttpFilterContext<'_>) -> bool {
let skip = ctx.get_metadata("openai_responses_format.store") == Some("false");
Expand All @@ -374,6 +475,11 @@ fn is_streaming_request(ctx: &HttpFilterContext<'_>) -> bool {
skip
}

/// Return whether the request references a previous response.
fn has_previous_response_id(ctx: &HttpFilterContext<'_>) -> bool {
ctx.get_metadata("openai_responses_format.has_previous_response_id") == Some("true")
}

/// Check whether persistence was skipped during the response phase.
fn should_skip_persist(ctx: &HttpFilterContext<'_>) -> bool {
should_skip(ctx) || ctx.get_metadata("responses.skip_persist") == Some("true")
Expand Down Expand Up @@ -417,7 +523,12 @@ fn response_is_persistable(ctx: &mut HttpFilterContext<'_>) -> bool {

/// Parse a response body into a [`ResponseRecord`], returning
/// `None` for invalid JSON or missing required fields.
fn parse_response_record(bytes: &[u8], tenant_id: &str) -> Option<ResponseRecord> {
fn parse_response_record(
bytes: &[u8],
tenant_id: &str,
request_input: Option<Value>,
state_messages: Option<Vec<Value>>,
) -> Option<ResponseRecord> {
let json: Value = match serde_json::from_slice(bytes) {
Ok(v) => v,
Err(e) => {
Expand All @@ -435,7 +546,7 @@ fn parse_response_record(bytes: &[u8], tenant_id: &str) -> Option<ResponseRecord
return None;
};

let capture = ResponseCapture::from_response_json(&json);
let capture = ResponseCapture::from_response_json(&json, request_input, state_messages);

Some(ResponseRecord {
id: id.to_owned(),
Expand Down Expand Up @@ -480,6 +591,10 @@ impl HttpFilter for ResponseStoreFilter {
"openai_response_store"
}

fn request_body_access(&self) -> BodyAccess {
Comment thread
leseb marked this conversation as resolved.
BodyAccess::ReadOnly
}

fn response_body_access(&self) -> BodyAccess {
BodyAccess::ReadOnly
}
Expand Down Expand Up @@ -516,17 +631,48 @@ impl HttpFilter for ResponseStoreFilter {
return Ok(FilterAction::Continue);
}

if should_skip(ctx) {
if !should_init_store_for_request(ctx) {
return Ok(FilterAction::Continue);
}

if self.get_or_init_store().await.is_none() {
ctx.set_metadata("responses.skip_persist", "true");
let will_persist = request_will_persist_response(ctx);
match &self.get_or_init_store().await {
Some(store) => register_store_in_context(ctx, store),
None if will_persist => ctx.set_metadata("responses.skip_persist", "true"),
None => {},
}

Ok(FilterAction::Continue)
}

/// Eagerly register the store during the body phase so
/// downstream filters running in `StreamBuffer` pre-read
/// (before `on_request`) can access it.
async fn on_request_body(
&self,
ctx: &mut HttpFilterContext<'_>,
body: &mut Option<Bytes>,
end_of_stream: bool,
) -> Result<FilterAction, FilterError> {
if !end_of_stream || ctx.request.method != http::Method::POST {
return Ok(FilterAction::Continue);
}
if !should_skip(ctx)
&& let Some(input) = extract_request_input(body)
{
ctx.insert_filter_state(ResponseStoreRequestState { input });
}
if should_init_store_for_request(ctx) {
let will_persist = request_will_persist_response(ctx);
match &self.get_or_init_store().await {
Some(store) => register_store_in_context(ctx, store),
None if will_persist => ctx.set_metadata("responses.skip_persist", "true"),
None => {},
}
}
Ok(FilterAction::Continue)
}

async fn on_response(&self, ctx: &mut HttpFilterContext<'_>) -> Result<FilterAction, FilterError> {
if should_skip_persist(ctx) {
return Ok(FilterAction::Continue);
Expand Down Expand Up @@ -572,7 +718,14 @@ impl HttpFilter for ResponseStoreFilter {
.get_metadata(TENANT_METADATA_KEY)
.unwrap_or(DEFAULT_TENANT_ID)
.to_owned();
let Some(record) = parse_response_record(bytes, &tenant_id) else {
let request_input = ctx
.remove_filter_state::<ResponseStoreRequestState>()
.map(|state| state.input);
let state_messages = ctx
.extensions
.get::<ResponsesState>()
.map(|state| state.messages.clone());
let Some(record) = parse_response_record(bytes, &tenant_id, request_input, state_messages) else {
return Ok(FilterAction::Continue);
};

Expand Down
Loading
Loading