Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
*.pyd
*.py~

# Cython files
*.c
*.so

build/
dist/
*.egg-info
Expand Down
1 change: 0 additions & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ dependencies:
- click
- cloudpickle
- coverage
- cython # Only tested here; also a dependency of crick
- dask # overridden by git tip below; pulls in optional dependencies
- fsspec
- gilknocker
Expand Down
13 changes: 6 additions & 7 deletions distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

from distributed.core import coerce_to_address, connect
from distributed.diagnostics.progress import AllProgress
from distributed.scheduler import Scheduler
from distributed.utils import color_of
from distributed.worker import dumps_function

logger = logging.getLogger(__name__)


def counts(scheduler, allprogress):
def _counts(scheduler: Scheduler, allprogress: AllProgress) -> dict:
return merge(
{"all": valmap(len, allprogress.all), "nbytes": allprogress.nbytes},
{
Expand All @@ -24,10 +25,8 @@ def counts(scheduler, allprogress):
)


def _remove_all_progress_plugin(self, *args, **kwargs):
# Wrapper function around `Scheduler.remove_plugin` to avoid raising a
# `PicklingError` when using a cythonized scheduler
self.remove_plugin(name=AllProgress.name)
def _teardown(scheduler: Scheduler, allprogress: AllProgress) -> None:
scheduler.remove_plugin(name=allprogress.name)


async def progress_stream(address, interval):
Expand All @@ -54,9 +53,9 @@ async def progress_stream(address, interval):
{
"op": "feed",
"setup": dumps_function(AllProgress),
"function": dumps_function(counts),
"function": dumps_function(_counts),
"interval": interval,
"teardown": dumps_function(_remove_all_progress_plugin),
"teardown": dumps_function(_teardown),
}
)
return comm
Expand Down
Loading