Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d10ae50
new parallel_scheduler with seq pol
Mar 20, 2026
0b6f35a
trying to optimize parallel_scheduler
Mar 20, 2026
87205af
optimize
charan-003 Mar 21, 2026
7e3f0c9
add ifdef stdexec
charan-003 Mar 22, 2026
82856b6
parallel scheduler uses cached mask
charan-003 Mar 28, 2026
e6e2c1f
add replaceability api
charan-003 Mar 30, 2026
b6ad521
fix minor issues
charan-003 Apr 24, 2026
9e3a1ae
implement P3927
charan-003 Apr 24, 2026
5f3389a
implement p3804
charan-003 Apr 24, 2026
73e8909
fix formating
charan-003 Apr 24, 2026
68724e4
make it truely parallelized
charan-003 Apr 26, 2026
7db2040
get back to old one
charan-003 May 5, 2026
d20d710
resolve conflicts
charan-003 May 6, 2026
6883b05
use HPX bulk
charan-003 May 6, 2026
1112ad5
use get_completion_scheduler
charan-003 May 6, 2026
b7fba94
fix depricated errors
charan-003 May 6, 2026
d968500
resolve conflicts + few migration changes
charan-003 May 17, 2026
c241845
resolve conflicts + few migration changes
charan-003 May 17, 2026
86efdab
minor fix
charan-003 May 17, 2026
7e56a3b
fix execution layer
charan-003 May 17, 2026
07bb855
fix test failurs
charan-003 May 17, 2026
bd1832e
fix
charan-003 May 17, 2026
ff9756d
fix
charan-003 May 17, 2026
901afa4
fix fix -fix deadlocks
charan-003 May 18, 2026
39ad181
fix deadlocks
charan-003 May 18, 2026
e977c18
minor changes
charan-003 May 18, 2026
73663a8
fix tests with non-async polocies
charan-003 May 18, 2026
4e12d72
final fix
charan-003 May 18, 2026
e4efd07
include algorithm include
charan-003 May 19, 2026
6482457
refactor backend implementation
charan-003 May 20, 2026
6867a01
fix duplicate
charan-003 May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,10 @@ namespace hpx::parallel::detail {
ExPolicy> ||
has_scheduler_executor)
{
return util::detail::algorithm_result<ExPolicy>::
get(util::partitioner<ExPolicy>::call(
HPX_FORWARD(ExPolicy, policy), first, count,
HPX_MOVE(iter_fun),
hpx::util::empty_function{}));
return util::call_with_algorithm_result<ExPolicy>(
HPX_FORWARD(ExPolicy, policy), first, count,
HPX_MOVE(iter_fun),
hpx::util::empty_function{});
}
else
{
Expand Down Expand Up @@ -428,10 +427,9 @@ namespace hpx::parallel::detail {
if constexpr (hpx::is_async_execution_policy_v<ExPolicy> ||
has_scheduler_executor)
{
return util::detail::algorithm_result<ExPolicy>::get(
util::partitioner<ExPolicy>::call(
HPX_FORWARD(ExPolicy, policy), first, count,
HPX_MOVE(iter_fun), hpx::util::empty_function{}));
return util::call_with_algorithm_result<ExPolicy>(
HPX_FORWARD(ExPolicy, policy), first, count,
HPX_MOVE(iter_fun), hpx::util::empty_function{});
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,11 +1215,10 @@ namespace hpx::parallel {
if constexpr (hpx::is_async_execution_policy_v<ExPolicy> ||
is_scheduler_policy)
{
return util::detail::algorithm_result<ExPolicy>::get(
util::partitioner<ExPolicy>::call(
HPX_FORWARD(ExPolicy, policy), iter_or_r, size,
part_iterations<ExPolicy, F>{HPX_FORWARD(F, f)},
hpx::util::empty_function{}));
return util::call_with_algorithm_result<ExPolicy>(
HPX_FORWARD(ExPolicy, policy), iter_or_r, size,
part_iterations<ExPolicy, F>{HPX_FORWARD(F, f)},
hpx::util::empty_function{});
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ namespace hpx::parallel::util::detail {

// We attempt to perform some optimizations in case of non-task
// execution.
if constexpr (!hpx::is_async_execution_policy_v<ExPolicy> &&
!hpx::execution_policy_has_scheduler_executor_v<ExPolicy>)
if constexpr (!hpx::is_async_execution_policy_v<ExPolicy>)
Comment thread
charan-003 marked this conversation as resolved.
{
// Switch to sequential execution for one-core, one-chunk case
// if the executor supports it.
Expand Down
25 changes: 23 additions & 2 deletions libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <hpx/modules/iterator_support.hpp>
#include <hpx/modules/pack_traversal.hpp>
#include <hpx/modules/type_support.hpp>
#include <hpx/parallel/util/detail/algorithm_result.hpp>
#include <hpx/parallel/util/detail/chunk_size.hpp>
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp>
#include <hpx/parallel/util/detail/partitioner_iteration.hpp>
Expand Down Expand Up @@ -75,8 +76,7 @@ namespace hpx::parallel::util::detail {
// We attempt to perform some optimizations in case of non-task
// execution.
if constexpr (Optimize &&
!hpx::is_async_execution_policy_v<ExPolicy> &&
!hpx::execution_policy_has_scheduler_executor_v<ExPolicy>)
!hpx::is_async_execution_policy_v<ExPolicy>)
{
// Switch to sequential execution for one-core, one-chunk case
// if the executor supports it.
Expand Down Expand Up @@ -700,4 +700,25 @@ namespace hpx::parallel::util {
detail::task_static_partitioner>::template apply<R, Result>
{
};

// Helper to call partitioner and wrap the result with
// algorithm_result::get(). Handles both void and non-void return types.
template <typename ExPolicy, typename... Args>
decltype(auto) call_with_algorithm_result(ExPolicy&& policy, Args&&... args)
{
if constexpr (std::is_void_v<decltype(partitioner<ExPolicy>::call(
HPX_FORWARD(ExPolicy, policy),
HPX_FORWARD(Args, args)...))>)
{
partitioner<ExPolicy>::call(
HPX_FORWARD(ExPolicy, policy), HPX_FORWARD(Args, args)...);
return detail::algorithm_result<ExPolicy>::get();
}
else
{
return detail::algorithm_result<ExPolicy>::get(
partitioner<ExPolicy>::call(
HPX_FORWARD(ExPolicy, policy), HPX_FORWARD(Args, args)...));
}
}
} // namespace hpx::parallel::util
9 changes: 9 additions & 0 deletions libs/core/algorithms/tests/performance/foreach_report.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ int hpx_main(hpx::program_options::variables_map& vm)
[&]() { measure_parallel_foreach(data_representation, exec); });
}

{
hpx::execution::experimental::scheduler_executor<
hpx::execution::experimental::parallel_scheduler>
exec(hpx::execution::experimental::get_parallel_scheduler());
hpx::util::perftests_report("for_each", "parallel_scheduler",
test_count,
[&]() { measure_parallel_foreach(data_representation, exec); });
}

{
hpx::execution::parallel_executor exec;
hpx::util::perftests_report("for_each", "parallel_executor",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,18 @@ namespace hpx::cuda::experimental {
S, Env>{},
invoke_function_transformation_fn{},
default_set_error_fn{},
hpx::execution::experimental::ignore_completion{}))
hpx::execution::experimental::ignore_completion{},
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_stopped_t()>{}))
{
return hpx::execution::experimental::transform_completion_signatures(
hpx::execution::experimental::completion_signatures_of_t<
S, Env>{},
invoke_function_transformation_fn{},
default_set_error_fn{},
hpx::execution::experimental::ignore_completion{});
hpx::execution::experimental::ignore_completion{},
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_stopped_t()>{});
}
// clang-format on

Expand Down
8 changes: 6 additions & 2 deletions libs/core/async_mpi/include/hpx/async_mpi/transform_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,18 @@ namespace hpx::mpi::experimental {
Sender, Env>{},
invoke_function_transformation_fn{},
default_set_error_fn{},
hpx::execution::experimental::ignore_completion{}))
hpx::execution::experimental::ignore_completion{},
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_stopped_t()>{}))
{
return hpx::execution::experimental::transform_completion_signatures(
hpx::execution::experimental::completion_signatures_of_t<
Sender, Env>{},
invoke_function_transformation_fn{},
default_set_error_fn{},
hpx::execution::experimental::ignore_completion{});
hpx::execution::experimental::ignore_completion{},
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_stopped_t()>{});
}
// clang-format on

Expand Down
Loading
Loading