From 7b4beb0218809832c820351ce3d77bffb6aa8e6c Mon Sep 17 00:00:00 2001 From: Conrad Date: Thu, 25 Jun 2026 14:21:09 -0400 Subject: [PATCH] fix: Widen wool keepalive margin to stop GOAWAY too_many_pings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The worker dispatch channel ran on wool's default gRPC options, which set the client keepalive cadence (keepalive_time_ms=30s) exactly equal to the server's no-data ping floor (http2_min_recv_ping_interval_without_data_ms=30s) with a 2-strike budget. A workflow dispatch is a long-lived stream-stream RPC that goes quiet during a subprocess stage (samtools/sort on a large file); with keepalive_permit_without_calls on, the client keeps pinging into that silence. Over Fargate's awsvpc ENI, inter-ping jitter lands pings a hair under the 30s floor, the server counts strikes, and after three it sends GOAWAY too_many_pings — surfacing on the API as UNAVAILABLE and failing the job. Give the cadence real margin via a shared worker_grpc_options(): client pings once a minute, server floor drops to 20s, strike budget to 5. A ping can now arrive 3x early and still clear the floor. Wired into both worker entrypoints (worker_main for ECS, worker_lan for local) from one config point; the worker advertises the channel options to clients via discovery metadata, so both directions stay consistent. --- src/cfdb/workflows/grpc_options.py | 71 ++++++++++++++++++++++++++++++ src/cfdb/workflows/worker_lan.py | 18 ++++---- src/cfdb/workflows/worker_main.py | 5 +++ 3 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 src/cfdb/workflows/grpc_options.py diff --git a/src/cfdb/workflows/grpc_options.py b/src/cfdb/workflows/grpc_options.py new file mode 100644 index 0000000..aca69d8 --- /dev/null +++ b/src/cfdb/workflows/grpc_options.py @@ -0,0 +1,71 @@ +"""gRPC channel/server options for the wool worker dispatch channel. + +Centralizes the keepalive and ping-enforcement settings the worker +applies to its gRPC server (and advertises to dispatching clients via +discovery metadata), so both worker entrypoints — ``worker_main`` (ECS) +and ``worker_lan`` (local) — stay in lockstep. + +Why this exists: wool's defaults set the client keepalive cadence +(:attr:`ChannelOptions.keepalive_time_ms`, 30 s) *exactly equal* to the +server's no-data ping floor +(:attr:`WorkerOptions.http2_min_recv_ping_interval_without_data_ms`, +30 s), with a strike budget of 2. A workflow dispatch is a long-lived +stream-stream RPC that goes quiet for minutes during a subprocess stage +(``samtools sort`` / GNU ``sort`` on a multi-GB file); with +``keepalive_permit_without_calls`` on, the client keeps pinging into +that silence. Over a real network (Fargate's awsvpc ENI), inter-ping +arrival jitter routinely lands a ping a hair under the 30 s floor, which +the server counts as a strike — three strikes and it sends +``GOAWAY too_many_pings``, surfacing on the API as +``UNAVAILABLE: Too many pings`` and failing the job. + +The fix is margin: the client pings once a minute while the server's +floor sits at 20 s, so a ping can arrive up to 3x early and still clear +the floor — strikes effectively never accumulate. The lowered server +floor plus the larger strike budget stop the GOAWAY even if the +advertised cadence somehow fails to propagate to the client, because the +GOAWAY is a purely server-side decision. All other channel settings +(100 MB message limits, stream concurrency, compression) keep wool's +defaults. +""" + +from __future__ import annotations + +from wool.runtime.worker.base import ChannelOptions, WorkerOptions + +#: Client keepalive ping cadence (ms). Comfortably above the server +#: floor below so jitter never pushes a ping under the limit. +KEEPALIVE_TIME_MS = 60_000 + +#: Time (ms) to await a keepalive ack before declaring the peer dead. +KEEPALIVE_TIMEOUT_MS = 20_000 + +#: Server-side floor (ms) on the interval between no-data client pings. +#: Well under :data:`KEEPALIVE_TIME_MS` so a ping cannot be a strike. +MIN_RECV_PING_INTERVAL_MS = 20_000 + +#: Strike budget before the server sends ``GOAWAY too_many_pings``. +#: Cushion on top of the cadence margin. +MAX_PING_STRIKES = 5 + + +def worker_grpc_options() -> WorkerOptions: + """Build the :class:`WorkerOptions` for a cfdb wool worker. + + :returns: + Worker server options carrying the keepalive cadence (advertised + to clients) and the relaxed no-data ping enforcement that + prevents spurious ``too_many_pings`` GOAWAYs on long-lived, + low-data dispatch streams. + """ + return WorkerOptions( + channel=ChannelOptions( + keepalive_time_ms=KEEPALIVE_TIME_MS, + keepalive_timeout_ms=KEEPALIVE_TIMEOUT_MS, + # Keep pinging during quiet stages so a genuinely dead worker + # is still detected; the cadence margin makes this safe. + keepalive_permit_without_calls=True, + ), + http2_min_recv_ping_interval_without_data_ms=MIN_RECV_PING_INTERVAL_MS, + max_ping_strikes=MAX_PING_STRIKES, + ) diff --git a/src/cfdb/workflows/worker_lan.py b/src/cfdb/workflows/worker_lan.py index a9d2512..8296b00 100644 --- a/src/cfdb/workflows/worker_lan.py +++ b/src/cfdb/workflows/worker_lan.py @@ -48,6 +48,7 @@ from cfdb.workflows import WORKER_MAX_CONCURRENT_TASKS from cfdb.workflows.backpressure import backpressure_for from cfdb.workflows.credentials import build_worker_credentials +from cfdb.workflows.grpc_options import worker_grpc_options logger = logging.getLogger(__name__) @@ -104,16 +105,17 @@ def _signal_handler_threaded(*_: object) -> None: except NotImplementedError: signal.signal(sig, _signal_handler_threaded) - # Bind per-worker backpressure onto the spawn factory so each spawned - # LocalWorker serializes its routines (mirrors the ECS worker_main - # wiring). ``functools.partial(..., backpressure=hook)`` keeps wool's + # Bind per-worker backpressure and the relaxed keepalive/ping options + # onto the spawn factory so each spawned LocalWorker serializes its + # routines and won't trip GOAWAY too_many_pings on a quiet dispatch + # stream (mirrors the ECS worker_main wiring). Pre-binding only + # non-``host`` kwargs via ``functools.partial`` keeps wool's # ``declares_host`` True, so the pool still prescribes the bind host. backpressure = backpressure_for(WORKER_MAX_CONCURRENT_TASKS) - worker_factory = ( - functools.partial(wool.LocalWorker, backpressure=backpressure) - if backpressure is not None - else wool.LocalWorker - ) + worker_kwargs: dict[str, object] = {"options": worker_grpc_options()} + if backpressure is not None: + worker_kwargs["backpressure"] = backpressure + worker_factory = functools.partial(wool.LocalWorker, **worker_kwargs) pool = wool.WorkerPool( spawn=workers, diff --git a/src/cfdb/workflows/worker_main.py b/src/cfdb/workflows/worker_main.py index 5a92264..0f19f48 100644 --- a/src/cfdb/workflows/worker_main.py +++ b/src/cfdb/workflows/worker_main.py @@ -39,6 +39,7 @@ from cfdb.workflows.backpressure import backpressure_for from cfdb.workflows.constants import DEFAULT_WORKER_PORT from cfdb.workflows.credentials import build_worker_credentials +from cfdb.workflows.grpc_options import worker_grpc_options if TYPE_CHECKING: from aiohttp import web @@ -149,6 +150,10 @@ def _signal_handler_threaded(*_: object) -> None: port=worker_port, credentials=credentials, backpressure=backpressure, + # Relaxed keepalive/ping enforcement so a long, quiet dispatch + # stream doesn't trip the worker's server into GOAWAY + # too_many_pings; see cfdb.workflows.grpc_options. + options=worker_grpc_options(), ) await worker.start() try: