From fbd38af8421d52bf5f04e81637e07c3fee5e322d Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 4 Feb 2026 14:46:39 +0000 Subject: [PATCH 01/17] Fix bug #6647: Correct type handling in reduce Signed-off-by: Bhoomish --- .../hpx/parallel/algorithms/reduce.hpp | 7 +- .../tests/regressions/CMakeLists.txt | 40 +++++------ .../tests/regressions/reduce_6647.cpp | 69 +++++++++++++++++++ 3 files changed, 89 insertions(+), 27 deletions(-) create mode 100644 libs/core/algorithms/tests/regressions/reduce_6647.cpp diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 01e04e3d7379..b44ecafb2cc8 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -63,7 +63,7 @@ namespace hpx { /// with an execution policy object of type \a sequenced_policy /// execute in sequential order in the calling thread. /// - /// The reduce operations in the parallel \a copy_if algorithm invoked + /// The reduce operations in the parallel \a reduce algorithm invoked /// with an execution policy object of type \a parallel_policy /// or \a parallel_task_policy are permitted to execute in an unordered /// fashion in unspecified threads, and indeterminately sequenced @@ -418,9 +418,8 @@ namespace hpx::parallel { } auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T { - T val = *part_begin; - return detail::sequential_reduce( - ++part_begin, --part_size, HPX_MOVE(val), r); + return reduce_partition( + part_begin, part_size, r); }; return util::partitioner::call( diff --git a/libs/core/algorithms/tests/regressions/CMakeLists.txt b/libs/core/algorithms/tests/regressions/CMakeLists.txt index 79fbcceea1cd..5ace1c4c6d3f 100644 --- a/libs/core/algorithms/tests/regressions/CMakeLists.txt +++ b/libs/core/algorithms/tests/regressions/CMakeLists.txt @@ -1,8 +1,8 @@ -# Copyright (c) 2014-2025 Hartmut Kaiser +#Copyright(c) 2014 - 2025 Hartmut Kaiser # -# SPDX-License-Identifier: BSL-1.0 -# Distributed under the Boost Software License, Version 1.0. (See accompanying -# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +#SPDX - License - Identifier : BSL - 1.0 +#Distributed under the Boost Software License, Version 1.0.(See accompanying +#file LICENSE_1_0.txt or copy at http: //www.boost.org/LICENSE_1_0.txt) set(tests chunk_size_variable_regression @@ -21,6 +21,7 @@ set(tests mismatch_differently_sized_ranges num_cores reduce_3641 + reduce_6647 scan_different_inits scan_non_commutative scan_shortlength @@ -34,28 +35,21 @@ set(tests ranges_facilities ) -if(HPX_WITH_DATAPAR) - list(APPEND tests for_each_datapar) -endif() + if (HPX_WITH_DATAPAR) list(APPEND tests for_each_datapar) endif() -foreach(test ${tests}) - set(sources ${test}.cpp) + foreach (test ${tests}) set(sources ${test}.cpp) - set(${test}_PARAMETERS THREADS_PER_LOCALITY 4) + set(${test} _PARAMETERS THREADS_PER_LOCALITY 4) - source_group("Source Files" FILES ${sources}) + source_group("Source Files" FILES ${sources}) - # add example executable - add_hpx_executable( - ${test}_test INTERNAL_FLAGS - SOURCES ${sources} - EXCLUDE_FROM_ALL ${${test}_FLAGS} - FOLDER "Tests/Regressions/Modules/Core/Algorithms/" - ) +#add example executable + add_hpx_executable(${test} _test INTERNAL_FLAGS SOURCES ${ + sources} EXCLUDE_FROM_ALL ${${test} _FLAGS} FOLDER + "Tests/Regressions/Modules/Core/Algorithms/") - target_link_libraries( - ${test}_test PRIVATE hpx_iterator_support_test_utilities - ) + target_link_libraries(${test} _test PRIVATE + hpx_iterator_support_test_utilities) - add_hpx_regression_test("modules.algorithms" ${test} ${${test}_PARAMETERS}) -endforeach() + add_hpx_regression_test("modules.algorithms" ${ + test} ${${test} _PARAMETERS}) endforeach() diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp new file mode 100644 index 000000000000..b57cdd611da8 --- /dev/null +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -0,0 +1,69 @@ +// Copyright (c) 2026 Bhoomish Gupta +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// #6647:Incorrect reduce implementation + +#include +#include +#include + +#include +#include +#include + +struct minmax +{ + std::pair operator()( + std::pair lhs, std::pair rhs) const + { + return { + lhs.first < rhs.first ? lhs.first : rhs.first, + lhs.second < rhs.second ? rhs.second : lhs.second, + }; + } + + std::pair operator()(std::pair lhs, int rhs) const + { + return (*this)(lhs, std::pair{rhs, rhs}); + } + + std::pair operator()(int lhs, std::pair rhs) const + { + return (*this)(std::pair{lhs, lhs}, rhs); + } + + std::pair operator()(int lhs, int rhs) const + { + return (*this)( + std::pair{lhs, lhs}, std::pair{rhs, rhs}); + } +}; + +int hpx_main() +{ + std::vector c = {3, 1, 4, 1, 5, 9, 2, 6}; + + auto result = hpx::reduce(hpx::execution::seq, c.begin(), c.end(), + std::pair{INT_MAX, INT_MIN}, minmax{}); + + HPX_TEST_EQ(result.first, 1); + HPX_TEST_EQ(result.second, 9); + + result = hpx::reduce(hpx::execution::par, c.begin(), c.end(), + std::pair{INT_MAX, INT_MIN}, minmax{}); + + HPX_TEST_EQ(result.first, 1); + HPX_TEST_EQ(result.second, 9); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} From 752d983fd3e56c973caf8d7e7281a1fff1854601 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 4 Feb 2026 20:09:51 +0000 Subject: [PATCH 02/17] Fixed Iterator Category Mismatch Signed-off-by: Bhoomish --- .../tests/regressions/CMakeLists.txt | 39 +++++++++++-------- .../tests/regressions/reduce_6647.cpp | 2 +- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/libs/core/algorithms/tests/regressions/CMakeLists.txt b/libs/core/algorithms/tests/regressions/CMakeLists.txt index 5ace1c4c6d3f..48e2507d9d62 100644 --- a/libs/core/algorithms/tests/regressions/CMakeLists.txt +++ b/libs/core/algorithms/tests/regressions/CMakeLists.txt @@ -1,8 +1,8 @@ -#Copyright(c) 2014 - 2025 Hartmut Kaiser +# Copyright (c) 2014-2025 Hartmut Kaiser # -#SPDX - License - Identifier : BSL - 1.0 -#Distributed under the Boost Software License, Version 1.0.(See accompanying -#file LICENSE_1_0.txt or copy at http: //www.boost.org/LICENSE_1_0.txt) +# SPDX-License-Identifier: BSL-1.0 +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) set(tests chunk_size_variable_regression @@ -35,21 +35,28 @@ set(tests ranges_facilities ) - if (HPX_WITH_DATAPAR) list(APPEND tests for_each_datapar) endif() +if(HPX_WITH_DATAPAR) + list(APPEND tests for_each_datapar) +endif() - foreach (test ${tests}) set(sources ${test}.cpp) +foreach(test ${tests}) + set(sources ${test}.cpp) - set(${test} _PARAMETERS THREADS_PER_LOCALITY 4) + set(${test}_PARAMETERS THREADS_PER_LOCALITY 4) - source_group("Source Files" FILES ${sources}) + source_group("Source Files" FILES ${sources}) -#add example executable - add_hpx_executable(${test} _test INTERNAL_FLAGS SOURCES ${ - sources} EXCLUDE_FROM_ALL ${${test} _FLAGS} FOLDER - "Tests/Regressions/Modules/Core/Algorithms/") + # add example executable + add_hpx_executable( + ${test}_test INTERNAL_FLAGS + SOURCES ${sources} + EXCLUDE_FROM_ALL ${${test}_FLAGS} + FOLDER "Tests/Regressions/Modules/Core/Algorithms/" + ) - target_link_libraries(${test} _test PRIVATE - hpx_iterator_support_test_utilities) + target_link_libraries( + ${test}_test PRIVATE hpx_iterator_support_test_utilities + ) - add_hpx_regression_test("modules.algorithms" ${ - test} ${${test} _PARAMETERS}) endforeach() + add_hpx_regression_test("modules.algorithms" ${test} ${${test}_PARAMETERS}) +endforeach() diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index b57cdd611da8..f971e2a2727c 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2026 Bhoomish Gupta -// +// #SPDX - License - Identifier : BSL - 1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) From 7f81ec521016757f97170e5d8c477ec04e3f9eb6 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 4 Feb 2026 20:41:32 +0000 Subject: [PATCH 03/17] Added test file in CMakeLists Signed-off-by: Bhoomish --- .../hpx/parallel/algorithms/reduce.hpp | 181 ++++++++++++++---- .../parallel/container_algorithms/reduce.hpp | 19 +- .../hpx/parallel/util/detail/chunk_size.hpp | 86 ++++++--- .../tests/regressions/reduce_6647.cpp | 3 +- .../executors/execution_parameters.hpp | 63 ++++++ .../executors/execution_parameters_fwd.hpp | 27 +++ 6 files changed, 301 insertions(+), 78 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index b44ecafb2cc8..c95d19664490 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -382,59 +382,156 @@ namespace hpx::parallel { namespace detail { /// \cond NOINTERNAL - HPX_CXX_CORE_EXPORT template - struct reduce : public algorithm, T> + + // Custom executor parameters for reduce algorithm to prevent single-element partitions. + // reduce_partition requires at least 2 elements per partition because it initializes + // the accumulator via op(*first, *next(first)), which is the only way to produce a T + // from value_type elements when T may differ from value_type (e.g. minmax). + struct reduce_executor_parameters { - constexpr reduce() noexcept - : algorithm("reduce") + template + HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks(Executor&&, + std::size_t num_elements, std::size_t num_cores, + std::size_t /*max_chunks*/, + std::size_t chunk_size) const noexcept { + // Ensure minimum chunk size of 2 + if (chunk_size < 2) + { + chunk_size = (num_elements + num_cores - 1) / num_cores; + chunk_size = (std::max)(chunk_size, std::size_t(2)); + } + + // chunk_size_iterator gives the last partition num_elements % chunk_size + // elements (or chunk_size if evenly divisible). If the remainder is 1, + // that partition would violate reduce_partition's >= 2 requirement. + // Bump chunk_size until the remainder is 0 or >= 2. + while ( + num_elements > chunk_size && num_elements % chunk_size == 1) + { + chunk_size++; + } + + std::size_t new_max_chunks = + (num_elements + chunk_size - 1) / chunk_size; + + return {chunk_size, new_max_chunks}; } + }; + /// \endcond + } // namespace detail +} // namespace hpx::parallel + +// Specialize trait to make reduce_executor_parameters a valid executor parameters type +namespace hpx::execution::experimental { + + template <> + struct is_executor_parameters< + hpx::parallel::detail::reduce_executor_parameters> : std::true_type + { + }; +} // namespace hpx::execution::experimental + +namespace hpx::parallel { + namespace detail { + + // Helper function to reduce a partition without requiring an init value. + // Assumes partition size is always >= 2 (enforced by reduce_executor_parameters). + template + T reduce_partition( + FwdIterB part_begin, std::size_t part_size, Reduce const& r) + { + HPX_ASSERT(part_size >= 2); + + // Combine first two elements using the reduction operator + T init = HPX_INVOKE(r, *part_begin, *std::next(part_begin)); + + if (part_size == 2) + { + return init; + } + + // Reduce remaining elements + return sequential_reduce( + std::next(part_begin, 2), part_size - 2, HPX_MOVE(init), r); + } + + HPX_CXX_EXPORT template + struct reduce : public algorithm, T> + { + constexpr reduce() noexcept + : algorithm, T>("reduce") + { + } + + template + static constexpr T sequential(ExPolicy&& policy, InIterB first, + InIterE last, T_&& init, Reduce&& r) + { + return sequential_reduce( + HPX_FORWARD(ExPolicy, policy), first, last, + HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); + } + + template + static decltype(auto) parallel(ExPolicy&& policy, FwdIterB first, + FwdIterE last, T_&& init, Reduce&& r) + { + constexpr bool has_scheduler_executor = + hpx::execution_policy_has_scheduler_executor_v; - template - static constexpr T sequential(ExPolicy&& policy, InIterB first, - InIterE last, T_&& init, Reduce&& r) + // Handle empty range + if constexpr (!has_scheduler_executor) { - return detail::sequential_reduce( - HPX_FORWARD(ExPolicy, policy), first, last, - HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); + if (first == last) + { + return util::detail::algorithm_result::get( + HPX_FORWARD(T_, init)); + } } - template - static decltype(auto) parallel(ExPolicy&& policy, FwdIterB first, - FwdIterE last, T_&& init, Reduce&& r) + // Handle single-element case: can't partition into size >= 2 + // This must be checked for all execution policies + auto const count = distance(first, last); + if (count == 1) { - constexpr bool has_scheduler_executor = - hpx::execution_policy_has_scheduler_executor_v; - - if constexpr (!has_scheduler_executor) + T result = HPX_INVOKE(r, HPX_FORWARD(T_, init), *first); + if constexpr (has_scheduler_executor) { - if (first == last) - { - return util::detail::algorithm_result::get( - HPX_FORWARD(T_, init)); - } + return result; + } + else + { + return util::detail::algorithm_result::get( + HPX_MOVE(result)); } - - auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T { - return reduce_partition( - part_begin, part_size, r); - }; - - return util::partitioner::call( - HPX_FORWARD(ExPolicy, policy), first, - detail::distance(first, last), HPX_MOVE(f1), - hpx::unwrapping( - [init = HPX_FORWARD(T_, init), - r = HPX_FORWARD(Reduce, r)](auto&& results) -> T { - return detail::sequential_reduce( - hpx::util::begin(results), - hpx::util::size(results), init, r); - })); } - }; - /// \endcond + + auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T { + return reduce_partition( + part_begin, part_size, r); + }; + + auto reduce_policy = + policy.with(reduce_executor_parameters{}); + using reduce_policy_type = std::decay_t; + + return util::partitioner::call( + HPX_MOVE(reduce_policy), first, distance(first, last), + HPX_MOVE(f1), + hpx::unwrapping( + [init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)]( + auto&& results) -> T { + return sequential_reduce( + hpx::util::begin(results), hpx::util::size(results), + init, r); + })); + } + }; + /// \endcond } // namespace detail } // namespace hpx::parallel diff --git a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp index d4c62d0a07f4..6173205d1ca8 100644 --- a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp @@ -805,7 +805,7 @@ namespace hpx::ranges { using value_type = typename std::iterator_traits::value_type; - return hpx::parallel::detail::reduce().call( + return hpx::parallel::reduce().call( HPX_FORWARD(ExPolicy, policy), first, last, value_type{}, std::plus{}); } @@ -830,7 +830,7 @@ namespace hpx::ranges { static_assert(std::forward_iterator, "Requires at least forward iterator."); - return hpx::parallel::detail::reduce().call( + return hpx::parallel::reduce().call( HPX_FORWARD(ExPolicy, policy), hpx::util::begin(rng), hpx::util::end(rng), value_type{}, std::plus{}); } @@ -876,8 +876,8 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::detail::reduce().call(hpx::execution::seq, - first, last, HPX_MOVE(init), std::plus{}); + return hpx::parallel::detail::reduce().call(hpx::execution::seq, first, + last, HPX_MOVE(init), std::plus{}); } template ::value_type; - return hpx::parallel::detail::reduce().call( - hpx::execution::seq, first, last, value_type{}, - std::plus{}); + return hpx::parallel::reduce().call(hpx::execution::seq, + first, last, value_type{}, std::plus{}); } template @@ -926,9 +925,9 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::detail::reduce().call( - hpx::execution::seq, hpx::util::begin(rng), hpx::util::end(rng), - value_type{}, std::plus{}); + return hpx::parallel::reduce().call(hpx::execution::seq, + hpx::util::begin(rng), hpx::util::end(rng), value_type{}, + std::plus{}); } } reduce{}; } // namespace hpx::ranges diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index e4d476a2def3..d87ffc262e6c 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -67,12 +67,12 @@ namespace hpx::parallel::util::detail { chunk_size = (count + cores_times_4 - 1) / cores_times_4; // we should not consider more chunks than we have elements - max_chunks = (std::min) (cores_times_4, count); // -V112 + max_chunks = (std::min)(cores_times_4, count); // -V112 // we should not make chunks smaller than what's determined by // the max chunk size - chunk_size = (std::max) (chunk_size, - (count + max_chunks - 1) / max_chunks); + chunk_size = (std::max)( + chunk_size, (count + max_chunks - 1) / max_chunks); } else { @@ -150,15 +150,24 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), hpx::chrono::null_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + auto [adj_chunk, adj_max] = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); + if (adj_chunk != 0) + chunk_size = adj_chunk; + if (adj_max != 0) + max_chunks = adj_max; + else if (adj_chunk == 0) + adjust_chunk_size_and_max_chunks( + cores, count, max_chunks, chunk_size); auto last = next_or_subrange(it_or_r, count, 0); Stride stride = parallel::detail::abs(s); if (stride != 1) { - chunk_size = (std::max) (static_cast(stride), + chunk_size = (std::max)(static_cast(stride), (chunk_size + stride - 1) / stride * stride); } @@ -202,13 +211,13 @@ namespace hpx::parallel::util::detail { if (stride != 1) { // rounding up - test_chunk_size = (std::max) (static_cast(stride), + test_chunk_size = (std::max)(static_cast(stride), (test_chunk_size + stride - 1) / stride * stride); } add_ready_future(workitems, f1, it_or_r, test_chunk_size); - test_chunk_size = (std::min) (count, test_chunk_size); + test_chunk_size = (std::min)(count, test_chunk_size); count -= test_chunk_size; it_or_r = next_or_subrange(it_or_r, test_chunk_size, count); @@ -232,14 +241,23 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), iteration_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + auto [adj_chunk, adj_max] = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); + if (adj_chunk != 0) + chunk_size = adj_chunk; + if (adj_max != 0) + max_chunks = adj_max; + else if (adj_chunk == 0) + adjust_chunk_size_and_max_chunks( + cores, count, max_chunks, chunk_size); auto last = next_or_subrange(it_or_r, count, 0); if (stride != 1) { - chunk_size = (std::max) (static_cast(stride), + chunk_size = (std::max)(static_cast(stride), (chunk_size + stride - 1) / stride * stride); } @@ -288,7 +306,7 @@ namespace hpx::parallel::util::detail { // we should not consider more chunks than we have elements if (max_chunks != 0) { - max_chunks = (std::min) (max_chunks, count); + max_chunks = (std::min)(max_chunks, count); } while (count != 0) @@ -304,16 +322,16 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max) (static_cast(stride), + chunk_size = (std::max)(static_cast(stride), (chunk_size + stride - 1) / stride * stride); } // in last chunk, consider only remaining number of elements - std::size_t chunk = (std::min) (chunk_size, count); + std::size_t chunk = (std::min)(chunk_size, count); shape.emplace_back(it_or_r, chunk); - chunk = (std::min) (count, chunk); + chunk = (std::min)(count, chunk); count -= chunk; it_or_r = next_or_subrange(it_or_r, chunk, count); @@ -410,12 +428,21 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), hpx::chrono::null_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + auto [adj_chunk, adj_max] = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); + if (adj_chunk != 0) + chunk_size = adj_chunk; + if (adj_max != 0) + max_chunks = adj_max; + else if (adj_chunk == 0) + adjust_chunk_size_and_max_chunks( + cores, count, max_chunks, chunk_size); if (stride != 1) { - chunk_size = (std::max) (static_cast(stride), + chunk_size = (std::max)(static_cast(stride), static_cast( (chunk_size + stride - 1) / stride * stride)); } @@ -467,7 +494,7 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - test_chunk_size = (std::max) (static_cast(stride), + test_chunk_size = (std::max)(static_cast(stride), (test_chunk_size + stride - 1) / stride * stride); } @@ -501,12 +528,21 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), iteration_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + auto [adj_chunk, adj_max] = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); + if (adj_chunk != 0) + chunk_size = adj_chunk; + if (adj_max != 0) + max_chunks = adj_max; + else if (adj_chunk == 0) + adjust_chunk_size_and_max_chunks( + cores, count, max_chunks, chunk_size); if (stride != 1) { - chunk_size = (std::max) (static_cast(stride), + chunk_size = (std::max)(static_cast(stride), (chunk_size + stride - 1) / stride * stride); } @@ -558,7 +594,7 @@ namespace hpx::parallel::util::detail { // we should not consider more chunks than we have elements if (max_chunks != 0) { - max_chunks = (std::min) (max_chunks, count); + max_chunks = (std::min)(max_chunks, count); } std::size_t base_idx = 0; @@ -575,12 +611,12 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max) (static_cast(stride), + chunk_size = (std::max)(static_cast(stride), (chunk_size + stride - 1) / stride * stride); } // in last chunk, consider only remaining number of elements - std::size_t chunk = (std::min) (chunk_size, count); + std::size_t chunk = (std::min)(chunk_size, count); shape.emplace_back(first, chunk, base_idx); diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index f971e2a2727c..ee47427cbb3e 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -1,5 +1,6 @@ // Copyright (c) 2026 Bhoomish Gupta -// #SPDX - License - Identifier : BSL - 1.0 +// +// SPDX - License - Identifier : BSL - 1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp index 0624eb36ea8c..4a64c6990a79 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp @@ -599,6 +599,69 @@ namespace hpx::execution::experimental::detail { call(static_cast(params), HPX_FORWARD(Executor, exec)); } }; + + /////////////////////////////////////////////////////////////////////// + // define member traits + HPX_HAS_MEMBER_XXX_TRAIT_DEF(adjust_chunk_size_and_max_chunks) + + /////////////////////////////////////////////////////////////////////// + // default property implementation allowing to handle + // adjust_chunk_size_and_max_chunks + struct adjust_chunk_size_and_max_chunks_property + { + // default implementation + template + HPX_FORCEINLINE static constexpr std::pair + adjust_chunk_size_and_max_chunks( + Target, std::size_t, std::size_t, std::size_t, std::size_t) noexcept + { + // return zero which means no adjustment + return {0, 0}; // {adjusted_chunk_size, adjusted_max_chunks} + } + }; + + ////////////////////////////////////////////////////////////////////// + // Generate a type that is guaranteed to support + // adjust_chunk_size_and_max_chunks + using get_adjust_chunk_size_and_max_chunks_t = + get_parameters_property_t; + + inline constexpr get_adjust_chunk_size_and_max_chunks_t + get_adjust_chunk_size_and_max_chunks{}; + + /////////////////////////////////////////////////////////////////////// + // customization point for interface adjust_chunk_size_and_max_chunks() + template + struct adjust_chunk_size_and_max_chunks_fn_helper>> + { + template + HPX_FORCEINLINE static constexpr std::pair + call(Parameters& params, Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t num_chunks, + std::size_t chunk_size) + { + auto get_prop = get_adjust_chunk_size_and_max_chunks( + HPX_FORWARD(Executor, exec), params, + adjust_chunk_size_and_max_chunks_property{}); + + return get_prop.first.adjust_chunk_size_and_max_chunks( + HPX_FORWARD(decltype(get_prop.second), get_prop.second), + num_elements, num_cores, num_chunks, chunk_size); + } + + template + HPX_FORCEINLINE static constexpr std::pair + call(AnyParameters params, Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t num_chunks, + std::size_t chunk_size) + { + return call(static_cast(params), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + }; /// \endcond /// \cond NOINTERNAL diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp index 05f0e5c07f5f..3bbc20f96f2a 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp @@ -60,6 +60,10 @@ namespace hpx::execution::experimental { template struct collect_execution_parameters_fn_helper; + + template + struct adjust_chunk_size_and_max_chunks_fn_helper; /// \endcond } // namespace detail @@ -422,6 +426,29 @@ namespace hpx::execution::experimental { } } collect_execution_parameters{}; + HPX_CXX_EXPORT inline constexpr struct adjust_chunk_size_and_max_chunks_t + final + : hpx::functional::detail::tag_priority< + adjust_chunk_size_and_max_chunks_t> + { + private: + template + requires(hpx::traits::is_executor_parameters_v && + hpx::traits::is_executor_any_v) + friend HPX_FORCEINLINE decltype(auto) tag_fallback_invoke( + adjust_chunk_size_and_max_chunks_t, Parameters&& params, + Executor&& exec, std::size_t num_elements, std::size_t num_cores, + std::size_t num_chunks, std::size_t chunk_size) + { + return detail::adjust_chunk_size_and_max_chunks_fn_helper< + hpx::util::decay_unwrap_t, + std::decay_t>::call(HPX_FORWARD(Parameters, params), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + + } adjust_chunk_size_and_max_chunks{}; + template <> struct is_scheduling_property : std::true_type From c76f72079ccd3be10d2bb458e889df9f7e01da77 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 11 Feb 2026 00:32:36 +0000 Subject: [PATCH 04/17] Fixed namespace error in previous commit Signed-off-by: Bhoomish --- .../hpx/parallel/algorithms/reduce.hpp | 20 ++++++++----------- .../parallel/container_algorithms/reduce.hpp | 19 +++++++++--------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index c95d19664490..a2a161bb9a31 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -433,8 +433,7 @@ namespace hpx::execution::experimental { }; } // namespace hpx::execution::experimental -namespace hpx::parallel { - namespace detail { +namespace hpx::parallel { namespace detail { // Helper function to reduce a partition without requiring an init value. // Assumes partition size is always >= 2 (enforced by reduce_executor_parameters). @@ -470,9 +469,8 @@ namespace hpx::parallel { static constexpr T sequential(ExPolicy&& policy, InIterB first, InIterE last, T_&& init, Reduce&& r) { - return sequential_reduce( - HPX_FORWARD(ExPolicy, policy), first, last, - HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); + return sequential_reduce(HPX_FORWARD(ExPolicy, policy), + first, last, HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); } template = 2 // This must be checked for all execution policies - auto const count = distance(first, last); + auto const count = hpx::parallel::detail::distance(first, last); if (count == 1) { T result = HPX_INVOKE(r, HPX_FORWARD(T_, init), *first); @@ -515,13 +513,12 @@ namespace hpx::parallel { part_begin, part_size, r); }; - auto reduce_policy = - policy.with(reduce_executor_parameters{}); + auto reduce_policy = policy.with(reduce_executor_parameters{}); using reduce_policy_type = std::decay_t; return util::partitioner::call( - HPX_MOVE(reduce_policy), first, distance(first, last), - HPX_MOVE(f1), + HPX_MOVE(reduce_policy), first, + hpx::parallel::detail::distance(first, last), HPX_MOVE(f1), hpx::unwrapping( [init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)]( auto&& results) -> T { @@ -532,8 +529,7 @@ namespace hpx::parallel { } }; /// \endcond - } // namespace detail -} // namespace hpx::parallel +}} // namespace hpx::parallel::detail namespace hpx { diff --git a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp index 6173205d1ca8..d4c62d0a07f4 100644 --- a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp @@ -805,7 +805,7 @@ namespace hpx::ranges { using value_type = typename std::iterator_traits::value_type; - return hpx::parallel::reduce().call( + return hpx::parallel::detail::reduce().call( HPX_FORWARD(ExPolicy, policy), first, last, value_type{}, std::plus{}); } @@ -830,7 +830,7 @@ namespace hpx::ranges { static_assert(std::forward_iterator, "Requires at least forward iterator."); - return hpx::parallel::reduce().call( + return hpx::parallel::detail::reduce().call( HPX_FORWARD(ExPolicy, policy), hpx::util::begin(rng), hpx::util::end(rng), value_type{}, std::plus{}); } @@ -876,8 +876,8 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::detail::reduce().call(hpx::execution::seq, first, - last, HPX_MOVE(init), std::plus{}); + return hpx::parallel::detail::reduce().call(hpx::execution::seq, + first, last, HPX_MOVE(init), std::plus{}); } template ::value_type; - return hpx::parallel::reduce().call(hpx::execution::seq, - first, last, value_type{}, std::plus{}); + return hpx::parallel::detail::reduce().call( + hpx::execution::seq, first, last, value_type{}, + std::plus{}); } template @@ -925,9 +926,9 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::reduce().call(hpx::execution::seq, - hpx::util::begin(rng), hpx::util::end(rng), value_type{}, - std::plus{}); + return hpx::parallel::detail::reduce().call( + hpx::execution::seq, hpx::util::begin(rng), hpx::util::end(rng), + value_type{}, std::plus{}); } } reduce{}; } // namespace hpx::ranges From 8c843953b93f4cf1e13f1e09a22cecd18ec720bd Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 4 Feb 2026 14:46:39 +0000 Subject: [PATCH 05/17] Fix bug #6647: Correct type handling in reduce Signed-off-by: Bhoomish --- .../hpx/parallel/algorithms/reduce.hpp | 2 +- .../tests/regressions/CMakeLists.txt | 54 +++++++++++-------- .../tests/regressions/reduce_6647.cpp | 3 ++ 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index a2a161bb9a31..2c8bafa1b87e 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -456,7 +456,7 @@ namespace hpx::parallel { namespace detail { std::next(part_begin, 2), part_size - 2, HPX_MOVE(init), r); } - HPX_CXX_EXPORT template + HPX_CXX_CORE_EXPORT template struct reduce : public algorithm, T> { constexpr reduce() noexcept diff --git a/libs/core/algorithms/tests/regressions/CMakeLists.txt b/libs/core/algorithms/tests/regressions/CMakeLists.txt index 48e2507d9d62..8abe90d4f87d 100644 --- a/libs/core/algorithms/tests/regressions/CMakeLists.txt +++ b/libs/core/algorithms/tests/regressions/CMakeLists.txt @@ -1,9 +1,10 @@ -# Copyright (c) 2014-2025 Hartmut Kaiser +#Copyright(c) 2014 - 2025 Hartmut Kaiser # -# SPDX-License-Identifier: BSL-1.0 -# Distributed under the Boost Software License, Version 1.0. (See accompanying -# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +#SPDX - License - Identifier : BSL - 1.0 +#Distributed under the Boost Software License, Version 1.0.(See accompanying +#file LICENSE_1_0.txt or copy at http: //www.boost.org/LICENSE_1_0.txt) +<<<<<<< HEAD set(tests chunk_size_variable_regression count_3646 @@ -34,29 +35,36 @@ set(tests transform_inclusive_scan_4787 ranges_facilities ) +======= +set(tests count_3646 fill_executor_5016 findfirstof_more_searched_for + find_proxy_support for_each_annotated_function for_each_on_main_thread + for_loop_2281 for_loop_5735 for_loop_with_auto_chunk_size + includes_empty_ranges minimal_findend + mismatch_differently_sized_ranges num_cores reduce_3641 + reduce_6647 scan_different_inits scan_non_commutative + scan_shortlength search_larger_2nd_range + search_zerolength set_operations_3442 + stable_merge_2964 static_chunker_2282 + transform_inclusive_scan_4786 + transform_inclusive_scan_4787 + ranges_facilities) +>>>>>>> 7cd0cb47ab (Fix bug #6647: Correct type handling in reduce) -if(HPX_WITH_DATAPAR) - list(APPEND tests for_each_datapar) -endif() + if (HPX_WITH_DATAPAR) list(APPEND tests for_each_datapar) endif() -foreach(test ${tests}) - set(sources ${test}.cpp) + foreach (test ${tests}) set(sources ${test}.cpp) - set(${test}_PARAMETERS THREADS_PER_LOCALITY 4) + set(${test} _PARAMETERS THREADS_PER_LOCALITY 4) - source_group("Source Files" FILES ${sources}) + source_group("Source Files" FILES ${sources}) - # add example executable - add_hpx_executable( - ${test}_test INTERNAL_FLAGS - SOURCES ${sources} - EXCLUDE_FROM_ALL ${${test}_FLAGS} - FOLDER "Tests/Regressions/Modules/Core/Algorithms/" - ) +#add example executable + add_hpx_executable(${test} _test INTERNAL_FLAGS SOURCES ${ + sources} EXCLUDE_FROM_ALL ${${test} _FLAGS} FOLDER + "Tests/Regressions/Modules/Core/Algorithms/") - target_link_libraries( - ${test}_test PRIVATE hpx_iterator_support_test_utilities - ) + target_link_libraries(${test} _test PRIVATE + hpx_iterator_support_test_utilities) - add_hpx_regression_test("modules.algorithms" ${test} ${${test}_PARAMETERS}) -endforeach() + add_hpx_regression_test("modules.algorithms" ${ + test} ${${test} _PARAMETERS}) endforeach() diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index ee47427cbb3e..12d20c07002d 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -1,6 +1,9 @@ // Copyright (c) 2026 Bhoomish Gupta // +<<<<<<< HEAD // SPDX - License - Identifier : BSL - 1.0 +======= +>>>>>>> 7cd0cb47ab (Fix bug #6647: Correct type handling in reduce) // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) From 516d7ae840ac85fecb6a8d3ab65c7a2c1060b6a5 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 4 Feb 2026 20:09:51 +0000 Subject: [PATCH 06/17] Fixed Iterator Category Mismatch Signed-off-by: Bhoomish --- .../tests/regressions/CMakeLists.txt | 54 ++++++++----------- .../tests/regressions/reduce_6647.cpp | 5 +- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/libs/core/algorithms/tests/regressions/CMakeLists.txt b/libs/core/algorithms/tests/regressions/CMakeLists.txt index 8abe90d4f87d..48e2507d9d62 100644 --- a/libs/core/algorithms/tests/regressions/CMakeLists.txt +++ b/libs/core/algorithms/tests/regressions/CMakeLists.txt @@ -1,10 +1,9 @@ -#Copyright(c) 2014 - 2025 Hartmut Kaiser +# Copyright (c) 2014-2025 Hartmut Kaiser # -#SPDX - License - Identifier : BSL - 1.0 -#Distributed under the Boost Software License, Version 1.0.(See accompanying -#file LICENSE_1_0.txt or copy at http: //www.boost.org/LICENSE_1_0.txt) +# SPDX-License-Identifier: BSL-1.0 +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -<<<<<<< HEAD set(tests chunk_size_variable_regression count_3646 @@ -35,36 +34,29 @@ set(tests transform_inclusive_scan_4787 ranges_facilities ) -======= -set(tests count_3646 fill_executor_5016 findfirstof_more_searched_for - find_proxy_support for_each_annotated_function for_each_on_main_thread - for_loop_2281 for_loop_5735 for_loop_with_auto_chunk_size - includes_empty_ranges minimal_findend - mismatch_differently_sized_ranges num_cores reduce_3641 - reduce_6647 scan_different_inits scan_non_commutative - scan_shortlength search_larger_2nd_range - search_zerolength set_operations_3442 - stable_merge_2964 static_chunker_2282 - transform_inclusive_scan_4786 - transform_inclusive_scan_4787 - ranges_facilities) ->>>>>>> 7cd0cb47ab (Fix bug #6647: Correct type handling in reduce) - if (HPX_WITH_DATAPAR) list(APPEND tests for_each_datapar) endif() +if(HPX_WITH_DATAPAR) + list(APPEND tests for_each_datapar) +endif() - foreach (test ${tests}) set(sources ${test}.cpp) +foreach(test ${tests}) + set(sources ${test}.cpp) - set(${test} _PARAMETERS THREADS_PER_LOCALITY 4) + set(${test}_PARAMETERS THREADS_PER_LOCALITY 4) - source_group("Source Files" FILES ${sources}) + source_group("Source Files" FILES ${sources}) -#add example executable - add_hpx_executable(${test} _test INTERNAL_FLAGS SOURCES ${ - sources} EXCLUDE_FROM_ALL ${${test} _FLAGS} FOLDER - "Tests/Regressions/Modules/Core/Algorithms/") + # add example executable + add_hpx_executable( + ${test}_test INTERNAL_FLAGS + SOURCES ${sources} + EXCLUDE_FROM_ALL ${${test}_FLAGS} + FOLDER "Tests/Regressions/Modules/Core/Algorithms/" + ) - target_link_libraries(${test} _test PRIVATE - hpx_iterator_support_test_utilities) + target_link_libraries( + ${test}_test PRIVATE hpx_iterator_support_test_utilities + ) - add_hpx_regression_test("modules.algorithms" ${ - test} ${${test} _PARAMETERS}) endforeach() + add_hpx_regression_test("modules.algorithms" ${test} ${${test}_PARAMETERS}) +endforeach() diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index 12d20c07002d..efb49ae2e205 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -1,9 +1,6 @@ // Copyright (c) 2026 Bhoomish Gupta -// -<<<<<<< HEAD // SPDX - License - Identifier : BSL - 1.0 -======= ->>>>>>> 7cd0cb47ab (Fix bug #6647: Correct type handling in reduce) +// // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) From 4320f20aeb06bc3576a5082607602422d80ed9b4 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 4 Feb 2026 20:41:32 +0000 Subject: [PATCH 07/17] Added test file in CMakeLists Signed-off-by: Bhoomish --- .../parallel/container_algorithms/reduce.hpp | 19 +++++++++---------- .../tests/regressions/reduce_6647.cpp | 2 +- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp index d4c62d0a07f4..6173205d1ca8 100644 --- a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp @@ -805,7 +805,7 @@ namespace hpx::ranges { using value_type = typename std::iterator_traits::value_type; - return hpx::parallel::detail::reduce().call( + return hpx::parallel::reduce().call( HPX_FORWARD(ExPolicy, policy), first, last, value_type{}, std::plus{}); } @@ -830,7 +830,7 @@ namespace hpx::ranges { static_assert(std::forward_iterator, "Requires at least forward iterator."); - return hpx::parallel::detail::reduce().call( + return hpx::parallel::reduce().call( HPX_FORWARD(ExPolicy, policy), hpx::util::begin(rng), hpx::util::end(rng), value_type{}, std::plus{}); } @@ -876,8 +876,8 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::detail::reduce().call(hpx::execution::seq, - first, last, HPX_MOVE(init), std::plus{}); + return hpx::parallel::detail::reduce().call(hpx::execution::seq, first, + last, HPX_MOVE(init), std::plus{}); } template ::value_type; - return hpx::parallel::detail::reduce().call( - hpx::execution::seq, first, last, value_type{}, - std::plus{}); + return hpx::parallel::reduce().call(hpx::execution::seq, + first, last, value_type{}, std::plus{}); } template @@ -926,9 +925,9 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::detail::reduce().call( - hpx::execution::seq, hpx::util::begin(rng), hpx::util::end(rng), - value_type{}, std::plus{}); + return hpx::parallel::reduce().call(hpx::execution::seq, + hpx::util::begin(rng), hpx::util::end(rng), value_type{}, + std::plus{}); } } reduce{}; } // namespace hpx::ranges diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index efb49ae2e205..ee47427cbb3e 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -1,6 +1,6 @@ // Copyright (c) 2026 Bhoomish Gupta -// SPDX - License - Identifier : BSL - 1.0 // +// SPDX - License - Identifier : BSL - 1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) From 43c57c7d8d501dff5fc5fda35cf24e420c5ec930 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Wed, 11 Feb 2026 00:32:36 +0000 Subject: [PATCH 08/17] Fixed namespace error in previous commit Signed-off-by: Bhoomish --- .../parallel/container_algorithms/reduce.hpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp index 6173205d1ca8..d4c62d0a07f4 100644 --- a/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/container_algorithms/reduce.hpp @@ -805,7 +805,7 @@ namespace hpx::ranges { using value_type = typename std::iterator_traits::value_type; - return hpx::parallel::reduce().call( + return hpx::parallel::detail::reduce().call( HPX_FORWARD(ExPolicy, policy), first, last, value_type{}, std::plus{}); } @@ -830,7 +830,7 @@ namespace hpx::ranges { static_assert(std::forward_iterator, "Requires at least forward iterator."); - return hpx::parallel::reduce().call( + return hpx::parallel::detail::reduce().call( HPX_FORWARD(ExPolicy, policy), hpx::util::begin(rng), hpx::util::end(rng), value_type{}, std::plus{}); } @@ -876,8 +876,8 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::detail::reduce().call(hpx::execution::seq, first, - last, HPX_MOVE(init), std::plus{}); + return hpx::parallel::detail::reduce().call(hpx::execution::seq, + first, last, HPX_MOVE(init), std::plus{}); } template ::value_type; - return hpx::parallel::reduce().call(hpx::execution::seq, - first, last, value_type{}, std::plus{}); + return hpx::parallel::detail::reduce().call( + hpx::execution::seq, first, last, value_type{}, + std::plus{}); } template @@ -925,9 +926,9 @@ namespace hpx::ranges { static_assert(std::input_iterator, "Requires at least input iterator."); - return hpx::parallel::reduce().call(hpx::execution::seq, - hpx::util::begin(rng), hpx::util::end(rng), value_type{}, - std::plus{}); + return hpx::parallel::detail::reduce().call( + hpx::execution::seq, hpx::util::begin(rng), hpx::util::end(rng), + value_type{}, std::plus{}); } } reduce{}; } // namespace hpx::ranges From 9e48cb6329a2135ce6b5edea24066601df0830a0 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Thu, 12 Feb 2026 22:59:10 +0000 Subject: [PATCH 09/17] Clang Formatted Signed-off-by: Bhoomish --- .../hpx/parallel/algorithms/reduce.hpp | 2 +- .../hpx/parallel/util/detail/chunk_size.hpp | 34 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 2c8bafa1b87e..d820684b4242 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -400,7 +400,7 @@ namespace hpx::parallel { if (chunk_size < 2) { chunk_size = (num_elements + num_cores - 1) / num_cores; - chunk_size = (std::max)(chunk_size, std::size_t(2)); + chunk_size = (std::max) (chunk_size, std::size_t(2)); } // chunk_size_iterator gives the last partition num_elements % chunk_size diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index d87ffc262e6c..9b1dd4ff89af 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -67,12 +67,12 @@ namespace hpx::parallel::util::detail { chunk_size = (count + cores_times_4 - 1) / cores_times_4; // we should not consider more chunks than we have elements - max_chunks = (std::min)(cores_times_4, count); // -V112 + max_chunks = (std::min) (cores_times_4, count); // -V112 // we should not make chunks smaller than what's determined by // the max chunk size - chunk_size = (std::max)( - chunk_size, (count + max_chunks - 1) / max_chunks); + chunk_size = (std::max) (chunk_size, + (count + max_chunks - 1) / max_chunks); } else { @@ -167,7 +167,7 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max)(static_cast(stride), + chunk_size = (std::max) (static_cast(stride), (chunk_size + stride - 1) / stride * stride); } @@ -211,13 +211,13 @@ namespace hpx::parallel::util::detail { if (stride != 1) { // rounding up - test_chunk_size = (std::max)(static_cast(stride), + test_chunk_size = (std::max) (static_cast(stride), (test_chunk_size + stride - 1) / stride * stride); } add_ready_future(workitems, f1, it_or_r, test_chunk_size); - test_chunk_size = (std::min)(count, test_chunk_size); + test_chunk_size = (std::min) (count, test_chunk_size); count -= test_chunk_size; it_or_r = next_or_subrange(it_or_r, test_chunk_size, count); @@ -257,7 +257,7 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max)(static_cast(stride), + chunk_size = (std::max) (static_cast(stride), (chunk_size + stride - 1) / stride * stride); } @@ -306,7 +306,7 @@ namespace hpx::parallel::util::detail { // we should not consider more chunks than we have elements if (max_chunks != 0) { - max_chunks = (std::min)(max_chunks, count); + max_chunks = (std::min) (max_chunks, count); } while (count != 0) @@ -322,16 +322,16 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max)(static_cast(stride), + chunk_size = (std::max) (static_cast(stride), (chunk_size + stride - 1) / stride * stride); } // in last chunk, consider only remaining number of elements - std::size_t chunk = (std::min)(chunk_size, count); + std::size_t chunk = (std::min) (chunk_size, count); shape.emplace_back(it_or_r, chunk); - chunk = (std::min)(count, chunk); + chunk = (std::min) (count, chunk); count -= chunk; it_or_r = next_or_subrange(it_or_r, chunk, count); @@ -442,7 +442,7 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max)(static_cast(stride), + chunk_size = (std::max) (static_cast(stride), static_cast( (chunk_size + stride - 1) / stride * stride)); } @@ -494,7 +494,7 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - test_chunk_size = (std::max)(static_cast(stride), + test_chunk_size = (std::max) (static_cast(stride), (test_chunk_size + stride - 1) / stride * stride); } @@ -542,7 +542,7 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max)(static_cast(stride), + chunk_size = (std::max) (static_cast(stride), (chunk_size + stride - 1) / stride * stride); } @@ -594,7 +594,7 @@ namespace hpx::parallel::util::detail { // we should not consider more chunks than we have elements if (max_chunks != 0) { - max_chunks = (std::min)(max_chunks, count); + max_chunks = (std::min) (max_chunks, count); } std::size_t base_idx = 0; @@ -611,12 +611,12 @@ namespace hpx::parallel::util::detail { if (stride != 1) { - chunk_size = (std::max)(static_cast(stride), + chunk_size = (std::max) (static_cast(stride), (chunk_size + stride - 1) / stride * stride); } // in last chunk, consider only remaining number of elements - std::size_t chunk = (std::min)(chunk_size, count); + std::size_t chunk = (std::min) (chunk_size, count); shape.emplace_back(first, chunk, base_idx); From 8cf4009e8ed91233d27e4bb08e7e8ca4c2636f01 Mon Sep 17 00:00:00 2001 From: Bhoomish Date: Thu, 12 Feb 2026 23:40:26 +0000 Subject: [PATCH 10/17] Added missing include for HPX_ASSERT in reduce.hpp Signed-off-by: Bhoomish --- libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index d820684b4242..dd312771f55e 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -356,6 +356,7 @@ namespace hpx { #else // DOXYGEN #include +#include #include #include #include From bf3537e7263863806187f0a308283ab6d8b2fdfa Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Wed, 18 Feb 2026 13:08:22 +0000 Subject: [PATCH 11/17] Enhance reduce algorithm documentation and add adjust_chunk_size_and_max_chunks functionality Signed-off-by: BhoomishGupta --- .../hpx/parallel/algorithms/reduce.hpp | 21 ++++++++------ .../executors/execution_parameters.hpp | 28 +++++++++++++++++++ .../executors/execution_parameters_fwd.hpp | 25 +++++++++++++++-- 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index dd312771f55e..1c7279c6229e 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -384,10 +384,11 @@ namespace hpx::parallel { /// \cond NOINTERNAL - // Custom executor parameters for reduce algorithm to prevent single-element partitions. - // reduce_partition requires at least 2 elements per partition because it initializes - // the accumulator via op(*first, *next(first)), which is the only way to produce a T - // from value_type elements when T may differ from value_type (e.g. minmax). + // Custom executor parameters for reduce algorithm to prevent + // single-element partitions. reduce_partition requires at least 2 + // elements per partition because it initializes the accumulator via + // op(*first, *next(first)), which is the only way to produce a T from + // value_type elements when T may differ from value_type (e.g. minmax). struct reduce_executor_parameters { template @@ -404,9 +405,10 @@ namespace hpx::parallel { chunk_size = (std::max) (chunk_size, std::size_t(2)); } - // chunk_size_iterator gives the last partition num_elements % chunk_size - // elements (or chunk_size if evenly divisible). If the remainder is 1, - // that partition would violate reduce_partition's >= 2 requirement. + // chunk_size_iterator gives the last partition + // num_elements % chunk_size elements (or chunk_size if + // evenly divisible). If the remainder is 1, that partition + // would violate reduce_partition's >= 2 requirement. // Bump chunk_size until the remainder is 0 or >= 2. while ( num_elements > chunk_size && num_elements % chunk_size == 1) @@ -424,7 +426,8 @@ namespace hpx::parallel { } // namespace detail } // namespace hpx::parallel -// Specialize trait to make reduce_executor_parameters a valid executor parameters type +// Specialize trait to make reduce_executor_parameters a valid executor +// parameters type namespace hpx::execution::experimental { template <> @@ -437,7 +440,7 @@ namespace hpx::execution::experimental { namespace hpx::parallel { namespace detail { // Helper function to reduce a partition without requiring an init value. - // Assumes partition size is always >= 2 (enforced by reduce_executor_parameters). + // Assumes partition size >= 2 (enforced by reduce_executor_parameters). template T reduce_partition( FwdIterB part_begin, std::size_t part_size, Reduce const& r) diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp index 4a64c6990a79..fa05d59f36f1 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp @@ -884,6 +884,30 @@ namespace hpx::execution::experimental::detail { } }; + /////////////////////////////////////////////////////////////////////// + template + struct adjust_chunk_size_and_max_chunks_call_helper + { + }; + + template + struct adjust_chunk_size_and_max_chunks_call_helper>> + { + template + HPX_FORCEINLINE std::pair + adjust_chunk_size_and_max_chunks(Executor&& exec, + std::size_t num_elements, std::size_t num_cores, + std::size_t num_chunks, std::size_t chunk_size) + { + auto& wrapped = + static_cast*>(this)->member_.get(); + return wrapped.adjust_chunk_size_and_max_chunks( + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + }; + /////////////////////////////////////////////////////////////////////// template struct base_member_helper @@ -909,6 +933,8 @@ namespace hpx::execution::experimental::detail { , processing_units_count_call_helper> , reset_thread_distribution_call_helper> , collect_execution_parameters_call_helper> + , adjust_chunk_size_and_max_chunks_call_helper> { using wrapper_type = std::reference_wrapper; @@ -948,6 +974,8 @@ namespace hpx::execution::experimental::detail { HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY(maximal_number_of_chunks); HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY(reset_thread_distribution); HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY(collect_execution_parameters); + HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY( + adjust_chunk_size_and_max_chunks); constexpr executor_parameters() requires(hpx::util::all_of_v...>) diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp index 3bbc20f96f2a..db8d3d6693b2 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp @@ -426,8 +426,28 @@ namespace hpx::execution::experimental { } } collect_execution_parameters{}; - HPX_CXX_EXPORT inline constexpr struct adjust_chunk_size_and_max_chunks_t - final + /// Adjust the chunk size and maximal number of chunks for a parallel + /// algorithm execution + /// + /// \param params [in] The executor parameters object to use for + /// adjusting the chunk size. + /// \param exec [in] The executor object which will be used + /// for scheduling of the loop iterations. + /// \param num_elements [in] The overall number of elements for the + /// algorithm. + /// \param num_cores [in] The overall number of cores to utilize + /// for the algorithm. + /// \param num_chunks [in] The overall number of chunks for the + /// algorithm. + /// \param chunk_size [in] The size of the chunks created for the + /// algorithm. + /// + /// \note This calls params.adjust_chunk_size_and_max_chunks(exec, ...) + /// if it exists; otherwise it returns {0, 0} indicating no + /// adjustment. + /// + HPX_CXX_CORE_EXPORT inline constexpr struct + adjust_chunk_size_and_max_chunks_t final : hpx::functional::detail::tag_priority< adjust_chunk_size_and_max_chunks_t> { @@ -446,7 +466,6 @@ namespace hpx::execution::experimental { HPX_FORWARD(Executor, exec), num_elements, num_cores, num_chunks, chunk_size); } - } adjust_chunk_size_and_max_chunks{}; template <> From 65d6a1bd9801ad49a4ae05dc5e60464773ad8af1 Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Sun, 22 Feb 2026 18:18:57 +0000 Subject: [PATCH 12/17] Refactor chunk size adjustment logic and enhance related tests Signed-off-by: BhoomishGupta --- .../hpx/parallel/util/detail/chunk_size.hpp | 36 ++------ .../executors/execution_parameters.hpp | 46 ++++++++-- .../executors/execution_parameters_fwd.hpp | 4 +- .../unit/executor_parameters_dispatching.cpp | 85 +++++++++++++++++++ 4 files changed, 136 insertions(+), 35 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index 9b1dd4ff89af..7decbb5c8ebe 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -154,13 +154,8 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - if (adj_chunk != 0) - chunk_size = adj_chunk; - if (adj_max != 0) - max_chunks = adj_max; - else if (adj_chunk == 0) - adjust_chunk_size_and_max_chunks( - cores, count, max_chunks, chunk_size); + chunk_size = adj_chunk; + max_chunks = adj_max; auto last = next_or_subrange(it_or_r, count, 0); Stride stride = parallel::detail::abs(s); @@ -245,13 +240,8 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - if (adj_chunk != 0) - chunk_size = adj_chunk; - if (adj_max != 0) - max_chunks = adj_max; - else if (adj_chunk == 0) - adjust_chunk_size_and_max_chunks( - cores, count, max_chunks, chunk_size); + chunk_size = adj_chunk; + max_chunks = adj_max; auto last = next_or_subrange(it_or_r, count, 0); @@ -432,13 +422,8 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - if (adj_chunk != 0) - chunk_size = adj_chunk; - if (adj_max != 0) - max_chunks = adj_max; - else if (adj_chunk == 0) - adjust_chunk_size_and_max_chunks( - cores, count, max_chunks, chunk_size); + chunk_size = adj_chunk; + max_chunks = adj_max; if (stride != 1) { @@ -532,13 +517,8 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - if (adj_chunk != 0) - chunk_size = adj_chunk; - if (adj_max != 0) - max_chunks = adj_max; - else if (adj_chunk == 0) - adjust_chunk_size_and_max_chunks( - cores, count, max_chunks, chunk_size); + chunk_size = adj_chunk; + max_chunks = adj_max; if (stride != 1) { diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp index fa05d59f36f1..3902dbd89dc6 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp @@ -609,14 +609,50 @@ namespace hpx::execution::experimental::detail { // adjust_chunk_size_and_max_chunks struct adjust_chunk_size_and_max_chunks_property { - // default implementation template HPX_FORCEINLINE static constexpr std::pair - adjust_chunk_size_and_max_chunks( - Target, std::size_t, std::size_t, std::size_t, std::size_t) noexcept + adjust_chunk_size_and_max_chunks(Target, std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept { - // return zero which means no adjustment - return {0, 0}; // {adjusted_chunk_size, adjusted_max_chunks} + if (max_chunks == 0) + { + if (chunk_size == 0) + { + std::size_t const cores_times_4 = 4 * num_cores; // -V112 + + chunk_size = + (num_elements + cores_times_4 - 1) / cores_times_4; + + max_chunks = + (std::min) (cores_times_4, num_elements); // -V112 + + chunk_size = (std::max) (chunk_size, + (num_elements + max_chunks - 1) / max_chunks); + } + else + { + // max_chunks == 0 && chunk_size != 0 + max_chunks = (num_elements + chunk_size - 1) / chunk_size; + } + } + else if (chunk_size == 0) + { + chunk_size = (num_elements + max_chunks - 1) / max_chunks; + } + else + { + // max_chunks != 0 && chunk_size != 0 + std::size_t const calculated_max_chunks = + (num_elements + chunk_size - 1) / chunk_size; + + if (calculated_max_chunks > max_chunks) + { + chunk_size = (num_elements + max_chunks - 1) / max_chunks; + } + } + + return {chunk_size, max_chunks}; } }; diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp index db8d3d6693b2..e39d4be90f60 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp @@ -443,8 +443,8 @@ namespace hpx::execution::experimental { /// algorithm. /// /// \note This calls params.adjust_chunk_size_and_max_chunks(exec, ...) - /// if it exists; otherwise it returns {0, 0} indicating no - /// adjustment. + /// if it exists; otherwise it applies the default chunk size and + /// max chunks adjustment logic. /// HPX_CXX_CORE_EXPORT inline constexpr struct adjust_chunk_size_and_max_chunks_t final diff --git a/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp b/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp index d97a4de0b677..7a7f0b43ae40 100644 --- a/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp +++ b/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp @@ -651,6 +651,91 @@ void test_mark_end_execution() } } +/////////////////////////////////////////////////////////////////////////////// +// adjust_chunk_size_and_max_chunks + +struct test_executor_adjust_chunk_size : hpx::execution::parallel_executor +{ + test_executor_adjust_chunk_size() = default; + + template + friend std::pair tag_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + Parameters&&, test_executor_adjust_chunk_size, + std::size_t /*num_elements*/, std::size_t /*num_cores*/, + std::size_t /*num_chunks*/, std::size_t /*chunk_size*/) + { + ++exec_count; + return {1, 1}; + } +}; + +template <> +struct hpx::execution::experimental::is_two_way_executor< + test_executor_adjust_chunk_size> : std::true_type +{ +}; + +struct test_adjust_chunk_size +{ + template + friend std::pair tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + test_adjust_chunk_size, Executor&&, std::size_t /*num_elements*/, + std::size_t /*num_cores*/, std::size_t /*num_chunks*/, + std::size_t /*chunk_size*/) + { + ++params_count; + return {1, 1}; + } +}; + +template <> +struct hpx::execution::experimental::is_executor_parameters< + test_adjust_chunk_size> : std::true_type +{ +}; + +/////////////////////////////////////////////////////////////////////////////// +void test_adjust_chunk_size_and_max_chunks() +{ + { + params_count = 0; + exec_count = 0; + + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + test_adjust_chunk_size{}, hpx::execution::par.executor(), 100, 4, 0, + 0); + + HPX_TEST_EQ(params_count, static_cast(1)); + HPX_TEST_EQ(exec_count, static_cast(0)); + } + + { + params_count = 0; + exec_count = 0; + + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + hpx::execution::par.parameters(), test_executor_adjust_chunk_size{}, + 100, 4, 0, 0); + + HPX_TEST_EQ(params_count, static_cast(0)); + HPX_TEST_EQ(exec_count, static_cast(1)); + } + + { + params_count = 0; + exec_count = 0; + + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + test_adjust_chunk_size{}, test_executor_adjust_chunk_size{}, 100, 4, + 0, 0); + + HPX_TEST_EQ(params_count, static_cast(1)); + HPX_TEST_EQ(exec_count, static_cast(0)); + } +} + /////////////////////////////////////////////////////////////////////////////// int hpx_main() { From 85e2e2ef40eb74fbaa0ecb7f2b098a914b730bbe Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Tue, 24 Feb 2026 18:35:53 +0000 Subject: [PATCH 13/17] Add rebind_executor_parameters functionality and update related includes Signed-off-by: BhoomishGupta --- .../hpx/parallel/algorithms/reduce.hpp | 12 ++++++-- .../hpx/parallel/util/detail/chunk_size.hpp | 29 +++++++++---------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 1c7279c6229e..2ff2506f28d9 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -123,7 +123,7 @@ namespace hpx { /// with an execution policy object of type \a sequenced_policy /// execute in sequential order in the calling thread. /// - /// The reduce operations in the parallel \a copy_if algorithm invoked + /// The reduce operations in the parallel \a reduce algorithm invoked /// with an execution policy object of type \a parallel_policy /// or \a parallel_task_policy are permitted to execute in an unordered /// fashion in unspecified threads, and indeterminately sequenced @@ -357,6 +357,7 @@ namespace hpx { #include #include +#include #include #include #include @@ -517,12 +518,17 @@ namespace hpx::parallel { namespace detail { part_begin, part_size, r); }; - auto reduce_policy = policy.with(reduce_executor_parameters{}); + auto rebound_params = + hpx::execution::experimental::rebind_executor_parameters( + policy.parameters(), reduce_executor_parameters{}); + auto reduce_policy = + hpx::execution::experimental::create_rebound_policy( + policy, HPX_MOVE(rebound_params)); using reduce_policy_type = std::decay_t; return util::partitioner::call( HPX_MOVE(reduce_policy), first, - hpx::parallel::detail::distance(first, last), HPX_MOVE(f1), + count, HPX_MOVE(f1), hpx::unwrapping( [init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)]( auto&& results) -> T { diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index 7decbb5c8ebe..466fc06f588e 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -150,12 +151,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), hpx::chrono::null_duration, cores, count); - auto [adj_chunk, adj_max] = + std::tie(chunk_size, max_chunks) = hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - chunk_size = adj_chunk; - max_chunks = adj_max; auto last = next_or_subrange(it_or_r, count, 0); Stride stride = parallel::detail::abs(s); @@ -236,12 +235,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), iteration_duration, cores, count); - auto [adj_chunk, adj_max] = + std::tie(chunk_size, max_chunks) = hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - chunk_size = adj_chunk; - max_chunks = adj_max; auto last = next_or_subrange(it_or_r, count, 0); @@ -307,8 +304,10 @@ namespace hpx::parallel::util::detail { hpx::chrono::null_duration, cores, count); // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks( - cores, count, max_chunks, chunk_size, true); + std::tie(chunk_size, max_chunks) = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); if (stride != 1) { @@ -418,12 +417,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), hpx::chrono::null_duration, cores, count); - auto [adj_chunk, adj_max] = + std::tie(chunk_size, max_chunks) = hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - chunk_size = adj_chunk; - max_chunks = adj_max; if (stride != 1) { @@ -513,12 +510,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), iteration_duration, cores, count); - auto [adj_chunk, adj_max] = + std::tie(chunk_size, max_chunks) = hpx::execution::experimental::adjust_chunk_size_and_max_chunks( policy.parameters(), policy.executor(), count, cores, max_chunks, chunk_size); - chunk_size = adj_chunk; - max_chunks = adj_max; if (stride != 1) { @@ -586,8 +581,10 @@ namespace hpx::parallel::util::detail { hpx::chrono::null_duration, cores, count); // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks( - cores, count, max_chunks, chunk_size, true); + std::tie(chunk_size, max_chunks) = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); if (stride != 1) { From cc46be85d0b62c095a30cdbb93426c0227338cd1 Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Tue, 24 Feb 2026 18:35:53 +0000 Subject: [PATCH 14/17] Add rebind_executor_parameters functionality and update related includes Signed-off-by: BhoomishGupta --- libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 2ff2506f28d9..554cfa301dd6 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -528,7 +528,7 @@ namespace hpx::parallel { namespace detail { return util::partitioner::call( HPX_MOVE(reduce_policy), first, - count, HPX_MOVE(f1), + hpx::parallel::detail::distance(first, last), HPX_MOVE(f1), hpx::unwrapping( [init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)]( auto&& results) -> T { From 81d246efe725cc15eb434cd3fd1157eef8afe22d Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Sat, 4 Apr 2026 10:16:37 +0530 Subject: [PATCH 15/17] Testing (Stash) Signed-off-by: BhoomishGupta --- .../tests/regressions/reduce_6647.cpp | 31 ++- .../executors/rebind_executor_parameters.hpp | 69 +++++- .../unit/executor_parameters_dispatching.cpp | 1 + .../tests/unit/rebind_executor_parameters.cpp | 209 ++++++++++++++++++ 4 files changed, 297 insertions(+), 13 deletions(-) diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index ee47427cbb3e..88b603a438e5 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -42,21 +42,30 @@ struct minmax } }; -int hpx_main() +void test_reduce_case( + std::vector const& c, std::pair const& expected) { - std::vector c = {3, 1, 4, 1, 5, 9, 2, 6}; - - auto result = hpx::reduce(hpx::execution::seq, c.begin(), c.end(), - std::pair{INT_MAX, INT_MIN}, minmax{}); + auto const init = std::pair{INT_MAX, INT_MIN}; - HPX_TEST_EQ(result.first, 1); - HPX_TEST_EQ(result.second, 9); + auto result = + hpx::reduce(hpx::execution::seq, c.begin(), c.end(), init, minmax{}); + HPX_TEST_EQ(result.first, expected.first); + HPX_TEST_EQ(result.second, expected.second); - result = hpx::reduce(hpx::execution::par, c.begin(), c.end(), - std::pair{INT_MAX, INT_MIN}, minmax{}); + result = + hpx::reduce(hpx::execution::par, c.begin(), c.end(), init, minmax{}); + HPX_TEST_EQ(result.first, expected.first); + HPX_TEST_EQ(result.second, expected.second); +} - HPX_TEST_EQ(result.first, 1); - HPX_TEST_EQ(result.second, 9); +int hpx_main() +{ + test_reduce_case({}, {INT_MAX, INT_MIN}); + test_reduce_case({5}, {5, 5}); + test_reduce_case({3, 1}, {1, 3}); + test_reduce_case({3, 1, 4}, {1, 4}); + test_reduce_case({9, 2, 7, 1, 6}, {1, 9}); + test_reduce_case({3, 1, 4, 1, 5, 9, 2, 6}, {1, 9}); return hpx::local::finalize(); } diff --git a/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp index 40b1aff2f1b3..31dc919ff556 100644 --- a/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace hpx::execution::experimental { @@ -240,6 +241,22 @@ namespace hpx::execution::experimental { collect_execution_parameters_t, Params&&, InnerParams&&, Executor&&, std::size_t, std::size_t, std::size_t, std::size_t>; + // wrapping adjust_chunk_size_and_max_chunks + HPX_CXX_CORE_EXPORT template + inline constexpr bool supports_adjust_chunk_size_and_max_chunks_v = + hpx::functional::detail::is_tag_override_invocable_v< + adjust_chunk_size_and_max_chunks_t, Params&&, Executor&&, + std::size_t, std::size_t, std::size_t, std::size_t> || + hpx::execution::experimental::detail:: + has_adjust_chunk_size_and_max_chunks_v>; + + HPX_CXX_CORE_EXPORT template + inline constexpr bool supports_wrapping_adjust_chunk_size_and_max_chunks_v = + hpx::functional::detail::is_tag_override_invocable_v< + adjust_chunk_size_and_max_chunks_t, Params&&, InnerParams&&, + Executor&&, std::size_t, std::size_t, std::size_t, std::size_t>; + /////////////////////////////////////////////////////////////////////////// HPX_CXX_CORE_EXPORT template requires(hpx::executor_parameters && @@ -402,8 +419,8 @@ namespace hpx::execution::experimental { detail::wrapped_forward(this_), HPX_FORWARD(Executor, exec)); } - if constexpr (supports_mark_begin_execution_v< - detail::wrapping_t, Executor>) + else if constexpr (supports_mark_begin_execution_v< + detail::wrapping_t, Executor>) { mark_begin_execution(detail::wrapping_forward(this_), HPX_FORWARD(Executor, exec)); @@ -622,6 +639,54 @@ namespace hpx::execution::experimental { } } + // wrapping adjust_chunk_size_and_max_chunks + + // clang-format off + template + requires(std::same_as> && ( + supports_wrapping_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, detail::wrapped_t, + Executor> || + supports_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, Executor> || + supports_adjust_chunk_size_and_max_chunks_v< + detail::wrapped_t, Executor> + )) + // clang-format on + friend constexpr std::pair + tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + Params&& this_, Executor&& exec, std::size_t const num_elements, + std::size_t const num_cores, std::size_t const num_chunks, + std::size_t const chunk_size) + { + if constexpr (supports_wrapping_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, + detail::wrapped_t, Executor>) + { + return adjust_chunk_size_and_max_chunks( + detail::wrapping_forward(this_), + detail::wrapped_forward(this_), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + else if constexpr (supports_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, Executor>) + { + return adjust_chunk_size_and_max_chunks( + detail::wrapping_forward(this_), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + else + { + return adjust_chunk_size_and_max_chunks( + detail::wrapped_forward(this_), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + } + Wrapped wrapped; Wrapping wrapping; }; diff --git a/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp b/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp index 7a7f0b43ae40..cc4921ff5955 100644 --- a/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp +++ b/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp @@ -747,6 +747,7 @@ int hpx_main() test_mark_begin_execution(); test_mark_end_of_scheduling(); test_mark_end_execution(); + test_adjust_chunk_size_and_max_chunks(); return hpx::local::finalize(); } diff --git a/libs/core/execution/tests/unit/rebind_executor_parameters.cpp b/libs/core/execution/tests/unit/rebind_executor_parameters.cpp index 2dc47c3c8a49..33bad6bb0e81 100644 --- a/libs/core/execution/tests/unit/rebind_executor_parameters.cpp +++ b/libs/core/execution/tests/unit/rebind_executor_parameters.cpp @@ -793,6 +793,213 @@ void replace_collect_execution_parameters() } } +/////////////////////////////////////////////////////////////////////////////// +// test parameters object with adjust_chunk_size_and_max_chunks +struct test_replaced_adjust_chunk_size_and_max_chunks +{ + explicit test_replaced_adjust_chunk_size_and_max_chunks( + std::atomic& invoked) noexcept + : invoked(&invoked) + { + } + + template + friend std::pair tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + test_replaced_adjust_chunk_size_and_max_chunks& self, Executor&&, + std::size_t num_elements, std::size_t num_cores, std::size_t num_chunks, + std::size_t chunk_size) noexcept + { + *self.invoked = true; + + if (chunk_size == 0) + { + chunk_size = (num_elements + num_cores - 1) / num_cores; + } + + if (chunk_size == 0) + { + chunk_size = 1; + } + + if (num_chunks == 0) + { + num_chunks = (num_elements + chunk_size - 1) / chunk_size; + } + + return {chunk_size, num_chunks}; + } + + std::atomic* invoked; +}; + +struct test_wrapping_adjust_chunk_size_and_max_chunks +{ + explicit test_wrapping_adjust_chunk_size_and_max_chunks( + std::atomic& invoked) noexcept + : invoked(&invoked) + { + } + + template + friend std::pair tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + test_wrapping_adjust_chunk_size_and_max_chunks& self, + InnerParams&& inner, Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t num_chunks, std::size_t chunk_size) + { + auto result = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + HPX_FORWARD(InnerParams, inner), HPX_FORWARD(Executor, exec), + num_elements, num_cores, num_chunks, chunk_size); + + *self.invoked = true; + return result; + } + + std::atomic* invoked; +}; + +namespace hpx::execution::experimental { + + template <> + struct is_executor_parameters< + test_replaced_adjust_chunk_size_and_max_chunks> : std::true_type + { + }; + + template <> + struct is_executor_parameters< + test_wrapping_adjust_chunk_size_and_max_chunks> : std::true_type + { + }; +} // namespace hpx::execution::experimental + +void replace_adjust_chunk_size_and_max_chunks() +{ + using namespace hpx::execution; + using namespace hpx::execution::experimental; + + // replace chunk adjustment with another parameters object exposing it + { + std::atomic invoked_replaced(false); + + auto params = + join_executor_parameters(experimental::static_chunk_size()); + auto bound_params = rebind_executor_parameters(params, + test_replaced_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + } + + // replace a parameters object not exposing chunk adjustment + { + std::atomic invoked_replaced(false); + + auto params = join_executor_parameters(experimental::max_num_chunks()); + auto bound_params = rebind_executor_parameters(params, + test_replaced_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + } + + // replace chunk adjustment with a parameters object not exposing it + { + std::atomic invoked_replaced(false); + + auto params = join_executor_parameters( + test_replaced_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto bound_params = + rebind_executor_parameters(params, experimental::num_cores(4)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + } + + // test wrapped chunk adjustment + { + std::atomic invoked_replaced(false); + std::atomic invoked_inner_replaced(false); + + auto params = join_executor_parameters( + test_replaced_adjust_chunk_size_and_max_chunks( + invoked_inner_replaced)); + auto bound_params = rebind_executor_parameters(params, + test_wrapping_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + HPX_TEST(invoked_inner_replaced); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// test mark_begin_execution dispatching order +struct test_dual_mark_begin_execution +{ + explicit test_dual_mark_begin_execution(std::atomic& wrapping_wrapped, + std::atomic& wrapping_only) noexcept + : wrapping_wrapped(&wrapping_wrapped) + , wrapping_only(&wrapping_only) + { + } + + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_begin_execution_t, + test_dual_mark_begin_execution& self, InnerParams&&, + Executor&&) noexcept + { + ++*self.wrapping_wrapped; + } + + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_begin_execution_t, + test_dual_mark_begin_execution& self, Executor&&) noexcept + { + ++*self.wrapping_only; + } + + std::atomic* wrapping_wrapped; + std::atomic* wrapping_only; +}; + +namespace hpx::execution::experimental { + + template <> + struct is_executor_parameters + : std::true_type + { + }; +} // namespace hpx::execution::experimental + +void verify_single_mark_begin_dispatch() +{ + using namespace hpx::execution; + using namespace hpx::execution::experimental; + + std::atomic wrapping_wrapped_count(0); + std::atomic wrapping_only_count(0); + + auto params = join_executor_parameters(experimental::num_cores(4)); + auto bound_params = rebind_executor_parameters(params, + test_dual_mark_begin_execution( + wrapping_wrapped_count, wrapping_only_count)); + auto policy = create_rebound_policy(par, bound_params); + + parameters_test(policy); + + HPX_TEST_EQ(wrapping_wrapped_count.load(), 1); + HPX_TEST_EQ(wrapping_only_count.load(), 0); +} + /////////////////////////////////////////////////////////////////////////////// int hpx_main() { @@ -802,6 +1009,8 @@ int hpx_main() replace_execution_markers(); replace_processing_units_count(); replace_collect_execution_parameters(); + replace_adjust_chunk_size_and_max_chunks(); + verify_single_mark_begin_dispatch(); return hpx::local::finalize(); } From 4ad03fb1b6494332a1d7ee0a8267ebfebe160324 Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Wed, 15 Apr 2026 22:45:39 +0530 Subject: [PATCH 16/17] Fixed CI issues Signed-off-by: BhoomishGupta --- .../algorithms/include/hpx/parallel/algorithms/reduce.hpp | 3 ++- libs/core/algorithms/tests/regressions/reduce_6647.cpp | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 554cfa301dd6..9b3369184fe1 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -357,9 +357,10 @@ namespace hpx { #include #include -#include #include +#include #include +#include #include #include #include diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp index 88b603a438e5..6d9e49fc07a5 100644 --- a/libs/core/algorithms/tests/regressions/reduce_6647.cpp +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -1,10 +1,10 @@ // Copyright (c) 2026 Bhoomish Gupta // -// SPDX - License - Identifier : BSL - 1.0 +// SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// #6647:Incorrect reduce implementation +// #6647: Incorrect reduce implementation #include #include From 8dc2f283fe9d5599b954632432f3353f1198660e Mon Sep 17 00:00:00 2001 From: BhoomishGupta Date: Fri, 17 Apr 2026 20:52:00 +0530 Subject: [PATCH 17/17] Refactor chunk size adjustment logic and enhance related tests Signed-off-by: BhoomishGupta --- .../hpx/parallel/algorithms/reduce.hpp | 57 +++++++++--- .../hpx/parallel/util/detail/chunk_size.hpp | 57 +++--------- .../executors/execution_parameters.hpp | 87 +++++++++++-------- .../tests/unit/rebind_executor_parameters.cpp | 4 +- 4 files changed, 105 insertions(+), 100 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 9b3369184fe1..4428698f1649 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -393,14 +393,18 @@ namespace hpx::parallel { // value_type elements when T may differ from value_type (e.g. minmax). struct reduce_executor_parameters { - template - HPX_FORCEINLINE constexpr std::pair - adjust_chunk_size_and_max_chunks(Executor&&, - std::size_t num_elements, std::size_t num_cores, - std::size_t /*max_chunks*/, - std::size_t chunk_size) const noexcept + private: + static HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks_impl(std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept { - // Ensure minimum chunk size of 2 + if (num_elements <= 1) + { + return {chunk_size, max_chunks}; + } + + // Ensure minimum chunk size of 2 for reduce_partition. if (chunk_size < 2) { chunk_size = (num_elements + num_cores - 1) / num_cores; @@ -415,13 +419,41 @@ namespace hpx::parallel { while ( num_elements > chunk_size && num_elements % chunk_size == 1) { - chunk_size++; + ++chunk_size; } - std::size_t new_max_chunks = - (num_elements + chunk_size - 1) / chunk_size; + max_chunks = (num_elements + chunk_size - 1) / chunk_size; + return {chunk_size, max_chunks}; + } + + public: + template + HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks(Executor&&, + std::size_t num_elements, std::size_t num_cores, + std::size_t max_chunks, std::size_t chunk_size) const noexcept + { + return adjust_chunk_size_and_max_chunks_impl( + num_elements, num_cores, max_chunks, chunk_size); + } - return {chunk_size, new_max_chunks}; + template + friend HPX_FORCEINLINE constexpr std::pair + tag_override_invoke(hpx::execution::experimental:: + adjust_chunk_size_and_max_chunks_t, + reduce_executor_parameters const&, InnerParams&& inner, + Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) + { + auto [adjusted_chunk_size, adjusted_max_chunks] = hpx:: + execution::experimental::adjust_chunk_size_and_max_chunks( + HPX_FORWARD(InnerParams, inner), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + max_chunks, chunk_size); + + return adjust_chunk_size_and_max_chunks_impl(num_elements, + num_cores, adjusted_max_chunks, adjusted_chunk_size); } }; /// \endcond @@ -528,8 +560,7 @@ namespace hpx::parallel { namespace detail { using reduce_policy_type = std::decay_t; return util::partitioner::call( - HPX_MOVE(reduce_policy), first, - hpx::parallel::detail::distance(first, last), HPX_MOVE(f1), + HPX_MOVE(reduce_policy), first, count, HPX_MOVE(f1), hpx::unwrapping( [init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)]( auto&& results) -> T { diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index 466fc06f588e..44f68e10cf1e 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -60,26 +60,9 @@ namespace hpx::parallel::util::detail { { if (max_chunks == 0) { - if (chunk_size == 0) - { - std::size_t const cores_times_4 = 4 * cores; // -V112 - - // try to calculate chunk-size and maximum number of chunks - chunk_size = (count + cores_times_4 - 1) / cores_times_4; - - // we should not consider more chunks than we have elements - max_chunks = (std::min) (cores_times_4, count); // -V112 - - // we should not make chunks smaller than what's determined by - // the max chunk size - chunk_size = (std::max) (chunk_size, - (count + max_chunks - 1) / max_chunks); - } - else - { - // max_chunks == 0 && chunk_size != 0 - max_chunks = (count + chunk_size - 1) / chunk_size; - } + std::tie(chunk_size, max_chunks) = hpx::execution::experimental:: + detail::adjust_chunk_size_and_max_chunks_default( + count, cores, max_chunks, chunk_size); return; } @@ -89,25 +72,9 @@ namespace hpx::parallel::util::detail { return; } - if (chunk_size == 0) - { - // max_chunks != 0 - chunk_size = (count + max_chunks - 1) / max_chunks; - } - else - { - // max_chunks != 0 && chunk_size != 0 - - // in this case we make sure that there are no more chunks than - // max_chunks - std::size_t const calculated_max_chunks = - (count + chunk_size - 1) / chunk_size; - - if (calculated_max_chunks > max_chunks) - { - chunk_size = (count + max_chunks - 1) / max_chunks; - } - } + std::tie(chunk_size, max_chunks) = hpx::execution::experimental:: + detail::adjust_chunk_size_and_max_chunks_default( + count, cores, max_chunks, chunk_size); } //////////////////////////////////////////////////////////////////////////// @@ -304,10 +271,8 @@ namespace hpx::parallel::util::detail { hpx::chrono::null_duration, cores, count); // make sure, chunk size and max_chunks are consistent - std::tie(chunk_size, max_chunks) = - hpx::execution::experimental::adjust_chunk_size_and_max_chunks( - policy.parameters(), policy.executor(), count, cores, - max_chunks, chunk_size); + adjust_chunk_size_and_max_chunks( + cores, count, max_chunks, chunk_size, true); if (stride != 1) { @@ -581,10 +546,8 @@ namespace hpx::parallel::util::detail { hpx::chrono::null_duration, cores, count); // make sure, chunk size and max_chunks are consistent - std::tie(chunk_size, max_chunks) = - hpx::execution::experimental::adjust_chunk_size_and_max_chunks( - policy.parameters(), policy.executor(), count, cores, - max_chunks, chunk_size); + adjust_chunk_size_and_max_chunks( + cores, count, max_chunks, chunk_size, true); if (stride != 1) { diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp index 3902dbd89dc6..5c33f8c4faee 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp @@ -605,54 +605,65 @@ namespace hpx::execution::experimental::detail { HPX_HAS_MEMBER_XXX_TRAIT_DEF(adjust_chunk_size_and_max_chunks) /////////////////////////////////////////////////////////////////////// - // default property implementation allowing to handle + // shared default implementation allowing to handle // adjust_chunk_size_and_max_chunks - struct adjust_chunk_size_and_max_chunks_property + HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks_default(std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept { - template - HPX_FORCEINLINE static constexpr std::pair - adjust_chunk_size_and_max_chunks(Target, std::size_t num_elements, - std::size_t num_cores, std::size_t max_chunks, - std::size_t chunk_size) noexcept + if (max_chunks == 0) { - if (max_chunks == 0) + if (chunk_size == 0) { - if (chunk_size == 0) - { - std::size_t const cores_times_4 = 4 * num_cores; // -V112 - - chunk_size = - (num_elements + cores_times_4 - 1) / cores_times_4; - - max_chunks = - (std::min) (cores_times_4, num_elements); // -V112 - - chunk_size = (std::max) (chunk_size, - (num_elements + max_chunks - 1) / max_chunks); - } - else - { - // max_chunks == 0 && chunk_size != 0 - max_chunks = (num_elements + chunk_size - 1) / chunk_size; - } + std::size_t const cores_times_4 = 4 * num_cores; // -V112 + + chunk_size = (num_elements + cores_times_4 - 1) / cores_times_4; + + max_chunks = + (std::min) (cores_times_4, num_elements); // -V112 + + chunk_size = (std::max) (chunk_size, + (num_elements + max_chunks - 1) / max_chunks); } - else if (chunk_size == 0) + else { - chunk_size = (num_elements + max_chunks - 1) / max_chunks; + // max_chunks == 0 && chunk_size != 0 + max_chunks = (num_elements + chunk_size - 1) / chunk_size; } - else + } + else if (chunk_size == 0) + { + chunk_size = (num_elements + max_chunks - 1) / max_chunks; + } + else + { + // max_chunks != 0 && chunk_size != 0 + std::size_t const calculated_max_chunks = + (num_elements + chunk_size - 1) / chunk_size; + + if (calculated_max_chunks > max_chunks) { - // max_chunks != 0 && chunk_size != 0 - std::size_t const calculated_max_chunks = - (num_elements + chunk_size - 1) / chunk_size; - - if (calculated_max_chunks > max_chunks) - { - chunk_size = (num_elements + max_chunks - 1) / max_chunks; - } + chunk_size = (num_elements + max_chunks - 1) / max_chunks; } + } + + return {chunk_size, max_chunks}; + } - return {chunk_size, max_chunks}; + /////////////////////////////////////////////////////////////////////// + // default property implementation allowing to handle + // adjust_chunk_size_and_max_chunks + struct adjust_chunk_size_and_max_chunks_property + { + template + HPX_FORCEINLINE static constexpr std::pair + adjust_chunk_size_and_max_chunks(Target, std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept + { + return adjust_chunk_size_and_max_chunks_default( + num_elements, num_cores, max_chunks, chunk_size); } }; diff --git a/libs/core/execution/tests/unit/rebind_executor_parameters.cpp b/libs/core/execution/tests/unit/rebind_executor_parameters.cpp index 33bad6bb0e81..11ecf4f6847c 100644 --- a/libs/core/execution/tests/unit/rebind_executor_parameters.cpp +++ b/libs/core/execution/tests/unit/rebind_executor_parameters.cpp @@ -953,7 +953,7 @@ struct test_dual_mark_begin_execution template friend void tag_override_invoke( hpx::execution::experimental::mark_begin_execution_t, - test_dual_mark_begin_execution& self, InnerParams&&, + test_dual_mark_begin_execution const& self, InnerParams&&, Executor&&) noexcept { ++*self.wrapping_wrapped; @@ -962,7 +962,7 @@ struct test_dual_mark_begin_execution template friend void tag_override_invoke( hpx::execution::experimental::mark_begin_execution_t, - test_dual_mark_begin_execution& self, Executor&&) noexcept + test_dual_mark_begin_execution const& self, Executor&&) noexcept { ++*self.wrapping_only; }