Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion python/turboapi/async_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import threading

from .exceptions import HTTPException
from .responses import Response
from .responses import Response, StreamingResponse

_dumps = json.dumps
_thread_local = threading.local()
Expand Down Expand Up @@ -148,6 +148,15 @@ def run_coroutine(coro):


def _normalize_response_tuple(result):
if isinstance(result, StreamingResponse):
# 5-tuple streaming ABI: (status, content_type, b"", iterator, headers)
return (
result.status_code,
result.media_type or "text/plain",
b"",
result.body_iter(),
result.headers or {},
)
if isinstance(result, Response):
body = result.body if isinstance(result.body, bytes) else result.body.encode("utf-8")
return (result.status_code, result.media_type or "application/json", body)
Expand Down
47 changes: 47 additions & 0 deletions python/turboapi/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,10 +1257,19 @@ def create_pos_handler(original_handler):
_dumps = _json.dumps
_returns_md = _returns_model(original_handler)
from turboapi.responses import Response as _Response
from turboapi.responses import StreamingResponse as _StreamingResponse

def pos_handler(*args):
try:
result = original_handler(*args)
if isinstance(result, _StreamingResponse):
return (
result.status_code,
result.media_type or "text/plain",
b"",
result.body_iter(),
result.headers or {},
)
if isinstance(result, _Response):
body = (
result.body if isinstance(result.body, bytes) else result.body.encode("utf-8")
Expand All @@ -1286,10 +1295,19 @@ def create_async_pos_handler(original_handler):
_dumps = _json.dumps
_returns_md = _returns_model(original_handler)
from turboapi.responses import Response as _Response
from turboapi.responses import StreamingResponse as _StreamingResponse

async def pos_handler(*args):
try:
result = await original_handler(*args)
if isinstance(result, _StreamingResponse):
return (
result.status_code,
result.media_type or "text/plain",
b"",
result.body_iter(),
result.headers or {},
)
if isinstance(result, _Response):
body = (
result.body if isinstance(result.body, bytes) else result.body.encode("utf-8")
Expand Down Expand Up @@ -1338,12 +1356,22 @@ def create_fast_handler(original_handler, route_definition):
_returns_md = _returns_model(original_handler)

from turboapi.responses import Response as _Response
from turboapi.responses import StreamingResponse as _StreamingResponse

if not param_names:
# Zero-arg handler: fastest possible path — returns 3-tuple for Zig
def fast_handler_noargs(**kwargs):
try:
result = original_handler()
if isinstance(result, _StreamingResponse):
ct = result.media_type or "text/plain"
return (
result.status_code,
ct,
b"",
result.body_iter(),
result.headers or {},
)
if isinstance(result, _Response):
ct = result.media_type or "application/json"
body = (
Expand Down Expand Up @@ -1402,6 +1430,15 @@ def fast_handler(**kwargs):

result = original_handler(**call_kwargs)

if isinstance(result, _StreamingResponse):
ct = result.media_type or "text/plain"
return (
result.status_code,
ct,
b"",
result.body_iter(),
result.headers or {},
)
if isinstance(result, _Response):
ct = result.media_type or "application/json"
body = (
Expand Down Expand Up @@ -1449,6 +1486,7 @@ def create_fast_async_handler(original_handler, route_definition, eager: bool =
_returns_md = _returns_model(original_handler)

from turboapi.responses import Response as _Response
from turboapi.responses import StreamingResponse as _StreamingResponse

def build_call_kwargs(kwargs):
call_kwargs = {}
Expand Down Expand Up @@ -1511,6 +1549,15 @@ async def fast_handler(**kwargs):
call_kwargs = build_call_kwargs(kwargs)
result = await original_handler(**call_kwargs)

if isinstance(result, _StreamingResponse):
ct = result.media_type or "text/plain"
return (
result.status_code,
ct,
b"",
result.body_iter(),
result.headers or {},
)
if isinstance(result, _Response):
ct = result.media_type or "application/json"
body = (
Expand Down
56 changes: 55 additions & 1 deletion python/turboapi/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def __init__(
self._cookies: list[str] = []

async def body_iterator(self) -> AsyncIterator[bytes]:
"""Iterate over the response body chunks."""
"""Iterate over the response body chunks (async)."""
if hasattr(self._content_iterator, "__aiter__"):
async for chunk in self._content_iterator:
if isinstance(chunk, str):
Expand All @@ -184,6 +184,60 @@ async def body_iterator(self) -> AsyncIterator[bytes]:
else:
yield chunk

def body_iter(self):
"""Return a sync iterator of bytes chunks suitable for the Zig
chunked-transfer write path.

Sync sources are wrapped directly. Async sources are driven one
chunk at a time via the worker thread's event loop — each call
to next() runs the loop until the source yields one chunk. That
gives real-time streaming for SSE / token streams without
introducing a separate event loop or background thread.
"""
src = self._content_iterator
if hasattr(src, "__aiter__"):
return _AsyncToSyncChunkIterator(src)
return _sync_chunk_iter(src)


def _sync_chunk_iter(src):
"""Encode str chunks → bytes; pass bytes through."""
for chunk in src:
if isinstance(chunk, str):
yield chunk.encode("utf-8")
else:
yield chunk


class _AsyncToSyncChunkIterator:
"""Drive an async iterator one chunk at a time on the worker's loop.

Reuses the per-thread event loop from `turboapi.async_pool` so we
don't spawn a fresh loop per chunk. StopAsyncIteration → StopIteration.
"""

__slots__ = ("_aiter", "_loop_runner")

def __init__(self, async_source):
self._aiter = async_source.__aiter__()
self._loop_runner = None

def __iter__(self):
return self

def __next__(self):
# Lazy-import to avoid a circular import at module load.
if self._loop_runner is None:
from turboapi.async_pool import ensure_event_loop
self._loop_runner = ensure_event_loop().run_until_complete
try:
chunk = self._loop_runner(self._aiter.__anext__())
except StopAsyncIteration:
raise StopIteration from None
if isinstance(chunk, str):
return chunk.encode("utf-8")
return chunk


class FileResponse(Response):
"""File response for serving files from disk.
Expand Down
Loading