Make _EcsSubscriber cloudpickle-safe so ECS workflow dispatch survives the wool reduce boundary — Closes #60#62
Merged
Conversation
wool's WorkerProxy.__wool_reduce__ serializes the dispatched task's discovery, which in the no-spawn WorkerPool branch is cfdb's _EcsSubscriber (the EcsDiscovery rides along as its owner). While the API's WorkerPool is consuming the subscriber, its asyncio.Queue holds a pending-getter Future that cloudpickle cannot serialize, so every ECS workflow dispatch failed with "cannot pickle '_asyncio.Future' object". Add __getstate__/__setstate__ to _EcsSubscriber, mirroring the EcsDiscovery fix one level deeper: strip the live queue and the single-use exhausted flag on pickle and recreate a fresh, inert queue on unpickle. The worker never consumes the restored subscriber, so an empty queue is correct. The regression test drives the subscriber into the live being-consumed state (a getter parked on the queue plus the flipped flag) over a real moto-backed client; the earlier fresh-subscriber test did not reproduce that state and so passed while production failed.
18714f2 to
7c869a3
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
ECS-profile workflow dispatch failed before any preprocessing ran: a
GET /data|/indexfor an uncached processable file returned202, then the job terminatedfailedwithcannot pickle '_asyncio.Future' object. wool'sWorkerProxy.__wool_reduce__serializes the dispatched task's discovery, which in the no-spawnWorkerPoolbranch is cfdb's_EcsSubscriber(theEcsDiscoveryrides along as its_owner). While the API'sWorkerPoolconsumes the subscriber, itsasyncio.Queueholds a pending-getter_asyncio.Futurethat cloudpickle cannot serialize._EcsSubscriberdefined no pickle protocol, so #54'sEcsDiscoveryfix did not reach it.Make
_EcsSubscriberhonor wool's documented picklability contract by adding__getstate__/__setstate__mirroringEcsDiscoveryone level deeper: strip the live queue and the single-use flag on pickle, recreate a fresh inert queue on unpickle. The worker never consumes the restored subscriber, so an empty queue is correct. This is the targeted unblock; a follow-up (#61) replaces the per-object hand-stripping with an inert worker-side discovery stub.Closes #60
Proposed changes
Add the pickle protocol to
_EcsSubscriber(src/cfdb/workflows/discovery.py)__getstate__copies__dict__and drops_queue(its internal getter deque holds the unpicklable pending_asyncio.Futuremid-consumption, and the queue is bound to the API event loop) plus the single-use_exhaustedflag._owner(already pickle-safe viaEcsDiscovery.__getstate__) and_filtercarry across.__setstate__restores the dict, then mints a freshasyncio.Queue()and resets_exhausted = False, yielding a well-formed, inert subscriber.Add a regression test for the consumed-subscriber path (
tests/test_workflows/test_discovery.py)test_subscriber_roundtrip_should_succeed_when_queue_has_pending_gettertoTestEcsDiscoveryPickleAgainstMoto. It drives the subscriber into the live being-consumed shape — a getter parked on the queue plus the flipped flag — over a real moto-backed boto3 client, the state the existing fresh-subscriber round-trip never reproduced (it passed while production failed). It assertscloudpickle.dumpssucceeds and the restored subscriber is inert: empty queue, reset flag, retained filter, rebuilt owner client.Test cases
TestEcsDiscoveryPickleAgainstMoto_EcsSubscriberover a real moto-backed client with a getter parked on its queue and the single-use flag set_EcsSubscriber.__getstate__/__setstate__