diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e871cfd --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# Byte-compiled / cached +__pycache__/ +*.py[cod] +*$py.class + +# Build / packaging +build/ +dist/ +*.egg-info/ +*.egg +pip-wheel-metadata/ + +# Virtualenvs +.venv/ +venv/ +env/ + +# Test / coverage +.pytest_cache/ +.coverage +.coverage.* +htmlcov/ +.tox/ + +# IDE / editor +.idea/ +.vscode/ +*.swp +.DS_Store diff --git a/pyproject.toml b/pyproject.toml index ea9cfae..d63804e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,10 @@ description = "Python-Based implementation of SmartFoxServer2X (SFS2X) Protocol. authors = [{name="Zewsic", email="me@zewsic.pro"}] readme = "readme.md" license = {text = "MIT"} -dependencies = [] +dependencies = ["websockets>=12"] + +[project.optional-dependencies] +crypto = ["pycryptodome"] ################################################## # Ruff Config diff --git a/readme.md b/readme.md index 21047b1..768349a 100644 --- a/readme.md +++ b/readme.md @@ -1,30 +1,40 @@ # ZewSFS -**ZewSFS** is a Python-based implementation of the [SmartFoxServer 2X (SFS2X)](https://www.smartfoxserver.com/) -protocol, offering both client and server-side capabilities. This library provides fundamental data types, transport -abstractions (TCP/WebSocket in the future), message encoding/decoding, and extensibility for custom encryption and -compression. +**ZewSFS** is a Python implementation of the +[SmartFoxServer 2X (SFS2X)](https://www.smartfoxserver.com/) binary protocol — +both the client and the server side. It ships the rich SFS2X data model +(`SFSObject`, `SFSArray`, typed fields), a message codec with optional +compression and AES encryption, and pluggable transports for raw TCP and +WebSockets (the standard binary sub-protocol introduced in SFS2X v2.13). -## Table of Contents +``` + ┌──────────────────────────┐ + │ Application │ + └────────────┬─────────────┘ + Message(controller, action, payload) + ┌────────────┴─────────────┐ + │ sfs2x.protocol (codec) │ ← compression, AES + └────────────┬─────────────┘ + on-wire bytes + ┌────────────┴─────────────┐ + │ sfs2x.transport (TCP/WS) │ + └──────────────────────────┘ +``` + +## Contents - [Features](#features) - [Installation](#installation) - -- [Quick Start](#quick-start) - - [Client Example](#client-example) - - [Server Example](#server-example) - -- [Modules Overview](#modules-overview) - - [Core](#core) - - [Protocol](#protocol) - - [Transport](#transport) - -- [Usage Examples](#usage-examples) - - [Working with `SFSObject` and `SFSArray`](#working-with-sfsobject-and-sfsarray) - - [Serialization / Deserialization](#serialization--deserialization) - - [Encrypted or Compressed Packets](#encrypted-or-compressed-packets) - -- [Development Status](#development-status) +- [Quick start](#quick-start) + - [TCP client](#tcp-client) + - [TCP server](#tcp-server) + - [WebSocket client](#websocket-client) + - [WebSocket server](#websocket-server) +- [Modules overview](#modules-overview) +- [Working with `SFSObject` / `SFSArray`](#working-with-sfsobject--sfsarray) +- [Compression and encryption](#compression-and-encryption) +- [URL → transport mapping](#url--transport-mapping) +- [Status and roadmap](#status-and-roadmap) - [Contributing](#contributing) - [License](#license) @@ -32,160 +42,228 @@ compression. ## Features -- **Core**: Rich, low-level data structures (e.g., `SFSObject`, `SFSArray`) mirroring SmartFoxServer object models. -- **Protocol**: Easy-to-use encoding/decoding functions to convert between raw bytes and high-level `Message` objects. -- **Transport**: Ready-made TCP server (via `TCPAcceptor`) and client (`TCPTransport`) for sending and receiving SFS2X - messages. -- **Encryption**: Optional AES-128-CBC support via [PyCryptodome](https://pypi.org/project/pycryptodome/). - ---- +- **Core data model** — `SFSObject`, `SFSArray` and all the typed primitives + (`Bool`, `Byte`, `Short`, `Int`, `Long`, `Float`, `Double`, `UtfString`, + `Text`, and their array variants) with byte-perfect round-trip encoding. +- **Protocol codec** — `Message ↔ bytes` with optional `zlib` compression + over a configurable threshold and optional AES-128-CBC encryption. +- **TCP transport** — async client (`TCPTransport`) and acceptor + (`TCPAcceptor`) built on `asyncio.open_connection` / + `asyncio.start_server`. +- **WebSocket transport** — async client (`WSTransport`) and acceptor + (`WSAcceptor`) speaking SFS2X v2.13+'s binary WebSocket sub-protocol over + `ws://` and `wss://`. Each WebSocket binary frame carries one complete + SFS2X packet so the codec is shared with the TCP transport. +- **URL factories** — `client_from_url("tcp://...")` / + `server_from_url("ws://.../BlueBox/websocket")` pick the right transport + for you. ## Installation -Install module with **pip** or **uv** - - ```bash - uv pip install sfs2x - ``` - -If you plan to use **encrypted packets**, install PyCryptodome: - - ```bash - pip install pycryptodome - ``` +```bash +pip install sfs2x # or: uv pip install sfs2x +``` ---- +`websockets` is pulled in as a hard dependency. AES encryption is optional +and requires PyCryptodome: -## Quick Start +```bash +pip install sfs2x[crypto] # or: pip install pycryptodome +``` -> **Note**: These examples describe the server-client for the low-Level transport module. High-level server and client -> modules are currently under development. +## Quick start -### Transport Client Example +### TCP client ```python import asyncio -from sfs2x.transport.factory import client_from_url +from sfs2x.transport import client_from_url from sfs2x.protocol import Message, ControllerID, SysAction from sfs2x.core import SFSObject -async def run_client(): +async def main() -> None: async with client_from_url("tcp://localhost:9933") as client: - payload = SFSObject() - payload.put_utf_string("message", "Hello from ZewSFS client!") - + payload = SFSObject().put_utf_string("message", "Hello from ZewSFS!") await client.send(Message( controller=ControllerID.SYSTEM, action=SysAction.PUBLIC_MESSAGE, - payload=payload + payload=payload, + )) + print("Reply:", await client.recv()) + + +asyncio.run(main()) +``` + +### TCP server + +```python +import asyncio +from sfs2x.transport import server_from_url, Transport +from sfs2x.protocol import Message +from sfs2x.core import SFSObject + + +async def handle(client: Transport) -> None: + async for msg in client.listen(): + await client.send(Message( + controller=msg.controller, + action=msg.action, + payload=SFSObject({"echo": msg.payload}), )) - response = await client.recv() - print("Response:", response) + +async def main() -> None: + async with server_from_url("tcp://0.0.0.0:9933") as acceptor: + async for client in acceptor: + print(f"Client connected: {client.host}:{client.port}") + asyncio.create_task(handle(client)) -if __name__ == "__main__": - asyncio.run(run_client()) +asyncio.run(main()) ``` -### Server Example +### WebSocket client + +The default SFS2X WebSocket endpoint path is `/BlueBox/websocket`. ```python import asyncio -from sfs2x.transport import server_from_url, TCPTransport +from sfs2x.transport import client_from_url from sfs2x.protocol import Message, ControllerID, SysAction from sfs2x.core import SFSObject -async def handle_client(client: TCPTransport): - async for message in client.listen(): - response_payload = SFSObject(message="Hello back from server!") - +async def main() -> None: + url = "ws://localhost:8080/BlueBox/websocket" + async with client_from_url(url) as client: await client.send(Message( controller=ControllerID.SYSTEM, - action=SysAction.PUBLIC_MESSAGE, - payload=response_payload + action=SysAction.HANDSHAKE, + payload=SFSObject() + .put_utf_string("api", "1.7.3") + .put_utf_string("cl", "Python/ZewSFS") + .put_bool("bin", True), )) + print("Handshake reply:", await client.recv()) -async def run_server(): - async for client in server_from_url("tcp://localhost:9933"): - print(f"New client connected: {client.host}:{client.port}") - asyncio.create_task(handle_client(client)) +asyncio.run(main()) +``` +For TLS, point the client at `wss://...` and (optionally) pass an +`ssl.SSLContext`: -if __name__ == "__main__": - asyncio.run(run_server()) +```python +import ssl +client_from_url("wss://game.example.com/BlueBox/websocket", ssl=ssl.create_default_context()) ``` ---- +### WebSocket server + +```python +import asyncio +from sfs2x.transport import server_from_url, Transport +from sfs2x.protocol import Message -## Modules Overview -### Core +async def handle(client: Transport) -> None: + async for msg in client.listen(): + await client.send(msg) # echo -The `core` package provides fundamental data structures and serialization logic: -1. **Fields and Arrays**: - - `Bool`, `Byte`, `Short`, `Int`, `Long`, `Float`, `Double`, `UtfString`, `Text` - - `BoolArray`, `ByteArray`, `ShortArray`, `IntArray`, `LongArray`, `FloatArray`, `DoubleArray`, `UtfStringArray` +async def main() -> None: + async with server_from_url("ws://0.0.0.0:8080/BlueBox/websocket") as acceptor: + async for client in acceptor: + print(f"WS client connected: {client.host}:{client.port}") + asyncio.create_task(handle(client)) -2. **Containers**: - - `SFSObject` for key-value pairs - - `SFSArray` for sequential lists -3. **Utility Classes**: - - `Buffer` for reading raw bytes - - `Field` as a base for packable items - - `registry`, `decode`, and `encode` for bridging raw bytes ↔ SFS data types +asyncio.run(main()) +``` -### Protocol +## Modules overview -The `protocol` package focuses on reading/writing SFS2X-compliant packets: +### `sfs2x.core` -- **`Message`**: High-level class representing a single SFS2X message with `controller`, `action`, and `payload`. -- **`Flag`**: Enum for packet flags (binary, compressed, encrypted, etc.). -- **`encode` / `decode`**: Convert `Message` ↔ binary packets, optionally using compression and AES encryption. -- **`AESCipher`**: AES-128-CBC encryption/decryption for securing packets (requires PyCryptodome). +Low-level data model and binary encoding/decoding: -### Transport +- **Primitives**: `Bool`, `Byte`, `Short`, `Int`, `Long`, `Float`, `Double`, + `UtfString`, `Text`. +- **Arrays**: `BoolArray`, `ByteArray`, `ShortArray`, `IntArray`, + `LongArray`, `FloatArray`, `DoubleArray`, `UtfStringArray`. +- **Containers**: `SFSObject` (keyed) and `SFSArray` (ordered). +- **Helpers**: `Buffer` (byte cursor), `decode` / `register` (raw + bytes ↔ typed values). -The `transport` package provides abstractions for client-server communication: +### `sfs2x.protocol` -- **`Transport` (abstract)**: Defines the required methods (`open`, `send`, `recv`, `close`) for any transport. -- **`TCPTransport`**: Client-side implementation using asyncio streams (TCP). -- **`TCPAcceptor`**: Server-side implementation using asyncio `start_server` (TCP). -- **`client_from_url` / `server_from_url`**: Factory methods to instantiate a transport from a URL (e.g., - `tcp://localhost:9933`). +Wire-format codec: ---- +- `Message(controller, action, payload)` — high-level packet. +- `Flag` — header bit flags (`BINARY`, `COMPRESSED`, `ENCRYPTED`, + `BIG_SIZE`, `BLUEBOX`). +- `encode(msg, *, compress_threshold, encryption_key)` and + `decode(buf_or_bytes, *, encryption_key)`. +- `AESCipher` — AES-128-CBC helper (requires `pycryptodome`). +- `ControllerID`, `SysAction` — protocol-level enums. + +### `sfs2x.transport` + +Connection-level abstractions and concrete implementations: + +| Symbol | Purpose | +| --------------------- | --------------------------------------------- | +| `Transport` | Abstract base for client-side connections. | +| `Acceptor` | Abstract base for server-side acceptors. | +| `TransportClosedError`| Raised when the connection is no longer open. | +| `TCPTransport` | TCP client via `asyncio.open_connection`. | +| `TCPAcceptor` | TCP server via `asyncio.start_server`. | +| `WSTransport` | WebSocket client (`ws://` / `wss://`). | +| `WSAcceptor` | WebSocket server (default path `/BlueBox/websocket`). | +| `client_from_url` | URL → `Transport`. | +| `server_from_url` | URL → `Acceptor`. | + +A `Transport` lifecycle is: + +```python +async with client_from_url(url) as t: + await t.send(msg) + reply = await t.recv() + async for msg in t.listen(): + ... +``` -## Usage Examples +An `Acceptor` lifecycle mirrors it: -### Working with `SFSObject` and `SFSArray` +```python +async with server_from_url(url) as acceptor: + async for client in acceptor: + ... +``` -**Imperative style**: +## Working with `SFSObject` / `SFSArray` + +**Fluent / imperative**: ```python from sfs2x.core import SFSObject, SFSArray obj = SFSObject() -obj.put_int("score", 1200) -obj.put_double_array("history", [3.14, -4.5, 2.7]) \ - .put_bool("isAdmin", True) +obj.put_int("score", 1200) \ + .put_double_array("history", [3.14, -4.5, 2.7]) \ + .put_bool("isAdmin", True) arr = SFSArray() -arr.add_utf_string("item1") -arr.add_utf_string("item2") - -obj["myArray"] = arr +arr.add_utf_string("item1").add_utf_string("item2") +obj["items"] = arr ``` -**Declarative style**: +**Declarative**: ```python -from sfs2x.core import UtfString, Int, SFSObject, SFSArray +from sfs2x.core import UtfString, Int, SFSObject obj = SFSObject({ "name": UtfString("Zewsic"), @@ -193,53 +271,87 @@ obj = SFSObject({ "items": [ UtfString("Sword"), UtfString("Shield"), - SFSObject({"key": UtfString("value")}) - ], # SFSArray - "object": { - "some": UtfString("Thing") - } # SFSObject + SFSObject({"key": UtfString("value")}), + ], + "object": {"some": UtfString("thing")}, }) ``` -**Argument style**: -> **Note**: Added as experiment, unstable. +**Round-tripping bytes**: + ```python -from sfs2x.core import UtfString, Int, SFSObject, SFSArray +from sfs2x.core import decode, SFSObject, Int -obj = SFSObject( - username=UtfString("Zewsic"), - coins=Int(1200) -) +obj = SFSObject({"example": Int(42)}) +raw = obj.to_bytes() +restored: SFSObject = decode(raw) +print(restored.get("example")) # 42 ``` -### Serialization / Deserialization +## Compression and encryption -```python -from sfs2x.core import decode, SFSObject, Int +Both transports accept the same kwargs and forward them straight to the +codec: -# Serialize -obj = SFSObject({"example": Int(42)}) -raw_bytes = obj.to_bytes() +```python +from sfs2x.transport import client_from_url -# Deserialize -deserialized_obj: SFSObject = decode(raw_bytes) -print(deserialized_obj.get("example")) # 42 +client = client_from_url( + "ws://game.example.com/BlueBox/websocket", + compress_threshold=512, # zlib-compress payloads > 512 bytes + encryption_key=b"my_secret_16byte", # AES-128-CBC +) ``` -### Encrypted or Compressed Packets - -When creating or decoding messages, you can specify a threshold for compression and a key for encryption: +Or, using the codec directly: ```python -from sfs2x.protocol import Message, encode, decode, SysAction, ControllerID +from sfs2x.protocol import Message, encode, decode, ControllerID from sfs2x.core import SFSObject, UtfString -encryption_key = b"my_secret_16byte" +msg = Message(controller=ControllerID.EXTENSION, action=18, + payload={"secret": UtfString("HideMe")}) +packet = encode(msg, compress_threshold=512, encryption_key=b"my_secret_16byte") +restored = decode(packet, encryption_key=b"my_secret_16byte") +``` + +## URL → transport mapping + +| Scheme | Client | Server | Default port | Default path | +| ------- | ------------------ | -------------------- | ------------ | ---------------------- | +| `tcp` | `TCPTransport` | `TCPAcceptor` | `9933` | — | +| `ws` | `WSTransport` | `WSAcceptor` | `8080` | `/BlueBox/websocket` | +| `wss` | `WSTransport(TLS)` | `WSAcceptor(TLS)` | `443` | `/BlueBox/websocket` | -# Compress if payload > 512 bytes, encrypt with a 16-byte key -msg = Message(controller=ControllerID.EXTENSION, action=18, payload={"secret": UtfString("HideMe")}) -packet = encode(msg, compress_threshold=512, encryption_key=encryption_key) +Path defaults are the SFS2X conventions — set an explicit path in the URL +to override (e.g. `ws://host:8080/custom/socket`). For `wss://`, pass +`ssl=ssl.SSLContext(...)` to the factory. -# Decoding -decoded_msg = decode(packet, encryption_key=encryption_key) +## Status and roadmap + +This library covers the **binary protocol** and the **two main transports** +(TCP and WebSocket). It is intended as a low-level building block — there +is no high-level SmartFox-style session yet. + +Planned / in development: + +- `SFSClient` / `SFSServer` high-level wrappers (handshake, login, room and + user variables, extension call helpers). +- BlueBox HTTP tunneling transport. +- UDP transport for unreliable high-frequency channels. + +Contributions and protocol-edge bug reports are very welcome. + +## Contributing + +Issues and pull requests are tracked on +[GitHub](https://github.com/zewmsm/ZewSFS). Run the tests with: + +```bash +pip install -e . websockets pytest pytest-asyncio pycryptodome +pytest ``` + +## License + +MIT — see [LICENSE](LICENSE). diff --git a/requirements.txt b/requirements.txt index e69de29..b703685 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +websockets>=12 diff --git a/sfs2x/transport/__init__.py b/sfs2x/transport/__init__.py index 286d51e..e892404 100644 --- a/sfs2x/transport/__init__.py +++ b/sfs2x/transport/__init__.py @@ -1,12 +1,17 @@ -from sfs2x.transport.base import Acceptor, Transport # noqa: I001 +from sfs2x.transport.base import Acceptor, Transport, TransportClosedError # noqa: I001 from sfs2x.transport.tcp import TCPAcceptor, TCPTransport +from sfs2x.transport.ws import DEFAULT_WS_PATH, WSAcceptor, WSTransport from sfs2x.transport.factory import client_from_url, server_from_url __all__ = [ + "DEFAULT_WS_PATH", "Acceptor", "TCPAcceptor", "TCPTransport", "Transport", + "TransportClosedError", + "WSAcceptor", + "WSTransport", "client_from_url", "server_from_url", ] diff --git a/sfs2x/transport/base.py b/sfs2x/transport/base.py index 8cc09f0..101f0c4 100644 --- a/sfs2x/transport/base.py +++ b/sfs2x/transport/base.py @@ -1,88 +1,176 @@ +import asyncio from abc import ABC, abstractmethod from collections.abc import AsyncIterator -from typing import Protocol +from types import TracebackType +from typing import Self from sfs2x.core import Buffer from sfs2x.protocol import Message, decode, encode -class Transport(ABC): - """Abstract base class for transports.""" - - _closed: bool - _compress_threshold: int | None = None - _encryption_key: bytes | None = None +class TransportClosedError(ConnectionError): + """Raised when send/recv is attempted on a closed transport.""" - def __init__(self) -> None: - self._closed = True - async def open(self) -> "Transport": +class Transport(ABC): + """Abstract base class for a bidirectional SFS2X message transport. + + Concrete implementations override the four ``_*`` hooks to plug in a + specific wire protocol (TCP streams, WebSocket frames, etc). The + public surface (``open`` / ``send`` / ``recv`` / ``close`` / ``listen``) + is the same regardless of the underlying transport. + """ + + def __init__( + self, + host: str, + port: int, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> None: + self.host = host + self.port = port + self._compress_threshold = compress_threshold + self._encryption_key = encryption_key + self._opened = False + + @property + def closed(self) -> bool: + return not self._opened + + async def open(self) -> Self: + if self._opened: + return self await self._open() - self._closed = False + self._opened = True return self async def send(self, msg: Message) -> None: - if self._closed: - err_msg = "Connection closed by remote host" - raise ConnectionError(err_msg) + if not self._opened: + raise TransportClosedError("Connection closed by remote host") await self._send_raw( - encode(msg, compress_threshold=self._compress_threshold, encryption_key=self._encryption_key)) + encode( + msg, + compress_threshold=self._compress_threshold, + encryption_key=self._encryption_key, + ), + ) async def recv(self) -> Message: - if self._closed: - msg = "Connection closed by remote host" - raise ConnectionError(msg) + if not self._opened: + raise TransportClosedError("Connection closed by remote host") raw = await self._recv_raw() return decode(Buffer(raw), encryption_key=self._encryption_key) async def close(self) -> None: - if not self._closed: + if self._opened: + self._opened = False await self._close_impl() - self._closed = True async def listen(self) -> AsyncIterator[Message]: - """Async iterator over incoming messages.""" - while not self._closed: + """Yield messages until the transport is closed.""" + while self._opened: try: yield await self.recv() except (ConnectionError, RuntimeError): break - async def __aenter__(self) -> "Transport": - """Async enter.""" + async def __aenter__(self) -> Self: await self.open() return self - async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # noqa: ANN001 - """Async exit.""" + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: await self.close() @abstractmethod - async def _open(self) -> None: - ... + async def _open(self) -> None: ... @abstractmethod - async def _send_raw(self, raw: bytes) -> None: - ... + async def _send_raw(self, raw: bytes) -> None: ... @abstractmethod async def _recv_raw(self) -> bytes: - ... + """Return one complete on-wire SFS2X packet (header + body).""" @abstractmethod - async def _close_impl(self) -> None: - ... - - @abstractmethod - def host(self) -> str: - ... + async def _close_impl(self) -> None: ... + + +class Acceptor(ABC): + """Abstract base class for a server-side acceptor. + + Lifecycle:: + + acceptor = SomeAcceptor(host, port) + await acceptor.start() + async for client in acceptor: + ... + await acceptor.close() + + or, equivalently:: + + async with SomeAcceptor(host, port) as acceptor: + async for client in acceptor: + ... + """ + + def __init__( + self, + host: str, + port: int, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> None: + self.host = host + self.port = port + self._compress_threshold = compress_threshold + self._encryption_key = encryption_key + self._queue: asyncio.Queue[Transport] = asyncio.Queue() + self._started = False + + async def start(self) -> Self: + if self._started: + return self + await self._start() + self._started = True + return self - @abstractmethod - def port(self) -> int: - ... + async def close(self) -> None: + if not self._started: + return + self._started = False + await self._close() + + async def accept(self) -> AsyncIterator[Transport]: + if not self._started: + await self.start() + while self._started: + yield await self._queue.get() + + def __aiter__(self) -> AsyncIterator[Transport]: + return self.accept() + + async def __aenter__(self) -> Self: + await self.start() + return self + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + await self.close() -class Acceptor(Protocol): - """Async listener for server.""" + @abstractmethod + async def _start(self) -> None: ... - async def __aiter__(self) -> AsyncIterator[Transport]: ... # noqa: D105 + @abstractmethod + async def _close(self) -> None: ... diff --git a/sfs2x/transport/factory.py b/sfs2x/transport/factory.py index 69a77bf..a14666e 100644 --- a/sfs2x/transport/factory.py +++ b/sfs2x/transport/factory.py @@ -1,37 +1,84 @@ from urllib.parse import urlparse -from sfs2x.transport import Acceptor, TCPAcceptor, TCPTransport, Transport +from sfs2x.transport.base import Acceptor, Transport +from sfs2x.transport.tcp import TCPAcceptor, TCPTransport +from sfs2x.transport.ws import DEFAULT_WS_PATH, WSAcceptor, WSTransport +_DEFAULT_PORTS = {"tcp": 9933, "ws": 8080, "wss": 443} -def client_from_url(url: str, *, compress_threshold: int | None = None, encryption_key: bytes | None = None) -> Transport: - """ - Create transport from url. - * ``tcp://host:port`` - * ``ws://host:port/path`` - * ``http://host:port/path - """ +def _parse(url: str) -> tuple[str, str, int, str]: + """Return ``(scheme, host, port, path)`` from ``url``.""" u = urlparse(url) scheme = (u.scheme or "tcp").lower() + if scheme not in _DEFAULT_PORTS: + raise ValueError(f"Unsupported scheme: {scheme!r}. Expected one of {sorted(_DEFAULT_PORTS)}.") + host = u.hostname or "localhost" + port = u.port or _DEFAULT_PORTS[scheme] + path = u.path or DEFAULT_WS_PATH + return scheme, host, port, path - if scheme == "tcp": - port = u.port or 9933 - return TCPTransport(u.hostname or "localhost", port, compress_threshold=compress_threshold, encryption_key=encryption_key) - raise NotImplementedError +def client_from_url( + url: str, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ssl: object | None = None, +) -> Transport: + """Build a client transport from a URL. -def server_from_url(url: str, compress_threshold: int | None = None, encryption_key: bytes | None = None) -> TCPAcceptor | Acceptor: - """ - Create acceptor from url. + Supported schemes: - * ``tcp://host:port`` - * ``ws://host:port/path`` - * ``http://host:port/path + * ``tcp://host[:port]`` — raw SFS2X TCP socket (default port ``9933``). + * ``ws://host[:port][/path]`` — SFS2X over WebSocket (default port + ``8080``, default path ``/BlueBox/websocket``). + * ``wss://host[:port][/path]`` — SFS2X over secure WebSocket (default + port ``443``). Pass an ``ssl.SSLContext`` via ``ssl=...`` if needed. """ - u = urlparse(url) - scheme = u.scheme.lower() + scheme, host, port, path = _parse(url) + if scheme == "tcp": + return TCPTransport( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + return WSTransport( + host, + port, + path=path, + secure=scheme == "wss", + ssl=ssl, # type: ignore[arg-type] + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + +def server_from_url( + url: str, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ssl: object | None = None, +) -> Acceptor: + """Build a server-side acceptor from a URL. + + See :func:`client_from_url` for the supported schemes. + """ + scheme, host, port, path = _parse(url) if scheme == "tcp": - port = u.port or 9933 - return TCPAcceptor(u.hostname or "localhost", port, compress_threshold=compress_threshold, encryption_key=encryption_key) - raise NotImplementedError + return TCPAcceptor( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + return WSAcceptor( + host, + port, + path=path, + ssl=ssl, # type: ignore[arg-type] + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) diff --git a/sfs2x/transport/tcp.py b/sfs2x/transport/tcp.py index 9da0190..86eb76f 100644 --- a/sfs2x/transport/tcp.py +++ b/sfs2x/transport/tcp.py @@ -1,58 +1,75 @@ import asyncio +import contextlib import logging -from asyncio import AbstractServer, IncompleteReadError, StreamReader, StreamWriter, get_running_loop, start_server -from collections.abc import AsyncIterator +from asyncio import AbstractServer, IncompleteReadError, StreamReader, StreamWriter +from typing import Self from sfs2x.protocol import Flag -from sfs2x.transport import Acceptor, Transport +from sfs2x.transport.base import Acceptor, Transport, TransportClosedError logger = logging.getLogger("SFS2X/TCPTransport") class TCPTransport(Transport): - """SmartFox Transport realisation with Async Streams.""" - - def __init__(self, host: str, port: int, compress_threshold: int | None = None, encryption_key: bytes | None = None) -> None: - super().__init__() - self._host = host - self._port = port + """SFS2X transport over raw TCP using asyncio streams.""" + + def __init__( + self, + host: str, + port: int, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> None: + super().__init__( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) self._reader: StreamReader | None = None self._writer: StreamWriter | None = None - self._encryption_key = encryption_key - self._compress_threshold = compress_threshold - - @property - def host(self) -> str: - return self._host - @property - def port(self) -> int: - return self._port + @classmethod + def _from_streams( + cls, + reader: StreamReader, + writer: StreamWriter, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> Self: + host, port = writer.get_extra_info("peername")[:2] + inst = cls( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + inst._reader = reader + inst._writer = writer + inst._opened = True + return inst async def _open(self) -> None: - self._reader, self._writer = await asyncio.open_connection(self._host, self._port) - logger.info("Opened connection to %s:%s", self._host, self._port) + self._reader, self._writer = await asyncio.open_connection(self.host, self.port) + logger.info("Opened connection to %s:%s", self.host, self.port) async def _send_raw(self, raw: bytes) -> None: - if not self._writer: - msg = "Connection closed by remote host" - raise ConnectionError(msg) - + if self._writer is None: + raise TransportClosedError("Connection closed by remote host") self._writer.write(raw) await self._writer.drain() - logger.info("Sent %s bytes", {len(raw)}) + logger.info("Sent %s bytes", len(raw)) async def _recv_raw(self) -> bytes: - if not self._reader: - msg = "Connection closed by remote host" - raise ConnectionError(msg) - + if self._reader is None: + raise TransportClosedError("Connection closed by remote host") try: - _flags = await self._reader.readexactly(1) - flags = Flag(_flags[0]) + flag_byte = await self._reader.readexactly(1) + flags = Flag(flag_byte[0]) if not flags & Flag.BINARY: - msg = "Invalid packet type" - raise RuntimeWarning(msg) + raise RuntimeError("Invalid packet type") len_bytes = await self._reader.readexactly(2) if flags & Flag.BIG_SIZE: @@ -61,58 +78,59 @@ async def _recv_raw(self) -> bytes: length = int.from_bytes(len_bytes, byteorder="big", signed=False) body = await self._reader.readexactly(length) except IncompleteReadError as e: - msg = "Connection closed by remote host" - raise ConnectionError(msg) from e - - logger.info("Received %s bytes from %s:%s", length, self._host, self._port) + raise TransportClosedError("Connection closed by remote host") from e - return _flags + len_bytes + body + logger.info("Received %s bytes from %s:%s", length, self.host, self.port) + return flag_byte + len_bytes + body async def _close_impl(self) -> None: if self._writer: self._writer.close() - await self._writer.wait_closed() - logger.info("Closed connection to %s:%s", self._host, self._port) + with contextlib.suppress(ConnectionError): + await self._writer.wait_closed() + logger.info("Closed connection to %s:%s", self.host, self.port) class TCPAcceptor(Acceptor): - """Server-Side implementation of the TCP Acceptor.""" - - def __init__(self, host: str, port: int, compress_threshold: int | None = None, encryption_key: bytes | None = None) -> None: - super().__init__() - self._host = host - self._port = port + """Server-side TCP acceptor. + + Each accepted connection is wrapped in a :class:`TCPTransport` ready + to ``send`` / ``recv`` / ``listen`` SFS2X messages. + """ + + def __init__( + self, + host: str, + port: int, + *, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> None: + super().__init__( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) self._server: AbstractServer | None = None - self._compress_threshold = compress_threshold - self._encryption_key = encryption_key - - async def __aiter__(self) -> AsyncIterator[Transport]: # type: ignore # noqa: PGH003 - """Iterate all new connections.""" - loop = get_running_loop() - self._server = await start_server(self._on_conn, self._host, self._port) - logger.info("Started server on %s:%s", self._host, self._port) - - self._queue: asyncio.Queue[TCPTransport] = asyncio.Queue() - async def producer() -> None: - async with self._server: # type: ignore # noqa: PGH003 - await self._server.serve_forever() # type: ignore # noqa: PGH003 + async def _start(self) -> None: + self._server = await asyncio.start_server(self._on_conn, self.host, self.port) + logger.info("Started TCP server on %s:%s", self.host, self.port) - loop.create_task(producer()) # noqa: RUF006 - - try: - while True: - yield await self._queue.get() - finally: + async def _close(self) -> None: + if self._server is not None: self._server.close() + with contextlib.suppress(Exception): + await self._server.wait_closed() + logger.info("Stopped TCP server on %s:%s", self.host, self.port) async def _on_conn(self, reader: StreamReader, writer: StreamWriter) -> None: - host, port = writer.get_extra_info("peername") - logger.info("Connection from %s:%s", host, port) - transport = TCPTransport(host, port) - transport._reader = reader # noqa: SLF001 - transport._writer = writer # noqa: SLF001 - transport._closed = False # noqa: SLF001 - transport._encryption_key = self._encryption_key # noqa: SLF001 - transport._compress_threshold = self._compress_threshold # noqa: SLF001 + transport = TCPTransport._from_streams( + reader, + writer, + compress_threshold=self._compress_threshold, + encryption_key=self._encryption_key, + ) + logger.info("Connection from %s:%s", transport.host, transport.port) await self._queue.put(transport) diff --git a/sfs2x/transport/ws.py b/sfs2x/transport/ws.py new file mode 100644 index 0000000..a5e08af --- /dev/null +++ b/sfs2x/transport/ws.py @@ -0,0 +1,193 @@ +import contextlib +import logging +import ssl as _ssl +from typing import Self + +import websockets +from websockets.asyncio.client import ClientConnection, connect +from websockets.asyncio.server import Server, ServerConnection, serve +from websockets.exceptions import ConnectionClosed + +from sfs2x.transport.base import Acceptor, Transport, TransportClosedError + +logger = logging.getLogger("SFS2X/WSTransport") + +DEFAULT_WS_PATH = "/BlueBox/websocket" + + +class WSTransport(Transport): + """SFS2X transport over WebSocket binary frames. + + Each WebSocket binary message carries one complete SFS2X packet + (BINARY-flag byte + 2/4-byte big-endian length + body) — the same + framing used over plain TCP. This matches SmartFoxServer 2X v2.13+ + where the WebSocket endpoint speaks the standard binary protocol. + """ + + def __init__( + self, + host: str, + port: int, + *, + path: str = DEFAULT_WS_PATH, + secure: bool = False, + ssl: _ssl.SSLContext | None = None, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> None: + super().__init__( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + self.path = path if path.startswith("/") else f"/{path}" + self.secure = secure + self._ssl_ctx = ssl + self._ws: ClientConnection | ServerConnection | None = None + + @classmethod + def _from_ws( + cls, + ws: ServerConnection, + *, + path: str = DEFAULT_WS_PATH, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> Self: + host, port = ws.remote_address[:2] + inst = cls( + host, + port, + path=path, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + inst._ws = ws + inst._opened = True + return inst + + @property + def url(self) -> str: + scheme = "wss" if self.secure else "ws" + return f"{scheme}://{self.host}:{self.port}{self.path}" + + async def _open(self) -> None: + ssl_arg = self._ssl_ctx if self.secure else None + self._ws = await connect(self.url, subprotocols=None, ssl=ssl_arg) + logger.info("Opened WS connection to %s", self.url) + + async def _send_raw(self, raw: bytes) -> None: + if self._ws is None: + raise TransportClosedError("Connection closed by remote host") + try: + await self._ws.send(bytes(raw)) + except ConnectionClosed as e: + raise TransportClosedError("Connection closed by remote host") from e + logger.info("Sent %s bytes (ws)", len(raw)) + + async def _recv_raw(self) -> bytes: + if self._ws is None: + raise TransportClosedError("Connection closed by remote host") + try: + frame = await self._ws.recv() + except ConnectionClosed as e: + raise TransportClosedError("Connection closed by remote host") from e + + if isinstance(frame, str): + raise RuntimeError("Received text frame; expected binary SFS2X packet") + + logger.info("Received %s bytes (ws) from %s:%s", len(frame), self.host, self.port) + return frame + + async def _close_impl(self) -> None: + if self._ws is not None: + with contextlib.suppress(ConnectionClosed): + await self._ws.close() + logger.info("Closed WS connection to %s", self.url) + + +class WSAcceptor(Acceptor): + """Server-side WebSocket acceptor. + + Accepts connections to ``path`` (default ``/BlueBox/websocket``) and + yields one :class:`WSTransport` per client. + """ + + def __init__( + self, + host: str, + port: int, + *, + path: str = DEFAULT_WS_PATH, + ssl: _ssl.SSLContext | None = None, + compress_threshold: int | None = None, + encryption_key: bytes | None = None, + ) -> None: + super().__init__( + host, + port, + compress_threshold=compress_threshold, + encryption_key=encryption_key, + ) + self.path = path if path.startswith("/") else f"/{path}" + self._ssl_ctx = ssl + self._server: Server | None = None + + async def _start(self) -> None: + self._server = await serve( + self._on_conn, + self.host, + self.port, + ssl=self._ssl_ctx, + subprotocols=None, + ) + logger.info( + "Started WS server on %s://%s:%s%s", + "wss" if self._ssl_ctx else "ws", + self.host, + self.port, + self.path, + ) + + async def _close(self) -> None: + if self._server is not None: + self._server.close() + with contextlib.suppress(Exception): + await self._server.wait_closed() + logger.info("Stopped WS server on %s:%s", self.host, self.port) + + async def _on_conn(self, ws: ServerConnection) -> None: + request_path = _ws_request_path(ws) + if request_path != self.path: + await ws.close(code=1003, reason=f"Unknown path: {request_path}") + return + + transport = WSTransport._from_ws( + ws, + path=self.path, + compress_threshold=self._compress_threshold, + encryption_key=self._encryption_key, + ) + logger.info("WS connection from %s:%s", transport.host, transport.port) + await self._queue.put(transport) + + # websockets closes the connection when the handler returns, so we + # need to keep it alive while the application is using the transport. + await ws.wait_closed() + + +def _ws_request_path(ws: ServerConnection) -> str: + """Extract the HTTP request path from an incoming WebSocket connection.""" + request = getattr(ws, "request", None) + if request is not None and getattr(request, "path", None): + return request.path + # Fallback for older websockets versions. + return getattr(ws, "path", "/") + + +__all__ = ["DEFAULT_WS_PATH", "WSAcceptor", "WSTransport"] + +# Ensure websockets is importable at module import time so a missing +# install fails fast with a clear hint. +_ = websockets.__version__ diff --git a/tests/test_transport.py b/tests/test_transport.py index effc7fa..b12a77f 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -3,34 +3,50 @@ import pytest_asyncio from sfs2x.core import Float, UtfString, Int, Double, SFSObject -from sfs2x.transport import client_from_url, server_from_url, TCPTransport +from sfs2x.transport import client_from_url, server_from_url, Transport from sfs2x.protocol import Message, ControllerID, SysAction + +async def _echo_handler(conn: Transport) -> None: + async for msg in conn.listen(): + obj = msg.payload.value.get('input') + obj.value *= 2 + msg.payload['resp'] = obj + await conn.send(msg) + + +async def _run_echo_server(url: str, *, encryption_key: bytes | None = None) -> None: + async with server_from_url(url, encryption_key=encryption_key) as acceptor: + async for conn in acceptor: + asyncio.create_task(_echo_handler(conn)) + + @pytest_asyncio.fixture -async def echo_server(event_loop): - server_task = event_loop.create_task(run_echo_server()) +async def tcp_echo_server(): + server_task = asyncio.create_task( + _run_echo_server("tcp://0.0.0.0:9000", encryption_key=b'mega_secured_key') + ) await asyncio.sleep(0.2) - yield - server_task.cancel() with pytest.raises(asyncio.CancelledError): await server_task -async def run_echo_server(): - async for conn in server_from_url("tcp://0.0.0.0:9000", encryption_key=b'mega_secured_key'): - asyncio.create_task(some_handler(conn)) -async def some_handler(conn: TCPTransport): - async for msg in conn.listen(): - print(msg) - obj = msg.payload.value.get('input') - obj.value *= 2 - msg.payload['resp'] = obj - await conn.send(msg) +@pytest_asyncio.fixture +async def ws_echo_server(): + server_task = asyncio.create_task( + _run_echo_server("ws://0.0.0.0:9001/BlueBox/websocket", encryption_key=b'mega_secured_key') + ) + await asyncio.sleep(0.2) + yield + server_task.cancel() + with pytest.raises(asyncio.CancelledError): + await server_task + @pytest.mark.asyncio -async def test_tcp_echo_roundtrip(echo_server): +async def test_tcp_echo_roundtrip(tcp_echo_server): conn = client_from_url("tcp://localhost:9000", encryption_key=b'mega_secured_key') async with conn: @@ -43,6 +59,25 @@ async def test_tcp_echo_roundtrip(echo_server): assert answer.action == test_msg.action assert answer.payload.get('resp') == value.value * 2 + +@pytest.mark.asyncio +async def test_ws_echo_roundtrip(ws_echo_server): + conn = client_from_url( + "ws://localhost:9001/BlueBox/websocket", + encryption_key=b'mega_secured_key', + ) + + async with conn: + for value in [UtfString('Friday, '), Int(8), Double(123.12)]: + test_msg = Message(ControllerID.SYSTEM, SysAction.PING_PONG, SFSObject({'input': value})) + await conn.send(test_msg) + + answer = await conn.recv() + assert answer.controller == test_msg.controller + assert answer.action == test_msg.action + assert answer.payload.get('resp') == value.value * 2 + + @pytest.mark.asyncio async def test_msm_server(): async with client_from_url("tcp://107.20.67.227") as conn: @@ -66,4 +101,4 @@ async def test_msm_server(): await conn.send(Message(ControllerID.SYSTEM, SysAction.LOGIN, auth_info)) resp = await conn.recv() - assert resp.payload['ec'] == 1 \ No newline at end of file + assert resp.payload['ec'] == 1