diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 04e8beaf8..b887e4c9c 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -99,7 +99,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -206,7 +206,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT # Start second OJP server WITH SQL enhancer enabled in OPTIMIZE mode # Tests will run against this server via postgres_connection.csv (port 10593) @@ -218,7 +218,7 @@ jobs: docker run -d --name ojp-server-enhancer \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10593 -Dojp.prometheus.port=9163 -Dojp.server.slowQuerySegregation.enabled=true -Dojp.sql.enhancer.enabled=true -Dojp.sql.enhancer.mode=OPTIMIZE -Dojp.sql.enhancer.dialect=POSTGRESQL" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -342,7 +342,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -438,7 +438,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -525,7 +525,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -693,7 +693,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -795,7 +795,7 @@ jobs: docker run -d --name ojp-server-1 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10591 -Dojp.prometheus.port=9159 -Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "Started OJP Server 1 container on gRPC port 10591, Prometheus port 9159" # Start second OJP server instance on port 10592 @@ -804,7 +804,7 @@ jobs: docker run -d --name ojp-server-2 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10592 -Dojp.prometheus.port=9160 -Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "Started OJP Server 2 container on gRPC port 10592, Prometheus port 9160" - name: Wait for OJP servers to start @@ -913,7 +913,7 @@ jobs: docker run -d --name ojp-server-1 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10591 -Dojp.prometheus.port=9159 -Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "OJP Server 1 container restarted" sleep 10 @@ -998,7 +998,7 @@ jobs: docker run -d --name ojp-server-2 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10592 -Dojp.prometheus.port=9160 -Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "OJP Server 2 container restarted" sleep 10 @@ -1199,7 +1199,7 @@ jobs: docker run -d --name ojp-server-xa-1 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10591 -Dojp.prometheus.port=9161 -Dojp.server.slowQuerySegregation.enabled=true -Dorg.slf4j.simpleLogger.log.org.openjproxy.xa.pool=DEBUG" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "Started OJP Server 1 container on gRPC port 10591, Prometheus port 9161 with DEBUG logging" # Start second OJP server instance on port 10592 with DEBUG logging @@ -1209,7 +1209,7 @@ jobs: docker run -d --name ojp-server-xa-2 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10592 -Dojp.prometheus.port=9162 -Dojp.server.slowQuerySegregation.enabled=true -Dorg.slf4j.simpleLogger.log.org.openjproxy.xa.pool=DEBUG" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "Started OJP Server 2 container on gRPC port 10592, Prometheus port 9162 with DEBUG logging" - name: Wait for OJP servers to start @@ -1329,7 +1329,7 @@ jobs: docker run -d --name ojp-server-xa-1 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10591 -Dojp.prometheus.port=9161 -Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "OJP Server 1 container restarted" # Wait for server to fully start @@ -1432,7 +1432,7 @@ jobs: docker run -d --name ojp-server-xa-2 \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.port=10592 -Dojp.prometheus.port=9162 -Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT echo "OJP Server 2 container restarted" # Wait for server to fully start @@ -1699,7 +1699,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 @@ -1802,7 +1802,7 @@ jobs: docker run -d --name ojp-server \ --network host \ -e JAVA_TOOL_OPTIONS="-Dojp.server.slowQuerySegregation.enabled=true" \ - rrobetti/ojp:0.4.17-SNAPSHOT + rrobetti/ojp:0.4.18-SNAPSHOT - name: Wait for ojp-server to start run: sleep 10 diff --git a/documents/analysis/CLIENT_REACTIVE_THROTTLING_ANALYSIS.md b/documents/analysis/CLIENT_REACTIVE_THROTTLING_ANALYSIS.md new file mode 100644 index 000000000..1c3e77296 --- /dev/null +++ b/documents/analysis/CLIENT_REACTIVE_THROTTLING_ANALYSIS.md @@ -0,0 +1,364 @@ +# OJP Client-Side Throttling — Design Notes + +## Why Client-Side Throttling? + +When the OJP server's admission control is under pressure — specifically when requests +wait longer than the configured `connectionTimeout` and result in an **admission timeout** +— the server is already struggling. Every new request that arrives at that moment makes +the situation worse: it extends the wait queue, competes with requests already waiting, +and can cascade into more timeouts. + +Client-side throttling prevents this from happening in the first place. Instead of letting +every client thread fire requests until the server starts rejecting them, each client +limits its own concurrent in-flight count to its fair share of the server's real capacity. +Shorter queues mean lower admission-wait latency, fewer timeouts, and no thundering-herd +spikes when load increases suddenly. + +**Without client throttling (problem):** 80 threads across 8 app servers hit one OJP node +at once (example node configured with a 40-slot pool) → the pool is overwhelmed → many requests queue or timeout → the +server's situation gets worse, not better. + +**With client throttling (solution):** each app server caps itself at `ceil(40/8)×1×0.9 ≈ 4` +concurrent requests → at most ~32 in-flight across the cluster → the pool stays within +capacity, admission timeouts stop, and throughput is maintained. + +## Simple Flow (with examples) + +```mermaid +flowchart TD + A["Connect to OJP server"] --> B["Driver receives SessionInfo: maxAdmission, clientCount, observedPeak"] + B --> C["Compute local limit (fair share with 10% safety margin)"] + C --> D{"Before each SQL call, is inFlight < limit?"} + D -- Yes --> E[Send request to server] + D -- No --> F["Reject locally: SQLTransientException"] + E --> G{"Did server return RESOURCE_EXHAUSTED?"} + G -- No --> H["Release slot and continue"] + G -- Yes --> I["Halve reactive limit (AIMD decrease)"] + I --> F +``` + +**Example 1 (normal):** +- Server says `maxAdmission=40`, `clientCount=8`, `numServers=2` +- Limit = `ceil(40/8) × 2 × 0.9 = 9` +- Each client sends up to 9 concurrent requests; pressure stays controlled. + +**Example 2 (overload signal):** +- A request gets `RESOURCE_EXHAUSTED` from server. +- Driver halves reactive limit (for example `40 → 20`). +- Next bursts are rejected in the client first, so fewer requests hit an overloaded server. + +--- + +## Glossary + +| Acronym / Term | Full name | Meaning in this document | +|---|---|---| +| **AIMD** | Additive Increase, Multiplicative Decrease | Congestion-control algorithm borrowed from TCP. When overload is detected (admission timeout), the limit drops sharply (multiplicative decrease). As successful operations accumulate, the limit grows back one step at a time (additive increase). Prevents thundering-herd bursts while recovering capacity gradually. | +| **CAS** | Compare-And-Swap | A CPU-level atomic instruction used by `AtomicInteger.incrementAndGet()`. Reads a value, computes the new value, and writes it back only if the original value is unchanged — all in one atomic step. Allows safe concurrent updates without locks. | +| **connHash** | Connection hash | A hash of the JDBC URL + username + password that uniquely identifies a datasource/credential pair. All JDBC connections with the same URL and credentials share the same `connHash`. | +| **clientUUID** | Client universally unique identifier | A random identifier generated once per JVM process by the OJP driver. Used by the server to count distinct application instances (JVMs) connected to a given `connHash`, as opposed to counting raw connection objects. | +| **JDBC** | Java Database Connectivity | The standard Java API for connecting to relational databases. OJP implements this API so applications can use it as a drop-in driver. | +| **JVM** | Java Virtual Machine | The runtime that executes Java bytecode. One JVM = one application process. Each JVM has one `clientUUID`. | +| **OLTP** | Online Transaction Processing | Database workloads characterised by many short, fast queries (select/insert/update), as opposed to long analytical queries. The default SQS parameters are tuned for OLTP. | +| **OJP** | Open J Proxy | This project. A JDBC Type 3 proxy driver that forwards JDBC calls over gRPC to a central server which owns the real database connection pools. | +| **SQS** | Slow Query Segregation | An OJP server feature that routes long-running SQL queries to a dedicated "slow lane" pool, preventing them from starving fast queries waiting for connection slots. | +| **TCP CWND** | TCP Congestion Window | TCP's built-in mechanism for avoiding network overload: grow the send window aggressively until packet loss occurs (slow start), then halve it (multiplicative decrease), then grow by one segment per round-trip (additive increase). `observedPeak` follows the same pattern applied to OJP admission slots instead of bytes. | +| **gRPC** | Google Remote Procedure Call | The RPC framework used for communication between the OJP driver and server. Runs over HTTP/2. | + +--- + +## Current Design: Three Configurable Modes + +All three modes use the same `AtomicInteger` fail-fast counter in the driver (no blocking, +no Semaphore) and AIMD step-limited increase. They differ only in where the limit comes from. + +### Protocol additions to `SessionInfo` + +```proto +int32 clientCount = 9; // distinct JVMs (clientUUID) connected to this connHash on this node +int32 maxAdmission = 10; // per-node HikariCP pool size (confirmed: SlotManager.totalSlots = actualPoolSize) +int32 observedPeak = 11; // adaptive effective capacity (0 = no failure observed yet) +``` + +`clusterHealth` already carries the UP/DOWN node list; `numOjpServers` is derived from it. + +--- + +### Proactive Mode + +Throttles from the very first `connect()` using the fair-share formula: + +```java +int effectiveAdmission = (observedPeak > 0) ? observedPeak : maxAdmission; +int rawLimit = (int) Math.ceil((double) effectiveAdmission / clientCount) * numOjpServers; +int limit = Math.max(1, (int)(rawLimit * 0.9)); // 10% safety headroom +``` + +**Why ceiling division + 10% headroom:** +Floor division permanently wastes capacity (`floor(20/7)=2` leaves 6 slots idle). +Ceiling slightly over-allocates (`ceil(20/7)=3`, total=21 vs capacity=20), so the 10% +reduction absorbs one stale `clientCount` error. + +**Why 10%, not more:** Absorbs exactly one missed client join/leave cycle at typical +client counts without meaningfully reducing steady-state throughput. + +**Concurrency control — `AtomicInteger` counter:** + +```java +AtomicInteger inFlight = new AtomicInteger(0); +volatile int limit; // single volatile write on every SessionInfo response + +// Acquire (non-blocking, fail-fast): +if (inFlight.incrementAndGet() > limit) { + inFlight.decrementAndGet(); + throw new SQLException("Client throttle limit exceeded: " + connHash); +} +// Release (always in finally): +inFlight.decrementAndGet(); +``` + +Zero overhead on the happy path. Resizing is one volatile write. + +**AIMD step-limited increase:** +- Decrease: apply immediately (fast overload response). +- Increase: `limit = min(newLimit, currentLimit + 1)` per `SessionInfo` update. + +Rationale: when 4 out of 8 clients disconnect simultaneously, every remaining client +would burst to the new higher limit at once. Step-limited increase prevents this spike. +Under normal query load, `SessionInfo` arrives every few ms; convergence takes seconds. + +**In-transaction bypass:** +When `autoCommit == false`, subsequent statements on that connection skip the `inFlight` +check. Without this, a thread holding an open transaction can block waiting for a permit +while other threads consume all permits, causing the server's transaction timeout to fire +before the thread ever sends its next statement (deadlock-by-timeout). + +**Cross-node `clientCount` caveat (v1 documented limitation):** +Each node counts only its own connected clients. In a 2-node cluster where App1 connects +to Node A and App2 to Node B, both nodes report `clientCount = 1`, so both clients +compute `(10/1)*2 = 20` permits — against a real cluster capacity of 20. The formula +over-allocates. The server's own `SlotManager` is the final safety gate. Fix deferred +to v2 (cross-node count sharing). + +--- + +### Reactive Mode (`observedPeak`) + +Instead of using the static configured pool size, the server tracks the actual peak +in-flight count just before an admission timeout occurred, and sends it as `observedPeak`. + +**How it works (TCP CWND analogy — shrink on loss, grow slowly on clean delivery):** + +Server changes in `SlotManager`: +```java +// On wait-timeout: snap observedPeak down to current active count, with 10% floor +int currentActive = activeFastOperations.get() + activeSlowOperations.get(); +int floor = Math.max(1, (int)(totalSlots * 0.1)); +observedPeak.updateAndGet(prev -> Math.max(floor, Math.min(prev, currentActive))); + +// AIMD recovery: every totalSlots*2 successful releases, increment by 1 +if (successCount.incrementAndGet() % (totalSlots * 2) == 0) { + observedPeak.updateAndGet(prev -> Math.min(totalSlots, prev + 1)); +} +``` + +Initialized to `totalSlots`. Sent as `observedPeak` in `SessionInfo`. +Driver uses it in the proactive formula in place of `maxAdmission`. + +**Key risks and mitigations:** + +| Risk | Mitigation | +|---|---| +| "False floor": single slow query fires timeout at low in-flight count → `observedPeak` collapses | 10% floor (`max(1, totalSlots * 0.1)`) | +| Recovery too slow → server underutilized | K configurable via `ojp.server.admissionControl.observedPeakRecoveryFactor`; default `totalSlots × 2` | +| Recovery too fast → burst risk | AIMD additive increase (+1 per cycle) is inherently slow | +| SQS interaction: slow-lane timeout ≠ total overload | Use `activeSlow + activeFast` total, not per-lane counts | +| Concurrent timeout races updating `observedPeak` | Pre-snapshot `currentActive` before CAS; `updateAndGet` handles the loop | +| Only semaphore wait-timeout is a clean signal | Queue-depth rejections should **not** update `observedPeak` (fires before wait, may reflect low `currentActive` unrelated to capacity) | + +--- + +### Combined Mode (Recommended) + +```java +int effectiveLimit = Math.min(proactiveLimit, reactiveLimit); +``` + +Proactive ensures fair share from day 1. Reactive tightens the limit when the DB +is actually struggling below its configured pool size. Neither alone is sufficient. + +--- + +### Resolved Decisions + +**Q1 — Default configuration:** `combined` mode on by default. +Property `ojp.jdbc.clientThrottle.mode` defaults to `combined`. +Options: `off`, `proactive`, `reactive`, `combined`. + +**Q2 — `clientCount` tracking:** Yes, track distinct `clientUUID` values per `connHash`. +Server maintains `ConcurrentHashMap>` updated on every +session create/terminate. Overhead is acceptable. + +**Q3 — `observedPeak = 0` sentinel:** `0 = uninitialised` is sufficient. When the server +sends `0`, the driver treats reactive limit as unconstrained (`Integer.MAX_VALUE`). + +**Q4 — Reactive-only fairness:** Reactive-only mode is valid for operators who only care +about not overloading the DB and do not need fairness between clients. No-fairness caveat +is documented. Combined mode (default) provides both fairness and adaptive protection. + +--- + +## Summary + +| Dimension | Proactive | Reactive (`observedPeak`) | Combined | +|---|---|---|---| +| Server changes | `clientCount` + `maxAdmission` in `SessionInfo` | `observedPeak` + server AIMD in `SlotManager` | Both | +| Activation | First `connect()` | After first admission timeout | First `connect()`, adapts on timeout | +| Limit source | Static config | Observed runtime capacity | `min(proactive, reactive)` | +| Adapts to DB degradation | No | Yes | Yes | +| Fairness between clients | Guaranteed | Not guaranteed | Guaranteed | +| Collapse risk | Low | Mitigated by 10% floor | Low | +| Implementation complexity | Medium | Medium | Medium–High | +| Recommended | Yes | Yes (adaptive environments) | Yes (best overall) | + +--- + +## Using Client Throttling Together with Slow Query Segregation (SQS) + +Both features address database protection but at different layers: +- **SQS** (server-side) isolates long-running queries into a dedicated slow lane so they + cannot starve fast queries waiting for pool slots. +- **Client throttling** (driver-side) limits how many concurrent requests each application + instance sends to the server in the first place. + +They are complementary and work well together. The key interactions are described below. + +### How the two features interact + +**`observedPeak` and SQS classification modes:** + +`observedPeak` drops when any lane's admission semaphore times out +(`activeFastOperations + activeSlowOperations` total). This total is correct and independent +of which lane fired the timeout. + +With the new `RELATIVE_FAST_BASELINE` mode (the default), the slow-query threshold is set +dynamically at `slowMultiplier × fast-lane baseline` (default 5×). During startup and +workload transitions, the `minSamples` window (default 20 samples) must fill before +classification begins. Until then, **all** queries compete for fast slots. If the fast lane +becomes temporarily saturated during that warm-up window, an admission timeout will cause +`observedPeak` to snap down — even though the server is not truly overloaded. + +With `ABSOLUTE_THRESHOLD` mode, the threshold is a fixed millisecond value +(`slowQueryThresholdMs`, default 1000 ms). This produces stable, predictable classification +and a more stable `observedPeak` signal. + +**`maxAdmission` and SQS slot split:** + +When SQS is inactive (`slowSlotPercentage = 0`), `maxAdmission = SlotManager.totalSlots` = +full HikariCP pool size. +When SQS is active (`slowSlotPercentage > 0`, default 20%), fast queries compete only for the +fast-lane slots (80% of the pool). `ConnectAction` therefore sends +`maxAdmission = SlotManager.fastSlots` so the client's proactive limit is based on the actual +fast-lane capacity rather than the full pool. Without this correction the 90% safety margin +(proactive formula uses `0.9 × maxAdmission / clientCount`) could exceed the fast slot quota, +causing immediate admission timeouts. + +> **Bug fixed (2026-05):** An earlier version incorrectly sent `maxAdmission = totalSlots` +> regardless of the SQS split, causing `proactiveLimit` to exceed `fastSlots` and triggering +> repeated fast-lane timeouts. `ConnectAction` now sends `fastSlots` when `slowSlots > 0`. + +A second fix in the same release corrected `ClientThrottleManager.updateFromSessionInfo()`: +it now skips the update (instead of resetting `reactiveLimit` to `MAX_VALUE`) when +`maxAdmission = 0` in the incoming `SessionInfo`. `executeUpdate` and `executeQuery` +responses carry a minimal `SessionInfo` with `maxAdmission = 0`; the previous reset silently +undid every `notifyServerOverload()` adjustment on the first successful SQL response. + +### Recommendations when running both features together + +**1. Use `combined` throttle mode (default) — always correct with or without SQS.** +`effectiveLimit = min(proactiveLimit, reactiveLimit)` ensures both fairness between clients +and adaptive protection when the DB actually degrades. + +**2. Prefer `RELATIVE_FAST_BASELINE` mode for dynamic workloads.** +It adapts to workload shape without manual tuning. The default parameters +(`slowMultiplier=5.0`, `recoveryMultiplier=3.0`, `minSamples=20`, `baselinePercentile=50`, +`baselineRefreshIntervalSeconds=10`) are well-calibrated for typical OLTP workloads. +The 10% floor on `observedPeak` (from `MIN_OBSERVED_PEAK_RATIO`) prevents the client +budget from collapsing to zero during the warm-up window. + +**3. Use `ABSOLUTE_THRESHOLD` mode for stable, well-characterised workloads.** +If you know your SLA boundary (e.g., anything over 500 ms is slow), `ABSOLUTE_THRESHOLD` +gives a stable classification signal that keeps `observedPeak` from fluctuating during +load transitions. Set `ojp.server.slowQuerySegregation.slowQueryThresholdMs` accordingly. + +**4. On startup, expect a brief conservative period.** +Under `RELATIVE_FAST_BASELINE`, the baseline does not exist until `minSamples` operations +have completed. During those first N queries, everything runs in the fast lane. If you have +a large initial burst, `observedPeak` may momentarily dip. It will recover via AIMD +(+1 per `totalSlots × 2` releases). The 10% floor ensures clients are never throttled +to zero. + +**5. `slowSlotPercentage` tuning.** +A higher slow-slot percentage (e.g., 30%) reduces the risk of fast-lane saturation but +slightly under-utilises the pool when all queries are fast. For typical mixed OLTP+analytics +workloads, 20% (default) is appropriate. With client throttling active, each client already +limits its total in-flight count, so a 20% reservation provides ample isolation. + +**Example configuration — mixed workload with both features:** + +```properties +# Slow Query Segregation +ojp.server.slowQuerySegregation.enabled=true +ojp.server.slowQuerySegregation.classificationMode=RELATIVE_FAST_BASELINE +ojp.server.slowQuerySegregation.slowMultiplier=5.0 +ojp.server.slowQuerySegregation.recoveryMultiplier=3.0 +ojp.server.slowQuerySegregation.minSamples=20 +ojp.server.slowQuerySegregation.baselinePercentile=50 +ojp.server.slowQuerySegregation.baselineRefreshIntervalSeconds=10 + +# Client throttling (driver-side) +ojp.jdbc.clientThrottle.mode=combined # default — no change needed +``` + +**Example configuration — predictable workload (analytics batch + OLTP):** + +```properties +ojp.server.slowQuerySegregation.enabled=true +ojp.server.slowQuerySegregation.classificationMode=ABSOLUTE_THRESHOLD +ojp.server.slowQuerySegregation.slowQueryThresholdMs=800 +ojp.server.slowQuerySegregation.slowSlotPercentage=30 # larger slow lane for analytics + +ojp.jdbc.clientThrottle.mode=combined +``` + +--- + +## Dropped Approaches + +### Purely client-reactive (no server changes) +Driver observes `RESOURCE_EXHAUSTED` / `ServerOverloadException` and activates a local +semaphore after N consecutive rejections. + +**Why dropped:** Incomplete server state view, hard deactivation (requires probe logic or +fixed cooldown), flapping between throttled/unthrottled states, no fairness between clients. +Server already sends the signals but the driver cannot infer the right limit from them. +Superseded by the server-cooperative approach which sends the limit explicitly. + +### `java.util.concurrent.Semaphore` for concurrency control +**Why dropped:** No `setPermits()` — resizing requires draining and re-injecting permits. +Complex, race-prone, high overhead. Replaced by `AtomicInteger` counter + `volatile int limit`. + +### Floor division (`maxAdmission / clientCount`) +**Why dropped:** Permanently wastes capacity. `floor(20/7) = 2`, 6 slots idle even +at full load. Replaced by ceiling division + 10% safety headroom. + +### gRPC built-in controls +Three native gRPC-Java mechanisms were considered: + +| Mechanism | Why dropped | +|---|---| +| `NettyServerBuilder.maxConcurrentCallsPerConnection(N)` | Caps concurrent HTTP/2 streams per TCP connection (global), not per `connHash`/datasource. No per-resource fairness, no AIMD, client receives an abrupt `RST_STREAM` rather than a structured `SQLException`. | +| HTTP/2 flow control (transport byte windows) | Controls bytes in flight, not request count. Not applicable for "max N concurrent SQL executions per datasource". | +| `ClientInterceptor` wrapping every `newCall` | This is exactly what `ClientThrottleManager` does — gRPC provides the hook but not the pre-built implementation. There is no off-the-shelf gRPC interceptor that is informed by server-side `SessionInfo` signals (`maxAdmission`, `observedPeak`, `clientCount`). | + +All three lack the ability to scope the limit to a specific `connHash` and to incorporate the server's actual admission capacity into the client-side budget. The `ClientThrottleManager` approach was chosen because it uses the gRPC-recommended interceptor pattern while adding per-datasource, server-cooperative logic. diff --git a/documents/configuration/ojp-jdbc-configuration.md b/documents/configuration/ojp-jdbc-configuration.md index 4f64515e3..559c3b9f0 100644 --- a/documents/configuration/ojp-jdbc-configuration.md +++ b/documents/configuration/ojp-jdbc-configuration.md @@ -327,6 +327,7 @@ These examples demonstrate recommended settings for each environment and can be | Property | Type | Default | Description | Since | |----------|------|---------|-------------|-------| | `ojp.jdbc.connection.close.synchronous` | boolean | true | Controls `Connection.close()` behavior. `true` = close waits for terminate-session RPC (default), `false` = async close. | 0.4.2-beta | +| `ojp.jdbc.clientThrottle.mode` | string | `combined` | Client-side throttling mode. Controls how the driver limits concurrent in-flight requests per application instance. `off` = disabled, `proactive` = static fair-share only, `reactive` = adaptive `observedPeak` only, `combined` = min(proactive, reactive) — recommended. See [Chapter 8a: Client-Side Throttling](../ebook/part3-chapter8a-client-throttling.md). | 0.5.0-beta | ### Programmatic Configuration via `DriverManager.getConnection()` diff --git a/documents/configuration/ojp-server-configuration.md b/documents/configuration/ojp-server-configuration.md index 4a500cff5..54b793726 100644 --- a/documents/configuration/ojp-server-configuration.md +++ b/documents/configuration/ojp-server-configuration.md @@ -174,14 +174,14 @@ Controls how the server batches rows into gRPC streaming messages when returning | `ojp.server.slowQuerySegregation.idleTimeout` | `OJP_SERVER_SLOWQUERYSEGREGATION_IDLETIMEOUT` | long | 10000 | Idle timeout for slot borrowing (milliseconds) | 0.2.0-beta | | `ojp.server.slowQuerySegregation.slowSlotTimeout` | `OJP_SERVER_SLOWQUERYSEGREGATION_SLOWSLOTTIMEOUT` | long | 120000 | Slow-lane slot wait timeout (ms). When slow query segregation is enabled, this setting takes precedence. | 0.2.0-beta | | `ojp.server.slowQuerySegregation.fastSlotTimeout` | `OJP_SERVER_SLOWQUERYSEGREGATION_FASTSLOTTIMEOUT` | long | 60000 | Fast-lane slot wait timeout (ms). When slow query segregation is enabled, this setting takes precedence. | 0.2.0-beta | -| `ojp.server.slowQuerySegregation.classificationMode` | `OJP_SERVER_SLOWQUERYSEGREGATION_CLASSIFICATIONMODE` | enum (`RELATIVE_FAST_BASELINE` / `ABSOLUTE_THRESHOLD`) | `RELATIVE_FAST_BASELINE` | Slow-query classification strategy. `RELATIVE_FAST_BASELINE` is the default adaptive mode. | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.slowQueryThresholdMs` | `OJP_SERVER_SLOWQUERYSEGREGATION_SLOWQUERYTHRESHOLDMS` | long | 1000 | Deterministic slow-query threshold in milliseconds used by `ABSOLUTE_THRESHOLD` mode. | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.minimumSlowQueryMs` | `OJP_SERVER_SLOWQUERYSEGREGATION_MINIMUMSLOWQUERYMS` | long | 100 | Minimum operation average in milliseconds required before entering slow classification in relative mode. | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.slowMultiplier` | `OJP_SERVER_SLOWQUERYSEGREGATION_SLOWMULTIPLIER` | double | 5.0 | Relative-mode multiplier against fast baseline required to enter slow classification. | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.recoveryMultiplier` | `OJP_SERVER_SLOWQUERYSEGREGATION_RECOVERYMULTIPLIER` | double | 3.0 | Relative-mode multiplier against fast baseline for recovering from slow to fast. Must be less than `slowMultiplier`. | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.minSamples` | `OJP_SERVER_SLOWQUERYSEGREGATION_MINSAMPLES` | int | 20 | Minimum per-query-shape sample count required before classification. | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.baselinePercentile` | `OJP_SERVER_SLOWQUERYSEGREGATION_BASELINEPERCENTILE` | int | 50 | Percentile used to compute fast baseline from currently-fast query-shape averages (1-99). | 0.4.17-SNAPSHOT | -| `ojp.server.slowQuerySegregation.baselineRefreshIntervalSeconds` | `OJP_SERVER_SLOWQUERYSEGREGATION_BASELINEREFRESHINTERVALSECONDS` | long | 10 | Interval for refreshing cached fast baseline (seconds). `0` recomputes baseline on each classification check. | 0.4.17-SNAPSHOT | +| `ojp.server.slowQuerySegregation.classificationMode` | `OJP_SERVER_SLOWQUERYSEGREGATION_CLASSIFICATIONMODE` | enum (`RELATIVE_FAST_BASELINE` / `ABSOLUTE_THRESHOLD`) | `RELATIVE_FAST_BASELINE` | Slow-query classification strategy. `RELATIVE_FAST_BASELINE` is the default adaptive mode. | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.slowQueryThresholdMs` | `OJP_SERVER_SLOWQUERYSEGREGATION_SLOWQUERYTHRESHOLDMS` | long | 1000 | Deterministic slow-query threshold in milliseconds used by `ABSOLUTE_THRESHOLD` mode. | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.minimumSlowQueryMs` | `OJP_SERVER_SLOWQUERYSEGREGATION_MINIMUMSLOWQUERYMS` | long | 100 | Minimum operation average in milliseconds required before entering slow classification in relative mode. | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.slowMultiplier` | `OJP_SERVER_SLOWQUERYSEGREGATION_SLOWMULTIPLIER` | double | 5.0 | Relative-mode multiplier against fast baseline required to enter slow classification. | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.recoveryMultiplier` | `OJP_SERVER_SLOWQUERYSEGREGATION_RECOVERYMULTIPLIER` | double | 3.0 | Relative-mode multiplier against fast baseline for recovering from slow to fast. Must be less than `slowMultiplier`. | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.minSamples` | `OJP_SERVER_SLOWQUERYSEGREGATION_MINSAMPLES` | int | 20 | Minimum per-query-shape sample count required before classification. | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.baselinePercentile` | `OJP_SERVER_SLOWQUERYSEGREGATION_BASELINEPERCENTILE` | int | 50 | Percentile used to compute fast baseline from currently-fast query-shape averages (1-99). | 0.4.18-SNAPSHOT | +| `ojp.server.slowQuerySegregation.baselineRefreshIntervalSeconds` | `OJP_SERVER_SLOWQUERYSEGREGATION_BASELINEREFRESHINTERVALSECONDS` | long | 10 | Interval for refreshing cached fast baseline (seconds). `0` recomputes baseline on each classification check. | 0.4.18-SNAPSHOT | | `ojp.server.admissionControl.maxQueueDepth` | `OJP_SERVER_ADMISSIONCONTROL_MAXQUEUEDEPTH` | int | 0 | Max admission waiters before fail-fast overload (0 = auto as `totalSlots × 2` per semaphore; `totalSlots` is the pool slot count used by admission control) | 0.4.16-SNAPSHOT | ### SQL Enhancer and Schema Loader Settings diff --git a/documents/ebook/README.md b/documents/ebook/README.md index acebb7fef..e7d42726f 100644 --- a/documents/ebook/README.md +++ b/documents/ebook/README.md @@ -6,9 +6,9 @@ This comprehensive e-book provides complete documentation for Open-J-Proxy (OJP) ## Content Overview -**Total Content**: 920KB across 24 chapters + 7 appendices -**Visual Assets**: 237 AI-ready image prompts, 75 Mermaid diagrams -**Completion**: 100% (all 24 chapters written) +**Total Content**: ~930KB across 25 chapters + 7 appendices +**Visual Assets**: 237 AI-ready image prompts, 82 Mermaid diagrams +**Completion**: 100% (all 25 chapters written) ## Documentation Version @@ -34,8 +34,9 @@ Message: feat: Spring Boot starter module for zero-config OJP autoconfiguration - [Chapter 6: Server Configuration](part2-chapter6-server-configuration.md) - [Chapter 7: Framework Integration](part2-chapter7-framework-integration.md) -### Part III: Advanced Features (6 chapters) +### Part III: Advanced Features (7 chapters) - [Chapter 8: Slow Query Segregation](part3-chapter8-slow-query-segregation.md) +- [Chapter 8a: Client-Side Throttling](part3-chapter8a-client-throttling.md) - [Chapter 9: Multinode Deployment](part3-chapter9-multinode-deployment.md) - [Chapter 10: XA Distributed Transactions](part3-chapter10-xa-transactions.md) - [Chapter 11: Security & Network Architecture](part3-chapter11-security.md) @@ -88,10 +89,11 @@ Start with Chapter 3 (Quick Start Guide) for immediate hands-on experience, then ### For Advanced Users 1. Chapter 8: Slow Query Segregation -2. Chapter 9: Multinode Deployment -3. Chapter 10: XA Distributed Transactions -4. Chapter 12: Connection Pool Provider SPI (SQL Enhancer, Pool Housekeeping) -5. Chapter 12a: Query Result Caching +2. Chapter 8a: Client-Side Throttling +3. Chapter 9: Multinode Deployment +4. Chapter 10: XA Distributed Transactions +5. Chapter 12: Connection Pool Provider SPI (SQL Enhancer, Pool Housekeeping) +6. Chapter 12a: Query Result Caching ### For Contributors 1. Chapter 15: Development Environment Setup diff --git a/documents/ebook/part2-chapter5-jdbc-configuration.md b/documents/ebook/part2-chapter5-jdbc-configuration.md index 7115a9006..2d9a3d427 100644 --- a/documents/ebook/part2-chapter5-jdbc-configuration.md +++ b/documents/ebook/part2-chapter5-jdbc-configuration.md @@ -834,6 +834,61 @@ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); --- +## 5.5 Client-Side Throttling + +When multiple application instances share the same OJP server, each instance should limit +its own concurrent request count to its fair share of the server's connection pool. Without +this, a burst of requests from one instance can overwhelm the server's admission queue and +cause timeouts for all instances. + +OJP's **Client-Side Throttling** handles this automatically. The server sends three numbers +to every connecting client (via `SessionInfo`): + +- **`maxAdmission`** — the configured pool size on this node. +- **`clientCount`** — how many distinct application instances are currently connected for + this database and credential pair. +- **`observedPeak`** — the real in-flight count just before the last admission timeout + (0 if no timeout has occurred). + +The driver uses these to compute a per-instance limit and enforces it with a fail-fast +counter (no blocking). Requests that exceed the limit receive an immediate `SQLException` +rather than queuing and making the server's situation worse. + +### Configuration + +The throttle mode is controlled by a single driver property: + +```properties +# Default: combined (recommended for most deployments) +ojp.jdbc.clientThrottle.mode=combined +``` + +| Value | Description | +|---|---| +| `combined` | Uses both `maxAdmission` (static fairness) and `observedPeak` (adaptive capacity). Takes the stricter of the two limits. **Default.** | +| `proactive` | Static fair-share limit only, based on `maxAdmission` and `clientCount`. | +| `reactive` | Adaptive limit only, based on `observedPeak`. No fairness guarantee between clients. | +| `off` | Disable throttling entirely (legacy compatibility only). | + +For most deployments, `combined` is correct and no change is needed. The feature is on +by default. + +### Disabling per datasource + +```properties +# Default datasource: combined throttling (no change needed) +ojp.jdbc.clientThrottle.mode=combined + +# Disable for a specific datasource (e.g., batch analytics) +analytics.ojp.jdbc.clientThrottle.mode=off +``` + +> **For a full explanation** of how client throttling works, the formula used, AIMD +> adaptation, in-transaction bypass, and multinode behaviour, see +> **[Chapter 8a: Client-Side Throttling](../ebook/part3-chapter8a-client-throttling.md)**. + +--- + ## Summary You now have comprehensive knowledge of OJP JDBC driver configuration: @@ -842,6 +897,7 @@ You now have comprehensive knowledge of OJP JDBC driver configuration: ✅ **Pool Settings**: Configure HikariCP properties via `ojp.properties` ✅ **Client Configuration**: Set up environment-specific and programmatic configs ✅ **Framework Integration**: Properly integrate with Spring Boot, Quarkus, Micronaut +✅ **Client Throttling**: Limit per-instance concurrent requests with `ojp.jdbc.clientThrottle.mode` **Key Takeaways**: - OJP URL wraps your existing database JDBC URL @@ -849,6 +905,7 @@ You now have comprehensive knowledge of OJP JDBC driver configuration: - **Always disable application-level connection pooling** - Standard JDBC transactions work seamlessly with OJP - Framework-specific configuration differs but principle remains: no double pooling +- Client throttling is on by default — each instance self-limits to its fair share of the server's pool In the next chapter, we'll explore OJP Server configuration, covering advanced settings for security, performance, and resilience. diff --git a/documents/ebook/part2-chapter6-server-configuration.md b/documents/ebook/part2-chapter6-server-configuration.md index 3b735dce9..86c103a05 100644 --- a/documents/ebook/part2-chapter6-server-configuration.md +++ b/documents/ebook/part2-chapter6-server-configuration.md @@ -367,7 +367,40 @@ graph TD Q --> R[Adjust Classification] ``` -## 6.8 Configuration Best Practices +## 6.8 Client Throttling Signals + +While client-side throttling is configured on the **driver side** (see +[Chapter 8a: Client-Side Throttling](../ebook/part3-chapter8a-client-throttling.md)), +the server plays an important role: it provides the signals that clients use to compute +their per-instance limit. + +On every `connect()` response, the server populates three fields in `SessionInfo`: + +| Field | What it carries | Source | +|---|---|---| +| `maxAdmission` | The configured pool size on this node | `SlotManager.totalSlots` | +| `clientCount` | Number of distinct application instances (JVMs) connected for this datasource/credential pair | `SessionManagerImpl` ref-count map | +| `observedPeak` | Real peak in-flight count before the last admission timeout; `0` = no timeout yet | `SlotManager` AIMD tracking | + +The server updates `observedPeak` automatically whenever an admission timeout occurs — +no configuration needed. By default, `observedPeak` recovers toward `maxAdmission` at a +rate of +1 every `totalSlots × 2` successful releases. If you need to tune recovery speed, +set `ojp.server.admissionControl.observedPeakRecoveryFactor` to a different multiplier: + +```bash +# Default: recover observedPeak by +1 every (totalSlots × 2) successful releases +-Dojp.server.admissionControl.observedPeakRecoveryFactor=2 + +# Faster recovery (useful for bursty workloads that return to normal quickly) +-Dojp.server.admissionControl.observedPeakRecoveryFactor=1 + +# Slower recovery (more conservative — useful for systems that stay degraded) +-Dojp.server.admissionControl.observedPeakRecoveryFactor=4 +``` + +No other server-side configuration is needed to support client throttling. + +## 6.9 Configuration Best Practices With all these configuration options available, how do you choose the right settings? Start with the defaults—they're designed for typical workloads and provide good performance out of the box. Then adjust based on monitoring data and observed behavior. Don't preemptively tune settings based on assumptions; let your actual workload guide your configuration. @@ -421,7 +454,7 @@ graph LR E --> C ``` -## 6.9 Configuration Validation and Troubleshooting +## 6.10 Configuration Validation and Troubleshooting When things don't work as expected, configuration issues are often the culprit. OJP provides clear error messages when configuration values are invalid or inconsistent. The server validates configuration at startup and fails fast if critical settings are problematic. diff --git a/documents/ebook/part3-chapter8-slow-query-segregation.md b/documents/ebook/part3-chapter8-slow-query-segregation.md index 208926eef..04623286e 100644 --- a/documents/ebook/part3-chapter8-slow-query-segregation.md +++ b/documents/ebook/part3-chapter8-slow-query-segregation.md @@ -393,4 +393,9 @@ Slow Query Segregation helps maintain application responsiveness when you have m The feature requires minimal configuration and adapts automatically. For applications that serve both interactive users and analytical workloads, it is strongly recommended. For pure OLTP or pure OLAP systems, keep it disabled unless your monitoring data shows that separation improves behavior. -In the next chapter, we'll explore another feature that enhances availability and scalability: Multinode Deployment. While Slow Query Segregation ensures efficient use of resources on a single server, multinode deployment lets you spread load across multiple servers for even greater capacity and resilience. +In the next chapter, we'll look at Client-Side Throttling — a complementary feature that prevents application instances from overloading the OJP server in the first place. While Slow Query Segregation protects fast queries from slow ones on the server side, client-side throttling prevents the admission queue from building up on the client side. The two features work well together. + +--- + +**Previous Chapter**: [← Chapter 7: Framework Integration](part2-chapter7-framework-integration.md) +**Next Chapter**: [Chapter 8a: Client-Side Throttling →](part3-chapter8a-client-throttling.md) diff --git a/documents/ebook/part3-chapter8a-client-throttling.md b/documents/ebook/part3-chapter8a-client-throttling.md new file mode 100644 index 000000000..b9c64cb7e --- /dev/null +++ b/documents/ebook/part3-chapter8a-client-throttling.md @@ -0,0 +1,449 @@ +# Chapter 8a: Client-Side Throttling + +## Introduction + +Imagine a busy highway that feeds into a tunnel. The tunnel can only handle 10 lanes of traffic +at a time. Without any control at the on-ramps, hundreds of cars pile up trying to enter +simultaneously, causing gridlock inside and outside the tunnel. Now imagine that the on-ramps +have smart meters that limit the flow based on how many cars are already inside. Traffic keeps +moving because no more cars enter than the tunnel can handle. + +OJP's Client-Side Throttling works exactly like those smart on-ramp meters — but for database +requests. When the OJP server's connection pool is under pressure, it signals back to every +connected application how many concurrent requests it can safely absorb. Each application +then limits itself to its fair share, preventing gridlock at the database level. + +> **Use this feature when** you have multiple application instances connecting to the same +> database through OJP and want to prevent overload cascades. It is enabled by default — no +> configuration change is required to benefit from it. + +--- + +## The Problem: Overload Cascades + +Without client-side throttling, here is what happens when the database slows down: + +1. The OJP server's connection pool fills up — all slots are busy. +2. New requests from applications must wait (queue up) for a free slot. +3. As the queue grows, waiting threads hold resources on the application side. +4. Applications start throwing timeout errors or retry, sending **even more requests**. +5. The queue grows faster, timeouts cascade, and the system becomes unresponsive. + +This is an **overload cascade** — the system gets worse the harder it is pushed, because +clients don't know they should slow down. + +```mermaid +sequenceDiagram + participant App1 + participant App2 + participant OJP Server + participant Database + + App1->>OJP Server: 10 concurrent requests + App2->>OJP Server: 10 concurrent requests + Note over OJP Server: Pool has 10 slots — all full + OJP Server->>Database: 10 queries (pool full) + App1->>OJP Server: 5 more requests (timeout retries) + App2->>OJP Server: 5 more requests (timeout retries) + Note over OJP Server: Queue grows — admission timeouts start + OJP Server-->>App1: Admission timeout error + OJP Server-->>App2: Admission timeout error + Note over App1,App2: Apps retry → more requests → worse cascade +``` + +**With client-side throttling**, each application caps itself before sending requests. The +server never gets more concurrent requests than it can handle, the queue stays short, and +admission timeouts stop happening: + +```mermaid +sequenceDiagram + participant App1 + participant App2 + participant OJP Server + participant Database + + OJP Server-->>App1: SessionInfo: maxAdmission=10, clientCount=2 + OJP Server-->>App2: SessionInfo: maxAdmission=10, clientCount=2 + Note over App1: Local limit = ceil(10/2) × 0.9 = 4 + Note over App2: Local limit = ceil(10/2) × 0.9 = 4 + App1->>OJP Server: at most 4 concurrent requests + App2->>OJP Server: at most 4 concurrent requests + Note over OJP Server: Pool never overwhelmed — queue stays short + OJP Server->>Database: ≤8 queries (well within 10-slot pool) + OJP Server-->>App1: Results (no timeouts) + OJP Server-->>App2: Results (no timeouts) +``` + +--- + +## How It Works + +### Step 1 — Server signals its capacity + +Every time a JDBC connection is established, the OJP server includes three numbers in its +response (inside the `SessionInfo` message that the driver receives): + +| Field | Meaning | Example | +|---|---|---| +| `maxAdmission` | The server's connection pool size on this node | `10` | +| `clientCount` | How many distinct application instances (JVMs) are currently connected for this database/credential pair | `2` | +| `observedPeak` | The highest in-flight count just before the last admission timeout (0 = no timeout has occurred yet) | `8` | + +These numbers tell each client exactly how busy the server is and how many peers are sharing it. + +### Step 2 — Driver computes its fair share + +The driver uses a simple formula to compute its per-client limit: + +``` +effectiveCapacity = observedPeak (if a timeout has occurred) + = maxAdmission (if no timeout has occurred yet) + +rawLimit = ceil(effectiveCapacity / clientCount) × numUpOjpNodes +limit = floor(rawLimit × 0.9) ← 10% safety headroom +limit = max(1, limit) ← always allow at least 1 +``` + +**Example**: 5 application instances, OJP pool size 20, single-node cluster. + +``` +effectiveCapacity = 20 (no timeouts yet) +rawLimit = ceil(20 / 5) × 1 = 4 +limit = floor(4 × 0.9) = 3 +``` + +Each of the 5 apps allows at most 3 concurrent requests. Combined: up to 15 in-flight +against a 20-slot pool — a comfortable 75% utilisation with headroom to spare. + +**Why ceiling division, not floor?** +Floor division (`20/7 = 2`) leaves 6 slots permanently idle even at full load. Ceiling +(`ceil(20/7) = 3`) makes full use of capacity. The 10% deduction then absorbs one stale +`clientCount` reading without affecting steady-state throughput. + +**Why 10% headroom?** +When `clientCount` is slightly stale (a client just connected or disconnected), the limit +could be off by one client's worth of slots. 10% is usually enough to absorb that error +without meaningfully reducing throughput. + +### Step 3 — Driver enforces the limit (fail-fast, non-blocking) + +The driver uses an `AtomicInteger` counter instead of a blocking `Semaphore`. This is +important for performance: + +```java +// Before sending request to OJP server: +if (inFlight.incrementAndGet() > limit) { + inFlight.decrementAndGet(); + throw new SQLException("Client throttle limit reached; request rejected to avoid overloading the database"); +} + +// After request completes (always in finally block): +inFlight.decrementAndGet(); +``` + +- On the **happy path** (under limit): one atomic increment + one comparison. Near-zero overhead. +- When the **limit is exceeded**: the request is rejected immediately with a `SQLException`, no blocking. + +**In-transaction bypass**: if a connection is already inside a transaction (`autoCommit = false`), +subsequent statements on that connection are allowed through without checking the throttle. This +prevents a deadlock where a statement needs to complete in order to free a slot, but cannot get +a slot because the limit is already reached. Transactions typically run to completion quickly anyway. + +### Step 4 — Limit adapts over time (AIMD) + +The limit is not static — it adjusts as `SessionInfo` arrives with updated numbers. However, +increases are step-limited to prevent sudden spikes: + +- **Decrease**: applied immediately. If the server reports fewer available slots, the limit + drops right away (fast overload response). +- **Increase**: capped at `currentLimit + 1` per update cycle (additive increase). + +This is the **AIMD** (Additive Increase, Multiplicative Decrease) algorithm — the same +principle TCP uses to manage congestion on networks. + +**Why step-limited increase?** +Without it, when 4 out of 8 clients disconnect simultaneously, every remaining client instantly +recalculates a much higher limit and sends a burst of queued requests. With AIMD, the limit +grows by 1 each time a `SessionInfo` arrives (every few milliseconds under load), so the burst +takes seconds to materialise — too slow to cause a spike. + +--- + +## Three Modes + +Client throttling has three modes, configurable per application: + +| Mode | How it works | Best for | +|---|---|---| +| `proactive` | Uses `maxAdmission` (static pool size) to compute the limit from day 1. Guarantees fairness between clients. | Stable workloads where the pool size is a reliable capacity signal. | +| `reactive` | Uses `observedPeak` (adaptive real capacity). Limit starts unconstrained; only tightens after the first admission timeout is observed. | Environments where the DB sometimes degrades below its configured pool size. | +| `combined` | `effectiveLimit = min(proactive, reactive)`. Gets fairness from proactive and adaptive protection from reactive. | Recommended for most deployments (this is the **default**). | + +### Visual comparison + +```mermaid +graph TD + A[New Request Arrives] --> B{Throttle Mode?} + + B -->|proactive| C[Compute: ceil'maxAdmission/clientCount' × numUpOjpNodes × 0.9] + B -->|reactive| D{observedPeak > 0?} + B -->|combined| E[Compute both proactive and reactive limits] + + D -->|No — no timeout seen yet| F[No throttle applied] + D -->|Yes — timeout observed| G[Compute: ceil'observedPeak/clientCount' × numUpOjpNodes × 0.9] + + C --> H{inFlight < limit?} + F --> I[Allow request] + G --> H + E --> J{inFlight < min'proactive, reactive'?} + + H -->|Yes| I + H -->|No| K[Reject with SQLException] + J -->|Yes| I + J -->|No| K +``` + +--- + +## The Reactive Signal: `observedPeak` + +`observedPeak` is the server's way of saying "here is the real capacity I have observed under +load — it may be lower than what I am configured for." + +**How the server computes it:** + +The server (`SlotManager`) tracks the peak in-flight count. When an admission timeout occurs +(a request waited too long for a pool slot and gave up), it records: + +``` +observedPeak = max(10% floor, min(current observedPeak, currentActiveCount)) +``` + +This means `observedPeak` snaps **down** immediately on a timeout (fast response). It +recovers slowly: every `totalSlots × 2` successful completions, `observedPeak` increases +by 1. This is the AIMD pattern applied to the server side. + +**Example:** + +Server has 20 slots. Peak load hit 18 in-flight, then a timeout fired. Client count = 3. + +``` +observedPeak snaps down to 18 +Client limit = ceil(18/3) × 0.9 = floor(6 × 0.9) = 5 +``` + +Previously each client had a limit of `ceil(20/3) × 0.9 = 5` (same in this case). +If the timeout had fired at lower in-flight (say, 12 because the DB was momentarily slow): + +``` +observedPeak snaps down to 12 +Client limit = ceil(12/3) × 0.9 = floor(4 × 0.9) = 3 +``` + +Clients automatically become more conservative, reducing pressure on a struggling DB. + +**10% floor:** `observedPeak` never drops below 10% of `totalSlots` (minimum 1). This +prevents a single transient slow query from collapsing all clients to near-zero throughput. + +--- + +## Configuration + +### Driver configuration + +Set in `ojp.properties`, as an environment variable, or as a JVM system property: + +```properties +# Default: combined (recommended) +ojp.jdbc.clientThrottle.mode=combined + +# Options: +# off — disable entirely (legacy compatibility) +# proactive — static fair-share only +# reactive — adaptive observedPeak only (no fairness guarantee) +# combined — min(proactive, reactive) — recommended +``` + +Environment variable equivalent: + +```bash +export OJP_JDBC_CLIENTTHROTTLE_MODE=combined +``` + +### Disabling for a specific datasource + +You can disable throttling for one datasource while keeping it for others: + +```properties +# Default datasource: combined throttling +ojp.jdbc.clientThrottle.mode=combined + +# Analytics datasource: disable throttling (batch jobs can use full capacity) +analytics.ojp.jdbc.clientThrottle.mode=off +``` + +### When to change the default + +The `combined` default is correct for almost all deployments. Only consider changing it if: + +- **`off`**: Your application already has its own concurrency control and you do not want + OJP adding a second layer. +- **`proactive`**: You want fairness guarantees but do not trust `observedPeak` (e.g., your + DB occasionally spikes slowly but recovers, and you do not want client limits to drop). +- **`reactive`**: You have a single-client deployment (or do not care about fairness) but + want the adaptive capacity signal to protect the DB. + +--- + +## Using Client Throttling with Slow Query Segregation + +Both features protect the database but at different layers. They are complementary and work +well together. + +```mermaid +graph LR + A[Application] -->|"≤ per-client limit\n(Client Throttling)"| B[OJP Driver] + B -->|gRPC| C[OJP Server] + C -->|"Fast lane (80%)\nSlow lane (20%)\n(Slow Query Segregation)"| D[(Database)] + + style A fill:#e8f5e9 + style B fill:#fff9c4 + style C fill:#e3f2fd + style D fill:#fce4ec +``` + +- **Client Throttling** (driver-side) limits how many concurrent requests each application + instance sends to the server. +- **Slow Query Segregation** (server-side) ensures that long-running queries cannot starve + fast ones waiting for pool slots. + +Together they provide two layers of protection: the client limits overall in-flight count, +and the server isolates the impact of slow queries. + +### Interaction to be aware of + +`maxAdmission` equals the **full** HikariCP pool size, not just the fast-lane slots. When +Slow Query Segregation is enabled with the default 20% slow slots, 80% of slots serve fast +queries. Clients may occasionally push slightly more fast queries than the fast lane can +absorb — but any resulting admission timeout feeds back through `observedPeak`, which then +reduces the client limit. The system is self-correcting. + +**Startup warm-up:** Under `RELATIVE_FAST_BASELINE` mode (the default for SQS), the +slow-query classification baseline does not exist until 20 samples have been processed. During +startup, all queries compete for fast slots. If load is high at startup, an admission timeout +may fire and `observedPeak` may dip temporarily. The 10% floor prevents clients from being +throttled to zero. The AIMD recovery restores full throughput within seconds. + +### Recommended configuration + +For most deployments, the defaults work correctly together: + +```properties +# Slow Query Segregation — server side +ojp.server.slowQuerySegregation.enabled=true +# classificationMode defaults to RELATIVE_FAST_BASELINE — no change needed + +# Client Throttling — driver side +# ojp.jdbc.clientThrottle.mode defaults to combined — no change needed +``` + +For predictable, well-characterised workloads (e.g., analytics batch + OLTP), use +`ABSOLUTE_THRESHOLD` for a more stable `observedPeak` signal: + +```properties +ojp.server.slowQuerySegregation.enabled=true +ojp.server.slowQuerySegregation.classificationMode=ABSOLUTE_THRESHOLD +ojp.server.slowQuerySegregation.slowQueryThresholdMs=800 + +ojp.jdbc.clientThrottle.mode=combined +``` + +--- + +## Multinode Behaviour + +In a multinode OJP cluster, `clusterHealth` in `SessionInfo` already tells the driver how +many OJP nodes are UP. The formula multiplies the per-node limit by the number of UP nodes: + +``` +rawLimit = ceil(effectiveCapacity / clientCount) × numUpOjpNodes +``` + +**Known v1 limitation (cross-node client count):** Each node counts only the clients directly +connected to it. In a 2-node cluster where App1 connects exclusively to Node A and App2 to +Node B, each node reports `clientCount = 1`. Both clients compute a higher limit than if +both connected to the same node. The server's own admission control (`SlotManager`) is the +final safety gate, so this does not cause incorrect results — it simply means the client +limit is slightly more conservative on single-node connections than on evenly distributed +ones. Cross-node client-count sharing is planned for a future release. + +--- + +## Monitoring and Troubleshooting + +### How to tell throttling is working + +When a request is rejected by the client throttle, the application receives: + +``` +java.sql.SQLException: Client throttle limit reached; request rejected to avoid overloading the database +``` + +This is different from a server-side admission timeout (`ServerOverloadException`). If you +see the client throttle exception, it means the driver is doing its job — it is protecting +the server from a burst of requests that would cause an admission timeout. + +### When client throttle fires too often + +If the client throttle rejects too many requests for your workload, possible causes are: + +1. **`clientCount` is stale** — a client disconnected and the count has not refreshed yet. + The limit will self-correct on the next `SessionInfo` update (usually within milliseconds + of a new request completing). +2. **The pool is genuinely too small** — increase `maxPoolSize` on the OJP server. +3. **Too many application instances** — more clients means each gets a smaller share. Consider + reducing the number of instances or increasing the pool size. +4. **`observedPeak` dropped due to a transient DB blip** — reactive mode is working. The limit + will recover via AIMD as the DB returns to normal. + +### Checking the effective limit + +You can enable DEBUG logging in the driver to see the computed limit on each update: + +```properties +# In logback.xml or application logging config +logging.level.org.openjproxy.jdbc.ClientThrottleManager=DEBUG +``` + +--- + +## Summary + +Client-side throttling prevents overload cascades by having each application instance +limit its own concurrent request count to its fair share of the server's capacity. + +| What | How | +|---|---| +| Signal source | `SessionInfo` fields: `maxAdmission`, `clientCount`, `observedPeak` | +| Limit computation | `ceil(capacity / clientCount) × numUpOjpNodes × 0.9` | +| Enforcement | Fail-fast `AtomicInteger` counter (no blocking, near-zero overhead) | +| Limit updates | AIMD: instant decrease, step-limited increase (`+1` per update cycle) | +| Default mode | `combined` — both static fairness and adaptive capacity | +| In-transaction bypass | Yes — prevents deadlock on open transactions | + +**Key properties:** + +| Property | Default | Options | +|---|---|---| +| `ojp.jdbc.clientThrottle.mode` | `combined` | `off`, `proactive`, `reactive`, `combined` | + +The feature is on by default. For most deployments, no configuration change is needed. + +In the next chapter, we will look at Multinode Deployment — how to run multiple OJP servers +for high availability and greater throughput, and how client throttling interacts with node +failover. + +--- + +**Previous Chapter**: [← Chapter 8: Slow Query Segregation](part3-chapter8-slow-query-segregation.md) +**Next Chapter**: [Chapter 9: Multinode Deployment →](part3-chapter9-multinode-deployment.md) diff --git a/documents/multi-language-client-spec/CLIENT_SPEC.md b/documents/multi-language-client-spec/CLIENT_SPEC.md index 9cc3bd537..5e23709a5 100644 --- a/documents/multi-language-client-spec/CLIENT_SPEC.md +++ b/documents/multi-language-client-spec/CLIENT_SPEC.md @@ -1,7 +1,7 @@ # OJP Multi-Language Client Specification > **Status:** Draft — April 2026 -> **Last updated:** 2026-04-30 +> **Last updated:** 2026-05-20 > **Scope:** Defines every aspect that a new OJP client library (in any language) must implement to be fully compatible with an OJP server. Written language-agnostically; Java-specific concepts are labelled as reference implementation only. > **Reference implementation:** `ojp-jdbc-driver` module. > **Protocol source of truth:** `ojp-grpc-commons/src/main/proto/StatementService.proto` and `echo.proto`. @@ -47,6 +47,7 @@ - 7.11 [Query Result Caching](#711-query-result-caching) - 7.12 [Security / Transport](#712-security--transport) - 7.13 [DataSource / Integration API](#713-datasource--integration-api) + - 7.14 [Client-Side Throttling](#714-client-side-throttling) 8. [Testing Coverage](#8-testing-coverage) --- @@ -114,7 +115,7 @@ If the bound server becomes unhealthy while a sticky session is open, the client **The server owns:** real JDBC connections and connection pool management (pool implementation is pluggable via SPI); transaction state; LOB storage; server-side cursor state; query result caching; slow-query slot management; pool resizing in response to cluster health changes. -**The client owns:** `SessionInfo` propagation (attach current `SessionInfo` to every request; replace with response); `connHash` caching; endpoint health tracking; load balancing; failover; cluster health string building and pushing to surviving servers; session stickiness enforcement (`sessionUUID → targetServer` binding); background health-check task; connection redistribution after server recovery. +**The client owns:** `SessionInfo` propagation (attach current `SessionInfo` to every request; replace with response); `connHash` caching; endpoint health tracking; load balancing; failover; cluster health string building and pushing to surviving servers; session stickiness enforcement (`sessionUUID → targetServer` binding); background health-check task; connection redistribution after server recovery; client-side throttling (reads `maxAdmission`, `clientCount`, and `observedPeak` from `SessionInfo` responses and limits the number of concurrent requests this process sends to each OJP node — see §7.14). --- @@ -392,6 +393,9 @@ connhash_cache[cache_key] = session.connHash | `isXA` | bool | Whether this is an XA session | | `targetServer` | string | `host:port` of the server this session is pinned to | | `clusterHealth` | string | Current cluster health snapshot from the server's perspective | +| `clientCount` | int32 | Number of distinct JVM/process clients connected to this `connHash` on this node | +| `maxAdmission` | int32 | Total connection pool slots on this node (the server's full capacity) | +| `observedPeak` | int32 | Peak concurrent load the server handled before its last admission timeout; `0` = no timeout observed yet (use `maxAdmission` instead) | **Lifecycle rules:** @@ -1230,6 +1234,7 @@ analytics.ojp.health.check.interval=10000 | `ojp.datasource.name` | `"default"` | Active datasource name (sent to the server) | | `ojp.grpc.tls.enabled` | `false` | Enable TLS on gRPC channels | | `ojp.grpc.tls.cert.path` | — | Path to client certificate for mTLS | +| `ojp.jdbc.clientThrottle.mode` | `combined` | Client throttle mode: `off`, `proactive`, `reactive`, or `combined` (see §7.14) | **Duration format** — values support: no suffix = milliseconds; `ms` = milliseconds; `s` = seconds; `m` = minutes. @@ -1301,6 +1306,153 @@ Provide a higher-level `DataSource` (or equivalent) object that holds connection --- +### 7.14 Client-Side Throttling + +#### Why it exists + +When the OJP server's connection pool is overwhelmed, it responds with admission timeouts. Without client-side throttling, all application threads keep retrying, making the overload worse — more requests arrive at the already-struggling server, consuming queue slots and prolonging the outage. + +Client-side throttling breaks the cycle. As soon as the server reports an overload signal through `SessionInfo`, each client caps its own in-flight request count. Fewer requests reach the server, giving the pool space to recover. + +**Example (without throttling):** 50 threads across 5 app servers hit one OJP node at once. The node's 10-slot pool is overwhelmed; 40 requests queue or time out. Those timed-out threads retry — sending even more requests. The situation gets worse. + +**Example (with throttling):** The server reports `maxAdmission=10`, `clientCount=5`. Each client calculates `ceil(10/5) × 0.9 ≈ 1` concurrent request. At most 5–6 requests reach the server simultaneously — within its capacity. Admission timeouts stop and throughput recovers. + +#### Three signals from the server + +Every `SessionInfo` response (on `connect()`, `executeUpdate()`, and `startTransaction()`) carries three throttle signals: + +| Field | Meaning | +|---|---| +| `maxAdmission` | The total pool slot count on this server node. The baseline capacity. | +| `clientCount` | How many distinct client processes (by `clientUUID`) are connected to this `connHash` on this node right now. | +| `observedPeak` | The server's observed peak concurrent load before its last admission timeout. Starts at `0` (no timeout seen yet). Updated downward immediately when a timeout occurs; recovers upward slowly (+1 per `totalSlots × 2` releases). | + +#### Fair-share formula + +Each client computes its per-node budget from these signals: + +``` +rawBudget = ceil(signal / clientCount) × numberOfUpNodes +budget = floor(rawBudget × 0.9) # 10% safety headroom +budget = max(1, budget) # always allow at least 1 +``` + +Where `signal` is: +- **Proactive mode:** use `maxAdmission`. +- **Reactive mode:** use `observedPeak` (falls back to `maxAdmission` when `observedPeak == 0`). +- **Combined mode (default):** compute both, take `min(proactiveBudget, reactiveBudget)`. + +The **10% headroom** exists because ceiling division can slightly over-allocate. Example: server has 20 slots, 3 clients. +- Without headroom: `ceil(20/3) = 7` per client → 3 clients × 7 = **21** requests, exceeding the 20-slot capacity by 1. +- With headroom: `floor(7 × 0.9) = floor(6.3) = 6` per client → 3 × 6 = **18**, safely within capacity. + +The extra slack also absorbs one stale `clientCount` reading (the count is updated asynchronously). + +**AIMD (Additive Increase, Multiplicative Decrease):** limits only change by `+1` per `SessionInfo` update when growing. They drop immediately when the signal decreases. This prevents multiple simultaneous client reconnections from all increasing their budgets at once (which would recreate the burst). + +#### Four throttle modes + +| Mode | Property value | What it uses | +|---|---|---| +| Off | `off` | No throttling; all requests pass through | +| Proactive | `proactive` | Uses `maxAdmission + clientCount` to set a static fair-share limit | +| Reactive | `reactive` | Uses `observedPeak + clientCount` to track actual server capacity | +| **Combined (default)** | `combined` | Uses both; the effective limit is `min(proactiveBudget, reactiveBudget)` | + +Configure via `ojp.jdbc.clientThrottle.mode` (default: `combined`). Per-datasource override: + +```properties +# Global default +ojp.jdbc.clientThrottle.mode=combined + +# Per-datasource override (datasource name: "analytics") +analytics.ojp.jdbc.clientThrottle.mode=off +``` + +#### How the counter works + +Implement a fail-fast (non-blocking) per-`connHash` counter: + +```python +class ClientThrottle: + def __init__(self): + self._in_flight = AtomicInt(0) # thread-safe atomic integer + self.proactive_limit = MAX_INT + self.reactive_limit = MAX_INT + + def try_acquire(self, mode, in_transaction) -> bool: + if mode == "off" or in_transaction: + return True # bypass throttle while inside a transaction + limit = self._effective_limit(mode) + if limit == MAX_INT: + return True # uninitialised — let request through + while True: + cur = self._in_flight.get() + if cur >= limit: + return False # reject: at limit + if self._in_flight.compare_and_set(cur, cur + 1): + return True # accepted + + def release(self, mode, in_transaction): + if mode == "off" or in_transaction: + return + self._in_flight.update(lambda v: max(0, v - 1)) + + def update_from_session_info(self, session_info): + # Called after each executeUpdate / startTransaction response + client_count = max(1, session_info.clientCount) + n_up = count_up_servers(session_info.clusterHealth) + if session_info.maxAdmission > 0: + raw = ceil(session_info.maxAdmission / client_count) * n_up + new_limit = max(1, int(raw * 0.9)) + # AIMD: drop immediately, grow by at most +1 + if new_limit < self.proactive_limit: + self.proactive_limit = new_limit + elif new_limit > self.proactive_limit: + self.proactive_limit = self.proactive_limit + 1 + # similar logic for reactive_limit using observedPeak +``` + +**Important:** the `try_acquire` / `release` pair must bracket every `executeQuery` and `executeUpdate` call. The `release` must happen in a `finally` block so it runs even if the call throws. + +#### In-transaction bypass + +Requests that arrive **inside an open transaction** bypass the throttle (see `in_transaction` flag above). Rejecting a mid-transaction request would cause an abandoned transaction on the server, holding real database connections and blocking other work. The server's admission-control semaphore provides the safety boundary for in-transaction work. + +#### Cross-node note (v1 caveat) + +In multinode deployments, each OJP node sees only its own `clientCount`. Two clients connecting to different nodes each compute their per-node budget independently. If each node has 10 slots and 1 client each, both clients compute `ceil(10/1) × 2 × 0.9 = 18`. Total cluster capacity is 20, and the server's own semaphore remains the final safety gate. + +#### Pseudo-code — complete request lifecycle + +```python +throttle = get_or_create_throttle(session.connHash) # one per connHash + +def execute_update(sql, params): + in_txn = not auto_commit + if not throttle.try_acquire(mode, in_txn): + raise Exception("Client throttle limit reached; request rejected " + "to avoid overloading the database") + try: + resp = stub.executeUpdate(StatementRequest(session=session, sql=sql, + parameters=params, + statementUUID=new_uuid())) + session = resp.session + throttle.update_from_session_info(session) + return resp.value.int_value # affected row count + finally: + throttle.release(mode, in_txn) +``` + +> **Reference implementation:** +> - `ojp-jdbc-driver` — [`ClientThrottleManager`](../../ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleManager.java): the complete per-`connHash` throttle implementation with atomic CAS counter, AIMD update, and `countUpServers()`. +> - `ojp-jdbc-driver` — [`ClientThrottleMode`](../../ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleMode.java): `OFF | PROACTIVE | REACTIVE | COMBINED` enum; loaded from `ojp.jdbc.clientThrottle.mode`. +> - `ojp-jdbc-driver` — [`Connection`](../../ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Connection.java): `THROTTLE_MANAGERS` static map (keyed by `connHash`); `setSession()` calls `throttle.updateFromSessionInfo()` after each response. +> - `ojp-jdbc-driver` — [`Statement.acquireThrottle()`](../../ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Statement.java): the `try_acquire` / `release` guard used by `executeQuery` and `executeUpdate`. + +--- + ## 8. Testing Coverage A conformant client implementation must ship a test suite that exercises all the aspects above. Tests that require a live OJP server (and optionally a real database) should be **gated behind feature flags** so the suite can run incrementally in CI. @@ -1400,6 +1552,15 @@ A conformant client implementation must ship a test suite that exercises all the #### Slow query segregation - Send queries that take longer than the slow-query threshold; verify they use the reserved slow-query slots and do not starve fast queries. +#### Client-side throttling +- With throttle mode `proactive`: send more concurrent requests than `ceil(maxAdmission / clientCount) × 0.9`; verify the excess requests are rejected immediately (no blocking) with a throttle error — not forwarded to the server. +- With throttle mode `reactive`: simulate an admission timeout (drive `observedPeak` down); verify the client reduces its budget and rejects excess requests. +- With throttle mode `combined`: verify the effective limit is `min(proactiveBudget, reactiveBudget)`. +- With throttle mode `off`: verify all requests pass through regardless of in-flight count. +- Verify that in-transaction requests bypass the throttle (a mid-transaction request is never rejected by the client). +- Verify AIMD recovery: after a reduced `observedPeak`, send requests at a modest rate; verify the budget increases by at most 1 per `SessionInfo` update. +- Verify per-datasource throttle mode override via `.ojp.jdbc.clientThrottle.mode`. + #### Multi-datasource - Configure two endpoints with different datasource names; verify each endpoint uses its own datasource configuration. @@ -1486,3 +1647,4 @@ H2 tests (in-process, no external dependency) must always be runnable in CI with | XA | `OjpXAResource`, `OjpXAConnection`, `OjpXADataSource` | | Driver entry point | `Driver` | | DataSource wrapper | `OjpDataSource` | +| Client-side throttling | `ClientThrottleManager`, `ClientThrottleMode` | diff --git a/documents/multi-language-client-spec/CLIENT_SPEC_AI.md b/documents/multi-language-client-spec/CLIENT_SPEC_AI.md index 0ddc31ba8..2aad6ecc5 100644 --- a/documents/multi-language-client-spec/CLIENT_SPEC_AI.md +++ b/documents/multi-language-client-spec/CLIENT_SPEC_AI.md @@ -1,7 +1,7 @@ # OJP Client Specification — Machine-Oriented Reference > **Status:** Normative — April 2026 -> **Last updated:** 2026-04-30 +> **Last updated:** 2026-05-20 > **Scope:** Defines the complete behavioral contract for any OJP client implementation. > **Keywords:** MUST, MUST NOT, SHOULD, MAY as defined in RFC 2119. > **Protocol source:** `ojp-grpc-commons/src/main/proto/StatementService.proto`, `echo.proto` @@ -19,12 +19,16 @@ | **Virtual Connection** | A client-side object representing logical access to a database pool, identified by a `SessionInfo` token. Does not correspond 1:1 to a real database connection. | | **Real Connection** | A JDBC connection held by the Server's connection pool. The Client never holds one directly. | | **connHash** | A server-computed SHA-256 string keying a specific connection pool. Computed as SHA-256(`url + user + password + datasource_name`). | -| **SessionInfo** | A proto message propagated on every RPC. Contains `connHash`, `clientUUID`, `sessionUUID`, `transactionInfo`, `sessionStatus`, `isXA`, `targetServer`, `clusterHealth`. | +| **SessionInfo** | A proto message propagated on every RPC. Contains `connHash`, `clientUUID`, `sessionUUID`, `transactionInfo`, `sessionStatus`, `isXA`, `targetServer`, `clusterHealth`, `clientCount`, `maxAdmission`, `observedPeak`. | | **sessionUUID** | A server-assigned handle for a stateful session (transaction, LOB, cursor). Absent until the Server assigns it. | | **targetServer** | The `host:port` the Server binds a `sessionUUID` to. The Client MUST route all requests carrying that `sessionUUID` to this server. | | **clientUUID** | A stable UUID v4 generated once per Client process lifetime. | | **clusterHealth** | A semicolon-delimited string of `host:port(UP\|DOWN)` segments reflecting known endpoint health. | | **connHash cache** | A thread-safe client-side map: `url\|user\|password\|datasourceName → connHash`. Populated on first non-XA `connect()` RPC. | +| **maxAdmission** | The total connection pool slots on a single OJP server node. Sent in `SessionInfo`; used by the client throttle to compute a per-client capacity budget. | +| **observedPeak** | The peak concurrent in-flight load the server observed before its last admission timeout. `0` = no timeout yet (use `maxAdmission` as baseline). Updated downward immediately on overload; recovers slowly (+1 per `totalSlots × 2` releases, AIMD). | +| **clientCount** | The number of distinct client processes (by `clientUUID`) connected to a specific `connHash` on one OJP node. Used to split `maxAdmission`/`observedPeak` fairly across clients. | +| **ClientThrottleManager** | A per-`connHash` client-side component that maintains a fail-fast `AtomicInteger` in-flight counter and limits the number of concurrent requests this process sends to one OJP node. | --- @@ -108,6 +112,9 @@ SessionInfo: isXA: bool targetServer: string # "host:port"; MUST be used for routing when sessionUUID is set clusterHealth: string # server's view of cluster topology + clientCount: int32 # number of distinct client processes connected to this connHash on this node + maxAdmission: int32 # total pool slots on this node (throttle baseline) + observedPeak: int32 # peak load before last admission timeout; 0 = uninitialised StatementRequest: session: SessionInfo # MUST include current SessionInfo @@ -355,7 +362,89 @@ If the bound server is `UNHEALTHY` when a sticky request is made, the client MUS --- -## 8. Versioning and Compatibility +## 8. Client-Side Throttling + +### 8.1 Why It Is Needed + +When the OJP server's pool is overwhelmed (admission timeouts), clients that keep sending new requests make the overload worse. Client-side throttling reads the three `SessionInfo` signals (`maxAdmission`, `clientCount`, `observedPeak`) and limits how many requests this process sends concurrently to each OJP node. + +### 8.2 Throttle Signals + +The server populates these `SessionInfo` fields on every `connect()`, `executeUpdate()`, and `startTransaction()` response: + +| Field | Type | Meaning | +|---|---|---| +| `maxAdmission` | int32 | Total pool slots on this node. The static capacity baseline. | +| `clientCount` | int32 | Distinct client processes (by `clientUUID`) connected to this `connHash` on this node. | +| `observedPeak` | int32 | Peak concurrent load before the last admission timeout. `0` = no timeout observed yet. | + +### 8.3 Fair-Share Formula + +``` +rawBudget = ceil(signal / max(1, clientCount)) × numberOfUpNodes +budget = max(1, floor(rawBudget × 0.9)) # 10% safety headroom +``` + +Where `signal` is `maxAdmission` (proactive mode), `observedPeak` (reactive mode), or both (combined mode takes `min`). + +`numberOfUpNodes` = count of `(UP)` segments in `SessionInfo.clusterHealth`; defaults to 1 if empty. + +### 8.4 AIMD Limit Updates + +Limits MUST follow AIMD when updated from `SessionInfo`: + +- **Decrease:** if the new computed limit is lower than the current limit, apply immediately (`limit = newLimit`). +- **Increase:** if the new computed limit is higher, increase by at most 1 (`limit = currentLimit + 1`). + +This prevents a "reconnect burst" where many clients simultaneously increase their budgets after a recovery event. + +### 8.5 Throttle Modes + +| Mode | Property value | What it computes | +|---|---|---| +| Off | `off` | No throttling; `tryAcquire` always returns `true` | +| Proactive | `proactive` | Budget from `maxAdmission + clientCount` | +| Reactive | `reactive` | Budget from `observedPeak + clientCount`; falls back to `maxAdmission` when `observedPeak == 0` | +| **Combined** (default) | `combined` | `min(proactiveBudget, reactiveBudget)` | + +Configured via `ojp.jdbc.clientThrottle.mode` (default: `combined`). Per-datasource override via `.ojp.jdbc.clientThrottle.mode`. + +### 8.6 Concurrency Rules + +1. The client MUST maintain one `ClientThrottleManager` per `connHash` (keyed in a thread-safe map). Multiple connections with the same credentials share the same throttle instance. +2. Before sending `executeQuery` or `executeUpdate`, the client MUST call `tryAcquire(mode, inTransaction)`. +3. After the RPC returns (success or error), the client MUST call `release(mode, inTransaction)` in a `finally`-equivalent block. +4. `tryAcquire` MUST use a CAS (compare-and-set) loop, not a blocking semaphore, to avoid turning a throttle check into a blocking wait. +5. `tryAcquire` MUST return `true` immediately (bypass throttle) when `mode == OFF` or when `inTransaction == true`. +6. When `tryAcquire` returns `false`, the client MUST raise an error to the caller immediately with a message indicating database overload. The client MUST NOT block, retry, or forward the request to the server. +7. After each `executeUpdate` or `startTransaction` response, the client MUST call `updateFromSessionInfo(sessionInfo)` to refresh proactive and reactive limits. + +### 8.7 In-Transaction Bypass + +Requests inside an open transaction (i.e., `autoCommit == false` for the current connection) MUST bypass the throttle. Rejecting a mid-transaction request would abandon the transaction on the server, holding a real database connection open. + +### 8.8 Counter Implementation + +``` +tryAcquire(mode, inTransaction): + if mode == OFF or inTransaction: return true + limit = effectiveLimit(mode) + if limit == MAX_INT: return true + loop: + cur = inFlight.atomicGet() + if cur >= limit: return false + if inFlight.compareAndSet(cur, cur + 1): return true + +release(mode, inTransaction): + if mode == OFF or inTransaction: return + inFlight.atomicUpdate(v -> max(0, v - 1)) +``` + +The `inFlight` counter MUST be atomically clamped to `max(0, inFlight - 1)` on release to handle edge cases where races could drive it below zero. + +--- + +## 9. Versioning and Compatibility 1. The client MUST be compiled against the same `.proto` files as the target server version. 2. The client SHOULD send only fields defined in the proto version it was compiled against. @@ -365,9 +454,9 @@ If the bound server is `UNHEALTHY` when a sticky request is made, the client MUS --- -## 9. Compliance Requirements +## 10. Compliance Requirements -### 9.1 MUST Implement +### 10.1 MUST Implement - All 21 `StatementService` RPCs and `EchoService.Echo` - `connHash` caching (non-XA cache-hit path with no RPC) @@ -392,8 +481,12 @@ If the bound server is `UNHEALTHY` when a sticky request is made, the client MUS - TLS transport support (plaintext default; TLS when `ojp.grpc.tls.enabled=true`) - `clientUUID` generation (one UUID v4 per process lifetime) - `reinitializePoolOnRecoveredServer()` called **before** `endpoint.markHealthy()` on recovery +- Read `clientCount`, `maxAdmission`, `observedPeak` from every `SessionInfo` response and pass to `ClientThrottleManager.updateFromSessionInfo()` (§8) +- `tryAcquire` / `release` around every `executeQuery` and `executeUpdate` call (§8.6) +- In-transaction bypass in `tryAcquire` (§8.7) +- CAS-based `inFlight` counter for non-blocking throttle checks (§8.8) -### 9.2 SHOULD Implement +### 10.2 SHOULD Implement - Full XA transaction lifecycle (all 10 XA RPCs) - Full-validation health probe (in addition to heartbeat probe) @@ -401,8 +494,9 @@ If the bound server is `UNHEALTHY` when a sticky request is made, the client MUS - Cache rule pass-through via `ConnectionDetails.properties` - `DataSource` wrapper / integration API matching host platform conventions - Per-datasource configuration namespacing +- All four throttle modes (`off`, `proactive`, `reactive`, `combined`) selectable via `ojp.jdbc.clientThrottle.mode` (§8.5) -### 9.3 MAY Implement +### 10.3 MAY Implement - Async / non-blocking RPC API surface - Metrics / telemetry export (OpenTelemetry recommended) @@ -410,7 +504,7 @@ If the bound server is `UNHEALTHY` when a sticky request is made, the client MUS --- -## 10. Action → Protocol Mapping +## 11. Action → Protocol Mapping | High-Level Action | gRPC RPC(s) | Notes | |---|---|---| diff --git a/ojp-datasource-api/pom.xml b/ojp-datasource-api/pom.xml index 456d64d05..a5d4a58fd 100644 --- a/ojp-datasource-api/pom.xml +++ b/ojp-datasource-api/pom.xml @@ -6,13 +6,13 @@ OJP Datasource API ojp-datasource-api - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT Connection Pool Abstraction API for OJP org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml diff --git a/ojp-datasource-dbcp/pom.xml b/ojp-datasource-dbcp/pom.xml index 16369462f..ad514bdb7 100644 --- a/ojp-datasource-dbcp/pom.xml +++ b/ojp-datasource-dbcp/pom.xml @@ -6,13 +6,13 @@ OJP Datasource DBCP ojp-datasource-dbcp - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT Apache Commons DBCP Connection Pool Provider for OJP org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml @@ -26,7 +26,7 @@ org.openjproxy ojp-datasource-api - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT diff --git a/ojp-datasource-hikari/pom.xml b/ojp-datasource-hikari/pom.xml index 9365020c4..16bdcfec2 100644 --- a/ojp-datasource-hikari/pom.xml +++ b/ojp-datasource-hikari/pom.xml @@ -6,13 +6,13 @@ OJP Datasource HikariCP ojp-datasource-hikari - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT HikariCP Connection Pool Provider for OJP (Default Provider) org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml @@ -26,7 +26,7 @@ org.openjproxy ojp-datasource-api - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT diff --git a/ojp-grpc-commons/pom.xml b/ojp-grpc-commons/pom.xml index f89c6ad07..83e9b02d3 100644 --- a/ojp-grpc-commons/pom.xml +++ b/ojp-grpc-commons/pom.xml @@ -6,12 +6,12 @@ OJP Commons ojp-grpc-commons - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml diff --git a/ojp-grpc-commons/src/main/java/org/openjproxy/constants/CommonConstants.java b/ojp-grpc-commons/src/main/java/org/openjproxy/constants/CommonConstants.java index 0e66b71fa..f64f8decd 100644 --- a/ojp-grpc-commons/src/main/java/org/openjproxy/constants/CommonConstants.java +++ b/ojp-grpc-commons/src/main/java/org/openjproxy/constants/CommonConstants.java @@ -52,6 +52,8 @@ public class CommonConstants { public static final String MULTINODE_RETRY_ATTEMPTS_PROPERTY = "ojp.multinode.retryAttempts"; public static final String MULTINODE_RETRY_DELAY_PROPERTY = "ojp.multinode.retryDelayMs"; public static final String JDBC_CLOSE_SYNC_PROPERTY = "ojp.jdbc.connection.close.synchronous"; + public static final String JDBC_CLIENT_THROTTLE_MODE_PROPERTY = "ojp.jdbc.clientThrottle.mode"; + public static final String DEFAULT_JDBC_CLIENT_THROTTLE_MODE = "combined"; // Transaction isolation configuration property key public static final String DEFAULT_TRANSACTION_ISOLATION_PROPERTY = "ojp.connection.pool.defaultTransactionIsolation"; diff --git a/ojp-grpc-commons/src/main/proto/StatementService.proto b/ojp-grpc-commons/src/main/proto/StatementService.proto index 7310588de..b225b1e8f 100644 --- a/ojp-grpc-commons/src/main/proto/StatementService.proto +++ b/ojp-grpc-commons/src/main/proto/StatementService.proto @@ -194,6 +194,9 @@ message SessionInfo { bool isXA = 6; // Flag indicating this is an XA session string targetServer = 7; // Server endpoint (host:port) for session stickiness binding string clusterHealth = 8; // Cluster health status: "host1:port1(UP);host2:port2(DOWN);..." + int32 clientCount = 9; + int32 maxAdmission = 10; + int32 observedPeak = 11; } enum ResultType { diff --git a/ojp-jdbc-driver/pom.xml b/ojp-jdbc-driver/pom.xml index 1a891648f..ccce0568f 100644 --- a/ojp-jdbc-driver/pom.xml +++ b/ojp-jdbc-driver/pom.xml @@ -6,12 +6,12 @@ OJP JDBC Driver ojp-jdbc-driver - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml @@ -25,7 +25,7 @@ org.openjproxy ojp-grpc-commons - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT @@ -133,7 +133,7 @@ org.openjproxy ojp-testcontainers - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT test diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleManager.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleManager.java new file mode 100644 index 000000000..23907db4a --- /dev/null +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleManager.java @@ -0,0 +1,209 @@ +package org.openjproxy.jdbc; + +import com.openjproxy.grpc.SessionInfo; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Per-connHash, per-node client-side throttle. + * Prevents this JVM from sending more concurrent requests to one OJP node than its fair share. + * + * Mechanism: fail-fast AtomicInteger counter (no blocking, no semaphore). + * Limit updates are a single volatile write; AIMD increase capped at +1 per SessionInfo update. + */ +@Slf4j +public class ClientThrottleManager { + + // 10% safety headroom applied after ceiling division. + // Ceiling division can slightly over-allocate: e.g. 20 server slots / 3 clients + // → ceil(20/3) = 7 per client; 3 × 7 = 21 would exceed real capacity by 1. + // Multiplying by 0.9 brings the per-client budget down by one slot (floor: 6), + // absorbing one stale clientCount value before all clients burst at the same moment. + private static final double THROTTLE_SAFETY_MARGIN = 0.9; + + private final AtomicInteger inFlight = new AtomicInteger(0); + private volatile int proactiveLimit = Integer.MAX_VALUE; + private volatile int reactiveLimit = Integer.MAX_VALUE; + private volatile int lastProactiveLimit = Integer.MAX_VALUE; + private volatile int lastReactiveLimit = Integer.MAX_VALUE; + + /** + * Update limits from a fresh SessionInfo. + * AIMD: decrease immediately; increase capped at currentLimit + 1. + * + *

Only connect responses carry throttle data (maxAdmission > 0). + * executeUpdate/executeQuery responses use a minimal SessionInfo without throttle fields + * (maxAdmission = 0). When maxAdmission is zero, there is nothing to update, so this + * method returns immediately, preserving any reactive limit already set by + * {@link #notifyServerOverload()}.

+ */ + public void updateFromSessionInfo(SessionInfo sessionInfo) { + if (sessionInfo.getMaxAdmission() <= 0) { + // No throttle data in this SessionInfo (e.g., executeUpdate/executeQuery response). + // Skip to preserve any reactive limit set by notifyServerOverload(). + return; + } + int clientCount = Math.max(1, sessionInfo.getClientCount()); + int maxAdmission = sessionInfo.getMaxAdmission(); + int observedPeak = sessionInfo.getObservedPeak(); + + int numOjpServers = countUpServers(sessionInfo.getClusterHealth()); + + if (maxAdmission > 0) { + int rawProactive = (int) Math.min(Integer.MAX_VALUE, + (long) Math.ceil((double) maxAdmission / clientCount) * numOjpServers); + int newProactive = (int) (rawProactive * THROTTLE_SAFETY_MARGIN); + if (newProactive < 1) { + newProactive = 1; + } + if (newProactive < lastProactiveLimit) { + proactiveLimit = newProactive; + } else if (newProactive > lastProactiveLimit) { + proactiveLimit = lastProactiveLimit + 1; + } + lastProactiveLimit = proactiveLimit; + } + + if (observedPeak > 0 && maxAdmission > 0) { + int rawReactive = (int) Math.min(Integer.MAX_VALUE, + (long) Math.ceil((double) observedPeak / clientCount) * numOjpServers); + int newReactive = (int) (rawReactive * THROTTLE_SAFETY_MARGIN); + if (newReactive < 1) { + newReactive = 1; + } + if (newReactive < lastReactiveLimit) { + reactiveLimit = newReactive; + } else if (newReactive > lastReactiveLimit) { + reactiveLimit = lastReactiveLimit + 1; + } + lastReactiveLimit = reactiveLimit; + } else { + reactiveLimit = Integer.MAX_VALUE; + lastReactiveLimit = Integer.MAX_VALUE; + } + + log.debug("ClientThrottleManager updated: proactiveLimit={}, reactiveLimit={}, clientCount={}, maxAdmission={}, observedPeak={}, numServers={}", + proactiveLimit, reactiveLimit, clientCount, maxAdmission, observedPeak, numOjpServers); + } + + /** + * Count UP servers from clusterHealth string "host1:port1(UP);host2:port2(DOWN);..." + * Returns 1 if clusterHealth is empty/null. + */ + private int countUpServers(String clusterHealth) { + if (clusterHealth == null || clusterHealth.isEmpty()) { + return 1; + } + int count = 0; + for (String entry : clusterHealth.split(";")) { + if (entry.contains("(UP)")) { + count++; + } + } + return count > 0 ? count : 1; + } + + /** + * Attempt to acquire a throttle slot. + * + * @param mode the configured throttle mode + * @param inTransaction whether this connection is currently in a transaction (autoCommit=false) + * @return true if the request should proceed, false if it should be rejected + */ + public boolean tryAcquire(ClientThrottleMode mode, boolean inTransaction) { + if (mode == ClientThrottleMode.OFF) { + return true; + } + if (inTransaction) { + return true; + } + + int effectiveLimit = getEffectiveLimit(mode); + if (effectiveLimit == Integer.MAX_VALUE) { + return true; + } + + int current = inFlight.get(); + if (current >= effectiveLimit) { + log.debug("Client throttle rejected: inFlight={}, effectiveLimit={}, mode={}", current, effectiveLimit, mode); + return false; + } + // CAS loop: atomically check-and-increment to avoid exceeding the limit due to races + while (true) { + int cur = inFlight.get(); + if (cur >= effectiveLimit) { + log.debug("Client throttle rejected (CAS): inFlight={}, effectiveLimit={}, mode={}", cur, effectiveLimit, mode); + return false; + } + if (inFlight.compareAndSet(cur, cur + 1)) { + return true; + } + } + } + + /** + * Release a previously acquired slot. Must be called after tryAcquire returned true. + */ + public void release(ClientThrottleMode mode, boolean inTransaction) { + if (mode == ClientThrottleMode.OFF || inTransaction) { + return; + } + // Atomically clamp to 0 to handle any race in concurrent releases + inFlight.updateAndGet(v -> Math.max(0, v - 1)); + } + + private int getEffectiveLimit(ClientThrottleMode mode) { + switch (mode) { + case PROACTIVE: return proactiveLimit; + case REACTIVE: return reactiveLimit; + case COMBINED: + int pl = proactiveLimit; + int rl = reactiveLimit; + return Math.min(pl, rl); + default: return Integer.MAX_VALUE; + } + } + + public int getInFlight() { + return inFlight.get(); + } + + /** + * Called when the server rejects a request with RESOURCE_EXHAUSTED (slot admission timeout). + * Applies a multiplicative decrease to the reactive limit (AIMD: halve on overload). + * + *

Example: reactiveLimit = 8 → notifyServerOverload() → reactiveLimit = 4. + * The next request will be blocked client-side instead of hitting the still-overloaded server. + * If the reactive limit was uninitialised (MAX_VALUE), it is seeded from half the proactive + * limit so the client immediately backs off to a reasonable level.

+ * + *

Thread safety: reads and writes to {@code reactiveLimit} and {@code lastReactiveLimit} + * are individually atomic (volatile), but the read-compute-write sequence is not atomic. + * This is intentional: concurrent calls both halve the limit, producing a more aggressive + * decrease that is desirable when multiple threads observe the same overload. This matches + * the eventually-consistent AIMD design used throughout this class.

+ */ + public void notifyServerOverload() { + int current = reactiveLimit; + int newLimit; + if (current == Integer.MAX_VALUE) { + // Reactive limit was uninitialised — seed from half the proactive limit as a starting point. + int pl = proactiveLimit; + newLimit = pl == Integer.MAX_VALUE ? 1 : Math.max(1, pl / 2); + } else { + newLimit = Math.max(1, current / 2); + } + reactiveLimit = newLimit; + lastReactiveLimit = newLimit; + log.debug("ClientThrottleManager notifyServerOverload: reactiveLimit {} -> {}", current, newLimit); + } + + public int getProactiveLimit() { + return proactiveLimit; + } + + public int getReactiveLimit() { + return reactiveLimit; + } +} diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleMode.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleMode.java new file mode 100644 index 000000000..6d5db6eea --- /dev/null +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ClientThrottleMode.java @@ -0,0 +1,17 @@ +package org.openjproxy.jdbc; + +public enum ClientThrottleMode { + OFF, PROACTIVE, REACTIVE, COMBINED; + + public static ClientThrottleMode fromString(String value) { + if (value == null) { + return COMBINED; + } + switch (value.trim().toUpperCase()) { + case "OFF": return OFF; + case "PROACTIVE": return PROACTIVE; + case "REACTIVE": return REACTIVE; + default: return COMBINED; + } + } +} diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Connection.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Connection.java index f84a767dd..406b6e47d 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Connection.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Connection.java @@ -10,7 +10,6 @@ import com.openjproxy.grpc.TargetCall; import com.openjproxy.grpc.TransactionStatus; import lombok.Getter; -import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.openjproxy.constants.CommonConstants; @@ -29,6 +28,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -50,8 +50,9 @@ public class Connection implements java.sql.Connection { return thread; }); + private static final ConcurrentHashMap THROTTLE_MANAGERS = new ConcurrentHashMap<>(); + @Getter - @Setter private SessionInfo session; private final StatementService statementService; @Getter @@ -60,6 +61,7 @@ public class Connection implements java.sql.Connection { private boolean readOnly = false; private boolean closed; private final boolean closeSynchronously; + private final ClientThrottleMode throttleMode; // For server recovery and connection redistribution private volatile boolean forceInvalid = false; @@ -69,11 +71,41 @@ public Connection(SessionInfo session, StatementService statementService, DbName } public Connection(SessionInfo session, StatementService statementService, DbName dbName, boolean closeSynchronously) { + this(session, statementService, dbName, closeSynchronously, ClientThrottleMode.COMBINED); + } + + public Connection(SessionInfo session, StatementService statementService, DbName dbName, + boolean closeSynchronously, ClientThrottleMode throttleMode) { this.session = session; this.statementService = statementService; this.closed = false; this.dbName = dbName; this.closeSynchronously = closeSynchronously; + this.throttleMode = throttleMode; + updateThrottleLimits(session); + } + + public void setSession(SessionInfo session) { + this.session = session; + updateThrottleLimits(session); + } + + private void updateThrottleLimits(SessionInfo info) { + if (info != null && !info.getConnHash().isEmpty()) { + THROTTLE_MANAGERS.computeIfAbsent(info.getConnHash(), k -> new ClientThrottleManager()) + .updateFromSessionInfo(info); + } + } + + ClientThrottleManager getThrottleManager() { + if (session != null && !session.getConnHash().isEmpty()) { + return THROTTLE_MANAGERS.computeIfAbsent(session.getConnHash(), k -> new ClientThrottleManager()); + } + return null; + } + + ClientThrottleMode getThrottleMode() { + return throttleMode; } /** diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java index 29ac02ef6..330187c13 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java @@ -126,8 +126,14 @@ public java.sql.Connection connect(String url, Properties info) throws SQLExcept CommonConstants.JDBC_CLOSE_SYNC_PROPERTY, String.valueOf(CommonConstants.DEFAULT_JDBC_CLOSE_SYNCHRONOUS)) : String.valueOf(CommonConstants.DEFAULT_JDBC_CLOSE_SYNCHRONOUS)); + ClientThrottleMode throttleMode = ClientThrottleMode.fromString( + ojpProperties != null + ? ojpProperties.getProperty(CommonConstants.JDBC_CLIENT_THROTTLE_MODE_PROPERTY, + CommonConstants.DEFAULT_JDBC_CLIENT_THROTTLE_MODE) + : CommonConstants.DEFAULT_JDBC_CLIENT_THROTTLE_MODE); log.debug("Returning new Connection with sessionInfo: {}", sessionInfo); - return new Connection(sessionInfo, statementService, DatabaseUtils.resolveDbName(cleanUrl), closeSynchronously); + return new Connection(sessionInfo, statementService, DatabaseUtils.resolveDbName(cleanUrl), + closeSynchronously, throttleMode); } diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/PreparedStatement.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/PreparedStatement.java index d8de4abca..6c12141ba 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/PreparedStatement.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/PreparedStatement.java @@ -9,6 +9,7 @@ import com.openjproxy.grpc.ResourceType; import com.openjproxy.grpc.ResultType; import com.openjproxy.grpc.TargetCall; +import io.grpc.StatusRuntimeException; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -110,9 +111,21 @@ public ResultSet executeQuery() throws SQLException { log.debug("executeQuery called"); this.checkClosed(); log.info("Executing query for -> {}", this.sql); - Iterator itOpResult = this.statementService - .executeQuery(this.connection.getSession(), this.sql, new ArrayList<>(this.paramsMap.values()), this.properties); - return new ResultSet(itOpResult, this.statementService, this); + ClientThrottleManager throttle = this.connection.getThrottleManager(); + ClientThrottleMode mode = this.connection.getThrottleMode(); + boolean inTransaction = !this.connection.getAutoCommit(); + boolean acquired = acquireThrottle(throttle, mode, inTransaction); + try { + Iterator itOpResult = this.statementService + .executeQuery(this.connection.getSession(), this.sql, new ArrayList<>(this.paramsMap.values()), this.properties); + return new ResultSet(itOpResult, this.statementService, this); + } catch (StatusRuntimeException sre) { + throw onServerOverload(throttle, mode, sre); + } finally { + if (acquired) { + throttle.release(mode, inTransaction); + } + } } @Override @@ -120,13 +133,25 @@ public int executeUpdate() throws SQLException { log.debug("executeUpdate called"); this.checkClosed(); log.info("Executing update for -> {}", this.sql); - OpResult result = this.statementService.executeUpdate(this.connection.getSession(), this.sql, - new ArrayList<>(this.paramsMap.values()), this.getStatementUUID(), this.properties); - this.connection.setSession(result.getSession()); - if (StringUtils.isNotBlank(result.getUuid())) { - this.setStatementUUID(result.getUuid()); + ClientThrottleManager throttle = this.connection.getThrottleManager(); + ClientThrottleMode mode = this.connection.getThrottleMode(); + boolean inTransaction = !this.connection.getAutoCommit(); + boolean acquired = acquireThrottle(throttle, mode, inTransaction); + try { + OpResult result = this.statementService.executeUpdate(this.connection.getSession(), this.sql, + new ArrayList<>(this.paramsMap.values()), this.getStatementUUID(), this.properties); + this.connection.setSession(result.getSession()); + if (StringUtils.isNotBlank(result.getUuid())) { + this.setStatementUUID(result.getUuid()); + } + return result.getIntValue(); + } catch (StatusRuntimeException sre) { + throw onServerOverload(throttle, mode, sre); + } finally { + if (acquired) { + throttle.release(mode, inTransaction); + } } - return result.getIntValue(); } @Override diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ResultSet.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ResultSet.java index 3adf92a62..c8ade6dce 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ResultSet.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ResultSet.java @@ -3,6 +3,7 @@ import com.openjproxy.grpc.LobReference; import com.openjproxy.grpc.LobType; import com.openjproxy.grpc.OpResult; +import io.grpc.Status; import io.grpc.StatusRuntimeException; import lombok.Getter; import lombok.SneakyThrows; @@ -80,7 +81,7 @@ public ResultSet(Iterator itOpResult, StatementService statementServic labelsMap.put(labels.get(i).toUpperCase(), i); } } catch (StatusRuntimeException e) { - throw handle(e); + throw handle(onServerOverload(e)); } } @@ -102,19 +103,48 @@ public boolean next() throws SQLException { this.getResultSetUUID(), 1)); this.setNextOpResult(result); } catch (StatusRuntimeException e) { - throw handle(e); + throw handle(onServerOverload(e)); } } if (!this.inRowByRowMode && blockIdx.get() >= currentDataBlock.size() && itResults.hasNext()) { try { this.setNextOpResult(this.nextWithSessionUpdate(itResults.next())); } catch (StatusRuntimeException e) { - throw handle(e); + throw handle(onServerOverload(e)); } } return blockIdx.get() < currentDataBlock.size(); } + private StatusRuntimeException onServerOverload(StatusRuntimeException statusRuntimeException) { + if (statusRuntimeException.getStatus().getCode() != Status.Code.RESOURCE_EXHAUSTED) { + return statusRuntimeException; + } + + try { + if (this.statement == null) { + return statusRuntimeException; + } + java.sql.Connection sqlConnection = this.statement.getConnection(); + if (!(sqlConnection instanceof Connection)) { + return statusRuntimeException; + } + Connection connection = (Connection) sqlConnection; + ClientThrottleMode mode = connection.getThrottleMode(); + if (mode == ClientThrottleMode.OFF) { + return statusRuntimeException; + } + + ClientThrottleManager throttle = connection.getThrottleManager(); + if (throttle != null) { + throttle.notifyServerOverload(); + } + } catch (SQLException sqlException) { + log.debug("Unable to apply client overload backoff from ResultSet path", sqlException); + } + return statusRuntimeException; + } + private void setNextOpResult(OpResult result) { OpQueryResult opQueryResult = ProtoConverter.fromProto(result.getQueryResult()); // Accumulate the row count of the outgoing block before replacing it, diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Statement.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Statement.java index 847bc87bc..ef9891e65 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Statement.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Statement.java @@ -7,6 +7,8 @@ import com.openjproxy.grpc.ParameterValue; import com.openjproxy.grpc.ResourceType; import com.openjproxy.grpc.TargetCall; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -63,23 +65,84 @@ protected void checkClosed() throws SQLException { } } + /** + * Attempts to acquire a throttle slot before executing a statement. + * Returns true if a slot was acquired (caller must call releaseThrottle after the work), + * or false if throttling is disabled. + * Throws SQLTransientException immediately if the limit is reached. + */ + protected boolean acquireThrottle(ClientThrottleManager throttle, ClientThrottleMode mode, + boolean inTransaction) throws SQLException { + if (throttle == null) { + return false; + } + if (!throttle.tryAcquire(mode, inTransaction)) { + throw new java.sql.SQLTransientException( + "Client throttle limit reached; request rejected to avoid overloading the database"); + } + return true; + } + + /** + * If the exception is a RESOURCE_EXHAUSTED status from the server, notifies the throttle + * manager to halve its reactive limit (AIMD multiplicative decrease) so that the next + * request is rejected client-side instead of hitting the still-overloaded server. + * The original exception is always returned to the caller for rethrowing. + */ + protected StatusRuntimeException onServerOverload(ClientThrottleManager throttle, ClientThrottleMode mode, + StatusRuntimeException sre) { + if (throttle != null && mode != ClientThrottleMode.OFF + && sre.getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { + throttle.notifyServerOverload(); + } + return sre; + } + @Override public ResultSet executeQuery(String sql) throws SQLException { log.debug("executeQuery: {}", sql); checkClosed(); - Iterator itResults = this.statementService.executeQuery(this.connection.getSession(), sql, - EMPTY_PARAMETERS_LIST, this.statementUUID, this.properties); - return new ResultSet(itResults, this.statementService, this); + ClientThrottleManager throttle = this.connection.getThrottleManager(); + ClientThrottleMode mode = this.connection.getThrottleMode(); + // getAutoCommit() may throw SQLException; evaluate before acquiring a slot + // so that release() is never called without a matching acquire. + boolean inTransaction = !this.connection.getAutoCommit(); + boolean acquired = acquireThrottle(throttle, mode, inTransaction); + try { + Iterator itResults = this.statementService.executeQuery(this.connection.getSession(), sql, + EMPTY_PARAMETERS_LIST, this.statementUUID, this.properties); + return new ResultSet(itResults, this.statementService, this); + } catch (StatusRuntimeException sre) { + throw onServerOverload(throttle, mode, sre); + } finally { + if (acquired) { + throttle.release(mode, inTransaction); + } + } } @Override public int executeUpdate(String sql) throws SQLException { log.debug("executeUpdate: {}", sql); checkClosed(); - OpResult result = this.statementService.executeUpdate(this.connection.getSession(), sql, EMPTY_PARAMETERS_LIST, - this.statementUUID, this.properties); - this.connection.setSession(result.getSession());//TODO see if can do this in one place instead of updating session everywhere - return result.getIntValue(); + ClientThrottleManager throttle = this.connection.getThrottleManager(); + ClientThrottleMode mode = this.connection.getThrottleMode(); + // getAutoCommit() may throw SQLException; evaluate before acquiring a slot + // so that release() is never called without a matching acquire. + boolean inTransaction = !this.connection.getAutoCommit(); + boolean acquired = acquireThrottle(throttle, mode, inTransaction); + try { + OpResult result = this.statementService.executeUpdate(this.connection.getSession(), sql, EMPTY_PARAMETERS_LIST, + this.statementUUID, this.properties); + this.connection.setSession(result.getSession()); + return result.getIntValue(); + } catch (StatusRuntimeException sre) { + throw onServerOverload(throttle, mode, sre); + } finally { + if (acquired) { + throttle.release(mode, inTransaction); + } + } } @Override diff --git a/ojp-server/pom.xml b/ojp-server/pom.xml index 4c2926b82..11c678c13 100644 --- a/ojp-server/pom.xml +++ b/ojp-server/pom.xml @@ -6,12 +6,12 @@ OJP Server ojp-server - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml @@ -39,26 +39,26 @@ org.openjproxy ojp-grpc-commons - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT org.openjproxy ojp-datasource-api - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT org.openjproxy ojp-datasource-hikari - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT org.openjproxy ojp-xa-pool-commons - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java index 121c79c40..495377106 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java @@ -16,6 +16,8 @@ */ public interface SessionManager { void registerClientUUID(String connectionHash, String clientUUID); + void deregisterClientUUID(String connectionHash, String clientUUID); + int getClientCount(String connectionHash); SessionInfo createSession(String clientUUID, Connection connection); SessionInfo createXASession(String clientUUID, Connection connection, XAConnection xaConnection); SessionInfo createDeferredXASession(String clientUUID, String connectionHash); diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java index 8173f36ce..1b2201816 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java @@ -18,12 +18,14 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class SessionManagerImpl implements SessionManager { private Map connectionHashMap = new ConcurrentHashMap<>(); private Map sessionMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> clientRefCounts = new ConcurrentHashMap<>(); private final Map cacheConfigurationMap; /** @@ -47,6 +49,38 @@ public SessionManagerImpl(Map cacheConfigurationMap) public void registerClientUUID(String connectionHash, String clientUUID) { log.debug("Registering client uuid {}", clientUUID); this.connectionHashMap.put(clientUUID, connectionHash); + clientRefCounts.computeIfAbsent(connectionHash, k -> new ConcurrentHashMap<>()) + .compute(clientUUID, (k, v) -> { + if (v == null) { + return new AtomicInteger(1); + } + v.incrementAndGet(); + return v; + }); + } + + @Override + public void deregisterClientUUID(String connectionHash, String clientUUID) { + ConcurrentHashMap perConnHash = clientRefCounts.get(connectionHash); + if (perConnHash == null) { + return; + } + perConnHash.compute(clientUUID, (k, v) -> { + if (v == null) { + return null; + } + return v.decrementAndGet() <= 0 ? null : v; + }); + if (perConnHash.isEmpty()) { + clientRefCounts.remove(connectionHash, perConnHash); + } + } + + @Override + public int getClientCount(String connectionHash) { + ConcurrentHashMap perConnHash = clientRefCounts.get(connectionHash); + // Return 1 as default: assume at least one client for fair-share calculation + return (perConnHash == null || perConnHash.isEmpty()) ? 1 : perConnHash.size(); } @Override @@ -198,6 +232,10 @@ public void terminateSession(SessionInfo sessionInfo) throws SQLException { return; } + if (!sessionInfo.getClientUUID().isEmpty()) { + deregisterClientUUID(sessionInfo.getConnHash(), sessionInfo.getClientUUID()); + } + if (TransactionStatus.TRX_ACTIVE.equals(sessionInfo.getTransactionInfo().getTransactionStatus())) { if (!targetSession.getConnection().getAutoCommit()) { log.debug("Rolling back active transaction"); diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/SlotManager.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/SlotManager.java index 5cddb553e..04d8ba2f7 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/SlotManager.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/SlotManager.java @@ -18,6 +18,11 @@ @Slf4j public class SlotManager { + // Fraction of totalSlots used as a lower bound for observedPeak on admission timeout. + private static final double MIN_OBSERVED_PEAK_RATIO = 0.1; + // AIMD: additive-increase period = totalSlots * this multiplier releases. + private static final int AIMD_RECOVERY_PERIOD_MULTIPLIER = 2; + private final int totalSlots; private final int slowSlots; private final int fastSlots; @@ -40,6 +45,10 @@ public class SlotManager { private final AtomicLong lastSlowActivity = new AtomicLong(0); private final AtomicLong lastFastActivity = new AtomicLong(0); + // AIMD peak tracking + private final AtomicInteger observedPeak = new AtomicInteger(0); + private final AtomicLong releaseCount = new AtomicLong(0); + // Configuration private final AtomicBoolean enabled = new AtomicBoolean(true); @@ -139,6 +148,7 @@ public boolean acquireSlowSlot(long timeoutMs) throws InterruptedException { } log.debug("Failed to acquire slow slot within {}ms timeout", timeoutMs); + recordAdmissionTimeout(); return false; } @@ -188,6 +198,7 @@ public boolean acquireFastSlot(long timeoutMs) throws InterruptedException { } log.debug("Failed to acquire fast slot within {}ms timeout", timeoutMs); + recordAdmissionTimeout(); return false; } @@ -213,6 +224,7 @@ public void releaseSlowSlot() { slowOperationSemaphore.release(); log.debug("Released slow slot back to slow pool. Active slow: {}", activeSlowOperations.get()); } + tickAimdRecovery(); } /** @@ -237,6 +249,7 @@ public void releaseFastSlot() { fastOperationSemaphore.release(); log.debug("Released fast slot back to fast pool. Active fast: {}", activeFastOperations.get()); } + tickAimdRecovery(); } /** @@ -281,6 +294,24 @@ private boolean canWaitForSlot(Semaphore semaphore) { return semaphore.getQueueLength() < maxWaitQueueDepth; } + private void recordAdmissionTimeout() { + int currentActive = activeFastOperations.get() + activeSlowOperations.get(); + int floor = Math.max(1, (int) (totalSlots * MIN_OBSERVED_PEAK_RATIO)); + // Multiplicative decrease: clamp peak down to currentActive (observed saturation point). + // When cur == 0 (no timeout seen yet), use totalSlots as the initial upper bound so that + // Math.min always resolves to currentActive on the first call. + observedPeak.updateAndGet(cur -> Math.max(floor, Math.min(cur == 0 ? totalSlots : cur, currentActive))); + } + + private void tickAimdRecovery() { + long count = releaseCount.incrementAndGet(); + long period = Math.max(1L, (long) totalSlots * AIMD_RECOVERY_PERIOD_MULTIPLIER); + // Additive increase: nudge the peak up by 1 every `period` releases once saturation eases. + if (count % period == 0) { + observedPeak.updateAndGet(cur -> (cur > 0 && cur < totalSlots) ? cur + 1 : cur); + } + } + /** * Gets the current status of the slot manager. * @@ -328,4 +359,19 @@ public boolean isEnabled() { public int getFastSlotsBorrowedToSlow() { return fastSlotsBorrowedToSlow.get(); } public long getIdleTimeoutMs() { return idleTimeoutMs; } public int getMaxWaitQueueDepth() { return maxWaitQueueDepth; } + public int getObservedPeak() { return observedPeak.get(); } + + /** + * Returns the admission slot count to advertise to JDBC clients for throttle budget + * calculations. + * + *

When SQS is active ({@code slowSlots > 0}) fast queries compete only for the + * fast-lane capacity, so the value returned is {@link #getFastSlots()}. Without SQS + * all slots serve fast queries, so this equals {@link #getTotalSlots()}.

+ * + * @return the effective slot count for client-side admission throttling + */ + public int getEffectiveMaxAdmission() { + return slowSlots > 0 ? fastSlots : totalSlots; + } } diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java index ec570c52e..57428accd 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java @@ -12,6 +12,7 @@ import org.openjproxy.constants.CommonConstants; import org.openjproxy.grpc.server.MultinodePoolCoordinator; import org.openjproxy.grpc.server.MultinodeXaCoordinator; +import org.openjproxy.grpc.server.AdmissionControlManager; import org.openjproxy.grpc.server.UnpooledConnectionDetails; import org.openjproxy.grpc.server.action.Action; import org.openjproxy.grpc.server.action.ActionContext; @@ -318,10 +319,22 @@ private void handleRegularConnection(ActionContext context, ConnectionDetails co // For regular connections, just return session info without creating a session yet (lazy allocation) // Server does not populate targetServer - client will set it on future requests + int clientCount = context.getSessionManager().getClientCount(connHash); + int maxAdmission = 0; + int observedPeakValue = 0; + AdmissionControlManager acm = context.getAdmissionControlManagers().get(connHash); + if (acm != null && acm.isEnabled() && acm.getSlotManager() != null) { + maxAdmission = acm.getSlotManager().getEffectiveMaxAdmission(); + observedPeakValue = acm.getSlotManager().getObservedPeak(); + } + SessionInfo sessionInfo = SessionInfo.newBuilder() .setConnHash(connHash) .setClientUUID(connectionDetails.getClientUUID()) .setIsXA(false) + .setClientCount(clientCount) + .setMaxAdmission(maxAdmission) + .setObservedPeak(observedPeakValue) .build(); responseObserver.onNext(sessionInfo); diff --git a/ojp-testcontainers/pom.xml b/ojp-testcontainers/pom.xml index ddb8e6a54..60dfd8068 100644 --- a/ojp-testcontainers/pom.xml +++ b/ojp-testcontainers/pom.xml @@ -6,13 +6,13 @@ OJP TestContainers ojp-testcontainers - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT TestContainers support for OJP Server integration testing org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml diff --git a/ojp-xa-pool-commons/pom.xml b/ojp-xa-pool-commons/pom.xml index 6dc401e3d..89cf2e281 100644 --- a/ojp-xa-pool-commons/pom.xml +++ b/ojp-xa-pool-commons/pom.xml @@ -6,13 +6,13 @@ OJP XA Pool Commons ojp-xa-pool-commons - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT Apache Commons Pool 2 XA Connection Pool Provider for OJP org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 89dc330f8..055e71aa4 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT pom OJP Parent diff --git a/spring-boot-starter-ojp/pom.xml b/spring-boot-starter-ojp/pom.xml index 06dd5fcb7..d7e3ec104 100644 --- a/spring-boot-starter-ojp/pom.xml +++ b/spring-boot-starter-ojp/pom.xml @@ -6,13 +6,13 @@ OJP Spring Boot Starter spring-boot-starter-ojp - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT Spring Boot Auto-configuration for OJP (Open J Proxy) org.openjproxy ojp-parent - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT ../pom.xml @@ -28,7 +28,7 @@ org.openjproxy ojp-jdbc-driver - 0.4.17-SNAPSHOT + 0.4.18-SNAPSHOT