Refactor load balancer protocol to delegate worker selection instead of dispatching tasks - Closes #155 #156
Open
conradbzura wants to merge 4 commits into
7b4937b to 1144ba2
1144ba2 to 0439c22
Split the LoadBalancerLike protocol into two: the new
DelegatingLoadBalancerLike, which only selects worker candidates, and
the deprecated DispatchingLoadBalancerLike, which retains the old
responsibility of owning task dispatch. LoadBalancerLike becomes a
transitional union alias that narrows to the delegating protocol in
the next major release.
A delegating load balancer implements delegate(*, context), an async
generator that yields (WorkerMetadata, WorkerConnection) candidates
and receives dispatch outcomes from the proxy:
- anext() requests the next candidate
- athrow(exc) reports a failure; the proxy evicts the worker from
the context before throwing non-transient errors so the balancer
observes the capacity change
- asend(metadata) reports a success; the generator MUST terminate
after this signal — yielding another candidate is a protocol
violation and surfaces as a RuntimeError from WorkerProxy
The context passed to delegate is typed as the new read-only
LoadBalancerContextView protocol, enforcing at the type level that
load balancers cannot mutate pool membership. Eviction lives in the
proxy exclusively.
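A sketch of how a read-only view protocol can hide mutators from a balancer follows. ContextView, MutableContext, remove_worker, and pick_first are hypothetical names for illustration, and returning a MappingProxyType is an assumption of this sketch rather than something the change states.

```python
from types import MappingProxyType
from typing import Mapping, Optional, Protocol, runtime_checkable


@runtime_checkable
class ContextView(Protocol):
    """Hypothetical read-only surface: only the workers property."""

    @property
    def workers(self) -> Mapping[str, str]: ...


class MutableContext:
    """Hypothetical full context; only the proxy would call the mutators."""

    def __init__(self) -> None:
        self._workers: dict[str, str] = {}

    @property
    def workers(self) -> Mapping[str, str]:
        # Assumption: expose a live, read-only proxy of the internal dict.
        return MappingProxyType(self._workers)

    def add_worker(self, uid: str, connection: str) -> None:
        self._workers[uid] = connection

    def remove_worker(self, uid: str) -> None:
        self._workers.pop(uid, None)


def pick_first(context: ContextView) -> Optional[str]:
    """Balancer-side helper: the annotation exposes only the view."""
    return next(iter(context.workers), None)
```

A type checker rejects context.add_worker(...) inside pick_first because ContextView does not declare it, while isinstance(MutableContext(), ContextView) still holds at runtime.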
WorkerProxy.dispatch now branches on the balancer's protocol and, for
delegating balancers, owns the full dispatch-retry-evict loop via
_delegate_dispatch. The loop catches Exception (not BaseException)
for non-transient errors so CancelledError, KeyboardInterrupt, and
SystemExit propagate without eviction or retry — cancellation is
caller intent, not a worker health signal. A stream-ownership
handoff via a sentinel variable ensures the gRPC call is released if
the proxy unwinds between connection.dispatch returning and the
stream being handed back to the caller. A DeprecationWarning is
emitted once at start() when a DispatchingLoadBalancerLike is passed.
RoundRobinLoadBalancer reduces to a pure cycling generator: no
dispatch, no error classification, no eviction. The checkpoint-by-UID
termination logic and pickle-safe __reduce__ are preserved.
The load balancer README is rewritten around the delegate protocol,
documents the asend-is-terminal contract, and calls out the
deprecation.
Protocol conformance tests in test_base.py now cover all three protocols (LoadBalancerContextView, DispatchingLoadBalancerLike, DelegatingLoadBalancerLike) plus the LoadBalancerLike union alias, including an edge-case test for classes that implement both protocols during migration.
test_roundrobin.py is rewritten around the delegate API. The 12 tests cover empty context exhaustion, first-yield behavior, index advancement on asend and athrow, full-cycle exhaustion via checkpoint, context mutation reactivity (proxy-driven eviction between yields), asend as terminal signal, concurrent delegate drivers for fairness, Hypothesis-driven round-robin sequence verification, and non-transient error handling. The obsolete dispatch_side_effect_factory fixture is removed from loadbalancer/conftest.py.
test_proxy.py adds TestWorkerProxyDelegateDispatch to cover the proxy-owned retry-evict loop. Tests exercise transient errors (skip without eviction), non-transient errors (evict before notifying the balancer), candidate exhaustion in both the initial anext and the non-transient athrow paths, empty delegate behavior, success notification via asend, the asend-is-terminal contract (verified with a malformed balancer: the proxy raises RuntimeError and closes the orphaned stream), cancellation during connection.dispatch (no eviction, no retry, CancelledError propagates), and cancellation during asend (orphaned stream is aclosed before CancelledError propagates). Two tests cover the legacy path: the deprecation warning is emitted at start(), and dispatch through a legacy DispatchingLoadBalancerLike still works.
Existing dispatch tests in test_proxy.py (spy_loadbalancer_with_workers, FailingLoadBalancer, WaitingLoadBalancer, StubLoadBalancer) are updated from the legacy dispatch method to the delegate async generator. Dead worker_*_callback helpers on the spy balancer, which the production proxy never called, are removed. The mock_worker_connection fixture's dispatch stub now accepts the timeout keyword argument the proxy passes.
test_public.py is updated to expect the three new public exports: DelegatingLoadBalancerLike, DispatchingLoadBalancerLike, LoadBalancerContextView.
The test guide requires tests to exercise only public APIs. Three violations introduced in the prior commit are corrected:
- The pickle roundtrip test for RoundRobinLoadBalancer was probing _index and _lock directly. It now drives delegate() via the public API before and after pickle, asserting that the restored instance starts cycling from position zero on the same context.
- TestWorkerProxyDelegateDispatch._start_proxy_with_workers was seeding workers via proxy._loadbalancer_context.add_worker, bypassing the public discovery flow. It is replaced with _make_proxy_with_workers, which constructs the proxy with a ReducibleAsyncIterator discovery stream, patches WorkerConnection so the sentinel creates the intended mock connections, and waits via the public proxy.workers property.
- The legacy dispatch test was similarly using _loadbalancer_context directly; it now uses the same helper.
The orphaned empty loadbalancer conftest.py is deleted.
The loadbalancer README example used except BaseException around the delegate yield, which would swallow GeneratorExit from aclose() and CancelledError from task cancellation. Corrected to except Exception to match the RoundRobinLoadBalancer implementation.
The worker README error classification table header said "Load balancer behavior", but post-refactor this is the proxy's responsibility. Updated to "Dispatch behavior" with clearer action descriptions (skip vs evict).
The discovery README referenced the deprecated size parameter in its description of pool modes. Updated to spawn.
0439c22 to fb30a3d
conradbzura
commented
May 3, 2026
The dispatch method accepts a :py:class:`Task` and returns an async
iterator that yields task results from the worker.
class DispatchingLoadBalancerLike(Protocol):
Contributor
Author
We can't rename this as it's not backwards compatible.
conradbzura
commented
May 3, 2026
) -> AsyncGenerator[
    tuple[WorkerMetadata, WorkerConnection],
    WorkerMetadata | None,
]: ...
Contributor
Author
Add a docstring to delegate.
Summary
Split the load balancer protocol so that balancers own only worker selection and routing policy, while WorkerProxy owns the dispatch-retry-evict loop. Introduce DelegatingLoadBalancerLike, a protocol whose single delegate method yields (WorkerMetadata, WorkerConnection) candidates and receives dispatch outcomes from the proxy via asend (success) and athrow (failure). The proxy calls WorkerConnection.dispatch directly, classifies errors as transient vs non-transient, evicts workers on non-transient failures, and enforces a contract that asend is terminal: yielding after a success signal raises RuntimeError.
The previous LoadBalancerLike protocol is renamed to DispatchingLoadBalancerLike and deprecated with both a @typing_extensions.deprecated annotation and a runtime DeprecationWarning at WorkerProxy.start(). LoadBalancerLike becomes a transitional union alias that narrows to DelegatingLoadBalancerLike in the next major release. RoundRobinLoadBalancer migrates to the new protocol and reduces to a pure cycling generator with no dispatch, error-classification, or eviction code.
A new read-only LoadBalancerContextView protocol enforces at the type level that delegating balancers cannot mutate pool membership. The proxy passes LoadBalancerContext (which satisfies both the view and the full mutable protocol), but the balancer's type annotation only sees the read-only surface.
Cancellation semantics are preserved from the pre-refactor behavior: CancelledError propagates through the proxy without eviction or retry, and a stream-ownership handoff via a sentinel variable ensures the gRPC call is released if the proxy unwinds between connection.dispatch returning and the stream being handed to the caller.
Closes #155
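The protocol branching and the runtime deprecation warning summarized above can be sketched with the standard library alone. Delegating, Dispatching, select_path, and warn_if_legacy are hypothetical names; the real protocols carry richer signatures, and the decorator-based deprecation is not shown.

```python
import warnings
from typing import Protocol, runtime_checkable


@runtime_checkable
class Delegating(Protocol):
    def delegate(self, *, context): ...


@runtime_checkable
class Dispatching(Protocol):
    def dispatch(self, task, *, context, timeout=None): ...


def select_path(balancer) -> str:
    """Prefer the delegating path when a class implements both protocols."""
    if isinstance(balancer, Delegating):
        return "delegate"
    if isinstance(balancer, Dispatching):
        return "dispatch"
    raise TypeError(f"{type(balancer).__name__} implements neither protocol")


def warn_if_legacy(balancer) -> None:
    """Emit a DeprecationWarning for dispatch-only balancers."""
    if select_path(balancer) == "dispatch":
        warnings.warn(
            "dispatching load balancers are deprecated; implement delegate()",
            DeprecationWarning,
            stacklevel=2,
        )
```

Runtime-checkable protocols only test for the presence of the named methods, which is why the order of the isinstance checks decides the path for a class that defines both delegate and dispatch.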
Proposed changes
New protocols and read-only context (loadbalancer/base.py)
Add three new public types:
- LoadBalancerContextView: runtime-checkable protocol exposing only the workers property. Passed to DelegatingLoadBalancerLike.delegate so balancers can observe pool membership without mutating it.
- DelegatingLoadBalancerLike: runtime-checkable protocol with a single delegate(*, context) async generator method. The generator supports three proxy-driven signals: anext() (next candidate), athrow(exc) (dispatch failed), and asend(metadata) (dispatch succeeded; generator must terminate).
- DispatchingLoadBalancerLike: the former LoadBalancerLike, renamed and annotated with @deprecated. Retains the old dispatch(task, *, context, timeout) signature for backwards compatibility.
LoadBalancerLike becomes TypeAlias = DispatchingLoadBalancerLike | DelegatingLoadBalancerLike. LoadBalancerContextLike now inherits from LoadBalancerContextView, preserving backwards compatibility for the deprecated path.
RoundRobinLoadBalancer migration (loadbalancer/roundrobin.py)
Replace dispatch with delegate. The implementation reduces to:
- Yield (metadata, connection)
- On athrow (failure): advance to next worker
- On asend with non-None value (success): return
- On exhaustion: return → proxy raises NoWorkersAvailable
The _lock now scopes only the index read/advance, not the yield, eliminating the pre-refactor risk of holding the lock across a slow connection.dispatch.
Proxy-owned dispatch loop (worker/proxy.py)
WorkerProxy.dispatch branches on the balancer's protocol at dispatch time:
- DelegatingLoadBalancerLike: route through _delegate_dispatch, which owns the full retry-evict loop. The loop uses except Exception (not BaseException) for non-transient errors so CancelledError, KeyboardInterrupt, and SystemExit propagate without eviction or retry. An inner try/finally with a stream = None sentinel ensures the gRPC stream is aclosed if the success path unwinds before handoff (cancellation during asend, or RuntimeError from the contract check).
- DispatchingLoadBalancerLike: fall through to the legacy lb.dispatch(task, context=..., timeout=...) call. A DeprecationWarning is emitted once at start().
isinstance(lb, DelegatingLoadBalancerLike) is checked first so classes implementing both protocols during migration use the new path.
Documentation updates
Update all affected READMEs and docstrings for the new protocol:
- Rewrite loadbalancer/README.md around the delegate protocol. Document the three-signal contract, the asend-is-terminal rule, and the deprecation path. Replace the LeastLoadedBalancer example with a delegate-based sketch demonstrating in-flight tracking via asend/athrow.
- Update WorkerProxy and WorkerPool class docstring examples from stale dispatch-override patterns to delegate-based examples.
- Update the worker/README.md error classification table header from "Load balancer behavior" to "Dispatch behavior", reflecting the proxy's ownership of retry/eviction.
- Update the discovery/README.md reference to the deprecated size parameter (now spawn).
Public exports (wool/__init__.py)
Export DelegatingLoadBalancerLike, DispatchingLoadBalancerLike, and LoadBalancerContextView. LoadBalancerLike continues to be exported as the union alias.
Test cases
TestLoadBalancerContextView: workers property; LoadBalancerContextView
TestLoadBalancerContextView: workers property; LoadBalancerContextView
TestLoadBalancerContextView: LoadBalancerContext; LoadBalancerContextView
TestDispatchingLoadBalancerLike: dispatch; DispatchingLoadBalancerLike
TestDispatchingLoadBalancerLike: dispatch; DispatchingLoadBalancerLike
TestDelegatingLoadBalancerLike: delegate as async generator; DelegatingLoadBalancerLike
TestDelegatingLoadBalancerLike: delegate; DelegatingLoadBalancerLike
TestLoadBalancerLike
TestLoadBalancerLike
TestLoadBalancerLike
TestLoadBalancerLike
TestRoundRobinLoadBalancer: RoundRobinLoadBalancer instance; DelegatingLoadBalancerLike
TestRoundRobinLoadBalancer
TestRoundRobinLoadBalancer: delegate() driven with anext; StopAsyncIteration on first call
TestRoundRobinLoadBalancer: delegate() driven with anext
TestRoundRobinLoadBalancer: delegate() driven again
TestRoundRobinLoadBalancer: athrow
TestRoundRobinLoadBalancer: athrow called repeatedly; StopAsyncIteration
TestRoundRobinLoadBalancer: athrow called after context mutation
TestRoundRobinLoadBalancer: asend(metadata) called; StopAsyncIteration raised
TestRoundRobinLoadBalancer: delegate() drivers each completing with asend
TestRoundRobinLoadBalancer: athrow with non-transient error
TestRoundRobinLoadBalancer: workers[0..F] in order
TestWorkerProxyDelegateDispatch: TransientRpcError, second succeeds; dispatch() called
TestWorkerProxyDelegateDispatch: Exception, second succeeds; dispatch() called; athrow, second succeeds
TestWorkerProxyDelegateDispatch: dispatch() called; NoWorkersAvailable raised via initial-anext path
TestWorkerProxyDelegateDispatch: dispatch() called; NoWorkersAvailable raised, worker evicted
TestWorkerProxyDelegateDispatch: TransientRpcError, no further candidates; dispatch() called; NoWorkersAvailable raised
TestWorkerProxyDelegateDispatch: dispatch() succeeds; asend(metadata) equal to the winning worker
TestWorkerProxyDelegateDispatch: asend; dispatch() called; RuntimeError raised, orphaned stream aclosed
TestWorkerProxyDelegateDispatch: connection.dispatch hangs; CancelledError propagates, worker not evicted, no retry
TestWorkerProxyDelegateDispatch: asend bookkeeping; aclose called, CancelledError propagates
TestWorkerProxyDelegateDispatch: DispatchingLoadBalancerLike only; proxy.start() called; DeprecationWarning emitted referencing DelegatingLoadBalancerLike
TestWorkerProxyDelegateDispatch: DispatchingLoadBalancerLike; dispatch() called; dispatch method invoked and stream returned
test_public: wool.__all__ list; DelegatingLoadBalancerLike, DispatchingLoadBalancerLike, LoadBalancerContextView present