From 6b3d7be83329e7604946c29c474afe642a743875 Mon Sep 17 00:00:00 2001 From: Hackathon User Date: Sat, 9 May 2026 02:56:00 +0530 Subject: [PATCH 1/4] executors: Add P2300 get_scheduler bridge for fork_join_executor Add executor_scheduler infrastructure (executor_scheduler, executor_sender, executor_operation_state) that wraps any HPX executor as a P2300 scheduler. This enables executors to participate in sender/receiver pipelines via stdexec::schedule(). Add a get_scheduler tag_invoke overload to fork_join_executor that returns an executor_scheduler. This allows: fork_join_executor exec{}; auto sched = get_scheduler(exec); sync_wait(then(schedule(sched), []{ return 42; })); The tag_invoke uses a default template parameter with a requires clause to defer instantiation of the incomplete executor_scheduler return type, following the established pattern from the parallel and sequenced executor bridges. New files: - executor_scheduler.hpp: Full scheduler/sender/operation_state impl - fwd/executor_scheduler_fwd.hpp: Forward declarations only Modified files: - fork_join_executor.hpp: Add fwd include + get_scheduler friend - fork_join_executor.cpp: Add test_get_scheduler() with 3 subtests - CMakeLists.txt: Register new headers --- libs/core/executors/CMakeLists.txt | 2 + .../hpx/executors/executor_scheduler.hpp | 142 ++++++++++++++++++ .../hpx/executors/fork_join_executor.hpp | 21 +++ .../executors/fwd/executor_scheduler_fwd.hpp | 25 +++ .../tests/unit/fork_join_executor.cpp | 29 ++++ 5 files changed, 219 insertions(+) create mode 100644 libs/core/executors/include/hpx/executors/executor_scheduler.hpp create mode 100644 libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp diff --git a/libs/core/executors/CMakeLists.txt b/libs/core/executors/CMakeLists.txt index 8deb14943381..1eba41551fc9 100644 --- a/libs/core/executors/CMakeLists.txt +++ b/libs/core/executors/CMakeLists.txt @@ -26,8 +26,10 @@ set(executors_headers hpx/executors/execution_policy_parameters.hpp hpx/executors/execution_policy_scheduling_property.hpp hpx/executors/execution_policy.hpp + hpx/executors/executor_scheduler.hpp hpx/executors/explicit_scheduler_executor.hpp hpx/executors/fork_join_executor.hpp + hpx/executors/fwd/executor_scheduler_fwd.hpp hpx/executors/limiting_executor.hpp hpx/executors/macros.hpp hpx/executors/parallel_executor_aggregated.hpp diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp new file mode 100644 index 000000000000..23ebd898e9b5 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp @@ -0,0 +1,142 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/executor_scheduler.hpp + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace hpx::execution::experimental { + /////////////////////////////////////////////////////////////////////////// + HPX_CXX_CORE_EXPORT template + struct executor_operation_state + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + + template + executor_operation_state(Exec&& exec, Recv&& recv) + : exec_(HPX_FORWARD(Exec, exec)) + , receiver_(HPX_FORWARD(Recv, recv)) + { + } + + executor_operation_state(executor_operation_state&&) = delete; + executor_operation_state(executor_operation_state const&) = delete; + executor_operation_state& operator=( + executor_operation_state&&) = delete; + executor_operation_state& operator=( + executor_operation_state const&) = delete; + + ~executor_operation_state() = default; + + friend void tag_invoke(start_t, executor_operation_state& os) noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + hpx::parallel::execution::post(os.exec_, [&os]() mutable { + hpx::execution::experimental::set_value( + HPX_MOVE(os.receiver_)); + }); + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(os.receiver_), HPX_MOVE(ep)); + }); + } + }; + + /////////////////////////////////////////////////////////////////////////// + HPX_CXX_CORE_EXPORT template + struct executor_sender + { + using sender_concept = hpx::execution::experimental::sender_t; + + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + + using completion_signatures = + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_value_t(), + hpx::execution::experimental::set_error_t(std::exception_ptr)>; + + template + friend auto tag_invoke( + hpx::execution::experimental::get_completion_signatures_t, + executor_sender const&, Env) noexcept -> completion_signatures; + + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_completion_scheduler_t< + hpx::execution::experimental::set_value_t>, + executor_sender const& s) noexcept + { + return executor_scheduler{s.exec_}; + } + + template + friend executor_operation_state tag_invoke( + connect_t, executor_sender&& s, Receiver&& receiver) + { + return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)}; + } + + template + friend executor_operation_state tag_invoke( + connect_t, executor_sender const& s, Receiver&& receiver) + { + return {s.exec_, HPX_FORWARD(Receiver, receiver)}; + } + }; + + /////////////////////////////////////////////////////////////////////////// + HPX_CXX_CORE_EXPORT template + struct executor_scheduler + { + using executor_type = std::decay_t; + + HPX_NO_UNIQUE_ADDRESS executor_type exec_; + + constexpr executor_scheduler() = default; + + template + requires(!std::is_same_v, executor_scheduler>) + explicit executor_scheduler(Exec&& exec) noexcept + : exec_(HPX_FORWARD(Exec, exec)) + { + } + + constexpr bool operator==(executor_scheduler const& rhs) const noexcept + { + return exec_ == rhs.exec_; + } + + constexpr bool operator!=(executor_scheduler const& rhs) const noexcept + { + return !(*this == rhs); + } + + friend executor_sender tag_invoke( + schedule_t, executor_scheduler&& sched) + { + return {HPX_MOVE(sched.exec_)}; + } + + friend executor_sender tag_invoke( + schedule_t, executor_scheduler const& sched) + { + return {sched.exec_}; + } + }; +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp index 599501bea1f9..d2ac43188d94 100644 --- a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp +++ b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -1240,6 +1241,14 @@ namespace hpx::execution::experimental { HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...); } + template + friend void tag_invoke(hpx::parallel::execution::post_t, + fork_join_executor const& exec, F&& f, Ts&&... ts) + { + exec.shared_data_->async_invoke( + HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...); + } + template requires(std::invocable && (std::invocable && ...)) friend decltype(auto) tag_invoke( @@ -1363,6 +1372,18 @@ namespace hpx::execution::experimental { return exec.shared_data_->num_threads_; } + // P2300 get_scheduler bridge: wraps this executor in an + // executor_scheduler so it can participate in sender/receiver + // pipelines. + template + requires std::is_same_v + friend hpx::execution::experimental::executor_scheduler + tag_invoke( + hpx::execution::experimental::get_scheduler_t, Exec const& exec) noexcept + { + return hpx::execution::experimental::executor_scheduler(exec); + } + /// \cond NOINTERNAL enum class init_mode : std::uint8_t { diff --git a/libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp b/libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp new file mode 100644 index 000000000000..7dfe15567d84 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp @@ -0,0 +1,25 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/fwd/executor_scheduler_fwd.hpp + +#pragma once + +#include + +namespace hpx::execution::experimental { + + // Forward declarations, see executor_scheduler.hpp + HPX_CXX_CORE_EXPORT template + struct executor_scheduler; + + HPX_CXX_CORE_EXPORT template + struct executor_sender; + + HPX_CXX_CORE_EXPORT template + struct executor_operation_state; + +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/tests/unit/fork_join_executor.cpp b/libs/core/executors/tests/unit/fork_join_executor.cpp index fc53babbee36..1d510eddf4cf 100644 --- a/libs/core/executors/tests/unit/fork_join_executor.cpp +++ b/libs/core/executors/tests/unit/fork_join_executor.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -511,11 +512,39 @@ void test_fork_join_static_large_range() HPX_TEST_EQ(sum.load(), expected_sum); } +/////////////////////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////////////////////// +void test_get_scheduler() +{ + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + + std::cerr << "test_get_scheduler\n"; + + // Test 1: get_scheduler returns a valid scheduler + fork_join_executor exec{}; + auto sched = ex::get_scheduler(exec); + + // Test 2: scheduler can be used to schedule work via then + sync_wait + auto result = hpx::get<0>( + *(tt::sync_wait(ex::then(ex::schedule(sched), []() { return 42; })))); + HPX_TEST_EQ(result, 42); + + // Test 3: scheduled work runs on an HPX worker thread + hpx::thread::id scheduled_thread_id{}; + tt::sync_wait(ex::then(ex::schedule(sched), + [&]() { scheduled_thread_id = hpx::this_thread::get_id(); })); + HPX_TEST(scheduled_thread_id != hpx::thread::id{}); +} + /////////////////////////////////////////////////////////////////////////////// int hpx_main() { static_check_executor(); + // P2300 get_scheduler bridge test + test_get_scheduler(); + // Call regression test for #6922 test_fork_join_static_large_range(); From d595df590c2d58b8d495a428f5ee1cd0b44c6958 Mon Sep 17 00:00:00 2001 From: Hackathon User Date: Sat, 9 May 2026 13:39:45 +0530 Subject: [PATCH 2/4] executors: Add P2300 get_scheduler bridge for fork_join_executor --- .../hpx/executors/fork_join_executor.hpp | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp index d2ac43188d94..6a0a0d9db9d0 100644 --- a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp +++ b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include #include @@ -1372,17 +1372,7 @@ namespace hpx::execution::experimental { return exec.shared_data_->num_threads_; } - // P2300 get_scheduler bridge: wraps this executor in an - // executor_scheduler so it can participate in sender/receiver - // pipelines. - template - requires std::is_same_v - friend hpx::execution::experimental::executor_scheduler - tag_invoke( - hpx::execution::experimental::get_scheduler_t, Exec const& exec) noexcept - { - return hpx::execution::experimental::executor_scheduler(exec); - } + /// \cond NOINTERNAL enum class init_mode : std::uint8_t @@ -1394,6 +1384,19 @@ namespace hpx::execution::experimental { /// \endcond }; + // P2300 get_scheduler bridge — defined outside the class so that + // executor_scheduler is fully defined at point of use. + // This allows fork_join_executor to participate in P2300 sender/receiver + // pipelines via schedule(get_scheduler(exec)). + inline auto tag_invoke( + hpx::execution::experimental::get_scheduler_t, + fork_join_executor const& exec) noexcept + -> hpx::execution::experimental::executor_scheduler + { + return hpx::execution::experimental::executor_scheduler< + fork_join_executor>(exec); + } + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT std::ostream& operator<<( std::ostream& os, fork_join_executor::loop_schedule schedule); From 5ca80b74d55f28377c401dcc7d5a48be73ddd7c4 Mon Sep 17 00:00:00 2001 From: Hackathon User Date: Sun, 10 May 2026 02:42:17 +0530 Subject: [PATCH 3/4] executors: Address hkaiser review comments on fork_join_executor P2300 bridge --- .../executors/include/hpx/executors/fork_join_executor.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp index 6a0a0d9db9d0..d4bf41dead46 100644 --- a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp +++ b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp @@ -1245,7 +1245,7 @@ namespace hpx::execution::experimental { friend void tag_invoke(hpx::parallel::execution::post_t, fork_join_executor const& exec, F&& f, Ts&&... ts) { - exec.shared_data_->async_invoke( + exec.shared_data_->sync_invoke( HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...); } @@ -1401,6 +1401,7 @@ namespace hpx::execution::experimental { std::ostream& os, fork_join_executor::loop_schedule schedule); /// \cond NOINTERNAL + template <> struct is_bulk_one_way_executor : std::true_type { From be760b0bee760d797ddb600c67ba266bff80d0d4 Mon Sep 17 00:00:00 2001 From: Hackathon User Date: Sun, 10 May 2026 13:24:00 +0530 Subject: [PATCH 4/4] executors: Add is_never_blocking_one_way_executor trait for fork_join_executor --- libs/core/executors/CMakeLists.txt | 2 +- .../include/hpx/executors/executor_scheduler.hpp | 2 +- .../executors/{fwd => }/executor_scheduler_fwd.hpp | 2 +- .../include/hpx/executors/fork_join_executor.hpp | 11 +++++++---- libs/core/executors/tests/unit/fork_join_executor.cpp | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) rename libs/core/executors/include/hpx/executors/{fwd => }/executor_scheduler_fwd.hpp (92%) diff --git a/libs/core/executors/CMakeLists.txt b/libs/core/executors/CMakeLists.txt index 1eba41551fc9..1f4b432f743c 100644 --- a/libs/core/executors/CMakeLists.txt +++ b/libs/core/executors/CMakeLists.txt @@ -27,9 +27,9 @@ set(executors_headers hpx/executors/execution_policy_scheduling_property.hpp hpx/executors/execution_policy.hpp hpx/executors/executor_scheduler.hpp + hpx/executors/executor_scheduler_fwd.hpp hpx/executors/explicit_scheduler_executor.hpp hpx/executors/fork_join_executor.hpp - hpx/executors/fwd/executor_scheduler_fwd.hpp hpx/executors/limiting_executor.hpp hpx/executors/macros.hpp hpx/executors/parallel_executor_aggregated.hpp diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp index 23ebd898e9b5..ab1dcf48ff1b 100644 --- a/libs/core/executors/include/hpx/executors/executor_scheduler.hpp +++ b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp @@ -9,7 +9,7 @@ #pragma once #include -#include +#include #include #include #include diff --git a/libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler_fwd.hpp similarity index 92% rename from libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp rename to libs/core/executors/include/hpx/executors/executor_scheduler_fwd.hpp index 7dfe15567d84..dc0a00e95a87 100644 --- a/libs/core/executors/include/hpx/executors/fwd/executor_scheduler_fwd.hpp +++ b/libs/core/executors/include/hpx/executors/executor_scheduler_fwd.hpp @@ -4,7 +4,7 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -/// \file parallel/executors/fwd/executor_scheduler_fwd.hpp +/// \file parallel/executors/executor_scheduler_fwd.hpp #pragma once diff --git a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp index d4bf41dead46..bf928f233afc 100644 --- a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp +++ b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp @@ -1372,8 +1372,6 @@ namespace hpx::execution::experimental { return exec.shared_data_->num_threads_; } - - /// \cond NOINTERNAL enum class init_mode : std::uint8_t { @@ -1388,8 +1386,7 @@ namespace hpx::execution::experimental { // executor_scheduler is fully defined at point of use. // This allows fork_join_executor to participate in P2300 sender/receiver // pipelines via schedule(get_scheduler(exec)). - inline auto tag_invoke( - hpx::execution::experimental::get_scheduler_t, + inline auto tag_invoke(hpx::execution::experimental::get_scheduler_t, fork_join_executor const& exec) noexcept -> hpx::execution::experimental::executor_scheduler { @@ -1402,6 +1399,12 @@ namespace hpx::execution::experimental { /// \cond NOINTERNAL + template <> + struct is_never_blocking_one_way_executor + : std::true_type + { + }; + template <> struct is_bulk_one_way_executor : std::true_type { diff --git a/libs/core/executors/tests/unit/fork_join_executor.cpp b/libs/core/executors/tests/unit/fork_join_executor.cpp index 1d510eddf4cf..7a50507d77eb 100644 --- a/libs/core/executors/tests/unit/fork_join_executor.cpp +++ b/libs/core/executors/tests/unit/fork_join_executor.cpp @@ -8,9 +8,9 @@ #include #include #include -#include #include #include +#include #include #include