From 6db954c1e0667344e9034e5594013902504300dc Mon Sep 17 00:00:00 2001 From: Shivansh Date: Sat, 16 May 2026 16:09:14 +0530 Subject: [PATCH 1/2] execution: Add scheduler-aware overload to as_sender() for P2300 pipelines Signed-off-by: Shivansh Singh --- .../hpx/execution/algorithms/as_sender.hpp | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp index cb21911acb8..f258e750d69 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp @@ -232,6 +232,178 @@ namespace hpx::execution::experimental { HPX_FORWARD(Receiver, receiver), future_}; } }; + + /////////////////////////////////////////////////////////////////////// + // Scheduler-aware sender wrapper. + // + // Exposes the stored scheduler through its environment so that + // downstream sender algorithms (bulk, sync_wait, etc.) can query + // get_completion_scheduler and obtain the scheduler + // that originated the work. + HPX_CXX_CORE_EXPORT template + struct as_sender_sender_with_scheduler; + + template + struct as_sender_sender_with_scheduler, Scheduler> + : public as_sender_sender_base> + { + using sender_concept = hpx::execution::experimental::sender_t; + using future_type = hpx::future; + using scheduler_type = std::decay_t; + using base_type = as_sender_sender_base>; + using base_type::future_; + + HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_; + + // Environment that answers get_completion_scheduler queries. + // Follows the thread_pool_scheduler::sender::env pattern. + struct env + { + scheduler_type const& sched; + + auto query(hpx::execution::experimental::get_domain_t) + const noexcept + { + return hpx::execution::experimental::get_domain(sched); + } + + template + requires std::is_same_v || + std::is_same_v + auto query( + hpx::execution::experimental:: + get_completion_scheduler_t) const noexcept + { + return sched; + } + }; + + template , + as_sender_sender_with_scheduler>>> + explicit as_sender_sender_with_scheduler( + Future_&& future, Scheduler_&& scheduler) + : base_type{HPX_FORWARD(Future_, future)} + , scheduler_(HPX_FORWARD(Scheduler_, scheduler)) + { + } + + as_sender_sender_with_scheduler( + as_sender_sender_with_scheduler&&) = default; + as_sender_sender_with_scheduler& operator=( + as_sender_sender_with_scheduler&&) = default; + as_sender_sender_with_scheduler( + as_sender_sender_with_scheduler const&) = delete; + as_sender_sender_with_scheduler& operator=( + as_sender_sender_with_scheduler const&) = delete; + + template + static consteval auto get_completion_signatures() noexcept -> + typename base_type::completion_signatures + { + return {}; + } + + template + auto connect(Receiver&& receiver) && + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; + } + + constexpr auto get_env() const noexcept + { + return env{scheduler_}; + } + }; + + template + struct as_sender_sender_with_scheduler, Scheduler> + : public as_sender_sender_base> + { + using sender_concept = hpx::execution::experimental::sender_t; + using future_type = hpx::shared_future; + using scheduler_type = std::decay_t; + using base_type = + as_sender_sender_base>; + using base_type::future_; + + HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_; + + struct env + { + scheduler_type const& sched; + + auto query(hpx::execution::experimental::get_domain_t) + const noexcept + { + return hpx::execution::experimental::get_domain(sched); + } + + template + requires std::is_same_v || + std::is_same_v + auto query( + hpx::execution::experimental:: + get_completion_scheduler_t) const noexcept + { + return sched; + } + }; + + template , + as_sender_sender_with_scheduler>>> + explicit as_sender_sender_with_scheduler( + Future_&& future, Scheduler_&& scheduler) + : base_type{HPX_FORWARD(Future_, future)} + , scheduler_(HPX_FORWARD(Scheduler_, scheduler)) + { + } + + as_sender_sender_with_scheduler( + as_sender_sender_with_scheduler&&) = default; + as_sender_sender_with_scheduler& operator=( + as_sender_sender_with_scheduler&&) = default; + as_sender_sender_with_scheduler( + as_sender_sender_with_scheduler const&) = default; + as_sender_sender_with_scheduler& operator=( + as_sender_sender_with_scheduler const&) = default; + + template + static consteval auto get_completion_signatures() noexcept -> + typename base_type::completion_signatures + { + return {}; + } + + template + auto connect(Receiver&& receiver) && + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; + } + + template + auto connect(Receiver&& receiver) & + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), future_}; + } + + constexpr auto get_env() const noexcept + { + return env{scheduler_}; + } + }; } // namespace detail // The as_sender CPO can be used to adapt any HPX future as a sender. The @@ -256,6 +428,21 @@ namespace hpx::execution::experimental { HPX_FORWARD(Future, future)); } + // Scheduler-aware overload: wraps the future into a sender whose + // environment exposes the given scheduler as completion scheduler. + template + requires hpx::traits::is_future_v> && + hpx::execution::experimental::scheduler< + std::decay_t> + constexpr HPX_FORCEINLINE auto operator()( + Future&& future, Scheduler&& scheduler) const + { + return detail::as_sender_sender_with_scheduler< + std::decay_t, std::decay_t>( + HPX_FORWARD(Future, future), + HPX_FORWARD(Scheduler, scheduler)); + } + constexpr HPX_FORCEINLINE auto operator()() const { return detail::partial_algorithm{}; From b530f7815ae3fcee267ac7d42aa2d62f263b7f89 Mon Sep 17 00:00:00 2001 From: Shivansh Date: Sat, 16 May 2026 21:11:22 +0530 Subject: [PATCH 2/2] execution: Address hkaiser review on as_sender scheduler overload - Replace enable_if_t with C++20 requires clause in constructor - Deduplicate as_sender_sender_with_scheduler specializations into primary template - Use requires(is_future_v) to cover both hpx::future and hpx::shared_future Signed-off-by: Shivansh Signed-off-by: Shivansh Singh --- .../hpx/execution/algorithms/as_sender.hpp | 119 +++--------------- libs/core/execution/tests/unit/CMakeLists.txt | 1 + .../algorithm_as_sender_with_scheduler.cpp | 111 ++++++++++++++++ 3 files changed, 130 insertions(+), 101 deletions(-) create mode 100644 libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp diff --git a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp index f258e750d69..96ee6c04168 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp @@ -241,16 +241,14 @@ namespace hpx::execution::experimental { // get_completion_scheduler and obtain the scheduler // that originated the work. HPX_CXX_CORE_EXPORT template - struct as_sender_sender_with_scheduler; - - template - struct as_sender_sender_with_scheduler, Scheduler> - : public as_sender_sender_base> + requires hpx::traits::is_future_v> + struct as_sender_sender_with_scheduler + : public as_sender_sender_base> { using sender_concept = hpx::execution::experimental::sender_t; - using future_type = hpx::future; + using future_type = std::decay_t; using scheduler_type = std::decay_t; - using base_type = as_sender_sender_base>; + using base_type = as_sender_sender_base>; using base_type::future_; HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_; @@ -261,107 +259,28 @@ namespace hpx::execution::experimental { { scheduler_type const& sched; - auto query(hpx::execution::experimental::get_domain_t) - const noexcept - { - return hpx::execution::experimental::get_domain(sched); - } - - template - requires std::is_same_v || - std::is_same_v auto query( - hpx::execution::experimental:: - get_completion_scheduler_t) const noexcept - { - return sched; - } - }; - - template , - as_sender_sender_with_scheduler>>> - explicit as_sender_sender_with_scheduler( - Future_&& future, Scheduler_&& scheduler) - : base_type{HPX_FORWARD(Future_, future)} - , scheduler_(HPX_FORWARD(Scheduler_, scheduler)) - { - } - - as_sender_sender_with_scheduler( - as_sender_sender_with_scheduler&&) = default; - as_sender_sender_with_scheduler& operator=( - as_sender_sender_with_scheduler&&) = default; - as_sender_sender_with_scheduler( - as_sender_sender_with_scheduler const&) = delete; - as_sender_sender_with_scheduler& operator=( - as_sender_sender_with_scheduler const&) = delete; - - template - static consteval auto get_completion_signatures() noexcept -> - typename base_type::completion_signatures - { - return {}; - } - - template - auto connect(Receiver&& receiver) && - { - return as_sender_operation_state{ - HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; - } - - constexpr auto get_env() const noexcept - { - return env{scheduler_}; - } - }; - - template - struct as_sender_sender_with_scheduler, Scheduler> - : public as_sender_sender_base> - { - using sender_concept = hpx::execution::experimental::sender_t; - using future_type = hpx::shared_future; - using scheduler_type = std::decay_t; - using base_type = - as_sender_sender_base>; - using base_type::future_; - - HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_; - - struct env - { - scheduler_type const& sched; - - auto query(hpx::execution::experimental::get_domain_t) - const noexcept + hpx::execution::experimental::get_domain_t) const noexcept { return hpx::execution::experimental::get_domain(sched); } template requires std::is_same_v || - std::is_same_v + hpx::execution::experimental::set_value_t> || + std::is_same_v auto query( - hpx::execution::experimental:: - get_completion_scheduler_t) const noexcept + hpx::execution::experimental::get_completion_scheduler_t< + CPO>) const noexcept { return sched; } }; - template , - as_sender_sender_with_scheduler>>> + template + requires(!std::is_same_v, + as_sender_sender_with_scheduler>) explicit as_sender_sender_with_scheduler( Future_&& future, Scheduler_&& scheduler) : base_type{HPX_FORWARD(Future_, future)} @@ -432,15 +351,13 @@ namespace hpx::execution::experimental { // environment exposes the given scheduler as completion scheduler. template requires hpx::traits::is_future_v> && - hpx::execution::experimental::scheduler< - std::decay_t> + hpx::execution::experimental::scheduler> constexpr HPX_FORCEINLINE auto operator()( Future&& future, Scheduler&& scheduler) const { - return detail::as_sender_sender_with_scheduler< - std::decay_t, std::decay_t>( - HPX_FORWARD(Future, future), - HPX_FORWARD(Scheduler, scheduler)); + return detail::as_sender_sender_with_scheduler, + std::decay_t>( + HPX_FORWARD(Future, future), HPX_FORWARD(Scheduler, scheduler)); } constexpr HPX_FORCEINLINE auto operator()() const diff --git a/libs/core/execution/tests/unit/CMakeLists.txt b/libs/core/execution/tests/unit/CMakeLists.txt index b3a2e8c4b4e..71e2b6ea0fc 100644 --- a/libs/core/execution/tests/unit/CMakeLists.txt +++ b/libs/core/execution/tests/unit/CMakeLists.txt @@ -6,6 +6,7 @@ set(tests algorithm_as_sender + algorithm_as_sender_with_scheduler algorithm_bulk algorithm_continues_on algorithm_ensure_started diff --git a/libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp new file mode 100644 index 00000000000..5597923929e --- /dev/null +++ b/libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp @@ -0,0 +1,111 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include + +namespace ex = hpx::execution::experimental; +namespace tt = hpx::this_thread::experimental; + +void test_as_sender_with_scheduler_basic() +{ + // Test: future + scheduler -> sender with env + auto f = hpx::async([] { return 42; }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + // Verify compilation and basic properties + (void) sender; +} + +void test_as_sender_with_scheduler_get_env() +{ + // Test: env exposes the scheduler + auto f = hpx::async([] { return 42; }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + auto env = ex::get_env(sender); + // Verify env can be queried for scheduler + auto sched_from_env = ex::get_completion_scheduler(env); + HPX_TEST(sched == sched_from_env); +} + +void test_as_sender_with_scheduler_in_pipeline() +{ + // Full end-to-end test + auto f = hpx::async([] { return 42; }); + ex::thread_pool_scheduler sched{}; + + auto [result] = + tt::sync_wait(ex::as_sender(std::move(f), sched) | ex::then([](int x) { + return x * 2; + })).value(); + + HPX_TEST_EQ(result, 84); +} + +void test_as_sender_with_scheduler_shared_future() +{ + // Test shared_future variant (copyable) + auto f = hpx::make_shared_future(hpx::async([] { return 42; })); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(f, sched); // f is lvalue, not moved + + auto [result] = + tt::sync_wait(sender | ex::then([](int x) { return x * 2; })).value(); + + HPX_TEST_EQ(result, 84); +} + +void test_as_sender_with_scheduler_void() +{ + // Test void-returning futures + bool executed = false; + auto f = hpx::async([&] { executed = true; }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + tt::sync_wait(std::move(sender)); + HPX_TEST(executed); +} + +void test_as_sender_with_scheduler_error() +{ + // Test exception propagation + auto f = hpx::async([]() -> int { + throw std::runtime_error("test error"); + return 42; + }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + + bool caught = false; + try + { + tt::sync_wait(std::move(sender)); + HPX_TEST(false); // Should have thrown + } + catch (std::runtime_error const& e) + { + caught = true; + HPX_TEST_EQ(std::string(e.what()), "test error"); + } + HPX_TEST(caught); +} + +int main() +{ + test_as_sender_with_scheduler_basic(); + test_as_sender_with_scheduler_get_env(); + test_as_sender_with_scheduler_in_pipeline(); + test_as_sender_with_scheduler_shared_future(); + test_as_sender_with_scheduler_void(); + test_as_sender_with_scheduler_error(); + + return hpx::util::report_errors(); +}