From 7c869a3a1dc78f8072f35456155fe9fd37f6028d Mon Sep 17 00:00:00 2001
From: Conrad
Date: Sun, 21 Jun 2026 21:14:54 -0400
Subject: [PATCH] fix: Make _EcsSubscriber cloudpickle-safe across the wool reduce

wool's WorkerProxy.__wool_reduce__ serializes the dispatched task's
discovery, which in the no-spawn WorkerPool branch is cfdb's
_EcsSubscriber (the EcsDiscovery rides along as its owner). While the
API's WorkerPool is consuming the subscriber, its asyncio.Queue holds a
pending-getter Future that cloudpickle cannot serialize, so every ECS
workflow dispatch failed with "cannot pickle '_asyncio.Future' object".

Add __getstate__/__setstate__ to _EcsSubscriber, mirroring the
EcsDiscovery fix one level deeper: strip the live queue and the
single-use exhausted flag on pickle and recreate a fresh, inert queue on
unpickle. The worker never consumes the restored subscriber, so an empty
queue is correct.

The regression test drives the subscriber into the live being-consumed
state (a getter parked on the queue plus the flipped flag) over a real
moto-backed client; the earlier fresh-subscriber test did not reproduce
that state and so passed while production failed.
---
 src/cfdb/workflows/discovery.py        | 48 ++++++++++++++++
 tests/test_workflows/test_discovery.py | 76 ++++++++++++++++++++++++++
 2 files changed, 124 insertions(+)

diff --git a/src/cfdb/workflows/discovery.py b/src/cfdb/workflows/discovery.py
index ac807ac..46516c6 100644
--- a/src/cfdb/workflows/discovery.py
+++ b/src/cfdb/workflows/discovery.py
@@ -538,6 +538,54 @@ def __init__(
         self._queue: asyncio.Queue[Optional[DiscoveryEvent]] = asyncio.Queue()
         self._exhausted = False
 
+    def __getstate__(self) -> dict[str, Any]:
+        """Strip the live queue so the subscriber survives the wool reduce.
+
+        wool's ``WorkerProxy.__wool_reduce__`` reduces the *subscriber*
+        (the discovery rides along as ``_owner``), so ``_EcsSubscriber``
+        MUST cloudpickle — the same contract ``EcsDiscovery.__getstate__``
+        honors one level up. While the API's ``WorkerPool`` is consuming
+        the subscriber, ``_iter`` is parked on ``await self._queue.get()``,
+        so the queue's internal getter deque holds a pending
+        ``_asyncio.Future`` that cloudpickle cannot serialize (and the
+        queue is bound to the API's event loop besides). Drop ``_queue``
+        and the single-use ``_exhausted`` flag; ``__setstate__`` recreates
+        a fresh, empty queue. Under cfdb's single-level dispatch the worker
+        never consumes the restored subscriber, so an inert empty queue is
+        correct — only ``_owner`` (itself made picklable by
+        ``EcsDiscovery.__getstate__``) and ``_filter`` carry meaning across
+        the boundary. Any events still buffered in the queue (and the
+        ``None`` shutdown sentinel) are discarded by design: they are only
+        meaningful to the API-side iterator on the originating loop, never
+        to the restored copy.
+        """
+        state = self.__dict__.copy()
+        # Loop-bound / live-runtime fields — recreated fresh in
+        # ``__setstate__`` so a stale ``_exhausted`` flag or a queue
+        # carrying a pending getter never rides along in the pickled state.
+        state.pop("_queue", None)
+        state.pop("_exhausted", None)
+        return state
+
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        """Restore the subscriber with a fresh, inert queue.
+
+        Mirrors ``EcsDiscovery.__setstate__``: config (``_owner``,
+        ``_filter``) is restored from ``state`` while the live runtime
+        field stripped by ``__getstate__`` — the event queue — is
+        recreated empty and the single-use ``_exhausted`` flag is reset to
+        False, so the unpickled object is a well-formed, never-iterated
+        subscriber. Under cfdb's single-level dispatch the worker never
+        enters the discovery context or drives the iterator, so the fresh
+        queue simply stays empty. Unlike ``EcsDiscovery.__setstate__``'s
+        ``if self._client is None`` guard, the reset here is unconditional:
+        ``__getstate__`` always strips both fields, so they never arrive in
+        ``state`` and there is nothing to preserve.
+        """
+        self.__dict__.update(state)
+        self._queue = asyncio.Queue()
+        self._exhausted = False
+
     def __aiter__(self) -> AsyncIterator[DiscoveryEvent]:
         return self._iter()
 
diff --git a/tests/test_workflows/test_discovery.py b/tests/test_workflows/test_discovery.py
index 1d5fd9a..fdb4631 100644
--- a/tests/test_workflows/test_discovery.py
+++ b/tests/test_workflows/test_discovery.py
@@ -912,6 +912,82 @@ def test_subscriber_roundtrip_should_rebuild_owner_client_and_lock(self):
         assert isinstance(owner._state_lock, asyncio.Lock)
         assert owner._state_lock is not original_lock
 
+    @pytest.mark.asyncio
+    async def test_subscriber_roundtrip_should_succeed_when_queue_has_pending_getter(
+        self,
+    ):
+        """Test that a mid-consumption subscriber survives the wool reduce.
+
+        Given:
+            An ``_EcsSubscriber`` from ``EcsDiscovery.subscribe()`` over a
+            real boto3 ``ecs`` client (built inside ``moto.mock_aws()``)
+            driven into the live "being consumed" shape: a getter parked
+            on its queue (so the queue holds an ``_asyncio.Future``) and
+            the single-use ``_exhausted`` flag flipped, exactly as
+            ``_iter`` leaves it while the API's ``WorkerPool`` is parked on
+            ``await self._queue.get()``. The fresh, never-consumed
+            subscriber in the sibling test above does not reproduce the
+            production failure; this one does.
+        When:
+            The subscriber is ``cloudpickle.dumps``'d and ``loads``'d back
+            inside the same moto context (both hops inside ``mock_aws()``
+            because ``_owner.__setstate__`` rebuilds its client via
+            ``build_ecs_client``).
+        Then:
+            Serialization succeeds — without ``_EcsSubscriber.__getstate__``
+            it raises ``cannot pickle '_asyncio.Future'`` — and the restored
+            subscriber is a fresh, inert handle: an empty ``asyncio.Queue``,
+            ``_exhausted`` reset to False, the original ``_filter`` retained,
+            and an ``_owner`` whose ``_client`` was rebuilt (not left None).
+        """
+        import boto3
+        from moto import mock_aws
+
+        with mock_aws():
+            # Arrange — a real client, an EcsDiscovery, and a subscriber
+            # forced into the "being consumed" state: the single-use flag
+            # set and a getter parked on the queue so it holds an
+            # _asyncio.Future (the object cloudpickle chokes on today).
+            client = boto3.client("ecs", region_name="us-east-1")
+            discovery = EcsDiscovery(
+                cluster="c",
+                task_definition_family="worker",
+                client=client,
+            )
+            # The ctor forbids client + region together; set the rebuild
+            # region directly so __setstate__'s build_ecs_client resolves a
+            # region on a clean runner with no default AWS region (e.g. CI).
+            discovery._region_name = "us-east-1"
+            subscriber = discovery.subscribe(filter=lambda _meta: True)
+            subscriber._exhausted = True
+            getter = asyncio.ensure_future(subscriber._queue.get())
+            await asyncio.sleep(0)  # let the getter park inside queue.get()
+            assert not getter.done()  # precondition: a Future is parked
+            # The parked getter is registered as a pending Future in the
+            # queue's internal getter deque — the exact object cloudpickle
+            # rejects pre-fix.
+            assert subscriber._queue._getters
+
+            # Act — dump + load both inside mock_aws so the __setstate__
+            # rebuild on _owner can construct a fresh moto-backed client.
+            # Cancel the parked getter regardless of outcome so a pre-fix
+            # TypeError still leaves no dangling task.
+            try:
+                restored = cloudpickle.loads(cloudpickle.dumps(subscriber))
+            finally:
+                getter.cancel()
+                with pytest.raises(asyncio.CancelledError):
+                    await getter
+
+            # Assert — the restored subscriber is a fresh, inert handle...
+            assert isinstance(restored._queue, asyncio.Queue)
+            assert restored._queue.empty()
+            assert restored._exhausted is False
+            assert restored._filter is not None
+            # ...wrapping an owner whose client was rebuilt, not left None.
+            assert restored._owner._client is not None
+            assert restored._owner._client is not client
+
 
 class TestEcsDiscoveryPickleUnpicklableClient:
     """Round-trip resilience when the live client is deliberately unpicklable."""