From 610f1dbf3ceb97f736e7c874a016ddf9d6e7b130 Mon Sep 17 00:00:00 2001 From: Daniel Salib Date: Wed, 19 Nov 2025 02:50:57 -0800 Subject: [PATCH 1/2] Improve Responses API input validation for multi-turn --- .../openai/parser/harmony_utils.py | 45 ++++++++++++ vllm/entrypoints/openai/protocol.py | 69 +++++++++++++++++-- vllm/entrypoints/openai/serving_responses.py | 39 ++++------- 3 files changed, 124 insertions(+), 29 deletions(-) diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index 376d97a03964..4c71821591d7 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -48,8 +48,53 @@ ResponseInputOutputItem, ResponsesRequest, ) +from vllm.logger import init_logger from vllm.utils import random_uuid +logger = init_logger(__name__) + + +def _create_reasoning_item_with_encrypted_content( + reasoning_id: str, + content: list[ResponseReasoningTextContent] | None = None, + status: str | None = None, + summary: list | None = None, +) -> ResponseReasoningItem: + """Create a ResponseReasoningItem with encrypted_content populated. + + This ensures Codex can properly round-trip reasoning items in multi-turn + conversations by serializing the content to encrypted_content. + """ + if summary is None: + summary = [] + + encrypted_content = None + if content: + # Serialize content AND id to encrypted_content for Codex + # This ensures the ID is preserved across multi-turn conversations + content_dict = [ + {"type": c.type, "text": c.text} for c in content + ] + encrypted_content = json.dumps({ + "id": reasoning_id, + "content": content_dict + }) + logger.debug( + f"Created reasoning item {reasoning_id} with encrypted_content: " + f"{encrypted_content[:100]}..." if len(encrypted_content) > 100 + else encrypted_content + ) + + return ResponseReasoningItem( + type="reasoning", + id=reasoning_id, + summary=summary, + content=content, + encrypted_content=encrypted_content, + status=status, + ) + + REASONING_EFFORT = { "high": ReasoningEffort.HIGH, "medium": ReasoningEffort.MEDIUM, diff --git a/vllm/entrypoints/openai/protocol.py b/vllm/entrypoints/openai/protocol.py index f6f725cf0e96..2f6fbbe240e7 100644 --- a/vllm/entrypoints/openai/protocol.py +++ b/vllm/entrypoints/openai/protocol.py @@ -490,6 +490,11 @@ def function_call_parsing(cls, data): Function calls provided as dicts are converted to ResponseFunctionToolCall objects before validation, while invalid structures are left for Pydantic to reject with appropriate error messages. + + Also handles client serialization variations by: + - Normalizing items to dicts and stripping None values + - Auto-generating missing IDs for reasoning items + - Converting output content types to input types for echoed messages """ input_data = data.get("input") @@ -509,18 +514,74 @@ def function_call_parsing(cls, data): processed_input = [] for item in input_data: - if isinstance(item, dict) and item.get("type") == "function_call": + # Normalize to dict: handle both plain dicts and Pydantic models + if not isinstance(item, dict): try: - processed_input.append(ResponseFunctionToolCall(**item)) + item_dict = item.model_dump(exclude_none=True) + except AttributeError: + # Not a Pydantic model, append as-is + processed_input.append(item) + continue + else: + # Strip None values that may come from clients + # with different serialization configs + item_dict = {k: v for k, v in item.items() if v is not None} + + item_type = item_dict.get("type") + + # Handle encrypted_content for reasoning items FIRST + # This must happen before ID auto-generation because the ID + # might be encoded in the encrypted_content + if item_type == "reasoning": + if "encrypted_content" in item_dict and item_dict["encrypted_content"] is not None: + # Codex sent back encrypted_content from a previous turn - SUCCESS! + # Deserialize to extract both content and ID + try: + encrypted_data = json.loads(item_dict["encrypted_content"]) + # New format: {"id": "rs_xxx", "content": [...]} + if isinstance(encrypted_data, dict) and "id" in encrypted_data: + # Extract ID from encrypted_content if not already present + if "id" not in item_dict or item_dict["id"] is None: + item_dict["id"] = encrypted_data["id"] + # Extract content + item_dict["content"] = encrypted_data.get("content", []) + else: + # Old format: just the content array + item_dict["content"] = encrypted_data + except (json.JSONDecodeError, TypeError) as e: + logger.warning( + f"Failed to deserialize encrypted_content: {e}. " + f"Treating as opaque string." + ) + + + # Normalize content types when clients echo back + # previous assistant messages as input + # Output content types (output_text) need to be + # converted to input types (input_text) + if item_type == "message" and "content" in item_dict: + content = item_dict["content"] + if isinstance(content, list): + for content_item in content: + if ( + isinstance(content_item, dict) + and content_item.get("type") == "output_text" + ): + content_item["type"] = "input_text" + + # Handle function_call special case + if item_type == "function_call": + try: + processed_input.append(ResponseFunctionToolCall(**item_dict)) except ValidationError: # Let Pydantic handle validation for malformed function calls logger.debug( "Failed to parse function_call to ResponseFunctionToolCall, " "leaving for Pydantic validation" ) - processed_input.append(item) + processed_input.append(item_dict) else: - processed_input.append(item) + processed_input.append(item_dict) data["input"] = processed_input return data diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index 54aa4795920b..4c02c061b443 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -67,6 +67,7 @@ ) from vllm.entrypoints.logger import RequestLogger from vllm.entrypoints.openai.parser.harmony_utils import ( + _create_reasoning_item_with_encrypted_content, construct_harmony_previous_input_messages, get_developer_message, get_stop_tokens_for_assistant_actions, @@ -869,10 +870,8 @@ def _make_response_output_items( reasoning_item = None message_item = None if reasoning: - reasoning_item = ResponseReasoningItem( - id=f"rs_{random_uuid()}", - summary=[], - type="reasoning", + reasoning_item = _create_reasoning_item_with_encrypted_content( + reasoning_id=f"rs_{random_uuid()}", content=[ ResponseReasoningTextContent(text=reasoning, type="reasoning_text") ], @@ -1272,10 +1271,8 @@ async def _process_simple_streaming_events( type="response.output_item.added", sequence_number=-1, output_index=current_output_index, - item=ResponseReasoningItem( - type="reasoning", - id=current_item_id, - summary=[], + item=_create_reasoning_item_with_encrypted_content( + reasoning_id=current_item_id, status="in_progress", ), ) @@ -1339,8 +1336,8 @@ async def _process_simple_streaming_events( ) ) current_content_index = 0 - reasoning_item = ResponseReasoningItem( - type="reasoning", + reasoning_item = _create_reasoning_item_with_encrypted_content( + reasoning_id=current_item_id, content=[ ResponseReasoningTextContent( text=reason_content, @@ -1348,8 +1345,6 @@ async def _process_simple_streaming_events( ), ], status="completed", - id=current_item_id, - summary=[], ) yield _increment_sequence_number_and_return( ResponseOutputItemDoneEvent( @@ -1447,8 +1442,8 @@ async def _process_simple_streaming_events( ) ) current_content_index += 1 - reasoning_item = ResponseReasoningItem( - type="reasoning", + reasoning_item = _create_reasoning_item_with_encrypted_content( + reasoning_id=current_item_id, content=[ ResponseReasoningTextContent( text=reason_content, @@ -1456,8 +1451,6 @@ async def _process_simple_streaming_events( ), ], status="completed", - id=current_item_id, - summary=[], ) yield _increment_sequence_number_and_return( ResponseOutputItemDoneEvent( @@ -1588,12 +1581,10 @@ async def _process_harmony_streaming_events( text=previous_item.content[0].text, type="reasoning_text", ) - reasoning_item = ResponseReasoningItem( - type="reasoning", + reasoning_item = _create_reasoning_item_with_encrypted_content( + reasoning_id=current_item_id, content=[content], status="completed", - id=current_item_id, - summary=[], ) yield _increment_sequence_number_and_return( ResponseReasoningTextDoneEvent( @@ -1722,16 +1713,14 @@ async def _process_harmony_streaming_events( ): if not sent_output_item_added: sent_output_item_added = True - current_item_id = f"msg_{random_uuid()}" + current_item_id = f"rs_{random_uuid()}" # Reasoning items use rs_ prefix yield _increment_sequence_number_and_return( ResponseOutputItemAddedEvent( type="response.output_item.added", sequence_number=-1, output_index=current_output_index, - item=ResponseReasoningItem( - type="reasoning", - id=current_item_id, - summary=[], + item=_create_reasoning_item_with_encrypted_content( + reasoning_id=current_item_id, status="in_progress", ), ) From d1a187d6689ddf6f9fbb806ad763fee500c95097 Mon Sep 17 00:00:00 2001 From: Daniel Salib Date: Fri, 21 Nov 2025 20:03:06 -0800 Subject: [PATCH 2/2] logging --- .../openai/test_response_api_mcp_tools.py | 61 +++++++++++++++++++ vllm/entrypoints/openai/serving_responses.py | 47 ++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/tests/entrypoints/openai/test_response_api_mcp_tools.py b/tests/entrypoints/openai/test_response_api_mcp_tools.py index cd338b5555c5..73d184db8489 100644 --- a/tests/entrypoints/openai/test_response_api_mcp_tools.py +++ b/tests/entrypoints/openai/test_response_api_mcp_tools.py @@ -206,6 +206,67 @@ async def test_mcp_tool_env_flag_disabled(mcp_disabled_client: OpenAI, model_nam ) +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_mcp_tool_calling_streaming_types( + mcp_enabled_client: OpenAI, model_name: str +): + pairs_of_event_types = { + "response.completed": "response.created", + "response.output_item.done": "response.output_item.added", + "response.content_part.done": "response.content_part.added", + "response.output_text.done": "response.output_text.delta", + "response.reasoning_text.done": "response.reasoning_text.delta", + "response.reasoning_part.done": "response.reasoning_part.added", + "response.mcp_call_arguments.done": "response.mcp_call_arguments.delta", + "response.mcp_call.completed": "response.mcp_call.in_progress", + } + + tools = [ + { + "type": "mcp", + "server_label": "code_interpreter", + } + ] + input_text = "What is 123 * 456? Use python to calculate the result." + + stream_response = await mcp_enabled_client.responses.create( + model=model_name, + input=input_text, + tools=tools, + stream=True, + instructions=( + "You must use the Python tool to execute code. Never simulate execution." + ), + ) + + stack_of_event_types = [] + saw_mcp_type = False + async for event in stream_response: + if event.type == "response.created": + stack_of_event_types.append(event.type) + elif event.type == "response.completed": + assert stack_of_event_types[-1] == pairs_of_event_types[event.type] + stack_of_event_types.pop() + if ( + event.type.endswith("added") + or event.type == "response.mcp_call.in_progress" + ): + stack_of_event_types.append(event.type) + elif event.type.endswith("delta"): + if stack_of_event_types[-1] == event.type: + continue + stack_of_event_types.append(event.type) + elif event.type.endswith("done") or event.type == "response.mcp_call.completed": + assert stack_of_event_types[-1] == pairs_of_event_types[event.type] + if "mcp_call" in event.type: + saw_mcp_type = True + stack_of_event_types.pop() + + assert len(stack_of_event_types) == 0 + assert saw_mcp_type, "Should have seen at least one mcp call" + + def test_get_tool_description(): """Test MCPToolServer.get_tool_description filtering logic. diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index 4c02c061b443..fe151e66815a 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -319,6 +319,24 @@ async def create_responses( | ResponsesResponse | ErrorResponse ): + # Log input request details + try: + logger.info("=" * 80) + logger.info("RESPONSES API REQUEST:") + logger.info(f"Request ID: {request.request_id}") + logger.info(f"Model: {request.model}") + logger.info(f"Stream: {request.stream}") + # Log input messages + if hasattr(request, 'input') and request.input: + logger.info(f"Input messages ({len(request.input)} messages):") + for i, msg in enumerate(request.input): + if hasattr(msg, 'content'): + content_preview = str(msg.content)[:500] if len(str(msg.content)) > 500 else str(msg.content) + logger.info(f" Message[{i}] role={getattr(msg, 'role', 'unknown')}: {content_preview}") + logger.info("=" * 80) + except Exception as e: + logger.warning(f"Error logging request details: {e}") + error_check_ret = await self._check_model(request) if error_check_ret is not None: logger.error("Error with model %s", error_check_ret) @@ -739,6 +757,35 @@ async def responses_full_generator( # If the response is already cancelled, don't update it. if stored_response is None or stored_response.status != "cancelled": self.response_store[response.id] = response + + # Log output response details + try: + logger.info("=" * 80) + logger.info("RESPONSES API RESPONSE:") + logger.info(f"Response ID: {response.id}") + logger.info(f"Status: {status}") + logger.info(f"Output items: {len(output)}") + for i, item in enumerate(output): + # Try to extract text content from different possible fields + text_content = None + if hasattr(item, 'text') and item.text: + text_content = item.text + elif hasattr(item, 'reasoning') and item.reasoning: + text_content = item.reasoning + elif hasattr(item, 'content'): + text_content = str(item.content) + + if text_content: + preview = text_content[:1000] if len(text_content) > 1000 else text_content + logger.info(f"Output[{i}] type={getattr(item, 'type', 'unknown')}:") + logger.info(f" Content: {preview}") + else: + logger.info(f"Output[{i}] type={getattr(item, 'type', 'unknown')} (no text content)") + logger.info(f"Usage: input_tokens={usage.input_tokens}, output_tokens={usage.output_tokens}, total={usage.total_tokens}") + logger.info("=" * 80) + except Exception as e: + logger.warning(f"Error logging response details: {e}") + return response def _topk_logprobs(