add support for multigraph to disjoint sampling #5520
alexbarghi-nv
left a comment
Tests are passing for disjoint sampling now - approved.
seunghwak
left a comment
Review the de-duplicate function first.
If I understood the problem correctly, I think we can simplify this code quite a bit. Let me know if I misunderstood the problem.
| std::optional<rmm::device_uvector<int>> keep_ranks{std::nullopt}; | ||
| rmm::device_uvector<vertex_t> keep_majors(result_minors.size(), handle.get_stream()); | ||
| std::optional<rmm::device_uvector<int32_t>> keep_labels{std::nullopt}; | ||
| rmm::device_uvector<vertex_t> keep_minors(result_minors.size(), handle.get_stream()); |
A nitpick, but why define keep_labels before keep_minors? This breaks the (majors, minors, labels) ordering used everywhere before this point.
| // index so it tie-breaks rows that share the same (label, minor). | ||
| rmm::device_uvector<vertex_t> local_positions(keep_minors.size(), handle.get_stream()); | ||
| thrust::sequence( | ||
| handle.get_thrust_policy(), local_positions.begin(), local_positions.end(), size_t{0}); |
In the single-GPU case, local_positions == keep_positions, so isn't this just a duplicate?
| local_positions.end(), | ||
| keep_positions.data(), | ||
| tmp.begin()); | ||
| keep_positions = std::move(tmp); |
In SG, this is just the same as keep_positions = std::move(local_positions), right?
| std::optional<rmm::device_uvector<int32_t>>&& result_labels, | ||
| bool call_from_sampling) | ||
| std::optional<rmm::device_uvector<int32_t>>&& result_labels) | ||
| { |
If I understood correctly, we want to keep just one edge per (label, minor), or per minor if there are no labels.
I think this code is overly complicated for this purpose.
In SG, what we need is
auto key_pair_first = thrust::make_zip_iterator(result_labels->begin(), result_minors.begin());
thrust::sort_by_key(
  handle.get_thrust_policy(),
  key_pair_first, key_pair_first + result_labels->size(),
  thrust::make_zip_iterator(result_majors.begin(), tmp_edge_indices->begin()));
auto [keep_count, keep_flags] =
  detail::mark_entries(handle,
                       result_labels->size(),
                       detail::is_first_in_run_t<decltype(key_pair_first)>{key_pair_first});
thrust::partition(...);  // use keep_flags as stencil
Copy the second half to the discard vectors; the first half then becomes the keep vectors.
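To make the single-GPU suggestion above concrete, here is a minimal host-side sketch in standard C++ rather than thrust. It is an illustration under assumptions, not the PR's code: `EdgeList`, `SplitResult`, and `keep_one_per_label_minor` are hypothetical names, the edge-index column is omitted, and a stable sort is used so that the earliest input edge wins a tie (the thrust sketch does not specify which duplicate survives).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <tuple>
#include <vector>

// Hypothetical host-side container; the PR keeps these as separate device vectors.
struct EdgeList {
  std::vector<int64_t> majors;
  std::vector<int64_t> minors;
  std::vector<int32_t> labels;
};

struct SplitResult {
  EdgeList keep;     // exactly one edge per (label, minor)
  EdgeList discard;  // every other edge
};

SplitResult keep_one_per_label_minor(EdgeList const& in)
{
  assert(in.majors.size() == in.minors.size() && in.minors.size() == in.labels.size());
  std::size_t const n = in.minors.size();

  // Sort a permutation by (label, minor). Stability is an assumption of this
  // sketch: it makes the earliest input edge the survivor of each run.
  std::vector<std::size_t> order(n);
  std::iota(order.begin(), order.end(), std::size_t{0});
  std::stable_sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
    return std::tie(in.labels[a], in.minors[a]) < std::tie(in.labels[b], in.minors[b]);
  });

  SplitResult out;
  for (std::size_t i = 0; i < n; ++i) {
    std::size_t const cur = order[i];
    // An edge is kept when it starts a new run of equal (label, minor) keys.
    bool const first_in_run = (i == 0) || in.labels[cur] != in.labels[order[i - 1]] ||
                              in.minors[cur] != in.minors[order[i - 1]];
    EdgeList& dst = first_in_run ? out.keep : out.discard;
    dst.majors.push_back(in.majors[cur]);
    dst.minors.push_back(in.minors[cur]);
    dst.labels.push_back(in.labels[cur]);
  }
  return out;
}
```

The loop plays the role of both the "mark first in run" step and the partition: the flag decides which output each edge lands in.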
We need an additional step to discard duplicate (label, minor) edges across GPUs.
If we want to minimize the communication volume,
we can sort the keep (label, minor) pairs by the owning GPU ID.
We call shuffle_values, which returns rx_counts as well. We update keep_flags, then send the flags back using rx_counts as tx_counts. Using the returned flags, discard the original (label, minor) pairs that did not survive. Then go back to the first half: an edge stays in the first half only if its pair is among the surviving (label, minor) pairs; otherwise, move it to the discard partition.
Or, if you want something simpler,
shuffle (label, minor, rank) triplets based on the minor's vertex partition ID. Run sort and unique to keep only one rank value per (label, minor). Shuffle back based on the rank. Now you have the surviving (label, minor) pairs. This involves additional rank values and needs to shuffle back triplets instead of just flags, but might be simpler.
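The simpler triplet variant can be simulated on the host to check the logic. The sketch below is an assumption-laden illustration: ranks are modeled as vector indices, the shuffles are plain loops, `resolve_across_ranks` is a hypothetical name, and `minor % num_ranks` is only a placeholder for the real vertex partitioning. Keeping the lowest rank per (label, minor) is also a choice made for this sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <vector>

using Pair    = std::tuple<int32_t, int64_t>;       // (label, minor)
using Triplet = std::tuple<int32_t, int64_t, int>;  // (label, minor, rank)

// Input: per-rank (label, minor) pairs that survived local de-duplication.
// Output: per-rank pairs that also survive across ranks.
std::vector<std::vector<Pair>> resolve_across_ranks(
  std::vector<std::vector<Pair>> const& local_keep)
{
  int const num_ranks = static_cast<int>(local_keep.size());
  assert(num_ranks > 0);

  // "Shuffle" step: route each triplet to the rank that owns its minor.
  std::vector<std::vector<Triplet>> inbox(num_ranks);
  for (int r = 0; r < num_ranks; ++r) {
    for (auto const& [label, minor] : local_keep[r]) {
      assert(minor >= 0);
      inbox[static_cast<std::size_t>(minor % num_ranks)].emplace_back(label, minor, r);
    }
  }

  std::vector<std::vector<Pair>> survivors(num_ranks);
  for (auto& triplets : inbox) {
    // Sort by (label, minor, rank), then keep the first triplet of each
    // (label, minor) run, i.e. the one with the lowest rank.
    std::sort(triplets.begin(), triplets.end());
    auto last = std::unique(triplets.begin(), triplets.end(),
                            [](Triplet const& a, Triplet const& b) {
                              return std::get<0>(a) == std::get<0>(b) &&
                                     std::get<1>(a) == std::get<1>(b);
                            });
    // "Shuffle back" step: return each surviving pair to the rank that sent it.
    for (auto it = triplets.begin(); it != last; ++it) {
      survivors[std::get<2>(*it)].emplace_back(std::get<0>(*it), std::get<1>(*it));
    }
  }
  return survivors;
}
```

Each rank would then keep an edge only if its (label, minor) pair appears in that rank's survivor list and move it to the discard partition otherwise.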
| rmm::device_uvector<size_t> carryover_frontier_capacity(0, handle.get_stream()); | ||
|
|
||
| cugraph::key_bucket_view_t<vertex_t, tag_t, multi_gpu, false> active_bucket_view = | ||
| key_bucket_view; |
auto active_key_bucket_view = key_bucket_view;
| Ks, | ||
| active_major_labels, | ||
| with_replacement); | ||
| } |
The else {} case here can't happen (as bias should be float or double), right?
Better to document this:
else {
  CUGRAPH_FAIL("should not be reached.");
}
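As a standalone illustration of the fail-fast else branch, the sketch below dispatches on a variant that holds either float or double biases and throws on anything else. Everything here is hypothetical: `BiasValues` and `sum_biases` are made-up names, and a thrown `std::logic_error` stands in for `CUGRAPH_FAIL`; the actual dispatch in the PR may look different.

```cpp
#include <cassert>
#include <numeric>
#include <stdexcept>
#include <variant>
#include <vector>

// Hypothetical bias container; std::monostate models an unexpected state.
using BiasValues = std::variant<std::monostate, std::vector<float>, std::vector<double>>;

double sum_biases(BiasValues const& biases)
{
  if (std::holds_alternative<std::vector<float>>(biases)) {
    auto const& v = std::get<std::vector<float>>(biases);
    return std::accumulate(v.begin(), v.end(), 0.0);
  } else if (std::holds_alternative<std::vector<double>>(biases)) {
    auto const& v = std::get<std::vector<double>>(biases);
    return std::accumulate(v.begin(), v.end(), 0.0);
  } else {
    // Documents the invariant and fails loudly instead of silently doing nothing.
    throw std::logic_error("should not be reached.");
  }
}
```

The explicit branch costs nothing on the expected paths and turns a silent no-op into an immediate error if a new bias type is ever added without updating the dispatch.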
|
|
||
| if (discarded_majors.size() != 0) { | ||
| size_t const num_types = Ks.size(); | ||
| CUGRAPH_EXPECTS(num_types >= 1, "Ks must be non-empty."); |
Shouldn't we check this at the beginning of this function instead?
| carryover_frontier_capacity = std::move(agg_counts); | ||
| } else { | ||
| CUGRAPH_EXPECTS( | ||
| std::holds_alternative<rmm::device_uvector<int32_t>>(discarded_tmp_indices), |
We defined edge_type_t at the beginning of this function; shouldn't we use it here?
| if (agg_labels) { | ||
| thrust::sort(handle.get_thrust_policy(), | ||
| thrust::make_zip_iterator(agg_labels->begin(), agg_majors.begin()), | ||
| thrust::make_zip_iterator(agg_labels->end(), agg_majors.end())); | ||
| } else { | ||
| cugraph::detail::sort_ints( | ||
| handle, raft::device_span<vertex_t>{agg_majors.data(), agg_majors.size()}); | ||
| } |
This code is the same regardless of num_types; shouldn't we sort before the if/else block instead?
Adds support for multigraph to the disjoint sampling implementation. Changes include:
Closes #5500