diff --git a/libs/full/segmented_algorithms/CMakeLists.txt b/libs/full/segmented_algorithms/CMakeLists.txt index a3acd7912b97..d740f29fd106 100644 --- a/libs/full/segmented_algorithms/CMakeLists.txt +++ b/libs/full/segmented_algorithms/CMakeLists.txt @@ -15,6 +15,7 @@ set(segmented_algorithms_headers hpx/parallel/segmented_algorithms/all_any_none.hpp hpx/parallel/segmented_algorithms/count.hpp hpx/parallel/segmented_algorithms/detail/dispatch.hpp + hpx/parallel/segmented_algorithms/detail/merge.hpp hpx/parallel/segmented_algorithms/detail/reduce.hpp hpx/parallel/segmented_algorithms/detail/scan.hpp hpx/parallel/segmented_algorithms/detail/transfer.hpp @@ -26,6 +27,7 @@ set(segmented_algorithms_headers hpx/parallel/segmented_algorithms/generate.hpp hpx/parallel/segmented_algorithms/inclusive_scan.hpp hpx/parallel/segmented_algorithms/is_sorted.hpp + hpx/parallel/segmented_algorithms/merge.hpp hpx/parallel/segmented_algorithms/minmax.hpp hpx/parallel/segmented_algorithms/reduce.hpp hpx/parallel/segmented_algorithms/replace.hpp @@ -50,7 +52,7 @@ add_hpx_module( HEADERS ${segmented_algorithms_headers} COMPAT_HEADERS ${segmented_algorithms_compat_headers} DEPENDENCIES hpx_core - MODULE_DEPENDENCIES hpx_async_colocated hpx_async_distributed + MODULE_DEPENDENCIES hpx_async_colocated hpx_async_distributed hpx_collectives hpx_distribution_policies hpx_naming_base CMAKE_SUBDIRS examples tests ) diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp index 195b04fdb55e..0b28d8b06eef 100644 --- a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/detail/merge.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/detail/merge.hpp new file mode 100644 index 000000000000..5efcc15b11d6 --- /dev/null +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/detail/merge.hpp @@ -0,0 +1,1065 @@ +// Copyright (c) 2026 Abhishek Bansal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::collectives { + template + std::vector> all_gather(hpx::launch::sync_policy, + char const*, T&&, num_sites_arg const, this_site_arg const, + generation_arg const, root_site_arg const); + + template + std::vector all_to_all(hpx::launch::sync_policy, char const*, + std::vector&&, num_sites_arg const, this_site_arg const, + generation_arg const, root_site_arg const); +} // namespace hpx::collectives + +namespace hpx::parallel::detail { + + // ===================================================================== + // Segmented Merge + // ===================================================================== + // + // Merges two sorted distributed ranges (A, B) into a distributed + // destination range (D) using the co-rank technique. + // + // Overview: + // Each range is composed of segments (slices) spread across + // localities. The coordinator decomposes the three ranges into + // globally-ordered slice metadata, then launches a kernel + // on every participating locality. The kernel proceeds in phases: + // + // Phase 1 Startup barrier: ensures all handle registries are + // populated before any remote co-rank probes fire. + // Phase 2 Co-rank planning: for each local destination slice, + // binary-search (co-rank) determines which sub-ranges + // of A and B contribute to it. Successive slices narrow + // the search bounds for reduced remote fetches. + // Phase 3 Interval sharing: all localities exchange their + // computed intervals via all_gather. + // Phase 4 Payload packing: local input data is packed into + // per-peer batches (one flat buffer + descriptors per + // peer) to minimize serialization overhead. + // Phase 5 Payload exchange: two all_to_all calls ship A and B + // batches to the localities that own the destination. + // Phase 6 Local merge: each locality resolves received fragment + // descriptors to zero-copy pointers into the batch + // buffers, then performs a standard stable merge into + // its destination slices. + // + // Key types: + // slice_metadata global descriptor for one segment + // owned_handle local-only handle with actual iterators + // interval co-rank result for one dest slice + // fragment_desc lightweight header indexing into a batch + // payload_batch wire type: descriptors + flat value buffer + // resolved_fragment kernel-internal pointer+count view + // + // ===================================================================== + + /////////////////////////////////////////////////////////////////////////// + // slice_metadata: globally-shared descriptor for one contiguous segment + // of a segmented range (A, B, or D). + struct distributed_merge_slice_metadata + { + std::uint64_t slice_id = 0; + std::uint64_t site_index = 0; + std::uint64_t global_begin = 0; + std::uint64_t global_end = 0; + std::uint32_t locality_id = naming::invalid_locality_id; + }; + + /////////////////////////////////////////////////////////////////////////// + // owned_handle: local-only handle holding the actual iterators for + // a segment owned by this locality. Never serialized. Local iterators + // are meaningless on remote localities. + template + struct distributed_merge_owned_handle + { + std::uint64_t slice_id = 0; + std::uint64_t global_begin = 0; + std::uint64_t global_end = 0; + LocalIter local_begin{}; + }; + + /////////////////////////////////////////////////////////////////////////// + // interval: co-rank planning result for one destination slice. + struct distributed_merge_interval + { + std::uint64_t dest_slice_id = 0; + std::uint64_t site_index = 0; + std::uint64_t A_begin_rank = 0; + std::uint64_t A_end_rank = 0; + std::uint64_t B_begin_rank = 0; + std::uint64_t B_end_rank = 0; + }; + + /////////////////////////////////////////////////////////////////////////// + // Fragment descriptor: on the wire it indexes into the batch's shared + // values buffer via (value_offset, value_count). After receiving, the + // data pointer is resolved to point directly into the buffer. + struct distributed_merge_fragment_desc + { + std::uint64_t dest_slice_id = 0; + std::uint64_t input_global_begin = 0; + std::uint64_t value_offset = 0; + std::uint64_t value_count = 0; + }; + + // Payload batch: all fragments destined for one peer, packed into a + // single flat values buffer with lightweight descriptors. + template + struct distributed_merge_payload_batch + { + std::vector fragments; + std::vector values; + + private: + friend class hpx::serialization::access; + + template + void serialize(Archive& ar, unsigned) + { + ar & fragments & values; + } + }; + + /////////////////////////////////////////////////////////////////////////// + // collective_context: manages unique basenames and generation counters + // so concurrent distributed merge invocations never collide. + struct distributed_merge_collective_ctx + { + std::string invocation_ns; + std::unordered_map generations; + + explicit distributed_merge_collective_ctx(std::string ns) + : invocation_ns(HPX_MOVE(ns)) + { + } + + [[nodiscard]] std::string basename(std::string const& phase) const + { + return invocation_ns + "/" + phase; + } + + std::uint64_t next_gen(std::string const& phase) + { + return ++generations[phase]; + } + }; + + // ----------------------------------------------------------------- + // Handle registry + remote fetcher + // + // Co-rank probes need to read individual elements from input ranges + // that may live on remote localities. Since local iterators cannot be + // serialized, each locality registers its owned handles in a static + // per-invocation registry. Remote localities invoke a plain HPX action + // (fetcher) that looks up the handle and reads the requested element. + // The RAII guard ensures cleanup after the kernel completes. + // ----------------------------------------------------------------- + + /////////////////////////////////////////////////////////////////////////// + // Static per-locality registry for slice handles. Tagged by RegistryTag + // so that A and B registries are always distinct, even when value types + // and iterator types are the same. + template + struct distributed_merge_registry + { + using handle_type = distributed_merge_owned_handle; + + static hpx::spinlock& mtx() + { + static hpx::spinlock m; + return m; + } + + static auto& map() + { + static std::unordered_map> + registry; + return registry; + } + }; + + // RAII guard that registers handles on construction and deregisters + // on destruction. + template + struct distributed_merge_registry_guard + { + using registry_type = + distributed_merge_registry; + using handle_type = typename registry_type::handle_type; + + distributed_merge_registry_guard( + std::string ns, std::vector const& handles) + : invocation_ns_(HPX_MOVE(ns)) + { + std::unordered_map indexed; + indexed.reserve(handles.size()); + for (auto const& h : handles) + { + indexed.emplace(h.slice_id, h); + } + + std::lock_guard lk(registry_type::mtx()); + registry_type::map().emplace(invocation_ns_, HPX_MOVE(indexed)); + } + + ~distributed_merge_registry_guard() + { + std::lock_guard lk(registry_type::mtx()); + registry_type::map().erase(invocation_ns_); + } + + distributed_merge_registry_guard( + distributed_merge_registry_guard const&) = delete; + distributed_merge_registry_guard& operator=( + distributed_merge_registry_guard const&) = delete; + + private: + std::string invocation_ns_; + }; + + /////////////////////////////////////////////////////////////////////////// + // Registry tag types ensure A and B never collide. + struct distributed_merge_tag_A + { + }; + struct distributed_merge_tag_B + { + }; + + /////////////////////////////////////////////////////////////////////////// + // Remote value fetcher: reads a single value from a registered slice. + // Used by co-rank probes when the target slice is on a remote locality. + template + struct distributed_merge_fetcher + { + using registry_type = + distributed_merge_registry; + using local_traits = + hpx::traits::segmented_local_iterator_traits; + + static T call(std::string const& invocation_ns, std::uint64_t slice_id, + std::uint64_t offset) + { + std::lock_guard lk(registry_type::mtx()); + + auto& reg = registry_type::map(); + auto ns_it = reg.find(invocation_ns); + if (ns_it == reg.end()) + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "distributed_merge_fetcher::call", + "slice registry entry not found for invocation " + "namespace"); + } + + auto& handle_map = ns_it->second; + auto handle_it = handle_map.find(slice_id); + if (handle_it == handle_map.end()) + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "distributed_merge_fetcher::call", + "slice handle not found for slice_id"); + } + + auto raw = local_traits::local(handle_it->second.local_begin); + std::advance(raw, static_cast(offset)); + return *raw; + } + + struct action + : hpx::actions::make_action_t< + decltype(&distributed_merge_fetcher::call), + &distributed_merge_fetcher::call, action> + { + }; + }; + + /////////////////////////////////////////////////////////////////////////// + // Helper: binary search a sorted layout to find the slice containing a + // given global rank. + inline std::uint64_t distributed_merge_find_slice( + std::vector const& layout, + std::uint64_t global_rank) + { + auto it = std::upper_bound(layout.begin(), layout.end(), global_rank, + [](std::uint64_t rank, distributed_merge_slice_metadata const& s) { + return rank < s.global_end; + }); + + if (it == layout.end() || global_rank < it->global_begin) + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "distributed_merge_find_slice", + "global rank outside layout bounds"); + } + + return static_cast(std::distance(layout.begin(), it)); + } + + /////////////////////////////////////////////////////////////////////////// + // build_ordered_slices: decompose a segmented range [first, last) into + // globally ordered slice metadata + per-locality handle maps. + template + using distributed_merge_local_iter_t = + typename hpx::traits::segmented_iterator_traits< + SegIter>::local_iterator; + + template + using distributed_merge_handle_t = + distributed_merge_owned_handle>; + + template + using distributed_merge_handle_map_t = std::map>>; + + template + std::pair, + distributed_merge_handle_map_t> + distributed_merge_build_slices(SegIter first, SegIter last) + { + using traits = hpx::traits::segmented_iterator_traits; + using segment_iterator = typename traits::segment_iterator; + using local_iterator = typename traits::local_iterator; + + std::vector layout; + distributed_merge_handle_map_t handles; + + if (first == last) + { + return {HPX_MOVE(layout), HPX_MOVE(handles)}; + } + + std::uint64_t next_id = 0; + std::uint64_t offset = 0; + + auto add_slice = [&](segment_iterator const& seg, local_iterator lb, + local_iterator le) { + auto const len = static_cast(std::distance(lb, le)); + if (len == 0) + return; + + auto const id = traits::get_id(seg); + auto const loc_id = naming::get_locality_id_from_id(id); + + layout.push_back(distributed_merge_slice_metadata{ + next_id, 0, offset, offset + len, loc_id}); + + handles[loc_id].push_back( + distributed_merge_owned_handle{ + next_id, offset, offset + len, HPX_MOVE(lb)}); + + ++next_id; + offset += len; + }; + + segment_iterator sit = traits::segment(first); + segment_iterator send = traits::segment(last); + + if (sit == send) + { + add_slice(sit, traits::local(first), traits::local(last)); + return {HPX_MOVE(layout), HPX_MOVE(handles)}; + } + + add_slice(sit, traits::local(first), traits::end(sit)); + + for (++sit; sit != send; ++sit) + { + add_slice(sit, traits::begin(sit), traits::end(sit)); + } + + add_slice(send, traits::begin(send), traits::local(last)); + + return {HPX_MOVE(layout), HPX_MOVE(handles)}; + } + + /////////////////////////////////////////////////////////////////////////// + // Collect all unique participant localities from A, B, D layouts. + inline std::vector + distributed_merge_collect_participants_and_assign_sites( + std::vector& A_layout, + std::vector& B_layout, + std::vector& D_layout) + { + // Collect unique locality IDs and assign site indices. + // std::map keeps keys sorted, giving deterministic site ordering. + std::map loc_to_site; + auto collect = + [&loc_to_site]( + std::vector const& l) { + for (auto const& s : l) + loc_to_site.emplace(s.locality_id, 0); + }; + collect(A_layout); + collect(B_layout); + collect(D_layout); + + // Assign contiguous site indices and build participants. + std::vector participants; + participants.reserve(loc_to_site.size()); + std::uint64_t site = 0; + for (auto& [loc_id, idx] : loc_to_site) + { + idx = site++; + participants.push_back(naming::get_id_from_locality_id(loc_id)); + } + + // Assign site_index in all layouts. + for (auto& s : A_layout) + s.site_index = loc_to_site[s.locality_id]; + for (auto& s : B_layout) + s.site_index = loc_to_site[s.locality_id]; + for (auto& s : D_layout) + s.site_index = loc_to_site[s.locality_id]; + + return participants; + } + + /////////////////////////////////////////////////////////////////////////// + // Get handles for a specific locality, or empty vector if none. + template + auto distributed_merge_get_handles( + HandleMap const& handles, std::uint32_t locality_id) + { + using mapped_type = typename HandleMap::mapped_type; + auto it = handles.find(locality_id); + if (it == handles.end()) + return mapped_type{}; + return it->second; + } + + /////////////////////////////////////////////////////////////////////////// + // Unique invocation namespace generator. + inline std::string distributed_merge_make_ns() + { + static std::atomic counter{0}; + return "/hpx/distributed_merge/" + + std::to_string(hpx::agas::get_locality_id()) + "/" + + std::to_string(++counter); + } + + // ----------------------------------------------------------------- + // SPMD kernel + // + // One instance runs on every participating locality. Each instance + // owns a subset of A, B, and D slices. The six phases are fully + // symmetric. Every locality executes the same code path, differing + // only in which slices it owns and which data it sends/receives. + // Communication is purely collective (all_gather, all_to_all), so + // there are no point-to-point messages to manage. + // ----------------------------------------------------------------- + + /////////////////////////////////////////////////////////////////////////// + // The distributed merge SPMD kernel. Launched as an action on every + // participating locality. + template + struct distributed_merge_kernel + { + using handle1_t = distributed_merge_owned_handle; + using handle2_t = distributed_merge_owned_handle; + using handle_dest_t = distributed_merge_owned_handle; + using batch1_t = distributed_merge_payload_batch; + using batch2_t = distributed_merge_payload_batch; + using desc_t = distributed_merge_fragment_desc; + using fetch_action1_t = + typename distributed_merge_fetcher::action; + using fetch_action2_t = + typename distributed_merge_fetcher::action; + + /////////////////////////////////////////////////////////////////////// + // Cached remote/local element fetch for co-rank probing. + template + static Val fetch_at_rank(std::uint64_t rank, + std::vector const& layout, + std::unordered_map const& idx, + std::unordered_map& cache, + std::uint64_t this_site, + std::vector const& participants, + std::string const& invocation_ns) + { + auto cit = cache.find(rank); + if (cit != cache.end()) + return cit->second; + + std::uint64_t const si = distributed_merge_find_slice(layout, rank); + auto const& slice = layout[si]; + std::uint64_t const off = rank - slice.global_begin; + + Val val; + if (slice.site_index == this_site) + { + using lt = hpx::traits::segmented_local_iterator_traits; + + auto it = idx.find(slice.slice_id); + HPX_ASSERT(it != idx.end()); + auto raw = lt::local(it->second->local_begin); + std::advance(raw, static_cast(off)); + val = *raw; + } + else + { + val = hpx::async(FetchAction{}, participants[slice.site_index], + invocation_ns, slice.slice_id, off) + .get(); + } + + cache.emplace(rank, val); + return val; + } + + /////////////////////////////////////////////////////////////////////// + // Stable co-rank: find (i, j) with i + j = K such that the first K + // merged elements are exactly A[0,i) and B[0,j), with A-before-B + // on equal keys. [lo, hi] optionally narrow the search range on + // the A-index (clamped to the feasible interval). + template + static std::pair co_rank(std::uint64_t K, + std::uint64_t n1, std::uint64_t n2, AAt&& A_at, BAt&& B_at, + Comp const& comp, std::uint64_t lo = 0, + std::uint64_t hi = (std::numeric_limits::max)()) + { + std::uint64_t const feasible_lo = (K > n2) ? (K - n2) : 0; + std::uint64_t const feasible_hi = (std::min) (K, n1); + + lo = (std::max) (lo, feasible_lo); + hi = (std::min) (hi, feasible_hi); + + while (true) + { + std::uint64_t const i = lo + (hi - lo) / 2; + std::uint64_t const j = K - i; + + if (i > 0 && j < n2 && comp(B_at(j), A_at(i - 1))) + { + hi = i - 1; + } + else if (j > 0 && i < n1 && !comp(B_at(j - 1), A_at(i))) + { + lo = i + 1; + } + else + { + return {i, j}; + } + } + } + + /////////////////////////////////////////////////////////////////////// + // Local stable merge of reconstructed A and B fragments into a + // destination slice. + // Resolved fragment: a pointer+count span into a received batch's + // values buffer, with global rank for ordering. + template + struct resolved_fragment + { + std::uint64_t input_global_begin; + T const* data; + std::uint64_t count; + }; + + template + static void merge_into_dest_impl(handle_dest_t const& dest_handle, + std::vector>& A_frags, + std::vector>& B_frags, Comp const& comp) + { + using dest_local_traits = + hpx::traits::segmented_local_iterator_traits; + + auto by_rank = [](auto const& a, auto const& b) { + return a.input_global_begin < b.input_global_begin; + }; + std::sort(A_frags.begin(), A_frags.end(), by_rank); + std::sort(B_frags.begin(), B_frags.end(), by_rank); + + std::uint64_t a_fi = 0, a_off = 0; + std::uint64_t b_fi = 0, b_off = 0; + + auto a_done = [&]() { return a_fi >= A_frags.size(); }; + auto b_done = [&]() { return b_fi >= B_frags.size(); }; + auto a_val = [&]() -> TA const& { + return A_frags[a_fi].data[a_off]; + }; + auto b_val = [&]() -> TB const& { + return B_frags[b_fi].data[b_off]; + }; + auto a_advance = [&]() { + if (++a_off >= A_frags[a_fi].count) + { + ++a_fi; + a_off = 0; + } + }; + auto b_advance = [&]() { + if (++b_off >= B_frags[b_fi].count) + { + ++b_fi; + b_off = 0; + } + }; + + auto dest_raw = dest_local_traits::local(dest_handle.local_begin); + + while (!a_done() || !b_done()) + { + if (a_done()) + { + *dest_raw = b_val(); + b_advance(); + } + else if (b_done()) + { + *dest_raw = a_val(); + a_advance(); + } + else if (comp(b_val(), a_val())) + { + *dest_raw = b_val(); + b_advance(); + } + else + { + *dest_raw = a_val(); + a_advance(); + } + ++dest_raw; + } + } + + /////////////////////////////////////////////////////////////////////// + // Pack fragments from local handles into per-peer batches. + template + static void pack_batches(std::vector const& handles, + std::vector const& intervals, + BeginRankFn begin_rank_of, EndRankFn end_rank_of, + std::vector>& send) + { + using lt = hpx::traits::segmented_local_iterator_traits; + + for (auto const& h : handles) + { + for (auto const& iv : intervals) + { + std::uint64_t const ob = + (std::max) (h.global_begin, begin_rank_of(iv)); + std::uint64_t const oe = + (std::min) (h.global_end, end_rank_of(iv)); + + if (ob >= oe) + continue; + + auto& batch = send[iv.site_index]; + std::uint64_t const off = batch.values.size(); + std::uint64_t const cnt = oe - ob; + + batch.fragments.push_back(distributed_merge_fragment_desc{ + iv.dest_slice_id, ob, off, cnt}); + + auto raw = lt::local(h.local_begin); + std::advance( + raw, static_cast(ob - h.global_begin)); + for (std::uint64_t k = 0; k != cnt; ++k, ++raw) + batch.values.push_back(*raw); + } + } + } + + /////////////////////////////////////////////////////////////////////// + // The main kernel entry point, executed as an action on each + // participating locality. + static void call(std::string invocation_ns, + std::vector participants, std::uint64_t this_site, + std::vector A_layout, + std::vector B_layout, + std::vector D_layout, + std::vector A_handles, std::vector B_handles, + std::vector D_handles, std::uint64_t n1, + std::uint64_t n2, Comp comp) + { + // --- Register local slice handles in static registries --- + distributed_merge_registry_guard + guard_A(invocation_ns, A_handles); + distributed_merge_registry_guard + guard_B(invocation_ns, B_handles); + + std::uint64_t const P = participants.size(); + distributed_merge_collective_ctx ctx(HPX_MOVE(invocation_ns)); + + // --- Phase 1: Startup barrier --- + // Ensures all localities have registered their handles before + // any remote co-rank probes can fire. + { + auto bn = ctx.basename("startup_sync"); + [[maybe_unused]] auto sync_result = + hpx::collectives::all_gather(hpx::launch::sync, bn.c_str(), + std::uint8_t(1), hpx::collectives::num_sites_arg(P), + hpx::collectives::this_site_arg(this_site), + hpx::collectives::generation_arg( + ctx.next_gen("startup_sync")), + hpx::collectives::root_site_arg(0)); + } + + // --- Build indexed handle maps for O(1) lookup --- + std::unordered_map A_idx; + for (auto const& h : A_handles) + A_idx.emplace(h.slice_id, &h); + + std::unordered_map B_idx; + for (auto const& h : B_handles) + B_idx.emplace(h.slice_id, &h); + + // --- Cached co-rank probe functions --- + std::unordered_map A_cache; + std::unordered_map B_cache; + + auto A_at = [&](std::uint64_t rank) -> Value1 { + return fetch_at_rank(rank, + A_layout, A_idx, A_cache, this_site, participants, + ctx.invocation_ns); + }; + auto B_at = [&](std::uint64_t rank) -> Value2 { + return fetch_at_rank(rank, + B_layout, B_idx, B_cache, this_site, participants, + ctx.invocation_ns); + }; + + // --- Phase 2: Co-rank planning per owned dest slice --- + // Successive slices narrow the search range: the end + // boundary of slice k bounds the start of slice k+1. + std::vector local_intervals; + local_intervals.reserve(D_handles.size()); + + std::uint64_t prev_i = 0, prev_j = 0; + + for (auto const& dh : D_handles) + { + auto dit = std::find_if(D_layout.begin(), D_layout.end(), + [&dh](distributed_merge_slice_metadata const& s) { + return s.slice_id == dh.slice_id; + }); + HPX_ASSERT(dit != D_layout.end()); + + auto const [i0, j0] = co_rank(dit->global_begin, n1, n2, A_at, + B_at, comp, prev_i, dit->global_begin - prev_j); + auto const [i1, j1] = co_rank(dit->global_end, n1, n2, A_at, + B_at, comp, i0, dit->global_end - j0); + + local_intervals.push_back(distributed_merge_interval{ + dit->slice_id, dit->site_index, i0, i1, j0, j1}); + + prev_i = i1; + prev_j = j1; + } + + // --- Phase 3: Share intervals via all_gather --- + auto intervals_bn = ctx.basename("intervals"); + auto gathered = hpx::collectives::all_gather(hpx::launch::sync, + intervals_bn.c_str(), HPX_MOVE(local_intervals), + hpx::collectives::num_sites_arg(P), + hpx::collectives::this_site_arg(this_site), + hpx::collectives::generation_arg(ctx.next_gen("intervals")), + hpx::collectives::root_site_arg(0)); + + // Flatten gathered intervals + std::vector all_intervals; + { + std::uint64_t total = 0; + for (auto const& v : gathered) + total += v.size(); + all_intervals.reserve(total); + for (auto& v : gathered) + { + all_intervals.insert(all_intervals.end(), + std::make_move_iterator(v.begin()), + std::make_move_iterator(v.end())); + } + } + + // --- Phase 4: Pack payload batches --- + std::vector send_A(P); + std::vector send_B(P); + + pack_batches( + A_handles, all_intervals, + [](auto const& iv) { return iv.A_begin_rank; }, + [](auto const& iv) { return iv.A_end_rank; }, send_A); + pack_batches( + B_handles, all_intervals, + [](auto const& iv) { return iv.B_begin_rank; }, + [](auto const& iv) { return iv.B_end_rank; }, send_B); + + // --- Phase 5: Payload exchange --- + auto bn_A = ctx.basename("payload_A"); + auto bn_B = ctx.basename("payload_B"); + + auto recv_A = + hpx::collectives::all_to_all(hpx::launch::sync, bn_A.c_str(), + HPX_MOVE(send_A), hpx::collectives::num_sites_arg(P), + hpx::collectives::this_site_arg(this_site), + hpx::collectives::generation_arg(ctx.next_gen("payload_A")), + hpx::collectives::root_site_arg(0)); + + auto recv_B = + hpx::collectives::all_to_all(hpx::launch::sync, bn_B.c_str(), + HPX_MOVE(send_B), hpx::collectives::num_sites_arg(P), + hpx::collectives::this_site_arg(this_site), + hpx::collectives::generation_arg(ctx.next_gen("payload_B")), + hpx::collectives::root_site_arg(0)); + + // --- Phase 6: Resolve fragments and merge --- + // Each received batch contains a flat values buffer and + // lightweight descriptors. We resolve each descriptor to a + // pointer directly into the buffer (zero-copy), grouped by + // destination slice, then perform a local stable merge. + using rfrag1_t = resolved_fragment; + using rfrag2_t = resolved_fragment; + + std::unordered_map> A_by_dest; + std::unordered_map> B_by_dest; + + for (auto& batch : recv_A) + { + for (auto const& f : batch.fragments) + { + A_by_dest[f.dest_slice_id].push_back(rfrag1_t{ + f.input_global_begin, + batch.values.data() + f.value_offset, f.value_count}); + } + } + for (auto& batch : recv_B) + { + for (auto const& f : batch.fragments) + { + B_by_dest[f.dest_slice_id].push_back(rfrag2_t{ + f.input_global_begin, + batch.values.data() + f.value_offset, f.value_count}); + } + } + + // Merge into each owned destination slice + for (auto const& dh : D_handles) + { + merge_into_dest_impl( + dh, A_by_dest[dh.slice_id], B_by_dest[dh.slice_id], comp); + } + } + + struct action + : hpx::actions::make_action_t< + decltype(&distributed_merge_kernel::call), + &distributed_merge_kernel::call, action> + { + }; + }; + + // ----------------------------------------------------------------- + // Coordinator + // + // Runs on the calling locality. Decomposes the three segmented + // ranges into slice metadata + handle maps, determines which + // localities participate, then launches the SPMD kernel on each + // one via async actions. Waits for all kernels to complete and + // propagates any errors. + // ----------------------------------------------------------------- + + /////////////////////////////////////////////////////////////////////////// + // Coordinator: the main entry point for distributed segmented merge. + // Called from the tag_invoke overloads. + template + hpx::parallel::util::detail::algorithm_result_t + segmented_merge(ExPolicy&& /* policy */, SegIter1 first1, SegIter1 last1, + SegIter2 first2, SegIter2 last2, DestIter dest, Comp&& comp) + { + using traits1 = hpx::traits::segmented_iterator_traits; + using traits2 = hpx::traits::segmented_iterator_traits; + using traits3 = hpx::traits::segmented_iterator_traits; + + using local_iterator1 = typename traits1::local_iterator; + using local_iterator2 = typename traits2::local_iterator; + using local_iterator3 = typename traits3::local_iterator; + + using value_type1 = typename std::iterator_traits::value_type; + using value_type2 = typename std::iterator_traits::value_type; + using comp_type = std::decay_t; + + using result_type = + hpx::parallel::util::detail::algorithm_result; + + // Validate local raw iterators have random-access capability + using local_raw1 = + typename hpx::traits::segmented_local_iterator_traits< + local_iterator1>::local_raw_iterator; + using local_raw2 = + typename hpx::traits::segmented_local_iterator_traits< + local_iterator2>::local_raw_iterator; + using local_raw3 = + typename hpx::traits::segmented_local_iterator_traits< + local_iterator3>::local_raw_iterator; + + static_assert( + std::is_base_of_v::iterator_category>, + "distributed merge requires random-access local iterators " + "for the first input"); + static_assert( + std::is_base_of_v::iterator_category>, + "distributed merge requires random-access local iterators " + "for the second input"); + static_assert( + std::is_base_of_v::iterator_category>, + "distributed merge requires random-access local iterators " + "for the destination"); + + // Both empty, nothing to do. + if (first1 == last1 && first2 == last2) + { + return result_type::get(HPX_MOVE(dest)); + } + + // Decompose ranges into ordered slices + auto [A_layout, A_handles] = + distributed_merge_build_slices(first1, last1); + auto [B_layout, B_handles] = + distributed_merge_build_slices(first2, last2); + + auto const n1 = + static_cast(std::distance(first1, last1)); + auto const n2 = + static_cast(std::distance(first2, last2)); + auto const N = n1 + n2; + + auto dest_end = dest; + std::advance(dest_end, static_cast(N)); + + auto [D_layout, D_handles] = + distributed_merge_build_slices(dest, dest_end); + + if (D_layout.empty()) + { + return result_type::get(HPX_MOVE(dest_end)); + } + + // Determine participants and assign site indices + auto participants = + distributed_merge_collect_participants_and_assign_sites( + A_layout, B_layout, D_layout); + + // Create kernel action type and invocation namespace + using kernel_type = distributed_merge_kernel; + using action_type = typename kernel_type::action; + + std::string const ns = distributed_merge_make_ns(); + comp_type const comp_copy = HPX_FORWARD(Comp, comp); + + // Launch kernel on every participant + std::vector> futures; + futures.reserve(participants.size()); + + for (std::uint64_t site = 0; site != participants.size(); ++site) + { + auto const loc_id = + naming::get_locality_id_from_id(participants[site]); + + futures.push_back(hpx::async(action_type{}, participants[site], ns, + participants, site, A_layout, B_layout, D_layout, + distributed_merge_get_handles(A_handles, loc_id), + distributed_merge_get_handles(B_handles, loc_id), + distributed_merge_get_handles(D_handles, loc_id), n1, n2, + comp_copy)); + } + + // Collect errors from all participants + auto collect_errors = [](std::vector> fs) { + std::list errors; + for (auto& f : fs) + { + try + { + f.get(); + } + catch (...) + { + errors.push_back(std::current_exception()); + } + } + if (!errors.empty()) + { + throw hpx::exception_list(HPX_MOVE(errors)); + } + }; + + // Return based on execution policy + if constexpr (hpx::is_async_execution_policy_v>) + { + return result_type::get(hpx::when_all(HPX_MOVE(futures)) + .then([dest_end = HPX_MOVE(dest_end), + collect_errors = HPX_MOVE(collect_errors)]( + hpx::future>>&& + f) mutable -> DestIter { + collect_errors(f.get()); + return dest_end; + })); + } + else + { + collect_errors(hpx::when_all(HPX_MOVE(futures)).get()); + return result_type::get(HPX_MOVE(dest_end)); + } + } + +} // namespace hpx::parallel::detail + +HPX_IS_BITWISE_SERIALIZABLE( + hpx::parallel::detail::distributed_merge_slice_metadata) +HPX_IS_BITWISE_SERIALIZABLE(hpx::parallel::detail::distributed_merge_interval) +HPX_IS_BITWISE_SERIALIZABLE( + hpx::parallel::detail::distributed_merge_fragment_desc) diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/merge.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/merge.hpp new file mode 100644 index 000000000000..ecf649dbc51d --- /dev/null +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/merge.hpp @@ -0,0 +1,64 @@ +// Copyright (c) 2026 Abhishek Bansal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include + +#include +#include +#include + +namespace hpx::segmented { + + // clang-format off + template + requires(hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::is_invocable_v::value_type, + typename std::iterator_traits::value_type>) + DestIter tag_invoke(hpx::merge_t, InIter1 first1, InIter1 last1, + InIter2 first2, InIter2 last2, DestIter dest, Comp comp = Comp()) + // clang-format on + { + return hpx::parallel::detail::segmented_merge(hpx::execution::seq, + first1, last1, first2, last2, dest, HPX_MOVE(comp)); + } + + // clang-format off + template + requires(hpx::is_execution_policy_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::is_invocable_v::value_type, + typename std::iterator_traits::value_type>) + hpx::parallel::util::detail::algorithm_result_t + tag_invoke(hpx::merge_t, ExPolicy&& policy, InIter1 first1, + InIter1 last1, InIter2 first2, InIter2 last2, DestIter dest, + Comp comp = Comp()) + // clang-format on + { + return hpx::parallel::detail::segmented_merge( + HPX_FORWARD(ExPolicy, policy), first1, last1, first2, last2, dest, + HPX_MOVE(comp)); + } +} // namespace hpx::segmented diff --git a/libs/full/segmented_algorithms/tests/unit/CMakeLists.txt b/libs/full/segmented_algorithms/tests/unit/CMakeLists.txt index 370cf494d8ae..aecf38cb2ab8 100644 --- a/libs/full/segmented_algorithms/tests/unit/CMakeLists.txt +++ b/libs/full/segmented_algorithms/tests/unit/CMakeLists.txt @@ -22,6 +22,7 @@ set(tests partitioned_vector_iter partitioned_vector_max_element1 partitioned_vector_max_element2 + partitioned_vector_merge partitioned_vector_min_element1 partitioned_vector_min_element2 partitioned_vector_minmax_element1 @@ -104,3 +105,23 @@ foreach(test ${tests}) ) endforeach() + +# Multi-locality merge test (requires 4 localities) +set(partitioned_vector_merge_multi_FLAGS DEPENDENCIES + partitioned_vector_component +) + +add_hpx_executable( + partitioned_vector_merge_multi_test INTERNAL_FLAGS + SOURCES partitioned_vector_merge_multi.cpp + ${partitioned_vector_merge_multi_FLAGS} + EXCLUDE_FROM_ALL + HPX_PREFIX ${HPX_BUILD_PREFIX} + FOLDER "Tests/Unit/Modules/Full/SegmentedAlgorithms" +) + +add_hpx_unit_test( + "modules.segmented_algorithms" partitioned_vector_merge_multi + LOCALITIES 4 + THREADS_PER_LOCALITY 2 +) diff --git a/libs/full/segmented_algorithms/tests/unit/partitioned_vector_merge.cpp b/libs/full/segmented_algorithms/tests/unit/partitioned_vector_merge.cpp new file mode 100644 index 000000000000..e3114bf38b44 --- /dev/null +++ b/libs/full/segmented_algorithms/tests/unit/partitioned_vector_merge.cpp @@ -0,0 +1,941 @@ +// Copyright (c) 2026 Abhishek Bansal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if !defined(HPX_COMPUTE_DEVICE_CODE) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +// A tagged value type for testing merge stability (A-before-B on equal keys) +struct tagged_value +{ + int key = 0; + int tag = 0; // 0 = from A, 1 = from B + + friend bool operator==(tagged_value const& lhs, tagged_value const& rhs) + { + return lhs.key == rhs.key && lhs.tag == rhs.tag; + } + +private: + friend class hpx::serialization::access; + + template + void serialize(Archive& ar, unsigned) + { + ar & key & tag; + } +}; + +std::ostream& operator<<(std::ostream& os, tagged_value const& v) +{ + return os << '{' << v.key << ',' << v.tag << '}'; +} + +HPX_REGISTER_PARTITIONED_VECTOR_DECLARATION(tagged_value) +HPX_REGISTER_PARTITIONED_VECTOR(tagged_value) + +/////////////////////////////////////////////////////////////////////////////// +// Helpers +template +void assign_vector(hpx::partitioned_vector& v, std::vector const& data) +{ + HPX_TEST_EQ(v.size(), data.size()); + auto it = v.begin(); + for (T const& val : data) + { + *it++ = val; + } +} + +template +std::vector collect_vector(hpx::partitioned_vector const& v) +{ + std::vector result; + result.reserve(v.size()); + for (auto it = v.begin(); it != v.end(); ++it) + { + result.push_back(*it); + } + return result; +} + +template +void print_sequence(char const* label, std::vector const& values) +{ + std::cerr << label << ": "; + for (auto const& v : values) + { + std::cerr << v << ' '; + } + std::cerr << '\n'; +} + +// Verify merge result against std::merge reference, with diagnostics +template +void verify_merge(hpx::partitioned_vector const& D, + std::vector const& expected, char const* test_name) +{ + auto actual = collect_vector(D); + if (!(actual == expected) && hpx::get_locality_id() == 0) + { + std::cerr << "FAIL in " << test_name << ":\n"; + print_sequence(" actual ", actual); + print_sequence(" expected", expected); + } + HPX_TEST(actual == expected); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 1: Stability: A elements come before B on equal keys +template +void test_merge_stability(ExPolicy&& policy) +{ + std::vector localities = hpx::find_all_localities(); + + using vector_type = hpx::partitioned_vector; + + std::vector const A_data{ + {1, 0}, {2, 0}, {2, 0}, {3, 0}, {5, 0}, {5, 0}, {8, 0}, {13, 0}}; + std::vector const B_data{ + {1, 1}, {2, 1}, {2, 1}, {4, 1}, {5, 1}, {9, 1}, {13, 1}}; + + vector_type A(A_data.size(), hpx::container_layout(localities)); + vector_type B(B_data.size(), hpx::container_layout(localities)); + vector_type D( + A_data.size() + B_data.size(), hpx::container_layout(localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + auto result = hpx::merge(HPX_FORWARD(ExPolicy, policy), A.begin(), A.end(), + B.begin(), B.end(), D.begin(), comp); + + if constexpr (hpx::is_async_execution_policy_v>) + { + HPX_TEST(result.get() == D.end()); + } + else + { + HPX_TEST(result == D.end()); + } + + verify_merge(D, expected, "test_merge_stability"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 2: No-policy overload +void test_merge_no_policy() +{ + std::vector localities = hpx::find_all_localities(); + + using vector_type = hpx::partitioned_vector; + + std::vector const A_data{ + {0, 0}, {1, 0}, {1, 0}, {4, 0}, {4, 0}, {10, 0}}; + std::vector const B_data{ + {1, 1}, {1, 1}, {2, 1}, {4, 1}, {9, 1}}; + + vector_type A(A_data.size(), hpx::container_layout(localities)); + vector_type B(B_data.size(), hpx::container_layout(localities)); + vector_type D( + A_data.size() + B_data.size(), hpx::container_layout(localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + HPX_TEST(hpx::merge(A.begin(), A.end(), B.begin(), B.end(), D.begin(), + comp) == D.end()); + verify_merge(D, expected, "test_merge_no_policy"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 3: Mixed types (int + long long -> long long) +template +void test_merge_mixed_types(ExPolicy&& policy) +{ + std::vector localities = hpx::find_all_localities(); + + hpx::partitioned_vector A(6, hpx::container_layout(localities)); + hpx::partitioned_vector B(5, hpx::container_layout(localities)); + hpx::partitioned_vector D(11, hpx::container_layout(localities)); + + std::vector const A_data{1, 2, 2, 5, 8, 13}; + std::vector const B_data{0, 2, 3, 8, 21}; + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](auto const& lhs, auto const& rhs) { return lhs < rhs; }; + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + auto result = hpx::merge(HPX_FORWARD(ExPolicy, policy), A.begin(), A.end(), + B.begin(), B.end(), D.begin(), comp); + + if constexpr (hpx::is_async_execution_policy_v>) + { + HPX_TEST(result.get() == D.end()); + } + else + { + HPX_TEST(result == D.end()); + } + + verify_merge(D, expected, "test_merge_mixed_types"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 4: Empty inputs +template +void test_merge_empty_inputs(ExPolicy&& policy) +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + // Empty A, non-empty B + { + hpx::partitioned_vector A_empty(0, layout); + hpx::partitioned_vector B(5, layout); + hpx::partitioned_vector D(5, layout); + + std::vector const B_data{1, 2, 3, 5, 8}; + assign_vector(B, B_data); + + auto result = hpx::merge(HPX_FORWARD(ExPolicy, policy), A_empty.begin(), + A_empty.end(), B.begin(), B.end(), D.begin()); + + if constexpr (hpx::is_async_execution_policy_v>) + { + HPX_TEST(result.get() == D.end()); + } + else + { + HPX_TEST(result == D.end()); + } + + verify_merge(D, B_data, "test_merge_empty_A"); + } + + // Non-empty A, empty B + { + hpx::partitioned_vector A(4, layout); + hpx::partitioned_vector B_empty(0, layout); + hpx::partitioned_vector D(4, layout); + + std::vector const A_data{0, 1, 1, 2}; + assign_vector(A, A_data); + + auto result = hpx::merge(HPX_FORWARD(ExPolicy, policy), A.begin(), + A.end(), B_empty.begin(), B_empty.end(), D.begin()); + + if constexpr (hpx::is_async_execution_policy_v>) + { + HPX_TEST(result.get() == D.end()); + } + else + { + HPX_TEST(result == D.end()); + } + + verify_merge(D, A_data, "test_merge_empty_B"); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 5: Both inputs empty +void test_merge_both_empty() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + hpx::partitioned_vector A(0, layout); + hpx::partitioned_vector B(0, layout); + hpx::partitioned_vector D(0, layout); + + auto result = hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + HPX_TEST(result == D.end()); + HPX_TEST(D.size() == 0); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 6: Single element in each input +void test_merge_single_elements() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + // A=[3], B=[1] -> D=[1,3] + { + hpx::partitioned_vector A(1, layout); + hpx::partitioned_vector B(1, layout); + hpx::partitioned_vector D(2, layout); + + assign_vector(A, std::vector{3}); + assign_vector(B, std::vector{1}); + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin()); + + verify_merge(D, std::vector{1, 3}, "test_merge_single_3_1"); + } + + // A=[1], B=[1] -> D=[1,1] (stability: A first) + { + using vector_type = hpx::partitioned_vector; + + vector_type A(1, layout); + vector_type B(1, layout); + vector_type D(2, layout); + + assign_vector(A, std::vector{{5, 0}}); + assign_vector(B, std::vector{{5, 1}}); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin(), comp); + + // Stability: A's element ({5,0}) must come before B's ({5,1}) + std::vector expected{{5, 0}, {5, 1}}; + verify_merge(D, expected, "test_merge_single_equal_stability"); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 7: All duplicate keys: heavy stability test +void test_merge_all_duplicates() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + using vector_type = hpx::partitioned_vector; + + // 6 elements in A all with key=7, 4 elements in B all with key=7 + std::vector A_data(6, {7, 0}); + std::vector B_data(4, {7, 1}); + + vector_type A(A_data.size(), layout); + vector_type B(B_data.size(), layout); + vector_type D(A_data.size() + B_data.size(), layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + // Stable merge: all 6 A elements (tag=0) then all 4 B elements (tag=1) + std::vector expected; + expected.reserve(10); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin(), comp); + + verify_merge(D, expected, "test_merge_all_duplicates"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 8: Non-overlapping ranges (A all less than B) +void test_merge_nonoverlapping_A_less() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + std::vector const A_data{1, 2, 3, 4}; + std::vector const B_data{10, 11, 12, 13}; + + hpx::partitioned_vector A(A_data.size(), layout); + hpx::partitioned_vector B(B_data.size(), layout); + hpx::partitioned_vector D(A_data.size() + B_data.size(), layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + std::vector expected{1, 2, 3, 4, 10, 11, 12, 13}; + verify_merge(D, expected, "test_merge_nonoverlapping_A_less"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 9: Non-overlapping ranges (A all greater than B) +void test_merge_nonoverlapping_A_greater() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + std::vector const A_data{10, 11, 12, 13}; + std::vector const B_data{1, 2, 3, 4}; + + hpx::partitioned_vector A(A_data.size(), layout); + hpx::partitioned_vector B(B_data.size(), layout); + hpx::partitioned_vector D(A_data.size() + B_data.size(), layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + std::vector expected{1, 2, 3, 4, 10, 11, 12, 13}; + verify_merge(D, expected, "test_merge_nonoverlapping_A_greater"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 10: Highly skewed sizes (A much larger than B) +void test_merge_skewed_sizes() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + // A has 20 elements, B has 2 + std::vector A_data(20); + std::iota(A_data.begin(), A_data.end(), 0); // 0..19 + + std::vector const B_data{5, 15}; + + hpx::partitioned_vector A(A_data.size(), layout); + hpx::partitioned_vector B(B_data.size(), layout); + hpx::partitioned_vector D(A_data.size() + B_data.size(), layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_skewed_sizes"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 11: Default comparator (no custom comp: uses hpx::parallel::detail::less) +void test_merge_default_comp() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + std::vector const A_data{1, 3, 5, 7, 9}; + std::vector const B_data{2, 4, 6, 8, 10}; + + hpx::partitioned_vector A(A_data.size(), layout); + hpx::partitioned_vector B(B_data.size(), layout); + hpx::partitioned_vector D(A_data.size() + B_data.size(), layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + // Call without explicit comparator: uses default less + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + std::vector expected{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + verify_merge(D, expected, "test_merge_default_comp"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 12: Non-contiguous A partitions on one locality +// With 3 partitions on 2 localities, round-robin gives: +// partition 0 -> loc0, partition 1 -> loc1, partition 2 -> loc0 +// So loc0 owns partitions 0 and 2 (non-contiguous in global order). +// This exercises the slice-based design where one locality has +// discontiguous pieces of a range. +void test_merge_noncontiguous_A() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; // need 2+ localities + + using vector_type = hpx::partitioned_vector; + + // A: 12 elements in 3 partitions (4 each) -> [loc0, loc1, loc0] + // loc0 owns A[0..4) and A[8..12): non-contiguous + std::vector const A_data{ + {1, 0}, {3, 0}, {5, 0}, {7, 0}, // partition 0 on loc0 + {2, 0}, {4, 0}, {6, 0}, {8, 0}, // partition 1 on loc1 + {9, 0}, {10, 0}, {11, 0}, {12, 0} // partition 2 on loc0 + }; + + // B: 6 elements in 2 partitions (3 each) -> [loc0, loc1] + std::vector const B_data{ + {2, 1}, {6, 1}, {10, 1}, // partition 0 on loc0 + {4, 1}, {8, 1}, {12, 1} // partition 1 on loc1 + }; + + vector_type A(A_data.size(), hpx::container_layout(3, localities)); + vector_type B(B_data.size(), hpx::container_layout(localities)); + vector_type D( + A_data.size() + B_data.size(), hpx::container_layout(localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin(), comp); + + verify_merge(D, expected, "test_merge_noncontiguous_A"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 13: Non-contiguous D partitions on one locality +// D has 3 partitions on 2 localities: [loc0, loc1, loc0] +// So loc0 writes to dest[0..N/3) and dest[2*N/3..N): non-contiguous. +void test_merge_noncontiguous_D() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + // A: 6 elements in 2 partitions -> [loc0, loc1] + std::vector const A_data{1, 3, 5, 7, 9, 11}; + + // B: 6 elements in 2 partitions -> [loc0, loc1] + std::vector const B_data{2, 4, 6, 8, 10, 12}; + + // D: 12 elements in 3 partitions (4 each) -> [loc0, loc1, loc0] + // loc0 owns D[0..4) and D[8..12): non-contiguous destination + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(localities)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(localities)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(3, localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_noncontiguous_D"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 14: Different ownership: A only on loc0, B only on loc1, D on both +// This forces remote co-rank probes: loc0 must probe B data on loc1, +// and loc1 must probe A data on loc0. +void test_merge_different_ownership() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + std::vector loc0_only = {localities[0]}; + std::vector loc1_only = {localities[1]}; + + // A: 8 elements, single partition on loc0 + std::vector const A_data{1, 3, 5, 7, 9, 11, 13, 15}; + + // B: 6 elements, single partition on loc1 + std::vector const B_data{2, 4, 6, 8, 10, 12}; + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(loc0_only)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(loc1_only)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_different_ownership"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 15: All three ranges on completely different localities +// A on loc0, B on loc1, D on loc0 (but D's owner is also an A owner, +// so it must fetch B data remotely for co-rank and for payload). +void test_merge_A_and_D_on_loc0_B_on_loc1() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + std::vector loc0_only = {localities[0]}; + std::vector loc1_only = {localities[1]}; + + std::vector const A_data{1, 4, 7, 10}; + std::vector const B_data{2, 5, 8, 11}; + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(loc0_only)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(loc1_only)); + // D entirely on loc0: loc0 must receive all B payload from loc1 + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(loc0_only)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_A_D_loc0_B_loc1"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 16: Multiple non-contiguous partitions for BOTH A and D +// A: 4 partitions on 2 locs -> [loc0, loc1, loc0, loc1] +// D: 3 partitions on 2 locs -> [loc0, loc1, loc0] +// Both A and D have discontiguous slices on loc0. +void test_merge_noncontiguous_A_and_D() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + using vector_type = hpx::partitioned_vector; + + // A: 8 elements in 4 partitions (2 each) + // [loc0, loc1, loc0, loc1]: loc0 has partitions {0,2}, loc1 has {1,3} + std::vector const A_data{ + {1, 0}, {5, 0}, // partition 0 on loc0 + {2, 0}, {6, 0}, // partition 1 on loc1 + {9, 0}, {13, 0}, // partition 2 on loc0 + {10, 0}, {14, 0} // partition 3 on loc1 + }; + + // B: 4 elements in 2 partitions (2 each) -> [loc0, loc1] + std::vector const B_data{ + {3, 1}, {7, 1}, // partition 0 on loc0 + {11, 1}, {15, 1} // partition 1 on loc1 + }; + + // D: 12 elements in 3 partitions (4 each) -> [loc0, loc1, loc0] + // loc0 has D partitions {0,2}: non-contiguous destination + vector_type A(A_data.size(), hpx::container_layout(4, localities)); + vector_type B(B_data.size(), hpx::container_layout(localities)); + vector_type D( + A_data.size() + B_data.size(), hpx::container_layout(3, localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin(), comp); + + verify_merge(D, expected, "test_merge_noncontiguous_A_and_D"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 17: Participant owns no slices of one or more ranges +// A on loc0, B on loc0, D on loc1 +// loc1 owns no A and no B slices, but owns all D slices. +// It must receive all payload from loc0. +// loc0 owns all input but no D slices: it sends everything, writes nothing. +void test_merge_participant_no_input_slices() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + std::vector loc0_only = {localities[0]}; + std::vector loc1_only = {localities[1]}; + + std::vector const A_data{1, 3, 5, 7}; + std::vector const B_data{2, 4, 6, 8}; + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(loc0_only)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(loc0_only)); + // D entirely on loc1 + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(loc1_only)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_participant_no_input_slices"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 18: Interleaved data with many duplicates at boundary +void test_merge_boundary_duplicates() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + using vector_type = hpx::partitioned_vector; + + // Keys cluster at value 5: tests co-rank probe behavior + // around dense duplicate regions + std::vector const A_data{ + {1, 0}, {5, 0}, {5, 0}, {5, 0}, {5, 0}, {10, 0}}; + std::vector const B_data{ + {3, 1}, {5, 1}, {5, 1}, {5, 1}, {7, 1}}; + + vector_type A(A_data.size(), layout); + vector_type B(B_data.size(), layout); + vector_type D(A_data.size() + B_data.size(), layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin(), comp); + + verify_merge(D, expected, "test_merge_boundary_duplicates"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 19: Larger data set: stress test with many elements +void test_merge_larger_data() +{ + std::vector localities = hpx::find_all_localities(); + auto layout = hpx::container_layout(localities); + + // Generate sorted data: A = even numbers, B = odd numbers + std::size_t const N_A = 100; + std::size_t const N_B = 80; + + std::vector A_data(N_A); + std::vector B_data(N_B); + + for (std::size_t i = 0; i < N_A; ++i) + A_data[i] = static_cast(2 * i); // 0, 2, 4, ..., 198 + + for (std::size_t i = 0; i < N_B; ++i) + B_data[i] = static_cast(2 * i + 1); // 1, 3, 5, ..., 159 + + hpx::partitioned_vector A(N_A, layout); + hpx::partitioned_vector B(N_B, layout); + hpx::partitioned_vector D(N_A + N_B, layout); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(N_A + N_B); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_larger_data"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 20: Larger data with non-contiguous partitions (multi-segment stress) +void test_merge_larger_noncontiguous() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + // A: 60 elements in 3 partitions (20 each) -> [loc0, loc1, loc0] + std::size_t const N_A = 60; + std::vector A_data(N_A); + for (std::size_t i = 0; i < N_A; ++i) + A_data[i] = static_cast(i * 3); // 0, 3, 6, ..., 177 + + // B: 40 elements in 2 partitions (20 each) -> [loc0, loc1] + std::size_t const N_B = 40; + std::vector B_data(N_B); + for (std::size_t i = 0; i < N_B; ++i) + B_data[i] = static_cast(i * 3 + 1); // 1, 4, 7, ..., 118 + + // D: 100 elements in 3 partitions -> [loc0, loc1, loc0] + // D sizes must divide: we need to pick something safe + // 100 / 3 doesn't divide evenly, so use 4 partitions: 100/4 = 25 + hpx::partitioned_vector A(N_A, hpx::container_layout(3, localities)); + hpx::partitioned_vector B(N_B, hpx::container_layout(localities)); + hpx::partitioned_vector D( + N_A + N_B, hpx::container_layout(4, localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(N_A + N_B); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_larger_noncontiguous"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 21: par(task) with non-contiguous partitions +// Ensures the async return path works with complex layouts. +void test_merge_async_noncontiguous() +{ + std::vector localities = hpx::find_all_localities(); + if (localities.size() < 2) + return; + + std::vector const A_data{1, 4, 7, 10, 13, 16}; + std::vector const B_data{2, 5, 8, 11, 14, 17}; + + // A: 3 partitions [loc0, loc1, loc0], B: 2 partitions, D: 3 partitions + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(3, localities)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(localities)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(3, localities)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(D.size()); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + auto result = hpx::merge(hpx::execution::par(hpx::execution::task), + A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + HPX_TEST(result.get() == D.end()); + verify_merge(D, expected, "test_merge_async_noncontiguous"); +} + +/////////////////////////////////////////////////////////////////////////////// +int main() +{ + using namespace hpx::execution; + + // Basic tests (all locality counts) + test_merge_no_policy(); + test_merge_stability(seq); + test_merge_stability(par); + test_merge_stability(par(task)); + test_merge_mixed_types(seq); + test_merge_mixed_types(par(task)); + test_merge_empty_inputs(seq); + test_merge_empty_inputs(par(task)); + test_merge_both_empty(); + test_merge_single_elements(); + test_merge_all_duplicates(); + test_merge_nonoverlapping_A_less(); + test_merge_nonoverlapping_A_greater(); + test_merge_skewed_sizes(); + test_merge_default_comp(); + test_merge_boundary_duplicates(); + test_merge_larger_data(); + + // Multi-locality tests (require 2+ localities, skip on single) + test_merge_noncontiguous_A(); + test_merge_noncontiguous_D(); + test_merge_different_ownership(); + test_merge_A_and_D_on_loc0_B_on_loc1(); + test_merge_noncontiguous_A_and_D(); + test_merge_participant_no_input_slices(); + test_merge_larger_noncontiguous(); + test_merge_async_noncontiguous(); + + return hpx::util::report_errors(); +} + +#endif diff --git a/libs/full/segmented_algorithms/tests/unit/partitioned_vector_merge_multi.cpp b/libs/full/segmented_algorithms/tests/unit/partitioned_vector_merge_multi.cpp new file mode 100644 index 000000000000..41ff3f1b2107 --- /dev/null +++ b/libs/full/segmented_algorithms/tests/unit/partitioned_vector_merge_multi.cpp @@ -0,0 +1,528 @@ +// Copyright (c) 2026 Abhishek Bansal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Tests for distributed merge with 4+ localities. +// These exercise complex multi-locality distribution patterns that +// cannot be tested with only 2 localities. +// +// NOTE: container_layout(N, locs) where N is not a multiple of +// locs.size() triggers a partitioned_vector construction bug. +// All tests use partition counts that are multiples of the locality +// count to work around this. + +#include + +#if !defined(HPX_COMPUTE_DEVICE_CODE) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +struct tagged_value +{ + int key = 0; + int tag = 0; + + friend bool operator==(tagged_value const& lhs, tagged_value const& rhs) + { + return lhs.key == rhs.key && lhs.tag == rhs.tag; + } + +private: + friend class hpx::serialization::access; + + template + void serialize(Archive& ar, unsigned) + { + ar & key & tag; + } +}; + +std::ostream& operator<<(std::ostream& os, tagged_value const& v) +{ + return os << '{' << v.key << ',' << v.tag << '}'; +} + +HPX_REGISTER_PARTITIONED_VECTOR_DECLARATION(tagged_value) +HPX_REGISTER_PARTITIONED_VECTOR(tagged_value) + +/////////////////////////////////////////////////////////////////////////////// +template +void assign_vector(hpx::partitioned_vector& v, std::vector const& data) +{ + HPX_TEST_EQ(v.size(), data.size()); + auto it = v.begin(); + for (T const& val : data) + { + *it++ = val; + } +} + +template +std::vector collect_vector(hpx::partitioned_vector const& v) +{ + std::vector result; + result.reserve(v.size()); + for (auto it = v.begin(); it != v.end(); ++it) + { + result.push_back(*it); + } + return result; +} + +template +void print_sequence(char const* label, std::vector const& values) +{ + std::cerr << label << ": "; + for (auto const& v : values) + { + std::cerr << v << ' '; + } + std::cerr << '\n'; +} + +template +void verify_merge(hpx::partitioned_vector const& D, + std::vector const& expected, char const* test_name) +{ + auto actual = collect_vector(D); + if (!(actual == expected) && hpx::get_locality_id() == 0) + { + std::cerr << "FAIL in " << test_name << ":\n"; + print_sequence(" actual ", actual); + print_sequence(" expected", expected); + } + HPX_TEST(actual == expected); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 1: Each range on a completely disjoint set of localities +// A on {loc0, loc1}, B on {loc2, loc3}, D on all four. +void test_merge_disjoint_ownership() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector locs_01 = {locs[0], locs[1]}; + std::vector locs_23 = {locs[2], locs[3]}; + + std::vector A_data(10); + for (std::size_t i = 0; i < 10; ++i) + A_data[i] = static_cast(2 * i); // 0,2,4,...,18 + + std::vector B_data(10); + for (std::size_t i = 0; i < 10; ++i) + B_data[i] = static_cast(2 * i + 1); // 1,3,5,...,19 + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(locs_01)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(locs_23)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(20); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_disjoint_ownership"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 2: Non-contiguous A and D across 4 localities +// A: 8 partitions on 4 localities, each locality owns 2 non-adjacent +// partitions (e.g. loc0 owns partitions {0, 4}). +// D: also 8 partitions to test non-contiguous destination writes. +void test_merge_noncontiguous_4loc() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + // A: 24 elements in 8 partitions (3 each), round-robin + // loc0: partitions {0,4}, loc1: {1,5}, loc2: {2,6}, loc3: {3,7} + std::vector A_data(24); + for (std::size_t i = 0; i < 24; ++i) + A_data[i] = static_cast(2 * i); // 0,2,...,46 + + // B: 8 elements, 4 partitions (2 each), one per locality + std::vector B_data{1, 5, 9, 13, 17, 21, 25, 29}; + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(8, locs)); + hpx::partitioned_vector B(B_data.size(), hpx::container_layout(locs)); + // D: 32 elements in 8 partitions (4 each) + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(8, locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(32); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_noncontiguous_4loc"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 3: A on one locality, B on another, D on two others +// All three ranges have completely different owners. +void test_merge_three_disjoint_owner_sets() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector loc0 = {locs[0]}; + std::vector loc1 = {locs[1]}; + std::vector locs_23 = {locs[2], locs[3]}; + + std::vector const A_data{1, 4, 7, 10, 13, 16}; + std::vector const B_data{2, 5, 8, 11, 14, 17}; + + hpx::partitioned_vector A(A_data.size(), hpx::container_layout(loc0)); + hpx::partitioned_vector B(B_data.size(), hpx::container_layout(loc1)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(locs_23)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(12); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_three_disjoint_owner_sets"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 4: Many small partitions across 4 localities +// 8 partitions per range = 2 per locality. Tests the algorithm with +// many segments and many intervals in the all_gather/all_to_all. +void test_merge_many_partitions_4loc() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector A_data(24); + for (std::size_t i = 0; i < 24; ++i) + A_data[i] = static_cast(2 * i); + + std::vector B_data(24); + for (std::size_t i = 0; i < 24; ++i) + B_data[i] = static_cast(2 * i + 1); + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(8, locs)); + hpx::partitioned_vector B( + B_data.size(), hpx::container_layout(8, locs)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(8, locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(48); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_many_partitions_4loc"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 5: Stability with 4 localities and tagged values +// Heavy duplicate keys spread across multiple localities. +void test_merge_stability_4loc() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + using vector_type = hpx::partitioned_vector; + + // Both A and B must be globally sorted by key across all partitions. + // A: 16 elements, 4 partitions (4 each), one per locality + std::vector A_data{ + {1, 0}, {2, 0}, {3, 0}, {4, 0}, // loc0 + {5, 0}, {6, 0}, {7, 0}, {8, 0}, // loc1 + {9, 0}, {10, 0}, {11, 0}, {12, 0}, // loc2 + {13, 0}, {14, 0}, {15, 0}, {16, 0} // loc3 + }; + + // B: 8 elements, 4 partitions (2 each): duplicate keys with A + std::vector B_data{ + {1, 1}, {2, 1}, // loc0 + {5, 1}, {6, 1}, // loc1 + {9, 1}, {10, 1}, // loc2 + {13, 1}, {14, 1} // loc3 + }; + + vector_type A(A_data.size(), hpx::container_layout(locs)); + vector_type B(B_data.size(), hpx::container_layout(locs)); + vector_type D(A_data.size() + B_data.size(), hpx::container_layout(locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + auto comp = [](tagged_value const& lhs, tagged_value const& rhs) { + return lhs.key < rhs.key; + }; + + std::vector expected; + expected.reserve(24); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected), comp); + + hpx::merge(hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), + D.begin(), comp); + + verify_merge(D, expected, "test_merge_stability_4loc"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 6: Skewed: one locality has all the data, others have nothing +// A on loc0 only, B on loc0 only, D spread across all 4. +void test_merge_single_source_multi_dest() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector loc0 = {locs[0]}; + + std::vector const A_data{1, 3, 5, 7, 9, 11, 13, 15}; + std::vector const B_data{2, 4, 6, 8, 10, 12, 14, 16}; + + hpx::partitioned_vector A(A_data.size(), hpx::container_layout(loc0)); + hpx::partitioned_vector B(B_data.size(), hpx::container_layout(loc0)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(16); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_single_source_multi_dest"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 7: Reverse: D on one locality, A and B spread across 4. +// loc0 is the sole dest owner, must receive payload from all others. +void test_merge_multi_source_single_dest() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector loc0 = {locs[0]}; + + std::vector A_data(16); + for (std::size_t i = 0; i < 16; ++i) + A_data[i] = static_cast(2 * i); + + std::vector B_data(16); + for (std::size_t i = 0; i < 16; ++i) + B_data[i] = static_cast(2 * i + 1); + + hpx::partitioned_vector A(A_data.size(), hpx::container_layout(locs)); + hpx::partitioned_vector B(B_data.size(), hpx::container_layout(locs)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(loc0)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(32); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_multi_source_single_dest"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 8: par(task) with 4 localities: async path +void test_merge_async_4loc() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector A_data(20); + std::vector B_data(20); + for (std::size_t i = 0; i < 20; ++i) + { + A_data[i] = static_cast(2 * i); + B_data[i] = static_cast(2 * i + 1); + } + + hpx::partitioned_vector A(A_data.size(), hpx::container_layout(locs)); + hpx::partitioned_vector B(B_data.size(), hpx::container_layout(locs)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(40); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + auto result = hpx::merge(hpx::execution::par(hpx::execution::task), + A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + HPX_TEST(result.get() == D.end()); + verify_merge(D, expected, "test_merge_async_4loc"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 9: Larger stress test: 200 elements, many partitions, 4 locs +void test_merge_stress_4loc() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::size_t const N_A = 120; + std::size_t const N_B = 80; + + std::vector A_data(N_A); + std::vector B_data(N_B); + + for (std::size_t i = 0; i < N_A; ++i) + A_data[i] = static_cast(i * 3); + + for (std::size_t i = 0; i < N_B; ++i) + B_data[i] = static_cast(i * 3 + 1); + + // 8 partitions round-robin across 4 locs = 2 per loc per range + hpx::partitioned_vector A(N_A, hpx::container_layout(8, locs)); + hpx::partitioned_vector B(N_B, hpx::container_layout(8, locs)); + hpx::partitioned_vector D(N_A + N_B, hpx::container_layout(8, locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(N_A + N_B); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_stress_4loc"); +} + +/////////////////////////////////////////////////////////////////////////////// +// Test 10: Skewed sizes with non-contiguous partitions +// A: large (32 elems, 8 partitions), B: small (4 elems, 4 partitions) +// Tests co-rank when one input dominates. +void test_merge_skewed_noncontiguous() +{ + auto locs = hpx::find_all_localities(); + HPX_TEST(locs.size() >= 4); + if (locs.size() < 4) + return; + + std::vector A_data(32); + for (std::size_t i = 0; i < 32; ++i) + A_data[i] = static_cast(i); // 0..31 + + std::vector B_data{5, 15, 25, 35}; + + hpx::partitioned_vector A( + A_data.size(), hpx::container_layout(8, locs)); + hpx::partitioned_vector B(B_data.size(), hpx::container_layout(locs)); + hpx::partitioned_vector D( + A_data.size() + B_data.size(), hpx::container_layout(locs)); + + assign_vector(A, A_data); + assign_vector(B, B_data); + + std::vector expected; + expected.reserve(36); + std::merge(A_data.begin(), A_data.end(), B_data.begin(), B_data.end(), + std::back_inserter(expected)); + + hpx::merge( + hpx::execution::seq, A.begin(), A.end(), B.begin(), B.end(), D.begin()); + + verify_merge(D, expected, "test_merge_skewed_noncontiguous"); +} + +/////////////////////////////////////////////////////////////////////////////// +int main() +{ + test_merge_disjoint_ownership(); + test_merge_noncontiguous_4loc(); + test_merge_three_disjoint_owner_sets(); + test_merge_many_partitions_4loc(); + test_merge_stability_4loc(); + test_merge_single_source_multi_dest(); + test_merge_multi_source_single_dest(); + test_merge_async_4loc(); + test_merge_stress_4loc(); + test_merge_skewed_noncontiguous(); + + return hpx::util::report_errors(); +} + +#endif