diff --git a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp index cb21911acb8..96ee6c04168 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp @@ -232,6 +232,97 @@ namespace hpx::execution::experimental { HPX_FORWARD(Receiver, receiver), future_}; } }; + + /////////////////////////////////////////////////////////////////////// + // Scheduler-aware sender wrapper. + // + // Exposes the stored scheduler through its environment so that + // downstream sender algorithms (bulk, sync_wait, etc.) can query + // get_completion_scheduler and obtain the scheduler + // that originated the work. + HPX_CXX_CORE_EXPORT template + requires hpx::traits::is_future_v> + struct as_sender_sender_with_scheduler + : public as_sender_sender_base> + { + using sender_concept = hpx::execution::experimental::sender_t; + using future_type = std::decay_t; + using scheduler_type = std::decay_t; + using base_type = as_sender_sender_base>; + using base_type::future_; + + HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_; + + // Environment that answers get_completion_scheduler queries. + // Follows the thread_pool_scheduler::sender::env pattern. + struct env + { + scheduler_type const& sched; + + auto query( + hpx::execution::experimental::get_domain_t) const noexcept + { + return hpx::execution::experimental::get_domain(sched); + } + + template + requires std::is_same_v || + std::is_same_v + auto query( + hpx::execution::experimental::get_completion_scheduler_t< + CPO>) const noexcept + { + return sched; + } + }; + + template + requires(!std::is_same_v, + as_sender_sender_with_scheduler>) + explicit as_sender_sender_with_scheduler( + Future_&& future, Scheduler_&& scheduler) + : base_type{HPX_FORWARD(Future_, future)} + , scheduler_(HPX_FORWARD(Scheduler_, scheduler)) + { + } + + as_sender_sender_with_scheduler( + as_sender_sender_with_scheduler&&) = default; + as_sender_sender_with_scheduler& operator=( + as_sender_sender_with_scheduler&&) = default; + as_sender_sender_with_scheduler( + as_sender_sender_with_scheduler const&) = default; + as_sender_sender_with_scheduler& operator=( + as_sender_sender_with_scheduler const&) = default; + + template + static consteval auto get_completion_signatures() noexcept -> + typename base_type::completion_signatures + { + return {}; + } + + template + auto connect(Receiver&& receiver) && + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; + } + + template + auto connect(Receiver&& receiver) & + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), future_}; + } + + constexpr auto get_env() const noexcept + { + return env{scheduler_}; + } + }; } // namespace detail // The as_sender CPO can be used to adapt any HPX future as a sender. The @@ -256,6 +347,19 @@ namespace hpx::execution::experimental { HPX_FORWARD(Future, future)); } + // Scheduler-aware overload: wraps the future into a sender whose + // environment exposes the given scheduler as completion scheduler. + template + requires hpx::traits::is_future_v> && + hpx::execution::experimental::scheduler> + constexpr HPX_FORCEINLINE auto operator()( + Future&& future, Scheduler&& scheduler) const + { + return detail::as_sender_sender_with_scheduler, + std::decay_t>( + HPX_FORWARD(Future, future), HPX_FORWARD(Scheduler, scheduler)); + } + constexpr HPX_FORCEINLINE auto operator()() const { return detail::partial_algorithm{}; diff --git a/libs/core/execution/tests/unit/CMakeLists.txt b/libs/core/execution/tests/unit/CMakeLists.txt index b3a2e8c4b4e..71e2b6ea0fc 100644 --- a/libs/core/execution/tests/unit/CMakeLists.txt +++ b/libs/core/execution/tests/unit/CMakeLists.txt @@ -6,6 +6,7 @@ set(tests algorithm_as_sender + algorithm_as_sender_with_scheduler algorithm_bulk algorithm_continues_on algorithm_ensure_started diff --git a/libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp new file mode 100644 index 00000000000..5597923929e --- /dev/null +++ b/libs/core/execution/tests/unit/algorithm_as_sender_with_scheduler.cpp @@ -0,0 +1,111 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include + +namespace ex = hpx::execution::experimental; +namespace tt = hpx::this_thread::experimental; + +void test_as_sender_with_scheduler_basic() +{ + // Test: future + scheduler -> sender with env + auto f = hpx::async([] { return 42; }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + // Verify compilation and basic properties + (void) sender; +} + +void test_as_sender_with_scheduler_get_env() +{ + // Test: env exposes the scheduler + auto f = hpx::async([] { return 42; }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + auto env = ex::get_env(sender); + // Verify env can be queried for scheduler + auto sched_from_env = ex::get_completion_scheduler(env); + HPX_TEST(sched == sched_from_env); +} + +void test_as_sender_with_scheduler_in_pipeline() +{ + // Full end-to-end test + auto f = hpx::async([] { return 42; }); + ex::thread_pool_scheduler sched{}; + + auto [result] = + tt::sync_wait(ex::as_sender(std::move(f), sched) | ex::then([](int x) { + return x * 2; + })).value(); + + HPX_TEST_EQ(result, 84); +} + +void test_as_sender_with_scheduler_shared_future() +{ + // Test shared_future variant (copyable) + auto f = hpx::make_shared_future(hpx::async([] { return 42; })); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(f, sched); // f is lvalue, not moved + + auto [result] = + tt::sync_wait(sender | ex::then([](int x) { return x * 2; })).value(); + + HPX_TEST_EQ(result, 84); +} + +void test_as_sender_with_scheduler_void() +{ + // Test void-returning futures + bool executed = false; + auto f = hpx::async([&] { executed = true; }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + tt::sync_wait(std::move(sender)); + HPX_TEST(executed); +} + +void test_as_sender_with_scheduler_error() +{ + // Test exception propagation + auto f = hpx::async([]() -> int { + throw std::runtime_error("test error"); + return 42; + }); + ex::thread_pool_scheduler sched{}; + auto sender = ex::as_sender(std::move(f), sched); + + bool caught = false; + try + { + tt::sync_wait(std::move(sender)); + HPX_TEST(false); // Should have thrown + } + catch (std::runtime_error const& e) + { + caught = true; + HPX_TEST_EQ(std::string(e.what()), "test error"); + } + HPX_TEST(caught); +} + +int main() +{ + test_as_sender_with_scheduler_basic(); + test_as_sender_with_scheduler_get_env(); + test_as_sender_with_scheduler_in_pipeline(); + test_as_sender_with_scheduler_shared_future(); + test_as_sender_with_scheduler_void(); + test_as_sender_with_scheduler_error(); + + return hpx::util::report_errors(); +}