diff --git a/libs/core/executors/CMakeLists.txt b/libs/core/executors/CMakeLists.txt index 8deb1494338..2a3c42e15ae 100644 --- a/libs/core/executors/CMakeLists.txt +++ b/libs/core/executors/CMakeLists.txt @@ -26,6 +26,7 @@ set(executors_headers hpx/executors/execution_policy_parameters.hpp hpx/executors/execution_policy_scheduling_property.hpp hpx/executors/execution_policy.hpp + hpx/executors/executor_scheduler.hpp hpx/executors/explicit_scheduler_executor.hpp hpx/executors/fork_join_executor.hpp hpx/executors/limiting_executor.hpp diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp new file mode 100644 index 00000000000..d633ff69024 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp @@ -0,0 +1,176 @@ +// Copyright (c) 2007-2025 Hartmut Kaiser +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/executor_scheduler.hpp + +#pragma once + +#include + +#include +#include +#include + +#include +#include +#include + +namespace hpx::execution::experimental { + + // Forward declaration + template + struct executor_scheduler; + + namespace detail { + + /////////////////////////////////////////////////////////////////////// + template + struct executor_operation_state + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + + template + executor_operation_state(Exec&& exec, Recv&& recv) + : exec_(HPX_FORWARD(Exec, exec)) + , receiver_(HPX_FORWARD(Recv, recv)) + { + } + + executor_operation_state(executor_operation_state&&) = delete; + executor_operation_state(executor_operation_state const&) = delete; + executor_operation_state& operator=( + executor_operation_state&&) = delete; + executor_operation_state& operator=( + executor_operation_state const&) = delete; + + ~executor_operation_state() = default; + + void start() & noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + if constexpr ( + std::is_same_v< + hpx::traits::executor_execution_category_t< + std::decay_t>, + hpx::execution::sequenced_execution_tag>) + { + // For sequenced executors, invoke inline + hpx::execution::experimental::set_value( + HPX_MOVE(receiver_)); + } + else + { + hpx::parallel::execution::post(exec_, + [receiver = HPX_MOVE(receiver_)]() mutable { + hpx::execution::experimental::set_value( + HPX_MOVE(receiver)); + }); + } + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(receiver_), HPX_MOVE(ep)); + }); + } + }; + + /////////////////////////////////////////////////////////////////////// + template + struct executor_sender + { + using sender_concept = hpx::execution::experimental::sender_t; + + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + + using completion_signatures = + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_value_t(), + hpx::execution::experimental::set_error_t( + std::exception_ptr)>; + + template + static consteval auto get_completion_signatures() noexcept + -> completion_signatures + { + return {}; + } + + constexpr auto get_env() const noexcept + { + struct env + { + std::decay_t const& exec_; + + constexpr auto query(hpx::execution::experimental:: + get_completion_scheduler_t< + hpx::execution::experimental::set_value_t>) + const noexcept + { + return executor_scheduler{exec_}; + } + }; + return env{exec_}; + } + + template + auto connect(Receiver&& receiver) && + { + return executor_operation_state>{ + HPX_MOVE(exec_), HPX_FORWARD(Receiver, receiver)}; + } + + template + auto connect(Receiver&& receiver) const& + { + return executor_operation_state>{ + exec_, HPX_FORWARD(Receiver, receiver)}; + } + }; + } // namespace detail + + /////////////////////////////////////////////////////////////////////////// + template + struct executor_scheduler + { + using executor_type = std::decay_t; + + HPX_NO_UNIQUE_ADDRESS executor_type exec_; + + constexpr executor_scheduler() = default; + + template + requires(!std::is_same_v, executor_scheduler>) + constexpr explicit executor_scheduler(Exec&& exec) noexcept + : exec_(HPX_FORWARD(Exec, exec)) + { + } + + constexpr bool operator==(executor_scheduler const& rhs) const noexcept + { + return exec_ == rhs.exec_; + } + + constexpr bool operator!=(executor_scheduler const& rhs) const noexcept + { + return !(*this == rhs); + } + + constexpr detail::executor_sender schedule() const& noexcept + { + return {exec_}; + } + + constexpr detail::executor_sender schedule() && noexcept + { + return {HPX_MOVE(exec_)}; + } + }; +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/parallel_executor.hpp b/libs/core/executors/include/hpx/executors/parallel_executor.hpp index 2c8820ba093..3b3012034b7 100644 --- a/libs/core/executors/include/hpx/executors/parallel_executor.hpp +++ b/libs/core/executors/include/hpx/executors/parallel_executor.hpp @@ -68,6 +68,16 @@ namespace hpx::parallel::execution::detail { struct then_bulk_function_result; } // namespace hpx::parallel::execution::detail +namespace hpx::execution::experimental { + template + struct executor_scheduler; + + namespace detail { + template + struct executor_sender; + } // namespace detail +} // namespace hpx::execution::experimental + namespace hpx::execution { /////////////////////////////////////////////////////////////////////////// @@ -156,7 +166,8 @@ namespace hpx::execution { } public: - parallel_policy_executor_base(parallel_policy_executor_base const& rhs) + parallel_policy_executor_base( + parallel_policy_executor_base const& rhs) noexcept : pool_(rhs.pool_) , policy_(rhs.policy_) , first_core_(rhs.first_core_) @@ -169,7 +180,7 @@ namespace hpx::execution { // NOLINTEND(bugprone-crtp-constructor-accessibility) parallel_policy_executor_base& operator=( - parallel_policy_executor_base const& rhs) + parallel_policy_executor_base const& rhs) noexcept { if (this != &rhs) { @@ -673,11 +684,15 @@ namespace hpx::execution { public: /// \cond NOINTERNAL + constexpr hpx::execution::experimental::executor_scheduler< + parallel_policy_executor> + query(hpx::execution::experimental::get_scheduler_t) const noexcept; + constexpr bool operator==( parallel_policy_executor const& rhs) const noexcept { return base_type::policy_ == rhs.policy_ && - base_type::pool_ == rhs.pool; + base_type::pool_ == rhs.pool_; } constexpr bool operator!=( @@ -1021,6 +1036,10 @@ namespace hpx::execution { return *this; } + constexpr hpx::execution::experimental::executor_scheduler< + parallel_policy_executor> + query(hpx::execution::experimental::get_scheduler_t) const noexcept; + private: /// \cond NOINTERNAL friend class hpx::serialization::access; @@ -1192,3 +1211,19 @@ namespace hpx::execution::experimental { }; /// \endcond } // namespace hpx::execution::experimental + +// Break circular dependency: executor_scheduler.hpp includes post.hpp which +// references parallel_executor. Include it here after the class is complete. +#include + +namespace hpx::execution { + template + constexpr hpx::execution::experimental::executor_scheduler< + parallel_policy_executor> + parallel_policy_executor::query( + hpx::execution::experimental::get_scheduler_t) const noexcept + { + return hpx::execution::experimental::executor_scheduler< + parallel_policy_executor>(*this); + } +} // namespace hpx::execution diff --git a/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp b/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp index 377c22b66d6..2411b35e298 100644 --- a/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp +++ b/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -62,7 +63,8 @@ namespace hpx::execution::experimental { exec_, num_threads); } - restricted_policy_executor(restricted_policy_executor const& other) + restricted_policy_executor( + restricted_policy_executor const& other) noexcept : first_thread_(other.first_thread_) , os_thread_(other.os_thread_.load()) , exec_(other.exec_) @@ -70,7 +72,7 @@ namespace hpx::execution::experimental { } restricted_policy_executor& operator=( - restricted_policy_executor const& rhs) + restricted_policy_executor const& rhs) noexcept { first_thread_ = rhs.first_thread_; os_thread_ = rhs.os_thread_.load(); @@ -226,6 +228,15 @@ namespace hpx::execution::experimental { } /// \endcond + public: + constexpr hpx::execution::experimental::executor_scheduler< + restricted_policy_executor> + query(hpx::execution::experimental::get_scheduler_t) const noexcept + { + return hpx::execution::experimental::executor_scheduler< + restricted_policy_executor>(*this); + } + private: std::uint16_t first_thread_; mutable std::atomic os_thread_; diff --git a/libs/core/executors/include/hpx/executors/sequenced_executor.hpp b/libs/core/executors/include/hpx/executors/sequenced_executor.hpp index 6d61e286ba3..31e96c38e66 100644 --- a/libs/core/executors/include/hpx/executors/sequenced_executor.hpp +++ b/libs/core/executors/include/hpx/executors/sequenced_executor.hpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -236,6 +237,17 @@ namespace hpx::execution { #endif } + public: + /// \cond NOINTERNAL + constexpr hpx::execution::experimental::executor_scheduler< + sequenced_executor> + query(hpx::execution::experimental::get_scheduler_t) const noexcept + { + return hpx::execution::experimental::executor_scheduler< + sequenced_executor>(*this); + } + /// \endcond + private: friend class hpx::serialization::access; diff --git a/libs/core/executors/tests/unit/CMakeLists.txt b/libs/core/executors/tests/unit/CMakeLists.txt index cfc469a0704..e8a9df5c75b 100644 --- a/libs/core/executors/tests/unit/CMakeLists.txt +++ b/libs/core/executors/tests/unit/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2025 The STE||AR-Group +# Copyright (c) 2020-2026 The STE||AR-Group # # SPDX-License-Identifier: BSL-1.0 # Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -9,6 +9,7 @@ set(tests annotation_property created_executor execution_policy_mappings + executor_scheduler explicit_scheduler_executor fork_join_executor fork_join_executor_from diff --git a/libs/core/executors/tests/unit/executor_scheduler.cpp b/libs/core/executors/tests/unit/executor_scheduler.cpp new file mode 100644 index 00000000000..6f1e56040bf --- /dev/null +++ b/libs/core/executors/tests/unit/executor_scheduler.cpp @@ -0,0 +1,137 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +void test_executor_scheduler_schedule() +{ + using namespace hpx::execution::experimental; + + hpx::execution::sequenced_executor exec; + + // Retrieve a P2300-compliant scheduler from the legacy executor + // using the native query() member function path + auto sched = exec.query(get_scheduler_t{}); + + // Verify the scheduler satisfies the is_scheduler trait + static_assert(is_scheduler_v, + "executor_scheduler must satisfy is_scheduler"); + + // Capture the main thread's ID + auto main_tid = hpx::this_thread::get_id(); + + // Create a sender pipeline: schedule | then + auto s = + then(schedule(sched), [&]() { return hpx::this_thread::get_id(); }); + + // Execute synchronously and verify the result + auto result = hpx::this_thread::experimental::sync_wait(std::move(s)); + + HPX_TEST(result.has_value()); + + // For a sequenced executor, the work runs inline on the calling thread + auto executed_tid = hpx::get<0>(*result); + HPX_TEST_EQ(executed_tid, main_tid); +} + +/////////////////////////////////////////////////////////////////////////////// +void test_executor_scheduler_schedule_parallel() +{ + using namespace hpx::execution::experimental; + + hpx::execution::parallel_executor exec; + + // Retrieve a P2300-compliant scheduler from the legacy executor + auto sched = exec.query(get_scheduler_t{}); + + // Verify the scheduler satisfies the is_scheduler trait + static_assert(is_scheduler_v, + "executor_scheduler must satisfy is_scheduler"); + + // Capture the main thread's ID + auto main_tid = hpx::this_thread::get_id(); + + // Create a sender pipeline: schedule | then + auto s = + then(schedule(sched), [&]() { return hpx::this_thread::get_id(); }); + + // Execute synchronously and verify the result + auto result = hpx::this_thread::experimental::sync_wait(std::move(s)); + + HPX_TEST(result.has_value()); + + // For a parallel executor, the work may run on a different thread + auto executed_tid = hpx::get<0>(*result); + HPX_TEST_NEQ(executed_tid, hpx::thread::id()); // valid thread ID + (void) main_tid; // used only for documentation +} + +/////////////////////////////////////////////////////////////////////////////// +void test_executor_scheduler_schedule_restricted() +{ + using namespace hpx::execution::experimental; + + hpx::execution::experimental::restricted_thread_pool_executor exec; + + // Retrieve a P2300-compliant scheduler from the legacy executor + auto sched = exec.query(get_scheduler_t{}); + + // Verify the scheduler satisfies the is_scheduler trait + static_assert(is_scheduler_v, + "executor_scheduler must satisfy is_scheduler"); + + // Capture the main thread's ID + auto main_tid = hpx::this_thread::get_id(); + + // Create a sender pipeline: schedule | then + auto s = + then(schedule(sched), [&]() { return hpx::this_thread::get_id(); }); + + // Execute synchronously and verify the result + auto result = hpx::this_thread::experimental::sync_wait(std::move(s)); + + HPX_TEST(result.has_value()); + + // For a restricted executor, the work may run on a different thread + auto executed_tid = hpx::get<0>(*result); + HPX_TEST_NEQ(executed_tid, hpx::thread::id()); // valid thread ID + (void) main_tid; // used only for documentation +} + +/////////////////////////////////////////////////////////////////////////////// +int hpx_main() +{ + test_executor_scheduler_schedule(); + test_executor_scheduler_schedule_parallel(); + test_executor_scheduler_schedule_restricted(); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + std::vector const cfg = {"hpx.os_threads=all"}; + + hpx::local::init_params init_args; + init_args.cfg = cfg; + + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +}