diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a4aee8..d4e86dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ All notable changes to this project. The format is loosely based on ### Added +- M34: Linux-only `EpollServer` gateway transport prototype. It uses one `epoll` loop, + nonblocking accept/read/write, per-client outbound buffers, and one deterministic `Session` per + client; `qsl-gateway --epoll` opts in on Linux. +- M34: epoll gateway tests cover platform scoping, invalid bind-host rejection, and two + simultaneous loopback clients handled by one event loop, plus soft-backpressure and hard-cap + response-budget cases, including disconnect-after-write draining and queued-reply preservation + before a later over-cap close. - M33: deterministic pipeline scheduling perturbation (`PipelinePerturbation`) so concurrency tests exercise different input/engine/output pacing patterns without timing-sensitive sleeps. - M33: `make concurrency-stress` / `scripts/concurrency_stress.sh`, an opt-in repeated @@ -30,6 +37,9 @@ All notable changes to this project. The format is loosely based on ### Documentation +- M34: updated socket-gateway docs and added ADR 0010 to distinguish the Linux epoll architecture + prototype from M35 multi-client load/socket-pressure evidence. The docs cover EAGAIN/EWOULDBLOCK, + partial writes, half-close flushing, and bounded outbound buffering. - M33: concurrency docs now distinguish static happens-before reasoning, TSan, deterministic schedule perturbation, and repeated stress as evidence over executed schedules rather than proof over all interleavings. diff --git a/CMakeLists.txt b/CMakeLists.txt index 255ea5c..e1c392d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,8 @@ add_library(qsl_core src/core/types.cpp src/protocol/codec.cpp src/engine/order_ src/gateway/order_gateway.cpp src/feed/market_data.cpp src/feed/publisher.cpp src/replay/event_log.cpp src/replay/command.cpp src/replay/recovery.cpp src/gateway/session.cpp src/gateway/tcp_server.cpp - src/feed/udp_feed.cpp src/replay/fixture.cpp src/replay/shrink.cpp) + src/gateway/epoll_server.cpp src/feed/udp_feed.cpp src/replay/fixture.cpp + src/replay/shrink.cpp) target_include_directories(qsl_core PUBLIC include) target_link_libraries(qsl_core PRIVATE qsl_warnings) diff --git a/HANDOFF.md b/HANDOFF.md index cd0a362..fc1e6c5 100644 --- a/HANDOFF.md +++ b/HANDOFF.md @@ -14,10 +14,10 @@ Keep all four in the repo root. --- ## Current handoff -The repo is released at `v0.1.0`. M0–M32 are merged. Most recently M32 = PR #96 (squash commit -f122ee8): PMR-backed order-book node allocation experiment, engine-level storage benchmark, and -issue #95 for future intrusive/custom-node `OrderPool` storage. **M33 is the active -milestone; draft PR #97 is open, CI is green, and Codex review found no major issues.** +The repo is released at `v0.1.0`. M0–M33 are merged. Most recently M33 = PR #97 (squash commit +fe8679a): deterministic scheduling perturbation, opt-in repeated concurrency stress, and docs +framing TSan/perturbation/stress as evidence rather than proof. **M34 is the active milestone and +draft PR #98 is open.** Background — M29 delivered (merged, constrained-environment only): @@ -30,7 +30,12 @@ Background — M29 delivered (merged, constrained-environment only): - The repository does **not** currently claim real hardware PMU evidence. - Issue #90 tracks full PMU-backed evidence generation on a bare-metal or PMU-capable Linux target. -M33 is in progress on `feat/m33-advanced-concurrency-validation`. To resume it: +M34 is in progress on `feat/m34-epoll-gateway-architecture` with draft PR #98 open. Codex review +fixes are applied locally: the epoll path budgets high-fanout responses at the Session/gateway +boundary before mutating engine state, drains already-readable `EPOLLIN` bytes before honoring +`EPOLLHUP`, suppresses reads once a session is closing, ignores stale fd events through +per-connection generation tokens, and preserves queued replies from earlier accepted frames before +a later over-cap close. To resume it: ```text /resume @@ -57,8 +62,8 @@ gh release view v0.1.0 Current state: -- main tip: `f122ee8` (PR #96, M32) -- active branch: `feat/m33-advanced-concurrency-validation` (M33 draft PR #97 open; CI/Codex clean) +- main tip: `fe8679a` (PR #97, M33) +- active branch: `feat/m34-epoll-gateway-architecture` (M34 draft PR #98 open) - release tag: `v0.1.0` - open follow-up issue: #90 for full Linux hardware PMU perf evidence - open follow-up issue: #95 for future intrusive/custom-node `OrderPool` order-book @@ -66,9 +71,9 @@ Current state: ### Next milestone -M33 — Advanced concurrency validation. Add non-flaky scheduling perturbation or longer stress -modes, document that TSan/stress are empirical evidence rather than proof, and keep long-running or -Linux-only validation as explicit opt-in commands rather than normal CI requirements. +M34 — epoll gateway architecture. Add a Linux event-driven gateway prototype with multi-client +readiness handling, nonblocking accept/read/write behavior, bounded per-client response buffering, +and preserved deterministic session semantics. Keep load/pressure testing for M35. ### Phase III / IV purpose @@ -81,9 +86,9 @@ networking research. Current priority order: 1. Issue #90 — real Linux hardware PMU perf evidence. -2. M33 — advanced concurrency validation (in progress). -3. M34 — epoll gateway architecture. -4. M35 — multi-client socket pressure. +2. M34 — epoll gateway architecture (in progress). +3. M35 — multi-client socket pressure. +4. M36 — NUMA awareness study. ### Forbidden shortcuts diff --git a/PROGRESS.md b/PROGRESS.md index 0aadf55..e21a61f 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -19,15 +19,15 @@ Do not rely on prior chat memory. ## Current state -- **Active milestone:** M33 — Advanced concurrency validation (in progress) -- **Status:** draft PR open (#97); local verification passed; CI green; Codex review clean -- **Active branch:** `feat/m33-advanced-concurrency-validation` -- **Last completed milestone:** M32 — Pool-backed order-book storage experiment (squash-merged, PR #96, commit f122ee8; Codex auto-review clean — no major issues; CI green) +- **Active milestone:** M34 — epoll gateway architecture (in progress) +- **Status:** draft PR open (#98); Codex review fixes applied locally +- **Active branch:** `feat/m34-epoll-gateway-architecture` +- **Last completed milestone:** M33 — Advanced concurrency validation (squash-merged, PR #97, commit fe8679a; Codex review clean — no major issues; CI green) - **Release:** `v0.1.0` published as a GitHub release (tag on commit 9857e1a); no packages published -- **`make check` passing:** last verified on M33 (189/189) on 2026-06-02; `make asan` also passed 189/189, `make tsan` passed 19/19 concurrency tests, and a two-loop `make concurrency-stress` smoke passed. -- **Last action:** PR #97 CI passed all 6 jobs and Codex review found no major issues. -- **Next action:** human review/squash-merge PR #97; do not merge from the automation side. -- **Blockers:** none for M33. Issue #90 (full hardware PMU evidence) remains blocked on PMU-capable Linux access; issue #95 tracks future direct intrusive/custom-node storage and is not part of M33. +- **`make check` passing:** last verified on M34 (191/191) on 2026-06-03; `make asan` also passed 191/191. Linux Docker verification built `qsl-gateway`/`test_epoll_gateway` and passed 7 epoll tests / 273 assertions (multi-client, backpressure, hard-cap, queued-reply, and disconnect-after-write coverage). +- **Last action:** addressed the latest Codex PR #98 epoll review findings locally: closing sessions no longer re-arm reads, client events carry generation tokens to ignore stale fd events, and queued replies from earlier frames in the same read are preserved before a later over-cap close. +- **Next action:** commit/push the latest M34 Codex review fixes, trigger Codex review, and monitor PR #98 CI; do not merge. +- **Blockers:** none for M34 on Linux-targeted code paths; issue #90 remains blocked on PMU-capable Linux access; issue #95 remains future intrusive/custom-node storage. --- @@ -177,6 +177,10 @@ Status key: - [M5] Rejected modifies do not mutate engine state or consume sequence numbers. - [M4] Active resting `OrderId`s are unique per symbol; duplicate active IDs are no-ops in M4 and become structured `DuplicateOrderId` rejections in M5. - [M4] Tests cover no orphaned liquidity after duplicate-id attempts. +- [M34] Epoll response budgeting is enforced at the Session/gateway boundary: over-cap `NewOrder` fanout is rejected before appending responses or mutating engine state. +- [M34] Epoll transport drains writable backlog before accepting more input for a client, retries `EINTR` sends, drains `EPOLLIN` data before honoring `EPOLLHUP`, treats `EPOLLERR` as an immediate close, suppresses `EPOLLIN` after EOF, and closes active clients on fatal listener failure. +- [M34] Epoll client events carry fd-generation tokens; stale events from a closed connection cannot act on a new connection that reused the same numeric fd. +- [M34] Once a session is closing, epoll does not re-arm `EPOLLIN`; queued replies are either flushed under the hard cap or discarded immediately when no reply was accepted in the current read. --- @@ -202,7 +206,7 @@ compiler-, and build-dependent — these are from one machine, not a production- > If stopping mid-milestone, write exactly what is half-done and the precise next step. Clear this when the milestone merges. -- _M33 draft PR #97 is open on `feat/m33-advanced-concurrency-validation`. Implemented deterministic `PipelinePerturbation` hooks/tests, `make concurrency-stress`, and concurrency-methodology docs. Local verification: `make check` 189/189, `make asan` 189/189, `make tsan` 19/19 concurrency tests, `QSL_CONCURRENCY_STRESS_LOOPS=2 make concurrency-stress` 2/2 loops, `bash -n scripts/concurrency_stress.sh`, and `git diff --check`. PR #97 CI passed all 6 jobs and Codex review found no major issues. Next: human review/squash-merge; do not merge. Clear this block when M33 merges._ +- _M34 PR #98 open on `feat/m34-epoll-gateway-architecture`: Linux `EpollServer`, `qsl-gateway --epoll`, Linux-gated epoll tests, socket-gateway docs, ADR 0010. Codex review fixes addressed: read-backpressure, `--epoll` flag/port parsing robustness, soft high-water mark + hard outbound cap, bounded Session response generation before gateway mutation, O(n²)-flush → write-offset, `EINTR`-on-send retry, transient/fatal accept handling, `EPOLLHUP` read-drain before close, close-after-flush read suppression, fd-generation stale-event guards, queued-reply preservation before a later over-cap close, EOF read re-arm suppression, and duplicate-port rejection. Verification: `make check` 191/191, `make asan` 191/191, `make fmt-check`, `git diff --check`, Docker Ubuntu `test_epoll_gateway` 7 tests / 273 assertions. Next: commit/push this review fix, trigger Codex review, and monitor CI. Clear this block when M34 merges._ --- @@ -257,8 +261,8 @@ Lower priority: | M30 | Kernel/socket path profiling and Linux socket hardening | `feat/m30-socket-profiling-hardening` | ☑ merged | #92 | syscall/socket-buffer/UDP pressure evidence; epoll deferred to M34/M35 | | M31 | External review / maintainer signal | `docs/m31-external-review` | ☑ merged | #93 | Review-request checklist + feedback template; review request opened as issue #94 | | M32 | Pool-backed order-book storage experiment | `feat/m32-pool-backed-order-book-storage` | ☑ merged | #96 | PMR-backed node allocation in order-book paths; direct intrusive `OrderPool` storage deferred to #95 | -| M33 | Advanced concurrency validation | `feat/m33-advanced-concurrency-validation` | ◐ draft PR | #97 | Scheduling perturbation, longer stress, and stronger concurrency methodology | -| M34 | epoll gateway architecture | `feat/m34-epoll-gateway-architecture` | ☐ not started | — | Event-driven multi-client gateway design | +| M33 | Advanced concurrency validation | `feat/m33-advanced-concurrency-validation` | ☑ merged | #97 | Scheduling perturbation, longer stress, and stronger concurrency methodology | +| M34 | epoll gateway architecture | `feat/m34-epoll-gateway-architecture` | ◐ draft PR | #98 | Event-driven multi-client gateway design | | M35 | Multi-client load and socket-pressure testing | `feat/m35-multi-client-socket-pressure` | ☐ not started | — | TCP/UDP stress, buffer pressure, backpressure investigation | | M36 | NUMA awareness study | `feat/m36-numa-awareness-study` | ☐ not started | — | CPU affinity and NUMA locality measurements where hardware exists | | M37 | Lock-free ingress pipeline | `feat/m37-lock-free-ingress-pipeline` | ☐ not started | — | Ingress contention experiment; not lock-free matching | @@ -269,6 +273,13 @@ Lower priority: ## Decision log additions +- [2026-06-02] M34: started after M33 (#97) squash-merged (commit fe8679a). Scope: Linux `epoll` gateway architecture prototype only — event-driven multi-client readiness, nonblocking accept/read/write behavior, deterministic `Session` semantics preserved. Do not start M35 load/socket-pressure testing and do not make production-capacity claims. +- [2026-06-02] M34: added `EpollServer`, a Linux-only event-driven transport with one `epoll` loop, nonblocking `accept4`/read/write, per-client outbound buffers, and one existing deterministic `Session` per connection. `qsl-gateway --epoll` opts in; the blocking `TcpServer` remains the default. +- [2026-06-02] M34: epoll tests are platform-scoped. macOS verifies unsupported mode; Docker Ubuntu Linux verifies availability, invalid bind-host rejection, and two simultaneous loopback clients handled by one event loop without thread-per-connection design. +- [2026-06-02] M34: local verification passed: `make check` 190/190, `make asan` 190/190, `git diff --check`, and Docker Ubuntu Linux `test_epoll_gateway` 3 tests / 36 assertions. +- [2026-06-02] M34: opened draft PR #98; do not merge from the automation side. +- [2026-06-03] M34: Codex review of #98 iterated several rounds (CI green throughout), each fix verified on macOS and Linux Docker as noted in the current-state block: read-backpressure; `--epoll` flag-or-port parsing with whole-token/range/duplicate validation; a soft high-water mark (stop reading) plus a hard outbound cap; O(n²) front-erase flush replaced with a write offset; `EINTR`-on-send retry; survival of transient `accept4` errors (`ECONNABORTED`/pending network errors) instead of tearing down the loop; bounded high-fanout `NewOrder` response budgeting before gateway mutation; `EPOLLERR` immediate close; `EPOLLHUP` drain of already-readable bytes before close; no `EPOLLIN` re-arm once a session is closing; fd-generation checks for stale events after fd reuse; and queued-reply preservation when an over-cap frame follows earlier accepted frames in the same read. Issue #99 remains a broader follow-up for shared streaming/byte-budgeted response generation outside the epoll-specific bounded path. +- [2026-06-02] M33: PR #97 squash-merged (commit fe8679a); CI passed all 6 jobs and Codex review found no major issues. M33 delivered deterministic pipeline scheduling perturbation, opt-in repeated concurrency stress, and docs framing TSan/perturbation/stress as evidence rather than proof. - [2026-06-02] M33: started after M32 (#96) squash-merged (commit f122ee8). Scope: advanced concurrency validation only — deterministic scheduling perturbation and/or longer stress modes, stronger concurrency methodology docs, opt-in long-running/Linux checks where appropriate. Do not claim proof; TSan and stress tests remain dynamic evidence over executed schedules. - [2026-06-02] M33: added deterministic `PipelinePerturbation` yield hooks to the threaded pipeline and a regression test that compares perturbed pipeline output against the single-threaded reference across seeded property flows, queue capacities, and per-stage yield patterns. - [2026-06-02] M33: added `make concurrency-stress` / `scripts/concurrency_stress.sh` as an opt-in repeated concurrency-label test loop. Normal CI remains non-flaky; longer local/Linux runs are documented through explicit knobs rather than hidden in the default gate. @@ -345,13 +356,13 @@ Quant Systems Lab — Linux Systems + Exchange Infrastructure Simulator ## Next action remains -Current action is M33 on `feat/m33-advanced-concurrency-validation`; see the top-level current state +Current action is M34 on `feat/m34-epoll-gateway-architecture`; see the top-level current state block for the exact next step. -After M33 squash-merges, resume with: +After M34 squash-merges, resume with: ```text -/start-milestone 34 +/start-milestone 35 ``` Issue #90 remains the evidence debt for full Linux hardware PMU artifacts. Work it only on a diff --git a/README.md b/README.md index 4b6be36..a73e286 100644 --- a/README.md +++ b/README.md @@ -107,9 +107,10 @@ the core numbers above. - **Synthetic and local.** No real market data, no real venue connectivity, no order types beyond limit/market + GTC/IOC. -- **Networking remains simple.** The TCP gateway is intentionally one-connection-at-a-time; the - threaded gateway-engine-feed pipeline is an opt-in correctness prototype, not a production - event loop. +- **Networking remains scoped.** The default TCP gateway is intentionally + one-connection-at-a-time; Linux builds also include an opt-in `epoll` gateway prototype for + event-driven multi-client readiness. It is architecture validation, not a production event loop + or capacity claim. - **Benchmarks are microbenchmarks**, not end-to-end or production latency (see above). - **Networking is minimal**: loopback TCP order entry and a UDP market-data feed, unauthenticated, no TLS, no framing recovery beyond disconnect-on-malformed. The socket path is diff --git a/apps/qsl-gateway/main.cpp b/apps/qsl-gateway/main.cpp index e3d5823..64088e8 100644 --- a/apps/qsl-gateway/main.cpp +++ b/apps/qsl-gateway/main.cpp @@ -1,15 +1,49 @@ #include "qsl/engine/matching_engine.hpp" +#include "qsl/gateway/epoll_server.hpp" #include "qsl/gateway/order_gateway.hpp" #include "qsl/gateway/tcp_server.hpp" +#include #include +#include #include +#include #include -// qsl-gateway [port] -> serve the binary order gateway on 127.0.0.1:port (default 9009). +// qsl-gateway [port] [--epoll] -> serve on 127.0.0.1:port (default 9009). // No authentication; localhost only. This is a local simulator, not a real venue. int main(int argc, char **argv) { - const std::uint16_t port = (argc >= 2) ? static_cast(std::stoul(argv[1])) : 9009; + // Flags may appear before or instead of the port; the first non-flag argument is the port. So + // `qsl-gateway`, `qsl-gateway 9009`, `qsl-gateway --epoll`, and `qsl-gateway 9009 --epoll` all + // work (parsing `--epoll` as a port previously aborted with std::invalid_argument). + std::uint16_t port = 9009; + bool use_epoll = false; + bool port_set = false; + for (int i = 1; i < argc; ++i) { + const std::string arg = argv[i]; + if (arg == "--epoll") { + use_epoll = true; + continue; + } + // The first non-flag arg is the port. Require the WHOLE token to be a single in-range + // number, so typos like "9009x" (std::stoul stops at the 'x' and accepts 9009), out-of- + // range values like "70000" (the uint16_t cast would truncate to 4464), or a second port + // token like "9009 9010" (which would silently bind the last) all fail fast. + std::size_t consumed = 0; + unsigned long value = 0; + try { + value = std::stoul(arg, &consumed); + } catch (const std::exception &) { + consumed = 0; // parse failed -> fall through to the usage error + } + if (port_set || consumed != arg.size() || + value > std::numeric_limits::max()) { + std::cerr << "usage: qsl-gateway [port] [--epoll] (one optional port, 0-65535)\n"; + return 2; + } + port = static_cast(value); + port_set = true; + } qsl::engine::MatchingEngine engine; engine.register_symbol("AAPL"); // SymbolId 0 @@ -17,6 +51,22 @@ int main(int argc, char **argv) { qsl::gateway::OrderGateway gateway{engine, qsl::gateway::RiskConfig{/*max_order_quantity=*/1'000'000, /*max_notional=*/1'000'000'000}}; + if (use_epoll) { + qsl::gateway::EpollServer server{gateway}; + if (!qsl::gateway::EpollServer::supported()) { + std::cerr << "epoll gateway mode requires Linux\n"; + return 2; + } + + std::cout << "qsl-gateway epoll listening on 127.0.0.1:" << port + << " (no auth, localhost only)\n"; + if (!server.run("127.0.0.1", port)) { + std::cerr << "failed to start epoll server on port " << port << "\n"; + return 1; + } + return 0; + } + qsl::gateway::TcpServer server{gateway}; std::cout << "qsl-gateway listening on 127.0.0.1:" << port << " (no auth, localhost only)\n"; diff --git a/docs/adr/0008-socket-evidence-loopback-constrained-epoll-deferred.md b/docs/adr/0008-socket-evidence-loopback-constrained-epoll-deferred.md index 21456c8..6ea9c79 100644 --- a/docs/adr/0008-socket-evidence-loopback-constrained-epoll-deferred.md +++ b/docs/adr/0008-socket-evidence-loopback-constrained-epoll-deferred.md @@ -1,4 +1,4 @@ -# ADR 0008: Socket Evidence Is Loopback-Constrained and epoll Is Deferred +# ADR 0008: Socket Evidence Is Loopback-Constrained and epoll Was Deferred ## Status @@ -12,8 +12,8 @@ feed. Two constraints shape what can honestly be claimed: 1. The gateway syscall / rusage profile needs Linux tools (`strace`, procfs) and was generated in containerized Linux (Docker) because the primary development host is macOS. Every socket experiment runs over loopback (`127.0.0.1`). -2. An `epoll`-based event-driven gateway is a natural next step, but `epoll` is a Linux-specific - API that cannot be compiled or tested on the macOS development host. +2. An `epoll`-based event-driven gateway was a natural next step, but `epoll` is a Linux-specific + API that could not be compiled or tested on the macOS development host during M30. Treating loopback profiling as real-network evidence, or committing untested platform-specific code, would be misleading and would violate the project's measured-evidence and no-untested-C++ @@ -25,9 +25,9 @@ bars. policy ADR 0007 applies to `perf`. They are labeled as such, carry OS/kernel/compiler/commit/ dirty-tree metadata, and never claim NIC/driver/real-network behavior or production capacity. - M30 profiles and hardens the **existing** one-connection-at-a-time gateway rather than rewriting - it. The `epoll` multi-client architecture is deferred to **M34**, and multi-client - socket-pressure testing to **M35**, so no untested Linux-only code is committed now. `io_uring` - is discussed only. + it. The `epoll` multi-client architecture was deferred to **M34**, and multi-client + socket-pressure testing to **M35**, so no untested Linux-only code was committed in M30. + ADR 0010 records the later tested M34 epoll prototype. `io_uring` is discussed only. - The UDP receive-buffer (`SO_RCVBUF`) knob is added and justified by a **measured** burst/loss experiment, not by assertion. Loss is detected (sequence gaps), not recovered; there is no retransmit/gap-fill channel. @@ -36,5 +36,5 @@ bars. M30 lands a real, measured socket profiling/hardening workflow and a justified hardening knob without overclaiming. Event-driven serving and real-network/hardware evidence remain explicit -later milestones (M34/M35) and acknowledged evidence debts, rather than implicit gaps or -untested code in the tree. +separately testable milestones and acknowledged evidence debts, rather than implicit gaps or +overclaimed artifacts. diff --git a/docs/adr/0010-linux-epoll-gateway-prototype.md b/docs/adr/0010-linux-epoll-gateway-prototype.md new file mode 100644 index 0000000..9589712 --- /dev/null +++ b/docs/adr/0010-linux-epoll-gateway-prototype.md @@ -0,0 +1,62 @@ +# ADR 0010: Linux epoll Gateway Prototype + +## Status + +Accepted + +## Context + +M9 intentionally used a blocking one-connection-at-a-time `TcpServer` so protocol/session +semantics could be proven without event-loop complexity. M30 profiled and hardened that socket +path, then deferred event-driven serving because Linux-only `epoll` code needed its own tested +milestone. + +The pure `Session` layer already owns framing, malformed-input handling, risk dispatch, and +response encoding. Rewriting those semantics for an event loop would be unnecessary and risky. + +## Decision + +M34 adds `EpollServer`, a Linux-only transport prototype. It uses one `epoll` loop, nonblocking +`accept4`, nonblocking client sockets, per-client outbound buffers, and one `Session` per +connection. The default `qsl-gateway` path remains the portable blocking `TcpServer`; Linux users +can opt into the prototype with: + +```bash +./build/dev/qsl-gateway 9009 --epoll +``` + +The epoll path handles readiness and buffering only. It does not change order-gateway risk logic, +matching, protocol codecs, or session semantics. + +Per-client response buffering is bounded. The epoll transport applies a soft high-water mark that +stops reading from a client until pending output drains, and a hard cap enforced through a bounded +`Session` append path. High-fanout `NewOrder` responses are previewed against current engine state +before they reach the gateway, so an over-cap response drops the connection without appending a +partial response and without mutating engine state. + +Disconnect handling distinguishes socket errors from peer hangups: `EPOLLERR` closes immediately, +while `EPOLLHUP` is honored only after any already-readable `EPOLLIN` bytes have been drained into +the session. Hard-cap overflow closes immediately when the over-cap frame appended nothing; if the +same read already queued replies for earlier accepted frames, reads are disabled and the bounded +pending replies are flushed before close. + +Each client event stores a per-connection generation token with the fd. If a closed fd is reused +for a new connection while stale events from the old connection remain in the same `epoll_wait` +batch, the generation mismatch makes those stale events no-ops. Once a session is closing +(`close_after_flush`), the server no longer re-arms `EPOLLIN`; it only flushes pending replies and +then closes. + +## Consequences + +The repo now has a real event-driven multi-client gateway architecture without adopting +thread-per-connection design. Nonblocking `EAGAIN` / `EWOULDBLOCK` and partial writes are handled +as transport backpressure through retry-on-readiness, not as protocol errors. + +This remains a simulator prototype: + +- no production-capacity or low-latency claim; +- no TLS/auth/rate limiting; +- no `io_uring`; +- no multi-client load or socket-pressure benchmark in M34. + +M35 owns load and pressure testing for the event-driven path. diff --git a/docs/socket_gateway.md b/docs/socket_gateway.md index e452b61..4952fdd 100644 --- a/docs/socket_gateway.md +++ b/docs/socket_gateway.md @@ -10,6 +10,9 @@ the M2 binary protocol. It is split into two pieces: - **`TcpServer`** (`include/qsl/gateway/tcp_server.hpp`) — a thin POSIX-socket transport: `serve_connection(fd)` runs a `Session` over one connected socket; `run(host, port)` binds, listens, and accepts connections one at a time. +- **`EpollServer`** (`include/qsl/gateway/epoll_server.hpp`) — a Linux-only event-driven + transport prototype: one `epoll` loop accepts multiple clients and drives one `Session` per + connection with nonblocking reads and writes. ## Message flow @@ -28,11 +31,47 @@ client <- Ack / Reject / Fill / HeartbeatAck <- Session The wire is a byte stream, not a message stream. `Session` accumulates bytes in a buffer and only processes a frame once the 16-byte header plus the declared `body_len` are present; a -frame split across multiple `read()`s is held until complete. Outbound responses are written -with a send-all loop that tolerates partial writes. The socket write path uses -`send(..., MSG_NOSIGNAL)` where available, and the platform socket option where available, -so a client that drops before reading a response cannot terminate the gateway through -`SIGPIPE`. +frame split across multiple `read()`s is held until complete. + +The blocking `TcpServer` writes responses with a send-all loop that tolerates partial writes. +The Linux `EpollServer` keeps a per-client outbound buffer and leaves the connection registered +for `EPOLLOUT` until all pending response bytes are accepted by the kernel. Both write paths use +`send(..., MSG_NOSIGNAL)` where available, and the platform socket option where available, so a +client that drops before reading a response cannot terminate the gateway through `SIGPIPE`. + +The epoll path treats `EAGAIN` / `EWOULDBLOCK` as normal nonblocking backpressure: + +- `accept4(..., SOCK_NONBLOCK)` loops until the listening socket would block. +- client reads loop until the connection would block, reaches EOF, errors, or the `Session` + flags a malformed stream for disconnect. +- client writes loop until the outbound buffer is empty or `send()` would block. +- a peer that half-closes after sending requests can still receive queued responses; the + connection closes after the pending outbound buffer drains. +- if Linux reports `EPOLLIN` together with `EPOLLHUP`, the epoll path drains the already-readable + bytes before honoring the hangup; `EPOLLERR` remains an immediate close. +- once a session is closing after malformed input or an over-cap frame with queued replies, + `EPOLLIN` is not re-armed; the connection is write-only until its pending output drains. +- client readiness events carry a per-connection generation token, so stale events in the same + `epoll_wait` batch cannot act on a newer connection that reused the same numeric fd. + +A client that keeps sending requests but stops reading its responses cannot grow the gateway's +memory without bound. Each connection's outbound buffer has a high-water mark +(`EpollServerOptions::max_outbuf_bytes`, default 1 MiB): once the backlog reaches it the server +stops reading from that client (drops `EPOLLIN`, keeping only `EPOLLOUT`), so unread requests back +up in the kernel receive buffer and TCP flow control pushes back on the sender. Reads resume once +the backlog drains below the mark. + +That soft mark bounds how many *further* requests a non-reading peer induces, but a single +request's response can fan out — a market order sweeping a deep book returns one fill per resting +maker. So a **hard cap** (`EpollServerOptions::max_outbuf_hard_bytes`, default 8 MiB) is the +absolute ceiling. The epoll path asks `Session` to append responses directly into the per-client +buffer under that byte budget; before a `NewOrder` reaches the gateway, the session previews the +accepted/rejected outcome and exact fill count against current engine state. If the full response +would exceed the cap, the connection is dropped without appending a partial response and without +mutating engine state. If the same read already queued valid replies for earlier accepted frames, +those replies are flushed and reads are disabled before close. A client that reads its responses +keeps the backlog near zero and trips neither threshold; only a peer that stops reading and then +induces an over-cap response is disconnected. ## Malformed frames @@ -45,15 +84,36 @@ so a client that drops before reading a response cannot terminate the gateway th ## Disconnect and heartbeat - Graceful disconnect: when the peer closes its write side, `read()` returns 0 (EOF) and the - server finishes serving and closes the connection. + server finishes serving and closes the connection; a complete request delivered before a hangup + is still drained before the connection is removed. - Heartbeats are a liveness round-trip only; the gateway does not yet time out idle peers. -## Why it is intentionally simple +## Event-driven gateway mode -A single-threaded accept-and-serve loop (one connection at a time) keeps the code easy to -reason about and avoids concurrency bugs. The goal is a credible, debuggable systems -demonstration, not a production matching venue, so there is no thread pool, no epoll/kqueue -event loop, and no TLS. +The default demo still uses `TcpServer` because it is portable and easiest to inspect. On Linux, +`qsl-gateway` can run the epoll prototype explicitly: + +```bash +./build/dev/qsl-gateway 9009 --epoll # explicit port +./build/dev/qsl-gateway --epoll # default port 9009; the flag may precede or replace the port +``` + +This mode is single-threaded and event-driven: it does not create one thread per connection. +Each connected client owns its own `Session`, so deterministic framing, malformed-frame handling, +risk checks, and response encoding are shared with the blocking transport. M34 tests the real +loopback socket path with two simultaneous clients (and a backpressure case under a small +high-water mark) and verifies every client receives correct, in-order responses through one event +loop. + +This is architecture validation, not a production-capacity claim. Multi-client load, socket +pressure, connection scaling, and throughput measurements remain M35 scope. + +## Why it is still intentionally simple + +The blocking path remains a single-threaded accept-and-serve loop (one connection at a time). +The epoll path is also single-threaded, but it multiplexes readiness across multiple clients. +There is still no thread pool, no TLS, no authentication, no rate limiting, and no real venue +connectivity. ## Security diff --git a/docs/socket_hardening.md b/docs/socket_hardening.md index d6a2de2..45f4644 100644 --- a/docs/socket_hardening.md +++ b/docs/socket_hardening.md @@ -66,14 +66,9 @@ stated plainly so the gap counter is not mistaken for reliability. ## Intentionally out of scope (and why) -- **`epoll` event-driven gateway.** The gateway serves one connection at a time (M9). An - `epoll`-based multi-client server (nonblocking `accept`/`recv`/`send`, `EAGAIN`/`EWOULDBLOCK` - handling, bounded event batch, clean shutdown) is a real next step, but it is a Linux-specific - API that cannot be compiled or tested on the macOS development host, and adding untested - platform-specific code would violate the project's "no untested C++" bar. It is therefore - deferred to its own milestone (**M34 — epoll gateway architecture**), with multi-client load - and socket-pressure testing in **M35**. M30 deliberately profiles and hardens the *existing* - design rather than rewriting it. +- **Event-driven load evidence.** M34 adds a Linux `epoll` gateway architecture prototype, but + M30's socket-profile artifacts still describe the original blocking gateway path. Multi-client + load, socket-pressure measurements, and capacity conclusions remain **M35** scope. - **`io_uring`.** Discussed only. It could reduce syscall overhead on the gateway path, but it is a substantial, kernel-version-sensitive dependency and is not justified by any measured bottleneck here. No `io_uring` code exists; none is claimed. diff --git a/docs/socket_profiling.md b/docs/socket_profiling.md index e2100f3..f13e00e 100644 --- a/docs/socket_profiling.md +++ b/docs/socket_profiling.md @@ -123,9 +123,9 @@ clean checkout on a bare-metal Linux host for a clean-tree version. - **Loopback only.** No NIC, device driver, queue discipline, routing, or real-network loss / reordering / latency is exercised. Loopback removes exactly the parts that dominate real network cost. -- **Single connection at a time.** The gateway (M9) serves one connection at a time by design; - this profiles that design. Event-driven multi-client serving (`epoll`) and multi-client load - are intentionally out of M30 scope and tracked as later milestones (M34/M35). +- **Single connection at a time.** The gateway profiled in M30 serves one connection at a time by + design; this artifact profiles that design. M34 adds an `epoll` architecture prototype, but + multi-client load and socket-pressure evidence are still M35 scope. - **`strace` perturbs timing.** Use Pass 1 (procfs rusage) for the user/kernel CPU split; use Pass 2 only for the syscall *mix*. - **Synthetic, deterministic flow.** The workload is the repo's seeded synthetic flow, not real diff --git a/include/qsl/engine/matching_engine.hpp b/include/qsl/engine/matching_engine.hpp index f14cb0c..1e9f43d 100644 --- a/include/qsl/engine/matching_engine.hpp +++ b/include/qsl/engine/matching_engine.hpp @@ -14,6 +14,8 @@ namespace qsl::engine { +using core::OrderType; + /// Interns external symbol names to compact numeric SymbolIds (assigned in /// registration order). class SymbolRegistry { @@ -72,6 +74,8 @@ class MatchingEngine { [[nodiscard]] std::optional best_bid(SymbolId symbol) const; [[nodiscard]] std::optional best_ask(SymbolId symbol) const; + [[nodiscard]] std::size_t fill_count(SymbolId symbol, Side side, Price price, OrderType type, + Quantity quantity) const; private: OrderBook *find_book(SymbolId symbol) noexcept; diff --git a/include/qsl/engine/order_book.hpp b/include/qsl/engine/order_book.hpp index 21f6f2e..1b53011 100644 --- a/include/qsl/engine/order_book.hpp +++ b/include/qsl/engine/order_book.hpp @@ -60,6 +60,8 @@ class OrderBook { [[nodiscard]] QuantityTotal quantity_at(Side side, Price price) const; [[nodiscard]] std::size_t order_count() const; [[nodiscard]] bool contains(OrderId id) const; + [[nodiscard]] std::size_t fill_count(Side taker_side, Price limit, bool is_market, + Quantity quantity) const; // Aggregate resting quantity per price level, best price first. [[nodiscard]] std::vector bid_levels() const; diff --git a/include/qsl/gateway/epoll_server.hpp b/include/qsl/gateway/epoll_server.hpp new file mode 100644 index 0000000..d3910c5 --- /dev/null +++ b/include/qsl/gateway/epoll_server.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include "qsl/gateway/order_gateway.hpp" + +#include +#include +#include +#include + +namespace qsl::gateway { + +struct EpollServerOptions { + std::size_t max_events = 64; + int wait_timeout_ms = 50; + // Soft outbound high-water mark: while a client's pending response bytes reach this, the server + // stops reading more requests from it (drops EPOLLIN) and resumes once the backlog drains, so a + // peer that floods many small requests without reading gets polite backpressure rather than + // unbounded buffering. + std::size_t max_outbuf_bytes = 1U << 20; // 1 MiB + // Hard outbound cap: before forwarding a request to the gateway, the session accounts for the + // exact response bytes it can produce (Ack/Reject plus one Fill per matched maker). If + // appending that response would exceed this cap, the connection is dropped without mutating + // engine state. A client that reads its responses keeps the backlog near zero and never trips + // this. 0 disables the hard cap. + std::size_t max_outbuf_hard_bytes = 8U << 20; // 8 MiB +}; + +/// Linux epoll-based TCP front end for the order gateway. It is a transport prototype: +/// each connection still owns a deterministic Session, while one event loop multiplexes +/// accept/read/write readiness for multiple clients. +class EpollServer { + public: + explicit EpollServer(OrderGateway &gateway) : gateway_(gateway) {} + + [[nodiscard]] static bool supported() noexcept; + + void request_stop() noexcept { stop_requested_.store(true, std::memory_order_release); } + + /// Serve an already-bound/listening TCP socket. The caller owns the descriptor lifetime. + /// Exposed for tests so they can bind an ephemeral port without changing server state. + bool serve_listen_socket(int listen_fd, EpollServerOptions options = {}); + + /// Bind host:port, listen, and serve clients until request_stop() or an unrecoverable + /// setup error. Returns false if the platform or socket setup does not support epoll. + bool run(const std::string &host, std::uint16_t port, EpollServerOptions options = {}); + + private: + OrderGateway &gateway_; + std::atomic stop_requested_{false}; +}; + +} // namespace qsl::gateway diff --git a/include/qsl/gateway/order_gateway.hpp b/include/qsl/gateway/order_gateway.hpp index d14795f..0993aa0 100644 --- a/include/qsl/gateway/order_gateway.hpp +++ b/include/qsl/gateway/order_gateway.hpp @@ -6,12 +6,14 @@ #include "qsl/engine/matching_engine.hpp" #include "qsl/engine/risk.hpp" +#include #include #include namespace qsl::gateway { using core::OrderId; +using core::OrderType; using core::Price; using core::Quantity; using core::RejectReason; @@ -35,6 +37,11 @@ struct GatewayResult { } }; +struct NewOrderPreview { + bool accepted; + std::size_t fill_count; +}; + /// In-process order gateway: applies deterministic risk checks before forwarding accepted /// commands to the engine. Stateless beyond the engine reference and the risk config; /// "duplicate" and "unknown order" are defined by current engine state (resting orders). @@ -48,6 +55,10 @@ class OrderGateway { GatewayResult cancel(SymbolId symbol, OrderId id); GatewayResult modify(SymbolId symbol, OrderId id, Price new_price, Quantity new_quantity); + [[nodiscard]] NewOrderPreview preview_new_order(SymbolId symbol, OrderId id, Side side, + Price price, Quantity quantity, + OrderType type) const; + private: MatchingEngine &engine_; RiskConfig config_; diff --git a/include/qsl/gateway/session.hpp b/include/qsl/gateway/session.hpp index f9b723d..c985ba8 100644 --- a/include/qsl/gateway/session.hpp +++ b/include/qsl/gateway/session.hpp @@ -8,6 +8,11 @@ namespace qsl::gateway { +enum class SessionStatus { + Ok, + OutputLimitExceeded, +}; + /// Pure byte-level protocol session: no sockets. Buffers inbound bytes, decodes whole /// frames, drives the in-process OrderGateway, and returns response bytes. A malformed /// frame (bad header or undecodable body) flags the session for disconnect rather than @@ -18,12 +23,18 @@ class Session { /// Consume inbound bytes; return any response bytes to send back. [[nodiscard]] std::vector on_bytes(std::span input); + /// Consume inbound bytes and append responses directly to `out`, stopping before any + /// frame would push `out.size()` past `max_output_bytes`. This is used by bounded + /// transports so response fanout is accounted before gateway state is mutated. + [[nodiscard]] SessionStatus on_bytes(std::span input, + std::vector &out, std::size_t max_output_bytes); /// True once the peer should be disconnected (malformed frame seen). [[nodiscard]] bool logged_out() const noexcept { return logged_out_; } private: - void process_frame(std::span frame, std::vector &out); + SessionStatus process_frame(std::span frame, std::vector &out, + std::size_t max_output_bytes); OrderGateway &gateway_; std::vector inbuf_; diff --git a/src/engine/matching_engine.cpp b/src/engine/matching_engine.cpp index 8639961..c9d4c4b 100644 --- a/src/engine/matching_engine.cpp +++ b/src/engine/matching_engine.cpp @@ -123,6 +123,15 @@ std::optional MatchingEngine::best_ask(SymbolId symbol) const { return it == books_.end() ? std::nullopt : it->second.best_ask(); } +std::size_t MatchingEngine::fill_count(SymbolId symbol, Side side, Price price, OrderType type, + Quantity quantity) const { + const auto it = books_.find(symbol); + if (it == books_.end()) { + return 0; + } + return it->second.fill_count(side, price, type == OrderType::Market, quantity); +} + EngineSnapshot MatchingEngine::snapshot() const { EngineSnapshot snap; snap.last_seq = seq_; diff --git a/src/engine/order_book.cpp b/src/engine/order_book.cpp index f44381c..0be86eb 100644 --- a/src/engine/order_book.cpp +++ b/src/engine/order_book.cpp @@ -48,6 +48,32 @@ void OrderBook::match_against(OppMap &opposite, OrderId taker_id, bool taker_is_ } } +template +std::size_t count_matches(const OppMap &opposite, bool taker_is_buy, Price limit, bool is_market, + Quantity quantity) { + std::size_t count = 0; + for (auto level_it = opposite.begin(); quantity > 0 && level_it != opposite.end(); ++level_it) { + const Price level_price = level_it->first; + if (!is_market) { + if (taker_is_buy && level_price > limit) { + break; + } + if (!taker_is_buy && level_price < limit) { + break; + } + } + for (const Order &maker : level_it->second) { + if (quantity == 0) { + break; + } + const Quantity traded = std::min(quantity, maker.quantity); + quantity -= traded; + ++count; + } + } + return count; +} + OrderBook::Level &OrderBook::level_for(Side side, Price price) { if (side == Side::Buy) { auto [it, inserted] = bids_.emplace(price, Level{Level::allocator_type{resource_}}); @@ -183,6 +209,17 @@ bool OrderBook::contains(OrderId id) const { return index_.find(id) != index_.end(); } +std::size_t OrderBook::fill_count(Side taker_side, Price limit, bool is_market, + Quantity quantity) const { + if (taker_side == Side::Buy) { + return count_matches(asks_, /*taker_is_buy=*/true, limit, is_market, quantity); + } + if (taker_side == Side::Sell) { + return count_matches(bids_, /*taker_is_buy=*/false, limit, is_market, quantity); + } + return 0; +} + namespace { template std::vector collect_levels(const LevelMap &book) { std::vector levels; diff --git a/src/gateway/epoll_server.cpp b/src/gateway/epoll_server.cpp new file mode 100644 index 0000000..18af2a1 --- /dev/null +++ b/src/gateway/epoll_server.cpp @@ -0,0 +1,385 @@ +#include "qsl/gateway/epoll_server.hpp" + +#include "qsl/gateway/session.hpp" + +#if defined(__linux__) +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +namespace qsl::gateway { + +bool EpollServer::supported() noexcept { +#if defined(__linux__) + return true; +#else + return false; +#endif +} + +#if defined(__linux__) +namespace { + +inline constexpr std::uint32_t kListenerGeneration = 0; + +struct FdCloser { + void operator()(int *fd) const noexcept { + if (fd != nullptr && *fd >= 0) { + ::close(*fd); + } + delete fd; + } +}; + +using UniqueFd = std::unique_ptr; + +UniqueFd make_fd(int fd) { + return UniqueFd(new int(fd)); // NOLINT(cppcoreguidelines-owning-memory): small fd RAII shim +} + +bool is_would_block() noexcept { + return errno == EAGAIN || errno == EWOULDBLOCK; +} + +bool set_nonblocking(int fd) { + const int flags = ::fcntl(fd, F_GETFL, 0); + if (flags < 0) { + return false; + } + return ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0; +} + +std::uint64_t pack_event_data(int fd, std::uint32_t generation) noexcept { + return (static_cast(generation) << 32U) | static_cast(fd); +} + +int event_fd(const epoll_event &ev) noexcept { + return static_cast(static_cast(ev.data.u64)); +} + +std::uint32_t event_generation(const epoll_event &ev) noexcept { + return static_cast(ev.data.u64 >> 32U); +} + +std::uint32_t next_client_generation(std::uint32_t &next_generation) noexcept { + const std::uint32_t generation = next_generation; + ++next_generation; + if (next_generation == kListenerGeneration) { + ++next_generation; + } + return generation; +} + +bool add_fd(int epoll_fd, int fd, std::uint32_t events, std::uint32_t generation) { + epoll_event ev{}; + ev.events = events; + ev.data.u64 = pack_event_data(fd, generation); + return ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0; +} + +bool mod_fd(int epoll_fd, int fd, std::uint32_t events, std::uint32_t generation) { + epoll_event ev{}; + ev.events = events; + ev.data.u64 = pack_event_data(fd, generation); + return ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) == 0; +} + +struct ClientState { + ClientState(OrderGateway &gateway, std::uint32_t connection_generation) + : generation(connection_generation), session(gateway) {} + + std::uint32_t generation; + Session session; + std::vector outbuf; + std::size_t sent = 0; // bytes at the front of outbuf already written to the socket + bool input_closed = false; + bool close_after_flush = false; + + // Unsent bytes. The soft/hard limits and the "needs write" check use this, not outbuf.size(). + [[nodiscard]] std::size_t pending() const noexcept { return outbuf.size() - sent; } + // Reclaim already-sent bytes from the front. Done once per append (amortized), never per send. + void drop_sent_prefix() { + if (sent > 0) { + outbuf.erase(outbuf.begin(), outbuf.begin() + static_cast(sent)); + sent = 0; + } + } +}; + +// Flush as much of the client's pending output as the nonblocking socket accepts. Advances a write +// offset rather than erasing from the front after every send -- erase-per-send is O(n^2) when +// draining a large buffer and can stall the single event loop. Returns false only on a real send +// error; EAGAIN/EWOULDBLOCK (socket full) and EINTR (signal) are normal and retryable. +bool send_some(int fd, ClientState &client) { + while (client.sent < client.outbuf.size()) { + const ssize_t n = ::send(fd, client.outbuf.data() + client.sent, + client.outbuf.size() - client.sent, MSG_NOSIGNAL); + if (n > 0) { + client.sent += static_cast(n); + continue; + } + if (n < 0 && is_would_block()) { + return true; // socket full: resume on the next EPOLLOUT + } + if (n < 0 && errno == EINTR) { + continue; // interrupted by a signal: retry rather than dropping the connection + } + return false; + } + client.outbuf.clear(); // fully flushed + client.sent = 0; + return true; +} + +void close_client(int epoll_fd, int fd) noexcept { + ::epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr); + ::close(fd); +} + +void close_all_clients(int epoll_fd, std::unordered_map &clients) noexcept { + for (const auto &entry : clients) { + close_client(epoll_fd, entry.first); + } + clients.clear(); +} + +} // namespace + +bool EpollServer::serve_listen_socket(int listen_fd, EpollServerOptions options) { + if (listen_fd < 0 || options.max_events == 0 || !set_nonblocking(listen_fd)) { + return false; + } + + UniqueFd epoll_fd = make_fd(::epoll_create1(EPOLL_CLOEXEC)); + if (*epoll_fd < 0) { + return false; + } + if (!add_fd(*epoll_fd, listen_fd, EPOLLIN, kListenerGeneration)) { + return false; + } + + std::vector events(options.max_events); + std::unordered_map clients; + std::uint32_t next_generation = 1; + + stop_requested_.store(false, std::memory_order_release); + while (!stop_requested_.load(std::memory_order_acquire)) { + const int ready = ::epoll_wait(*epoll_fd, events.data(), static_cast(events.size()), + options.wait_timeout_ms); + if (ready < 0) { + if (errno == EINTR) { + continue; + } + close_all_clients(*epoll_fd, clients); + return false; + } + + for (int i = 0; i < ready; ++i) { + const epoll_event &ready_event = events[static_cast(i)]; + const int fd = event_fd(ready_event); + const std::uint32_t generation = event_generation(ready_event); + const std::uint32_t ev = ready_event.events; + + if (generation == kListenerGeneration && fd == listen_fd) { + for (;;) { + const int conn = ::accept4(listen_fd, nullptr, nullptr, SOCK_NONBLOCK); + if (conn >= 0) { + const std::uint32_t client_generation = + next_client_generation(next_generation); + auto emplaced = clients.try_emplace(conn, gateway_, client_generation); + if (!emplaced.second || + !add_fd(*epoll_fd, conn, EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP, + client_generation)) { + clients.erase(conn); + ::close(conn); + continue; + } + continue; + } + if (is_would_block()) { + break; + } + // Transient per-connection errors -- a connection aborted before accept, or a + // pending network error that Linux reports through accept -- must not tear down + // the whole server; skip this connection and keep serving the rest (accept(2)). + if (errno == EINTR || errno == ECONNABORTED || errno == EPROTO || + errno == ENETDOWN || errno == ENOPROTOOPT || errno == EHOSTDOWN || + errno == ENONET || errno == EHOSTUNREACH || errno == EOPNOTSUPP || + errno == ENETUNREACH) { + continue; + } + close_all_clients(*epoll_fd, clients); + return false; // genuinely fatal listener error + } + continue; + } + + auto it = clients.find(fd); + if (it == clients.end()) { + continue; + } + ClientState &client = it->second; + if (client.generation != generation) { + continue; + } + bool close_now = false; + const bool socket_error = (ev & EPOLLERR) != 0U; + const bool peer_hung_up = (ev & EPOLLHUP) != 0U; + + if (socket_error) { + close_now = true; + } + + // Drain any writable backlog first, so the read below sees an up-to-date pending() and + // the hard cap is not enforced against a stale buffer that send_some() could free + // (which would otherwise falsely drop a client that has resumed reading). + if (!close_now && (ev & EPOLLOUT) != 0U) { + close_now = !send_some(fd, client); + } + + if (!close_now && !client.input_closed && !client.close_after_flush && + (ev & EPOLLIN) != 0U) { + std::array buffer{}; + for (;;) { + // Soft backpressure: once the unsent backlog reaches the soft high-water mark, + // stop reading more requests before processing another (the re-arm below drops + // EPOLLIN), resuming once it drains. + if (!peer_hung_up && client.pending() >= options.max_outbuf_bytes) { + break; + } + const ssize_t n = ::read(fd, buffer.data(), buffer.size()); + if (n > 0) { + client.drop_sent_prefix(); // reclaim already-sent bytes before growing + const std::size_t out_before = client.outbuf.size(); + const std::size_t max_output = options.max_outbuf_hard_bytes == 0 + ? std::numeric_limits::max() + : options.max_outbuf_hard_bytes; + const SessionStatus status = client.session.on_bytes( + std::span(buffer.data(), static_cast(n)), + client.outbuf, max_output); + if (status == SessionStatus::OutputLimitExceeded) { + if (client.outbuf.size() > out_before) { + client.close_after_flush = true; + } else { + close_now = true; + } + break; + } + if (client.session.logged_out()) { + client.close_after_flush = true; + break; + } + continue; // re-checks the soft high-water mark at the top before reading + } + if (n == 0) { + client.input_closed = true; + break; + } + if (is_would_block()) { + break; + } + if (errno == EINTR) { + continue; + } + close_now = true; + break; + } + } + + if (!close_now) { + // EPOLLHUP can arrive together with EPOLLIN for bytes already delivered by the + // peer. Drain those bytes above before honoring the hangup; after that there is no + // receiving peer to flush to, so close immediately instead of re-arming a hot HUP. + if (peer_hung_up) { + close_now = true; + } else { + if ((ev & EPOLLRDHUP) != 0U) { + client.input_closed = true; + } + const bool want_write = client.pending() > 0; + if (!want_write && (client.input_closed || client.close_after_flush)) { + // Fully flushed and the peer is done / logged out / half-closed. RDHUP is a + // half-close: the peer can still receive queued responses, so keep flushing + // until want_write is false before closing. + close_now = true; + } else { + std::uint32_t want = EPOLLRDHUP | EPOLLERR | EPOLLHUP; + // Backpressure: only keep reading while the unsent backlog is below the + // high-water mark, so a client that stops reading its responses cannot make + // the gateway buffer unbounded output. Reads resume once the backlog + // drains. + if (!client.input_closed && !client.close_after_flush && + client.pending() < options.max_outbuf_bytes) { + want |= EPOLLIN; + } + if (want_write) { + want |= EPOLLOUT; // still have bytes to flush + } + close_now = !mod_fd(*epoll_fd, fd, want, client.generation); + } + } + } + + if (close_now) { + close_client(*epoll_fd, fd); + clients.erase(it); + } + } + } + + close_all_clients(*epoll_fd, clients); + return true; +} + +bool EpollServer::run(const std::string &host, std::uint16_t port, EpollServerOptions options) { + UniqueFd listen_fd = make_fd(::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)); + if (*listen_fd < 0) { + return false; + } + + int yes = 1; + ::setsockopt(*listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, static_cast(sizeof(yes))); + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + const int parsed = ::inet_pton(AF_INET, host.c_str(), &addr.sin_addr); + if (parsed != 1) { + return false; + } + + auto *generic = reinterpret_cast(&addr); // NOLINT: POSIX socket API + if (::bind(*listen_fd, generic, static_cast(sizeof(addr))) < 0 || + ::listen(*listen_fd, 128) < 0) { + return false; + } + return serve_listen_socket(*listen_fd, options); +} + +#else + +bool EpollServer::serve_listen_socket(int, EpollServerOptions) { + static_cast(gateway_); + return false; +} + +bool EpollServer::run(const std::string &, std::uint16_t, EpollServerOptions) { + static_cast(gateway_); + return false; +} + +#endif + +} // namespace qsl::gateway diff --git a/src/gateway/order_gateway.cpp b/src/gateway/order_gateway.cpp index 2026db2..a2ed1e1 100644 --- a/src/gateway/order_gateway.cpp +++ b/src/gateway/order_gateway.cpp @@ -59,4 +59,19 @@ GatewayResult OrderGateway::modify(SymbolId symbol, OrderId id, Price new_price, return GatewayResult::accept(engine_.modify(symbol, id, new_price, new_quantity)); } +NewOrderPreview OrderGateway::preview_new_order(SymbolId symbol, OrderId id, Side side, Price price, + Quantity quantity, OrderType type) const { + if (!engine_.has_symbol(symbol) || engine_.contains(symbol, id)) { + return NewOrderPreview{/*accepted=*/false, /*fill_count=*/0}; + } + const RejectReason reason = (type == OrderType::Market) + ? engine::check_market(config_, side, quantity) + : engine::check_limit(config_, side, price, quantity); + if (reason != RejectReason::None) { + return NewOrderPreview{/*accepted=*/false, /*fill_count=*/0}; + } + return NewOrderPreview{/*accepted=*/true, + /*fill_count=*/engine_.fill_count(symbol, side, price, type, quantity)}; +} + } // namespace qsl::gateway diff --git a/src/gateway/session.cpp b/src/gateway/session.cpp index 0b864d9..3025e78 100644 --- a/src/gateway/session.cpp +++ b/src/gateway/session.cpp @@ -3,79 +3,171 @@ #include "qsl/engine/events.hpp" #include "qsl/protocol/codec.hpp" +#include +#include #include namespace qsl::gateway { namespace { -void append(std::vector &out, const std::vector &bytes) { +constexpr std::size_t frame_bytes(std::size_t body_bytes) noexcept { + return protocol::kHeaderSize + body_bytes; +} + +inline constexpr std::size_t kHeartbeatAckFrameBytes = frame_bytes(protocol::kHeartbeatBodySize); +inline constexpr std::size_t kAckFrameBytes = frame_bytes(protocol::kAckBodySize); +inline constexpr std::size_t kRejectFrameBytes = frame_bytes(protocol::kRejectBodySize); +inline constexpr std::size_t kFillFrameBytes = frame_bytes(protocol::kFillBodySize); + +bool has_space(std::size_t current, std::size_t add, std::size_t limit) noexcept { + return current <= limit && add <= limit - current; +} + +std::size_t add_saturating(std::size_t a, std::size_t b) noexcept { + const std::size_t max = std::numeric_limits::max(); + return b > max - a ? max : a + b; +} + +std::size_t mul_saturating(std::size_t a, std::size_t b) noexcept { + const std::size_t max = std::numeric_limits::max(); + return a != 0 && b > max / a ? max : a * b; +} + +std::size_t result_wire_size(const GatewayResult &result) noexcept { + if (!result.accepted) { + return kRejectFrameBytes; + } + std::size_t size = kAckFrameBytes; + for (const auto &event : result.events) { + if (std::holds_alternative(event)) { + size = add_saturating(size, kFillFrameBytes); + } + } + return size; +} + +SessionStatus append(std::vector &out, const std::vector &bytes, + std::size_t max_output_bytes) { + if (!has_space(out.size(), bytes.size(), max_output_bytes)) { + return SessionStatus::OutputLimitExceeded; + } out.insert(out.end(), bytes.begin(), bytes.end()); + return SessionStatus::Ok; } // Translate a gateway result into wire responses: a Reject, or an Ack followed by a Fill // per trade. -void emit_result(core::OrderId order_id, const GatewayResult &result, std::vector &out) { +SessionStatus emit_result(core::OrderId order_id, const GatewayResult &result, + std::vector &out, std::size_t max_output_bytes) { + if (!has_space(out.size(), result_wire_size(result), max_output_bytes)) { + return SessionStatus::OutputLimitExceeded; + } if (!result.accepted) { - append(out, protocol::encode(protocol::Reject{order_id, result.reason})); - return; + return append(out, protocol::encode(protocol::Reject{order_id, result.reason}), + max_output_bytes); } const core::SeqNo ack_seq = result.events.empty() ? 0 : engine::seq_of(result.events.front()); - append(out, protocol::encode(protocol::Ack{order_id, ack_seq})); + if (append(out, protocol::encode(protocol::Ack{order_id, ack_seq}), max_output_bytes) != + SessionStatus::Ok) { + return SessionStatus::OutputLimitExceeded; + } for (const auto &event : result.events) { if (const auto *t = std::get_if(&event)) { - append(out, protocol::encode(protocol::Fill{t->taker_id, t->maker_id, t->price, - t->quantity, t->seq})); + if (append(out, + protocol::encode( + protocol::Fill{t->taker_id, t->maker_id, t->price, t->quantity, t->seq}), + max_output_bytes) != SessionStatus::Ok) { + return SessionStatus::OutputLimitExceeded; + } } } + return SessionStatus::Ok; +} + +SessionStatus ensure_new_order_budget(const OrderGateway &gateway, const protocol::NewOrder &order, + std::vector &out, std::size_t max_output_bytes) { + const NewOrderPreview preview = gateway.preview_new_order( + order.symbol, order.order_id, order.side, order.price, order.quantity, order.type); + const std::size_t required = + preview.accepted + ? add_saturating(kAckFrameBytes, mul_saturating(preview.fill_count, kFillFrameBytes)) + : kRejectFrameBytes; + return has_space(out.size(), required, max_output_bytes) ? SessionStatus::Ok + : SessionStatus::OutputLimitExceeded; } } // namespace -void Session::process_frame(std::span frame, std::vector &out) { +SessionStatus Session::process_frame(std::span frame, std::vector &out, + std::size_t max_output_bytes) { const auto header = protocol::decode_header(frame); // already validated by on_bytes switch (header.value.type) { case protocol::MsgType::NewOrder: { const auto request = protocol::decode_new_order(frame); if (!request.ok()) { logged_out_ = true; - return; + return SessionStatus::Ok; + } + if (max_output_bytes != std::numeric_limits::max() && + ensure_new_order_budget(gateway_, request.value, out, max_output_bytes) != + SessionStatus::Ok) { + logged_out_ = true; + return SessionStatus::OutputLimitExceeded; } const auto &o = request.value; const GatewayResult result = (o.type == core::OrderType::Market) ? gateway_.new_market(o.symbol, o.order_id, o.side, o.quantity) : gateway_.new_limit(o.symbol, o.order_id, o.side, o.price, o.quantity, o.tif); - emit_result(o.order_id, result, out); - return; + if (emit_result(o.order_id, result, out, max_output_bytes) != SessionStatus::Ok) { + logged_out_ = true; + return SessionStatus::OutputLimitExceeded; + } + return SessionStatus::Ok; } case protocol::MsgType::CancelOrder: { const auto request = protocol::decode_cancel_order(frame); if (!request.ok()) { logged_out_ = true; - return; + return SessionStatus::Ok; } - emit_result(request.value.order_id, - gateway_.cancel(request.value.symbol, request.value.order_id), out); - return; + if (!has_space(out.size(), std::max(kAckFrameBytes, kRejectFrameBytes), max_output_bytes)) { + logged_out_ = true; + return SessionStatus::OutputLimitExceeded; + } + const GatewayResult result = gateway_.cancel(request.value.symbol, request.value.order_id); + if (emit_result(request.value.order_id, result, out, max_output_bytes) != + SessionStatus::Ok) { + logged_out_ = true; + return SessionStatus::OutputLimitExceeded; + } + return SessionStatus::Ok; } case protocol::MsgType::Heartbeat: { const auto request = protocol::decode_heartbeat(frame); if (!request.ok()) { logged_out_ = true; - return; + return SessionStatus::Ok; } - append(out, protocol::encode(protocol::HeartbeatAck{request.value.token})); - return; + if (!has_space(out.size(), kHeartbeatAckFrameBytes, max_output_bytes)) { + logged_out_ = true; + return SessionStatus::OutputLimitExceeded; + } + return append(out, protocol::encode(protocol::HeartbeatAck{request.value.token}), + max_output_bytes); } default: logged_out_ = true; // unexpected (e.g. a response) message -> drop - return; + return SessionStatus::Ok; } } -std::vector Session::on_bytes(std::span input) { +SessionStatus Session::on_bytes(std::span input, std::vector &out, + std::size_t max_output_bytes) { + if (logged_out_) { + return SessionStatus::Ok; + } inbuf_.insert(inbuf_.end(), input.begin(), input.end()); - std::vector out; std::size_t consumed = 0; while (!logged_out_) { const std::span remaining(inbuf_.data() + consumed, @@ -92,10 +184,21 @@ std::vector Session::on_bytes(std::span input) { if (remaining.size() < total) { break; // wait for the full body } - process_frame(remaining.subspan(0, total), out); + const SessionStatus status = + process_frame(remaining.subspan(0, total), out, max_output_bytes); + if (status != SessionStatus::Ok) { + inbuf_.erase(inbuf_.begin(), inbuf_.begin() + static_cast(consumed)); + return status; + } consumed += total; } inbuf_.erase(inbuf_.begin(), inbuf_.begin() + static_cast(consumed)); + return SessionStatus::Ok; +} + +std::vector Session::on_bytes(std::span input) { + std::vector out; + static_cast(on_bytes(input, out, std::numeric_limits::max())); return out; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4a5967d..cdb7bf3 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -13,8 +13,9 @@ find_package(Threads REQUIRED) foreach(t test_smoke test_types test_clock test_protocol test_order_book test_matching_engine test_risk_gateway test_market_data test_event_log test_replay test_session - test_tcp_gateway test_md_feed test_invariants test_fuzz_protocol test_fixture_export - test_shrink test_oracle_selftest test_reject_coverage test_spsc_ring test_order_pool) + test_tcp_gateway test_epoll_gateway test_md_feed test_invariants test_fuzz_protocol + test_fixture_export test_shrink test_oracle_selftest test_reject_coverage test_spsc_ring + test_order_pool) add_executable(${t} unit/${t}.cpp) target_link_libraries(${t} PRIVATE qsl_core qsl_warnings Catch2::Catch2WithMain Threads::Threads) catch_discover_tests(${t}) diff --git a/tests/unit/test_epoll_gateway.cpp b/tests/unit/test_epoll_gateway.cpp new file mode 100644 index 0000000..9d0a3da --- /dev/null +++ b/tests/unit/test_epoll_gateway.cpp @@ -0,0 +1,363 @@ +#include "qsl/engine/matching_engine.hpp" +#include "qsl/gateway/epoll_server.hpp" +#include "qsl/gateway/order_gateway.hpp" +#include "qsl/protocol/codec.hpp" + +#include + +#if defined(__linux__) +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +using namespace qsl::gateway; +using namespace qsl::protocol; + +#if defined(__linux__) +namespace { + +void set_read_timeout(int fd) { + timeval timeout{}; + timeout.tv_sec = 2; + REQUIRE(::setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, + static_cast(sizeof(timeout))) == 0); +} + +void write_all(int fd, const std::vector &data) { + std::size_t off = 0; + while (off < data.size()) { + const ssize_t n = ::send(fd, data.data() + off, data.size() - off, MSG_NOSIGNAL); + REQUIRE(n > 0); + off += static_cast(n); + } +} + +std::vector read_types(int fd, std::size_t expected) { + std::vector received; + std::vector types; + std::array buf{}; + + while (types.size() < expected) { + const ssize_t n = ::read(fd, buf.data(), buf.size()); + REQUIRE(n > 0); + received.insert(received.end(), buf.begin(), buf.begin() + n); + + std::size_t off = 0; + while (off + kHeaderSize <= received.size()) { + const auto header = decode_header(std::span(received).subspan(off)); + REQUIRE(header.error == DecodeError::None); + const std::size_t total = kHeaderSize + header.value.body_len; + if (off + total > received.size()) { + break; + } + types.push_back(header.value.type); + off += total; + } + received.erase(received.begin(), received.begin() + static_cast(off)); + } + return types; +} + +int bind_loopback_listener(sockaddr_in &bound) { + const int listen_fd = ::socket(AF_INET, SOCK_STREAM, 0); + REQUIRE(listen_fd >= 0); + int yes = 1; + REQUIRE(::setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, + static_cast(sizeof(yes))) == 0); + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = 0; + REQUIRE(::inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) == 1); + auto *addr_generic = reinterpret_cast(&addr); // NOLINT: POSIX socket API + REQUIRE(::bind(listen_fd, addr_generic, static_cast(sizeof(addr))) == 0); + REQUIRE(::listen(listen_fd, 16) == 0); + + socklen_t bound_len = sizeof(bound); + auto *bound_generic = reinterpret_cast(&bound); // NOLINT: POSIX socket API + REQUIRE(::getsockname(listen_fd, bound_generic, &bound_len) == 0); + return listen_fd; +} + +int connect_client(const sockaddr_in &bound) { + const int fd = ::socket(AF_INET, SOCK_STREAM, 0); + REQUIRE(fd >= 0); + set_read_timeout(fd); + const auto *generic = reinterpret_cast(&bound); // NOLINT: POSIX socket API + REQUIRE(::connect(fd, generic, static_cast(sizeof(bound))) == 0); + return fd; +} + +} // namespace +#endif + +TEST_CASE("epoll gateway availability is platform scoped", "[gateway][epoll]") { +#if defined(__linux__) + REQUIRE(EpollServer::supported()); +#else + REQUIRE_FALSE(EpollServer::supported()); +#endif +} + +#if defined(__linux__) + +TEST_CASE("epoll gateway handles multiple clients through one event loop", "[gateway][epoll]") { + sockaddr_in bound{}; + const int listen_fd = bind_loopback_listener(bound); + + qsl::engine::MatchingEngine engine; + engine.register_symbol("AAPL"); + OrderGateway gateway{engine, RiskConfig{1000, 1'000'000}}; + EpollServer server{gateway}; + std::atomic server_ok{false}; + + std::thread server_thread([&] { + server_ok.store( + server.serve_listen_socket( + listen_fd, EpollServerOptions{/*max_events=*/16, /*wait_timeout_ms=*/10}), + std::memory_order_release); + }); + + const int client1 = connect_client(bound); + const int client2 = connect_client(bound); + + write_all(client1, + encode(NewOrder{1, 0, 100, 5, Side::Sell, OrderType::Limit, TimeInForce::GTC}, 1)); + const auto first = read_types(client1, 1); + REQUIRE(first == std::vector{MsgType::Ack}); + + write_all(client2, encode(Heartbeat{42})); + const auto heartbeat = read_types(client2, 1); + REQUIRE(heartbeat == std::vector{MsgType::HeartbeatAck}); + + write_all(client2, + encode(NewOrder{2, 0, 100, 5, Side::Buy, OrderType::Limit, TimeInForce::GTC}, 2)); + const auto cross = read_types(client2, 2); + REQUIRE(cross == std::vector{MsgType::Ack, MsgType::Fill}); + + write_all(client1, encode(Heartbeat{99})); + const auto still_alive = read_types(client1, 1); + REQUIRE(still_alive == std::vector{MsgType::HeartbeatAck}); + + REQUIRE(::close(client1) == 0); + REQUIRE(::close(client2) == 0); + server.request_stop(); + server_thread.join(); + REQUIRE(server_ok.load(std::memory_order_acquire)); + REQUIRE(::close(listen_fd) == 0); +} + +TEST_CASE("epoll gateway rejects invalid bind hosts", "[gateway][epoll]") { + qsl::engine::MatchingEngine engine; + engine.register_symbol("AAPL"); + OrderGateway gateway{engine, RiskConfig{1000, 1'000'000}}; + EpollServer server{gateway}; + + REQUIRE_FALSE(server.run("localhost", 0)); + REQUIRE_FALSE(server.run("not-an-ip", 0)); +} + +TEST_CASE("epoll gateway applies backpressure without dropping responses", "[gateway][epoll]") { + sockaddr_in bound{}; + const int listen_fd = bind_loopback_listener(bound); + + qsl::engine::MatchingEngine engine; + engine.register_symbol("AAPL"); + OrderGateway gateway{engine, RiskConfig{1000, 1'000'000}}; + EpollServer server{gateway}; + std::atomic server_ok{false}; + + // A tiny outbound high-water mark forces the read-gating path: once a few responses are + // buffered the server must stop reading (drop EPOLLIN) and resume only as the backlog drains. + // The contract under that path is that no response is dropped or reordered and nothing + // deadlocks. + std::thread server_thread([&] { + server_ok.store(server.serve_listen_socket( + listen_fd, EpollServerOptions{/*max_events=*/16, /*wait_timeout_ms=*/10, + /*max_outbuf_bytes=*/256}), + std::memory_order_release); + }); + + const int client = connect_client(bound); + + // Send many resting buy orders at one price (no cross -> exactly one Ack each) before reading + // any response, so the server's outbound backlog repeatedly crosses the high-water mark. + constexpr std::uint64_t kOrders = 64; + for (std::uint64_t i = 1; i <= kOrders; ++i) { + write_all(client, + encode(NewOrder{i, 0, 100, 5, Side::Buy, OrderType::Limit, TimeInForce::GTC}, i)); + } + + const auto types = read_types(client, static_cast(kOrders)); + REQUIRE(types.size() == static_cast(kOrders)); + REQUIRE(std::all_of(types.begin(), types.end(), [](MsgType t) { return t == MsgType::Ack; })); + + REQUIRE(::close(client) == 0); + server.request_stop(); + server_thread.join(); + REQUIRE(server_ok.load(std::memory_order_acquire)); + REQUIRE(::close(listen_fd) == 0); +} + +TEST_CASE("epoll gateway drops a non-reading client that exceeds the hard buffer cap", + "[gateway][epoll]") { + sockaddr_in bound{}; + const int listen_fd = bind_loopback_listener(bound); + + qsl::engine::MatchingEngine engine; + engine.register_symbol("AAPL"); + OrderGateway gateway{engine, RiskConfig{1000, 1'000'000}}; + EpollServer server{gateway}; + std::atomic server_ok{false}; + + // Tiny soft mark and hard cap: a single crossing order whose response (Ack + one Fill per + // maker) exceeds the hard cap must make the server drop the connection rather than buffer it. + std::thread server_thread([&] { + server_ok.store(server.serve_listen_socket( + listen_fd, EpollServerOptions{/*max_events=*/16, /*wait_timeout_ms=*/10, + /*max_outbuf_bytes=*/64, + /*max_outbuf_hard_bytes=*/64}), + std::memory_order_release); + }); + + const int client = connect_client(bound); + + // Build a book of eight resting sells at one price, reading each Ack so the buffer stays low. + constexpr std::uint64_t kMakers = 8; + for (std::uint64_t i = 1; i <= kMakers; ++i) { + write_all( + client, + encode(NewOrder{i, 0, 100, 5, Side::Sell, OrderType::Limit, TimeInForce::GTC}, i)); + REQUIRE(read_types(client, 1) == std::vector{MsgType::Ack}); + } + + // One crossing buy fills all eight makers -> Ack + 8 Fills, far past the 64-byte hard cap, so + // the server enforces the cap before buffering and drops the connection instead. + write_all(client, encode(NewOrder{kMakers + 1, 0, 100, 40, Side::Buy, OrderType::Limit, + TimeInForce::GTC}, + kMakers + 1)); + + // The drop surfaces as a clean EOF (read returns 0) rather than the over-cap response. A + // non-zero read here would mean the server buffered/sent it (cap not enforced); a -1 timeout + // would mean it neither answered nor closed. + std::array buf{}; + ssize_t n = 0; + for (int tries = 0; tries < 50; ++tries) { + n = ::read(client, buf.data(), buf.size()); + if (n <= 0) { + break; + } + } + REQUIRE(n == 0); // server closed the connection (hard cap enforced) + + REQUIRE(::close(client) == 0); + server.request_stop(); + server_thread.join(); + REQUIRE(server_ok.load(std::memory_order_acquire)); + const auto snapshot = engine.snapshot(); + REQUIRE(snapshot.last_seq == kMakers); + REQUIRE(snapshot.symbols.size() == 1); + REQUIRE(snapshot.symbols.front().order_count == kMakers); + REQUIRE(snapshot.symbols.front().asks.size() == 1); + REQUIRE(snapshot.symbols.front().asks.front().quantity == kMakers * 5); + REQUIRE(::close(listen_fd) == 0); +} + +TEST_CASE("epoll gateway preserves queued replies before later over-cap close", + "[gateway][epoll]") { + sockaddr_in bound{}; + const int listen_fd = bind_loopback_listener(bound); + + qsl::engine::MatchingEngine engine; + engine.register_symbol("AAPL"); + OrderGateway gateway{engine, RiskConfig{1000, 1'000'000}}; + EpollServer server{gateway}; + std::atomic server_ok{false}; + + std::thread server_thread([&] { + server_ok.store(server.serve_listen_socket( + listen_fd, EpollServerOptions{/*max_events=*/16, /*wait_timeout_ms=*/10, + /*max_outbuf_bytes=*/1024, + /*max_outbuf_hard_bytes=*/kHeaderSize + + kAckBodySize}), + std::memory_order_release); + }); + + const int client = connect_client(bound); + + auto batch = encode(NewOrder{1, 0, 100, 5, Side::Sell, OrderType::Limit, TimeInForce::GTC}, 1); + const auto second = + encode(NewOrder{2, 0, 101, 5, Side::Sell, OrderType::Limit, TimeInForce::GTC}, 2); + batch.insert(batch.end(), second.begin(), second.end()); + write_all(client, batch); + + REQUIRE(read_types(client, 1) == std::vector{MsgType::Ack}); + + std::array buf{}; + const ssize_t n = ::read(client, buf.data(), buf.size()); + REQUIRE(n == 0); + + REQUIRE(::close(client) == 0); + server.request_stop(); + server_thread.join(); + REQUIRE(server_ok.load(std::memory_order_acquire)); + + const auto snapshot = engine.snapshot(); + REQUIRE(snapshot.last_seq == 1); + REQUIRE(snapshot.symbols.size() == 1); + REQUIRE(snapshot.symbols.front().order_count == 1); + REQUIRE(snapshot.symbols.front().asks.size() == 1); + REQUIRE(snapshot.symbols.front().asks.front().price == 100); + REQUIRE(snapshot.symbols.front().asks.front().quantity == 5); + REQUIRE(::close(listen_fd) == 0); +} + +TEST_CASE("epoll gateway drains a complete request before hangup close", "[gateway][epoll]") { + sockaddr_in bound{}; + const int listen_fd = bind_loopback_listener(bound); + + qsl::engine::MatchingEngine engine; + engine.register_symbol("AAPL"); + OrderGateway gateway{engine, RiskConfig{1000, 1'000'000}}; + EpollServer server{gateway}; + std::atomic server_ok{false}; + + std::thread server_thread([&] { + server_ok.store( + server.serve_listen_socket( + listen_fd, EpollServerOptions{/*max_events=*/16, /*wait_timeout_ms=*/10}), + std::memory_order_release); + }); + + const int client = connect_client(bound); + write_all(client, + encode(NewOrder{1, 0, 100, 5, Side::Sell, OrderType::Limit, TimeInForce::GTC}, 1)); + REQUIRE(::close(client) == 0); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + server.request_stop(); + server_thread.join(); + REQUIRE(server_ok.load(std::memory_order_acquire)); + + const auto snapshot = engine.snapshot(); + REQUIRE(snapshot.last_seq == 1); + REQUIRE(snapshot.symbols.size() == 1); + REQUIRE(snapshot.symbols.front().order_count == 1); + REQUIRE(snapshot.symbols.front().asks.size() == 1); + REQUIRE(snapshot.symbols.front().asks.front().quantity == 5); + REQUIRE(::close(listen_fd) == 0); +} + +#endif diff --git a/tests/unit/test_session.cpp b/tests/unit/test_session.cpp index 24cc0d4..ac37ce7 100644 --- a/tests/unit/test_session.cpp +++ b/tests/unit/test_session.cpp @@ -82,6 +82,32 @@ TEST_CASE("a market order crosses resting liquidity", "[session]") { REQUIRE(decode_fill(frames[1]).value.maker_id == 1); } +TEST_CASE("a bounded session rejects over-budget fanout before gateway mutation", "[session]") { + MatchingEngine eng; + eng.register_symbol("AAPL"); + OrderGateway gw{eng, RiskConfig{1000, 1'000'000}}; + Session session{gw}; + + constexpr std::uint64_t kMakers = 8; + for (std::uint64_t i = 1; i <= kMakers; ++i) { + static_cast(session.on_bytes( + encode(NewOrder{i, 0, 100, 1, Side::Sell, OrderType::Limit, TimeInForce::GTC}, i))); + } + const auto before = eng.snapshot(); + + std::vector out; + const NewOrder sweep{ + kMakers + 1, 0, 100, static_cast(kMakers), Side::Buy, OrderType::Limit, + TimeInForce::GTC}; + const SessionStatus status = session.on_bytes(encode(sweep, kMakers + 1), out, + /*max_output_bytes=*/64); + + REQUIRE(status == SessionStatus::OutputLimitExceeded); + REQUIRE(session.logged_out()); + REQUIRE(out.empty()); + REQUIRE(eng.snapshot() == before); +} + TEST_CASE("a cancel order through the session yields an Ack", "[session]") { MatchingEngine eng; eng.register_symbol("AAPL");