From 2d1aadb036fc64743f35cedd661cb4b4ffd53448 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 11:32:43 -0500 Subject: [PATCH 1/7] [Responses API] Sanitize leaked Harmony control tokens in tool names and recipients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GPT-OSS models generate Harmony protocol control tokens (<|channel|>, <|constrain|>, <|start|>, <|end|>, <|message|>) in unexpected positions during output generation, causing tool name contamination, recipient misrouting, and parser crashes. Three layers of defense: 1. sanitize_harmony_name() — pure string function that strips leaked control token strings from tool/recipient names. 2. ResilientStreamableParser — wrapper around StreamableParser that recovers from missing <|start|> tokens between messages and malformed <|constrain|> tokens in headers. 3. Routing-level fallback — sanitized-to-empty recipients fall through to _parse_message_no_recipient() instead of being misrouted. Applied at all input parsing, output dispatching, tool routing, and streaming delta extraction sites. Signed-off-by: Will Deines --- .../openai/parser/test_harmony_utils.py | 119 ++++++++++++++++ .../openai/responses/test_harmony_utils.py | 37 +++++ .../openai/chat_completion/stream_harmony.py | 5 +- .../openai/parser/harmony_utils.py | 129 +++++++++++++++++- vllm/entrypoints/openai/responses/context.py | 13 +- vllm/entrypoints/openai/responses/harmony.py | 16 ++- 6 files changed, 309 insertions(+), 10 deletions(-) diff --git a/tests/entrypoints/openai/parser/test_harmony_utils.py b/tests/entrypoints/openai/parser/test_harmony_utils.py index 21b53dff1507..51f22566641d 100644 --- a/tests/entrypoints/openai/parser/test_harmony_utils.py +++ b/tests/entrypoints/openai/parser/test_harmony_utils.py @@ -8,10 +8,12 @@ from vllm.entrypoints.openai.parser.harmony_utils import ( auto_drop_analysis_messages, get_encoding, + get_streamable_parser_for_assistant, get_system_message, has_custom_tools, parse_chat_input_to_harmony_message, parse_chat_output, + sanitize_harmony_name, ) from vllm.entrypoints.openai.responses.harmony import ( response_input_to_harmony, @@ -928,3 +930,120 @@ def test_reasoning_with_empty_content_returns_none(self): msg = response_input_to_harmony(item, prev_responses=[]) assert msg is None + + +class TestSanitizeHarmonyName: + """Tests for sanitize_harmony_name().""" + + def test_clean_name_unchanged(self) -> None: + assert sanitize_harmony_name("get_weather") == "get_weather" + + def test_strip_channel_token(self) -> None: + assert ( + sanitize_harmony_name("manage_cart<|channel|>commentary") == "manage_cart" + ) + + def test_strip_constrain_token(self) -> None: + assert sanitize_harmony_name("<|constrain|>json") == "" + + def test_pure_control_token_returns_empty(self) -> None: + assert sanitize_harmony_name("<|start|>") == "" + + def test_multiple_tokens_earliest_wins(self) -> None: + assert ( + sanitize_harmony_name("foo<|channel|>bar<|constrain|>baz") == "foo" + ) + + def test_empty_string(self) -> None: + assert sanitize_harmony_name("") == "" + + def test_trailing_whitespace_stripped(self) -> None: + assert sanitize_harmony_name("tool_name <|end|>") == "tool_name" + + +class TestResilientStreamableParser: + """Tests for ResilientStreamableParser error recovery.""" + + def test_normal_sequence_unchanged(self) -> None: + """Normal token sequence should produce same results as raw parser.""" + encoding = get_encoding() + harmony_str = "<|channel|>final<|message|>Hello world<|end|>" + token_ids = encoding.encode(harmony_str, allowed_special="all") + + parser = get_streamable_parser_for_assistant() + for tok in token_ids: + parser.process(tok) + + assert len(parser.messages) == 1 + assert parser.messages[0].content[0].text == "Hello world" + assert parser.messages[0].channel == "final" + + def test_missing_start_recovery(self) -> None: + """Parser should recover when <|start|> is missing between messages.""" + encoding = get_encoding() + # First message completes normally, second is missing <|start|> + first_msg = "<|channel|>final<|message|>First.<|end|>" + second_msg = "<|channel|>final<|message|>Second.<|end|>" + first_tokens = encoding.encode(first_msg, allowed_special="all") + second_tokens = encoding.encode(second_msg, allowed_special="all") + + parser = get_streamable_parser_for_assistant() + for tok in first_tokens: + parser.process(tok) + # Feed second message tokens directly (missing <|start|>assistant) + for tok in second_tokens: + parser.process(tok) + + assert len(parser.messages) == 2 + assert parser.messages[0].content[0].text == "First." + assert parser.messages[1].content[0].text == "Second." + + def test_constrain_in_header_skipped(self) -> None: + """<|constrain|> in HEADER state should be skipped gracefully.""" + encoding = get_encoding() + # First, complete a normal message so parser goes to EXPECT_START + first_msg = "<|channel|>final<|message|>First.<|end|>" + first_tokens = encoding.encode(first_msg, allowed_special="all") + + # Build a second message where <|constrain|> appears in the header + # after <|start|>assistant, before <|channel|> + start_tok = encoding.encode("<|start|>", allowed_special="all") + role_toks = encoding.encode("assistant", allowed_special="all") + constrain_tok = encoding.encode("<|constrain|>", allowed_special="all") + # Garbage tokens that should be skipped + json_toks = encoding.encode("json", allowed_special="all") + message_tok = encoding.encode("<|message|>", allowed_special="all") + text_toks = encoding.encode("Second.", allowed_special="all") + end_tok = encoding.encode("<|end|>", allowed_special="all") + + parser = get_streamable_parser_for_assistant() + # Complete first message + for tok in first_tokens: + parser.process(tok) + assert len(parser.messages) == 1 + + # Feed: <|start|>assistant → puts parser in HEADER state + for tok in start_tok: + parser.process(tok) + for tok in role_toks: + parser.process(tok) + # Feed: <|constrain|> → should enter skip mode + for tok in constrain_tok: + parser.process(tok) + # Feed: json tokens → should be discarded in skip mode + for tok in json_toks: + parser.process(tok) + # Feed: <|message|> → should exit skip mode and resume + for tok in message_tok: + parser.process(tok) + # Feed: text + <|end|> + for tok in text_toks: + parser.process(tok) + for tok in end_tok: + parser.process(tok) + + # Should have produced two messages despite the malformed sequence + assert len(parser.messages) == 2 + assert parser.messages[0].content[0].text == "First." + + diff --git a/tests/entrypoints/openai/responses/test_harmony_utils.py b/tests/entrypoints/openai/responses/test_harmony_utils.py index e51538298ff9..388c7b1d3c59 100644 --- a/tests/entrypoints/openai/responses/test_harmony_utils.py +++ b/tests/entrypoints/openai/responses/test_harmony_utils.py @@ -461,3 +461,40 @@ def test_parser_state_to_response_output_analysis_channel() -> None: assert len(builtin_items) == 1 assert not isinstance(builtin_items[0], McpCall) assert builtin_items[0].type == "reasoning" + + +class TestHarmonyOutputSanitization: + """Tests that leaked Harmony control tokens are sanitized in output.""" + + def test_constrain_recipient_treated_as_no_recipient(self): + """<|constrain|>json as recipient should be sanitized to empty, + falling through to _parse_message_no_recipient (produces message).""" + message = Message.from_role_and_content( + Role.ASSISTANT, "Some output text" + ) + message = message.with_channel("commentary") + message = message.with_recipient("<|constrain|>json") + + output_items = harmony_to_response_output(message) + + # Should produce a message (preamble), not an MCP call + assert len(output_items) == 1 + assert isinstance(output_items[0], ResponseOutputMessage) + assert output_items[0].type == "message" + + def test_contaminated_tool_name_cleaned_in_function_call(self): + """Function name with leaked <|channel|> should be sanitized.""" + message = Message.from_role_and_content( + Role.ASSISTANT, '{"location": "SF"}' + ) + message = message.with_channel("commentary") + message = message.with_recipient( + "functions.get_weather<|channel|>commentary" + ) + + output_items = harmony_to_response_output(message) + + assert len(output_items) == 1 + assert isinstance(output_items[0], ResponseFunctionToolCall) + assert output_items[0].name == "get_weather" + assert "<|" not in output_items[0].name diff --git a/vllm/entrypoints/openai/chat_completion/stream_harmony.py b/vllm/entrypoints/openai/chat_completion/stream_harmony.py index 87f2f9b92275..8528aa7f0682 100644 --- a/vllm/entrypoints/openai/chat_completion/stream_harmony.py +++ b/vllm/entrypoints/openai/chat_completion/stream_harmony.py @@ -12,6 +12,7 @@ from openai_harmony import StreamableParser from vllm.entrypoints.chat_utils import make_tool_call_id +from vllm.entrypoints.openai.parser.harmony_utils import sanitize_harmony_name from vllm.entrypoints.openai.engine.protocol import ( DeltaFunctionCall, DeltaMessage, @@ -109,7 +110,9 @@ def extract_harmony_streaming_delta( opened_new_call = False if prev_recipient != group.recipient: # New tool call - emit the opening message - tool_name = group.recipient.split("functions.", 1)[1] + tool_name = sanitize_harmony_name( + group.recipient.split("functions.", 1)[1] + ) tool_messages.append( DeltaToolCall( id=make_tool_call_id(), diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index 9b4264456c51..b359169254ca 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -15,6 +15,7 @@ ReasoningEffort, Role, StreamableParser, + StreamState, SystemContent, TextContent, ToolDescription, @@ -27,6 +28,126 @@ logger = init_logger(__name__) +# Harmony special token strings that may leak into tool names or recipients +# during generation by GPT-OSS models. +_HARMONY_SPECIAL_TOKEN_STRS = ( + "<|channel|>", + "<|constrain|>", + "<|start|>", + "<|end|>", + "<|message|>", + "<|call|>", +) + +# Harmony special token IDs (GPT-OSS encoding) +_TOK_CONSTRAIN = 200003 +_TOK_CHANNEL = 200005 +_TOK_START = 200006 +_TOK_END = 200007 +_TOK_MESSAGE = 200008 + + +def sanitize_harmony_name(name: str) -> str: + """Strip leaked Harmony control tokens from a tool/recipient name. + + Finds the earliest Harmony control token string in *name* and returns + only the text before it. Returns the original string unchanged when + no control tokens are present. + """ + earliest = len(name) + for tok in _HARMONY_SPECIAL_TOKEN_STRS: + idx = name.find(tok) + if idx != -1 and idx < earliest: + earliest = idx + return name[:earliest].rstrip() + + +class ResilientStreamableParser: + """Wrapper around ``StreamableParser`` that recovers from two common + malformed-output patterns produced by GPT-OSS models: + + 1. **Missing ``<|start|>`` recovery** – When the parser expects a + ``<|start|>`` token but receives ``<|channel|>`` instead, inject the + missing ``<|start|>`` + assistant role token before forwarding. + + 2. **Malformed ``<|constrain|>`` in headers** – When the parser is in + ``HEADER`` state and receives ``<|constrain|>``, enter skip mode and + discard tokens until ``<|message|>`` or ``<|end|>`` is seen. + + All public properties are delegated to the underlying parser. The + ``current_recipient`` getter additionally sanitizes leaked tokens. + """ + + def __init__(self, inner: StreamableParser, encoding): + self._inner = inner + self._encoding = encoding + self._skip_until_message_or_end = False + + # --- error-recovering process() ----------------------------------- + + def process(self, token_id: int) -> None: + # Pattern 2: skip mode – discard until <|message|> or <|end|> + if self._skip_until_message_or_end: + if token_id in (_TOK_MESSAGE, _TOK_END): + self._skip_until_message_or_end = False + self._inner.process(token_id) + # else: silently discard the token + return + + state = self._inner.state + + # Pattern 1: missing <|start|> before <|channel|> + if state == StreamState.EXPECT_START and token_id == _TOK_CHANNEL: + # Inject <|start|> + assistant role token + self._inner.process(_TOK_START) + role_tokens = self._encoding.encode("assistant", allowed_special="all") + for rt in role_tokens: + self._inner.process(rt) + self._inner.process(token_id) + return + + # Pattern 2: <|constrain|> during HEADER → enter skip mode + if state == StreamState.HEADER and token_id == _TOK_CONSTRAIN: + self._skip_until_message_or_end = True + return + + self._inner.process(token_id) + + # --- delegated properties ----------------------------------------- + + @property + def messages(self): + return self._inner.messages + + @property + def current_content(self): + return self._inner.current_content + + @property + def current_channel(self): + return self._inner.current_channel + + @property + def current_recipient(self): + raw = self._inner.current_recipient + if raw is not None: + sanitized = sanitize_harmony_name(raw) + return sanitized if sanitized else None + return raw + + @property + def current_role(self): + return self._inner.current_role + + @property + def state(self): + return self._inner.state + + @property + def last_content_delta(self): + return self._inner.last_content_delta + + REASONING_EFFORT = { "high": ReasoningEffort.HIGH, "medium": ReasoningEffort.MEDIUM, @@ -256,7 +377,7 @@ def parse_chat_input_to_harmony_message( for call in tool_calls: func = call.get("function", {}) - name = func.get("name", "") + name = sanitize_harmony_name(func.get("name", "")) arguments = func.get("arguments", "") or "" msg = Message.from_role_and_content(Role.ASSISTANT, arguments) msg = msg.with_channel("commentary") @@ -328,8 +449,10 @@ def get_stop_tokens_for_assistant_actions() -> list[int]: return get_encoding().stop_tokens_for_assistant_actions() -def get_streamable_parser_for_assistant() -> StreamableParser: - return StreamableParser(get_encoding(), role=Role.ASSISTANT) +def get_streamable_parser_for_assistant() -> ResilientStreamableParser: + encoding = get_encoding() + inner = StreamableParser(encoding, role=Role.ASSISTANT) + return ResilientStreamableParser(inner, encoding) def parse_output_into_messages(token_ids: Iterable[int]) -> StreamableParser: diff --git a/vllm/entrypoints/openai/responses/context.py b/vllm/entrypoints/openai/responses/context.py index bab59e0aa1ec..70dd0cb5dda3 100644 --- a/vllm/entrypoints/openai/responses/context.py +++ b/vllm/entrypoints/openai/responses/context.py @@ -31,6 +31,7 @@ get_encoding, get_streamable_parser_for_assistant, render_for_completion, + sanitize_harmony_name, ) from vllm.entrypoints.openai.parser.responses_parser import ( get_responses_parser_for_simple_context, @@ -669,7 +670,9 @@ def messages(self) -> list: def need_builtin_tool_call(self) -> bool: last_msg = self.messages[-1] recipient = last_msg.recipient - if recipient is None: + if recipient is not None: + recipient = sanitize_harmony_name(recipient) + if not recipient: return False if recipient.startswith("browser."): return "browser" in self.available_tools @@ -685,6 +688,8 @@ async def call_tool(self) -> list[Message]: last_msg = self.messages[-1] recipient = last_msg.recipient if recipient is not None: + recipient = sanitize_harmony_name(recipient) + if recipient: if recipient.startswith("browser."): return await self.call_search_tool( self._tool_sessions["browser"], last_msg @@ -708,7 +713,7 @@ async def call_search_tool( self.called_tools.add("browser") if isinstance(tool_session, Tool): return await tool_session.get_result(self) - tool_name = last_msg.recipient.split(".")[1] + tool_name = sanitize_harmony_name(last_msg.recipient.split(".")[1]) if envs.VLLM_TOOL_JSON_ERROR_AUTOMATIC_RETRY: try: args = json.loads(last_msg.content[0].text) @@ -795,7 +800,9 @@ async def call_container_tool( self.called_tools.add("container") if isinstance(tool_session, Tool): return await tool_session.get_result(self) - tool_name = last_msg.recipient.split(".")[1].split(" ")[0] + tool_name = sanitize_harmony_name( + last_msg.recipient.split(".")[1].split(" ")[0] + ) if envs.VLLM_TOOL_JSON_ERROR_AUTOMATIC_RETRY: try: args = json.loads(last_msg.content[0].text) diff --git a/vllm/entrypoints/openai/responses/harmony.py b/vllm/entrypoints/openai/responses/harmony.py index faab2f7f4cc7..d5b045b942af 100644 --- a/vllm/entrypoints/openai/responses/harmony.py +++ b/vllm/entrypoints/openai/responses/harmony.py @@ -32,6 +32,7 @@ from vllm.entrypoints.openai.parser.harmony_utils import ( BUILTIN_TOOL_TO_MCP_SERVER_LABEL, flatten_chat_text_content, + sanitize_harmony_name, ) from vllm.entrypoints.openai.responses.protocol import ( ResponseInputOutputItem, @@ -93,7 +94,7 @@ def _parse_chat_format_message(chat_msg: dict) -> list[Message]: msgs: list[Message] = [] for call in tool_calls: func = call.get("function", {}) - name = func.get("name", "") + name = sanitize_harmony_name(func.get("name", "")) arguments = func.get("arguments", "") or "" msg = Message.from_role_and_content(Role.ASSISTANT, arguments) msg = msg.with_channel("commentary") @@ -186,7 +187,8 @@ def response_input_to_harmony( elif response_msg["type"] == "function_call": msg = Message.from_role_and_content(Role.ASSISTANT, response_msg["arguments"]) msg = msg.with_channel("commentary") - msg = msg.with_recipient(f"functions.{response_msg['name']}") + sanitized_name = sanitize_harmony_name(response_msg["name"]) + msg = msg.with_recipient(f"functions.{sanitized_name}") msg = msg.with_content_type("json") else: raise ValueError(f"Unknown input type: {response_msg['type']}") @@ -292,7 +294,7 @@ def _parse_browser_tool_call(message: Message, recipient: str) -> ResponseOutput def _parse_function_call(message: Message, recipient: str) -> list[ResponseOutputItem]: """Parse function calls into function tool call items.""" - function_name = recipient.split(".")[-1] + function_name = sanitize_harmony_name(recipient.split(".")[-1]) output_items = [] for content in message.content: random_id = random_uuid() @@ -421,6 +423,10 @@ def harmony_to_response_output(message: Message) -> list[ResponseOutputItem]: output_items: list[ResponseOutputItem] = [] recipient = message.recipient + if recipient is not None: + recipient = sanitize_harmony_name(recipient) + if not recipient: + recipient = None if recipient is not None: # Browser tool calls (browser.search, browser.open, browser.find) @@ -459,6 +465,10 @@ def parser_state_to_response_output( if parser.current_role != Role.ASSISTANT: return [] current_recipient = parser.current_recipient + if current_recipient is not None: + current_recipient = sanitize_harmony_name(current_recipient) + if not current_recipient: + current_recipient = None if current_recipient is not None and current_recipient.startswith("browser."): return [] From 8e0572477f6e84eec55ebe85722323b5f28e636e Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 12:11:16 -0500 Subject: [PATCH 2/7] Fix Gemini review issues: sanitize message history, structured recipients, remove redundancy - Add sanitize_harmony_recipient() that splits on '.', sanitizes each part, and rejoins to preserve dotted structure (e.g. browser<|channel|>.search becomes browser.search instead of being truncated to browser) - Sanitize recipients on messages returned by ResilientStreamableParser.messages to prevent control token injection in multi-turn conversation history - Remove redundant sanitization in parser_state_to_response_output since ResilientStreamableParser.current_recipient already handles it - Use sanitize_harmony_recipient for full recipient strings in context.py and harmony.py routing logic Signed-off-by: Will Deines --- .../openai/parser/test_harmony_utils.py | 77 +++++++++++++++++++ .../openai/parser/harmony_utils.py | 25 +++++- vllm/entrypoints/openai/responses/context.py | 5 +- vllm/entrypoints/openai/responses/harmony.py | 7 +- 4 files changed, 105 insertions(+), 9 deletions(-) diff --git a/tests/entrypoints/openai/parser/test_harmony_utils.py b/tests/entrypoints/openai/parser/test_harmony_utils.py index 51f22566641d..3c9f94bd2fe4 100644 --- a/tests/entrypoints/openai/parser/test_harmony_utils.py +++ b/tests/entrypoints/openai/parser/test_harmony_utils.py @@ -14,6 +14,7 @@ parse_chat_input_to_harmony_message, parse_chat_output, sanitize_harmony_name, + sanitize_harmony_recipient, ) from vllm.entrypoints.openai.responses.harmony import ( response_input_to_harmony, @@ -961,6 +962,50 @@ def test_trailing_whitespace_stripped(self) -> None: assert sanitize_harmony_name("tool_name <|end|>") == "tool_name" +class TestSanitizeHarmonyRecipient: + """Tests for sanitize_harmony_recipient().""" + + def test_clean_dotted_name_unchanged(self) -> None: + assert sanitize_harmony_recipient("browser.search") == "browser.search" + + def test_clean_simple_name_unchanged(self) -> None: + assert sanitize_harmony_recipient("python") == "python" + + def test_contaminated_first_part_preserved_structure(self) -> None: + """browser<|channel|>.search → browser.search""" + assert ( + sanitize_harmony_recipient("browser<|channel|>.search") == "browser.search" + ) + + def test_contaminated_second_part(self) -> None: + """browser.search<|end|>garbage → browser.search""" + assert ( + sanitize_harmony_recipient("browser.search<|end|>garbage") + == "browser.search" + ) + + def test_pure_control_token_returns_empty(self) -> None: + assert sanitize_harmony_recipient("<|constrain|>json") == "" + + def test_functions_dotted_contaminated(self) -> None: + """functions.get_weather<|channel|>commentary → functions.get_weather""" + assert ( + sanitize_harmony_recipient( + "functions.get_weather<|channel|>commentary" + ) + == "functions.get_weather" + ) + + def test_empty_string(self) -> None: + assert sanitize_harmony_recipient("") == "" + + def test_container_dotted_contaminated(self) -> None: + """container<|channel|>.exec → container.exec""" + assert ( + sanitize_harmony_recipient("container<|channel|>.exec") == "container.exec" + ) + + class TestResilientStreamableParser: """Tests for ResilientStreamableParser error recovery.""" @@ -1046,4 +1091,36 @@ def test_constrain_in_header_skipped(self) -> None: assert len(parser.messages) == 2 assert parser.messages[0].content[0].text == "First." + def test_messages_recipients_sanitized(self) -> None: + """Messages returned by .messages should have sanitized recipients, + preventing contaminated history in multi-turn interactions.""" + encoding = get_encoding() + # Build a tool call message with a contaminated recipient + harmony_str = ( + "<|channel|>commentary" + "<|message|>Let me search.<|end|>" + "<|start|>assistant to=functions.get_weather<|channel|>commentary" + '<|constrain|>json<|message|>{"loc": "SF"}<|end|>' + ) + token_ids = encoding.encode(harmony_str, allowed_special="all") + + parser = get_streamable_parser_for_assistant() + for tok in token_ids: + parser.process(tok) + + msgs = parser.messages + # All recipients should be clean (no control tokens) + for msg in msgs: + if msg.recipient is not None: + for tok_str in ( + "<|channel|>", + "<|constrain|>", + "<|start|>", + "<|end|>", + "<|message|>", + ): + assert tok_str not in msg.recipient, ( + f"Leaked control token {tok_str!r} " + f"in message recipient: {msg.recipient!r}" + ) diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index b359169254ca..a0c8be2d9f91 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -62,6 +62,22 @@ def sanitize_harmony_name(name: str) -> str: return name[:earliest].rstrip() +def sanitize_harmony_recipient(recipient: str) -> str: + """Sanitize a structured recipient name (e.g. ``browser.search``). + + Splits on ``'.'``, sanitizes each part individually with + :func:`sanitize_harmony_name`, filters out parts that became empty, + and rejoins. This preserves the dotted structure while removing + control tokens from any component. + + Example: ``browser<|channel|>.search`` → ``browser.search`` + """ + parts = recipient.split(".") + sanitized_parts = [sanitize_harmony_name(part) for part in parts] + sanitized_parts = [p for p in sanitized_parts if p] + return ".".join(sanitized_parts) + + class ResilientStreamableParser: """Wrapper around ``StreamableParser`` that recovers from two common malformed-output patterns produced by GPT-OSS models: @@ -117,7 +133,12 @@ def process(self, token_id: int) -> None: @property def messages(self): - return self._inner.messages + msgs = self._inner.messages + for msg in msgs: + if msg.recipient is not None: + sanitized = sanitize_harmony_recipient(msg.recipient) + msg.recipient = sanitized if sanitized else None + return msgs @property def current_content(self): @@ -131,7 +152,7 @@ def current_channel(self): def current_recipient(self): raw = self._inner.current_recipient if raw is not None: - sanitized = sanitize_harmony_name(raw) + sanitized = sanitize_harmony_recipient(raw) return sanitized if sanitized else None return raw diff --git a/vllm/entrypoints/openai/responses/context.py b/vllm/entrypoints/openai/responses/context.py index 70dd0cb5dda3..82106114afd0 100644 --- a/vllm/entrypoints/openai/responses/context.py +++ b/vllm/entrypoints/openai/responses/context.py @@ -32,6 +32,7 @@ get_streamable_parser_for_assistant, render_for_completion, sanitize_harmony_name, + sanitize_harmony_recipient, ) from vllm.entrypoints.openai.parser.responses_parser import ( get_responses_parser_for_simple_context, @@ -671,7 +672,7 @@ def need_builtin_tool_call(self) -> bool: last_msg = self.messages[-1] recipient = last_msg.recipient if recipient is not None: - recipient = sanitize_harmony_name(recipient) + recipient = sanitize_harmony_recipient(recipient) if not recipient: return False if recipient.startswith("browser."): @@ -688,7 +689,7 @@ async def call_tool(self) -> list[Message]: last_msg = self.messages[-1] recipient = last_msg.recipient if recipient is not None: - recipient = sanitize_harmony_name(recipient) + recipient = sanitize_harmony_recipient(recipient) if recipient: if recipient.startswith("browser."): return await self.call_search_tool( diff --git a/vllm/entrypoints/openai/responses/harmony.py b/vllm/entrypoints/openai/responses/harmony.py index d5b045b942af..f2502d5721de 100644 --- a/vllm/entrypoints/openai/responses/harmony.py +++ b/vllm/entrypoints/openai/responses/harmony.py @@ -33,6 +33,7 @@ BUILTIN_TOOL_TO_MCP_SERVER_LABEL, flatten_chat_text_content, sanitize_harmony_name, + sanitize_harmony_recipient, ) from vllm.entrypoints.openai.responses.protocol import ( ResponseInputOutputItem, @@ -424,7 +425,7 @@ def harmony_to_response_output(message: Message) -> list[ResponseOutputItem]: output_items: list[ResponseOutputItem] = [] recipient = message.recipient if recipient is not None: - recipient = sanitize_harmony_name(recipient) + recipient = sanitize_harmony_recipient(recipient) if not recipient: recipient = None @@ -465,10 +466,6 @@ def parser_state_to_response_output( if parser.current_role != Role.ASSISTANT: return [] current_recipient = parser.current_recipient - if current_recipient is not None: - current_recipient = sanitize_harmony_name(current_recipient) - if not current_recipient: - current_recipient = None if current_recipient is not None and current_recipient.startswith("browser."): return [] From 41170ec38d79ad6bf4f959e04e05591a85b2420b Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 12:59:14 -0500 Subject: [PATCH 3/7] Fix pre-commit formatting: import order, line length, trailing blank line Signed-off-by: Will Deines --- .../entrypoints/openai/parser/test_harmony_utils.py | 9 ++------- .../openai/responses/test_harmony_utils.py | 12 +++--------- .../openai/chat_completion/stream_harmony.py | 2 +- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/tests/entrypoints/openai/parser/test_harmony_utils.py b/tests/entrypoints/openai/parser/test_harmony_utils.py index 3c9f94bd2fe4..1af4bd72577c 100644 --- a/tests/entrypoints/openai/parser/test_harmony_utils.py +++ b/tests/entrypoints/openai/parser/test_harmony_utils.py @@ -951,9 +951,7 @@ def test_pure_control_token_returns_empty(self) -> None: assert sanitize_harmony_name("<|start|>") == "" def test_multiple_tokens_earliest_wins(self) -> None: - assert ( - sanitize_harmony_name("foo<|channel|>bar<|constrain|>baz") == "foo" - ) + assert sanitize_harmony_name("foo<|channel|>bar<|constrain|>baz") == "foo" def test_empty_string(self) -> None: assert sanitize_harmony_name("") == "" @@ -990,9 +988,7 @@ def test_pure_control_token_returns_empty(self) -> None: def test_functions_dotted_contaminated(self) -> None: """functions.get_weather<|channel|>commentary → functions.get_weather""" assert ( - sanitize_harmony_recipient( - "functions.get_weather<|channel|>commentary" - ) + sanitize_harmony_recipient("functions.get_weather<|channel|>commentary") == "functions.get_weather" ) @@ -1123,4 +1119,3 @@ def test_messages_recipients_sanitized(self) -> None: f"Leaked control token {tok_str!r} " f"in message recipient: {msg.recipient!r}" ) - diff --git a/tests/entrypoints/openai/responses/test_harmony_utils.py b/tests/entrypoints/openai/responses/test_harmony_utils.py index 388c7b1d3c59..91100fcee76a 100644 --- a/tests/entrypoints/openai/responses/test_harmony_utils.py +++ b/tests/entrypoints/openai/responses/test_harmony_utils.py @@ -469,9 +469,7 @@ class TestHarmonyOutputSanitization: def test_constrain_recipient_treated_as_no_recipient(self): """<|constrain|>json as recipient should be sanitized to empty, falling through to _parse_message_no_recipient (produces message).""" - message = Message.from_role_and_content( - Role.ASSISTANT, "Some output text" - ) + message = Message.from_role_and_content(Role.ASSISTANT, "Some output text") message = message.with_channel("commentary") message = message.with_recipient("<|constrain|>json") @@ -484,13 +482,9 @@ def test_constrain_recipient_treated_as_no_recipient(self): def test_contaminated_tool_name_cleaned_in_function_call(self): """Function name with leaked <|channel|> should be sanitized.""" - message = Message.from_role_and_content( - Role.ASSISTANT, '{"location": "SF"}' - ) + message = Message.from_role_and_content(Role.ASSISTANT, '{"location": "SF"}') message = message.with_channel("commentary") - message = message.with_recipient( - "functions.get_weather<|channel|>commentary" - ) + message = message.with_recipient("functions.get_weather<|channel|>commentary") output_items = harmony_to_response_output(message) diff --git a/vllm/entrypoints/openai/chat_completion/stream_harmony.py b/vllm/entrypoints/openai/chat_completion/stream_harmony.py index 8528aa7f0682..a44b83baa902 100644 --- a/vllm/entrypoints/openai/chat_completion/stream_harmony.py +++ b/vllm/entrypoints/openai/chat_completion/stream_harmony.py @@ -12,12 +12,12 @@ from openai_harmony import StreamableParser from vllm.entrypoints.chat_utils import make_tool_call_id -from vllm.entrypoints.openai.parser.harmony_utils import sanitize_harmony_name from vllm.entrypoints.openai.engine.protocol import ( DeltaFunctionCall, DeltaMessage, DeltaToolCall, ) +from vllm.entrypoints.openai.parser.harmony_utils import sanitize_harmony_name class TokenState(NamedTuple): From 599f35c50f754c8e4246f28d155b1b87a1cce336 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 16:13:37 -0500 Subject: [PATCH 4/7] fix: use dynamic role instead of hardcoded 'assistant' and strengthen test assertion - Use self._inner.role instead of hardcoded "assistant" in ResilientStreamableParser.process for correctness with non-assistant roles - Add assertion for second message content in test_constrain_in_header_skipped Signed-off-by: Will Deines --- tests/entrypoints/openai/parser/test_harmony_utils.py | 1 + vllm/entrypoints/openai/parser/harmony_utils.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/entrypoints/openai/parser/test_harmony_utils.py b/tests/entrypoints/openai/parser/test_harmony_utils.py index 1af4bd72577c..8258fbd659ff 100644 --- a/tests/entrypoints/openai/parser/test_harmony_utils.py +++ b/tests/entrypoints/openai/parser/test_harmony_utils.py @@ -1086,6 +1086,7 @@ def test_constrain_in_header_skipped(self) -> None: # Should have produced two messages despite the malformed sequence assert len(parser.messages) == 2 assert parser.messages[0].content[0].text == "First." + assert parser.messages[1].content[0].text == "Second." def test_messages_recipients_sanitized(self) -> None: """Messages returned by .messages should have sanitized recipients, diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index a0c8be2d9f91..d8a85ce42ba5 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -116,7 +116,7 @@ def process(self, token_id: int) -> None: if state == StreamState.EXPECT_START and token_id == _TOK_CHANNEL: # Inject <|start|> + assistant role token self._inner.process(_TOK_START) - role_tokens = self._encoding.encode("assistant", allowed_special="all") + role_tokens = self._encoding.encode(self._inner.role, allowed_special="all") for rt in role_tokens: self._inner.process(rt) self._inner.process(token_id) From 714ad9026602c6eaf5c3c5878bb45bfc37af0d24 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 17 Mar 2026 21:29:00 -0400 Subject: [PATCH 5/7] fix(harmony): discard free text between harmony channel messages The triggered_tags grammar's sub-dispatch loop allows all tokens between triggered tags. The model can generate trailing text after a <|end|> before EOS (e.g. restating the answer as plain text after a tool call). These free-text tokens arrive in EXPECT_START state, causing HarmonyError. Add Pattern 3 to ResilientStreamableParser: silently discard any token in EXPECT_START state that is not <|start|>. This preserves all completed messages while ignoring inter-message garbage tokens. Signed-off-by: Will Deines --- vllm/entrypoints/openai/parser/harmony_utils.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index d8a85ce42ba5..317e388ae4a1 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -122,6 +122,14 @@ def process(self, token_id: int) -> None: self._inner.process(token_id) return + # Pattern 3: free text between harmony messages (e.g. model outputs plain + # text after a <|end|> before starting the next channel message). + # The triggered_tags grammar allows free tokens in the sub-dispatch loop, + # so the model may generate trailing text that isn't part of any channel. + # Silently discard these tokens rather than crashing with HarmonyError. + if state == StreamState.EXPECT_START and token_id != _TOK_START: + return + # Pattern 2: <|constrain|> during HEADER → enter skip mode if state == StreamState.HEADER and token_id == _TOK_CONSTRAIN: self._skip_until_message_or_end = True From b871dfc6e65aae055f9e30f625ecd789b44fdfad Mon Sep 17 00:00:00 2001 From: Will Deines Date: Wed, 18 Mar 2026 10:54:24 -0400 Subject: [PATCH 6/7] fix(harmony): derive role from parser state instead of non-existent attribute Track inner parser's current_role during process() calls and use the cached value for Pattern 1 recovery, fixing the broken self._inner.role reference. Pattern 1 only fires after <|end|> (EXPECT_START state), so at least one message has been processed and _last_known_role is guaranteed non-None. Signed-off-by: Will Deines --- vllm/entrypoints/openai/parser/harmony_utils.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index 317e388ae4a1..9b4b066a0f75 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -98,10 +98,15 @@ def __init__(self, inner: StreamableParser, encoding): self._inner = inner self._encoding = encoding self._skip_until_message_or_end = False + self._last_known_role: str | None = None # --- error-recovering process() ----------------------------------- def process(self, token_id: int) -> None: + # Track role from inner parser while available + if self._inner.current_role is not None: + self._last_known_role = self._inner.current_role.value + # Pattern 2: skip mode – discard until <|message|> or <|end|> if self._skip_until_message_or_end: if token_id in (_TOK_MESSAGE, _TOK_END): @@ -116,7 +121,12 @@ def process(self, token_id: int) -> None: if state == StreamState.EXPECT_START and token_id == _TOK_CHANNEL: # Inject <|start|> + assistant role token self._inner.process(_TOK_START) - role_tokens = self._encoding.encode(self._inner.role, allowed_special="all") + assert self._last_known_role is not None, ( + "Pattern 1 recovery requires a prior message to establish role" + ) + role_tokens = self._encoding.encode( + self._last_known_role, allowed_special="all" + ) for rt in role_tokens: self._inner.process(rt) self._inner.process(token_id) From 3ce8294fdc299965666d7efc2b6d56d99d5ebe4b Mon Sep 17 00:00:00 2001 From: Will Deines Date: Wed, 18 Mar 2026 14:02:24 -0400 Subject: [PATCH 7/7] fix(harmony): prevent last_tok from recording discarded tokens and fix recipient misrouting Bug 1: ResilientStreamableParser.process() silently discards tokens in Pattern 2 (skip mode) and Pattern 3 (free text in EXPECT_START), but StreamingHarmonyContext.append_output() unconditionally set last_tok to the most recent token. If that token was discarded, render_for_completion() would fail with IndexError searching for it. Now track last_consumed_token in the parser and only update last_tok when a token was actually forwarded. Also add a bounds check in render_for_completion() as a safety net. Bug 2: sanitize_harmony_recipient() filtered out empty parts after sanitization, collapsing e.g. "functions.<|constrain|>json" to "functions" (bare), which failed startswith("functions.") checks and fell through to incorrect routing. Now return empty string when any component sanitizes to empty, triggering the safe no-recipient fallback. Signed-off-by: Will Deines --- .../openai/parser/test_harmony_utils.py | 90 +++++++++++++++++++ .../openai/parser/harmony_utils.py | 19 +++- vllm/entrypoints/openai/responses/context.py | 9 +- 3 files changed, 112 insertions(+), 6 deletions(-) diff --git a/tests/entrypoints/openai/parser/test_harmony_utils.py b/tests/entrypoints/openai/parser/test_harmony_utils.py index 8258fbd659ff..47c53032c8fa 100644 --- a/tests/entrypoints/openai/parser/test_harmony_utils.py +++ b/tests/entrypoints/openai/parser/test_harmony_utils.py @@ -1001,6 +1001,14 @@ def test_container_dotted_contaminated(self) -> None: sanitize_harmony_recipient("container<|channel|>.exec") == "container.exec" ) + def test_full_component_contamination_returns_empty(self) -> None: + """functions.<|constrain|>json → "" (not "functions")""" + assert sanitize_harmony_recipient("functions.<|constrain|>json") == "" + + def test_container_full_component_contamination_returns_empty(self) -> None: + """container.<|channel|>commentary → "" (not "container")""" + assert sanitize_harmony_recipient("container.<|channel|>commentary") == "" + class TestResilientStreamableParser: """Tests for ResilientStreamableParser error recovery.""" @@ -1120,3 +1128,85 @@ def test_messages_recipients_sanitized(self) -> None: f"Leaked control token {tok_str!r} " f"in message recipient: {msg.recipient!r}" ) + + def test_last_consumed_token_tracks_normal_processing(self) -> None: + """Normal tokens forwarded to inner parser update last_consumed_token.""" + encoding = get_encoding() + harmony_str = "<|channel|>final<|message|>Hello world<|end|>" + token_ids = encoding.encode(harmony_str, allowed_special="all") + + parser = get_streamable_parser_for_assistant() + assert parser.last_consumed_token is None + + for tok in token_ids: + parser.process(tok) + + # After processing, last_consumed_token should be the last token + assert parser.last_consumed_token == token_ids[-1] + + def test_pattern3_discarded_tokens_not_in_last_consumed(self) -> None: + """Free-text tokens in EXPECT_START don't update last_consumed_token.""" + encoding = get_encoding() + # Complete a message to reach EXPECT_START state + first_msg = "<|channel|>final<|message|>First.<|end|>" + first_tokens = encoding.encode(first_msg, allowed_special="all") + + parser = get_streamable_parser_for_assistant() + for tok in first_tokens: + parser.process(tok) + + last_consumed_after_first = parser.last_consumed_token + assert last_consumed_after_first is not None + + # Now feed free-text tokens (not <|start|>) — these should be discarded + garbage_tokens = encoding.encode("some free text", allowed_special="all") + for tok in garbage_tokens: + parser.process(tok) + + # last_consumed_token should NOT have changed + assert parser.last_consumed_token == last_consumed_after_first + + def test_pattern2_skip_mode_discarded_tokens_not_in_last_consumed(self) -> None: + """Tokens skipped during Pattern 2 don't update last_consumed_token.""" + encoding = get_encoding() + # Complete a first message + first_msg = "<|channel|>final<|message|>First.<|end|>" + first_tokens = encoding.encode(first_msg, allowed_special="all") + + # Build second message with <|constrain|> in header + start_tok = encoding.encode("<|start|>", allowed_special="all") + role_toks = encoding.encode("assistant", allowed_special="all") + constrain_tok = encoding.encode("<|constrain|>", allowed_special="all") + json_toks = encoding.encode("json", allowed_special="all") + message_tok = encoding.encode("<|message|>", allowed_special="all") + + parser = get_streamable_parser_for_assistant() + for tok in first_tokens: + parser.process(tok) + + last_consumed_after_first = parser.last_consumed_token + + # Feed <|start|>assistant to enter HEADER state + for tok in start_tok: + parser.process(tok) + for tok in role_toks: + parser.process(tok) + + last_consumed_after_header = parser.last_consumed_token + + # Feed <|constrain|> to enter skip mode + for tok in constrain_tok: + parser.process(tok) + + # last_consumed should not change (constrain triggers skip, not forwarded) + assert parser.last_consumed_token == last_consumed_after_header + + # Feed garbage tokens in skip mode — should not update + for tok in json_toks: + parser.process(tok) + assert parser.last_consumed_token == last_consumed_after_header + + # Feed <|message|> to exit skip mode — this IS forwarded + for tok in message_tok: + parser.process(tok) + assert parser.last_consumed_token != last_consumed_after_first diff --git a/vllm/entrypoints/openai/parser/harmony_utils.py b/vllm/entrypoints/openai/parser/harmony_utils.py index 9b4b066a0f75..17de269d0bfc 100644 --- a/vllm/entrypoints/openai/parser/harmony_utils.py +++ b/vllm/entrypoints/openai/parser/harmony_utils.py @@ -66,15 +66,18 @@ def sanitize_harmony_recipient(recipient: str) -> str: """Sanitize a structured recipient name (e.g. ``browser.search``). Splits on ``'.'``, sanitizes each part individually with - :func:`sanitize_harmony_name`, filters out parts that became empty, - and rejoins. This preserves the dotted structure while removing - control tokens from any component. + :func:`sanitize_harmony_name`, and rejoins. If any component is + entirely consumed by control tokens (sanitizes to empty), the whole + recipient is considered corrupt and an empty string is returned so + that callers fall back to the safe no-recipient path. Example: ``browser<|channel|>.search`` → ``browser.search`` + Example: ``functions.<|constrain|>json`` → ``""`` """ parts = recipient.split(".") sanitized_parts = [sanitize_harmony_name(part) for part in parts] - sanitized_parts = [p for p in sanitized_parts if p] + if any(not p for p in sanitized_parts): + return "" return ".".join(sanitized_parts) @@ -99,6 +102,7 @@ def __init__(self, inner: StreamableParser, encoding): self._encoding = encoding self._skip_until_message_or_end = False self._last_known_role: str | None = None + self._last_consumed_token: int | None = None # --- error-recovering process() ----------------------------------- @@ -112,6 +116,7 @@ def process(self, token_id: int) -> None: if token_id in (_TOK_MESSAGE, _TOK_END): self._skip_until_message_or_end = False self._inner.process(token_id) + self._last_consumed_token = token_id # else: silently discard the token return @@ -130,6 +135,7 @@ def process(self, token_id: int) -> None: for rt in role_tokens: self._inner.process(rt) self._inner.process(token_id) + self._last_consumed_token = token_id return # Pattern 3: free text between harmony messages (e.g. model outputs plain @@ -146,6 +152,7 @@ def process(self, token_id: int) -> None: return self._inner.process(token_id) + self._last_consumed_token = token_id # --- delegated properties ----------------------------------------- @@ -186,6 +193,10 @@ def state(self): def last_content_delta(self): return self._inner.last_content_delta + @property + def last_consumed_token(self) -> int | None: + return self._last_consumed_token + REASONING_EFFORT = { "high": ReasoningEffort.HIGH, diff --git a/vllm/entrypoints/openai/responses/context.py b/vllm/entrypoints/openai/responses/context.py index 82106114afd0..49b85fdc01b8 100644 --- a/vllm/entrypoints/openai/responses/context.py +++ b/vllm/entrypoints/openai/responses/context.py @@ -883,7 +883,9 @@ def append_output(self, output: RequestOutput) -> None: self.current_turn_metrics.reset() # Check if the current token is part of reasoning content self._update_num_reasoning_tokens() - self.last_tok = tok + consumed = self.parser.last_consumed_token + if consumed is not None: + self.last_tok = consumed if len(self._messages) - self.num_init_messages < len(self.parser.messages): self._messages.extend( self.parser.messages[len(self._messages) - self.num_init_messages :] @@ -917,7 +919,10 @@ def render_for_completion(self) -> list[int]: last_n = -1 to_process = [] - while rendered_tokens[last_n] != self.last_tok: + while ( + abs(last_n) <= len(rendered_tokens) + and rendered_tokens[last_n] != self.last_tok + ): to_process.append(rendered_tokens[last_n]) last_n -= 1 for tok in reversed(to_process):