Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/cfdb/workflows/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,54 @@ def __init__(
self._queue: asyncio.Queue[Optional[DiscoveryEvent]] = asyncio.Queue()
self._exhausted = False

def __getstate__(self) -> dict[str, Any]:
"""Strip the live queue so the subscriber survives the wool reduce.

wool's ``WorkerProxy.__wool_reduce__`` reduces the *subscriber*
(the discovery rides along as ``_owner``), so ``_EcsSubscriber``
MUST cloudpickle — the same contract ``EcsDiscovery.__getstate__``
honors one level up. While the API's ``WorkerPool`` is consuming
the subscriber, ``_iter`` is parked on ``await self._queue.get()``,
so the queue's internal getter deque holds a pending
``_asyncio.Future`` that cloudpickle cannot serialize (and the
queue is bound to the API's event loop besides). Drop ``_queue``
and the single-use ``_exhausted`` flag; ``__setstate__`` recreates
a fresh, empty queue. Under cfdb's single-level dispatch the worker
never consumes the restored subscriber, so an inert empty queue is
correct — only ``_owner`` (itself made picklable by
``EcsDiscovery.__getstate__``) and ``_filter`` carry meaning across
the boundary. Any events still buffered in the queue (and the
``None`` shutdown sentinel) are discarded by design: they are only
meaningful to the API-side iterator on the originating loop, never
to the restored copy.
"""
state = self.__dict__.copy()
# Loop-bound / live-runtime fields — recreated fresh in
# ``__setstate__`` so a stale ``_exhausted`` flag or a queue
# carrying a pending getter never rides along in the pickled state.
state.pop("_queue", None)
state.pop("_exhausted", None)
return state

def __setstate__(self, state: dict[str, Any]) -> None:
"""Restore the subscriber with a fresh, inert queue.

Mirrors ``EcsDiscovery.__setstate__``: config (``_owner``,
``_filter``) is restored from ``state`` while the live runtime
field stripped by ``__getstate__`` — the event queue — is
recreated empty and the single-use ``_exhausted`` flag is reset to
False, so the unpickled object is a well-formed, never-iterated
subscriber. Under cfdb's single-level dispatch the worker never
enters the discovery context or drives the iterator, so the fresh
queue simply stays empty. Unlike ``EcsDiscovery.__setstate__``'s
``if self._client is None`` guard, the reset here is unconditional:
``__getstate__`` always strips both fields, so they never arrive in
``state`` and there is nothing to preserve.
"""
self.__dict__.update(state)
self._queue = asyncio.Queue()
self._exhausted = False

def __aiter__(self) -> AsyncIterator[DiscoveryEvent]:
return self._iter()

Expand Down
76 changes: 76 additions & 0 deletions tests/test_workflows/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,82 @@ def test_subscriber_roundtrip_should_rebuild_owner_client_and_lock(self):
assert isinstance(owner._state_lock, asyncio.Lock)
assert owner._state_lock is not original_lock

@pytest.mark.asyncio
async def test_subscriber_roundtrip_should_succeed_when_queue_has_pending_getter(
self,
):
"""Test that a mid-consumption subscriber survives the wool reduce.

Given:
An ``_EcsSubscriber`` from ``EcsDiscovery.subscribe()`` over a
real boto3 ``ecs`` client (built inside ``moto.mock_aws()``)
driven into the live "being consumed" shape: a getter parked
on its queue (so the queue holds an ``_asyncio.Future``) and
the single-use ``_exhausted`` flag flipped, exactly as
``_iter`` leaves it while the API's ``WorkerPool`` is parked on
``await self._queue.get()``. The fresh, never-consumed
subscriber in the sibling test above does not reproduce the
production failure; this one does.
When:
The subscriber is ``cloudpickle.dumps``'d and ``loads``'d back
inside the same moto context (both hops inside ``mock_aws()``
because ``_owner.__setstate__`` rebuilds its client via
``build_ecs_client``).
Then:
Serialization succeeds — without ``_EcsSubscriber.__getstate__``
it raises ``cannot pickle '_asyncio.Future'`` — and the restored
subscriber is a fresh, inert handle: an empty ``asyncio.Queue``,
``_exhausted`` reset to False, the original ``_filter`` retained,
and an ``_owner`` whose ``_client`` was rebuilt (not left None).
"""
import boto3
from moto import mock_aws

with mock_aws():
# Arrange — a real client, an EcsDiscovery, and a subscriber
# forced into the "being consumed" state: the single-use flag
# set and a getter parked on the queue so it holds an
# _asyncio.Future (the object cloudpickle chokes on today).
client = boto3.client("ecs", region_name="us-east-1")
discovery = EcsDiscovery(
cluster="c",
task_definition_family="worker",
client=client,
)
# The ctor forbids client + region together; set the rebuild
# region directly so __setstate__'s build_ecs_client resolves a
# region on a clean runner with no default AWS region (e.g. CI).
discovery._region_name = "us-east-1"
subscriber = discovery.subscribe(filter=lambda _meta: True)
subscriber._exhausted = True
getter = asyncio.ensure_future(subscriber._queue.get())
await asyncio.sleep(0) # let the getter park inside queue.get()
assert not getter.done() # precondition: a Future is parked
# The parked getter is registered as a pending Future in the
# queue's internal getter deque — the exact object cloudpickle
# rejects pre-fix.
assert subscriber._queue._getters

# Act — dump + load both inside mock_aws so the __setstate__
# rebuild on _owner can construct a fresh moto-backed client.
# Cancel the parked getter regardless of outcome so a pre-fix
# TypeError still leaves no dangling task.
try:
restored = cloudpickle.loads(cloudpickle.dumps(subscriber))
finally:
getter.cancel()
with pytest.raises(asyncio.CancelledError):
await getter

# Assert — the restored subscriber is a fresh, inert handle...
assert isinstance(restored._queue, asyncio.Queue)
assert restored._queue.empty()
assert restored._exhausted is False
assert restored._filter is not None
# ...wrapping an owner whose client was rebuilt, not left None.
assert restored._owner._client is not None
assert restored._owner._client is not client


class TestEcsDiscoveryPickleUnpicklableClient:
"""Round-trip resilience when the live client is deliberately unpicklable."""
Expand Down
Loading