Dev/issue#134
Conversation
- Add detailed exception logging for sync and async HTTP streaming requests - Include request context such as url, method, stream, timeout, status code, and request id - Handle SSE done events consistently across streaming parsers - Avoid swallowing stream read failures while preserving original exception behavior - Fix pylint warning by avoiding redefinition of built-in id
There was a problem hiding this comment.
Code Review
This pull request introduces several enhancements and bug fixes, including support for a wait_timeout_seconds parameter in task polling, the addition of the asynchronous AioTextReRank API, and integration of the trust_env configuration for HTTP clients. It also refactors file URI path resolution in OSS utilities and improves WebSocket connection handling in Qwen TTS. The review feedback highlights two main issues: first, the SSE stream parsers fail to reset the event_type state when encountering blank lines, which can cause subsequent events to inherit incorrect types; second, the task polling wait loops should adjust their sleep durations so they do not exceed the remaining timeout when wait_timeout_seconds is specified.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| async for line in response.content: | ||
| if line: | ||
| line = line.decode("utf8") | ||
| line = line.rstrip("\n").rstrip("\r") | ||
| if line.startswith("event:"): | ||
| event_type = line[len("event:") :].strip() | ||
| if event_type == "error": | ||
| is_error = True |
There was a problem hiding this comment.
The event_type state is not reset when a blank line (event separator) is encountered in the SSE stream. This can cause subsequent events to inherit the previous event's type (e.g., if the previous event was done, all subsequent data lines will be incorrectly ignored). Reset event_type to None when an empty line is parsed.
| async for line in response.content: | |
| if line: | |
| line = line.decode("utf8") | |
| line = line.rstrip("\n").rstrip("\r") | |
| if line.startswith("event:"): | |
| event_type = line[len("event:") :].strip() | |
| if event_type == "error": | |
| is_error = True | |
| async for line in response.content: | |
| if line: | |
| line = line.decode("utf8") | |
| line = line.rstrip("\\n").rstrip("\\r") | |
| if not line: | |
| event_type = None | |
| continue | |
| if line.startswith("event:"): | |
| event_type = line[len("event:") :].strip() | |
| if event_type == "error": | |
| is_error = True |
| for line in response.iter_lines(): | ||
| if line: | ||
| line = line.decode("utf8") | ||
| line = line.rstrip("\n").rstrip("\r") | ||
| if line.startswith("event:"): | ||
| event_type = line[len("event:") :].strip() | ||
| if event_type == "error": | ||
| is_error = True |
There was a problem hiding this comment.
The event_type state is not reset when a blank line (event separator) is encountered in the SSE stream. This can cause subsequent events to inherit the previous event's type (e.g., if the previous event was done, all subsequent data lines will be incorrectly ignored). Reset event_type to None when an empty line is parsed.
for line in response.iter_lines():
if line:
line = line.decode("utf8")
line = line.rstrip("\\n").rstrip("\\r")
if not line:
event_type = None
continue
if line.startswith("event:"):
event_type = line[len("event:") :].strip()
if event_type == "error":
is_error = True
else:
event_type = None| for line in response.iter_lines(): | ||
| if line: | ||
| line = line.decode("utf8") | ||
| line = line.rstrip("\n").rstrip("\r") | ||
| if line.startswith("id:"): | ||
| event_id = line[len("id:") :] | ||
| event.id = event_id.strip() | ||
| elif line.startswith("event:"): | ||
| eventType = line[len("event:") :].strip() | ||
| event.eventType = eventType | ||
| if eventType == "error": | ||
| is_error = True |
There was a problem hiding this comment.
The eventType state is not reset when a blank line (event separator) is encountered in the SSE stream. This can cause subsequent events to inherit the previous event's type (e.g., if the previous event was done, all subsequent data lines will be incorrectly ignored). Reset eventType to None when an empty line is parsed.
for line in response.iter_lines():
if line:
line = line.decode("utf8")
line = line.rstrip("\\n").rstrip("\\r")
if not line:
eventType = None
continue
if line.startswith("id:"):
event_id = line[len("id:") :]
event.id = event_id.strip()
elif line.startswith("event:"):
eventType = line[len("event:") :].strip()
event.eventType = eventType
if eventType == "error":
is_error = True
else:
eventType = None| async for line in response.content: | ||
| if line: | ||
| line = line.decode("utf8") | ||
| line = line.rstrip("\n").rstrip("\r") | ||
| if line.startswith("event:"): | ||
| event_type = line[len("event:") :].strip() | ||
| if event_type == "error": | ||
| is_error = True |
There was a problem hiding this comment.
The event_type state is not reset when a blank line (event separator) is encountered in the SSE stream. This can cause subsequent events to inherit the previous event's type (e.g., if the previous event was done, all subsequent data lines will be incorrectly ignored). Reset event_type to None when an empty line is parsed.
| async for line in response.content: | |
| if line: | |
| line = line.decode("utf8") | |
| line = line.rstrip("\n").rstrip("\r") | |
| if line.startswith("event:"): | |
| event_type = line[len("event:") :].strip() | |
| if event_type == "error": | |
| is_error = True | |
| async for line in response.content: | |
| if line: | |
| line = line.decode("utf8") | |
| line = line.rstrip("\\n").rstrip("\\r") | |
| if not line: | |
| event_type = None | |
| continue | |
| if line.startswith("event:"): | |
| event_type = line[len("event:") :].strip() | |
| if event_type == "error": | |
| is_error = True |
| if ( | ||
| wait_timeout_seconds is not None | ||
| and time.monotonic() - start_time | ||
| >= wait_timeout_seconds | ||
| ): | ||
| raise TimeoutException(f"Wait task {task_id} timeout.") | ||
| await asyncio.sleep(wait_seconds) # 异步等待 |
There was a problem hiding this comment.
When wait_timeout_seconds is specified, sleeping for the full wait_seconds interval can cause the wait loop to block significantly longer than the requested timeout. Adjust the sleep duration to not exceed the remaining timeout.
| if ( | |
| wait_timeout_seconds is not None | |
| and time.monotonic() - start_time | |
| >= wait_timeout_seconds | |
| ): | |
| raise TimeoutException(f"Wait task {task_id} timeout.") | |
| await asyncio.sleep(wait_seconds) # 异步等待 | |
| if wait_timeout_seconds is not None: | |
| remaining = wait_timeout_seconds - (time.monotonic() - start_time) | |
| if remaining <= 0: | |
| raise TimeoutException(f"Wait task {task_id} timeout.") | |
| await asyncio.sleep(min(wait_seconds, remaining)) | |
| else: | |
| await asyncio.sleep(wait_seconds) |
| if ( | ||
| wait_timeout_seconds is not None | ||
| and time.monotonic() - start_time >= wait_timeout_seconds | ||
| ): | ||
| raise TimeoutException(f"Wait task {task_id} timeout.") | ||
| await asyncio.sleep(wait_seconds) # 异步等待 |
There was a problem hiding this comment.
When wait_timeout_seconds is specified, sleeping for the full wait_seconds interval can cause the wait loop to block significantly longer than the requested timeout. Adjust the sleep duration to not exceed the remaining timeout.
| if ( | |
| wait_timeout_seconds is not None | |
| and time.monotonic() - start_time >= wait_timeout_seconds | |
| ): | |
| raise TimeoutException(f"Wait task {task_id} timeout.") | |
| await asyncio.sleep(wait_seconds) # 异步等待 | |
| if wait_timeout_seconds is not None: | |
| remaining = wait_timeout_seconds - (time.monotonic() - start_time) | |
| if remaining <= 0: | |
| raise TimeoutException(f"Wait task {task_id} timeout.") | |
| await asyncio.sleep(min(wait_seconds, remaining)) | |
| else: | |
| await asyncio.sleep(wait_seconds) |
Description
[Describe what this PR does and why]
Related Issue: Fixes #[issue_number] or Relates to #[issue_number]
Security Considerations: [Check if API keys or sensitive credentials are exposed in code/logs]
Type of Change
Component(s) Affected
Checklist
Testing
[How to test these changes]
Additional Notes
[Optional: any other context]