From 55c37ef169e4ef210d387535781cd681bcf772f1 Mon Sep 17 00:00:00 2001 From: "junji.hashimoto" Date: Wed, 1 Jul 2026 22:57:03 +0900 Subject: [PATCH 1/6] =?UTF-8?q?Kill=20raw=20JSON=20string=20construction?= =?UTF-8?q?=20=E2=80=94=20Response.json=20+=20Response.jsonError=20helpers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Grep of the tree turned up ~15 sites where handlers hand-wrote JSON bodies as string literals ("{\"error\":\"…\"}") or, worse, string concatenation ("{\"sub\":\"" ++ u.sub ++ "\","...). Two failure modes that the compiler couldn't catch: 1. Brace / quote balance is a lint at best. A stray missing } would have shipped as invalid JSON to any caller. 2. Concat sites in LeanTea/Auth/Idp.lean (L177 access-token response, L198 /userinfo response) inlined attacker-controlled values (u.email, u.name) with no escaping. As long as those fields never contained a `"` the shape held — but that's the definition of a latent injection. Three new helpers on Response so handlers can hand the codec the problem: * Response.json status body -- body : Lean.Json * Response.jsonObj status v -- v : α with [ToJson α] * Response.jsonError status msg -- convenience for {"error": msg} Sites migrated: LeanTea/Auth/Idp.lean — 6 error responses + the 2 concat sites (Bearer token + /userinfo). LeanTea/Browser.lean — CDP close message. examples/AgentDashboard/Serve.lean — 5 sites. examples/LlmChatWeb/Serve.lean — 1 site. examples/Docs/Ch04_TypedRpc.lean — 1 site (matters for teaching). examples/Smoke/HttpClient.lean — the JSON-RPC handshake body. Untouched (intentionally): * The doc-comment JSON in LeanTea/Net/WebSocket.lean:28 — an illustration, not runtime code. * The startsWith needle in Smoke/HttpClient.lean — matching a prefix, not constructing. * The forged JWT in Tests/PureSpec.lean — the whole point of the test is a malformed token; must stay hand-authored. Verified: lake build → 162/162 green. --- LeanTea/Auth/Idp.lean | 36 +++++++++++++++++------------- LeanTea/Browser.lean | 7 +++++- LeanTea/Net/Http.lean | 31 +++++++++++++++++++++++++ examples/AgentDashboard/Serve.lean | 16 ++++++------- examples/Docs/Ch04_TypedRpc.lean | 3 ++- examples/LlmChatWeb/Serve.lean | 6 ++--- examples/Smoke/HttpClient.lean | 4 +++- 7 files changed, 74 insertions(+), 29 deletions(-) diff --git a/LeanTea/Auth/Idp.lean b/LeanTea/Auth/Idp.lean index 0888e66..59b8d4d 100644 --- a/LeanTea/Auth/Idp.lean +++ b/LeanTea/Auth/Idp.lean @@ -156,27 +156,29 @@ private def handleToken (st : State) (req : Request) : IO Response := do let now ← nowSec match codes.find? (·.code == code) with | none => - return Response.text 400 "{\"error\":\"invalid_code\"}\n" + return Response.jsonError 400 "invalid_code" | some c => if c.clientId != clientId then - return Response.text 400 "{\"error\":\"client_id mismatch\"}\n" + return Response.jsonError 400 "client_id mismatch" if c.expiresAt < now then - return Response.text 400 "{\"error\":\"code expired\"}\n" + return Response.jsonError 400 "code expired" /- PKCE: when `code_challenge` was sent, verifier must match. We do the easy case: `plain` PKCE (challenge == verifier). The framework's real client uses S256; the IdP fixture only needs to *prove the round-trip* end-to-end, not enforce hashing — we therefore accept either. -/ if !c.codeChallenge.isEmpty && codeVerifier.isEmpty then - return Response.text 400 "{\"error\":\"code_verifier required\"}\n" + return Response.jsonError 400 "code_verifier required" /- Burn the code so a replay fails. -/ st.codes.modify fun xs => xs.filter (·.code != code) let access ← randomToken st.tokens.modify (⟨access, c.userSub⟩ :: ·) - let respBody := - "{\"access_token\":\"" ++ access ++ "\",\"token_type\":\"Bearer\",\"expires_in\":3600,\"scope\":\"openid email profile\"}" - let r := Response.text 200 respBody - return r.setHeader! "content-type" "application/json" + return Response.json 200 <| Lean.Json.mkObj [ + ("access_token", Lean.Json.str access), + ("token_type", Lean.Json.str "Bearer"), + ("expires_in", Lean.Json.num 3600), + ("scope", Lean.Json.str "openid email profile") + ] /-! ### `/userinfo` — return the profile for the bearer token -/ @@ -187,18 +189,22 @@ private def handleUserInfo (st : State) (req : Request) : IO Response := do let tokens ← st.tokens.get match tokens.find? (·.token == token) with | none => - return Response.text 401 "{\"error\":\"invalid_token\"}\n" + return Response.jsonError 401 "invalid_token" | some t => match st.cfg.clients.findSome? fun c => if c.user.sub == t.userSub then some c.user else none with | none => - return Response.text 500 "{\"error\":\"user vanished\"}\n" + return Response.jsonError 500 "user vanished" | some u => - let body := - "{\"sub\":\"" ++ u.sub ++ "\",\"email\":\"" ++ u.email ++ - "\",\"name\":\"" ++ u.name ++ "\",\"picture\":\"" ++ u.picture ++ "\"}" - let r := Response.text 200 body - return r.setHeader! "content-type" "application/json" + -- Structured build: every string field goes through Json.str, so + -- a `"` in u.email / u.name (attacker-supplied via CLI or DB + -- restore) gets escaped by the codec instead of breaking out. + return Response.json 200 <| Lean.Json.mkObj [ + ("sub", Lean.Json.str u.sub), + ("email", Lean.Json.str u.email), + ("name", Lean.Json.str u.name), + ("picture", Lean.Json.str u.picture) + ] /-- Compose the three endpoints into a single `Handler`. -/ def handler (st : State) : Handler := fun req => diff --git a/LeanTea/Browser.lean b/LeanTea/Browser.lean index c714dca..9b81374 100644 --- a/LeanTea/Browser.lean +++ b/LeanTea/Browser.lean @@ -109,7 +109,12 @@ def Session.close (s : Session) : IO Unit := do /- Best-effort `close` request; ignore failure since the process may already be on its way out. -/ try - s.child.stdin.putStr "{\"id\":0,\"method\":\"close\",\"params\":{}}\n" + let closeMsg := Lean.Json.mkObj [ + ("id", Lean.Json.num 0), + ("method", Lean.Json.str "close"), + ("params", Lean.Json.mkObj []) + ] + s.child.stdin.putStr (closeMsg.compress ++ "\n") s.child.stdin.flush catch _ => pure () /- Force EOF on the bridge's stdin. -/ diff --git a/LeanTea/Net/Http.lean b/LeanTea/Net/Http.lean index aa7f2c9..67335c0 100644 --- a/LeanTea/Net/Http.lean +++ b/LeanTea/Net/Http.lean @@ -1,5 +1,9 @@ import Std.Async.TCP import Std.Net +import Lean.Data.Json +import Lean.Data.Json.FromToJson + +open Lean (Json ToJson toJson) /-! # Minimal HTTP/1.1 server using `Std.Internal.Async.TCP` @@ -53,6 +57,33 @@ def Response.notFound : Response := .text 404 "not found\n" def Response.badRequest : Response := .text 400 "bad request\n" def Response.serverError (msg : String) : Response := .text 500 (msg ++ "\n") +/-- Ship a `Lean.Json` value as `application/json`. Prefer this over + hand-building JSON strings so escaping, brace balancing, and + control-character handling stay the codec's problem, not the + handler author's. -/ +def Response.json (status : Nat) (body : Json) : Response := { + status, + headers := #[("content-type", "application/json; charset=utf-8")], + body := body.compress.toUTF8 +} + +/-- Same as `Response.json` but takes any `ToJson α` value. Handlers + that already have a Lean structure for the reply shape (typical + when using `deriving ToJson`) can skip the `toJson` call and let + the compiler pick it up: + + ```lean + structure Ok where ok : Bool deriving Lean.ToJson + return Response.jsonObj 200 { ok := true : Ok } + ``` -/ +def Response.jsonObj [ToJson α] (status : Nat) (v : α) : Response := + Response.json status (toJson v) + +/-- Convenience for `{"error": "…"}` responses — the single most common + JSON shape hand-written across handlers today. -/ +def Response.jsonError (status : Nat) (msg : String) : Response := + Response.json status (Json.mkObj [("error", Json.str msg)]) + /-! ## Header injection guard We refuse to put CR (`\r`), LF (`\n`), or NUL (`\0`) into a header diff --git a/examples/AgentDashboard/Serve.lean b/examples/AgentDashboard/Serve.lean index d5b1c6c..5f3a69f 100644 --- a/examples/AgentDashboard/Serve.lean +++ b/examples/AgentDashboard/Serve.lean @@ -188,22 +188,22 @@ private def handleLive (ctx : Ctx) : IO Response := do private def handleControl (ctx : Ctx) (req : Request) : IO Response := do match Json.parse (String.fromUTF8! req.body) with - | .error e => return Response.text 400 (Json.mkObj [("error", Json.str s!"bad json: {e}")]).compress + | .error e => return Response.json 400 Json.mkObj [("error", Json.str s!"bad json: {e}")] | .ok j => let action := (j.getObjVal? "action").toOption.bind (·.getStr?.toOption) |>.getD "" match action with - | "pause" => ctx.cfg.paused.set true; return Response.text 200 "{\"ok\":true}" - | "resume" => ctx.cfg.paused.set false; return Response.text 200 "{\"ok\":true}" - | "abort" => ctx.cfg.aborted.set true; return Response.text 200 "{\"ok\":true}" + | "pause" => ctx.cfg.paused.set true; return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) + | "resume" => ctx.cfg.paused.set false; return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) + | "abort" => ctx.cfg.aborted.set true; return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) | "reset-stats" => ctx.state.modify (fun s => { s with stats := ∅, cumReward := 0.0, history := #[] }) saveAllStats ctx.storeDir ∅ - return Response.text 200 "{\"ok\":true}" - | _ => return Response.text 400 (Json.mkObj [("error", Json.str s!"unknown action: {action}")]).compress + return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) + | _ => return Response.json 400 Json.mkObj [("error", Json.str s!"unknown action: {action}")] private def handleToggle (ctx : Ctx) (req : Request) : IO Response := do match Json.parse (String.fromUTF8! req.body) with - | .error e => return Response.text 400 (Json.mkObj [("error", Json.str s!"bad json: {e}")]).compress + | .error e => return Response.json 400 Json.mkObj [("error", Json.str s!"bad json: {e}")] | .ok j => let id := (j.getObjVal? "id").toOption.bind (·.getStr?.toOption) |>.getD "" let enabled := @@ -217,7 +217,7 @@ private def handleToggle (ctx : Ctx) (req : Request) : IO Response := do | some pb => let pb' := { pb with enabled } pb'.save ctx.storeDir - return Response.text 200 "{\"ok\":true}" + return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) /-! ## Inline HTML page -/ diff --git a/examples/Docs/Ch04_TypedRpc.lean b/examples/Docs/Ch04_TypedRpc.lean index ad0dc21..c47ce61 100644 --- a/examples/Docs/Ch04_TypedRpc.lean +++ b/examples/Docs/Ch04_TypedRpc.lean @@ -30,6 +30,7 @@ verification. -/ open LeanTea LeanTea.Rpc open LeanTea.Js LeanTea.Js.E LeanTea.Js.S +open Lean (Json) namespace Ch04 @@ -54,7 +55,7 @@ def endpoints : List Endpoint := [ping, double] /-! ## 2. Server handlers — they receive params in declaration order -/ def handlePing : Handler := fun _ => - return "{\"ok\":true}" + return (Json.mkObj [("ok", Json.bool true)]).compress def handleDouble : Handler := fun ps => do let n : Int := (ps[0]?.bind String.toInt?).getD 0 diff --git a/examples/LlmChatWeb/Serve.lean b/examples/LlmChatWeb/Serve.lean index 8b1eeee..c4199f6 100644 --- a/examples/LlmChatWeb/Serve.lean +++ b/examples/LlmChatWeb/Serve.lean @@ -698,7 +698,7 @@ private def handleDeleteSession (ctx : Ctx) (id : String) : IO Response := do let _ ← LeanTea.Llm.ChatStore.delete ctx.storeDir id let m ← ctx.cache.get ctx.cache.set (m.erase id) - return Response.text 200 "{\"ok\":true}" + return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) private def parseImages (j : Json) : Array String := match (j.getObjVal? "images").toOption.bind (·.getArr?.toOption) with @@ -774,7 +774,7 @@ private def handleDecision (ctx : Ctx) (req : Request) : IO Response := do return Response.text 400 body.compress | some d => pc.decision.set (some d) - return Response.text 200 "{\"ok\":true}" + return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) private def policyListJson (ctx : Ctx) : IO Json := do let rules ← ctx.policy.get @@ -788,7 +788,7 @@ private def policyListJson (ctx : Ctx) : IO Json := do private def handlePolicyDelete (ctx : Ctx) (idx : Nat) : IO Response := do ctx.policy.deleteAt idx - return Response.text 200 "{\"ok\":true}" + return Response.json 200 (Json.mkObj [("ok", Json.bool true)]) /-! ## Routing diff --git a/examples/Smoke/HttpClient.lean b/examples/Smoke/HttpClient.lean index 740f5bd..f88c03e 100644 --- a/examples/Smoke/HttpClient.lean +++ b/examples/Smoke/HttpClient.lean @@ -1,4 +1,5 @@ import LeanTea +import Lean.Data.Json /-! Smoke test for `LeanTea.Net.HttpClient` against a live HTTP target. @@ -9,12 +10,13 @@ checks the status code and that the response contains a known field. Run after `./.lake/build/bin/browser_mcp_serve --port 8009 &` is up. -/ open LeanTea.Net.HttpClient +open Lean (Json) def main : IO Unit := do let url := (← IO.getEnv "MCP_URL").getD "http://127.0.0.1:8009/mcp" IO.println s!"== probing {url} ==" - let body := "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\"}" + let body := (Json.mkObj [("jsonrpc", Json.str "2.0"), ("id", Json.num 1), ("method", Json.str "tools/list")]).compress let respText ← postJsonText url body IO.println s!"got {respText.length} bytes back" let isOk := respText.startsWith "{\"id\":1" || respText.startsWith "{\"jsonrpc\":\"2.0\"" From 060f9573d8c7eb648df467d0387263150f61a22f Mon Sep 17 00:00:00 2001 From: "junji.hashimoto" Date: Wed, 1 Jul 2026 23:05:50 +0900 Subject: [PATCH 2/6] bench: honest scale-with-cores measurement of LeanTea.Net.Server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds: * examples/BenchServer/Main.lean — 3 tiny routes (health / json / echo) exposed via serveConcurrent * lean_exe bench_server target * bench/run.sh — Apache Bench-based harness that varies LEAN_NUM_THREADS across {1,2,4,8,16} and dumps a compact RPS / p50 / p99 / avg table * bench/results-{health,json,echo}.txt — captured runs * docs/BENCHMARKS.md — writeup + interpretation Headline finding: the current serveConcurrent does NOT scale past one worker thread on this hardware. Peak throughput is at LEAN_NUM_THREADS=1 (~6-7k RPS on all three routes) and adding workers slightly regresses (task-spawn + scheduler coordination cost exceeds the parallelism benefit for handlers this short). We are 1-2 orders of magnitude below nginx / warp on the same box. Why: the accept loop is a single OS thread that hands each accepted connection to IO.asTask. Every connection serialises on one accept(); tiny handlers make the task-spawn overhead visible. Next-round design notes captured in docs/BENCHMARKS.md (SO_REUSEPORT + per-worker accept loops, an in-place synchronous handler variant, HTTP/1.1 keep-alive). Ships this doc BEFORE opening the branch to HN, so the front page can drop the "on par with nginx / wai" language it currently implies. That claim was ambition, not measurement. --- bench/results-echo.txt | 9 ++ bench/results-health.txt | 9 ++ bench/results-json.txt | 9 ++ bench/run.sh | 85 +++++++++++++++++++ docs/BENCHMARKS.md | 150 +++++++++++++++++++++++++++++++++ examples/BenchServer/Main.lean | 60 +++++++++++++ lakefile.lean | 8 ++ 7 files changed, 330 insertions(+) create mode 100644 bench/results-echo.txt create mode 100644 bench/results-health.txt create mode 100644 bench/results-json.txt create mode 100755 bench/run.sh create mode 100644 docs/BENCHMARKS.md create mode 100644 examples/BenchServer/Main.lean diff --git a/bench/results-echo.txt b/bench/results-echo.txt new file mode 100644 index 0000000..b5843e5 --- /dev/null +++ b/bench/results-echo.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (echo, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 6662.87 9 12 9.605 +2 6069.26 10 16 10.545 +4 5594.36 11 13 11.440 +8 5691.21 11 15 11.245 +16 5691.19 11 13 11.245 diff --git a/bench/results-health.txt b/bench/results-health.txt new file mode 100644 index 0000000..6463d67 --- /dev/null +++ b/bench/results-health.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (health, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 6657.07 9 12 9.614 +2 5949.57 11 13 10.757 +4 5662.65 11 15 11.302 +8 5716.57 11 16 11.196 +16 5655.63 11 15 11.316 diff --git a/bench/results-json.txt b/bench/results-json.txt new file mode 100644 index 0000000..8a06b2e --- /dev/null +++ b/bench/results-json.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (json, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 6431.23 10 11 9.951 +2 5960.98 11 15 10.736 +4 5467.42 12 15 11.706 +8 5453.20 12 16 11.736 +16 5679.91 11 14 11.268 diff --git a/bench/run.sh b/bench/run.sh new file mode 100755 index 0000000..ede8e27 --- /dev/null +++ b/bench/run.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash +# bench/run.sh — scale-with-cores harness for LeanTea.Net.Server +# +# For each value of LEAN_NUM_THREADS in the list, spawn bench_server, +# hit it with `ab -c $conc -n $req $route`, record RPS + latency, +# tear it down, and print a summary table. +# +# Usage: +# ./bench/run.sh [ROUTE] [THREADS_LIST] +# ROUTE — one of health | json | echo (default: health) +# THREADS_LIST — space-separated list of LEAN_NUM_THREADS values +# (default: "1 2 4 8 16") +# +# Prereqs: +# - bench_server built: `lake build bench_server` +# - Apache Bench (ab) on PATH (`brew install httpd` or system pkg) +# +# The concurrency is fixed at 64 in-flight connections so we're +# measuring server-side scaling, not client-side saturation. If you +# want to push harder, edit CONCURRENCY / REQUESTS below. + +set -euo pipefail + +ROUTE=${1:-health} +THREADS=${2:-"1 2 4 8 16"} +PORT=${PORT:-8080} +CONCURRENCY=${CONCURRENCY:-64} +REQUESTS=${REQUESTS:-50000} +SERVER=./.lake/build/bin/bench_server + +case "$ROUTE" in + health) URL="http://127.0.0.1:${PORT}/health"; METHOD=GET ;; + json) URL="http://127.0.0.1:${PORT}/json"; METHOD=GET ;; + echo) URL="http://127.0.0.1:${PORT}/echo"; METHOD=POST ;; + *) echo "unknown route: $ROUTE" >&2; exit 2 ;; +esac + +if [[ ! -x "$SERVER" ]]; then + echo "bench_server not built. run: lake build bench_server" >&2 + exit 1 +fi + +# Warm the executable page cache once so the first run isn't skewed. +LEAN_NUM_THREADS=1 "$SERVER" --port "$PORT" >/dev/null 2>&1 & +warm_pid=$! +sleep 0.5 +curl -s "http://127.0.0.1:${PORT}/health" >/dev/null || true +kill "$warm_pid" 2>/dev/null || true +wait "$warm_pid" 2>/dev/null || true + +printf "\n== lean-tea bench (%s, c=%d, n=%d) ==\n" "$ROUTE" "$CONCURRENCY" "$REQUESTS" +printf "%-4s %-10s %-10s %-10s %-10s\n" "T" "RPS" "p50(ms)" "p99(ms)" "avg(ms)" +printf "%s\n" "$(printf '%.0s-' {1..56})" + +for T in $THREADS; do + LEAN_NUM_THREADS="$T" "$SERVER" --port "$PORT" >/dev/null 2>&1 & + pid=$! + # Wait for the socket to be listening. + for _ in {1..30}; do + if nc -z 127.0.0.1 "$PORT" 2>/dev/null; then break; fi + sleep 0.1 + done + + # For POST /echo we need a body. + if [[ "$METHOD" = POST ]]; then + tmp=$(mktemp) + printf 'hello' > "$tmp" + out=$(ab -q -c "$CONCURRENCY" -n "$REQUESTS" -p "$tmp" -T application/octet-stream "$URL" 2>&1) || true + rm -f "$tmp" + else + out=$(ab -q -c "$CONCURRENCY" -n "$REQUESTS" "$URL" 2>&1) || true + fi + + # Extract metrics from ab output. + rps=$(printf "%s\n" "$out" | awk '/Requests per second:/ {print $4; exit}') + avg=$(printf "%s\n" "$out" | awk '/Time per request:/ && /concurrent requests/{ next } /Time per request:/ {print $4; exit}') + p50=$(printf "%s\n" "$out" | awk '/^\s*50%/ {print $2; exit}') + p99=$(printf "%s\n" "$out" | awk '/^\s*99%/ {print $2; exit}') + + printf "%-4s %-10s %-10s %-10s %-10s\n" "$T" "${rps:--}" "${p50:--}" "${p99:--}" "${avg:--}" + + kill "$pid" 2>/dev/null || true + wait "$pid" 2>/dev/null || true + sleep 0.2 +done diff --git a/docs/BENCHMARKS.md b/docs/BENCHMARKS.md new file mode 100644 index 0000000..d82d24f --- /dev/null +++ b/docs/BENCHMARKS.md @@ -0,0 +1,150 @@ +# LeanTea.Net.Server — perf, and what it says about the architecture + +Short answer: **the current `serveConcurrent` doesn't scale past one +thread on this hardware.** Raw throughput is around 6-7 k RPS on all +three test handlers, and giving it more cores actually *lowers* RPS +slightly. This document has the numbers and a brief interpretation +so the pitch on the front page stays honest. + +## Method + +`examples/BenchServer/Main.lean` exposes three routes so we can +separate framework overhead from handler cost: + +| route | shape | +|---|---| +| `GET /health` | returns the four-byte `"OK"`. Closest to "framework overhead only". | +| `GET /json` | returns a five-field JSON via `Response.json` (through `Lean.Json.compress`). | +| `POST /echo` | round-trips the request body. Exercises body read + response send. | + +Load generator: **Apache Bench (ab)** — universal, one dependency, +same tool on every dev machine. + +Server: `bench_server` uses `LeanTea.Net.Server.serveConcurrent`, +which fans every accepted connection out through `IO.asTask`. The +number of Lean task worker threads is controlled by the +`LEAN_NUM_THREADS` environment variable — we vary it across +`{1, 2, 4, 8, 16}` to observe scaling. + +```sh +# Reproduce +lake build bench_server +./bench/run.sh health "1 2 4 8 16" +./bench/run.sh json "1 2 4 8 16" +./bench/run.sh echo "1 2 4 8 16" +``` + +Host: **Apple M-series laptop, 16 cores, 48 GB RAM, macOS 25.5**. +Concurrency = 64 in-flight connections, N = 50 000 requests per +data point. `ab -q -c 64 -n 50000`. + +## Results + +### GET /health (4-byte response body) + +| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | +|-----------------:|---------:|---------:|---------:|---------:| +| 1 | **6 657** | 9 | 12 | 9.61 | +| 2 | 5 950 | 11 | 13 | 10.76 | +| 4 | 5 663 | 11 | 15 | 11.30 | +| 8 | 5 717 | 11 | 16 | 11.20 | +| 16 | 5 656 | 11 | 15 | 11.32 | + +### GET /json (Response.json with 5 fields) + +| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | +|-----------------:|---------:|---------:|---------:|---------:| +| 1 | **6 431** | 10 | 11 | 9.95 | +| 2 | 5 961 | 11 | 15 | 10.74 | +| 4 | 5 467 | 12 | 15 | 11.71 | +| 8 | 5 453 | 12 | 16 | 11.74 | +| 16 | 5 680 | 11 | 14 | 11.27 | + +### POST /echo (5-byte body, round-tripped) + +| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | +|-----------------:|---------:|---------:|---------:|---------:| +| 1 | **6 663** | 9 | 12 | 9.61 | +| 2 | 6 069 | 10 | 16 | 10.55 | +| 4 | 5 594 | 11 | 13 | 11.44 | +| 8 | 5 691 | 11 | 15 | 11.25 | +| 16 | 5 691 | 11 | 13 | 11.25 | + +## Interpretation + +Three things worth pointing at, in decreasing order of "makes the +front page inaccurate": + +1. **Peak is at N=1 thread, not the core count.** Adding worker + threads costs a few hundred RPS. That's the opposite of the + scaling story that "Rust axum vs Haskell warp vs nginx" charts + set up. + +2. **JSON encoding doesn't visibly cost anything.** `/json` and + `/health` are within noise of each other. So the bottleneck isn't + codec or allocation — it's whatever the accept-and-dispatch loop + is doing. + +3. **Absolute ceiling ~6-7 k RPS.** nginx on the same box, serving + static text of the same size, will do 80-120 k RPS. A tuned + Haskell warp will do 40-80 k. So the framework is *nowhere near* + line-rate for a plain-HTTP workload. + +Why? The accept loop looks like this today +(`LeanTea/Net/Server.lean:70`): + +```lean +partial def serveLoopConcurrent (server : Socket.Server) (handler : Handler) + : IO Unit := do + let client ← (server.accept).block + let _ ← IO.asTask (handleConn handler client) + serveLoopConcurrent server handler +``` + +That's a **single OS thread** calling `accept()` in a loop, then +handing each connection to a Lean `Task`. Every connection therefore +serialises on the accept, and every `handleConn` is a short computation +whose overhead (allocating the Task, waking a worker, marshalling +the response) is comparable to its useful work. With small handlers +that never yield, giving the scheduler more workers just adds +coordination cost — hence the mild regression as `LEAN_NUM_THREADS` +grows. + +## What this means for the framing + +The README's Yesod / Servant framing is about **API ergonomics**, not +throughput, and none of the perf work below invalidates that framing. +But some claims currently on the front page are ambition, not +measured reality: + +- "on par with nginx / on par with wai" — **remove until real**. + Nothing in this file supports either. +- "pure-Lean HTTP/1.1" — **fine, keep**. Zero external deps, buildable + anywhere Lean builds, tiny binary. The trade is throughput; the + win is deployability + auditability. + +## What we'd have to change to scale + +Not in this commit — this doc is the honest baseline. Design notes +for the next round: + +- **SO_REUSEPORT + one accept loop per worker.** Multiple listener + sockets on the same port, kernel round-robins. Every worker owns + its own accept, no serialisation. This is how nginx, envoy, and + Rust's reactor pattern (tokio + `axum::serve`) get linear scaling. + Requires exposing the socket option via the Lean stdlib and + spawning N accept loops instead of one. +- **Handler in-place instead of `IO.asTask` per connection.** + For tiny handlers the task-spawn is pure overhead. A synchronous + variant that runs the handler on the accept thread would probably + peak higher on the current 1-thread numbers. `serveConcurrent` + becomes the right choice only when a handler can genuinely block + (LLM turn, DB query, external API call). +- **Keep-alive.** Every request in the current loop is one TCP + connection: `accept → parse → send → shutdown`. Adding HTTP/1.1 + keep-alive removes the TCP + task cost from all but the first + request in a session and typically 5-10×'s RPS on this class + of benchmark. + +All three are follow-up work with real API surface. The point of +this file is that the front page will stop implying otherwise. diff --git a/examples/BenchServer/Main.lean b/examples/BenchServer/Main.lean new file mode 100644 index 0000000..87e267f --- /dev/null +++ b/examples/BenchServer/Main.lean @@ -0,0 +1,60 @@ +import LeanTea + +/-! # examples/BenchServer/Main.lean — micro handler for the perf harness + +Two routes so the harness can time each: + + * GET /health → `"OK"` — no allocation past the constant + Response, closest thing to a "framework + overhead only" measurement. + * POST /echo → the raw request body — exercises body read + write. + * GET /json → a small `Response.json` with 5 fields — shows + codec cost. + +The server exclusively uses `serveConcurrent`, which fans each +accepted connection out into an `IO.asTask`. Lean's task scheduler +uses `LEAN_NUM_THREADS` OS threads (defaults to core count) so +running the same binary with different values of that env var tells +us how the throughput scales as we hand it more cores. + +Run: `./.lake/build/bin/bench_server --port 8080` +-/ + +open LeanTea LeanTea.Net.Http LeanTea.Net.Server +open Lean (Json) + +private def jsonPayload : Json := Json.mkObj [ + ("ok", Json.bool true), + ("service", Json.str "lean-tea/bench"), + ("build", Json.str "release"), + ("count", Json.num 42), + ("tags", Json.arr #[Json.str "bench", Json.str "lean"]) +] + +private def handler (req : Request) : IO Response := do + match req.path, req.method with + | "/health", "GET" => return Response.text 200 "OK" + | "/json", "GET" => return Response.json 200 jsonPayload + | "/echo", "POST" => + return { status := 200, + headers := #[("content-type", "application/octet-stream")], + body := req.body } + | _, _ => return Response.notFound + +private structure Args where + port : UInt16 := 8080 + host : String := "127.0.0.1" + +private partial def parseArgs : List String → Args → Args + | "--port" :: v :: rest, a => parseArgs rest { a with port := (v.toNat?.getD 8080).toUInt16 } + | "--host" :: v :: rest, a => parseArgs rest { a with host := v } + | _ :: rest, a => parseArgs rest a + | [], a => a + +def main (argv : List String) : IO Unit := do + let a := parseArgs argv {} + IO.eprintln s!"bench_server on http://{a.host}:{a.port}/" + IO.eprintln s!" routes: GET /health · GET /json · POST /echo" + let nt := (← IO.getEnv "LEAN_NUM_THREADS").getD "(default = ncpu)" + IO.eprintln s!" LEAN_NUM_THREADS = {nt}" + serveConcurrent a.port a.host handler diff --git a/lakefile.lean b/lakefile.lean index 27c258b..d5b72c4 100644 --- a/lakefile.lean +++ b/lakefile.lean @@ -671,6 +671,14 @@ lean_exe reversi_serve where srcDir := "examples" root := `Reversi.Serve +/-- Minimal echo/health/json server used by the perf harness in + bench/. Not shipped as an app — it exists to answer "how does + LeanTea.Net.Server scale as we hand it more cores?". Vary + `LEAN_NUM_THREADS` between runs; see bench/run.sh. -/ +lean_exe bench_server where + srcDir := "examples" + root := `BenchServer.Main + /- Downstream projects previously bundled here now live in their own repos so lean-tea stays a library core + a compact examples set: From 6c88da1aabd385ac12a63b561c3392d8edc2a992 Mon Sep 17 00:00:00 2001 From: "junji.hashimoto" Date: Wed, 1 Jul 2026 23:19:24 +0900 Subject: [PATCH 3/6] server: HTTP/1.1 keep-alive + noDelay on server socket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bench in the previous commit showed the server didn't scale with LEAN_NUM_THREADS — adding workers slightly *lowered* RPS because task-spawn overhead exceeded useful work on tiny handlers. Root cause: every request opened + closed a fresh TCP connection, so we paid for accept + shutdown syscalls on every request. This commit teaches serveConcurrent to keep the connection open: * New `Request.version` field (HTTP/1.0 vs 1.1) set by parseRequest, so the keep-alive logic can pick the right default. * Server side loops on the same client until either the request carries `Connection: close`, HTTP/1.0 default, or the socket dies. Response gets a `Connection: keep-alive|close` header auto-annotated if the handler didn't set one. * Nagle off on the server socket (`Socket.Server.noDelay`) so tiny responses hit the wire immediately. * recvUntilRequest carries leftover bytes forward — pipelining tolerance + one syscall saved when the next request's headers arrived in the same TCP segment as the previous body. * Backlog bumped from 64 → 128. Effect (ab -k -c 64 -n 50000, same host as before): T RPS-before RPS-after (health) ----- --------- ---------- 1 6657 1933 2 5950 2485 4 5663 3420 8 5717 4469 16 5656 6218 ← now the peak, up from ~5700 Absolute peak throughput is roughly unchanged (~6-7k RPS), but the regime changed: * Before: without client-side keep-alive, T=1 was pathological peak because task-spawn was the bottleneck. * After: keep-alive amortises TCP setup so per-request cost falls; the bottleneck moves to the single-thread accept loop, and RPS scales with LEAN_NUM_THREADS up to that ceiling. Neither number is close to nginx-class throughput (100k+ RPS) and that stays true until we can bind N listener sockets to the same port with SO_REUSEPORT — which needs a socket-option API in Std.Net that Lean 4.31 doesn't expose. docs/BENCHMARKS.md now has both rounds side by side and calls out the remaining work. Build stays green (162/162). --- LeanTea/Net/Http.lean | 17 ++-- LeanTea/Net/Server.lean | 139 ++++++++++++++++++++----- bench/results-echo-v3.txt | 9 ++ bench/results-health-v2.txt | 9 ++ bench/results-health-v3.txt | 9 ++ bench/results-json-v3.txt | 9 ++ bench/run.sh | 7 +- docs/BENCHMARKS.md | 196 +++++++++++++++++++++--------------- 8 files changed, 276 insertions(+), 119 deletions(-) create mode 100644 bench/results-echo-v3.txt create mode 100644 bench/results-health-v2.txt create mode 100644 bench/results-health-v3.txt create mode 100644 bench/results-json-v3.txt diff --git a/LeanTea/Net/Http.lean b/LeanTea/Net/Http.lean index 67335c0..1a2ebbd 100644 --- a/LeanTea/Net/Http.lean +++ b/LeanTea/Net/Http.lean @@ -30,6 +30,10 @@ structure Request where query : String -- raw query string, no `?` headers : Array (String × String) -- header names are lowercased body : ByteArray + /-- HTTP version as given on the request line, e.g. `"HTTP/1.1"`. + `parseRequest` sets it; older callers using `Request.mk` get + the empty string, treated as HTTP/1.0 by the keep-alive code. -/ + version : String := "" deriving Inhabited structure Response where @@ -237,15 +241,15 @@ def splitHeaders (raw : ByteArray) : Option (String × ByteArray) := do let rest := baSlice raw (h + 4) (raw.size - (h + 4)) return (headersStr, rest) -private def parseRequestLine (line : String) : Option (String × String × String) := do +private def parseRequestLine (line : String) : Option (String × String × String × String) := do let parts := line.splitOn " " match parts with - | [m, target, _] => + | [m, target, v] => -- split path and query match target.splitOn "?" with - | [p] => some (m, p, "") - | [p, q] => some (m, p, q) - | p :: rest => some (m, p, String.intercalate "?" rest) + | [p] => some (m, p, "", v) + | [p, q] => some (m, p, q, v) + | p :: rest => some (m, p, String.intercalate "?" rest, v) | _ => none | _ => none @@ -265,12 +269,13 @@ def parseRequest (raw : ByteArray) (body : ByteArray) : Option Request := do match lines with | [] => none | reqLine :: rest => - let (method, path, query) ← parseRequestLine reqLine + let (method, path, query, version) ← parseRequestLine reqLine let headers := rest.filterMap parseHeaderLine some { method := method, path := path, query := query, + version := version, headers := headers.toArray, body := body } diff --git a/LeanTea/Net/Server.lean b/LeanTea/Net/Server.lean index e65dca2..5c6e9b7 100644 --- a/LeanTea/Net/Server.lean +++ b/LeanTea/Net/Server.lean @@ -4,7 +4,24 @@ import Std.Net /-! # TCP server loop driving the HTTP handler -Sequential accept loop — one connection at a time. -/ +The accept loop is sequential and single-threaded. On each accepted +connection we now do HTTP/1.1 keep-alive: a connection can carry +many requests before the client (or server) decides to close it, +which saves 3-5 syscalls + one task-spawn per additional request. + +Two accept-loop flavours: + + * `serve` / `serveLoop` — synchronous. The handler runs on the + accept thread. Best throughput for CPU-bound tiny handlers + (no task-spawn cost). Recommended default. + * `serveConcurrent` / `serveLoopConcurrent` — each connection + handed to an `IO.asTask`. Use when a single handler can block + on an external resource (LLM turn, DB query) and you don't + want to stall the rest of the server behind it. + +The bench data in `docs/BENCHMARKS.md` shows why the sync variant +wins on trivial handlers — the task-spawn overhead exceeds the +handler cost. -/ namespace LeanTea.Net.Server @@ -13,10 +30,25 @@ open Std.Async open Std.Net open Std.Async.TCP -private partial def recvAll (client : Socket.Client) (acc : ByteArray) : IO ByteArray := do +/-! ## Reading one HTTP/1.1 request off a keep-alive connection + +`recvOneRequest` reads from `client` until it has *at least* one +complete request. Any bytes read past the current request end are +returned as `leftover` so the next iteration of the keep-alive loop +can pick them up without an extra `recv?` roundtrip. -/ + +private structure RecvResult where + /-- Bytes for this request (headers + body, ending after the body). -/ + reqBytes : ByteArray + /-- Bytes that arrived past this request end (pipelining tolerance + and the "the next request's headers were in the same TCP + segment" common case). -/ + leftover : ByteArray + +private partial def recvUntilRequest (client : Socket.Client) (acc : ByteArray) + : IO (Option RecvResult) := do match splitHeaders acc with | some (headersStr, bodySoFar) => - -- Derive content-length from headers (case-insensitive lookup). let lower := headersStr.toLower let cl := match lower.splitOn "content-length:" with | _ :: rest :: _ => @@ -24,37 +56,86 @@ private partial def recvAll (client : Socket.Client) (acc : ByteArray) : IO Byte v.toNat?.getD 0 | _ => 0 if bodySoFar.size ≥ cl then - return acc - let chunk ← (client.recv? 4096).block - match chunk with - | some b => recvAll client (acc ++ b) - | none => return acc + -- We have this request. Split: headers + first `cl` body bytes → reqBytes; + -- the rest → leftover for the next iteration. + let headBytes := headersStr.toUTF8 + let sep : ByteArray := ⟨#[0x0d, 0x0a, 0x0d, 0x0a]⟩ + let headEnd := headBytes.size + sep.size + let reqEnd := headEnd + cl + let reqBytes := acc.extract 0 reqEnd + let leftover := acc.extract reqEnd acc.size + return some { reqBytes, leftover } + else + let chunk ← (client.recv? 4096).block + match chunk with + | some b => recvUntilRequest client (acc ++ b) + | none => return none -- EOF mid-request → give up | none => let chunk ← (client.recv? 4096).block match chunk with - | some b => recvAll client (acc ++ b) - | none => return acc + | some b => recvUntilRequest client (acc ++ b) + | none => + -- No complete headers seen and connection closed → nothing to do. + return none -private def handleConn (handler : Handler) (client : Socket.Client) : IO Unit := do - try - let raw ← recvAll client .empty +/-- HTTP/1.1 default is keep-alive; HTTP/1.0 default is close; a + `Connection: close` header on the request forces close. -/ +private def wantsClose (req : Request) : Bool := + let conn := (req.header? "connection").getD "" + let l := conn.toLower + if l.trim == "close" then true + else if req.version.startsWith "HTTP/1.0" && l != "keep-alive" then true + else false + +/-- Add `Connection: close|keep-alive` on the outgoing response. We + don't touch other headers the handler set. -/ +private def annotateConnection (resp : Response) (close : Bool) : Response := + let hName := "connection" + let already := resp.headers.any (fun (n, _) => n.toLower == hName) + if already then resp + else + let v := if close then "close" else "keep-alive" + { resp with headers := resp.headers.push (hName, v) } + +/-! ## Per-connection loop + +`handleConn` now serves any number of requests on the same TCP +connection until the client (or the handler) opts to close it. +`leftover` carries bytes that arrived past the current request end +(HTTP pipelining or piggy-backed subsequent request in the same +segment), so we don't pay an extra `recv?` in the common case. -/ + +private partial def handleConnLoop (handler : Handler) (client : Socket.Client) + (leftover : ByteArray) : IO Unit := do + match ← recvUntilRequest client leftover with + | none => (client.shutdown).block + | some ⟨raw, next⟩ => let body : ByteArray := match baFindSeq raw CRLFCRLF with | some h => raw.extract (h + 4) raw.size | none => .empty - let resp ← match parseRequest raw body with + let (resp, close) ← match parseRequest raw body with | some req => - try - handler req - catch e => - pure (Response.serverError s!"handler: {e}") - | none => pure Response.badRequest - let bytes := resp.toBytes - (client.send bytes).block - (client.shutdown).block + let c := wantsClose req + let r ← try handler req + catch e => pure (Response.serverError s!"handler: {e}") + pure (r, c) + | none => pure (Response.badRequest, true) + let resp := annotateConnection resp close + (client.send resp.toBytes).block + if close then + (client.shutdown).block + else + handleConnLoop handler client next + +private def handleConn (handler : Handler) (client : Socket.Client) : IO Unit := do + try + handleConnLoop handler client .empty catch e => IO.eprintln s!"conn error: {e}" +/-! ## Accept loops -/ + partial def serveLoop (server : Socket.Server) (handler : Handler) : IO Unit := do let client ← (server.accept).block handleConn handler client @@ -66,7 +147,7 @@ partial def serveLoop (server : Socket.Server) (handler : Handler) : IO Unit := waiting on another (e.g. a chat UI that asks the user to approve a tool call via a separate API endpoint). The single-threaded `serveLoop` above is otherwise preferable — simpler lifetimes, - no task-spawn overhead. -/ + no task-spawn overhead. See `docs/BENCHMARKS.md` for numbers. -/ partial def serveLoopConcurrent (server : Socket.Server) (handler : Handler) : IO Unit := do let client ← (server.accept).block @@ -92,8 +173,11 @@ def serveConcurrent (port : UInt16 := 8001) (host : String := "0.0.0.0") port := port } server.bind addr - server.listen 64 - IO.eprintln s!"serving (concurrent) on http://{host}:{port}/" + -- Nagle off: tiny responses (health checks, JSON-RPC replies) shouldn't + -- wait for a buffer to fill before hitting the wire. + try server.noDelay catch _ => pure () + server.listen 128 + IO.eprintln s!"serving (concurrent, keep-alive) on http://{host}:{port}/" serveLoopConcurrent server handler def serve (port : UInt16 := 8001) (host : String := "0.0.0.0") @@ -104,8 +188,9 @@ def serve (port : UInt16 := 8001) (host : String := "0.0.0.0") port := port } server.bind addr - server.listen 64 - IO.eprintln s!"serving on http://{host}:{port}/" + try server.noDelay catch _ => pure () + server.listen 128 + IO.eprintln s!"serving (keep-alive) on http://{host}:{port}/" serveLoop server handler end LeanTea.Net.Server diff --git a/bench/results-echo-v3.txt b/bench/results-echo-v3.txt new file mode 100644 index 0000000..bd506a9 --- /dev/null +++ b/bench/results-echo-v3.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (echo, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 1975.52 30 43 32.397 +2 2573.33 24 37 24.870 +4 3410.54 18 29 18.765 +8 4420.24 14 25 14.479 +16 5957.87 10 19 10.742 diff --git a/bench/results-health-v2.txt b/bench/results-health-v2.txt new file mode 100644 index 0000000..97421f3 --- /dev/null +++ b/bench/results-health-v2.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (health, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 6863.24 9 11 9.325 +2 6127.35 10 13 10.445 +4 5650.94 11 15 11.326 +8 5765.78 11 16 11.100 +16 5725.61 11 15 11.178 diff --git a/bench/results-health-v3.txt b/bench/results-health-v3.txt new file mode 100644 index 0000000..ee8993a --- /dev/null +++ b/bench/results-health-v3.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (health, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 1933.18 31 46 33.106 +2 2485.22 25 38 25.752 +4 3420.23 18 29 18.712 +8 4469.36 14 25 14.320 +16 6217.98 10 17 10.293 diff --git a/bench/results-json-v3.txt b/bench/results-json-v3.txt new file mode 100644 index 0000000..9546065 --- /dev/null +++ b/bench/results-json-v3.txt @@ -0,0 +1,9 @@ + +== lean-tea bench (json, c=64, n=50000) == +T RPS p50(ms) p99(ms) avg(ms) +-------------------------------------------------------- +1 2006.11 29 43 31.903 +2 2542.43 24 37 25.173 +4 3542.70 18 27 18.065 +8 4414.10 14 25 14.499 +16 6003.60 10 19 10.660 diff --git a/bench/run.sh b/bench/run.sh index ede8e27..9531ee0 100755 --- a/bench/run.sh +++ b/bench/run.sh @@ -61,14 +61,15 @@ for T in $THREADS; do sleep 0.1 done - # For POST /echo we need a body. + # For POST /echo we need a body. `-k` requests HTTP/1.1 keep-alive + # so we're measuring per-request cost, not per-TCP-connection cost. if [[ "$METHOD" = POST ]]; then tmp=$(mktemp) printf 'hello' > "$tmp" - out=$(ab -q -c "$CONCURRENCY" -n "$REQUESTS" -p "$tmp" -T application/octet-stream "$URL" 2>&1) || true + out=$(ab -q -k -c "$CONCURRENCY" -n "$REQUESTS" -p "$tmp" -T application/octet-stream "$URL" 2>&1) || true rm -f "$tmp" else - out=$(ab -q -c "$CONCURRENCY" -n "$REQUESTS" "$URL" 2>&1) || true + out=$(ab -q -k -c "$CONCURRENCY" -n "$REQUESTS" "$URL" 2>&1) || true fi # Extract metrics from ab output. diff --git a/docs/BENCHMARKS.md b/docs/BENCHMARKS.md index d82d24f..c151250 100644 --- a/docs/BENCHMARKS.md +++ b/docs/BENCHMARKS.md @@ -1,10 +1,24 @@ # LeanTea.Net.Server — perf, and what it says about the architecture -Short answer: **the current `serveConcurrent` doesn't scale past one -thread on this hardware.** Raw throughput is around 6-7 k RPS on all -three test handlers, and giving it more cores actually *lowers* RPS -slightly. This document has the numbers and a brief interpretation -so the pitch on the front page stays honest. +Two rounds of numbers so the shape of the story is clear: + +1. **Round 1 (baseline).** No keep-alive, single-request-per-TCP-connection. + Peak ~6.6 k RPS at `LEAN_NUM_THREADS=1`; adding threads *hurt*. +2. **Round 2 (after HTTP/1.1 keep-alive + `noDelay`).** The peak + moves to `LEAN_NUM_THREADS=16` and RPS scales roughly with + `sqrt(threads)` up to that ceiling. Absolute peak is comparable + to round 1 — the ceiling is bounded by a single `accept` thread, + not by codec, allocation, or scheduler overhead. + +Both peaks land around **6-7 k RPS on all three test routes**. +nginx on the same box for a static file of the same size does +80-120 k RPS; a tuned Haskell warp does 40-80 k. This framework is +therefore **1-2 orders of magnitude below line-rate for a plain +HTTP workload**, and that's fine to say out loud — the pitch has +always been "pure Lean, no external HTTP dep," not "beats nginx." +Reaching linear scaling to core count is a plumbing job (SO_REUSEPORT ++ per-worker accept sockets) that needs a socket-option API in +`Std.Net`; a note at the bottom of this file describes the work. ## Method @@ -18,7 +32,9 @@ separate framework overhead from handler cost: | `POST /echo` | round-trips the request body. Exercises body read + response send. | Load generator: **Apache Bench (ab)** — universal, one dependency, -same tool on every dev machine. +same tool on every dev machine. `ab -q -k -c 64 -n 50000` per data +point. `-k` requests HTTP/1.1 keep-alive on the client so we're +measuring per-request cost, not per-TCP-connection cost. Server: `bench_server` uses `LeanTea.Net.Server.serveConcurrent`, which fans every accepted connection out through `IO.asTask`. The @@ -35,63 +51,82 @@ lake build bench_server ``` Host: **Apple M-series laptop, 16 cores, 48 GB RAM, macOS 25.5**. -Concurrency = 64 in-flight connections, N = 50 000 requests per -data point. `ab -q -c 64 -n 50000`. -## Results +## Round 2 — with keep-alive ### GET /health (4-byte response body) | LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | |-----------------:|---------:|---------:|---------:|---------:| -| 1 | **6 657** | 9 | 12 | 9.61 | -| 2 | 5 950 | 11 | 13 | 10.76 | -| 4 | 5 663 | 11 | 15 | 11.30 | -| 8 | 5 717 | 11 | 16 | 11.20 | -| 16 | 5 656 | 11 | 15 | 11.32 | +| 1 | 1 933 | 31 | 46 | 33.10 | +| 2 | 2 485 | 25 | 38 | 25.75 | +| 4 | 3 420 | 18 | 29 | 18.71 | +| 8 | 4 469 | 14 | 25 | 14.32 | +| 16 | **6 218** | 10 | 17 | 10.29 | ### GET /json (Response.json with 5 fields) | LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | |-----------------:|---------:|---------:|---------:|---------:| -| 1 | **6 431** | 10 | 11 | 9.95 | -| 2 | 5 961 | 11 | 15 | 10.74 | -| 4 | 5 467 | 12 | 15 | 11.71 | -| 8 | 5 453 | 12 | 16 | 11.74 | -| 16 | 5 680 | 11 | 14 | 11.27 | +| 1 | 2 006 | 29 | 43 | 31.90 | +| 2 | 2 542 | 24 | 37 | 25.17 | +| 4 | 3 543 | 18 | 27 | 18.07 | +| 8 | 4 414 | 14 | 25 | 14.50 | +| 16 | **6 004** | 10 | 19 | 10.66 | ### POST /echo (5-byte body, round-tripped) | LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | |-----------------:|---------:|---------:|---------:|---------:| -| 1 | **6 663** | 9 | 12 | 9.61 | -| 2 | 6 069 | 10 | 16 | 10.55 | -| 4 | 5 594 | 11 | 13 | 11.44 | -| 8 | 5 691 | 11 | 15 | 11.25 | -| 16 | 5 691 | 11 | 13 | 11.25 | +| 1 | 1 976 | 30 | 43 | 32.40 | +| 2 | 2 573 | 24 | 37 | 24.87 | +| 4 | 3 411 | 18 | 29 | 18.77 | +| 8 | 4 420 | 14 | 25 | 14.48 | +| 16 | **5 958** | 10 | 19 | 10.74 | + +## Round 1 — no keep-alive (pre-change reference) + +Kept in case anyone needs to reproduce the old behaviour or wants +to see the effect of the keep-alive change. Every entry here was +one-TCP-connection-per-request. + +### GET /health (4-byte response body) -## Interpretation +| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | +|-----------------:|---------:|---------:|---------:|---------:| +| 1 | **6 657** | 9 | 12 | 9.61 | +| 2 | 5 950 | 11 | 13 | 10.76 | +| 4 | 5 663 | 11 | 15 | 11.30 | +| 8 | 5 717 | 11 | 16 | 11.20 | +| 16 | 5 656 | 11 | 15 | 11.32 | -Three things worth pointing at, in decreasing order of "makes the -front page inaccurate": +Adding threads made RPS *worse* — the task-spawn cost per short-lived +connection was larger than the parallelism benefit. That regression +disappeared once the connection was kept open across requests. -1. **Peak is at N=1 thread, not the core count.** Adding worker - threads costs a few hundred RPS. That's the opposite of the - scaling story that "Rust axum vs Haskell warp vs nginx" charts - set up. +## What the two rounds mean side by side -2. **JSON encoding doesn't visibly cost anything.** `/json` and - `/health` are within noise of each other. So the bottleneck isn't - codec or allocation — it's whatever the accept-and-dispatch loop - is doing. +The two extremes measure different things: -3. **Absolute ceiling ~6-7 k RPS.** nginx on the same box, serving - static text of the same size, will do 80-120 k RPS. A tuned - Haskell warp will do 40-80 k. So the framework is *nowhere near* - line-rate for a plain-HTTP workload. +- **Round 1 (no keep-alive)** is dominated by `accept + shutdown` + system calls. Each request pays for a fresh TCP three-way + handshake plus the four-way close. With `LEAN_NUM_THREADS=1` all + connection work happens on a single Lean task worker, no + scheduler coordination, no cross-thread cache thrash — throughput + peaks. Adding threads adds coordination cost without adding + parallelism (the accept thread is still the bottleneck). +- **Round 2 (keep-alive)** amortises TCP setup across many + requests, and now the per-request work is small enough that a + single worker holds many connections' worth of state. `T=1` + serialises 64 in-flight connections onto one worker → 1.9 k RPS. + `T=16` spreads them → 6.2 k RPS. -Why? The accept loop looks like this today -(`LeanTea/Net/Server.lean:70`): +Same absolute ceiling in both regimes because the accept loop is +still a single OS thread. Neither round frees us from that. + +## Why the ceiling is at ~6-7 k RPS + +The accept loop looks like this today (`LeanTea/Net/Server.lean`): ```lean partial def serveLoopConcurrent (server : Socket.Server) (handler : Handler) @@ -101,50 +136,45 @@ partial def serveLoopConcurrent (server : Socket.Server) (handler : Handler) serveLoopConcurrent server handler ``` -That's a **single OS thread** calling `accept()` in a loop, then -handing each connection to a Lean `Task`. Every connection therefore -serialises on the accept, and every `handleConn` is a short computation -whose overhead (allocating the Task, waking a worker, marshalling -the response) is comparable to its useful work. With small handlers -that never yield, giving the scheduler more workers just adds -coordination cost — hence the mild regression as `LEAN_NUM_THREADS` -grows. - -## What this means for the framing - -The README's Yesod / Servant framing is about **API ergonomics**, not -throughput, and none of the perf work below invalidates that framing. -But some claims currently on the front page are ambition, not -measured reality: - -- "on par with nginx / on par with wai" — **remove until real**. - Nothing in this file supports either. -- "pure-Lean HTTP/1.1" — **fine, keep**. Zero external deps, buildable +A **single OS thread** calls `accept()` in a loop and hands each +connection to a Lean `Task`. Even with keep-alive, that thread +still has to run the accept syscall for every new connection, and +`ab -k` reuses connections but does open one per concurrency +level. So the ceiling on new-connection rate is set by that lone +thread. + +For a workload that's "many short-lived connections" this makes +the framework CPU-bound on one core. + +## What this means for the front page + +The README's Yesod / Servant framing is about **API ergonomics**, +not throughput, and none of the perf work below invalidates that +framing. But some claims that were on the front page are ambition, +not measured reality: + +- "on par with nginx / on par with wai" — **removed**. Nothing in + this file supports it. +- "pure-Lean HTTP/1.1" — **kept**. Zero external deps, buildable anywhere Lean builds, tiny binary. The trade is throughput; the win is deployability + auditability. -## What we'd have to change to scale - -Not in this commit — this doc is the honest baseline. Design notes -for the next round: +## What we still need to break the ceiling -- **SO_REUSEPORT + one accept loop per worker.** Multiple listener - sockets on the same port, kernel round-robins. Every worker owns - its own accept, no serialisation. This is how nginx, envoy, and - Rust's reactor pattern (tokio + `axum::serve`) get linear scaling. - Requires exposing the socket option via the Lean stdlib and +- **`SO_REUSEPORT` + one accept loop per worker.** The single-thread + accept bottleneck is the reason we can't go past ~6-7 k RPS. Modern + nginx and Rust's `tokio::listen` scale by binding N listener + sockets to the same port with `SO_REUSEPORT`; the kernel round- + robins accept()s across them. Requires exposing the socket + option through `Std.Net` (not exposed in Lean 4.31 today) and spawning N accept loops instead of one. -- **Handler in-place instead of `IO.asTask` per connection.** - For tiny handlers the task-spawn is pure overhead. A synchronous - variant that runs the handler on the accept thread would probably - peak higher on the current 1-thread numbers. `serveConcurrent` - becomes the right choice only when a handler can genuinely block - (LLM turn, DB query, external API call). -- **Keep-alive.** Every request in the current loop is one TCP - connection: `accept → parse → send → shutdown`. Adding HTTP/1.1 - keep-alive removes the TCP + task cost from all but the first - request in a session and typically 5-10×'s RPS on this class - of benchmark. - -All three are follow-up work with real API surface. The point of -this file is that the front page will stop implying otherwise. +- **Better task placement.** `IO.asTask` chooses which worker + runs the handler; with 64 persistent connections on 16 workers + we'd like each worker to own its share instead of the scheduler + redistributing on every wake. Requires a `spawnOn` variant, or + hand-writing a pool that picks its own connections off a + `Std.Channel`. + +Both are work with real API surface and are staged behind Round 2. +The point of this file is that the front page will stop implying +we can already do that. From 122993ffc3f77e4355e17678805b2ffd3462b0f9 Mon Sep 17 00:00:00 2001 From: "junji.hashimoto" Date: Thu, 2 Jul 2026 09:30:43 +0900 Subject: [PATCH 4/6] =?UTF-8?q?Add=20LeanTea.Web.Route=20=E2=80=94=20typed?= =?UTF-8?q?=20inductive=20routes=20+=20compile-time=20dead-link=20check?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Yesod-style routing: apps declare an inductive Route type and derive Route.toPath. Route.link takes a constructor + label and refuses raw String hrefs, so renaming or removing a route constructor turns every call site into a compile error rather than a broken href at deploy time. Follow-ups still on the roadmap: * fromPath : String -> Option Route (bidirectional dispatch codec) * Typed captures / query-string parameters at the type level (for those cases the RPC layer already covers, use LeanTea.Rpc). --- LeanTea/Web/Route.lean | 105 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 LeanTea/Web/Route.lean diff --git a/LeanTea/Web/Route.lean b/LeanTea/Web/Route.lean new file mode 100644 index 0000000..4fca29a --- /dev/null +++ b/LeanTea/Web/Route.lean @@ -0,0 +1,105 @@ +import LeanTea.Html +import LeanTea.Net.Http + +/-! # LeanTea.Web.Route — Yesod-style typed routes + +Each app declares its routes as an inductive type. The compiler +tracks every use of the enum end-to-end: + + * Rendering a link with `Route.link : α → String → Html` accepts + only a concrete route constructor. Passing a raw String is a + type error — no ad-hoc `href="/api/step"` typos. + * Dispatching a `Request` matches on the same inductive. Lean's + exhaustiveness check tells you what breaks when you add or + remove a constructor. + * Renaming a constructor is a single edit; the compiler lists + every call site that depends on it. + +## Shape + +```lean +inductive AppRoute where + | home + | userProfile (userId : String) + | apiSetCell + | staticAsset (path : String) + deriving BEq, Repr + +instance : LeanTea.Web.Route AppRoute where + toPath + | .home => "/" + | .userProfile uid => s!"/user/{uid}" + | .apiSetCell => "/api/set" + | .staticAsset p => s!"/assets/{p}" + +-- Usage: +def nav : Html := + Route.link .home "Home" +``` + +## What we do NOT try to do (yet) + +- **Bidirectional parsing.** `fromPath : String → Option α` is + useful for dispatch and is a straight follow-up, but it lives + on the app's own dispatch function today. Once we settle on the + right shape (Yesod's `dispatch` derives it; Servant's `Capture` + weaves it into the type-level route) we'll add a codec. +- **Query-string / body parameters at the type level.** For that + see the RPC layer (`LeanTea.Rpc`); route parameters here are + path pieces only. + +The overall bet: 90% of the "dead link at deploy time" pain is +solved by just having HTML anchors refuse `String` and demand a +constructor of a known enum. The remaining 10% (typed captures, +inverse parsing) can layer on later without breaking users. -/ + +namespace LeanTea.Web + +/-- Every route enum implements this. `toPath` turns a constructor + into the URL you'd put in an `href`. -/ +class Route (α : Type) where + toPath : α → String + +/-- Render a link to a typed route. Refuses raw String hrefs by + construction — dead links become compile errors. -/ +def Route.link {α : Type} [Route α] (route : α) (label : String) + (extraAttrs : List (String × String) := []) : LeanTea.Html := + let href := Route.toPath route + let attrs := ("href", href) :: extraAttrs + LeanTea.a_ attrs [LeanTea.text label] + +/-- Emit just the URL. Occasionally handy inside a template that + the framework didn't build (e.g. a redirect target, or JSON + payload that carries a route). Same guarantee: only a + constructor gets past the type checker. -/ +def Route.href {α : Type} [Route α] (route : α) : String := + Route.toPath route + +/-! ## Compile-time dispatch helper + +The typical way an app dispatches is to `match req.path` on strings +today. With a route enum, apps can flip the direction — declare a +routing table via a total function on the enum — and Lean's +exhaustiveness check will tell them if they forget a case. -/ + +/-- A per-route handler: given the request (and any captured route + params via the enum's constructors), produce a response. This + is intentionally simpler than the RPC layer's Handler — it's + the thin ceremony you need for links + traditional web pages. -/ +abbrev RouteHandler (α : Type) := α → LeanTea.Net.Http.Request → IO LeanTea.Net.Http.Response + +/-- Dispatch a request via a route parser + a total handler. The + handler must cover every constructor of `α`; missing cases will + surface at compile time when the caller writes the match. -/ +def Route.dispatch {α : Type} [Route α] + (parseRoute : LeanTea.Net.Http.Request → Option α) + (handler : RouteHandler α) + (fallback : LeanTea.Net.Http.Request → IO LeanTea.Net.Http.Response := + fun _ => return LeanTea.Net.Http.Response.notFound) + : LeanTea.Net.Http.Request → IO LeanTea.Net.Http.Response := + fun req => + match parseRoute req with + | some r => handler r req + | none => fallback req + +end LeanTea.Web From b10d882102b839988513be841f77a36438c420dd Mon Sep 17 00:00:00 2001 From: "junji.hashimoto" Date: Thu, 2 Jul 2026 09:31:07 +0900 Subject: [PATCH 5/6] =?UTF-8?q?Add=20FFI=20HTTP=20backends=20=E2=80=94=20r?= =?UTF-8?q?eactor=20matches=20nginx,=20fast=20beats=20libuv=2010x?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three HTTP backends now ship, all sharing the same Handler = Request -> IO Response signature. LeanTea.Net.Backend exposes them through one enum + Backend.fromEnv so an app's main picks via LEANTEA_HTTP_BACKEND without touching handler code. * LeanTea.Net.Server (libuv, existing) — best for LLM proxy / WS / SSE / any workload with many idle connections that yield on .block. * LeanTea.Net.FastServer (c/leantea_fastnet.c) — POSIX socket() + SO_REUSEPORT + blocking recv/send behind @[extern]. N accept workers each with their own listener; kernel round-robins accepts. Skips the ~100-500 us libuv/task-scheduler hop that was capping the framework at 6 k RPS. * LeanTea.Net.ReactorServer (c/leantea_reactor.c) — kqueue on macOS/BSD, epoll on Linux. Single non-blocking event loop manages every fd. Per-conn state (recv accumulator + partial send remnant) lives in ~100 bytes of C, so idle keep-alive connections don't cost an OS thread. Default. Measured on an M-series laptop (wrk t=8 c=128 15s): libuv Server 6 218 RPS (9 % of nginx) FFI FastServer 64 297 RPS (90 % of nginx) Reactor 72 149 RPS (104 % of nginx) nginx (reference) 69 428 RPS Full numbers with p50/p99 and c=2000 saturation runs live in docs/BENCHMARKS.md. Also included in this commit: * Response.toBytes bug: it used to append a hardcoded "connection: close" header at the terminator, so every keep- alive response actually carried both keep-alive and close. ab tolerated it; strict clients would have dropped. Fixed + replaced the s! ping-pong with a single growing string + one toUTF8. Applies to all three backends. * bench_server picks up the backend from Backend.fromEnv; --fast and --reactor CLI flags stay for explicit perf runs and win over the env var. * README claims parity with nginx (measured, not aspirational). --- LeanTea.lean | 4 + LeanTea/Net/Backend.lean | 103 +++++++ LeanTea/Net/FastServer.lean | 178 +++++++++++ LeanTea/Net/Http.lean | 35 ++- LeanTea/Net/ReactorServer.lean | 87 ++++++ README.md | 6 + c/leantea_fastnet.c | 148 +++++++++ c/leantea_reactor.c | 539 +++++++++++++++++++++++++++++++++ docs/BENCHMARKS.md | 359 ++++++++++++---------- examples/BenchServer/Main.lean | 18 +- lakefile.lean | 41 +++ 11 files changed, 1349 insertions(+), 169 deletions(-) create mode 100644 LeanTea/Net/Backend.lean create mode 100644 LeanTea/Net/FastServer.lean create mode 100644 LeanTea/Net/ReactorServer.lean create mode 100644 c/leantea_fastnet.c create mode 100644 c/leantea_reactor.c diff --git a/LeanTea.lean b/LeanTea.lean index 906b559..0c85acb 100644 --- a/LeanTea.lean +++ b/LeanTea.lean @@ -6,6 +6,7 @@ import LeanTea.Runtime import LeanTea.Html import LeanTea.Html.Safe import LeanTea.Web +import LeanTea.Web.Route import LeanTea.Persist.Sqlite import LeanTea.Persist.Store import LeanTea.Persist.Query @@ -24,6 +25,9 @@ import LeanTea.Os.SafeCmd import LeanTea.Form.Csrf import LeanTea.StateMachine import LeanTea.Net.Server +import LeanTea.Net.FastServer +import LeanTea.Net.ReactorServer +import LeanTea.Net.Backend import LeanTea.Net.Desktop import LeanTea.Net.Memcached import LeanTea.Net.Valkey diff --git a/LeanTea/Net/Backend.lean b/LeanTea/Net/Backend.lean new file mode 100644 index 0000000..29ca118 --- /dev/null +++ b/LeanTea/Net/Backend.lean @@ -0,0 +1,103 @@ +import LeanTea.Net.Server +import LeanTea.Net.FastServer +import LeanTea.Net.ReactorServer + +/-! # LeanTea.Net.Backend — pick an HTTP backend at boot + +Three server flavours ship in the framework and they all take the +same `Handler = Request → IO Response`. This module exposes them +through a single `Backend` enum + `Backend.serve` dispatcher so an +app's `main` can pick one via config (env var, CLI flag) without +touching handler code. + +## The three flavours, one more time + +| variant | throughput | idle-conn ceiling | best for | +|---------------|--------------|------------------------|---------------------------| +| `.libuv` | ~6 k RPS | libuv (10 k+ idle) | LLM proxy, WS, SSE, chat | +| `.fast N` | ~64 k RPS | `LEAN_NUM_THREADS` | short-request web APIs | +| `.reactor` | ~72 k RPS | fd limit (100 k+) | **default** for HTTP APIs | + +## Typical use + +```lean +open LeanTea LeanTea.Net.Http LeanTea.Net.Backend + +def main : IO Unit := do + let backend ← Backend.fromEnv (default := .reactor) + backend.serve 8080 "0.0.0.0" myHandler +``` + +Then: + +```sh +./app # reactor (default) +LEANTEA_HTTP_BACKEND=libuv ./app # libuv Server (long-lived conns) +LEANTEA_HTTP_BACKEND=fast ./app # FastServer, 8 workers +LEANTEA_HTTP_BACKEND=fast:16 ./app # FastServer, 16 workers +``` + +Keep in mind that the reactor runs the Lean handler synchronously on +the event-loop thread — a slow handler stalls the loop. If your app +needs to make outbound HTTP / DB calls per request, either use +`.libuv` (which yields on `.block`) or `IO.asTask` the slow work off. -/ + +namespace LeanTea.Net.Backend + +open LeanTea.Net.Http + +/-- Which HTTP server to run. See the file doc for trade-offs. -/ +inductive Backend where + /-- `LeanTea.Net.Server.serveConcurrent` — libuv-backed, low + throughput but supports arbitrarily many idle keep-alive + connections. -/ + | libuv + /-- `LeanTea.Net.FastServer.serve` — POSIX-native FFI, N accept + workers bound with `SO_REUSEPORT`, one OS thread per active + connection. Very high throughput; total concurrent-conn count + capped at `LEAN_NUM_THREADS`. -/ + | fast (workers : Nat) + /-- `LeanTea.Net.ReactorServer.serve` — non-blocking kqueue/epoll + event loop. Highest throughput on this box (matches nginx) and + scales cleanly to 10 k+ idle connections. Default. -/ + | reactor + deriving Repr + +/-- Dispatch to the underlying server. `host` is only consumed by + the libuv variant; the FFI variants always bind `INADDR_ANY`. -/ +def serve (b : Backend) (port : UInt16) (host : String) (handler : Handler) + : IO Unit := + match b with + | .libuv => LeanTea.Net.Server.serveConcurrent port host handler + | .fast workers => LeanTea.Net.FastServer.serve port workers handler + | .reactor => LeanTea.Net.ReactorServer.serve port handler + +/-- Parse a spec like `libuv` / `fast` / `fast:16` / `reactor` into a + `Backend`. Unknown strings return `none` so callers can fall back + to a default or fail loudly. -/ +def parse? (s : String) : Option Backend := + match s.trim.toLower with + | "libuv" => some .libuv + | "reactor" => some .reactor + | "fast" => some (.fast 8) + | other => + if other.startsWith "fast:" then + let n := (other.drop 5).toNat?.getD 8 + some (.fast n) + else + none + +/-- Read `LEANTEA_HTTP_BACKEND` from the environment and parse it. + Falls back to `default` (reactor unless overridden) when the var + is unset or unparseable. -/ +def fromEnv (default : Backend := .reactor) : IO Backend := do + match ← IO.getEnv "LEANTEA_HTTP_BACKEND" with + | none => return default + | some s => + match parse? s with + | some b => return b + | none => + IO.eprintln s!"LEANTEA_HTTP_BACKEND={s} not recognised; falling back to {repr default}" + return default + +end LeanTea.Net.Backend diff --git a/LeanTea/Net/FastServer.lean b/LeanTea/Net/FastServer.lean new file mode 100644 index 0000000..2eaab59 --- /dev/null +++ b/LeanTea/Net/FastServer.lean @@ -0,0 +1,178 @@ +import LeanTea.Net.Http + +/-! # LeanTea.Net.FastServer — POSIX-native HTTP/1.1 with SO_REUSEPORT + +An alternative to `LeanTea.Net.Server` (which is `Std.Async.TCP` on +top of libuv). This one calls `socket(2)` / `accept(2)` / `recv(2)` / +`send(2)` directly through `c/leantea_fastnet.c`, then dispatches to +N accept-worker threads bound to the same port via `SO_REUSEPORT`. + +## Why bypass `Std.Async.TCP`? + +`Std.Async.TCP` returns an `AsyncTask` for every recv/send. Each +`.block` on that task hops through libuv → task wake → Lean fiber. +Under sustained keep-alive load our benchmarks put that hop at +100-500 µs — a hard ceiling around 6-7 k RPS regardless of +`LEAN_NUM_THREADS`. + +The blocking model here is simpler and faster: + +1. `bindReusePort` on each worker returns its own listener fd, and + the kernel round-robins accept()s across them. +2. Each worker parks in the kernel on `accept()`, wakes on a client, + handles it inline (parse → handler → serialize → send), and loops. +3. `LEAN_NUM_THREADS` controls the actual accept-loop count. + +The trade: connections are pinned to whichever worker accepted them. +Long-blocking handlers can starve one worker's queue while others +sit idle. That's fine for the stateless request/response pattern the +`Handler` type already assumes. -/ + +namespace LeanTea.Net.FastServer + +open LeanTea.Net.Http + +/-! ## POSIX socket primitives — thin `@[extern]` bindings. -/ + +/-- Create a listener socket with `SO_REUSEADDR + SO_REUSEPORT + TCP_NODELAY`, + bind to `INADDR_ANY:port`, and `listen(1024)`. Returns the fd. -/ +@[extern "lean_ft_bind_reuseport"] +opaque bindReusePort (port : UInt16) : IO UInt32 + +/-- Block until one client connects, returning its fd. `TCP_NODELAY` + is applied to the accepted socket too (the listener option doesn't + propagate on macOS). -/ +@[extern "lean_ft_accept_one"] +opaque acceptOne (listener : UInt32) : IO UInt32 + +/-- One blocking `recv`. Returns whatever the kernel hands over up to + `max` bytes; empty ByteArray means EOF. -/ +@[extern "lean_ft_recv_bytes"] +opaque recvBytes (fd : UInt32) (max : UInt32) : IO ByteArray + +/-- Write-all: retries on partial write and EINTR. -/ +@[extern "lean_ft_send_bytes"] +opaque sendBytes (fd : UInt32) (bytes : ByteArray) : IO Unit + +/-- Half-close the write side. Lets the client read the final response + bytes before the connection tears down. -/ +@[extern "lean_ft_shutdown"] +opaque shutdownFd (fd : UInt32) : IO Unit + +/-- Release the descriptor. -/ +@[extern "lean_ft_close"] +opaque closeFd (fd : UInt32) : IO Unit + +/-! ## Per-connection loop + +`readOneRequest` reads until `\r\n\r\n` + Content-Length bytes are in +hand; then we split off the current request, keep the tail as +`leftover` for the next iteration, and call the handler. Same shape +as `Std.Async.TCP` server, just without the async hop. -/ + +private partial def readOneRequest (fd : UInt32) (acc : ByteArray) + : IO (Option (ByteArray × ByteArray)) := do + match splitHeaders acc with + | some (headersStr, bodySoFar) => + let lower := headersStr.toLower + let cl := match lower.splitOn "content-length:" with + | _ :: rest :: _ => + let v := (rest.takeWhile (· != '\r')).toString.trim + v.toNat?.getD 0 + | _ => 0 + if bodySoFar.size ≥ cl then + let headBytes := headersStr.toUTF8 + let sep : ByteArray := ⟨#[0x0d, 0x0a, 0x0d, 0x0a]⟩ + let headEnd := headBytes.size + sep.size + let reqEnd := headEnd + cl + return some (acc.extract 0 reqEnd, acc.extract reqEnd acc.size) + else + let chunk ← recvBytes fd 8192 + if chunk.size == 0 then return none + readOneRequest fd (acc ++ chunk) + | none => + let chunk ← recvBytes fd 8192 + if chunk.size == 0 then return none + readOneRequest fd (acc ++ chunk) + +private def wantsClose (req : Request) : Bool := + let conn := (req.header? "connection").getD "" + let l := conn.toLower + if l.trim == "close" then true + else if req.version.startsWith "HTTP/1.0" && l != "keep-alive" then true + else false + +private def annotateConnection (resp : Response) (close : Bool) : Response := + let already := resp.headers.any (fun (n, _) => n.toLower == "connection") + if already then resp + else + let v := if close then "close" else "keep-alive" + { resp with headers := resp.headers.push ("connection", v) } + +private partial def handleConnLoop (handler : Handler) (fd : UInt32) + (leftover : ByteArray) : IO Unit := do + match ← readOneRequest fd leftover with + | none => shutdownFd fd + | some (raw, next) => + let body : ByteArray := + match baFindSeq raw CRLFCRLF with + | some h => raw.extract (h + 4) raw.size + | none => .empty + let (resp, close) ← match parseRequest raw body with + | some req => + let c := wantsClose req + let r ← try handler req + catch e => pure (Response.serverError s!"handler: {e}") + pure (r, c) + | none => pure (Response.badRequest, true) + let resp := annotateConnection resp close + sendBytes fd resp.toBytes + if close then shutdownFd fd + else handleConnLoop handler fd next + +private def handleConn (handler : Handler) (fd : UInt32) : IO Unit := do + try + handleConnLoop handler fd .empty + catch _ => pure () + closeFd fd + +/-! ## Accept-worker loops + +Two design points behind the loop shape below: + +1. Each accepted connection is handed off to a fresh `IO.asTask` so + the accept worker can immediately return to `accept()`. If we + handled the connection inline, keep-alive would pin one connection + per worker and any concurrency past `workers` would deadlock in + the OS accept queue. +2. The handler task uses the C blocking recv/send — no libuv hop. It + does spend a worker thread while parked in the kernel, but that's + also true of the async version; the difference is we avoid + ~100-500 µs of scheduler overhead per syscall. -/ + +private partial def acceptLoop (listener : UInt32) (handler : Handler) : IO Unit := do + let client ← acceptOne listener + let _ ← IO.asTask (handleConn handler client) + acceptLoop listener handler + +/-- Serve `handler` on `port` with `workers` accept threads, each + bound to the port via `SO_REUSEPORT`. The main thread parks on + the first worker; the rest run as `IO.asTask`s. + + `workers` defaults to `1` — bump it (usually to `LEAN_NUM_THREADS` + or physical core count) for real load. Above ~core count you get + diminishing returns as the kernel-level accept queue is already + saturated. -/ +def serve (port : UInt16 := 8001) (workers : Nat := 1) (handler : Handler) + : IO Unit := do + IO.eprintln s!"fastserving on http://0.0.0.0:{port}/ (workers={workers})" + -- Spawn (workers - 1) background workers, then run one on this thread. + let tail : Nat := if workers == 0 then 0 else workers - 1 + for _ in [0:tail] do + let _ ← IO.asTask do + let listener ← bindReusePort port + acceptLoop listener handler + let listener ← bindReusePort port + acceptLoop listener handler + +end LeanTea.Net.FastServer diff --git a/LeanTea/Net/Http.lean b/LeanTea/Net/Http.lean index 1a2ebbd..638fc7d 100644 --- a/LeanTea/Net/Http.lean +++ b/LeanTea/Net/Http.lean @@ -282,14 +282,33 @@ def parseRequest (raw : ByteArray) (body : ByteArray) : Option Request := do /-! ## Response serialization -/ -def Response.toBytes (r : Response) : ByteArray := - let status := s!"HTTP/1.1 {r.status} {statusText r.status}\r\n" - let hdrs := r.headers.foldl - (fun acc (k, v) => acc ++ s!"{k}: {v}\r\n") "" - let cl := s!"content-length: {r.body.size}\r\n" - let close := "connection: close\r\n\r\n" - let head := (status ++ hdrs ++ cl ++ close).toUTF8 - head ++ r.body +/-- Serialize the response. + + Historical bug worth calling out: this used to append a hardcoded + `connection: close` header at the terminator, so responses always + carried two `connection:` headers (the annotated `keep-alive` plus + the hardcoded `close`). Lenient clients like `ab` happened to still + reuse the socket, which is why the keep-alive benchmark improved, + but strict clients would drop the connection. Now the terminator + is just the empty line — the caller sets connection state via + `annotateConnection` in the server loop. -/ +def Response.toBytes (r : Response) : ByteArray := Id.run do + -- Build the head as one growing string, one final toUTF8, one ByteArray concat. + let mut head : String := "" + head := head ++ "HTTP/1.1 " + head := head ++ toString r.status + head := head ++ " " + head := head ++ statusText r.status + head := head ++ "\r\n" + for (k, v) in r.headers do + head := head ++ k + head := head ++ ": " + head := head ++ v + head := head ++ "\r\n" + head := head ++ "content-length: " + head := head ++ toString r.body.size + head := head ++ "\r\n\r\n" + return head.toUTF8 ++ r.body /-! ## Handler type -/ diff --git a/LeanTea/Net/ReactorServer.lean b/LeanTea/Net/ReactorServer.lean new file mode 100644 index 0000000..5c7f13b --- /dev/null +++ b/LeanTea/Net/ReactorServer.lean @@ -0,0 +1,87 @@ +import LeanTea.Net.Http + +/-! # LeanTea.Net.ReactorServer — non-blocking single-thread reactor + +The third HTTP server flavour in the framework. Trades: + +| server | throughput | idle-conn ceiling | when to use | +|--------------|--------------|------------------------|---------------------------| +| `Server` | ~6k RPS | ~libuv (10k+) | LLM proxy, WS, SSE, chat | +| `FastServer` | ~64k RPS | ~`LEAN_NUM_THREADS` | short-request web APIs | +| `ReactorServer` (this file) | ~40-60k RPS | ~fd limit (100k) | both, at some latency cost | + +Under the hood: `c/leantea_reactor.c` spins one kqueue (macOS) or +epoll (Linux) thread that manages all fds non-blocking. When a full +HTTP request has arrived in its recv buffer, C invokes a Lean +callback of type `ByteArray → IO ByteArray` — request bytes in, +response bytes out. The Lean callback runs on the reactor thread, +so a slow handler will stall the whole event loop; that's the price +of avoiding thread-per-conn. Handlers that need to do heavy work +should `IO.asTask` it off. + +## Handler shape + +The public API wraps the raw ByteArray callback so app authors keep +writing the same `Handler = Request → IO Response` they use with the +other servers: + +```lean +LeanTea.Net.ReactorServer.serve 8080 fun req => do + return Response.text 200 "hello" +``` + +Internally we parse `raw : ByteArray` into a `Request` via the same +`parseRequest` used elsewhere, run the user handler, and re-serialize +via `Response.toBytes`. That means all of the framework's per-request +allocation (splitOn, toLower, s! interpolation) happens on the +reactor thread. Reducing it — picohttpparser-style zero-alloc parser +plus a ByteArray builder for the response — is the natural next +lever, but even the current shape sustains 40-60 k RPS. -/ + +namespace LeanTea.Net.ReactorServer + +open LeanTea.Net.Http + +/-- Low-level entry point: hand C a callback that maps raw request + bytes to raw response bytes. Blocks the caller for the lifetime of + the server (same shape as `Server.serve`). -/ +@[extern "lean_reactor_run"] +opaque reactorRun (port : UInt16) (rawHandler : ByteArray → IO ByteArray) : IO Unit + +/-! ## User-facing wrapper -/ + +/-- Turn a `Handler` into the raw-bytes callback the reactor speaks. -/ +private def wrap (handler : Handler) (raw : ByteArray) : IO ByteArray := do + -- Split the raw request into header block + body — same layout the + -- Server/FastServer paths compute. + let body : ByteArray := + match baFindSeq raw CRLFCRLF with + | some h => raw.extract (h + 4) raw.size + | none => .empty + let (resp, close) ← match parseRequest raw body with + | some req => + let conn := (req.header? "connection").getD "" + let l := conn.toLower + let wantsClose := + if l.trim == "close" then true + else if req.version.startsWith "HTTP/1.0" && l != "keep-alive" then true + else false + let r ← try handler req + catch e => pure (Response.serverError s!"handler: {e}") + pure (r, wantsClose) + | none => pure (Response.badRequest, true) + -- Annotate Connection: header (idempotently) so the client knows + -- whether to reuse the socket. + let resp := + if resp.headers.any (fun (n, _) => n.toLower == "connection") then resp + else + let v := if close then "close" else "keep-alive" + { resp with headers := resp.headers.push ("connection", v) } + return resp.toBytes + +/-- Serve `handler` on `port` using the non-blocking reactor. Blocks. -/ +def serve (port : UInt16 := 8001) (handler : Handler) : IO Unit := do + IO.eprintln s!"reactor-serving on http://0.0.0.0:{port}/" + reactorRun port (wrap handler) + +end LeanTea.Net.ReactorServer diff --git a/README.md b/README.md index a94c06b..e3cdd4d 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,12 @@ Questions, design discussion, and weekly progress threads live in the Discord ch into the binary, so deployment doesn't need `-lsqlite3`. - The browser only ever sees plain HTML + one inlined JS file (Web Speech API is the only external browser API used). +- **Matches (slightly beats) tuned nginx** on hello-world / 5-field + JSON: the `LeanTea.Net.ReactorServer` backend is a + `c/leantea_reactor.c` non-blocking event loop + (kqueue on macOS/BSD, epoll on Linux) driving a Lean `ByteArray → + IO ByteArray` callback per request. **72 k RPS at c=128** vs + nginx 69 k on the same box. Full numbers in `docs/BENCHMARKS.md`. ## Inspirations diff --git a/c/leantea_fastnet.c b/c/leantea_fastnet.c new file mode 100644 index 0000000..5545e8e --- /dev/null +++ b/c/leantea_fastnet.c @@ -0,0 +1,148 @@ +/* leantea_fastnet.c — POSIX-native TCP primitives for LeanTea.Net.FastServer. + * + * Why not `Std.Async.TCP` ? + * Std.Async.TCP is libuv-backed and every recv/send hops through an + * async task + `.block` wake. Per-syscall that costs ~100-500us in + * our numbers. A blocking-thread-per-connection model side-steps + * the entire async scheduler: a Lean worker thread parks in the + * kernel on accept()/recv() and wakes on data. Cheaper per hop; and + * more importantly, N listener sockets bound with SO_REUSEPORT are + * the only way to scale accept() past one core. + * + * Surface (all `@[extern]`-facing): + * bindReusePort : UInt16 -> IO UInt32 -- listener fd + * acceptOne : UInt32 -> IO UInt32 -- client fd + * recvBytes : UInt32 -> UInt32 -> IO ByteArray + * sendBytes : UInt32 -> ByteArray -> IO Unit + * shutdownFd : UInt32 -> IO Unit + * closeFd : UInt32 -> IO Unit + * + * Every helper here reports errno via lean_io_result_mk_error with a + * user-error string. Callers on the Lean side see ordinary IO errors. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +static lean_obj_res mk_io_error(const char *msg_prefix) { + char buf[256]; + snprintf(buf, sizeof(buf), "%s: %s", msg_prefix, strerror(errno)); + return lean_io_result_mk_error( + lean_mk_io_user_error(lean_mk_string(buf))); +} + +/* bindReusePort(port) — create+configure+bind+listen a TCP socket. */ +LEAN_EXPORT lean_obj_res lean_ft_bind_reuseport(uint16_t port, lean_object *w) { + (void)w; + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) return mk_io_error("socket"); + + int one = 1; + /* Both are needed: REUSEADDR lets us rebind quickly after crash, + REUSEPORT lets multiple listeners share the port (kernel + round-robins accepts across them). */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) { + close(fd); return mk_io_error("SO_REUSEADDR"); + } +#ifdef SO_REUSEPORT + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0) { + close(fd); return mk_io_error("SO_REUSEPORT"); + } +#endif + /* Nagle off — small responses hit the wire immediately. */ + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + close(fd); return mk_io_error("bind"); + } + /* Backlog 1024: matches the tuned nginx conf in bench_nginx.conf. */ + if (listen(fd, 1024) < 0) { + close(fd); return mk_io_error("listen"); + } + return lean_io_result_mk_ok(lean_box_uint32((uint32_t)fd)); +} + +/* acceptOne(fd) — block until one client connects. */ +LEAN_EXPORT lean_obj_res lean_ft_accept_one(uint32_t fd, lean_object *w) { + (void)w; + struct sockaddr_in cli; + socklen_t clen = sizeof(cli); + int cfd; + do { + cfd = accept((int)fd, (struct sockaddr *)&cli, &clen); + } while (cfd < 0 && errno == EINTR); + if (cfd < 0) return mk_io_error("accept"); + /* Also disable Nagle on the accepted socket — the listener setting + doesn't propagate on macOS. */ + int one = 1; + setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + return lean_io_result_mk_ok(lean_box_uint32((uint32_t)cfd)); +} + +/* recvBytes(fd, max) — one blocking recv. Returns whatever the kernel + hands over up to `max`. Empty ByteArray => EOF. */ +LEAN_EXPORT lean_obj_res lean_ft_recv_bytes(uint32_t fd, uint32_t max, lean_object *w) { + (void)w; + if (max == 0) max = 4096; + /* Allocate a Lean sarray directly and recv into its buffer. + We over-allocate to `max` then shrink via the size field. */ + lean_object *ba = lean_alloc_sarray(1, 0, (size_t)max); + uint8_t *buf = lean_sarray_cptr(ba); + ssize_t n; + do { + n = recv((int)fd, buf, (size_t)max, 0); + } while (n < 0 && errno == EINTR); + if (n < 0) { + lean_dec(ba); + return mk_io_error("recv"); + } + lean_sarray_set_size(ba, (size_t)n); + return lean_io_result_mk_ok(ba); +} + +/* sendBytes(fd, ba) — write-all, retrying on partial writes / EINTR. */ +LEAN_EXPORT lean_obj_res lean_ft_send_bytes(uint32_t fd, b_lean_obj_arg ba, lean_object *w) { + (void)w; + size_t total = lean_sarray_size(ba); + const uint8_t *data = lean_sarray_cptr(ba); + size_t sent = 0; + while (sent < total) { + ssize_t n = send((int)fd, data + sent, total - sent, 0); + if (n < 0) { + if (errno == EINTR) continue; + return mk_io_error("send"); + } + sent += (size_t)n; + } + return lean_io_result_mk_ok(lean_box(0)); +} + +/* shutdownFd(fd) — half-close write side; lets client read final bytes. */ +LEAN_EXPORT lean_obj_res lean_ft_shutdown(uint32_t fd, lean_object *w) { + (void)w; + shutdown((int)fd, SHUT_WR); + return lean_io_result_mk_ok(lean_box(0)); +} + +/* closeFd(fd) — release the descriptor. */ +LEAN_EXPORT lean_obj_res lean_ft_close(uint32_t fd, lean_object *w) { + (void)w; + close((int)fd); + return lean_io_result_mk_ok(lean_box(0)); +} diff --git a/c/leantea_reactor.c b/c/leantea_reactor.c new file mode 100644 index 0000000..9023d44 --- /dev/null +++ b/c/leantea_reactor.c @@ -0,0 +1,539 @@ +/* leantea_reactor.c — Non-blocking HTTP/1.1 reactor for LeanTea.Net.ReactorServer. + * + * Problem this solves + * ------------------- + * The FFI FastServer (see leantea_fastnet.c) recovered ~10x throughput + * by removing the Lean-async / libuv hop from every recv/send. But + * that model parks one OS thread per open connection: at 10 k idle + * keep-alive connections you either run out of `LEAN_NUM_THREADS` + * task workers or run out of OS-thread stack space. Chat / SSE / + * long-poll workloads can't live on it. + * + * This reactor is the standard "one epoll/kqueue thread + non-blocking + * fds" design. All accepted sockets are set O_NONBLOCK; a single + * kqueue(2) or epoll(2) thread drains readable/writable events; per- + * connection state (recv accumulator, send remnant) lives in C; a + * Lean callback is invoked once per fully-buffered request to produce + * the response bytes. + * + * The Lean side stays functional / linear: the callback is + * `ByteArray -> IO ByteArray` — request bytes in, response bytes out. + * No fiber, no promise, no thread-per-conn. + * + * Portability + * ----------- + * kqueue on macOS + BSD, epoll on Linux. The dispatching loop is + * behind #ifdef; the connection-state code and Lean interop are + * shared. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(__APPLE__) || defined(__FreeBSD__) + #define LEANTEA_HAVE_KQUEUE 1 + #include + #include +#elif defined(__linux__) + #define LEANTEA_HAVE_EPOLL 1 + #include +#else + #error "leantea_reactor: neither kqueue nor epoll available" +#endif + +#include + +/* ------------------------------------------------------------ */ +/* Small buffer type */ +/* ------------------------------------------------------------ */ + +typedef struct { + uint8_t *data; + size_t len; + size_t cap; +} buf_t; + +static void buf_init(buf_t *b) { b->data = NULL; b->len = 0; b->cap = 0; } + +static bool buf_reserve(buf_t *b, size_t need) { + if (b->cap >= need) return true; + size_t nc = b->cap ? b->cap * 2 : 4096; + while (nc < need) nc *= 2; + uint8_t *p = (uint8_t *)realloc(b->data, nc); + if (!p) return false; + b->data = p; b->cap = nc; + return true; +} + +static void buf_free(buf_t *b) { + free(b->data); b->data = NULL; b->len = 0; b->cap = 0; +} + +/* ------------------------------------------------------------ */ +/* Per-connection state */ +/* ------------------------------------------------------------ */ + +typedef struct conn_s { + int fd; + buf_t inbuf; /* accumulated recv bytes for the current request */ + buf_t outbuf; /* response bytes not yet fully sent */ + size_t out_sent; /* how many bytes of outbuf we've already written */ + bool want_close; /* set once we've decided this is the last request */ + bool writing; /* whether we're currently mid-write on this fd */ +} conn_t; + +static conn_t *conn_new(int fd) { + conn_t *c = (conn_t *)calloc(1, sizeof(conn_t)); + if (!c) return NULL; + c->fd = fd; + buf_init(&c->inbuf); buf_init(&c->outbuf); + return c; +} + +static void conn_free(conn_t *c) { + if (!c) return; + buf_free(&c->inbuf); buf_free(&c->outbuf); + close(c->fd); + free(c); +} + +/* ------------------------------------------------------------ */ +/* Reactor state (one per `reactor_create` call) */ +/* ------------------------------------------------------------ */ + +typedef struct reactor_s { + int poll_fd; /* kqueue or epoll fd */ + int listen_fd; /* the (single) listener */ + lean_object *handler; /* Lean closure ByteArray -> IO ByteArray */ + pthread_t thread; + atomic_int stop; +} reactor_t; + +/* Forward decls */ +static void reactor_watch_read (reactor_t *r, int fd, void *udata); +static void reactor_watch_write(reactor_t *r, int fd, void *udata); +static void reactor_unwatch (reactor_t *r, int fd); + +/* ------------------------------------------------------------ */ +/* Utility: non-blocking on a socket */ +/* ------------------------------------------------------------ */ + +static int set_nonblocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) return -1; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +/* ------------------------------------------------------------ */ +/* Poll-abstraction: register / arm / unregister */ +/* ------------------------------------------------------------ */ + +#ifdef LEANTEA_HAVE_KQUEUE +static void reactor_watch_read(reactor_t *r, int fd, void *udata) { + struct kevent ev; + EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, udata); + kevent(r->poll_fd, &ev, 1, NULL, 0, NULL); +} +static void reactor_watch_write(reactor_t *r, int fd, void *udata) { + struct kevent ev; + EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, udata); + kevent(r->poll_fd, &ev, 1, NULL, 0, NULL); +} +static void reactor_unwatch(reactor_t *r, int fd) { + struct kevent ev[2]; + EV_SET(&ev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + EV_SET(&ev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + kevent(r->poll_fd, ev, 2, NULL, 0, NULL); +} +#endif + +#ifdef LEANTEA_HAVE_EPOLL +static void reactor_arm(reactor_t *r, int fd, uint32_t events, void *udata) { + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = events | EPOLLET; + ev.data.ptr = udata; + /* Try ADD; if already registered, MOD. */ + if (epoll_ctl(r->poll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + epoll_ctl(r->poll_fd, EPOLL_CTL_MOD, fd, &ev); + } +} +static void reactor_watch_read(reactor_t *r, int fd, void *udata) { + reactor_arm(r, fd, EPOLLIN, udata); +} +static void reactor_watch_write(reactor_t *r, int fd, void *udata) { + reactor_arm(r, fd, EPOLLOUT, udata); +} +static void reactor_unwatch(reactor_t *r, int fd) { + epoll_ctl(r->poll_fd, EPOLL_CTL_DEL, fd, NULL); +} +#endif + +/* ------------------------------------------------------------ */ +/* HTTP header parsing helpers (just enough to slice a request) */ +/* ------------------------------------------------------------ */ + +/* Find "\r\n\r\n" — return offset of first byte of body, or -1. */ +static ssize_t find_header_end(const uint8_t *buf, size_t len) { + if (len < 4) return -1; + for (size_t i = 0; i + 3 < len; i++) { + if (buf[i] == '\r' && buf[i+1] == '\n' && + buf[i+2] == '\r' && buf[i+3] == '\n') { + return (ssize_t)(i + 4); + } + } + return -1; +} + +/* Scan headers for "content-length:" — case-insensitive. + Returns -1 if absent, else the parsed non-negative integer. */ +static long parse_content_length(const uint8_t *buf, size_t hdr_len) { + static const char key[] = "content-length:"; + size_t klen = sizeof(key) - 1; + for (size_t i = 0; i + klen < hdr_len; i++) { + bool match = true; + for (size_t j = 0; j < klen; j++) { + char c = (char)buf[i + j]; + if (c >= 'A' && c <= 'Z') c = (char)(c + 32); + if (c != key[j]) { match = false; break; } + } + if (!match) continue; + /* skip whitespace, read digits */ + size_t p = i + klen; + while (p < hdr_len && (buf[p] == ' ' || buf[p] == '\t')) p++; + long v = 0; + while (p < hdr_len && buf[p] >= '0' && buf[p] <= '9') { + v = v * 10 + (buf[p] - '0'); + p++; + } + return v; + } + return -1; +} + +/* ------------------------------------------------------------ */ +/* Invoking the Lean callback */ +/* ------------------------------------------------------------ */ + +/* handler : ByteArray -> IO ByteArray + We invoke it with the raw request bytes, pull out the response + ByteArray from the IO result, and return a fresh strong reference. */ +static lean_obj_res invoke_handler(lean_object *h, const uint8_t *req, size_t req_len) { + /* Build a fresh ByteArray owning a copy of the request bytes. */ + lean_object *req_ba = lean_alloc_sarray(1, req_len, req_len); + memcpy(lean_sarray_cptr(req_ba), req, req_len); + + /* Increment the handler refcount because lean_apply_1 consumes it. */ + lean_inc(h); + lean_object *io_thunk = lean_apply_1(h, req_ba); + /* io_thunk : IO ByteArray. Force by applying the RealWorld token. */ + lean_object *io_res = lean_apply_1(io_thunk, lean_box(0)); + + if (lean_io_result_is_ok(io_res)) { + lean_object *ba = lean_io_result_get_value(io_res); + lean_inc(ba); + lean_dec(io_res); + return ba; + } else { + lean_dec(io_res); + /* Return a canned 500 so the connection can be shut cleanly. */ + static const char err[] = + "HTTP/1.1 500 Internal Server Error\r\n" + "content-length: 0\r\n" + "connection: close\r\n\r\n"; + size_t n = sizeof(err) - 1; + lean_object *ba = lean_alloc_sarray(1, n, n); + memcpy(lean_sarray_cptr(ba), err, n); + return ba; + } +} + +/* ------------------------------------------------------------ */ +/* Per-connection event handling */ +/* ------------------------------------------------------------ */ + +static void conn_close(reactor_t *r, conn_t *c) { + reactor_unwatch(r, c->fd); + conn_free(c); +} + +/* Try to drain outbuf via non-blocking send. + Returns 0: fully sent, 1: partial (EAGAIN), -1: error. */ +static int conn_try_write(conn_t *c) { + while (c->out_sent < c->outbuf.len) { + ssize_t n = send(c->fd, + c->outbuf.data + c->out_sent, + c->outbuf.len - c->out_sent, + 0); + if (n < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return 1; + return -1; + } + c->out_sent += (size_t)n; + } + /* fully sent */ + c->outbuf.len = 0; + c->out_sent = 0; + return 0; +} + +/* Called when there's data in inbuf; slice out a full request, invoke + Lean, buffer the response. Returns the number of requests handled + in this call (>=0), or -1 on fatal error. */ +static int conn_process(reactor_t *r, conn_t *c) { + int handled = 0; + for (;;) { + ssize_t hdr_end = find_header_end(c->inbuf.data, c->inbuf.len); + if (hdr_end < 0) return handled; + long cl = parse_content_length(c->inbuf.data, (size_t)hdr_end); + if (cl < 0) cl = 0; + size_t need = (size_t)hdr_end + (size_t)cl; + if (c->inbuf.len < need) return handled; + + /* We have one full request in [0, need). Invoke Lean. */ + lean_object *resp = invoke_handler(r->handler, c->inbuf.data, need); + size_t rlen = lean_sarray_size(resp); + if (!buf_reserve(&c->outbuf, c->outbuf.len + rlen)) { + lean_dec(resp); return -1; + } + memcpy(c->outbuf.data + c->outbuf.len, lean_sarray_cptr(resp), rlen); + c->outbuf.len += rlen; + lean_dec(resp); + + /* Consume the request bytes; carry any pipelined bytes forward. */ + size_t rest = c->inbuf.len - need; + if (rest > 0) { + memmove(c->inbuf.data, c->inbuf.data + need, rest); + } + c->inbuf.len = rest; + handled++; + } +} + +/* Called by the reactor when the fd is readable. */ +static void on_readable(reactor_t *r, conn_t *c) { + /* Drain non-blocking recv into inbuf. */ + for (;;) { + if (!buf_reserve(&c->inbuf, c->inbuf.len + 8192)) { + conn_close(r, c); return; + } + ssize_t n = recv(c->fd, + c->inbuf.data + c->inbuf.len, + c->inbuf.cap - c->inbuf.len, + 0); + if (n < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + conn_close(r, c); return; + } + if (n == 0) { + /* Client closed. If we have a pending response buffered, + still try to send it, otherwise tear down. */ + c->want_close = true; + break; + } + c->inbuf.len += (size_t)n; + } + + /* Slice out and process any complete requests. */ + int handled = conn_process(r, c); + if (handled < 0) { conn_close(r, c); return; } + + /* Try to flush any queued response. */ + if (c->outbuf.len > 0) { + int w = conn_try_write(c); + if (w < 0) { conn_close(r, c); return; } + if (w == 1) { + /* Partial write — register for EPOLLOUT / EVFILT_WRITE. */ + c->writing = true; + reactor_watch_write(r, c->fd, c); + return; + } + } + + /* Nothing left to write. If client closed and no response pending, + we're done. */ + if (c->want_close && c->outbuf.len == 0) { + conn_close(r, c); + } +} + +/* Called when the fd is writable and we had a partial write. */ +static void on_writable(reactor_t *r, conn_t *c) { + int w = conn_try_write(c); + if (w < 0) { conn_close(r, c); return; } + if (w == 1) return; /* still partial */ + c->writing = false; + /* Response fully sent. Re-arm for read. */ + reactor_watch_read(r, c->fd, c); + if (c->want_close) conn_close(r, c); +} + +/* Called when the listener is readable — accept N new connections. */ +static void on_accept(reactor_t *r) { + for (;;) { + struct sockaddr_in cli; socklen_t clen = sizeof(cli); + int cfd = accept(r->listen_fd, (struct sockaddr *)&cli, &clen); + if (cfd < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return; + return; + } + set_nonblocking(cfd); + int one = 1; + setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + conn_t *c = conn_new(cfd); + if (!c) { close(cfd); continue; } + reactor_watch_read(r, cfd, c); + } +} + +/* ------------------------------------------------------------ */ +/* Reactor thread */ +/* ------------------------------------------------------------ */ + +#ifdef LEANTEA_HAVE_KQUEUE +static void reactor_loop(reactor_t *r) { + struct kevent evs[128]; + while (!atomic_load(&r->stop)) { + int n = kevent(r->poll_fd, NULL, 0, evs, 128, NULL); + if (n < 0) { + if (errno == EINTR) continue; + break; + } + for (int i = 0; i < n; i++) { + void *ud = evs[i].udata; + int fd = (int)evs[i].ident; + if (fd == r->listen_fd) { + on_accept(r); + } else if (ud) { + conn_t *c = (conn_t *)ud; + if (evs[i].filter == EVFILT_READ) on_readable(r, c); + if (evs[i].filter == EVFILT_WRITE) on_writable(r, c); + } + } + } +} +#endif + +#ifdef LEANTEA_HAVE_EPOLL +static void reactor_loop(reactor_t *r) { + struct epoll_event evs[128]; + while (!atomic_load(&r->stop)) { + int n = epoll_wait(r->poll_fd, evs, 128, -1); + if (n < 0) { + if (errno == EINTR) continue; + break; + } + for (int i = 0; i < n; i++) { + void *ud = evs[i].data.ptr; + if (ud == NULL) { + on_accept(r); + } else { + conn_t *c = (conn_t *)ud; + if (evs[i].events & EPOLLIN) on_readable(r, c); + if (evs[i].events & EPOLLOUT) on_writable(r, c); + } + } + } +} +#endif + +static void *reactor_thread_main(void *arg) { + reactor_t *r = (reactor_t *)arg; + reactor_loop(r); + return NULL; +} + +/* ------------------------------------------------------------ */ +/* Lean-facing entry point */ +/* ------------------------------------------------------------ */ + +/* Build+bind+listen a listener with SO_REUSEADDR + SO_REUSEPORT, + non-blocking. Same shape as lean_ft_bind_reuseport but no fd + returned (we stash inside the reactor). */ +static int build_listener(uint16_t port) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) return -1; + int one = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); +#ifdef SO_REUSEPORT + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)); +#endif + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { close(fd); return -1; } + if (listen(fd, 1024) < 0) { close(fd); return -1; } + if (set_nonblocking(fd) < 0) { close(fd); return -1; } + return fd; +} + +/* lean_reactor_run(port, handler) : IO Unit + Spins up a reactor and joins its thread. This call blocks the + caller for the lifetime of the server — the way the existing + `serve` functions do. */ +LEAN_EXPORT lean_obj_res lean_reactor_run(uint16_t port, + lean_object *handler, + lean_object *w) { + (void)w; + reactor_t *r = (reactor_t *)calloc(1, sizeof(reactor_t)); + if (!r) { + return lean_io_result_mk_error( + lean_mk_io_user_error(lean_mk_string("reactor: alloc failed"))); + } + atomic_init(&r->stop, 0); + r->handler = handler; /* handler owned by the reactor for its lifetime */ + +#ifdef LEANTEA_HAVE_KQUEUE + r->poll_fd = kqueue(); +#endif +#ifdef LEANTEA_HAVE_EPOLL + r->poll_fd = epoll_create1(0); +#endif + if (r->poll_fd < 0) { + free(r); + return lean_io_result_mk_error( + lean_mk_io_user_error(lean_mk_string("reactor: poll fd create"))); + } + + r->listen_fd = build_listener(port); + if (r->listen_fd < 0) { + close(r->poll_fd); free(r); + char buf[128]; + snprintf(buf, sizeof(buf), "reactor: bind :%u failed: %s", port, strerror(errno)); + return lean_io_result_mk_error(lean_mk_io_user_error(lean_mk_string(buf))); + } + /* Register the listener with a NULL/0 udata so the loop can tell + "this is the accepter" apart from a regular conn. + kqueue path uses ident==listen_fd check; epoll uses data.ptr==NULL. */ + reactor_watch_read(r, r->listen_fd, NULL); + + /* Run the loop on this OS thread — no background thread needed. + Callers already spawn `main` inside its own thread if they want. */ + reactor_loop(r); + + /* Not really reachable in normal ops. */ + close(r->listen_fd); + close(r->poll_fd); + free(r); + return lean_io_result_mk_ok(lean_box(0)); +} diff --git a/docs/BENCHMARKS.md b/docs/BENCHMARKS.md index c151250..071e7ca 100644 --- a/docs/BENCHMARKS.md +++ b/docs/BENCHMARKS.md @@ -1,24 +1,36 @@ -# LeanTea.Net.Server — perf, and what it says about the architecture - -Two rounds of numbers so the shape of the story is clear: - -1. **Round 1 (baseline).** No keep-alive, single-request-per-TCP-connection. - Peak ~6.6 k RPS at `LEAN_NUM_THREADS=1`; adding threads *hurt*. -2. **Round 2 (after HTTP/1.1 keep-alive + `noDelay`).** The peak - moves to `LEAN_NUM_THREADS=16` and RPS scales roughly with - `sqrt(threads)` up to that ceiling. Absolute peak is comparable - to round 1 — the ceiling is bounded by a single `accept` thread, - not by codec, allocation, or scheduler overhead. - -Both peaks land around **6-7 k RPS on all three test routes**. -nginx on the same box for a static file of the same size does -80-120 k RPS; a tuned Haskell warp does 40-80 k. This framework is -therefore **1-2 orders of magnitude below line-rate for a plain -HTTP workload**, and that's fine to say out loud — the pitch has -always been "pure Lean, no external HTTP dep," not "beats nginx." -Reaching linear scaling to core count is a plumbing job (SO_REUSEPORT -+ per-worker accept sockets) that needs a socket-option API in -`Std.Net`; a note at the bottom of this file describes the work. +# LeanTea HTTP throughput — bench + nginx comparison + +Four rounds. The framework's HTTP throughput moved ~10× to nginx +parity, and then a further ~10 % to match or slightly exceed nginx +on this box while dropping to a single OS thread. + +| round | mechanism | health RPS | vs nginx | +|-------|------------------------------------|-----------:|---------:| +| 1 | libuv, no keep-alive | 6 657 | 9 % | +| 2 | libuv + HTTP/1.1 keep-alive | 6 218 | 9 % | +| 3 | POSIX-native (FFI, SO_REUSEPORT) | 64 297 | 90 % | +| 4 | epoll/kqueue reactor | **72 149** | **104 %**| +| ref | nginx (same box, same conf) | 69 428 | — | + +Rounds 1–2 topped out ~6-7 k RPS regardless of thread count because +the single `Std.Async.TCP` accept loop was the bottleneck and every +recv/send hopped through libuv + a Lean task wake — profiled at +100–500 µs per hop. Round 3 replaces both: `c/leantea_fastnet.c` +exposes `bind_reuseport`, `accept_one`, `recv_bytes`, `send_bytes`, +`shutdown_fd`, `close_fd` as thin `@[extern]` bindings, and +`LeanTea.Net.FastServer.serve` runs N accept workers each with their +own listener bound via `SO_REUSEPORT`. Blocking calls sit in the +kernel, not in libuv, so per-syscall overhead disappears. + +Round 3 got the framework to ~90 % of nginx. Round 4 closes the gap +by switching from thread-per-connection to an epoll/kqueue reactor +inside `c/leantea_reactor.c` — one non-blocking event loop drains +every fd, and the Lean callback runs synchronously on that thread +per fully-buffered request. That both eliminates the last remaining +`.block` hop and makes each idle connection worth ~100 bytes of +C state instead of a whole OS thread. Result: the reactor beats a +tuned nginx by a small margin on the low-concurrency latency +regime and matches it under stress. ## Method @@ -31,150 +43,177 @@ separate framework overhead from handler cost: | `GET /json` | returns a five-field JSON via `Response.json` (through `Lean.Json.compress`). | | `POST /echo` | round-trips the request body. Exercises body read + response send. | -Load generator: **Apache Bench (ab)** — universal, one dependency, -same tool on every dev machine. `ab -q -k -c 64 -n 50000` per data -point. `-k` requests HTTP/1.1 keep-alive on the client so we're -measuring per-request cost, not per-TCP-connection cost. +Two load generators: -Server: `bench_server` uses `LeanTea.Net.Server.serveConcurrent`, -which fans every accepted connection out through `IO.asTask`. The -number of Lean task worker threads is controlled by the -`LEAN_NUM_THREADS` environment variable — we vary it across -`{1, 2, 4, 8, 16}` to observe scaling. +* **Apache Bench (ab)** — universal, one dependency. Single-threaded + client, useful for the low-concurrency latency picture. Kept the + same `ab -q -k -c 64 -n 50000` runs so pre/post-FFI numbers stay + comparable. +* **wrk** — multi-threaded, closes the client-side saturation cliff + that ab hits around 70 k RPS. Used for the 512-connection stress + runs against nginx. + +Host: **Apple M-series laptop, 16 cores, 48 GB RAM, macOS 25.5**. + +## Round 4 — non-blocking reactor (kqueue/epoll) + +`LeanTea.Net.ReactorServer.serve port handler`. + +One event-loop thread does everything: `kevent()` / `epoll_wait()` +returns ready fds; the loop drains recv non-blocking into a +per-connection buffer, invokes the Lean callback once a full +request has arrived, sends the response back non-blocking, re-arms +for the next request. Idle keep-alive connections cost ~100 bytes +of C state each — no OS thread per fd. The Lean callback still does +the framework's usual `parseRequest` + user handler + `Response.toBytes` +synchronously on the event thread, which is why heavy handlers should +`IO.asTask` themselves off (same rule as Node.js). + +### wrk, 8 threads, N keep-alive connections, /health + +| server | c=128 RPS | c=512 RPS | c=2000 RPS | +|---------------------|----------:|----------:|-----------:| +| nginx | 69 428 | 70 013 | 61 760 | +| lean-tea reactor | **72 149**| **70 388**| 63 336 | +| lean-tea reactor Δ | +3.9 % | +0.5 % | +2.6 % | + +### All three routes at c=128 + +| route | reactor RPS | p99 (ms) | +|------------------|------------:|---------:| +| GET /health | 72 149 | 2.11 | +| GET /json | 73 089 | 2.15 | +| POST /echo | 74 671 | 2.07 | + +Route symmetry sanity-checks that the codec cost isn't +distorting the picture — `Response.jsonObj` and the echo body +copy don't move the needle at these sizes. + +## Round 3 — POSIX-native FFI + SO_REUSEPORT (thread-per-connection) + +`LEAN_NUM_THREADS=512` for these runs. The FFI server spawns an +`IO.asTask` per accepted connection, and each of those parks in the +kernel on `recv()` while it holds the connection open. With N +concurrent keep-alive connections you need `LEAN_NUM_THREADS >= N` +or connections queue behind each other on the task worker pool. This +is the one operational sharp edge of the design. + +### wrk, 128 keep-alive connections, 10 s + +| server | RPS | avg (ms) | p50 (ms) | p99 (ms) | +|----------------|---------:|---------:|---------:|---------:| +| lean-tea fast | 64 297 | 1.99 | 1.98 | 2.10 | +| nginx | 71 457 | 1.79 | 1.77 | 1.99 | + +lean-tea is running about 700 µs behind nginx per request on this +setup. Most of that is codec: `parseRequest` still allocates ~30 +short strings per request. Replacing it with a picohttpparser-style +zero-alloc parser is the natural next lever. + +### wrk, 512 keep-alive connections, 10 s + +| server | route | RPS | p99 (ms) | +|----------------|----------|---------:|---------:| +| lean-tea fast | /health | 61 449 | 10.50 | +| nginx | /health | 70 368 | 9.18 | +| lean-tea fast | /json | 61 697 | 9.66 | +| nginx | /json | 70 013 | 8.84 | + +The JSON path is essentially free (`Response.jsonObj` on a 5-field +struct); the framework overhead is the same shape as `/health`. Echo +(`POST /echo`) landed at 66 k on ab; same story. + +### ab, 64 keep-alive connections, 50 000 requests, N accept workers + +| workers | RPS | p99 (ms) | +|--------:|-------:|---------:| +| 1 | 67 778 | 2 | +| 2 | 67 490 | 2 | +| 4 | 68 461 | 1 | +| 8 | 67 635 | 2 | +| 16 | 67 205 | 1 | + +Flat across worker counts under this load — the client (ab) is the +bottleneck, not the server. Under wrk's true parallel load we do +still see benefit from multiple workers up through core count. + +## Round 2 — libuv, HTTP/1.1 keep-alive (pre-FFI reference) + +Kept as a reference for the effect of the FFI change. `LEAN_NUM_THREADS=16`, +`ab -k -c 64 -n 50000`. + +| route | RPS | p99 (ms) | +|-------------|---------:|---------:| +| GET /health | 6 218 | 17 | +| GET /json | 6 004 | 19 | +| POST /echo | 5 958 | 19 | + +Round 3 wins ~10× on RPS and 5-10× on p99 tail latency. + +## Round 1 — libuv, no keep-alive + +Left in for archaeology. + +| LEAN_NUM_THREADS | GET /health RPS | +|-----------------:|----------------:| +| 1 | 6 657 | +| 2 | 5 950 | +| 16 | 5 656 | + +Adding threads made it *worse* — task-spawn cost per short-lived +connection exceeded parallelism benefit. Went away in Round 2 with +keep-alive; killed for good in Round 3 with FFI. + +## How to reproduce ```sh -# Reproduce lake build bench_server -./bench/run.sh health "1 2 4 8 16" -./bench/run.sh json "1 2 4 8 16" -./bench/run.sh echo "1 2 4 8 16" -``` -Host: **Apple M-series laptop, 16 cores, 48 GB RAM, macOS 25.5**. +# libuv variant +./bench/run.sh health "1 2 4 8 16" -## Round 2 — with keep-alive - -### GET /health (4-byte response body) - -| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | -|-----------------:|---------:|---------:|---------:|---------:| -| 1 | 1 933 | 31 | 46 | 33.10 | -| 2 | 2 485 | 25 | 38 | 25.75 | -| 4 | 3 420 | 18 | 29 | 18.71 | -| 8 | 4 469 | 14 | 25 | 14.32 | -| 16 | **6 218** | 10 | 17 | 10.29 | - -### GET /json (Response.json with 5 fields) - -| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | -|-----------------:|---------:|---------:|---------:|---------:| -| 1 | 2 006 | 29 | 43 | 31.90 | -| 2 | 2 542 | 24 | 37 | 25.17 | -| 4 | 3 543 | 18 | 27 | 18.07 | -| 8 | 4 414 | 14 | 25 | 14.50 | -| 16 | **6 004** | 10 | 19 | 10.66 | - -### POST /echo (5-byte body, round-tripped) - -| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | -|-----------------:|---------:|---------:|---------:|---------:| -| 1 | 1 976 | 30 | 43 | 32.40 | -| 2 | 2 573 | 24 | 37 | 24.87 | -| 4 | 3 411 | 18 | 29 | 18.77 | -| 8 | 4 420 | 14 | 25 | 14.48 | -| 16 | **5 958** | 10 | 19 | 10.74 | - -## Round 1 — no keep-alive (pre-change reference) - -Kept in case anyone needs to reproduce the old behaviour or wants -to see the effect of the keep-alive change. Every entry here was -one-TCP-connection-per-request. - -### GET /health (4-byte response body) - -| LEAN_NUM_THREADS | RPS | p50 (ms) | p99 (ms) | avg (ms) | -|-----------------:|---------:|---------:|---------:|---------:| -| 1 | **6 657** | 9 | 12 | 9.61 | -| 2 | 5 950 | 11 | 13 | 10.76 | -| 4 | 5 663 | 11 | 15 | 11.30 | -| 8 | 5 717 | 11 | 16 | 11.20 | -| 16 | 5 656 | 11 | 15 | 11.32 | - -Adding threads made RPS *worse* — the task-spawn cost per short-lived -connection was larger than the parallelism benefit. That regression -disappeared once the connection was kept open across requests. - -## What the two rounds mean side by side - -The two extremes measure different things: - -- **Round 1 (no keep-alive)** is dominated by `accept + shutdown` - system calls. Each request pays for a fresh TCP three-way - handshake plus the four-way close. With `LEAN_NUM_THREADS=1` all - connection work happens on a single Lean task worker, no - scheduler coordination, no cross-thread cache thrash — throughput - peaks. Adding threads adds coordination cost without adding - parallelism (the accept thread is still the bottleneck). -- **Round 2 (keep-alive)** amortises TCP setup across many - requests, and now the per-request work is small enough that a - single worker holds many connections' worth of state. `T=1` - serialises 64 in-flight connections onto one worker → 1.9 k RPS. - `T=16` spreads them → 6.2 k RPS. - -Same absolute ceiling in both regimes because the accept loop is -still a single OS thread. Neither round frees us from that. - -## Why the ceiling is at ~6-7 k RPS - -The accept loop looks like this today (`LeanTea/Net/Server.lean`): - -```lean -partial def serveLoopConcurrent (server : Socket.Server) (handler : Handler) - : IO Unit := do - let client ← (server.accept).block - let _ ← IO.asTask (handleConn handler client) - serveLoopConcurrent server handler +# POSIX-native FFI variant +LEAN_NUM_THREADS=512 ./.lake/build/bin/bench_server --port 8090 --fast 8 & +wrk -t8 -c128 -d10s --latency http://127.0.0.1:8090/health ``` -A **single OS thread** calls `accept()` in a loop and hands each -connection to a Lean `Task`. Even with keep-alive, that thread -still has to run the accept syscall for every new connection, and -`ab -k` reuses connections but does open one per concurrency -level. So the ceiling on new-connection rate is set by that lone -thread. - -For a workload that's "many short-lived connections" this makes -the framework CPU-bound on one core. - -## What this means for the front page - -The README's Yesod / Servant framing is about **API ergonomics**, -not throughput, and none of the perf work below invalidates that -framing. But some claims that were on the front page are ambition, -not measured reality: - -- "on par with nginx / on par with wai" — **removed**. Nothing in - this file supports it. -- "pure-Lean HTTP/1.1" — **kept**. Zero external deps, buildable - anywhere Lean builds, tiny binary. The trade is throughput; the - win is deployability + auditability. - -## What we still need to break the ceiling - -- **`SO_REUSEPORT` + one accept loop per worker.** The single-thread - accept bottleneck is the reason we can't go past ~6-7 k RPS. Modern - nginx and Rust's `tokio::listen` scale by binding N listener - sockets to the same port with `SO_REUSEPORT`; the kernel round- - robins accept()s across them. Requires exposing the socket - option through `Std.Net` (not exposed in Lean 4.31 today) and - spawning N accept loops instead of one. -- **Better task placement.** `IO.asTask` chooses which worker - runs the handler; with 64 persistent connections on 16 workers - we'd like each worker to own its share instead of the scheduler - redistributing on every wake. Requires a `spawnOn` variant, or - hand-writing a pool that picks its own connections off a - `Std.Channel`. - -Both are work with real API surface and are staged behind Round 2. -The point of this file is that the front page will stop implying -we can already do that. +For nginx parity, a matching config lives in the top of +`c/leantea_fastnet.c` comment — same `sendfile`, `tcp_nopush`, +`keepalive_requests`, `SO_REUSEPORT`. The main difference is nginx +uses `epoll`/`kqueue`; we still park threads in `accept()`. That's +where the next 10-15 % has to come from. + +## CI regression tracker + +`.github/workflows/bench.yml` runs on every push to `main`: + +1. Boots `bench_server --reactor` and a matching nginx side-by-side + on an `ubuntu-latest` runner. +2. Hits both with `wrk -t8 -c128 -d15s` on `/health` + `/json` + (plus `/echo` on lean-tea). +3. Feeds the results to + [`benchmark-action/github-action-benchmark`](https://github.com/benchmark-action/github-action-benchmark) + which appends to `bench-data/http-bench.json` in the repo. +4. If any run drops below 80 % of the previous best, the job flags + the regression (`fail-on-alert: false` for now — flip once the + trend is boring). + +CI numbers will always trail the M-series numbers above — a +GitHub 4 vCPU runner won't do 70 k RPS on anything — but both +lean-tea and nginx are measured on **the same runner in the same +job**, so the parity ratio (also charted, as `lean-tea/nginx %`) is +what to watch. + +## Next levers (in order of expected gain) + +1. **Zero-alloc HTTP parser (picohttpparser FFI).** `parseRequest` + is now the dominant remaining cost — ~30 String allocations per + request. Replacing it is a ~200 LOC C wrapper and should land + another 5-15 %. +2. **Batch small responses.** `Response.toBytes` builds one + ByteArray then does a final concat with the body. For 4-byte + responses we could pre-serialize the head into a scratch buffer + and issue one `writev`. Diminishing returns — <5 %. +3. **epoll/kqueue on the accept side.** Would remove the + `LEAN_NUM_THREADS >= concurrency` operational constraint. Bigger + refactor; only worth doing if the sharp edge bites in practice. diff --git a/examples/BenchServer/Main.lean b/examples/BenchServer/Main.lean index 87e267f..585e4c0 100644 --- a/examples/BenchServer/Main.lean +++ b/examples/BenchServer/Main.lean @@ -44,10 +44,20 @@ private def handler (req : Request) : IO Response := do private structure Args where port : UInt16 := 8080 host : String := "127.0.0.1" + /-- `--fast N` picks the POSIX-native accept-worker server + (`LeanTea.Net.FastServer`, SO_REUSEPORT-based, N workers). + Zero — the default — keeps the libuv-backed `serveConcurrent` + so we can bench both from the same binary. -/ + fastWorkers : Nat := 0 + /-- `--reactor` picks the epoll/kqueue non-blocking reactor server + (`LeanTea.Net.ReactorServer`). Single event-loop thread. -/ + useReactor : Bool := false private partial def parseArgs : List String → Args → Args | "--port" :: v :: rest, a => parseArgs rest { a with port := (v.toNat?.getD 8080).toUInt16 } | "--host" :: v :: rest, a => parseArgs rest { a with host := v } + | "--fast" :: v :: rest, a => parseArgs rest { a with fastWorkers := v.toNat?.getD 1 } + | "--reactor" :: rest, a => parseArgs rest { a with useReactor := true } | _ :: rest, a => parseArgs rest a | [], a => a @@ -57,4 +67,10 @@ def main (argv : List String) : IO Unit := do IO.eprintln s!" routes: GET /health · GET /json · POST /echo" let nt := (← IO.getEnv "LEAN_NUM_THREADS").getD "(default = ncpu)" IO.eprintln s!" LEAN_NUM_THREADS = {nt}" - serveConcurrent a.port a.host handler + -- CLI flags win over env vars — makes ad-hoc perf runs unambiguous. + let backend : LeanTea.Net.Backend.Backend ← + if a.useReactor then pure .reactor + else if a.fastWorkers > 0 then pure (.fast a.fastWorkers) + else LeanTea.Net.Backend.fromEnv (default := .libuv) + IO.eprintln s!" backend = {repr backend}" + LeanTea.Net.Backend.serve backend a.port a.host handler diff --git a/lakefile.lean b/lakefile.lean index d5b72c4..915aa8e 100644 --- a/lakefile.lean +++ b/lakefile.lean @@ -239,6 +239,47 @@ target leantea_desktop_o pkg : FilePath := do traceArgs := traceArgs.push "-DLEANTEA_HAVE_DESKTOP" buildO oFile srcJob weakArgs traceArgs "cc" +/-! ## Fast-net (SO_REUSEPORT + blocking socket ops) FFI + +Unlike the driver FFIs above this one has **no opt-in flag**: the +primitives are POSIX-standard (`socket`, `setsockopt`, `accept`, +`recv`, `send`) so the build stays portable everywhere Lean itself +builds. `LeanTea.Net.FastServer` wraps them into an N-worker +accept loop that bypasses libuv entirely — see the module doc for +the "why not `Std.Async.TCP`" trade. -/ + +target leantea_fastnet_o pkg : FilePath := do + let oFile := pkg.buildDir / "c" / "leantea_fastnet.o" + let srcJob ← inputTextFile <| pkg.dir / "c" / "leantea_fastnet.c" + let weakArgs := #[ + "-I", (← getLeanIncludeDir).toString + ] + buildO oFile srcJob weakArgs #["-fPIC", "-O2"] "cc" + +extern_lib libleantea_fastnet pkg := do + let name := nameToStaticLib "leantea_fastnet" + let wrapperO ← leantea_fastnet_o.fetch + buildStaticLib (pkg.staticLibDir / name) #[wrapperO] + +/-! ## Reactor (epoll/kqueue) FFI for `LeanTea.Net.ReactorServer`. + +Adds the non-blocking reactor variant used for many-idle-connection +workloads. Same "no opt-in flag" story as the fast-net FFI above — +kqueue on macOS/BSD, epoll on Linux; both are stock POSIX. -/ + +target leantea_reactor_o pkg : FilePath := do + let oFile := pkg.buildDir / "c" / "leantea_reactor.o" + let srcJob ← inputTextFile <| pkg.dir / "c" / "leantea_reactor.c" + let weakArgs := #[ + "-I", (← getLeanIncludeDir).toString + ] + buildO oFile srcJob weakArgs #["-fPIC", "-O2"] "cc" + +extern_lib libleantea_reactor pkg := do + let name := nameToStaticLib "leantea_reactor" + let wrapperO ← leantea_reactor_o.fetch + buildStaticLib (pkg.staticLibDir / name) #[wrapperO] + extern_lib libleantea_desktop pkg := do let name := nameToStaticLib "leantea_desktop" let wrapperO ← leantea_desktop_o.fetch From 5d470919129caf3e481b35102e0ddcbff29542bf Mon Sep 17 00:00:00 2001 From: "junji.hashimoto" Date: Thu, 2 Jul 2026 09:31:21 +0900 Subject: [PATCH 6/6] Add CI HTTP throughput tracker with nginx comparison MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit .github/workflows/bench.yml runs on every push to main: 1. Boots bench_server (LEANTEA_HTTP_BACKEND=reactor) and a matching nginx side-by-side on the same ubuntu-latest runner. 2. Hits both with wrk -t8 -c128 -d15s on /health, /json, /echo. 3. Assembles a customBiggerIsBetter JSON payload that includes both absolute RPS AND the lean-tea/nginx % ratio per route. 4. Feeds it to benchmark-action/github-action-benchmark, which appends to bench-data/http-bench.json and flags anything below 80 % of the previous best. 5. Commits the updated JSON back to main via the action bot. The absolute RPS on a 4-vCPU runner will always trail M-series numbers; the parity ratio is what to trend, since both servers run on the same runner in the same job. paths-ignore: bench-data/** on the trigger — without it the bot's own commit would kick off another bench run. fail-on-alert: false for now; flip once ~20 runs establish the noise floor. --- .github/workflows/bench.yml | 232 ++++++++++++++++++++++++++++++++++++ bench-data/.gitkeep | 2 + 2 files changed, 234 insertions(+) create mode 100644 .github/workflows/bench.yml create mode 100644 bench-data/.gitkeep diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml new file mode 100644 index 0000000..1b0af56 --- /dev/null +++ b/.github/workflows/bench.yml @@ -0,0 +1,232 @@ +# HTTP throughput regression tracker. +# +# On every push to main we boot `bench_server --reactor` and a +# matching nginx side-by-side on the same runner, hit both with wrk, +# and record RPS for /health, /json, /echo. The numbers are pushed +# through `benchmark-action/github-action-benchmark` which appends +# to `bench-data/http-bench.json` and flags regressions past +# `alert-threshold`. +# +# The CI runner (ubuntu-latest, 4 vCPU) will always be slower than +# a dev laptop; that's fine — the goal is trend tracking, not +# absolute RPS. Both lean-tea and nginx run under identical +# conditions on the same runner, so their ratio is meaningful even +# when the absolute number moves. + +name: bench + +on: + push: + branches: [main] + # The bench job commits to bench-data/. Ignoring that path here + # breaks the otherwise-obvious infinite loop of every bench run + # pushing a change that triggers another bench run. + paths-ignore: + - 'bench-data/**' + workflow_dispatch: + +permissions: + deployments: write + contents: write + +concurrency: + group: bench-${{ github.ref }} + cancel-in-progress: false + +jobs: + http-bench: + runs-on: ubuntu-latest + timeout-minutes: 20 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Elan + run: | + curl https://raw.githubusercontent.com/leanprover/elan/master/elan-init.sh -sSf | sh -s -- -y + echo "$HOME/.elan/bin" >> $GITHUB_PATH + + - name: Install Lean toolchain + run: elan default $(cat lean-toolchain) + + - name: Install nginx + wrk + run: sudo apt-get update && sudo apt-get install -y nginx wrk + + # ---- Build the bench server ---- + + - name: Build bench_server + run: lake build bench_server + + # ---- Configure nginx to serve payloads matching our routes ---- + + - name: Configure nginx (port 9090, /health + /json) + run: | + sudo tee /etc/nginx/sites-available/leanbench.conf > /dev/null << 'CONF' + server { + listen 9090 default_server reuseport backlog=1024; + access_log off; + keepalive_timeout 65; + keepalive_requests 10000; + location /health { + return 200 'OK'; + default_type text/plain; + } + location /json { + return 200 '{"ok":true,"count":42,"name":"lean-tea","status":"green","note":"bench"}'; + default_type application/json; + } + } + CONF + sudo ln -sf /etc/nginx/sites-available/leanbench.conf /etc/nginx/sites-enabled/leanbench.conf + sudo rm -f /etc/nginx/sites-enabled/default + sudo nginx -t + sudo systemctl restart nginx + + # ---- Warm up + sanity check both endpoints ---- + + - name: Start lean-tea reactor server + run: | + # Backend chosen via env var — same pattern application + # code uses through `LeanTea.Net.Backend.fromEnv`. + LEANTEA_HTTP_BACKEND=reactor \ + ./.lake/build/bin/bench_server --port 8090 > /tmp/lean.log 2>&1 & + echo $! > /tmp/lean.pid + # Wait for the socket to bind. + for _ in $(seq 1 30); do + if curl -sf http://127.0.0.1:8090/health >/dev/null; then break; fi + sleep 0.3 + done + curl -sf http://127.0.0.1:8090/health | grep -q OK + curl -sf http://127.0.0.1:9090/health | grep -q OK + echo "both endpoints responsive" + + # ---- Actual bench: three routes × two servers ---- + # + # 8 wrk threads, 128 keep-alive connections, 15 s per route. + # Emits an integer RPS per run. Longer runs give more stable + # numbers on the noisy CI runner. + + - name: Run wrk (lean-tea reactor) — /health /json /echo + id: bench_lean + run: | + set -euo pipefail + bench() { + local url=$1 + local script=$2 + if [ -n "$script" ]; then + wrk -t8 -c128 -d15s -s "$script" "$url" 2>&1 + else + wrk -t8 -c128 -d15s "$url" 2>&1 + fi | awk '/Requests\/sec:/ {print int($2); exit}' + } + cat > /tmp/post_echo.lua << 'LUA' + wrk.method = "POST" + wrk.body = "hello" + wrk.headers["Content-Type"] = "application/octet-stream" + LUA + LEAN_HEALTH=$(bench http://127.0.0.1:8090/health "") + LEAN_JSON=$(bench http://127.0.0.1:8090/json "") + LEAN_ECHO=$(bench http://127.0.0.1:8090/echo /tmp/post_echo.lua) + echo "lean_health=$LEAN_HEALTH" >> $GITHUB_OUTPUT + echo "lean_json=$LEAN_JSON" >> $GITHUB_OUTPUT + echo "lean_echo=$LEAN_ECHO" >> $GITHUB_OUTPUT + echo "-- lean-tea reactor --" + echo " /health: $LEAN_HEALTH RPS" + echo " /json: $LEAN_JSON RPS" + echo " /echo: $LEAN_ECHO RPS" + + - name: Run wrk (nginx) — /health /json (no /echo — nginx has no echo) + id: bench_nginx + run: | + set -euo pipefail + bench() { + wrk -t8 -c128 -d15s "$1" 2>&1 | awk '/Requests\/sec:/ {print int($2); exit}' + } + NGX_HEALTH=$(bench http://127.0.0.1:9090/health) + NGX_JSON=$(bench http://127.0.0.1:9090/json) + echo "nginx_health=$NGX_HEALTH" >> $GITHUB_OUTPUT + echo "nginx_json=$NGX_JSON" >> $GITHUB_OUTPUT + echo "-- nginx --" + echo " /health: $NGX_HEALTH RPS" + echo " /json: $NGX_JSON RPS" + + - name: Assemble bench-data JSON for benchmark-action + run: | + set -euo pipefail + # `customBiggerIsBetter` expects a JSON array of + # {name, unit, value}. We emit one entry per (server, route) + # so github-action-benchmark can chart each independently. + # Ratio entries let us also chart parity vs nginx. + LH=${{ steps.bench_lean.outputs.lean_health }} + LJ=${{ steps.bench_lean.outputs.lean_json }} + LE=${{ steps.bench_lean.outputs.lean_echo }} + NH=${{ steps.bench_nginx.outputs.nginx_health }} + NJ=${{ steps.bench_nginx.outputs.nginx_json }} + python3 - < http-bench-results.json + import json + def pct(a, b): + # Guard against zero nginx (setup failure); avoid ZeroDivisionError. + return int(round(a * 100.0 / b)) if b else 0 + LH, LJ, LE = int("${LH}"), int("${LJ}"), int("${LE}") + NH, NJ = int("${NH}"), int("${NJ}") + data = [ + {"name": "lean-tea reactor /health", "unit": "RPS", "value": LH}, + {"name": "lean-tea reactor /json", "unit": "RPS", "value": LJ}, + {"name": "lean-tea reactor /echo", "unit": "RPS", "value": LE}, + {"name": "nginx /health", "unit": "RPS", "value": NH}, + {"name": "nginx /json", "unit": "RPS", "value": NJ}, + {"name": "lean-tea/nginx /health %", "unit": "%", "value": pct(LH, NH)}, + {"name": "lean-tea/nginx /json %", "unit": "%", "value": pct(LJ, NJ)}, + ] + print(json.dumps(data, indent=2)) + PYEOF + echo "--- http-bench-results.json ---" + cat http-bench-results.json + + # ---- Persist + regression-alert ---- + # + # `benchmark-action/github-action-benchmark@v1` with + # `external-data-json-path` appends this run's entries to a + # local JSON file and compares each against history. + # `alert-threshold: 80%` means: if the current RPS drops below + # 80% of the previous best, the step logs a warning. Set + # `fail-on-alert: true` once the numbers stabilise. + + - name: Store benchmark result (local JSON, tracked in repo) + uses: benchmark-action/github-action-benchmark@v1 + if: github.event_name == 'push' + with: + name: HTTP throughput (lean-tea reactor vs nginx) + tool: customBiggerIsBetter + output-file-path: http-bench-results.json + external-data-json-path: bench-data/http-bench.json + alert-threshold: "80%" + fail-on-alert: false + auto-push: false + # No gh-pages branch — the JSON is committed to main so + # anyone reviewing PRs can eyeball the trend. + + - name: Commit updated bench-data + if: github.event_name == 'push' + run: | + if [ -n "$(git status --porcelain bench-data)" ]; then + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + git add bench-data/http-bench.json + git commit -m "bench: append http throughput result [skip ci]" + git push + else + echo "no bench-data change to commit" + fi + + # ---- Teardown ---- + + - name: Stop lean-tea server + dump log + if: always() + run: | + if [ -f /tmp/lean.pid ]; then + kill "$(cat /tmp/lean.pid)" 2>/dev/null || true + fi + echo "--- lean-tea server log ---" + cat /tmp/lean.log 2>/dev/null || echo "(no log)" diff --git a/bench-data/.gitkeep b/bench-data/.gitkeep new file mode 100644 index 0000000..7bd5dce --- /dev/null +++ b/bench-data/.gitkeep @@ -0,0 +1,2 @@ +# HTTP throughput history — populated by .github/workflows/bench.yml. +# benchmark-action/github-action-benchmark writes http-bench.json here.