diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/for_each_index.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/for_each_index.hpp index 900b7701d501..135609dba74b 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/for_each_index.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/for_each_index.hpp @@ -387,11 +387,10 @@ namespace hpx::parallel::detail { ExPolicy> || has_scheduler_executor) { - return util::detail::algorithm_result:: - get(util::partitioner::call( - HPX_FORWARD(ExPolicy, policy), first, count, - HPX_MOVE(iter_fun), - hpx::util::empty_function{})); + return util::call_with_algorithm_result( + HPX_FORWARD(ExPolicy, policy), first, count, + HPX_MOVE(iter_fun), + hpx::util::empty_function{}); } else { @@ -428,10 +427,9 @@ namespace hpx::parallel::detail { if constexpr (hpx::is_async_execution_policy_v || has_scheduler_executor) { - return util::detail::algorithm_result::get( - util::partitioner::call( - HPX_FORWARD(ExPolicy, policy), first, count, - HPX_MOVE(iter_fun), hpx::util::empty_function{})); + return util::call_with_algorithm_result( + HPX_FORWARD(ExPolicy, policy), first, count, + HPX_MOVE(iter_fun), hpx::util::empty_function{}); } else { diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp index 90ae90f01351..46a650e06d95 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp @@ -1215,11 +1215,10 @@ namespace hpx::parallel { if constexpr (hpx::is_async_execution_policy_v || is_scheduler_policy) { - return util::detail::algorithm_result::get( - util::partitioner::call( - HPX_FORWARD(ExPolicy, policy), iter_or_r, size, - part_iterations{HPX_FORWARD(F, f)}, - hpx::util::empty_function{})); + return util::call_with_algorithm_result( + HPX_FORWARD(ExPolicy, policy), iter_or_r, size, + part_iterations{HPX_FORWARD(F, f)}, + hpx::util::empty_function{}); } else { diff --git a/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp b/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp index 15c307837075..280c25d535d3 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp @@ -73,8 +73,7 @@ namespace hpx::parallel::util::detail { // We attempt to perform some optimizations in case of non-task // execution. - if constexpr (!hpx::is_async_execution_policy_v && - !hpx::execution_policy_has_scheduler_executor_v) + if constexpr (!hpx::is_async_execution_policy_v) { // Switch to sequential execution for one-core, one-chunk case // if the executor supports it. diff --git a/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp b/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp index 396d2660817a..2024dd21981c 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -75,8 +76,7 @@ namespace hpx::parallel::util::detail { // We attempt to perform some optimizations in case of non-task // execution. if constexpr (Optimize && - !hpx::is_async_execution_policy_v && - !hpx::execution_policy_has_scheduler_executor_v) + !hpx::is_async_execution_policy_v) { // Switch to sequential execution for one-core, one-chunk case // if the executor supports it. @@ -700,4 +700,25 @@ namespace hpx::parallel::util { detail::task_static_partitioner>::template apply { }; + + // Helper to call partitioner and wrap the result with + // algorithm_result::get(). Handles both void and non-void return types. + template + decltype(auto) call_with_algorithm_result(ExPolicy&& policy, Args&&... args) + { + if constexpr (std::is_void_v::call( + HPX_FORWARD(ExPolicy, policy), + HPX_FORWARD(Args, args)...))>) + { + partitioner::call( + HPX_FORWARD(ExPolicy, policy), HPX_FORWARD(Args, args)...); + return detail::algorithm_result::get(); + } + else + { + return detail::algorithm_result::get( + partitioner::call( + HPX_FORWARD(ExPolicy, policy), HPX_FORWARD(Args, args)...)); + } + } } // namespace hpx::parallel::util diff --git a/libs/core/algorithms/tests/performance/foreach_report.cpp b/libs/core/algorithms/tests/performance/foreach_report.cpp index 0ee6030a1f70..e5ba3cfd100c 100644 --- a/libs/core/algorithms/tests/performance/foreach_report.cpp +++ b/libs/core/algorithms/tests/performance/foreach_report.cpp @@ -82,6 +82,15 @@ int hpx_main(hpx::program_options::variables_map& vm) [&]() { measure_parallel_foreach(data_representation, exec); }); } + { + hpx::execution::experimental::scheduler_executor< + hpx::execution::experimental::parallel_scheduler> + exec(hpx::execution::experimental::get_parallel_scheduler()); + hpx::util::perftests_report("for_each", "parallel_scheduler", + test_count, + [&]() { measure_parallel_foreach(data_representation, exec); }); + } + { hpx::execution::parallel_executor exec; hpx::util::perftests_report("for_each", "parallel_executor", diff --git a/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp b/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp index 892a61aa66ae..4d91208cdb4e 100644 --- a/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp +++ b/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp @@ -312,14 +312,18 @@ namespace hpx::cuda::experimental { S, Env>{}, invoke_function_transformation_fn{}, default_set_error_fn{}, - hpx::execution::experimental::ignore_completion{})) + hpx::execution::experimental::ignore_completion{}, + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_stopped_t()>{})) { return hpx::execution::experimental::transform_completion_signatures( hpx::execution::experimental::completion_signatures_of_t< S, Env>{}, invoke_function_transformation_fn{}, default_set_error_fn{}, - hpx::execution::experimental::ignore_completion{}); + hpx::execution::experimental::ignore_completion{}, + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_stopped_t()>{}); } // clang-format on diff --git a/libs/core/async_mpi/include/hpx/async_mpi/transform_mpi.hpp b/libs/core/async_mpi/include/hpx/async_mpi/transform_mpi.hpp index 1fdf6c31c17a..aac60aabc4d2 100644 --- a/libs/core/async_mpi/include/hpx/async_mpi/transform_mpi.hpp +++ b/libs/core/async_mpi/include/hpx/async_mpi/transform_mpi.hpp @@ -189,14 +189,18 @@ namespace hpx::mpi::experimental { Sender, Env>{}, invoke_function_transformation_fn{}, default_set_error_fn{}, - hpx::execution::experimental::ignore_completion{})) + hpx::execution::experimental::ignore_completion{}, + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_stopped_t()>{})) { return hpx::execution::experimental::transform_completion_signatures( hpx::execution::experimental::completion_signatures_of_t< Sender, Env>{}, invoke_function_transformation_fn{}, default_set_error_fn{}, - hpx::execution::experimental::ignore_completion{}); + hpx::execution::experimental::ignore_completion{}, + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_stopped_t()>{}); } // clang-format on diff --git a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp index cb21911acb8b..c0f1c089f118 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp @@ -18,222 +18,220 @@ #include #include -namespace hpx::execution::experimental { - namespace detail { +namespace hpx::execution::experimental { namespace detail { - /////////////////////////////////////////////////////////////////////////// - // Operation state for sender compatibility - HPX_CXX_CORE_EXPORT template - class as_sender_operation_state + /////////////////////////////////////////////////////////////////////////// + // Operation state for sender compatibility + HPX_CXX_CORE_EXPORT template + class as_sender_operation_state + { + private: + using receiver_type = std::decay_t; + using future_type = std::decay_t; + using result_type = typename future_type::result_type; + + public: + template + as_sender_operation_state(Receiver_&& r, future_type f) + : receiver_(HPX_FORWARD(Receiver_, r)) + , future_(HPX_MOVE(f)) { - private: - using receiver_type = std::decay_t; - using future_type = std::decay_t; - using result_type = typename future_type::result_type; - - public: - template - as_sender_operation_state(Receiver_&& r, future_type f) - : receiver_(HPX_FORWARD(Receiver_, r)) - , future_(HPX_MOVE(f)) - { - } - - as_sender_operation_state(as_sender_operation_state&&) = delete; - as_sender_operation_state& operator=( - as_sender_operation_state&&) = delete; - as_sender_operation_state( - as_sender_operation_state const&) = delete; - as_sender_operation_state& operator=( - as_sender_operation_state const&) = delete; - - void start() & noexcept - { - start_helper(); - } - - private: - void start_helper() & noexcept - { - hpx::detail::try_catch_exception_ptr( - [&]() { - auto state = traits::detail::get_shared_state(future_); - - if (!state) - { - HPX_THROW_EXCEPTION(hpx::error::no_state, - "as_sender_operation_state::start", - "the future has no valid shared state"); - } + } - auto on_completed = [this]() mutable { - if (future_.has_value()) - { - if constexpr (std::is_void_v) - { - hpx::execution::experimental::set_value( - HPX_MOVE(receiver_)); - } - else - { - hpx::execution::experimental::set_value( - HPX_MOVE(receiver_), future_.get()); - } - } - else if (future_.has_exception()) - { - hpx::execution::experimental::set_error( - HPX_MOVE(receiver_), - future_.get_exception_ptr()); - } - }; + as_sender_operation_state(as_sender_operation_state&&) = delete; + as_sender_operation_state& operator=( + as_sender_operation_state&&) = delete; + as_sender_operation_state(as_sender_operation_state const&) = delete; + as_sender_operation_state& operator=( + as_sender_operation_state const&) = delete; - if (!state->is_ready(std::memory_order_relaxed)) - { - state->execute_deferred(); + void start() & noexcept + { + start_helper(); + } - // execute_deferred might have made the future ready - if (!state->is_ready(std::memory_order_relaxed)) + private: + void start_helper() & noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + auto state = traits::detail::get_shared_state(future_); + + if (!state) + { + HPX_THROW_EXCEPTION(hpx::error::no_state, + "as_sender_operation_state::start", + "the future has no valid shared state"); + } + + auto on_completed = [this]() mutable { + if (future_.has_value()) + { + if constexpr (std::is_void_v) { - // The operation state has to be kept alive until - // set_value is called, which means that we don't - // need to move receiver and future into the - // on_completed callback. - state->set_on_completed(HPX_MOVE(on_completed)); + hpx::execution::experimental::set_value( + HPX_MOVE(receiver_)); } else { - on_completed(); + hpx::execution::experimental::set_value( + HPX_MOVE(receiver_), future_.get()); } } + else if (future_.has_exception()) + { + hpx::execution::experimental::set_error( + HPX_MOVE(receiver_), + future_.get_exception_ptr()); + } + }; + + if (!state->is_ready(std::memory_order_relaxed)) + { + state->execute_deferred(); + + // execute_deferred might have made the future ready + if (!state->is_ready(std::memory_order_relaxed)) + { + // The operation state has to be kept alive until + // set_value is called, which means that we don't + // need to move receiver and future into the + // on_completed callback. + state->set_on_completed(HPX_MOVE(on_completed)); + } else { on_completed(); } - }, - [&](std::exception_ptr ep) { - hpx::execution::experimental::set_error( - HPX_MOVE(receiver_), HPX_MOVE(ep)); - }); - } - - HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; - future_type future_; + } + else + { + on_completed(); + } + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(receiver_), HPX_MOVE(ep)); + }); + } + + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + future_type future_; + }; + + HPX_CXX_CORE_EXPORT template + struct as_sender_sender_base + { + using result_type = typename std::decay_t::result_type; + + std::decay_t future_; + + template + struct set_value_void_checked + { + using type = hpx::execution::experimental::set_value_t( + _result_type); }; - HPX_CXX_CORE_EXPORT template - struct as_sender_sender_base + template + struct set_value_void_checked { - using result_type = typename std::decay_t::result_type; - - std::decay_t future_; - - template - struct set_value_void_checked - { - using type = hpx::execution::experimental::set_value_t( - _result_type); - }; - - template - struct set_value_void_checked - { - using type = hpx::execution::experimental::set_value_t(); - }; - - using completion_signatures = - hpx::execution::experimental::completion_signatures< - typename set_value_void_checked, - result_type>::type, - hpx::execution::experimental::set_error_t( - std::exception_ptr)>; + using type = hpx::execution::experimental::set_value_t(); }; - HPX_CXX_CORE_EXPORT template - struct as_sender_sender; + using completion_signatures = + hpx::execution::experimental::completion_signatures< + typename set_value_void_checked, + result_type>::type, + hpx::execution::experimental::set_error_t(std::exception_ptr)>; + }; - template - struct as_sender_sender> - : public as_sender_sender_base> + HPX_CXX_CORE_EXPORT template + struct as_sender_sender; + + template + struct as_sender_sender> + : public as_sender_sender_base> + { + using sender_concept = hpx::execution::experimental::sender_t; + using future_type = hpx::future; + using base_type = as_sender_sender_base>; + using base_type::future_; + + template + requires(!std::is_same_v, as_sender_sender>) + explicit as_sender_sender(Future&& future) + : base_type{HPX_FORWARD(Future, future)} { - using sender_concept = hpx::execution::experimental::sender_t; - using future_type = hpx::future; - using base_type = as_sender_sender_base>; - using base_type::future_; - - template , as_sender_sender>>> - explicit as_sender_sender(Future&& future) - : base_type{HPX_FORWARD(Future, future)} - { - } - - as_sender_sender(as_sender_sender&&) = default; - as_sender_sender& operator=(as_sender_sender&&) = default; - as_sender_sender(as_sender_sender const&) = delete; - as_sender_sender& operator=(as_sender_sender const&) = delete; - - template - static consteval auto get_completion_signatures() noexcept -> - typename base_type::completion_signatures - { - return {}; - } - - template - auto connect(Receiver&& receiver) && - { - return as_sender_operation_state{ - HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; - } - }; + } + + as_sender_sender(as_sender_sender&&) = default; + as_sender_sender& operator=(as_sender_sender&&) = default; + as_sender_sender(as_sender_sender const&) = delete; + as_sender_sender& operator=(as_sender_sender const&) = delete; - template - struct as_sender_sender> - : as_sender_sender_base> + template + static consteval auto get_completion_signatures() noexcept -> + typename base_type::completion_signatures { - using sender_concept = hpx::execution::experimental::sender_t; - using future_type = hpx::shared_future; - using base_type = as_sender_sender_base>; - using base_type::future_; - - template , as_sender_sender>>> - explicit as_sender_sender(Future&& future) - : base_type{HPX_FORWARD(Future, future)} - { - } - - as_sender_sender(as_sender_sender&&) = default; - as_sender_sender& operator=(as_sender_sender&&) = default; - as_sender_sender(as_sender_sender const&) = default; - as_sender_sender& operator=(as_sender_sender const&) = default; - - template - static consteval auto get_completion_signatures() noexcept -> - typename base_type::completion_signatures - { - return {}; - } - - template - auto connect(Receiver&& receiver) && - { - return as_sender_operation_state{ - HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; - } - - template - auto connect(Receiver&& receiver) & - { - return as_sender_operation_state{ - HPX_FORWARD(Receiver, receiver), future_}; - } - }; - } // namespace detail + return {}; + } + template + auto connect(Receiver&& receiver) && + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; + } + }; +}} // namespace hpx::execution::experimental::detail + +namespace hpx::execution::experimental { namespace detail { + template + struct as_sender_sender> + : as_sender_sender_base> + { + using sender_concept = hpx::execution::experimental::sender_t; + using future_type = hpx::shared_future; + using base_type = as_sender_sender_base>; + using base_type::future_; + + template + requires(!std::is_same_v, as_sender_sender>) + explicit as_sender_sender(Future&& future) + : base_type{HPX_FORWARD(Future, future)} + { + } + + as_sender_sender(as_sender_sender&&) = default; + as_sender_sender& operator=(as_sender_sender&&) = default; + as_sender_sender(as_sender_sender const&) = default; + as_sender_sender& operator=(as_sender_sender const&) = default; + + template + static consteval auto get_completion_signatures() noexcept -> + typename base_type::completion_signatures + { + return {}; + } + + template + auto connect(Receiver&& receiver) && + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)}; + } + + template + auto connect(Receiver&& receiver) & + { + return as_sender_operation_state{ + HPX_FORWARD(Receiver, receiver), future_}; + } + }; +}} // namespace hpx::execution::experimental::detail + +namespace hpx::execution::experimental { // The as_sender CPO can be used to adapt any HPX future as a sender. The // value provided by the future will be used to call set_value on the // connected receiver once the future has become ready. If the future is diff --git a/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp b/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp index 526949664059..57660f875ba8 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp @@ -9,8 +9,6 @@ #include -#include - #include #include #include @@ -82,7 +80,8 @@ namespace hpx::execution::experimental { hpx::execution::experimental:: completion_signatures_of_t{}, default_set_value_fn{}, default_set_error_fn{}, - hpx::execution::experimental::ignore_completion{}, + hpx::execution::experimental::keep_completion< + hpx::execution::experimental::set_stopped_t>{}, hpx::execution::experimental::completion_signatures< hpx::execution::experimental::set_error_t( std::exception_ptr)>{})) diff --git a/libs/core/execution/include/hpx/execution/algorithms/keep_future.hpp b/libs/core/execution/include/hpx/execution/algorithms/keep_future.hpp index 3a939a878e70..fe987193657c 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/keep_future.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/keep_future.hpp @@ -67,7 +67,8 @@ namespace hpx::execution::experimental { hpx::execution::experimental::set_value_t( std::decay_t), hpx::execution::experimental::set_error_t( - std::exception_ptr)>; + std::exception_ptr), + hpx::execution::experimental::set_stopped_t()>; }; HPX_CXX_CORE_EXPORT template diff --git a/libs/core/execution_base/include/hpx/execution_base/stdexec_forward.hpp b/libs/core/execution_base/include/hpx/execution_base/stdexec_forward.hpp index 7870a3748d83..908afc487052 100644 --- a/libs/core/execution_base/include/hpx/execution_base/stdexec_forward.hpp +++ b/libs/core/execution_base/include/hpx/execution_base/stdexec_forward.hpp @@ -51,6 +51,7 @@ #include #include #include +#include #include #include #include @@ -186,10 +187,12 @@ namespace hpx::execution::experimental { HPX_CXX_CORE_EXPORT using stdexec::transfer; HPX_CXX_CORE_EXPORT using stdexec::transfer_t; - // Bulk (HPX provides its own bulk CPO, but still forwards chunked variants - // used by the thread pool scheduler domain customization on current master) - // HPX_CXX_CORE_EXPORT using stdexec::bulk; - // HPX_CXX_CORE_EXPORT using stdexec::bulk_t; + // Sender for + HPX_CXX_CORE_EXPORT using exec::sender_for; + + // Bulk operations + // Note: HPX defines its own bulk/bulk_t CPO in execution/algorithms/bulk.hpp, + // so we cannot import stdexec::bulk or stdexec::bulk_t here. HPX_CXX_CORE_EXPORT using stdexec::bulk_chunked; HPX_CXX_CORE_EXPORT using stdexec::bulk_chunked_t; HPX_CXX_CORE_EXPORT using stdexec::bulk_unchunked; @@ -198,7 +201,10 @@ namespace hpx::execution::experimental { // Execution policies HPX_CXX_CORE_EXPORT using stdexec::is_execution_policy; HPX_CXX_CORE_EXPORT using stdexec::is_execution_policy_v; - + HPX_CXX_CORE_EXPORT using stdexec::sequenced_policy; + HPX_CXX_CORE_EXPORT using stdexec::parallel_policy; + HPX_CXX_CORE_EXPORT using stdexec::parallel_unsequenced_policy; + HPX_CXX_CORE_EXPORT using stdexec::unsequenced_policy; HPX_CXX_CORE_EXPORT inline constexpr stdexec::parallel_policy par{}; HPX_CXX_CORE_EXPORT inline constexpr stdexec::parallel_unsequenced_policy par_unseq{}; @@ -335,6 +341,11 @@ namespace hpx::execution::experimental { HPX_CXX_CORE_EXPORT using stdexec::operation_state; + // sender invokes + HPX_CXX_CORE_EXPORT template + inline constexpr bool sender_invokes_algorithm_v = + stdexec::__sender_for; + namespace stdexec_non_standard_tag_invoke { // Presently, the stdexec repository implements tag invoke, @@ -360,7 +371,6 @@ namespace hpx::execution::experimental { // Additional stdexec concepts and utilities needed for domain customization HPX_CXX_CORE_EXPORT using stdexec::__completes_on; - HPX_CXX_CORE_EXPORT using stdexec::__sender_for; } // namespace stdexec_internal } // namespace hpx::execution::experimental diff --git a/libs/core/executors/CMakeLists.txt b/libs/core/executors/CMakeLists.txt index 8deb14943381..d0e4c067e3b0 100644 --- a/libs/core/executors/CMakeLists.txt +++ b/libs/core/executors/CMakeLists.txt @@ -32,6 +32,8 @@ set(executors_headers hpx/executors/macros.hpp hpx/executors/parallel_executor_aggregated.hpp hpx/executors/parallel_executor.hpp + hpx/executors/parallel_scheduler.hpp + hpx/executors/parallel_scheduler_backend.hpp hpx/executors/post.hpp hpx/executors/restricted_thread_pool_executor.hpp hpx/executors/scheduler_executor.hpp @@ -93,8 +95,9 @@ if(HPX_WITH_DATAPAR) endif() # cmake-format: on -set(executors_sources current_executor.cpp exception_list_callbacks.cpp - fork_join_executor.cpp service_executors.cpp +set(executors_sources + current_executor.cpp exception_list_callbacks.cpp fork_join_executor.cpp + parallel_scheduler.cpp service_executors.cpp ) include(HPX_AddModule) diff --git a/libs/core/executors/include/hpx/executors/parallel_scheduler.hpp b/libs/core/executors/include/hpx/executors/parallel_scheduler.hpp new file mode 100644 index 000000000000..573041e3959b --- /dev/null +++ b/libs/core/executors/include/hpx/executors/parallel_scheduler.hpp @@ -0,0 +1,876 @@ +// Copyright (c) 2025 Sai Charan Arvapally +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::execution::experimental { + + // Forward declaration for parallel_scheduler_domain + HPX_CXX_CORE_EXPORT class parallel_scheduler; + + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT parallel_scheduler + get_parallel_scheduler(); + + // Virtual bulk dispatch infrastructure for P2079R10. + // + // transform_sender must return a single concrete type, but we + // need two execution paths: + // - Fast path (default HPX backend): thread_pool_bulk_sender + // with work-stealing, NUMA awareness, etc. + // - Virtual path (custom backends): routes through + // backend->schedule_bulk_chunked/unchunked(). + // + // Solution: type-erase the operation state behind a virtual + // base class. Cost: one heap allocation per bulk operation. + // For bulk work processing thousands of elements, this is + // negligible. + namespace detail { + + // Virtual base for type-erased bulk operation states. + HPX_CXX_CORE_EXPORT struct base_parallel_bulk_op + { + virtual ~base_parallel_bulk_op() = default; + virtual void start() noexcept = 0; + }; + + // Fast path: wraps thread_pool_bulk_sender's connected + // operation state. Zero overhead beyond the heap allocation. + HPX_CXX_CORE_EXPORT template + struct fast_parallel_bulk_op final : base_parallel_bulk_op + { + using inner_op_t = + hpx::execution::experimental::connect_result_t; + + inner_op_t inner_; + + fast_parallel_bulk_op(FastSender&& s, Receiver&& r) + : inner_(hpx::execution::experimental::connect( + HPX_MOVE(s), HPX_MOVE(r))) + { + } + + void start() noexcept override + { + hpx::execution::experimental::start(inner_); + } + }; + + // Virtual dispatch path: connects child sender to an internal + // receiver. When the child completes with values, constructs a + // concrete_proxy in inline aligned storage (no heap allocation) and + // calls backend->schedule_bulk_chunked() or schedule_bulk_unchunked(). + HPX_CXX_CORE_EXPORT template + struct virtual_parallel_bulk_op final : base_parallel_bulk_op + { + std::shared_ptr backend_; + std::size_t + count_; // Count passed to backend (1 for seq, shape for par) + std::size_t + actual_shape_; // P3804R2: Actual shape for proxy execution + F f_; + std::decay_t receiver_; + + // Pre-allocated storage passed to the backend as scratch space. + alignas(parallel_scheduler_storage_alignment) + std::byte storage_[parallel_scheduler_storage_size]; + + // ---- Nested concrete proxy template ------------------------- + // Lifted out of do_bulk() so that sizeof/alignof are computable + // for the inline storage below. Ts... are the decayed value types + // types forwarded by the child sender. + template + struct concrete_proxy final + : parallel_scheduler_bulk_item_receiver_proxy + { + virtual_parallel_bulk_op& op_; + std::tuple values_; + + // Takes values by value so both lvalue and rvalue arguments + // from the child sender are handled uniformly. + concrete_proxy(virtual_parallel_bulk_op& o, Ts... ts) + : op_(o) + , values_(HPX_MOVE(ts)...) + { + } + + void execute( + std::size_t begin, std::size_t end) noexcept override + { + if constexpr (IsChunked) + { + if constexpr (IsParallel) + { + std::apply( + [&](auto&... vals) { + op_.f_(begin, end, vals...); + }, + values_); + } + else + { + // P3804R2: seq policy -> f(0, shape, args...) + std::apply( + [&](auto&... vals) { + op_.f_(0, op_.actual_shape_, vals...); + }, + values_); + } + } + else + { + if constexpr (IsParallel) + { + for (std::size_t i = begin; i < end; ++i) + { + std::apply( + [&](auto&... vals) { op_.f_(i, vals...); }, + values_); + } + } + else + { + // P3804R2: seq -> for(i=0; i>; + + // mk_decayed_tuple = std::tuple,...> + template + using mk_decayed_tuple = std::tuple...>; + + // std::variant...>> for each value sig + using value_variant_t = value_types_of_t; + + static_assert(std::variant_size_v == 1, + "virtual_parallel_bulk_op: child sender must have exactly " + "one value completion signature"); + + // std::tuple, decay_t, ...> + using value_tuple_t = + std::variant_alternative_t<0, value_variant_t>; + + // concrete_proxy from std::tuple + template + struct proxy_for_tuple; + template + struct proxy_for_tuple> + { + using type = concrete_proxy; + }; + using proxy_t = typename proxy_for_tuple::type; + + // ---- Inline proxy storage ------------------------------------ + // Eliminates the second heap allocation that make_unique + // would require. Valid from do_bulk() until the first completion + // signal is delivered, after which the operation state is + // released and this destructor runs. + alignas(proxy_t) std::byte proxy_buf_[sizeof(proxy_t)]; + bool proxy_active_ = false; + + proxy_t& active_proxy() noexcept + { + return *std::launder(reinterpret_cast(proxy_buf_)); + } + + // ---- Child receiver ----------------------------------------- + struct child_receiver + { + using receiver_concept = + hpx::execution::experimental::receiver_t; + virtual_parallel_bulk_op* self_; + + template + void set_value(Vs&&... vs) & noexcept + { + self_->do_bulk(HPX_FORWARD(Vs, vs)...); + } + + template + void set_value(Vs&&... vs) && noexcept + { + static_cast(*this).set_value( + HPX_FORWARD(Vs, vs)...); + } + + void set_error(std::exception_ptr ep) & noexcept + { + hpx::execution::experimental::set_error( + HPX_MOVE(self_->receiver_), HPX_MOVE(ep)); + } + + void set_error(std::exception_ptr ep) && noexcept + { + static_cast(*this).set_error(HPX_MOVE(ep)); + } + + void set_stopped() & noexcept + { + hpx::execution::experimental::set_stopped( + HPX_MOVE(self_->receiver_)); + } + + void set_stopped() && noexcept + { + static_cast(*this).set_stopped(); + } + + auto get_env() const noexcept + { + return hpx::execution::experimental::get_env( + self_->receiver_); + } + }; + + // Connected child sender's operation state. + hpx::execution::experimental::connect_result_t + child_op_; + + virtual_parallel_bulk_op( + std::shared_ptr b, + std::size_t count, std::size_t shape, F f, ChildSender&& child, + Receiver&& rcvr) + : backend_(HPX_MOVE(b)) + , count_(count) + , actual_shape_(shape) + , f_(HPX_MOVE(f)) + , receiver_(HPX_FORWARD(Receiver, rcvr)) + , child_op_(hpx::execution::experimental::connect( + HPX_FORWARD(ChildSender, child), child_receiver{this})) + { + } + + ~virtual_parallel_bulk_op() + { + if (proxy_active_) + active_proxy().~proxy_t(); + } + + void start() noexcept override + { + hpx::execution::experimental::start(child_op_); + } + + // Called by child_receiver::set_value when the child sender + // completes. Constructs the proxy via placement new into the + // inline buffer (no heap allocation) then dispatches to the + // backend. + template + void do_bulk(Vs&&... vs) noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + new (proxy_buf_) proxy_t(*this, HPX_FORWARD(Vs, vs)...); + proxy_active_ = true; + + std::span span(storage_); + if constexpr (IsChunked) + { + backend_->schedule_bulk_chunked( + count_, active_proxy(), span); + } + else + { + backend_->schedule_bulk_unchunked( + count_, active_proxy(), span); + } + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(receiver_), HPX_MOVE(ep)); + }); + } + }; + + // Unified sender returned by parallel_scheduler_domain's + // transform_sender. Holds either the fast-path + // thread_pool_bulk_sender or virtual dispatch data. + HPX_CXX_CORE_EXPORT template + struct parallel_bulk_dispatch_sender + { + using sender_concept = sender_t; + + struct fast_path_data + { + FastSender sender_; + }; + + struct virtual_path_data + { + std::shared_ptr backend_; + std::size_t count_; // P3804R2: 1 for seq, shape for par + std::size_t actual_shape_; // P3804R2: Actual shape value + F f_; + ChildSender child_; + }; + + std::variant data_; + + template + static consteval auto get_completion_signatures() noexcept + -> decltype(hpx::execution::experimental:: + transform_completion_signatures( + hpx::execution::experimental:: + completion_signatures_of_t{}, + hpx::execution::experimental::keep_completion< + hpx::execution::experimental::set_value_t>{}, + hpx::execution::experimental::keep_completion< + hpx::execution::experimental::set_error_t>{}, + hpx::execution::experimental::keep_completion< + hpx::execution::experimental::set_stopped_t>{}, + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_error_t( + std::exception_ptr)>{})) + { + return {}; + } + + // Unified operation state: holds type-erased op via + // unique_ptr. + template + struct dispatch_op + { + std::unique_ptr impl_; + + explicit dispatch_op(std::unique_ptr p) + : impl_(HPX_MOVE(p)) + { + } + + dispatch_op(dispatch_op&&) = delete; + dispatch_op(dispatch_op const&) = delete; + dispatch_op& operator=(dispatch_op&&) = delete; + dispatch_op& operator=(dispatch_op const&) = delete; + + void start() noexcept + { + impl_->start(); + } + }; + + // connect: creates the right op state behind the + // type-erased pointer. + template + friend dispatch_op> tag_invoke( + hpx::execution::experimental::connect_t, + parallel_bulk_dispatch_sender&& self, Receiver&& rcvr) + { + if (auto* fast = std::get_if(&self.data_)) + { + return dispatch_op>{ + std::make_unique>>(HPX_MOVE(fast->sender_), + HPX_FORWARD(Receiver, rcvr))}; + } + else + { + auto& vp = std::get(self.data_); + return dispatch_op>{ + std::make_unique>>( + HPX_MOVE(vp.backend_), vp.count_, vp.actual_shape_, + HPX_MOVE(vp.f_), HPX_MOVE(vp.child_), + HPX_FORWARD(Receiver, rcvr))}; + } + } + }; + + } // namespace detail + + // P2079R10: Domain for parallel_scheduler bulk operations. + // The existing thread_pool_domain checks __completes_on with + // thread_pool_policy_scheduler, but parallel_scheduler's sender + // returns parallel_scheduler as the completion scheduler. + // This domain bridges the gap by extracting the underlying + // thread_pool_policy_scheduler and delegating to HPX's optimized + // thread_pool_bulk_sender. + HPX_CXX_CORE_EXPORT struct parallel_scheduler_domain + : hpx::execution::experimental::detail::sync_wait_domain + { + template + auto transform_sender(hpx::execution::experimental::set_value_t, + Sender&& sndr, Env const& /*env*/) const + { + if constexpr (hpx::execution::experimental::stdexec_internal:: + __completes_on) + { + // Extract bulk parameters using structured binding + auto&& [tag, data, child] = sndr; + auto&& [pol, shape, f] = data; + + // Get the parallel_scheduler from the bulk sender's env. + // The outer if constexpr(__completes_on) guarantees this query succeeds, + // using the same env_of_t that __completes_on checks. + auto par_sched = + hpx::execution::experimental::get_completion_scheduler< + hpx::execution::experimental::set_value_t>( + hpx::execution::experimental::get_env(sndr)); + + // Extract the underlying thread pool scheduler from the + // backend. For the default HPX backend this returns the + // concrete thread_pool_policy_scheduler; for custom backends + // it returns nullptr (bulk goes through virtual dispatch). + auto const* underlying_ptr = + par_sched.get_underlying_scheduler(); + auto const* pu_mask_ptr = par_sched.get_pu_mask(); + + // Only bulk_chunked_t uses the chunked f(begin, end, ...) + // signature. Both bulk_t (P3481R5 high-level) and + // bulk_unchunked_t use the unchunked f(index, ...) signature + // that HPX's bulk users pass. Treating bulk_t as chunked here + // would force f(begin, end, ...) on user lambdas that take a + // single index, causing a template instantiation failure. + constexpr bool is_chunked = + sender_invokes_algorithm_v; + + // Determine parallelism at compile time from policy type + // (pol is a __policy_wrapper, use __get() to unwrap) + constexpr bool is_parallel = + !is_sequenced_policy_v>; + + constexpr bool is_unsequenced = is_unsequenced_bulk_policy_v< + std::decay_t>; + + auto iota_shape = + hpx::util::counting_shape(decltype(shape){0}, shape); + + // Compute the fast-path sender type (needed even on the + // virtual path so both branches return the same type). + using fast_sender_t = hpx::execution::experimental::detail:: + thread_pool_bulk_sender, + std::decay_t, + std::decay_t, is_chunked, is_parallel, + is_unsequenced>; + + using dispatch_sender_t = + detail::parallel_bulk_dispatch_sender, + std::decay_t, is_chunked, is_parallel>; + + // Fast path: default HPX backend with underlying scheduler + // available. Create optimized thread_pool_bulk_sender + // with work-stealing, NUMA awareness, etc. Use the same + // processing-unit mask as thread_pool_domain (pool-derived) + // rather than the backend's cached full_mask so mask and + // worker-thread cardinality stay aligned (fixes P2079 / small + // --hpx:threads counts). + if (underlying_ptr != nullptr) + { + auto underlying = *underlying_ptr; + hpx::threads::mask_type pu_mask = *pu_mask_ptr; + + auto fast_sender = fast_sender_t(HPX_MOVE(underlying), + HPX_FORWARD(decltype(child), child), + HPX_MOVE(iota_shape), HPX_FORWARD(decltype(f), f), + HPX_MOVE(pu_mask)); + + return dispatch_sender_t{ + typename dispatch_sender_t::fast_path_data{ + HPX_MOVE(fast_sender)}}; + } + + // Virtual dispatch path: custom backend without an + // underlying thread_pool_policy_scheduler. Routes + // through backend->schedule_bulk_chunked/unchunked(). + // + // P3804R2: Pass (is_parallel ? shape : 1) to backend. + // When seq policy, backend receives count=1 and proxy + // will execute all work in a single call: + // - chunked: proxy.execute(0, shape) -> f(0, shape, args...) + // - unchunked: proxy.execute(0, shape) -> + // for(i=0; i(is_parallel ? shape : 1), + static_cast(shape), + HPX_FORWARD(decltype(f), f), + HPX_FORWARD(decltype(child), child)}}; + } + else + { + // P2079R10: bulk operations require the parallel_scheduler + // in the environment. Add a continues_on transition to the + // parallel_scheduler before the bulk algorithm. + static_assert( + hpx::execution::experimental::stdexec_internal:: + __completes_on, + "Cannot dispatch bulk algorithm to the parallel_scheduler: " + "no parallel_scheduler found in the environment. " + "Add a continues_on transition to the parallel_scheduler " + "before the bulk algorithm."); + } + } + }; + + // P2079R10 parallel_scheduler implementation. + // Stores a shared_ptr for replaceability. + // The default backend wraps HPX's thread_pool_policy_scheduler. + HPX_CXX_CORE_EXPORT class parallel_scheduler + { + public: + parallel_scheduler() = delete; + + parallel_scheduler(parallel_scheduler const& other) noexcept = default; + parallel_scheduler(parallel_scheduler&& other) noexcept = default; + parallel_scheduler& operator=( + parallel_scheduler const&) noexcept = default; + parallel_scheduler& operator=(parallel_scheduler&&) noexcept = default; + + // P2079R10 6.4: two schedulers compare equal iff BACKEND-OF(lhs) + // and BACKEND-OF(rhs) refer to the same object, i.e., their + // shared_ptr targets are identical. Pointer equality is the only + // comparison mandated by the standard; equal_to() on the backend + // interface is an HPX-specific extension that custom backends may + // implement for their own purposes but is not used here. + friend bool operator==(parallel_scheduler const& lhs, + parallel_scheduler const& rhs) noexcept + { + return lhs.backend_.get() == rhs.backend_.get(); + } + + friend bool operator!=(parallel_scheduler const& lhs, + parallel_scheduler const& rhs) noexcept + { + return !(lhs == rhs); + } + + // P2079R10: query() member for forward progress guarantee + // (modern stdexec pattern, preferred over tag_invoke) + constexpr forward_progress_guarantee query( + get_forward_progress_guarantee_t) const noexcept + { + return forward_progress_guarantee::parallel; + } + + // Scheduling properties: forward to the wrapped thread_pool_policy_scheduler + // when present so callers use get_processing_units_mask(sched), + // get_first_core(sched), processing_units_count(..., sched), etc., + // consistent with thread_pool_policy_scheduler. + friend std::size_t tag_invoke( + get_first_core_t, parallel_scheduler const& sched) noexcept + { + if (auto const* u = sched.get_underlying_scheduler()) + return get_first_core(*u); + return 0; + } + + template + friend std::size_t tag_invoke(processing_units_count_t, Parameters&&, + parallel_scheduler const& sched, + hpx::chrono::steady_duration const& = hpx::chrono::null_duration, + std::size_t = 0) + { + if (auto const* u = sched.get_underlying_scheduler()) + return processing_units_count( + null_parameters, *u, hpx::chrono::null_duration, 0); + return 1; + } + + friend auto tag_invoke( + get_processing_units_mask_t, parallel_scheduler const& sched) + { + if (auto const* cached = sched.get_pu_mask()) + return *cached; + if (auto const* u = sched.get_underlying_scheduler()) + return get_processing_units_mask(*u); + return hpx::threads::create_topology().get_machine_affinity_mask(); + } + + friend auto tag_invoke( + get_cores_mask_t, parallel_scheduler const& sched) + { + if (auto const* u = sched.get_underlying_scheduler()) + return get_cores_mask(*u); + return hpx::threads::create_topology().get_machine_affinity_mask(); + } + + // P2079R10: operation_state owns the receiver and manages the + // frontend/backend boundary. On start(), it checks the stop token + // and then delegates to the backend. + template + struct operation_state + { + // Concrete receiver_proxy that adapts the actual Receiver + // to the type-erased proxy interface. + struct concrete_receiver_proxy final + : parallel_scheduler_receiver_proxy + { + std::decay_t& receiver_; + + explicit concrete_receiver_proxy( + std::decay_t& rcvr) noexcept + : receiver_(rcvr) + { + } + + void set_value() noexcept override + { + hpx::execution::experimental::set_value( + HPX_MOVE(receiver_)); + } + + void set_error(std::exception_ptr ep) noexcept override + { + hpx::execution::experimental::set_error( + HPX_MOVE(receiver_), HPX_MOVE(ep)); + } + + void set_stopped() noexcept override + { + hpx::execution::experimental::set_stopped( + HPX_MOVE(receiver_)); + } + + // P2079R10 4.2: allow backends to poll for cancellation. + // Forwards the stop token state of the actual receiver. + bool stop_requested() const noexcept override + { + return get_stop_token(get_env(receiver_)).stop_requested(); + } + }; + + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + std::shared_ptr backend_; + // The proxy must be a member (not a local) because the + // backend's schedule() posts work asynchronously. The + // operation_state outlives the completion per the + // sender/receiver protocol. + concrete_receiver_proxy proxy_; + + // P2079R10 4.2: pre-allocated storage for the backend. + alignas(parallel_scheduler_storage_alignment) + std::byte storage_[parallel_scheduler_storage_size]; + + template + operation_state(Receiver_&& receiver, + std::shared_ptr backend) + : receiver_(HPX_FORWARD(Receiver_, receiver)) + , backend_(HPX_MOVE(backend)) + , proxy_(receiver_) + { + } + + operation_state(operation_state&&) = delete; + operation_state(operation_state const&) = delete; + operation_state& operator=(operation_state&&) = delete; + operation_state& operator=(operation_state const&) = delete; + + void start() noexcept + { + // P2079R10 4.1: if stop_token is stopped, complete + // with set_stopped as soon as is practical. + auto stop_token = get_stop_token(get_env(receiver_)); + if (stop_token.stop_requested()) + { + set_stopped(HPX_MOVE(receiver_)); + return; + } + + // Delegate to the backend via the member proxy, + // passing pre-allocated storage per P2079R10 / P3927R2. + backend_->schedule(proxy_, std::span(storage_)); + } + }; + + // Nested sender type + template + struct sender + { + Scheduler sched_; + + using sender_concept = sender_t; + using completion_signatures = + ::hpx::execution::experimental::completion_signatures< + set_value_t(), set_error_t(std::exception_ptr), + set_stopped_t()>; + + template + friend operation_state> tag_invoke( + connect_t, sender const& s, Receiver&& receiver) noexcept(std:: + is_nothrow_constructible_v, + Receiver>) + { + return { + HPX_FORWARD(Receiver, receiver), s.sched_.get_backend()}; + } + + template + friend operation_state> + tag_invoke(connect_t, sender&& s, Receiver&& receiver) noexcept( + std::is_nothrow_constructible_v, + Receiver>) + { + return { + HPX_FORWARD(Receiver, receiver), s.sched_.get_backend()}; + } + + struct env + { + Scheduler const& sched_; + + // P2079R10: expose completion scheduler for set_value_t + // and set_stopped_t + auto query( + get_completion_scheduler_t) const noexcept + { + return sched_; + } + + auto query( + get_completion_scheduler_t) const noexcept + { + return sched_; + } + + // Domain query + parallel_scheduler_domain query(get_domain_t) const noexcept + { + return {}; + } + }; + + env get_env() const noexcept + { + return {sched_}; + } + }; + + // Direct schedule() member for modern stdexec + sender schedule() const noexcept + { + return {*this}; + } + + // Domain customization for bulk operations + parallel_scheduler_domain query(get_domain_t) const noexcept + { + return {}; + } + + // Required for stdexec domain resolution: when a bulk sender's + // completing domain is resolved, stdexec queries the completion + // scheduler with get_completion_domain_t. Without + // this, the resolution falls to default_domain and our + // parallel_scheduler_domain::transform_sender is never called. + parallel_scheduler_domain query( + get_completion_domain_t) const noexcept + { + return {}; + } + + // Access the backend (for connect and domain transform). + std::shared_ptr const& get_backend() + const noexcept + { + return backend_; + } + + // HPX-specific: access the underlying thread pool scheduler + // from the backend (returns nullptr for custom backends). + thread_pool_policy_scheduler const* + get_underlying_scheduler() const noexcept + { + return backend_ ? backend_->get_underlying_scheduler() : nullptr; + } + + // HPX-specific: access the cached PU mask from the backend + // (returns nullptr for custom backends). + hpx::threads::mask_type const* get_pu_mask() const noexcept + { + return backend_ ? backend_->get_pu_mask() : nullptr; + } + + private: + // P2079R10: Construct from a backend shared_ptr. Private; only + // get_parallel_scheduler() (and copy/move) may produce instances. + explicit parallel_scheduler( + std::shared_ptr backend) noexcept + : backend_(HPX_MOVE(backend)) + { + } + + friend parallel_scheduler get_parallel_scheduler(); + + std::shared_ptr backend_; + }; + + // Stream output operator for parallel_scheduler + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT std::ostream& operator<<( + std::ostream& os, parallel_scheduler const&); + +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/parallel_scheduler_backend.hpp b/libs/core/executors/include/hpx/executors/parallel_scheduler_backend.hpp new file mode 100644 index 000000000000..3c65382fc473 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/parallel_scheduler_backend.hpp @@ -0,0 +1,169 @@ +// Copyright (c) 2025 Sai Charan Arvapally +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace hpx::execution::experimental { + + // P2079R10: Abstract backend interface for parallel_scheduler. + // This mirrors stdexec's system_context_replaceability::parallel_scheduler_backend + // but is expressed as a simple abstract class rather than using stdexec's __any + // type-erasure machinery. + // + // The backend is responsible for: + // - schedule(): post a unit of work to the execution context + // - schedule_bulk_chunked(): post chunked bulk work + // - schedule_bulk_unchunked(): post unchunked bulk work + // + // The receiver_proxy / bulk_item_receiver_proxy interfaces allow the backend + // to complete operations without knowing the concrete receiver type. + + // P2079R10 / P3804R2 receiver_proxy: type-erased completion interface. + // The backend calls these to signal completion back to the frontend. + // stop_requested() allows the backend to poll for cancellation during + // execution (partial substitute for try_query). + // + // P3804R2: No virtual destructor - objects are never destroyed polymorphically. + // The frontend knows the concrete type and destroys it directly. + HPX_CXX_CORE_EXPORT struct parallel_scheduler_receiver_proxy + { + virtual void set_value() noexcept = 0; + virtual void set_error(std::exception_ptr) noexcept = 0; + virtual void set_stopped() noexcept = 0; + // P2079R10 4.2 / P3804R2: backends can poll this to check if work should stop. + // Returns true if the associated stop token has been signalled. + // const-qualified per P3804R2 (aligns with try_query being const). + virtual bool stop_requested() const noexcept + { + return false; + } + + protected: + // P3804R2: Protected non-virtual destructor. + // Prevents polymorphic deletion while allowing derived classes to clean up. + ~parallel_scheduler_receiver_proxy() = default; + }; + + // P2079R10 bulk_item_receiver_proxy: extends receiver_proxy with + // execute(begin, end) for bulk work items. + HPX_CXX_CORE_EXPORT struct parallel_scheduler_bulk_item_receiver_proxy + : parallel_scheduler_receiver_proxy + { + virtual void execute(std::size_t begin, std::size_t end) noexcept = 0; + }; + + // P2079R10 4.2: Pre-allocated storage for backend operation states. + // The frontend provides a std::span of this size to each + // backend method so the backend can avoid heap allocation. + // Backends that need more can fall back to their own allocation. + HPX_CXX_CORE_EXPORT inline constexpr std::size_t + parallel_scheduler_storage_size = 256; + HPX_CXX_CORE_EXPORT inline constexpr std::size_t + parallel_scheduler_storage_alignment = alignof(std::max_align_t); + + // P2079R10 / P3927R2: Abstract backend interface + HPX_CXX_CORE_EXPORT struct parallel_scheduler_backend + { + virtual ~parallel_scheduler_backend() = default; + + // Schedule a single unit of work. On completion, call proxy.set_value(). + // storage: pre-allocated scratch space from the frontend's + // operation_state (parallel_scheduler_storage_size bytes). + // P3927R2: parameter order is (receiver, storage) + virtual void schedule(parallel_scheduler_receiver_proxy& proxy, + std::span storage) noexcept = 0; + + // Schedule chunked bulk work of size count. + // The backend partitions [0, count) into subranges and calls + // proxy.execute(begin, end) for each subrange, then proxy.set_value(). + // P3927R2: parameter order is (shape, receiver, storage) + virtual void schedule_bulk_chunked(std::size_t count, + parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span storage) noexcept = 0; + + // Schedule unchunked bulk work of size count. + // The backend calls proxy.execute(i, i+1) for each i in [0, count), + // then proxy.set_value(). + // P3927R2: parameter order is (shape, receiver, storage) + virtual void schedule_bulk_unchunked(std::size_t count, + parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span storage) noexcept = 0; + + // custom equality for backends. + // P2079R10 section 6.4 defines parallel_scheduler equality purely by + // shared_ptr target identity (pointer equality), so this method is + // NOT called by parallel_scheduler::operator==. + // Custom backends may implement it for their own comparisons. + virtual bool equal_to( + parallel_scheduler_backend const& other) const noexcept = 0; + + // Access the underlying thread pool scheduler (HPX-specific). + // Returns nullptr if this backend doesn't wrap a thread_pool_policy_scheduler. + // Used by parallel_scheduler_domain::transform_sender to create + // optimized thread_pool_bulk_sender directly (bypassing virtual dispatch + // for bulk operations when the default HPX backend is in use). + virtual thread_pool_policy_scheduler const* + get_underlying_scheduler() const noexcept + { + return nullptr; + } + + // Access the cached PU mask (HPX-specific). + // Returns nullptr if unavailable. + virtual hpx::threads::mask_type const* get_pu_mask() const noexcept + { + return nullptr; + } + }; + + // P2079R10: Function pointer factory type for replacing the default + // backend. Using a function pointer avoids platform-specific weak-linking + // issues while still providing P2079R10 replaceability semantics. + HPX_CXX_CORE_EXPORT using parallel_scheduler_backend_factory_t = + std::shared_ptr (*)(); + + // P2079R10: Get the current parallel_scheduler_backend. + // Thread-safe. Creates the default backend on first call via the factory. + // Can be replaced at any time via set_parallel_scheduler_backend(). + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT + std::shared_ptr + query_parallel_scheduler_backend(); + + // P2079R10: Replace the parallel scheduler backend factory. + // The new factory is used the next time query_parallel_scheduler_backend() + // creates a backend (only if no backend has been created yet, or after + // set_parallel_scheduler_backend() clears the current one). + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT parallel_scheduler_backend_factory_t + set_parallel_scheduler_backend_factory( + parallel_scheduler_backend_factory_t new_factory) noexcept; + + // P2079R10: Directly replace the active backend. + // Takes effect immediately: the next get_parallel_scheduler() call + // returns a scheduler backed by new_backend. + // Thread-safe, but must not be called while active operations are + // in-flight on the current backend. + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT void set_parallel_scheduler_backend( + std::shared_ptr new_backend); + +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/scheduler_executor.hpp b/libs/core/executors/include/hpx/executors/scheduler_executor.hpp index 7d72ae5bde59..9c5af2535c18 100644 --- a/libs/core/executors/include/hpx/executors/scheduler_executor.hpp +++ b/libs/core/executors/include/hpx/executors/scheduler_executor.hpp @@ -18,6 +18,9 @@ #include #include +#include +#include + #include #include #include @@ -27,6 +30,134 @@ namespace hpx::execution::experimental { + namespace detail { + + // Trait to detect schedulers that expose a thread pool backend, + // enabling direct dispatch via index_queue_bulk_sync_execute + // instead of the slower sender/receiver pipeline. + template + struct has_thread_pool_backend : std::false_type + { + }; + + template + struct has_thread_pool_backend> + : std::true_type + { + }; + + // parallel_scheduler wraps thread_pool_policy_scheduler; use the same + // index_queue fast path with thread_pool_params + // so pu_mask() can return the cached mask from get_pu_mask(). + template <> + struct has_thread_pool_backend : std::true_type + { + }; + + // Helper to extract thread pool parameters from a scheduler + template + struct thread_pool_params; // primary: not defined + + template <> + struct thread_pool_params + { + static auto* pool(parallel_scheduler const& sched) + { + return sched.get_underlying_scheduler()->get_thread_pool(); + } + static std::size_t first_core(parallel_scheduler const& sched) + { + return hpx::execution::experimental::get_first_core(sched); + } + static std::size_t num_cores(parallel_scheduler const& sched) + { + return hpx::execution::experimental::processing_units_count( + hpx::execution::experimental::null_parameters, sched, + hpx::chrono::null_duration, 0); + } + static auto const& policy(parallel_scheduler const& sched) + { + return sched.get_underlying_scheduler()->policy(); + } + }; + + template + struct thread_pool_params> + { + static auto* pool(thread_pool_policy_scheduler const& sched) + { + return sched.get_thread_pool(); + } + static std::size_t first_core( + thread_pool_policy_scheduler const& sched) + { + return hpx::execution::experimental::get_first_core(sched); + } + static std::size_t num_cores( + thread_pool_policy_scheduler const& sched) + { + return hpx::execution::experimental::processing_units_count( + hpx::execution::experimental::null_parameters, sched, + hpx::chrono::null_duration, 0); + } + static Policy const& policy( + thread_pool_policy_scheduler const& sched) + { + return sched.policy(); + } + }; + + // Bundle pool / affinity parameters for index_queue_bulk_* fast paths. + template + struct thread_pool_bulk_dispatch_data + { + using PT = thread_pool_params>; + + decltype(PT::pool(std::declval())) pool; + std::size_t first_core; + std::size_t num_cores; + decltype(PT::policy(std::declval())) policy; + decltype(hpx::execution::experimental::get_processing_units_mask( + std::declval())) mask; + }; + + template + HPX_FORCEINLINE thread_pool_bulk_dispatch_data> + make_thread_pool_bulk_dispatch_data(Scheduler const& sched) + { + using PT = thread_pool_params>; + return { + PT::pool(sched), + PT::first_core(sched), + PT::num_cores(sched), + PT::policy(sched), + hpx::execution::experimental::get_processing_units_mask(sched), + }; + } + + template + HPX_FORCEINLINE decltype(auto) scheduler_bulk_async_via_thread_pool( + Scheduler const& sched, F&& f, S const& shape, Ts&&... ts) + { + auto const env = make_thread_pool_bulk_dispatch_data(sched); + return hpx::parallel::execution::detail:: + index_queue_bulk_async_execute(env.pool, env.first_core, + env.num_cores, env.policy, HPX_FORWARD(F, f), shape, + env.mask, HPX_FORWARD(Ts, ts)...); + } + + template + HPX_FORCEINLINE decltype(auto) scheduler_bulk_sync_via_thread_pool( + Scheduler const& sched, F&& f, S const& shape, Ts&&... ts) + { + auto const env = make_thread_pool_bulk_dispatch_data(sched); + return hpx::parallel::execution::detail:: + index_queue_bulk_sync_execute(env.pool, env.first_core, + env.num_cores, env.policy, HPX_FORWARD(F, f), shape, + env.mask, HPX_FORWARD(Ts, ts)...); + } + } // namespace detail + namespace detail { HPX_CXX_CORE_EXPORT template @@ -183,14 +314,52 @@ namespace hpx::execution::experimental { if constexpr (std::is_void_v) { - return make_future(bulk(schedule(exec.sched_), n, - [shape, - bound_f = hpx::bind_back(HPX_FORWARD(F, f), - HPX_FORWARD(Ts, ts)...)](size_type i) mutable { - auto it = std::ranges::begin(shape); - std::ranges::advance(it, i); - HPX_INVOKE(bound_f, *it); - })); + if constexpr (detail::has_thread_pool_backend< + std::decay_t>::value) + { + return detail::scheduler_bulk_async_via_thread_pool( + exec.sched_, HPX_FORWARD(F, f), shape, + HPX_FORWARD(Ts, ts)...); + } + else if constexpr (requires { + exec.sched_.get_underlying_scheduler(); + }) + { + using underlying_type = std::decay_t< + decltype(exec.sched_.get_underlying_scheduler())>; + if constexpr (detail::has_thread_pool_backend< + underlying_type>::value) + { + auto const& underlying = + exec.sched_.get_underlying_scheduler(); + return detail::scheduler_bulk_async_via_thread_pool( + underlying, HPX_FORWARD(F, f), shape, + HPX_FORWARD(Ts, ts)...); + } + else + { + return make_future(bulk(schedule(exec.sched_), n, + [shape, + bound_f = hpx::bind_back( + HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...)]( + size_type i) mutable { + auto it = std::ranges::begin(shape); + std::ranges::advance(it, i); + HPX_INVOKE(bound_f, *it); + })); + } + } + else + { + return make_future(bulk(schedule(exec.sched_), n, + [shape, + bound_f = hpx::bind_back(HPX_FORWARD(F, f), + HPX_FORWARD(Ts, ts)...)](size_type i) mutable { + auto it = std::ranges::begin(shape); + std::ranges::advance(it, i); + HPX_INVOKE(bound_f, *it); + })); + } } else { @@ -248,21 +417,64 @@ namespace hpx::execution::experimental { using result_type = hpx::util::detail::invoke_deferred_result_t; - // hpx::execution::experimental::bulk requires integral shape using size_type = decltype(std::ranges::size(shape)); size_type const n = std::ranges::size(shape); - return hpx::util::void_guard(), - // NOLINTNEXTLINE(bugprone-unchecked-optional-access) - *hpx::this_thread::experimental::sync_wait(bulk( - schedule(exec.sched_), n, - [shape, - bound_f = hpx::bind_back(HPX_FORWARD(F, f), - HPX_FORWARD(Ts, ts)...)](size_type i) mutable { - auto it = std::ranges::begin(shape); - std::ranges::advance(it, i); - HPX_INVOKE(bound_f, *it); - })); + if constexpr (detail::has_thread_pool_backend< + std::decay_t>::value) + { + return hpx::util::void_guard(), + detail::scheduler_bulk_sync_via_thread_pool(exec.sched_, + HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...); + } + else if constexpr (requires { + exec.sched_.get_underlying_scheduler(); + }) + { + using underlying_type = std::decay_t< + decltype(exec.sched_.get_underlying_scheduler())>; + if constexpr (detail::has_thread_pool_backend< + underlying_type>::value) + { + auto const& underlying = + exec.sched_.get_underlying_scheduler(); + + return hpx::util::void_guard(), + detail::scheduler_bulk_sync_via_thread_pool( + underlying, HPX_FORWARD(F, f), shape, + HPX_FORWARD(Ts, ts)...); + } + else + { + return hpx::util::void_guard(), + // NOLINTNEXTLINE(bugprone-unchecked-optional-access) + *hpx::this_thread::experimental::sync_wait(bulk( + schedule(exec.sched_), n, + [shape, + bound_f = hpx::bind_back(HPX_FORWARD(F, f), + HPX_FORWARD(Ts, ts)...)]( + size_type i) mutable { + auto it = std::ranges::begin(shape); + std::ranges::advance(it, i); + HPX_INVOKE(bound_f, *it); + })); + } + } + else + { + return hpx::util::void_guard(), + // NOLINTNEXTLINE(bugprone-unchecked-optional-access) + *hpx::this_thread::experimental::sync_wait( + bulk(schedule(exec.sched_), n, + [shape, + bound_f = hpx::bind_back(HPX_FORWARD(F, f), + HPX_FORWARD(Ts, ts)...)]( + size_type i) mutable { + auto it = std::ranges::begin(shape); + std::ranges::advance(it, i); + HPX_INVOKE(bound_f, *it); + })); + } } template @@ -278,7 +490,6 @@ namespace hpx::execution::experimental { if constexpr (std::is_void_v) { - // the overall return value is future auto pre_req = when_all(keep_future(HPX_FORWARD(Future, predecessor))); diff --git a/libs/core/executors/include/hpx/executors/thread_pool_scheduler.hpp b/libs/core/executors/include/hpx/executors/thread_pool_scheduler.hpp index 737c6de2aefa..35907d10ccdf 100644 --- a/libs/core/executors/include/hpx/executors/thread_pool_scheduler.hpp +++ b/libs/core/executors/include/hpx/executors/thread_pool_scheduler.hpp @@ -36,9 +36,9 @@ // Forward declaration namespace hpx::execution::experimental::detail { - HPX_CXX_CORE_EXPORT template + typename Shape, typename F, bool IsChunked, bool IsParallel, + bool IsUnsequenced> class thread_pool_bulk_sender; } // namespace hpx::execution::experimental::detail @@ -74,21 +74,40 @@ namespace hpx::execution::experimental { // Concept to match bulk sender types HPX_CXX_CORE_EXPORT template concept bulk_chunked_or_unchunked_sender = - hpx::execution::experimental::stdexec_internal::__sender_for || - hpx::execution::experimental::stdexec_internal::__sender_for || - hpx::execution::experimental::stdexec_internal::__sender_for; - // Domain customization for stdexec bulk operations and sync_wait. - // Following the stdexec parallel_scheduler pattern (set_value_t tag-based). + HPX_CXX_CORE_EXPORT template + inline constexpr bool is_sequenced_policy_v = false; + + template <> + inline constexpr bool is_sequenced_policy_v = true; + + template <> + inline constexpr bool is_sequenced_policy_v = true; + + HPX_CXX_CORE_EXPORT template + inline constexpr bool is_unsequenced_bulk_policy_v = false; + + template <> + inline constexpr bool is_unsequenced_bulk_policy_v = + true; + + template <> + inline constexpr bool + is_unsequenced_bulk_policy_v = true; + + // Domain customization for stdexec bulk operations and sync_wait, + // with thread-pool parallelism derived from wrapped execution policies. HPX_CXX_CORE_EXPORT template struct thread_pool_domain : hpx::execution::experimental::detail::sync_wait_domain { - // transform_sender for bulk operations - // (following stdexec parallel_scheduler pattern) + // transform_sender for bulk operations (stdexec parallel_scheduler pattern) template requires std::same_as< std::decay_t; + constexpr bool is_chunked = sender_invokes_algorithm_v; + + constexpr bool is_parallel = + !is_sequenced_policy_v>; + + constexpr bool is_unsequenced = is_unsequenced_bulk_policy_v< + std::decay_t>; + + auto pu_mask = + hpx::execution::experimental::get_processing_units_mask(sched); return hpx::execution::experimental::detail:: thread_pool_bulk_sender, - decltype(iota_shape), std::decay_t, - is_chunked>(HPX_MOVE(sched), + std::decay_t, + std::decay_t, is_chunked, is_parallel, + is_unsequenced>{HPX_MOVE(sched), HPX_FORWARD(decltype(child), child), HPX_MOVE(iota_shape), - HPX_FORWARD(decltype(f), f)); + HPX_FORWARD(decltype(f), f), HPX_MOVE(pu_mask)}; } }; HPX_CXX_CORE_EXPORT template struct thread_pool_policy_scheduler { + // Expose the policy type for domain customization + using policy_type = Policy; + // Associate the parallel_execution_tag tag type as a default with this // scheduler, except if the given launch policy is sync. using execution_category = @@ -202,6 +230,12 @@ namespace hpx::execution::experimental { thread_pool_policy_scheduler const& scheduler, Sender&& sender, Shape const& shape, F&& f) { + constexpr bool is_parallel = + !std::is_same_v && + !is_sequenced_policy_v; + constexpr bool is_unsequenced = + is_unsequenced_bulk_policy_v; + if constexpr (std::is_integral_v>) { auto iota_shape = hpx::util::counting_shape(shape); @@ -221,16 +255,16 @@ namespace hpx::execution::experimental { return detail::thread_pool_bulk_sender, decltype(iota_shape), - decltype(wrapped_f), true>{scheduler, - HPX_FORWARD(Sender, sender), iota_shape, + decltype(wrapped_f), true, is_parallel, is_unsequenced>{ + scheduler, HPX_FORWARD(Sender, sender), iota_shape, HPX_MOVE(wrapped_f)}; } else { return detail::thread_pool_bulk_sender, decltype(iota_shape), - std::decay_t, false>{scheduler, - HPX_FORWARD(Sender, sender), iota_shape, + std::decay_t, false, is_parallel, is_unsequenced>{ + scheduler, HPX_FORWARD(Sender, sender), iota_shape, HPX_FORWARD(F, f)}; } } @@ -248,16 +282,17 @@ namespace hpx::execution::experimental { return detail::thread_pool_bulk_sender, std::decay_t, - decltype(wrapped_f), true>{scheduler, - HPX_FORWARD(Sender, sender), shape, + decltype(wrapped_f), true, is_parallel, is_unsequenced>{ + scheduler, HPX_FORWARD(Sender, sender), shape, HPX_MOVE(wrapped_f)}; } else { return detail::thread_pool_bulk_sender, std::decay_t, - std::decay_t, false>{scheduler, - HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f)}; + std::decay_t, false, is_parallel, is_unsequenced>{ + scheduler, HPX_FORWARD(Sender, sender), shape, + HPX_FORWARD(F, f)}; } } } @@ -378,16 +413,25 @@ namespace hpx::execution::experimental { void start() & noexcept { + auto stop_token = hpx::execution::experimental::get_stop_token( + hpx::execution::experimental::get_env(receiver)); + if (stop_token.stop_requested()) + { + hpx::execution::experimental::set_stopped( + HPX_MOVE(receiver)); + return; + } #if defined(HPX_CLANG_VERSION) #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wdeprecated-declarations" #endif hpx::detail::try_catch_exception_ptr( [&]() { - scheduler.execute([this]() mutable { - hpx::execution::experimental::set_value( - HPX_MOVE(receiver)); - }); + scheduler.execute( + [receiver = HPX_MOVE(receiver)]() mutable { + hpx::execution::experimental::set_value( + HPX_MOVE(receiver)); + }); }, [&](std::exception_ptr ep) { hpx::execution::experimental::set_error( diff --git a/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp b/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp index 4cef61960d2c..85158072e53d 100644 --- a/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp +++ b/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp @@ -59,6 +59,40 @@ namespace hpx::execution::experimental::detail { return static_cast(chunk_size); } + // For bulk_chunked: create exactly num_threads large chunks (one per worker). + // Unlike get_bulk_scheduler_chunk_size which creates ~8x more chunks per + // thread for fine-grained work stealing, this variant maximises spatial + // locality and minimises work-stealing queue overhead for the chunked case. + // Work stealing is still attempted but rarely needed for uniform workloads. + HPX_CXX_CORE_EXPORT constexpr std::uint32_t + get_bulk_scheduler_chunk_size_chunked( + std::uint32_t const num_threads, std::size_t const n) noexcept + { + if (num_threads == 0) + return static_cast(n); + // ceiling division: ceil(n / num_threads) -> one chunk per worker thread + return static_cast( + (n + static_cast(num_threads) - 1) / num_threads); + } + + /// Round a chunk up to a multiple of 16 when it is + /// smaller than size + HPX_CXX_CORE_EXPORT constexpr std::uint32_t align_chunk_for_vectorization( + std::uint32_t chunk, std::uint32_t const size) noexcept + { + constexpr std::uint32_t g = 16; + if (chunk == 0 || chunk >= size) + return chunk; + std::uint64_t c = chunk; + if (c % g != 0) + { + c = ((c + g - 1) / g) * g; + } + if (c > size) + c = size; + return static_cast(c); + } + // For bulk_unchunked: f(index, ...) HPX_CXX_CORE_EXPORT template @@ -149,9 +183,9 @@ namespace hpx::execution::experimental::detail { using index_pack_type = hpx::detail::fused_index_pack_t; auto const i_begin = - static_cast(index) * task_f->chunk_size; - auto const i_end = - (std::min) (i_begin + task_f->chunk_size, task_f->size); + static_cast(index) * op_state->chunk_size; + auto const i_end = (std::min) (i_begin + op_state->chunk_size, + static_cast(op_state->size)); if constexpr (OperationState::is_chunked) { @@ -161,12 +195,14 @@ namespace hpx::execution::experimental::detail { } else { - // bulk_unchunked: f(index, values...) for each element - // In unchunked case, chunk_size is 1 - // so each chunk will only have one element. - // The index used for invocation is i_begin. - bulk_scheduler_invoke_helper( - index_pack_type{}, op_state->f, i_begin, ts); + // bulk_unchunked: one element call f(shape_index, values...) per i. + auto it = std::ranges::next( + hpx::util::begin(op_state->shape), i_begin); + for (auto i = i_begin; i < i_end; ++i, ++it) + { + bulk_scheduler_invoke_helper( + index_pack_type{}, op_state->f, *it, ts); + } } } @@ -183,7 +219,7 @@ namespace hpx::execution::experimental::detail { do_work_chunk(ts, *index); } - if (task_f->allow_stealing) + if (op_state->allow_stealing) { // Then steal from the opposite end of the neighboring queues static constexpr auto opposite_end = @@ -216,7 +252,7 @@ namespace hpx::execution::experimental::detail { void operator()(Ts& ts) const { // schedule chunks from the end, if needed - if (task_f->reverse_placement) + if (op_state->reverse_placement) { do_work(ts); } @@ -257,11 +293,7 @@ namespace hpx::execution::experimental::detail { struct task_function { OperationState* const op_state; - std::size_t const size; - std::uint32_t const chunk_size; std::uint32_t const worker_thread; - bool reverse_placement; - bool allow_stealing; // Visit the values sent by the predecessor sender. void do_work() const @@ -287,7 +319,8 @@ namespace hpx::execution::experimental::detail { // Otherwise, it will call set_value on the connected receiver. void finish() const { - if (--(op_state->tasks_remaining.data_) == 0) + if (op_state->tasks_remaining.data_.fetch_sub( + 1, std::memory_order_acq_rel) == 1) { if (op_state->bad_alloc_thrown.load(std::memory_order_relaxed)) { @@ -349,18 +382,29 @@ namespace hpx::execution::experimental::detail { OperationState* op_state; template - void set_error(E&& e) && noexcept + void set_error(E&& e) & noexcept { hpx::execution::experimental::set_error( HPX_MOVE(op_state->receiver), HPX_FORWARD(E, e)); } - void set_stopped() && noexcept + template + void set_error(E&& e) && noexcept + { + static_cast(*this).set_error(HPX_FORWARD(E, e)); + } + + void set_stopped() & noexcept { hpx::execution::experimental::set_stopped( HPX_MOVE(op_state->receiver)); } + void set_stopped() && noexcept + { + static_cast(*this).set_stopped(); + } + // Initialize a queue for a worker thread. void init_queue_depth_first(std::size_t const worker_thread, std::size_t const size, std::size_t num_threads) noexcept @@ -476,32 +520,69 @@ namespace hpx::execution::experimental::detail { return; } - // Calculate chunk size based on execution mode + // Calculate chunk size based on execution mode and sequential policy std::uint32_t chunk_size; std::uint32_t num_chunks; - if constexpr (OperationState::is_chunked) + + // For sequential policy: single chunk covering entire range + if constexpr (!OperationState::is_parallel) { - chunk_size = get_bulk_scheduler_chunk_size( + if constexpr (OperationState::is_chunked) + { + chunk_size = size; + num_chunks = 1; + } + else + { + chunk_size = 1; + num_chunks = size; + } + op_state->num_worker_threads = 1; + } + else if constexpr (OperationState::is_chunked) + { + // One large chunk per worker thread: minimises queue overhead + // and maximises locality for memory-bound work. + chunk_size = get_bulk_scheduler_chunk_size_chunked( op_state->num_worker_threads, size); num_chunks = (size + chunk_size - 1) / chunk_size; } else { - chunk_size = 1; - num_chunks = size; + chunk_size = get_bulk_scheduler_chunk_size( + op_state->num_worker_threads, size); + num_chunks = (size + chunk_size - 1) / chunk_size; + } + + if constexpr (OperationState::is_unsequenced && + OperationState::is_parallel) + { + chunk_size = align_chunk_for_vectorization(chunk_size, size); + num_chunks = (size + chunk_size - 1) / chunk_size; } // launch only as many tasks as we have chunks std::size_t const num_pus = op_state->num_worker_threads; - if (num_chunks < + if constexpr (!OperationState::is_parallel) + { + // Sequential: force single task execution + op_state->tasks_remaining.data_.store( + 1, std::memory_order_relaxed); + op_state->pu_mask = detail::limit_mask(op_state->pu_mask, 1); + } + else if (num_chunks < static_cast(op_state->num_worker_threads)) { op_state->num_worker_threads = num_chunks; - op_state->tasks_remaining.data_ = num_chunks; + op_state->tasks_remaining.data_.store( + num_chunks, std::memory_order_relaxed); op_state->pu_mask = detail::limit_mask(op_state->pu_mask, num_chunks); } + op_state->size = size; + op_state->chunk_size = chunk_size; + HPX_ASSERT(hpx::threads::count(op_state->pu_mask) == op_state->num_worker_threads); @@ -552,10 +633,10 @@ namespace hpx::execution::experimental::detail { rp.get_pu_num(local_worker_thread + op_state->first_thread); } - bool reverse_placement = + op_state->reverse_placement = hint.placement_mode() == placement::depth_first_reverse || hint.placement_mode() == placement::breadth_first_reverse; - bool allow_stealing = + op_state->allow_stealing = !hpx::threads::do_not_share_function(hint.sharing_mode()); for (std::uint32_t pu = 0; @@ -591,8 +672,7 @@ namespace hpx::execution::experimental::detail { // Schedule task for this worker thread do_work_task( - task_function{op_state, size, chunk_size, - worker_thread, reverse_placement, allow_stealing}); + task_function{op_state, worker_thread}); ++worker_thread; } @@ -604,14 +684,19 @@ namespace hpx::execution::experimental::detail { // Handle the queue for the local thread. if (main_thread_ok) { - do_work_local(task_function{this->op_state, - size, chunk_size, local_worker_thread, reverse_placement, - allow_stealing}); + do_work_local(task_function{ + this->op_state, local_worker_thread}); } } template - void set_value(Ts&&... ts) && noexcept + requires((OperationState::is_chunked && + std::invocable...>) || + (!OperationState::is_chunked && + std::invocable...>) ) + void set_value(Ts&&... ts) & noexcept { hpx::detail::try_catch_exception_ptr( [&]() { this->execute(HPX_FORWARD(Ts, ts)...); }, @@ -620,6 +705,19 @@ namespace hpx::execution::experimental::detail { HPX_MOVE(op_state->receiver), HPX_MOVE(ep)); }); } + + template + requires((OperationState::is_chunked && + std::invocable...>) || + (!OperationState::is_chunked && + std::invocable...>) ) + void set_value(Ts&&... ts) && noexcept + { + static_cast(*this).set_value( + HPX_FORWARD(Ts, ts)...); + } }; // This sender represents bulk work that will be performed using the @@ -639,7 +737,8 @@ namespace hpx::execution::experimental::detail { // threads. // HPX_CXX_CORE_EXPORT template + typename Shape, typename F, bool IsChunked, bool IsParallel, + bool IsUnsequenced> class thread_pool_bulk_sender { private: @@ -709,6 +808,13 @@ namespace hpx::execution::experimental::detail { std::decay_t const& pred_snd; thread_pool_policy_scheduler const& sch; + constexpr auto query( + hpx::execution::experimental::get_completion_scheduler_t< + hpx::execution::experimental::set_value_t>) const noexcept + { + return sch; + } + template requires(meta::value>) @@ -745,7 +851,7 @@ namespace hpx::execution::experimental::detail { } }; - // It may be also be correct to forward the entire env of the + // It may also be correct to forward the entire env of the // pred. sender. constexpr auto get_env() const noexcept { @@ -757,6 +863,8 @@ namespace hpx::execution::experimental::detail { struct operation_state { static constexpr bool is_chunked = IsChunked; + static constexpr bool is_parallel = IsParallel; + static constexpr bool is_unsequenced = IsUnsequenced; using operation_state_type = hpx::execution::experimental::connect_result_t>> queues; + HPX_NO_UNIQUE_ADDRESS std::decay_t shape; HPX_NO_UNIQUE_ADDRESS std::decay_t f; HPX_NO_UNIQUE_ADDRESS std::decay_t receiver; @@ -811,6 +925,13 @@ namespace hpx::execution::experimental::detail { void start() & noexcept { + auto stop_token = + stdexec::get_stop_token(stdexec::get_env(receiver)); + if (stop_token.stop_requested()) + { + stdexec::set_stopped(HPX_MOVE(receiver)); + return; + } hpx::execution::experimental::start(op_state); } }; diff --git a/libs/core/executors/src/parallel_scheduler.cpp b/libs/core/executors/src/parallel_scheduler.cpp new file mode 100644 index 000000000000..cccd481f1ce1 --- /dev/null +++ b/libs/core/executors/src/parallel_scheduler.cpp @@ -0,0 +1,387 @@ +// Copyright (c) 2025 Sai Charan Arvapally +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::execution::experimental { + + namespace detail { + + // Default HPX backend: wraps the existing thread_pool_policy_scheduler. + // This is the backend returned by query_parallel_scheduler_backend() + // unless the user provides a replacement at runtime. + class hpx_parallel_scheduler_backend final + : public parallel_scheduler_backend + { + public: + explicit hpx_parallel_scheduler_backend( + thread_pool_policy_scheduler sched) + : scheduler_(sched) + , pu_mask_(hpx::execution::experimental::detail::full_mask( + hpx::execution::experimental::get_first_core(scheduler_), + hpx::execution::experimental::processing_units_count( + hpx::execution::experimental::null_parameters, + scheduler_, hpx::chrono::null_duration, 0))) + { + } + + void schedule(parallel_scheduler_receiver_proxy& proxy, + std::span) noexcept override + { + hpx::detail::try_catch_exception_ptr( + [&]() { + scheduler_.execute( + [&proxy]() mutable { proxy.set_value(); }); + }, + [&](std::exception_ptr ep) { + proxy.set_error(HPX_MOVE(ep)); + }); + } + + void schedule_bulk_chunked(std::size_t count, + parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span) noexcept override + { + hpx::detail::try_catch_exception_ptr( + [&]() { + if (count == 0) + { + proxy.set_value(); + return; + } + + auto const num_threads = static_cast( + hpx::execution::experimental:: + processing_units_count( + hpx::execution::experimental:: + null_parameters, + scheduler_, hpx::chrono::null_duration, 0)); + auto const chunk_size = static_cast( + hpx::execution::experimental::detail:: + get_bulk_scheduler_chunk_size_chunked( + num_threads, count)); + auto const n_chunks = + (count + chunk_size - 1) / chunk_size; + + auto sync = std::make_shared(n_chunks); + std::size_t chunks_posted = 0; + + for (std::size_t c = 0; c < n_chunks; ++c) + { + auto const begin = c * chunk_size; + auto const end = + (std::min) (begin + chunk_size, count); + + bool post_ok = true; + hpx::detail::try_catch_exception_ptr( + [&]() { + // Each task owns a copy of the shared_ptr, + // keeping sync alive until the last task + // finishes (i.e., until set_value/set_error + // is called). + scheduler_.execute( + [&proxy, sync, begin, end]() noexcept { + proxy.execute(begin, end); + if (sync->decrement()) + sync->signal(proxy); + }); + ++chunks_posted; + }, + [&](std::exception_ptr ep) { + post_ok = false; + sync->try_set_error(HPX_MOVE(ep)); + }); + + if (!post_ok) + break; + } + + // Retire any chunks that were never posted so the + // countdown can reach zero even when posting failed. + auto const not_posted = n_chunks - chunks_posted; + if (not_posted > 0 && sync->decrement(not_posted)) + sync->signal(proxy); + }, + [&](std::exception_ptr ep) { + // Setup (make_shared / chunk size computation) threw; + // no tasks have been posted yet. + proxy.set_error(HPX_MOVE(ep)); + }); + } + + void schedule_bulk_unchunked(std::size_t count, + parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span) noexcept override + { + hpx::detail::try_catch_exception_ptr( + [&]() { + if (count == 0) + { + proxy.set_value(); + return; + } + + auto const num_threads = static_cast( + hpx::execution::experimental:: + processing_units_count( + hpx::execution::experimental:: + null_parameters, + scheduler_, hpx::chrono::null_duration, 0)); + // Reuse the chunked helper: ceil(count / num_threads) + // elements per task, giving roughly one task per thread. + auto const chunk_size = static_cast( + hpx::execution::experimental::detail:: + get_bulk_scheduler_chunk_size_chunked( + num_threads, count)); + auto const n_chunks = + (count + chunk_size - 1) / chunk_size; + + auto sync = std::make_shared(n_chunks); + std::size_t chunks_posted = 0; + + for (std::size_t c = 0; c < n_chunks; ++c) + { + auto const begin = c * chunk_size; + auto const end = + (std::min) (begin + chunk_size, count); + + bool post_ok = true; + hpx::detail::try_catch_exception_ptr( + [&]() { + scheduler_.execute( + [&proxy, sync, begin, end]() noexcept { + // Call execute(i, i+1) for every + // element in this task's slice. + for (std::size_t i = begin; i < end; + ++i) + proxy.execute(i, i + 1); + if (sync->decrement()) + sync->signal(proxy); + }); + ++chunks_posted; + }, + [&](std::exception_ptr ep) { + post_ok = false; + sync->try_set_error(HPX_MOVE(ep)); + }); + + if (!post_ok) + break; + } + + auto const not_posted = n_chunks - chunks_posted; + if (not_posted > 0 && sync->decrement(not_posted)) + sync->signal(proxy); + }, + [&](std::exception_ptr ep) { + proxy.set_error(HPX_MOVE(ep)); + }); + } + + bool equal_to( + parallel_scheduler_backend const& other) const noexcept override + { + auto const* p = + dynamic_cast(&other); + return p != nullptr && p->scheduler_ == scheduler_; + } + + thread_pool_policy_scheduler const* + get_underlying_scheduler() const noexcept override + { + return &scheduler_; + } + + hpx::threads::mask_type const* get_pu_mask() const noexcept override + { + return &pu_mask_; + } + + private: + thread_pool_policy_scheduler scheduler_; + hpx::threads::mask_type pu_mask_; + + // Shared synchronization state for a single parallel bulk dispatch. + // One instance is created per schedule_bulk_* call and shared among + // all chunk tasks via shared_ptr. + // + // Lifetime guarantee: the shared_ptr keeps this object alive until + // the last task drops its copy, which only happens after one of the + // completion signals (set_value / set_error) has been called on the + // proxy. The proxy itself is guaranteed alive until that point by + // the P2079R10 precondition on schedule_bulk_chunked/unchunked. + struct bulk_sync_state + { + // Counts down from n_chunks to 0. The task that observes 0 is + // responsible for calling the completion signal on the proxy. + std::atomic remaining; + + // Set to true by the first task that encounters an error. + // Written before remaining reaches 0, so the acq_rel fence on + // remaining guarantees visibility for the completing task. + std::atomic has_error{false}; + + // Stores the first error. Protected by the has_error CAS: + // only one thread writes it, and it is read after acquiring + // has_error with memory_order_acquire. + std::exception_ptr first_error; + + explicit bulk_sync_state(std::size_t n) noexcept + : remaining(n) + { + } + + // Record ep as the first error (thread-safe; first caller wins). + void try_set_error(std::exception_ptr ep) noexcept + { + bool expected = false; + if (has_error.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + first_error = HPX_MOVE(ep); + } + } + + // Subtract n from remaining. Returns true iff remaining was + // exactly n before the subtraction (i.e., it is now 0). + bool decrement(std::size_t n = 1) noexcept + { + return remaining.fetch_sub(n, std::memory_order_acq_rel) == + n; + } + + // Call set_value or set_error on proxy based on error state. + // Must only be called by the single task for which decrement() + // returned true (i.e., the task that made remaining reach 0). + void signal( + parallel_scheduler_bulk_item_receiver_proxy& proxy) noexcept + { + if (has_error.load(std::memory_order_acquire)) + proxy.set_error(HPX_MOVE(first_error)); + else + proxy.set_value(); + } + }; + }; + + // Singleton-like shared thread pool for parallel_scheduler + static hpx::threads::thread_pool_base* get_default_parallel_pool() + { + // clang-format off + static hpx::threads::thread_pool_base* default_pool = + hpx::threads::detail::get_self_or_default_pool(); + // clang-format on + return default_pool; + } + + // Default factory creates the HPX backend + static std::shared_ptr + default_parallel_scheduler_backend_factory() + { + auto pool = get_default_parallel_pool(); + if (!pool) + { + std::terminate(); + } + return std::make_shared( + thread_pool_policy_scheduler( + pool, hpx::launch::async)); + } + + // Mutex protecting the live backend instance. + static std::mutex& get_backend_mutex() noexcept + { + static std::mutex mtx; + return mtx; + } + + // The live backend instance. nullptr until first query. + // Protected by get_backend_mutex(). + static std::shared_ptr& + get_backend_storage() noexcept + { + static std::shared_ptr backend; + return backend; + } + + // Storage for the current factory (only used to create the first + // backend, or after set_parallel_scheduler_backend() clears the + // current one). + static parallel_scheduler_backend_factory_t& + get_backend_factory_storage() noexcept + { + static parallel_scheduler_backend_factory_t factory = + &default_parallel_scheduler_backend_factory; + return factory; + } + + } // namespace detail + + std::shared_ptr + query_parallel_scheduler_backend() + { + std::lock_guard lock(detail::get_backend_mutex()); + auto& storage = detail::get_backend_storage(); + if (!storage) + { + storage = detail::get_backend_factory_storage()(); + } + return storage; + } + + parallel_scheduler_backend_factory_t set_parallel_scheduler_backend_factory( + parallel_scheduler_backend_factory_t new_factory) noexcept + { + std::lock_guard lock(detail::get_backend_mutex()); + auto& storage = detail::get_backend_factory_storage(); + auto old = storage; + storage = new_factory; + return old; + } + + void set_parallel_scheduler_backend( + std::shared_ptr new_backend) + { + std::lock_guard lock(detail::get_backend_mutex()); + detail::get_backend_storage() = HPX_MOVE(new_backend); + } + + parallel_scheduler get_parallel_scheduler() + { + auto backend = query_parallel_scheduler_backend(); + if (!backend) + { + // As per P2079R10, terminate if backend is unavailable. + std::terminate(); + } + return parallel_scheduler(HPX_MOVE(backend)); + } + + std::ostream& operator<<(std::ostream& os, parallel_scheduler const&) + { + return os << "parallel_scheduler"; + } + +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/tests/unit/CMakeLists.txt b/libs/core/executors/tests/unit/CMakeLists.txt index cfc469a07049..326468de85a3 100644 --- a/libs/core/executors/tests/unit/CMakeLists.txt +++ b/libs/core/executors/tests/unit/CMakeLists.txt @@ -17,6 +17,7 @@ set(tests parallel_executor_parameters parallel_fork_executor parallel_policy_executor + parallel_scheduler polymorphic_executor scheduler_executor sequenced_executor @@ -59,7 +60,9 @@ endforeach() if(HPX_WITH_CXX_MODULES AND (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")) # Clang (last tested version is v22) fails compiling the following tests when # C++ module support is enabled. - set(failing_clang_tests explicit_scheduler_executor thread_pool_scheduler) + set(failing_clang_tests explicit_scheduler_executor parallel_scheduler + thread_pool_scheduler + ) foreach(test ${failing_clang_tests}) target_compile_definitions( ${test}_test PRIVATE HPX_HAVE_FORCE_NO_CXX_MODULES diff --git a/libs/core/executors/tests/unit/parallel_scheduler.cpp b/libs/core/executors/tests/unit/parallel_scheduler.cpp new file mode 100644 index 000000000000..55b0331c2245 --- /dev/null +++ b/libs/core/executors/tests/unit/parallel_scheduler.cpp @@ -0,0 +1,1113 @@ +// Copyright (c) 2025 Sai Charan Arvapally +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ex = hpx::execution::experimental; + +// Include stdexec async_scope for stop token testing +#include + +int hpx_main(int, char*[]) +{ + // Type and Concept Tests + // parallel_scheduler models scheduler concept + { + auto sched = ex::get_parallel_scheduler(); + static_assert(ex::scheduler, + "parallel_scheduler must model scheduler"); + } + + // parallel_scheduler is not default constructible + { + static_assert(!std::is_default_constructible_v, + "parallel_scheduler should not be default constructible"); + static_assert(std::is_destructible_v, + "parallel_scheduler should be destructible"); + } + + // parallel_scheduler is copyable and movable + { + static_assert(std::is_copy_constructible_v, + "parallel_scheduler should be copy constructible"); + static_assert(std::is_move_constructible_v, + "parallel_scheduler should be move constructible"); + static_assert( + std::is_nothrow_copy_constructible_v, + "copy constructor should be noexcept"); + static_assert( + std::is_nothrow_move_constructible_v, + "move constructor should be noexcept"); + static_assert(std::is_nothrow_copy_assignable_v, + "copy assignment should be noexcept"); + static_assert(std::is_nothrow_move_assignable_v, + "move assignment should be noexcept"); + } + + // A copied scheduler is equal to the original + { + auto sched1 = ex::get_parallel_scheduler(); + auto sched2 = sched1; + HPX_TEST(sched1 == sched2); + } + + // Two schedulers from get_parallel_scheduler() are equal + { + auto sched1 = ex::get_parallel_scheduler(); + auto sched2 = ex::get_parallel_scheduler(); + HPX_TEST(sched1 == sched2); + } + + // schedule() produces a sender + { + auto snd = ex::schedule(ex::get_parallel_scheduler()); + using sender_t = decltype(snd); + + static_assert( + ex::sender, "schedule() result must model sender"); + static_assert(ex::sender_of, + "schedule() result must be sender_of"); + static_assert(ex::sender_of, + "schedule() result must be sender_of"); + } + + // Trivial schedule task (bare sync_wait, no then) + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + ex::sync_wait(ex::schedule(sched)); + } + + // Simple schedule runs on thread pool (work executes on the + // scheduler's context, which may be the calling thread with + // cooperative sync_wait) + { + std::thread::id pool_id{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto snd = ex::then( + ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); + + ex::sync_wait(std::move(snd)); + + HPX_TEST(pool_id != std::thread::id{}); + } + + // Forward progress guarantee is parallel + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + HPX_TEST(ex::get_forward_progress_guarantee(sched) == + ex::forward_progress_guarantee::parallel); + } + + // get_completion_scheduler returns the scheduler + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + HPX_TEST(ex::get_completion_scheduler( + ex::get_env(ex::schedule(sched))) == sched); + } + + // Chain task: two then calls execute on same thread + { + std::thread::id pool_id{}; + std::thread::id pool_id2{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto snd = ex::then( + ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); + auto snd2 = ex::then( + std::move(snd), [&] { pool_id2 = std::this_thread::get_id(); }); + + ex::sync_wait(std::move(snd2)); + + HPX_TEST(pool_id != std::thread::id{}); + HPX_TEST(pool_id == pool_id2); + } + + // P2079R10 example: schedule + then chain with values + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + auto begin = ex::schedule(sched); + auto hi = ex::then(begin, [] { return 13; }); + auto add_42 = ex::then(hi, [](int arg) { return arg + 42; }); + auto [i] = ex::sync_wait(add_42).value(); + HPX_TEST_EQ(i, 55); + } + + // Error propagation + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + bool caught_error = false; + + auto snd = ex::schedule(sched) | + ex::then([]() -> int { throw std::runtime_error("test error"); }); + + try + { + ex::sync_wait(std::move(snd)); + HPX_TEST(false); + } + catch (std::runtime_error const& e) + { + caught_error = true; + HPX_TEST_EQ(std::string(e.what()), std::string("test error")); + } + HPX_TEST(caught_error); + } + + // when_all with multiple senders + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto s1 = ex::schedule(sched) | ex::then([] { return 1; }); + auto s2 = ex::schedule(sched) | ex::then([] { return 2; }); + auto s3 = ex::schedule(sched) | ex::then([] { return 3; }); + + auto [r1, r2, r3] = ex::sync_wait(ex::when_all(s1, s2, s3)).value(); + HPX_TEST_EQ(r1, 1); + HPX_TEST_EQ(r2, 2); + HPX_TEST_EQ(r3, 3); + } + + // Simple bulk task + { + constexpr std::size_t num_tasks = 16; + std::thread::id pool_ids[num_tasks]{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::schedule(sched) | + ex::bulk_unchunked(ex::par, num_tasks, [&](unsigned long id) { + pool_ids[id] = std::this_thread::get_id(); + }); + + ex::sync_wait(std::move(bulk_snd)); + + for (auto pool_id : pool_ids) + { + HPX_TEST(pool_id != std::thread::id{}); + } + } + + // Bulk chaining with value propagation + { + constexpr std::size_t num_tasks = 16; + std::thread::id pool_id{}; + std::thread::id propagated_pool_ids[num_tasks]{}; + std::thread::id pool_ids[num_tasks]{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto snd = ex::then(ex::schedule(sched), [&] { + pool_id = std::this_thread::get_id(); + return pool_id; + }); + + auto bulk_snd = std::move(snd) | + ex::bulk_unchunked(ex::par, num_tasks, + [&](unsigned long id, std::thread::id propagated_pool_id) { + propagated_pool_ids[id] = propagated_pool_id; + pool_ids[id] = std::this_thread::get_id(); + }); + + std::optional> res = + ex::sync_wait(std::move(bulk_snd)); + + // first schedule ran on the scheduler's context + HPX_TEST(pool_id != std::thread::id{}); + + // bulk items ran and propagated the received value + for (std::size_t i = 0; i < num_tasks; ++i) + { + HPX_TEST(pool_ids[i] != std::thread::id{}); + HPX_TEST(propagated_pool_ids[i] == pool_id); + } + + // result of bulk is the same as the first schedule + HPX_TEST(res.has_value()); + HPX_TEST(std::get<0>(res.value()) == pool_id); + } + + // Bulk error handling + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + bool caught_error = false; + + auto bulk_snd = ex::schedule(sched) | + ex::bulk_unchunked(ex::par, 20, [](std::size_t i) { + if (i == 10) + throw std::runtime_error("Bulk error"); + }); + + try + { + ex::sync_wait(std::move(bulk_snd)); + HPX_TEST(false); + } + catch (std::runtime_error const& e) + { + caught_error = true; + HPX_TEST( + std::string(e.what()).find("Bulk error") != std::string::npos); + } + HPX_TEST(caught_error); + } + + // Simple bulk_chunked task + { + constexpr std::size_t num_tasks = 16; + std::thread::id pool_ids[num_tasks]{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::par, + num_tasks, [&](unsigned long b, unsigned long e) { + for (unsigned long id = b; id < e; ++id) + pool_ids[id] = std::this_thread::get_id(); + }); + + ex::sync_wait(std::move(bulk_snd)); + + for (auto pool_id : pool_ids) + { + HPX_TEST(pool_id != std::thread::id{}); + } + } + + // bulk_chunked performs chunking (with large shape) + { + std::atomic has_chunking{false}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::par, 10000, + [&](unsigned long b, unsigned long e) { + if ((e - b) > 1) + has_chunking = true; + }); + + ex::sync_wait(std::move(bulk_snd)); + HPX_TEST(has_chunking.load()); + } + + // bulk_chunked covers the entire range + { + constexpr std::size_t num_tasks = 200; + bool covered[num_tasks]{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::par, + num_tasks, [&](unsigned long b, unsigned long e) { + for (auto i = b; i < e; ++i) + covered[i] = true; + }); + + ex::sync_wait(std::move(bulk_snd)); + + for (std::size_t i = 0; i < num_tasks; ++i) + { + HPX_TEST(covered[i]); + } + } + + // bulk_chunked with seq doesn't do chunking (single chunk) + { + constexpr std::size_t num_tasks = 200; + std::atomic execution_count{0}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::seq, + num_tasks, [&](std::size_t b, std::size_t e) { + HPX_TEST_EQ(b, std::size_t(0)); + HPX_TEST_EQ(e, num_tasks); + execution_count++; + }); + + ex::sync_wait(std::move(bulk_snd)); + + // Per P2079R10 reference: seq should produce exactly 1 chunk + // with b==0, e==num_tasks. + HPX_TEST_EQ(execution_count.load(), 1); + } + + // Simple bulk_unchunked task + { + constexpr std::size_t num_tasks = 16; + std::thread::id pool_ids[num_tasks]{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_unchunked( + ex::schedule(sched), ex::par, num_tasks, [&](unsigned long id) { + pool_ids[id] = std::this_thread::get_id(); + }); + + ex::sync_wait(std::move(bulk_snd)); + + for (auto pool_id : pool_ids) + { + HPX_TEST(pool_id != std::thread::id{}); + } + } + + // bulk_unchunked with seq runs everything on one thread + { + constexpr std::size_t num_tasks = 16; + std::thread::id pool_ids[num_tasks]{}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_unchunked( + ex::schedule(sched), ex::seq, num_tasks, [&](unsigned long id) { + pool_ids[id] = std::this_thread::get_id(); + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + }); + + ex::sync_wait(std::move(bulk_snd)); + + for (auto pool_id : pool_ids) + { + HPX_TEST(pool_id != std::thread::id{}); + // Per P2079R10 reference: all should be on same thread with seq. + HPX_TEST(pool_id == pool_ids[0]); + } + } + + // bulk with par_unseq) + { + constexpr std::size_t num_tasks = 128; + std::atomic count{0}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::schedule(sched) | + ex::bulk_unchunked(ex::par_unseq, num_tasks, [&](std::size_t) { + count.fetch_add(1, std::memory_order_relaxed); + }); + + ex::sync_wait(std::move(bulk_snd)); + HPX_TEST_EQ(count.load(), num_tasks); + } + + // Stop token support test (P2079R10 requirement) + { + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + experimental::execution::async_scope scope; + scope.request_stop(); + HPX_TEST(scope.get_stop_source().stop_requested()); + + bool called = false; + auto snd = ex::then(ex::schedule(sched), [&called] { called = true; }); + + scope.spawn(std::move(snd)); + ex::sync_wait(scope.on_empty()); + + HPX_TEST(!called); + } + + // Test set_value_t completion scheduler query + { + auto sched = ex::get_parallel_scheduler(); + auto snd = ex::schedule(sched); + auto env = ex::get_env(snd); + + // Query the completion scheduler for set_value_t + auto completion_sched = + ex::get_completion_scheduler(env); + HPX_TEST_EQ(completion_sched, sched); + } + + // Test that set_stopped_t IS now exposed (per project decision / Isidoros) + { + auto sched = ex::get_parallel_scheduler(); + auto snd = ex::schedule(sched); + auto env = ex::get_env(snd); + + auto stopped_sched = + ex::get_completion_scheduler(env); + HPX_TEST_EQ(stopped_sched, sched); + } + + // Test receiver double-move safety: if execute() throws, receiver is still valid + { + auto sched = ex::get_parallel_scheduler(); + auto snd = ex::schedule(sched) | ex::then([]() { return 42; }); + + // This should complete successfully without double-move issues + ex::sync_wait(std::move(snd)); + } + + // Test bulk_unchunked with completes_on pattern + { + auto sched = ex::get_parallel_scheduler(); + std::vector v(10, 0); + + auto snd = ex::schedule(sched) | ex::then([]() { return 77; }) | + ex::bulk_unchunked( + ex::par, 10, [&v](std::size_t i, int val) { v[i] = val; }); + + ex::sync_wait(std::move(snd)); + + // All elements should be set to 77 + for (int i = 0; i < 10; ++i) + { + HPX_TEST_EQ(v[i], 77); + } + } + + // Test bulk_unchunked with multiple value arguments + { + auto sched = ex::get_parallel_scheduler(); + std::vector v(10, 0); + + auto snd = ex::schedule(sched) | ex::then([]() { return 88; }) | + ex::bulk_unchunked( + ex::par, 10, [&v](std::size_t i, int val) { v[i] = val; }); + + ex::sync_wait(std::move(snd)); + + // All elements should be set to 88 + for (int i = 0; i < 10; ++i) + { + HPX_TEST_EQ(v[i], 88); + } + } + + // Test sequential bulk with completes_on + { + auto sched = ex::get_parallel_scheduler(); + std::vector v(5, 0); + std::set thread_ids; + + auto snd = ex::schedule(sched) | ex::then([]() { return 55; }) | + ex::bulk_chunked(ex::seq, 5, + [&v, &thread_ids](std::size_t begin, std::size_t end, int val) { + for (std::size_t i = begin; i < end; ++i) + v[i] = val; + thread_ids.insert(std::this_thread::get_id()); + }); + + ex::sync_wait(std::move(snd)); + + // All elements should be set to 55 + for (int i = 0; i < 5; ++i) + { + HPX_TEST_EQ(v[i], 55); + } + // Sequential execution should use only 1 thread + HPX_TEST_EQ(thread_ids.size(), std::size_t(1)); + } + + // Unchunked internal chunking: large shape covers entire range + { + constexpr std::size_t n = 100000; + auto sched = ex::get_parallel_scheduler(); + std::vector> flags(n); + for (auto& f : flags) + f.store(0, std::memory_order_relaxed); + + auto snd = ex::bulk_unchunked( + ex::schedule(sched), ex::par, n, [&](std::size_t i) { + flags[i].fetch_add(1, std::memory_order_relaxed); + }); + + ex::sync_wait(std::move(snd)); + + for (std::size_t i = 0; i < n; ++i) + { + HPX_TEST_EQ(flags[i].load(), 1); + } + } + + // Unchunked internal chunking: value propagation with large shape + { + constexpr std::size_t n = 50000; + auto sched = ex::get_parallel_scheduler(); + std::vector results(n, 0); + + auto snd = ex::schedule(sched) | ex::then([]() { return 7; }) | + ex::bulk_unchunked(ex::par, n, + [&](std::size_t i, int val) { results[i] = val + 1; }); + + auto [passthrough] = ex::sync_wait(std::move(snd)).value(); + HPX_TEST_EQ(passthrough, 7); + + for (std::size_t i = 0; i < n; ++i) + { + HPX_TEST_EQ(results[i], 8); + } + } + + // Unchunked + bulk large shape covers entire range + { + constexpr std::size_t n = 100000; + auto sched = ex::get_parallel_scheduler(); + std::vector> flags(n); + for (auto& f : flags) + f.store(0, std::memory_order_relaxed); + + auto snd = ex::schedule(sched) | + ex::bulk_unchunked(ex::par, n, [&](std::size_t i) { + flags[i].fetch_add(1, std::memory_order_relaxed); + }); + + ex::sync_wait(std::move(snd)); + + for (std::size_t i = 0; i < n; ++i) + { + HPX_TEST_EQ(flags[i].load(), 1); + } + } + + // Chained bulk: bulk -> then -> bulk (composability via sender chaining) + { + constexpr std::size_t n = 256; + auto sched = ex::get_parallel_scheduler(); + std::vector> phase1(n); + std::vector> phase2(n); + for (auto& p : phase1) + p.store(0, std::memory_order_relaxed); + for (auto& p : phase2) + p.store(0, std::memory_order_relaxed); + + auto snd = ex::schedule(sched) | + ex::bulk_unchunked(ex::par, n, + [&](std::size_t i) { + phase1[i].store(1, std::memory_order_relaxed); + }) | + ex::bulk_unchunked(ex::par, n, [&](std::size_t i) { + phase2[i].store(phase1[i].load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + }); + + ex::sync_wait(std::move(snd)); + + for (std::size_t i = 0; i < n; ++i) + { + HPX_TEST_EQ(phase1[i].load(), 1); + HPX_TEST_EQ(phase2[i].load(), 2); + } + } + + // Mixed bulk variants chained: bulk_chunked -> bulk_unchunked + { + constexpr std::size_t n = 200; + auto sched = ex::get_parallel_scheduler(); + std::vector> results(n); + for (auto& r : results) + r.store(0, std::memory_order_relaxed); + + auto snd = + ex::bulk_chunked(ex::schedule(sched), ex::par, n, + [&](std::size_t begin, std::size_t end) { + for (std::size_t i = begin; i < end; ++i) + results[i].fetch_add(10, std::memory_order_relaxed); + }) | + ex::bulk_unchunked(ex::par, n, [&](std::size_t i) { + results[i].fetch_add(1, std::memory_order_relaxed); + }); + + ex::sync_wait(std::move(snd)); + + for (std::size_t i = 0; i < n; ++i) + { + HPX_TEST_EQ(results[i].load(), 11); + } + } + + // P2079R10 Replaceability API tests + + // Backend via shared_ptr: two schedulers from get_parallel_scheduler share backend + { + auto sched1 = ex::get_parallel_scheduler(); + auto sched2 = ex::get_parallel_scheduler(); + HPX_TEST(sched1 == sched2); + + // Both share the same backend pointer + HPX_TEST(sched1.get_backend() == sched2.get_backend()); + } + + // Backend provides underlying scheduler (default HPX backend) + { + auto sched = ex::get_parallel_scheduler(); + auto const* underlying = sched.get_underlying_scheduler(); + HPX_TEST(underlying != nullptr); + } + + // Backend provides PU mask (default HPX backend) + { + auto sched = ex::get_parallel_scheduler(); + auto const* pu_mask = sched.get_pu_mask(); + HPX_TEST(pu_mask != nullptr); + } + + // query_parallel_scheduler_backend returns a valid backend + { + auto backend = ex::query_parallel_scheduler_backend(); + HPX_TEST(backend != nullptr); + } + + // Custom backend: schedule completes via proxy + { + struct counting_backend final : ex::parallel_scheduler_backend + { + std::atomic& schedule_count; + + explicit counting_backend(std::atomic& count) + : schedule_count(count) + { + } + + void schedule(ex::parallel_scheduler_receiver_proxy& proxy, + std::span) noexcept override + { + schedule_count.fetch_add(1, std::memory_order_relaxed); + proxy.set_value(); + } + + void schedule_bulk_chunked(std::size_t count, + ex::parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span) noexcept override + { + for (std::size_t b = 0; b < count; b += 64) + { + auto e = (std::min) (b + std::size_t(64), count); + proxy.execute(b, e); + } + proxy.set_value(); + } + + void schedule_bulk_unchunked(std::size_t count, + ex::parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span) noexcept override + { + for (std::size_t i = 0; i < count; ++i) + proxy.execute(i, i + 1); + proxy.set_value(); + } + + bool equal_to(ex::parallel_scheduler_backend const& other) + const noexcept override + { + return this == &other; + } + }; + + std::atomic count{0}; + auto backend = std::make_shared(count); + auto orig = ex::query_parallel_scheduler_backend(); + ex::set_parallel_scheduler_backend(backend); + auto sched = ex::get_parallel_scheduler(); + + // schedule through custom backend + auto snd = ex::schedule(sched) | ex::then([] { return 99; }); + auto [val] = ex::sync_wait(std::move(snd)).value(); + HPX_TEST_EQ(val, 99); + HPX_TEST(count.load() > 0); + + ex::set_parallel_scheduler_backend(orig); + } + + // Custom backend equality: same pointer => equal + { + struct dummy_backend final : ex::parallel_scheduler_backend + { + void schedule(ex::parallel_scheduler_receiver_proxy& proxy, + std::span) noexcept override + { + proxy.set_value(); + } + void schedule_bulk_chunked(std::size_t, + ex::parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span) noexcept override + { + proxy.set_value(); + } + void schedule_bulk_unchunked(std::size_t, + ex::parallel_scheduler_bulk_item_receiver_proxy& proxy, + std::span) noexcept override + { + proxy.set_value(); + } + bool equal_to(ex::parallel_scheduler_backend const& other) + const noexcept override + { + return this == &other; + } + }; + + auto b1 = std::make_shared(); + auto b2 = std::make_shared(); + + auto orig = ex::query_parallel_scheduler_backend(); + + ex::set_parallel_scheduler_backend(b1); + auto s1 = ex::get_parallel_scheduler(); + auto s2 = ex::get_parallel_scheduler(); // same backend + + ex::set_parallel_scheduler_backend(b2); + auto s3 = ex::get_parallel_scheduler(); // different backend + + HPX_TEST(s1 == s2); + HPX_TEST(!(s1 == s3)); + + ex::set_parallel_scheduler_backend(orig); + } + + // Default backend: schedulers from different get_parallel_scheduler() calls + // share the same backend and are equal + { + auto s1 = ex::get_parallel_scheduler(); + auto s2 = ex::get_parallel_scheduler(); + HPX_TEST(s1 == s2); + HPX_TEST(s1.get_backend().get() == s2.get_backend().get()); + } + + // set_parallel_scheduler_backend() actually replaces the live backend + { + struct marker_backend final : ex::parallel_scheduler_backend + { + std::atomic& hit; + explicit marker_backend(std::atomic& h) + : hit(h) + { + } + + void schedule(ex::parallel_scheduler_receiver_proxy& p, + std::span) noexcept override + { + hit.fetch_add(1, std::memory_order_relaxed); + p.set_value(); + } + void schedule_bulk_chunked(std::size_t, + ex::parallel_scheduler_bulk_item_receiver_proxy& p, + std::span) noexcept override + { + p.set_value(); + } + void schedule_bulk_unchunked(std::size_t, + ex::parallel_scheduler_bulk_item_receiver_proxy& p, + std::span) noexcept override + { + p.set_value(); + } + bool equal_to( + ex::parallel_scheduler_backend const& o) const noexcept override + { + return this == &o; + } + }; + + std::atomic hit{0}; + auto orig = ex::query_parallel_scheduler_backend(); + + // Install the marker backend + ex::set_parallel_scheduler_backend( + std::make_shared(hit)); + + // get_parallel_scheduler() must now use the marker backend + auto sched = ex::get_parallel_scheduler(); + ex::sync_wait(ex::schedule(sched)); + HPX_TEST(hit.load() > 0); + + // Restore the original backend so other tests are unaffected + ex::set_parallel_scheduler_backend(orig); + HPX_TEST(ex::get_parallel_scheduler() == ex::get_parallel_scheduler()); + } + + // Virtual bulk dispatch: custom backend that implements bulk via + // schedule_bulk_chunked. This verifies that the parallel_bulk_dispatch_sender + // correctly routes through the virtual path when get_underlying_scheduler() + // returns nullptr. + { + struct bulk_counting_backend final : ex::parallel_scheduler_backend + { + std::atomic& schedule_hits; + std::atomic& bulk_hits; + + bulk_counting_backend( + std::atomic& sched, std::atomic& bulk) + : schedule_hits(sched) + , bulk_hits(bulk) + { + } + + void schedule(ex::parallel_scheduler_receiver_proxy& p, + std::span) noexcept override + { + schedule_hits.fetch_add(1, std::memory_order_relaxed); + p.set_value(); + } + void schedule_bulk_chunked(std::size_t count, + ex::parallel_scheduler_bulk_item_receiver_proxy& p, + std::span) noexcept override + { + bulk_hits.fetch_add(1, std::memory_order_relaxed); + // Execute all elements in one chunk + if (count > 0) + p.execute(0, count); + p.set_value(); + } + void schedule_bulk_unchunked(std::size_t count, + ex::parallel_scheduler_bulk_item_receiver_proxy& p, + std::span) noexcept override + { + bulk_hits.fetch_add(1, std::memory_order_relaxed); + for (std::size_t i = 0; i < count; ++i) + p.execute(i, i + 1); + p.set_value(); + } + bool equal_to( + ex::parallel_scheduler_backend const& o) const noexcept override + { + return this == &o; + } + // Returns nullptr: triggers virtual dispatch path + }; + + std::atomic sched_hits{0}; + std::atomic bulk_hits{0}; + auto b = std::make_shared(sched_hits, bulk_hits); + auto orig = ex::query_parallel_scheduler_backend(); + ex::set_parallel_scheduler_backend(b); + auto sched = ex::get_parallel_scheduler(); + + // Bulk operation through virtual dispatch + std::vector results(10, 0); + auto bulk_snd = ex::schedule(sched) | + ex::bulk_unchunked( + ex::par, 10, [&results](std::size_t i) { results[i] = 42; }); + ex::sync_wait(std::move(bulk_snd)); + + // Verify: schedule was called (for the child sender) and + // bulk was dispatched through the backend + HPX_TEST(sched_hits.load() > 0); + HPX_TEST(bulk_hits.load() > 0); + for (int i = 0; i < 10; ++i) + { + HPX_TEST_EQ(results[i], 42); + } + + ex::set_parallel_scheduler_backend(orig); + } + + // stop_requested() on the proxy: returns false when no stop is in flight. + // The backend can call this to poll for cancellation during schedule(). + { + bool proxy_saw_stop = false; + + struct stop_check_backend final : ex::parallel_scheduler_backend + { + bool& saw_; + explicit stop_check_backend(bool& b) + : saw_(b) + { + } + + void schedule(ex::parallel_scheduler_receiver_proxy& p, + std::span) noexcept override + { + // No stop has been requested; proxy must report false. + saw_ = p.stop_requested(); + p.set_value(); + } + void schedule_bulk_chunked(std::size_t, + ex::parallel_scheduler_bulk_item_receiver_proxy& p, + std::span) noexcept override + { + p.set_value(); + } + void schedule_bulk_unchunked(std::size_t, + ex::parallel_scheduler_bulk_item_receiver_proxy& p, + std::span) noexcept override + { + p.set_value(); + } + bool equal_to( + ex::parallel_scheduler_backend const& o) const noexcept override + { + return this == &o; + } + }; + + auto b = std::make_shared(proxy_saw_stop); + auto orig = ex::query_parallel_scheduler_backend(); + ex::set_parallel_scheduler_backend(b); + auto sched = ex::get_parallel_scheduler(); + ex::sync_wait(ex::schedule(sched)); + HPX_TEST(!proxy_saw_stop); + ex::set_parallel_scheduler_backend(orig); + } + + // ======================================================================== + // P3804R2 VERIFICATION TESTS + // ======================================================================== + // These tests verify the P3804R2 specification for execution policy + // handling in bulk operations. P3804R2 clarifies that: + // - seq policy: Backend receives count=1, executes all work sequentially + // - par policy: Backend receives count=shape, distributes work in parallel + + // P3804R2: bulk_chunked with seq policy calls f(0, shape) exactly once + { + constexpr std::size_t num_tasks = 200; + std::atomic execution_count{0}; + std::size_t observed_begin = 999; + std::size_t observed_end = 999; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::seq, + num_tasks, [&](std::size_t b, std::size_t e) { + observed_begin = b; + observed_end = e; + execution_count++; + }); + + ex::sync_wait(std::move(bulk_snd)); + + // P3804R2 3.7: seq policy should produce exactly 1 call + // with f(0, shape, args...) + HPX_TEST_EQ(execution_count.load(), 1); + HPX_TEST_EQ(observed_begin, std::size_t(0)); + HPX_TEST_EQ(observed_end, num_tasks); + } + + // P3804R2: bulk_chunked with par policy creates multiple chunks + { + constexpr std::size_t num_tasks = 10000; + std::atomic chunk_count{0}; + std::atomic has_chunking{false}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::par, + num_tasks, [&](std::size_t b, std::size_t e) { + chunk_count++; + if ((e - b) > 1) + has_chunking = true; + }); + + ex::sync_wait(std::move(bulk_snd)); + + // P3804R2 3.7: par policy should create multiple chunks when + // multiple threads are available + if (hpx::get_os_thread_count() > 1) + { + HPX_TEST(chunk_count.load() > 1); + HPX_TEST(has_chunking.load()); + } + else + { + HPX_TEST(chunk_count.load() >= 1); + } + } + + // P3804R2: bulk_unchunked with seq executes all items on same thread + { + constexpr std::size_t num_tasks = 50; + std::thread::id pool_ids[num_tasks]; + std::atomic execution_count{0}; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_unchunked( + ex::schedule(sched), ex::seq, num_tasks, [&](std::size_t id) { + pool_ids[id] = std::this_thread::get_id(); + execution_count++; + }); + + ex::sync_wait(std::move(bulk_snd)); + + // P3804R2 3.7: seq policy should execute sequentially + // All items should execute on the same thread + HPX_TEST_EQ(execution_count.load(), static_cast(num_tasks)); + std::thread::id first_thread = pool_ids[0]; + for (std::size_t i = 1; i < num_tasks; ++i) + { + HPX_TEST_EQ(pool_ids[i], first_thread); + } + } + + // P3804R2: bulk_unchunked with par uses multiple threads + { + constexpr std::size_t num_tasks = 200; + std::thread::id pool_ids[num_tasks]; + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_unchunked(ex::schedule(sched), ex::par, + num_tasks, + [&](std::size_t id) { pool_ids[id] = std::this_thread::get_id(); }); + + ex::sync_wait(std::move(bulk_snd)); + + // P3804R2 3.7: par policy should use multiple threads when + // enough threads are available. With cooperative sync_wait the + // calling thread participates, so with few threads (e.g. 2) all + // work might run on a single thread. + for (auto tid : pool_ids) + { + HPX_TEST(tid != std::thread::id{}); + } + } + + // P3804R2: Verify all elements are processed exactly once with seq + { + constexpr std::size_t num_tasks = 100; + std::atomic counters[num_tasks]; + for (auto& c : counters) + c.store(0); + + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_unchunked(ex::schedule(sched), ex::seq, + num_tasks, [&](std::size_t id) { counters[id]++; }); + + ex::sync_wait(std::move(bulk_snd)); + + // Every element should be processed exactly once + for (std::size_t i = 0; i < num_tasks; ++i) + { + HPX_TEST_EQ(counters[i].load(), 1); + } + } + + // P3804R2: Verify all elements are processed exactly once with par + { + constexpr std::size_t num_tasks = 1000; + std::atomic counters[num_tasks]; + for (auto& c : counters) + c.store(0); + + ex::parallel_scheduler sched = ex::get_parallel_scheduler(); + + auto bulk_snd = ex::bulk_unchunked(ex::schedule(sched), ex::par, + num_tasks, [&](std::size_t id) { counters[id]++; }); + + ex::sync_wait(std::move(bulk_snd)); + + // Every element should be processed exactly once + for (std::size_t i = 0; i < num_tasks; ++i) + { + HPX_TEST_EQ(counters[i].load(), 1); + } + } + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0, + "HPX main exited with non-zero status"); + return hpx::util::report_errors(); +} diff --git a/libs/core/executors/tests/unit/thread_pool_scheduler.cpp b/libs/core/executors/tests/unit/thread_pool_scheduler.cpp index 12c6b098669d..7f71522598f0 100644 --- a/libs/core/executors/tests/unit/thread_pool_scheduler.cpp +++ b/libs/core/executors/tests/unit/thread_pool_scheduler.cpp @@ -59,9 +59,10 @@ struct is_thread_pool_bulk_sender : std::false_type }; template -struct is_thread_pool_bulk_sender> + bool IsChunked, bool IsParallel, bool IsUnsequenced> +struct is_thread_pool_bulk_sender< + hpx::execution::experimental::detail::thread_pool_bulk_sender> : std::true_type { }; @@ -1804,7 +1805,7 @@ void test_stdexec_domain_queries() // 3. Verify the domain type is thread_pool_domain static_assert( std::is_same_v>, - "scheduler domain should be thread_pool_domain"); + "scheduler domain should be thread_pool_domain"); // 4. Verify transform_sender produces thread_pool_bulk_sender for // bulk_chunked (proves the domain customization is picked up) { @@ -2139,6 +2140,18 @@ void test_completion_scheduler() "the completion scheduler should be a thread_pool_scheduler"); } + { + auto sender = ex::bulk( + ex::schedule(ex::thread_pool_scheduler{}), + hpx::execution::parallel_task_policy{}, 10, [](int) {}); + auto completion_scheduler = + ex::get_completion_scheduler(ex::get_env(sender)); + static_assert( + std::is_same_v, + ex::thread_pool_scheduler>, + "the completion scheduler should be a thread_pool_scheduler"); + } + { auto sender = ex::then( ex::bulk(ex::continues_on(ex::just(42), ex::thread_pool_scheduler{}), diff --git a/tests/performance/local/stream.cpp b/tests/performance/local/stream.cpp index cd6562554ced..3b66f2e9a764 100644 --- a/tests/performance/local/stream.cpp +++ b/tests/performance/local/stream.cpp @@ -603,10 +603,28 @@ int hpx_main(hpx::program_options::variables_map& vm) timing = run_benchmark<>(warmup_iterations, iterations, vector_size, std::move(alloc), std::move(policy)); } + else if (executor == 6) + { + // parallel_scheduler natively. + // Using it via scheduler_executor for parallel algorithms. + using executor_type = + hpx::execution::experimental::scheduler_executor< + hpx::execution::experimental::parallel_scheduler>; + + executor_type exec( + hpx::execution::experimental::get_parallel_scheduler()); + auto policy = hpx::execution::par.on(exec); + hpx::compute::host::detail::policy_allocator + alloc(policy); + + timing = run_benchmark<>(warmup_iterations, iterations, vector_size, + std::move(alloc), std::move(policy)); + } else { HPX_THROW_EXCEPTION(hpx::error::commandline_option_error, - "hpx_main", "Invalid executor id given (0-4 allowed"); + "hpx_main", "Invalid executor id given (0-6 allowed"); } } time_total = mysecond() - time_total; @@ -660,10 +678,10 @@ int hpx_main(hpx::program_options::variables_map& vm) "max,add_bytes,add_bw,add_avg,add_min,add_max,triad_bytes," "triad_bw,triad_avg,triad_min,triad_max\n"); } - std::size_t const num_executors = 6; + std::size_t const num_executors = 7; char const* executors[num_executors] = {"parallel-serial", "block", "parallel-parallel", "fork_join_executor", "scheduler_executor", - "block_fork_join_executor"}; + "block_fork_join_executor", "parallel_scheduler"}; hpx::util::format_to(std::cout, "{},{},{},", executors[executor], hpx::get_os_thread_count(), vector_size); }