Implement our own to_thread offload for cudf-polars streaming execution #22474

Merged
rapids-bot[bot] merged 3 commits into rapidsai:release/26.06 from wence-:wence/fea/use-py-executor
May 18, 2026

Conversation

@wence-
Contributor

@wence- wence- commented May 12, 2026

Description

asyncio.to_thread always uses the default asyncio thread pool that contains a hardware-dependent number of threads. Although one can set the default executor on an event loop, when the loop exits, the executor is shut down. Since we want the executor thread pool to persist between collect calls we can't do that. Instead, hang an executor on the IRExecutionContext and use the new to_thread method to offload.
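The lifetime issue described above can be reproduced with the standard library alone. The following is a minimal sketch, not code from this PR: `work`, `use_default_override`, and `use_explicit_pool` are hypothetical names. It shows that a pool installed as the loop's default executor is shut down when `asyncio.run` exits, while a pool passed explicitly to `run_in_executor` stays usable across several event loops.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def work(x: int) -> int:
    return x * 2


async def use_default_override(pool: ThreadPoolExecutor) -> int:
    # Installing the pool as the default makes asyncio.to_thread use it,
    # but asyncio.run() shuts the default executor down when the loop exits.
    asyncio.get_running_loop().set_default_executor(pool)
    return await asyncio.to_thread(work, 21)


async def use_explicit_pool(pool: ThreadPoolExecutor) -> int:
    # Passing the pool explicitly leaves its lifetime under our control.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, work, 21)


# The same explicit pool serves two separate event loops.
persistent = ThreadPoolExecutor(max_workers=1)
first = asyncio.run(use_explicit_pool(persistent))
second = asyncio.run(use_explicit_pool(persistent))

# A pool installed as the default executor does not survive the loop.
overridden = ThreadPoolExecutor(max_workers=1)
third = asyncio.run(use_default_override(overridden))
try:
    overridden.submit(work, 1)
    survived = True
except RuntimeError:
    # submit() on a shut-down pool raises RuntimeError.
    survived = False

persistent.shutdown(wait=True)
```

Here `survived` ends up `False`, which is the behaviour that rules out `set_default_executor` when the pool has to persist between `collect` calls.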

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@wence- wence- added the DO NOT MERGE (Hold off on merging; see PR for details), improvement (Improvement / enhancement to an existing function), and non-breaking (Non-breaking change) labels May 12, 2026
@copy-pr-bot

copy-pr-bot Bot commented May 12, 2026

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@github-actions github-actions Bot added the Python (Affects Python cuDF API.) and cudf-polars (Issues specific to cudf-polars) labels May 12, 2026
@wence-
Contributor Author

wence- commented May 12, 2026

Needs rapidsai/rapidsmpf#1023, marking as do not merge because I don't think this should go to 26.06.

@GPUtester GPUtester moved this to In Progress in cuDF Python May 12, 2026
@wence- wence- force-pushed the wence/fea/use-py-executor branch from cf19f57 to 2388a7a Compare May 12, 2026 15:05
@wence- wence- force-pushed the wence/fea/use-py-executor branch from 2388a7a to c465b8c Compare May 14, 2026 11:31
@wence- wence- marked this pull request as ready for review May 14, 2026 13:55
@wence- wence- requested a review from a team as a code owner May 14, 2026 13:55
@wence- wence- requested a review from vyasr May 14, 2026 13:55
Comment thread python/cudf_polars/cudf_polars/dsl/ir.py
@coderabbitai

coderabbitai Bot commented May 14, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

This PR migrates RapidsMPF's thread-offloading pattern from direct asyncio.to_thread() calls to a context-aware IRExecutionContext.to_thread() method. The core change adds a py_executor field and async to_thread() method to IRExecutionContext, updates the AllGatherManager to accept and use the context, wires it through frontend orchestration, and replaces all asyncio.to_thread() calls across distributed services with the new pattern.

Changes

Thread Pool Executor Integration in RapidsMPF

| Layer / File(s) | Summary |
| --- | --- |
| IRExecutionContext threading infrastructure: cudf_polars/dsl/ir.py | IRExecutionContext adds optional py_executor field and async to_thread() method that captures contextvars context and executes callables on the configured ThreadPoolExecutor via run_in_executor. Typing uses ParamSpec/TypeVar scaffolding. Minor cosmetic lambda reformatting in Join.Cross join paths. |
| AllGather service update for context-aware threading: experimental/rapidsmpf/collectives/allgather.py | AllGatherManager.extract_concatenated() signature adds ir_context: IRExecutionContext parameter and implementation offloads unpack_and_concat via ir_context.to_thread() instead of asyncio.to_thread(). Module removes asyncio import and adds IRExecutionContext import. |
| Frontend execution context setup and orchestration wiring: experimental/rapidsmpf/frontend/core.py | execute_ir_on_rank constructs IRExecutionContext with the provided py_executor and calls run_actor_network(ctx, actors=nodes) to route execution through the context instead of passing py_executor separately. |
| Distributed services & IO migration to context-aware threading: experimental/rapidsmpf/collectives/sort.py, groupby.py, io.py, join.py, nodes.py, repartition.py, utils.py | Replace asyncio.to_thread() with ir_context.to_thread() across sort (slice optimization, boundary computation), groupby (tree reduction), IO (read/write/finalize), join (broadcast & shuffle paths), node evaluation, repartition, and utilities. Thread ir_context into AllGatherManager.extract_concatenated() calls. |
| Test coverage for IRExecutionContext: tests/experimental/test_allgather.py | Test imports include ThreadPoolExecutor and IRExecutionContext. Updated _test_allgather creates IRExecutionContext(ThreadPoolExecutor(max_workers=1)) and passes it to AllGatherManager.extract_concatenated via ir_context parameter. |
| Lint enforcement for context-aware threading: pyproject.toml | Ruff flake8-tidy-imports.banned-api configuration bans asyncio.to_thread and recommends using ir_context.to_thread instead. |

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the main change: implementing a custom to_thread offload mechanism for cudf-polars streaming execution to replace asyncio.to_thread. |
| Description check | ✅ Passed | The description clearly explains the motivation and implementation approach: replacing asyncio.to_thread with a persistent executor attached to IRExecutionContext to maintain thread pool continuity between collect calls. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 94.74% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@python/cudf_polars/cudf_polars/dsl/ir.py`:
- Around line 156-158: Replace the assert in to_thread() that checks
self.py_executor with an explicit runtime check: if self.py_executor is None:
raise RuntimeError("Execution context must have a thread pool for offload") (or
a suitable exception type) so we never pass None into loop.run_in_executor and
silently fall back to the default executor; update the check at the start of
to_thread() (referencing self.py_executor and loop.run_in_executor) accordingly
to ensure the persistent executor is always used.

In `@python/cudf_polars/tests/experimental/test_allgather.py`:
- Around line 51-55: The ThreadPoolExecutor passed into IRExecutionContext is
never closed causing leaked threads; change the test to create the executor with
a context manager or explicitly shut it down after use—e.g., wrap
ThreadPoolExecutor(max_workers=1) in a with block and construct
IRExecutionContext inside it (or call executor.shutdown(wait=True) after
awaiting allgather.extract_concatenated) so the executor backing
IRExecutionContext is properly closed; reference IRExecutionContext and the
ThreadPoolExecutor instance used for ir_context.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: d2339823-9e70-4290-9f51-6043fae6332f

📥 Commits

Reviewing files that changed from the base of the PR and between 9a85bda and c465b8c.

📒 Files selected for processing (11)
  • python/cudf_polars/cudf_polars/dsl/ir.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/allgather.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/sort.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/groupby.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/io.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/nodes.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/repartition.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/utils.py
  • python/cudf_polars/tests/experimental/test_allgather.py

Comment thread python/cudf_polars/cudf_polars/dsl/ir.py
Comment thread python/cudf_polars/tests/experimental/test_allgather.py Outdated
@wence- wence- force-pushed the wence/fea/use-py-executor branch 2 times, most recently from 608f2d2 to 420df99 Compare May 14, 2026 16:21
@wence- wence- removed the DO NOT MERGE (Hold off on merging; see PR for details) label May 14, 2026
@wence-
Contributor Author

wence- commented May 14, 2026

OK, I did some benchmarking and I think this is positive, and should go in. Needs coordination with the rapidsmpf PR, but please review.

Contributor

@mroeschke mroeschke left a comment

optional: It would be nice to use a ruff rule to disallow asyncio.to_thread, like we did with gather here:

"asyncio.gather".msg = "Use gather_with_task_group instead."

Comment thread python/cudf_polars/cudf_polars/dsl/ir.py
Contributor

@TomAugspurger TomAugspurger left a comment

Looks good, thanks.

Have you thought at all about how to protect against reintroducing this in the future? A couple of options:

  1. a pre-commit rule that checks for usage of asyncio.to_thread in cudf-polars.
  2. Some kind of pytest fixture that sets the default executor used by asyncio.to_thread to raise an exception. But I don't know whether this is feasible since I'm not sure exactly when the event loop is created (maybe not until we call run_actor_network?)
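The mechanism behind the second option can be sketched with the standard library: install a default executor whose `submit` always raises, so that any `asyncio.to_thread` call on that loop fails loudly. `ForbiddenExecutor`, `guarded`, `offender`, and `well_behaved` are hypothetical names. The sketch installs the executor from inside the running loop and leaves open how a pytest fixture would reach a loop that is only created later, which is the uncertainty raised above.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class ForbiddenExecutor(ThreadPoolExecutor):
    """Default-executor stand-in that rejects every submission."""

    def submit(self, fn, /, *args, **kwargs):
        raise RuntimeError("asyncio.to_thread is banned; use the context's to_thread")


async def guarded(coro_fn):
    # The default executor is a per-loop setting, so install it on the
    # running loop before the code under test executes.
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ForbiddenExecutor(max_workers=1))
    return await coro_fn()


async def offender():
    # Goes through the default executor and therefore hits submit().
    return await asyncio.to_thread(sum, [1, 2, 3])


async def well_behaved():
    return 6


try:
    asyncio.run(guarded(offender))
    caught = None
except RuntimeError as exc:
    caught = str(exc)

ok = asyncio.run(guarded(well_behaved))
```

The stand-in subclasses `ThreadPoolExecutor` because recent Python versions reject other executor types in `set_default_executor`.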

@wence- wence- requested a review from a team as a code owner May 14, 2026 17:12
@wence-
Contributor Author

wence- commented May 14, 2026

> Looks good, thanks.
>
> Have you thought at all about how to protect against reintroducing this in the future? A couple of options:
>
> 1. a pre-commit rule that checks for usage of `asyncio.to_thread` in cudf-polars.
> 2. Some kind of pytest fixture that sets the default executor used by `asyncio.to_thread` to raise an exception. But I don't know whether this is feasible since I'm not sure exactly when the event loop is created (maybe not until we call `run_actor_network`?)

Good idea, added a ruff rule at Matt's suggestion

@wence- wence- force-pushed the wence/fea/use-py-executor branch from 60af723 to bba9510 Compare May 14, 2026 17:14
Contributor

@bdice bdice left a comment

Approving packaging changes.

Comment thread python/cudf_polars/cudf_polars/dsl/ir.py
Comment thread python/cudf_polars/cudf_polars/dsl/ir.py Outdated
@wence- wence- force-pushed the wence/fea/use-py-executor branch from ab7188a to 48a243a Compare May 15, 2026 16:43
@wence- wence- changed the base branch from main to release/26.06 May 15, 2026 16:43
@wence- wence- force-pushed the wence/fea/use-py-executor branch from 48a243a to 023399d Compare May 18, 2026 10:58
wence- added 3 commits May 18, 2026 12:16
asyncio.to_thread always uses the default asyncio thread pool that contains
a hardware-dependent number of threads. Although one can set the default
executor on an event loop, when the loop exits, the executor is shut down.
Since we want the executor thread pool to persist between collect calls
we can't do that. Instead, hang an executor on the IRExecutionContext and
use the new to_thread method to offload.
@wence- wence- force-pushed the wence/fea/use-py-executor branch from 023399d to 8aa62db Compare May 18, 2026 11:16
@wence-
Contributor Author

wence- commented May 18, 2026

/merge

@rapids-bot rapids-bot Bot merged commit 5c32d00 into rapidsai:release/26.06 May 18, 2026
87 checks passed
@github-project-automation github-project-automation Bot moved this from In Progress to Done in cuDF Python May 18, 2026
@wence- wence- deleted the wence/fea/use-py-executor branch May 18, 2026 13:08
madsbk pushed a commit to madsbk/cudf that referenced this pull request May 19, 2026
…on (rapidsai#22474)

asyncio.to_thread always uses the default asyncio thread pool that contains a hardware-dependent number of threads. Although one can set the default executor on an event loop, when the loop exits, the executor is shut down. Since we want the executor thread pool to persist between collect calls we can't do that. Instead, hang an executor on the IRExecutionContext and use the new to_thread method to offload.

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Tom Augspurger (https://github.com/TomAugspurger)
  - Bradley Dice (https://github.com/bdice)
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: rapidsai#22474
Labels

  • cudf-polars: Issues specific to cudf-polars
  • improvement: Improvement / enhancement to an existing function
  • non-breaking: Non-breaking change
  • Python: Affects Python cuDF API.

Projects

Status: Done

6 participants