Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,97 @@ namespace hpx::execution::experimental {
HPX_FORWARD(Receiver, receiver), future_};
}
};

///////////////////////////////////////////////////////////////////////
// Scheduler-aware sender wrapper.
//
// Exposes the stored scheduler through its environment so that
// downstream sender algorithms (bulk, sync_wait, etc.) can query
// get_completion_scheduler<set_value_t> and obtain the scheduler
// that originated the work.
HPX_CXX_CORE_EXPORT template <typename Future, typename Scheduler>
requires hpx::traits::is_future_v<std::decay_t<Future>>
struct as_sender_sender_with_scheduler
: public as_sender_sender_base<std::decay_t<Future>>
{
using sender_concept = hpx::execution::experimental::sender_t;
using future_type = std::decay_t<Future>;
using scheduler_type = std::decay_t<Scheduler>;
using base_type = as_sender_sender_base<std::decay_t<Future>>;
using base_type::future_;

HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_;

// Environment that answers get_completion_scheduler queries.
// Follows the thread_pool_scheduler::sender::env pattern.
struct env
{
scheduler_type const& sched;

auto query(
hpx::execution::experimental::get_domain_t) const noexcept
{
return hpx::execution::experimental::get_domain(sched);
}

template <typename CPO>
requires std::is_same_v<CPO,
hpx::execution::experimental::set_value_t> ||
std::is_same_v<CPO,
hpx::execution::experimental::set_stopped_t>
auto query(
hpx::execution::experimental::get_completion_scheduler_t<
CPO>) const noexcept
{
return sched;
}
};

template <typename Future_, typename Scheduler_>
requires(!std::is_same_v<std::decay_t<Future_>,
as_sender_sender_with_scheduler>)
explicit as_sender_sender_with_scheduler(
Future_&& future, Scheduler_&& scheduler)
: base_type{HPX_FORWARD(Future_, future)}
, scheduler_(HPX_FORWARD(Scheduler_, scheduler))
{
}

as_sender_sender_with_scheduler(
as_sender_sender_with_scheduler&&) = default;
as_sender_sender_with_scheduler& operator=(
as_sender_sender_with_scheduler&&) = default;
as_sender_sender_with_scheduler(
as_sender_sender_with_scheduler const&) = default;
as_sender_sender_with_scheduler& operator=(
as_sender_sender_with_scheduler const&) = default;

template <typename Self, typename... Env>
static consteval auto get_completion_signatures() noexcept ->
typename base_type::completion_signatures
{
return {};
}

template <typename Receiver>
auto connect(Receiver&& receiver) &&
{
return as_sender_operation_state<Receiver, future_type>{
HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)};
}

template <typename Receiver>
auto connect(Receiver&& receiver) &
{
return as_sender_operation_state<Receiver, future_type>{
HPX_FORWARD(Receiver, receiver), future_};
}

constexpr auto get_env() const noexcept
{
return env{scheduler_};
}
};
} // namespace detail

// The as_sender CPO can be used to adapt any HPX future as a sender. The
Expand All @@ -256,6 +347,19 @@ namespace hpx::execution::experimental {
HPX_FORWARD(Future, future));
}

// Scheduler-aware overload: wraps the future into a sender whose
// environment exposes the given scheduler as completion scheduler.
template <typename Future, typename Scheduler>
requires hpx::traits::is_future_v<std::decay_t<Future>> &&
hpx::execution::experimental::scheduler<std::decay_t<Scheduler>>
constexpr HPX_FORCEINLINE auto operator()(
Future&& future, Scheduler&& scheduler) const
{
return detail::as_sender_sender_with_scheduler<std::decay_t<Future>,
std::decay_t<Scheduler>>(
HPX_FORWARD(Future, future), HPX_FORWARD(Scheduler, scheduler));
}

constexpr HPX_FORCEINLINE auto operator()() const
{
return detail::partial_algorithm<as_sender_t>{};
Expand Down
1 change: 1 addition & 0 deletions libs/core/execution/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

set(tests
algorithm_as_sender
algorithm_as_sender_with_scheduler
algorithm_bulk
algorithm_continues_on
algorithm_ensure_started
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2026 The STE||AR-Group
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/execution.hpp>
#include <hpx/executors/thread_pool_scheduler.hpp>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break C++20 module compilation. Please use generated module headers only:

Suggested change
#include <hpx/executors/thread_pool_scheduler.hpp>
#include <hpx/modules/executors.hpp>

#include <hpx/future.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/futures.hpp>
#include <hpx/modules/testing.hpp>

namespace ex = hpx::execution::experimental;
namespace tt = hpx::this_thread::experimental;

void test_as_sender_with_scheduler_basic()
{
// Test: future + scheduler -> sender with env
auto f = hpx::async([] { return 42; });
ex::thread_pool_scheduler sched{};
auto sender = ex::as_sender(std::move(f), sched);
// Verify compilation and basic properties
(void) sender;
}

void test_as_sender_with_scheduler_get_env()
{
// Test: env exposes the scheduler
auto f = hpx::async([] { return 42; });
ex::thread_pool_scheduler sched{};
auto sender = ex::as_sender(std::move(f), sched);
auto env = ex::get_env(sender);
// Verify env can be queried for scheduler
auto sched_from_env = ex::get_completion_scheduler<ex::set_value_t>(env);
HPX_TEST(sched == sched_from_env);
}

void test_as_sender_with_scheduler_in_pipeline()
{
// Full end-to-end test
auto f = hpx::async([] { return 42; });
ex::thread_pool_scheduler sched{};

auto [result] =
tt::sync_wait(ex::as_sender(std::move(f), sched) | ex::then([](int x) {
return x * 2;
})).value();

HPX_TEST_EQ(result, 84);
}

void test_as_sender_with_scheduler_shared_future()
{
// Test shared_future variant (copyable)
auto f = hpx::make_shared_future(hpx::async([] { return 42; }));
ex::thread_pool_scheduler sched{};
auto sender = ex::as_sender(f, sched); // f is lvalue, not moved

auto [result] =
tt::sync_wait(sender | ex::then([](int x) { return x * 2; })).value();

HPX_TEST_EQ(result, 84);
}

void test_as_sender_with_scheduler_void()
{
// Test void-returning futures
bool executed = false;
auto f = hpx::async([&] { executed = true; });
ex::thread_pool_scheduler sched{};
auto sender = ex::as_sender(std::move(f), sched);
tt::sync_wait(std::move(sender));
HPX_TEST(executed);
}

void test_as_sender_with_scheduler_error()
{
// Test exception propagation
auto f = hpx::async([]() -> int {
throw std::runtime_error("test error");
return 42;
});
ex::thread_pool_scheduler sched{};
auto sender = ex::as_sender(std::move(f), sched);

bool caught = false;
try
{
tt::sync_wait(std::move(sender));
HPX_TEST(false); // Should have thrown
}
catch (std::runtime_error const& e)
{
caught = true;
HPX_TEST_EQ(std::string(e.what()), "test error");
}
HPX_TEST(caught);
}

int main()
{
test_as_sender_with_scheduler_basic();
test_as_sender_with_scheduler_get_env();
test_as_sender_with_scheduler_in_pipeline();
test_as_sender_with_scheduler_shared_future();
test_as_sender_with_scheduler_void();
test_as_sender_with_scheduler_error();

return hpx::util::report_errors();
}
Loading