Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/cfdb/workflows/grpc_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""gRPC channel/server options for the wool worker dispatch channel.

Centralizes the keepalive and ping-enforcement settings the worker
applies to its gRPC server (and advertises to dispatching clients via
discovery metadata), so both worker entrypoints — ``worker_main`` (ECS)
and ``worker_lan`` (local) — stay in lockstep.

Why this exists: wool's defaults set the client keepalive cadence
(:attr:`ChannelOptions.keepalive_time_ms`, 30 s) *exactly equal* to the
server's no-data ping floor
(:attr:`WorkerOptions.http2_min_recv_ping_interval_without_data_ms`,
30 s), with a strike budget of 2. A workflow dispatch is a long-lived
stream-stream RPC that goes quiet for minutes during a subprocess stage
(``samtools sort`` / GNU ``sort`` on a multi-GB file); with
``keepalive_permit_without_calls`` on, the client keeps pinging into
that silence. Over a real network (Fargate's awsvpc ENI), inter-ping
arrival jitter routinely lands a ping a hair under the 30 s floor, which
the server counts as a strike — three strikes and it sends
``GOAWAY too_many_pings``, surfacing on the API as
``UNAVAILABLE: Too many pings`` and failing the job.

The fix is margin: the client pings once a minute while the server's
floor sits at 20 s, so a ping can arrive up to 40 s early (at one third
of its nominal interval) and still clear the floor — strikes
effectively never accumulate. The lowered server
floor plus the larger strike budget stop the GOAWAY even if the
advertised cadence somehow fails to propagate to the client, because the
GOAWAY is a purely server-side decision. All other channel settings
(100 MB message limits, stream concurrency, compression) keep wool's
defaults.
"""

from __future__ import annotations

from wool.runtime.worker.base import ChannelOptions, WorkerOptions

#: How often (ms) a dispatching client sends a keepalive ping. Kept
#: well above the server-side floor defined below so that arrival
#: jitter cannot drop a ping beneath that limit.
KEEPALIVE_TIME_MS = 60 * 1_000

#: How long (ms) to wait for a keepalive ack before treating the peer
#: as dead.
KEEPALIVE_TIMEOUT_MS = 20 * 1_000

#: Minimum interval (ms) the server tolerates between client pings that
#: carry no data. Sits far below :data:`KEEPALIVE_TIME_MS`, so a ping
#: sent on schedule never counts as a strike.
MIN_RECV_PING_INTERVAL_MS = 20 * 1_000

#: Number of ping strikes the server tolerates before it sends
#: ``GOAWAY too_many_pings``. Extra headroom beyond the cadence margin.
MAX_PING_STRIKES = 5


def worker_grpc_options() -> WorkerOptions:
    """Assemble the gRPC options shared by every cfdb wool worker.

    :returns:
        A :class:`WorkerOptions` that pairs the keepalive channel
        settings (the cadence advertised to clients) with the relaxed
        no-data ping floor and strike budget, so long-lived, low-data
        dispatch streams do not draw a spurious ``too_many_pings``
        GOAWAY.
    """
    # Pings stay enabled even while the stream is quiet so a genuinely
    # dead worker is still noticed; the gap between the ping cadence and
    # the server floor is what makes that safe.
    keepalive_channel = ChannelOptions(
        keepalive_time_ms=KEEPALIVE_TIME_MS,
        keepalive_timeout_ms=KEEPALIVE_TIMEOUT_MS,
        keepalive_permit_without_calls=True,
    )
    return WorkerOptions(
        channel=keepalive_channel,
        http2_min_recv_ping_interval_without_data_ms=MIN_RECV_PING_INTERVAL_MS,
        max_ping_strikes=MAX_PING_STRIKES,
    )
18 changes: 10 additions & 8 deletions src/cfdb/workflows/worker_lan.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from cfdb.workflows import WORKER_MAX_CONCURRENT_TASKS
from cfdb.workflows.backpressure import backpressure_for
from cfdb.workflows.credentials import build_worker_credentials
from cfdb.workflows.grpc_options import worker_grpc_options

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -104,16 +105,17 @@ def _signal_handler_threaded(*_: object) -> None:
except NotImplementedError:
signal.signal(sig, _signal_handler_threaded)

# Bind per-worker backpressure onto the spawn factory so each spawned
# LocalWorker serializes its routines (mirrors the ECS worker_main
# wiring). ``functools.partial(..., backpressure=hook)`` keeps wool's
# Bind per-worker backpressure and the relaxed keepalive/ping options
# onto the spawn factory so each spawned LocalWorker serializes its
# routines and won't trip GOAWAY too_many_pings on a quiet dispatch
# stream (mirrors the ECS worker_main wiring). Pre-binding only
# non-``host`` kwargs via ``functools.partial`` keeps wool's
# ``declares_host`` True, so the pool still prescribes the bind host.
backpressure = backpressure_for(WORKER_MAX_CONCURRENT_TASKS)
worker_factory = (
functools.partial(wool.LocalWorker, backpressure=backpressure)
if backpressure is not None
else wool.LocalWorker
)
worker_kwargs: dict[str, object] = {"options": worker_grpc_options()}
if backpressure is not None:
worker_kwargs["backpressure"] = backpressure
worker_factory = functools.partial(wool.LocalWorker, **worker_kwargs)

pool = wool.WorkerPool(
spawn=workers,
Expand Down
5 changes: 5 additions & 0 deletions src/cfdb/workflows/worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from cfdb.workflows.backpressure import backpressure_for
from cfdb.workflows.constants import DEFAULT_WORKER_PORT
from cfdb.workflows.credentials import build_worker_credentials
from cfdb.workflows.grpc_options import worker_grpc_options

if TYPE_CHECKING:
from aiohttp import web
Expand Down Expand Up @@ -149,6 +150,10 @@ def _signal_handler_threaded(*_: object) -> None:
port=worker_port,
credentials=credentials,
backpressure=backpressure,
# Relaxed keepalive/ping enforcement so a long, quiet dispatch
# stream doesn't trip the worker's server into GOAWAY
# too_many_pings; see cfdb.workflows.grpc_options.
options=worker_grpc_options(),
)
await worker.start()
try:
Expand Down
Loading