Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ jobs:

- name: Run tests (TurboAPI + TurboPG)
run: |
DESELECT="--deselect tests/test_fastapi_parity.py::TestWebSocket::test_websocket_send_receive"
DESELECT="$DESELECT --deselect tests/test_fastapi_parity.py::TestWebSocket::test_websocket_send_json"
DESELECT=""
if [ "$RUNNER_OS" = "macOS" ]; then
DESELECT="$DESELECT --deselect tests/test_query_and_headers.py::test_query_parameters_comprehensive"
fi
Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/pre-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ jobs:

- name: Run tests
run: |
python -m pytest tests/ -v --tb=short \
--deselect tests/test_fastapi_parity.py::TestWebSocket::test_websocket_send_receive \
--deselect tests/test_fastapi_parity.py::TestWebSocket::test_websocket_send_json
python -m pytest tests/ -v --tb=short
# Linux x86_64 segfaults on Zig thread cleanup between tests (known issue)
continue-on-error: ${{ matrix.os == 'ubuntu-latest' }}

Expand Down
4 changes: 1 addition & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ docker compose up --build

```bash
# Python tests (requires Python 3.14t)
uv run --python 3.14t python -m pytest tests/ -p no:anchorpy \
--deselect tests/test_fastapi_parity.py::TestWebSocket
uv run --python 3.14t python -m pytest tests/ -p no:anchorpy

# Zig unit tests (includes fuzz seed corpus)
cd zig && zig build test
Expand All @@ -38,7 +37,6 @@ cd zig && zig build test --fuzz
```

**Known test exclusions:**
- WebSocket tests (`TestWebSocket`) — pre-existing failure, deselect
- `anchorpy` plugin — causes import error, disable with `-p no:anchorpy`

## Benchmarks
Expand Down
44 changes: 24 additions & 20 deletions tests/test_fastapi_parity.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
WebSocket, exception handling, OpenAPI, TestClient, static files, lifespan, etc.
"""

import asyncio
import json
import os
import tempfile

import pytest
from turboapi import (
APIKeyCookie,
APIKeyHeader,
Expand Down Expand Up @@ -566,25 +566,29 @@ def test_websocket_disconnect_exception(self):
assert exc.code == 1001
assert exc.reason == "Going away"

@pytest.mark.asyncio
async def test_websocket_send_receive(self):
ws = WebSocket()
await ws.accept()
assert ws.client_state == "connected"

await ws._receive_queue.put({"type": "text", "data": "hello"})
msg = await ws.receive_text()
assert msg == "hello"

@pytest.mark.asyncio
async def test_websocket_send_json(self):
ws = WebSocket()
await ws.accept()
await ws.send_json({"key": "value"})

sent = await ws._send_queue.get()
assert sent["type"] == "text"
assert json.loads(sent["data"]) == {"key": "value"}
def test_websocket_send_receive(self):
async def _run():
ws = WebSocket()
await ws.accept()
assert ws.client_state == "connected"

await ws._receive_queue.put({"type": "text", "data": "hello"})
msg = await ws.receive_text()
assert msg == "hello"

asyncio.run(_run())

def test_websocket_send_json(self):
async def _run():
ws = WebSocket()
await ws.accept()
await ws.send_json({"key": "value"})

sent = await ws._send_queue.get()
assert sent["type"] == "text"
assert json.loads(sent["data"]) == {"key": "value"}

asyncio.run(_run())


# ============================================================
Expand Down
213 changes: 213 additions & 0 deletions tests/test_sse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""Tests for SSE (Server-Sent Events) wire format and EventSourceResponse.

Covers `python/turboapi/sse.py`:
* `ServerSentEvent` dataclass + `.encode()`
* `format_sse_event` wire format
* `EventSourceResponse` headers / media type / iterator wrapping

Note: end-to-end streaming over the Zig HTTP server is NOT exercised here —
the current Zig dispatch path sends `Response.body` directly and does not
yet iterate `StreamingResponse.body_iterator()`. Tracking that gap in a
follow-up issue. These tests lock in the parts that are functional today
(everything below the transport layer).
"""

from __future__ import annotations

import asyncio
import json

import pytest
from turboapi.sse import EventSourceResponse, ServerSentEvent, format_sse_event

# ---------------------------------------------------------------------------
# format_sse_event / ServerSentEvent.encode wire format
# ---------------------------------------------------------------------------


def test_format_data_only_string():
out = format_sse_event(ServerSentEvent(data="hello"))
assert out == "data: hello\n\n"


def test_format_data_only_dict_serializes_json():
out = format_sse_event(ServerSentEvent(data={"a": 1, "b": "two"}))
# Single-line JSON → single `data:` line
assert out.startswith("data: ")
assert out.endswith("\n\n")
payload = out.removeprefix("data: ").removesuffix("\n\n")
assert json.loads(payload) == {"a": 1, "b": "two"}


def test_format_data_multiline_string_emits_one_data_per_line():
out = format_sse_event(ServerSentEvent(data="line1\nline2\nline3"))
# Each newline-separated chunk becomes its own `data:` line
assert "data: line1" in out
assert "data: line2" in out
assert "data: line3" in out
assert out.endswith("\n\n")


def test_format_event_name():
out = format_sse_event(ServerSentEvent(data="hi", event="update"))
assert "event: update\n" in out
assert "data: hi\n" in out


def test_format_id_field():
out = format_sse_event(ServerSentEvent(data="hi", id="42"))
assert "id: 42\n" in out


def test_format_id_can_be_int():
out = format_sse_event(ServerSentEvent(data="hi", id=99))
assert "id: 99\n" in out


def test_format_retry_field():
out = format_sse_event(ServerSentEvent(data="hi", retry=3000))
assert "retry: 3000\n" in out


def test_format_comment_only():
out = format_sse_event(ServerSentEvent(comment="ping"))
# Comments are `:` prefixed per the SSE spec.
assert out.startswith(": ping")
assert out.endswith("\n\n")


def test_format_multiline_comment():
out = format_sse_event(ServerSentEvent(comment="line1\nline2"))
assert ": line1" in out
assert ": line2" in out


def test_format_combined_fields_in_canonical_order():
out = format_sse_event(
ServerSentEvent(
data="payload",
event="msg",
id=7,
retry=1500,
comment="diagnostic",
)
)
# Field order per the sse.py implementation: comment → event → id → retry → data.
# Locking that order in so reorder regressions are visible.
lines = out.splitlines()
nonempty = [ln for ln in lines if ln]
assert nonempty[0].startswith(": ")
assert nonempty[1].startswith("event: ")
assert nonempty[2].startswith("id: ")
assert nonempty[3].startswith("retry: ")
assert nonempty[4].startswith("data: ")


def test_encode_method_matches_format_function():
evt = ServerSentEvent(data={"x": 1}, event="e", id="1")
assert evt.encode() == format_sse_event(evt)


def test_format_terminates_with_double_newline():
# SSE protocol requires \n\n to delimit events.
out = format_sse_event(ServerSentEvent(data="x"))
assert out.endswith("\n\n")


# ---------------------------------------------------------------------------
# EventSourceResponse construction
# ---------------------------------------------------------------------------


async def _gen_three():
yield ServerSentEvent(data="a")
yield ServerSentEvent(data="b")
yield ServerSentEvent(data="c")


def test_event_source_response_sets_sse_headers():
resp = EventSourceResponse(_gen_three())
assert resp.media_type == "text/event-stream"
# SSE-required headers should be present:
assert resp.headers["content-type"] == "text/event-stream"
assert resp.headers["cache-control"] == "no-cache"
assert resp.headers["connection"] == "keep-alive"
# Disable nginx/proxy buffering so events flush immediately:
assert resp.headers["x-accel-buffering"] == "no"


def test_event_source_response_status_code_default_200():
resp = EventSourceResponse(_gen_three())
assert resp.status_code == 200


def test_event_source_response_custom_headers_merge():
resp = EventSourceResponse(_gen_three(), headers={"x-custom": "yes"})
assert resp.headers["x-custom"] == "yes"
# SSE defaults must not be overridden by partial header dict:
assert resp.headers["content-type"] == "text/event-stream"


def test_event_source_response_iterates_events_and_encodes_each():
"""Drain the wrapped iterator and confirm each yield is SSE-encoded."""
resp = EventSourceResponse(_gen_three(), ping_interval=999)

async def drain():
out = []
async for chunk in resp._wrap_with_ping(_gen_three()):
out.append(chunk)
if len(out) >= 3:
break
return out

chunks = asyncio.run(drain())
assert len(chunks) == 3
assert chunks[0] == "data: a\n\n"
assert chunks[1] == "data: b\n\n"
assert chunks[2] == "data: c\n\n"


def test_event_source_response_auto_wraps_dict_as_data():
async def gen():
yield {"key": "val"}

resp = EventSourceResponse(gen(), ping_interval=999)

async def drain():
async for chunk in resp._wrap_with_ping(gen()):
return chunk

chunk = asyncio.run(drain())
assert chunk.startswith("data: ")
assert json.loads(chunk.removeprefix("data: ").removesuffix("\n\n")) == {"key": "val"}


def test_event_source_response_auto_wraps_string_as_data():
async def gen():
yield "plain"

resp = EventSourceResponse(gen(), ping_interval=999)

async def drain():
async for chunk in resp._wrap_with_ping(gen()):
return chunk

chunk = asyncio.run(drain())
assert chunk == "data: plain\n\n"


# ---------------------------------------------------------------------------
# Streaming-over-Zig integration gap (acknowledged, not executed)
# ---------------------------------------------------------------------------


@pytest.mark.skip(
reason=(
"End-to-end SSE streaming over the Zig HTTP server is not yet wired up: "
"request_handler dispatches `Response.body` directly and does not iterate "
"StreamingResponse.body_iterator(). Tracked in follow-up issue."
)
)
def test_event_source_response_end_to_end_over_zig_server():
"""Reserved placeholder — un-skip when the Zig dispatch path streams chunks."""
raise NotImplementedError