Skip to content

Commit

Permalink
Address FIXMEs (#3988)
Browse files Browse the repository at this point in the history
This PR works on addressing FIXMEs (and reduce the number of outstanding FIXMEs).

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)
  - Naim (https://github.com/naimnv)
  - Ralph Liu (https://github.com/nv-rliu)

Approvers:
  - Naim (https://github.com/naimnv)
  - Joseph Nke (https://github.com/jnke2016)
  - Chuck Hastings (https://github.com/ChuckHastings)

URL: #3988
  • Loading branch information
seunghwak authored Nov 20, 2023
1 parent 0f28b2e commit d34e3d6
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 115 deletions.
45 changes: 0 additions & 45 deletions cpp/include/cugraph/algorithms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,51 +464,6 @@ k_truss_subgraph(raft::handle_t const& handle,
size_t number_of_vertices,
int k);

// FIXME: Internally distances is of int (signed 32-bit) data type, but current
// template uses data from VT, ET, WT from the legacy::GraphCSR View even if weights
// are not considered
/**
* @Synopsis Performs a breadth first search traversal of a graph starting from a vertex.
*
* @throws cugraph::logic_error with a custom message when an error occurs.
*
* @tparam VT Type of vertex identifiers. Supported value : int (signed,
* 32-bit)
* @tparam ET Type of edge identifiers. Supported value : int (signed,
* 32-bit)
* @tparam WT Type of edge weights. Supported values : int (signed, 32-bit)
*
* @param[in] handle Library handle (RAFT). If a communicator is set in the handle,
the multi GPU version will be selected.
* @param[in] graph cuGraph graph descriptor, should contain the connectivity
* information as a CSR
*
* @param[out] distances If set to a valid pointer, this is populated by distance of
* every vertex in the graph from the starting vertex
*
* @param[out] predecessors If set to a valid pointer, this is populated by bfs traversal
* predecessor of every vertex
*
* @param[out] sp_counters If set to a valid pointer, this is populated by bfs traversal
* shortest_path counter of every vertex
*
* @param[in] start_vertex The starting vertex for breadth first search traversal
*
* @param[in] directed Treat the input graph as directed
*
* @param[in] mg_batch If set to true use SG BFS path when comms are initialized.
*
*/
template <typename VT, typename ET, typename WT>
void bfs(raft::handle_t const& handle,
legacy::GraphCSRView<VT, ET, WT> const& graph,
VT* distances,
VT* predecessors,
double* sp_counters,
const VT start_vertex,
bool directed = true,
bool mg_batch = false);

/**
* @brief Compute Hungarian algorithm on a weighted bipartite graph
*
Expand Down
8 changes: 1 addition & 7 deletions cpp/include/cugraph/utilities/device_comm.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -806,9 +806,6 @@ device_sendrecv(raft::comms::comms_t const& comm,
size_t constexpr tuple_size =
thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;

// FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
// rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
// ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
detail::device_sendrecv_tuple_iterator_element_impl<InputIterator,
OutputIterator,
size_t{0},
Expand Down Expand Up @@ -866,9 +863,6 @@ device_multicast_sendrecv(raft::comms::comms_t const& comm,
size_t constexpr tuple_size =
thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;

// FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
// rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
// ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
detail::device_multicast_sendrecv_tuple_iterator_element_impl<InputIterator,
OutputIterator,
size_t{0},
Expand Down
98 changes: 74 additions & 24 deletions cpp/include/cugraph/utilities/host_scalar_comm.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -254,19 +254,11 @@ template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_allgather(
raft::comms::comms_t const& comm, T input, cudaStream_t stream)
{
std::vector<size_t> rx_counts(comm.get_size(), size_t{1});
std::vector<size_t> displacements(rx_counts.size(), size_t{0});
std::iota(displacements.begin(), displacements.end(), size_t{0});
rmm::device_uvector<T> d_outputs(rx_counts.size(), stream);
rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
raft::update_device(d_outputs.data() + comm.get_rank(), &input, 1, stream);
// FIXME: better use allgather
comm.allgatherv(d_outputs.data() + comm.get_rank(),
d_outputs.data(),
rx_counts.data(),
displacements.data(),
stream);
std::vector<T> h_outputs(rx_counts.size());
raft::update_host(h_outputs.data(), d_outputs.data(), rx_counts.size(), stream);
comm.allgather(d_outputs.data() + comm.get_rank(), d_outputs.data(), size_t{1}, stream);
std::vector<T> h_outputs(d_outputs.size());
raft::update_host(h_outputs.data(), d_outputs.data(), d_outputs.size(), stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
return h_outputs;
Expand All @@ -277,11 +269,6 @@ std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, std::vector<T
host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
{
size_t constexpr tuple_size = thrust::tuple_size<T>::value;
std::vector<size_t> rx_counts(comm.get_size(), tuple_size);
std::vector<size_t> displacements(rx_counts.size(), size_t{0});
for (size_t i = 0; i < displacements.size(); ++i) {
displacements[i] = i * tuple_size;
}
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
rmm::device_uvector<int64_t> d_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size,
stream);
Expand All @@ -292,12 +279,10 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
h_tuple_scalar_elements.data(),
tuple_size,
stream);
// FIXME: better use allgather
comm.allgatherv(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
d_allgathered_tuple_scalar_elements.data(),
rx_counts.data(),
displacements.data(),
stream);
comm.allgather(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
d_allgathered_tuple_scalar_elements.data(),
tuple_size,
stream);
std::vector<int64_t> h_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size);
raft::update_host(h_allgathered_tuple_scalar_elements.data(),
d_allgathered_tuple_scalar_elements.data(),
Expand All @@ -318,6 +303,71 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
return ret;
}

template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, T> host_scalar_scatter(
raft::comms::comms_t const& comm,
std::vector<T> const& inputs, // relevant only in root
int root,
cudaStream_t stream)
{
CUGRAPH_EXPECTS(
((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
((comm.get_rank() != root) && (inputs.size() == 0)),
"inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
if (comm.get_rank() == root) {
raft::update_device(d_outputs.data(), inputs.data(), inputs.size(), stream);
}
comm.bcast(d_outputs.data(), d_outputs.size(), root, stream);
T h_output{};
raft::update_host(&h_output, d_outputs.data() + comm.get_rank(), 1, stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
return h_output;
}

template <typename T>
std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, T> host_scalar_scatter(
raft::comms::comms_t const& comm,
std::vector<T> const& inputs, // relevant only in root
int root,
cudaStream_t stream)
{
CUGRAPH_EXPECTS(
((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
((comm.get_rank() != root) && (inputs.size() == 0)),
"inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
size_t constexpr tuple_size = thrust::tuple_size<T>::value;
rmm::device_uvector<int64_t> d_scatter_tuple_scalar_elements(comm.get_size() * tuple_size,
stream);
if (comm.get_rank() == root) {
for (int i = 0; i < comm.get_size(); ++i) {
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
detail::update_vector_of_tuple_scalar_elements_from_tuple_impl<T, size_t{0}, tuple_size>()
.update(h_tuple_scalar_elements, inputs[i]);
raft::update_device(d_scatter_tuple_scalar_elements.data() + i * tuple_size,
h_tuple_scalar_elements.data(),
tuple_size,
stream);
}
}
comm.bcast(
d_scatter_tuple_scalar_elements.data(), d_scatter_tuple_scalar_elements.size(), root, stream);
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
raft::update_host(h_tuple_scalar_elements.data(),
d_scatter_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
tuple_size,
stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");

T ret{};
detail::update_tuple_from_vector_of_tuple_scalar_elements_impl<T, size_t{0}, tuple_size>().update(
ret, h_tuple_scalar_elements);

return ret;
}

// Return value is valid only in root (return value may better be std::optional in C++17 or later)
template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_gather(
Expand Down
5 changes: 0 additions & 5 deletions cpp/include/cugraph/utilities/shuffle_comm.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const& comm,

rmm::device_uvector<size_t> d_rx_value_counts(comm_size, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released.
std::vector<size_t> tx_counts(comm_size, size_t{1});
std::vector<size_t> tx_offsets(comm_size);
std::iota(tx_offsets.begin(), tx_offsets.end(), size_t{0});
Expand Down Expand Up @@ -835,7 +834,6 @@ auto shuffle_values(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<TxValueIterator>::value_type>(
rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down Expand Up @@ -889,7 +887,6 @@ auto groupby_gpu_id_and_shuffle_values(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down Expand Up @@ -946,7 +943,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
rx_keys.size(), stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_key_first,
Expand All @@ -959,7 +955,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
rx_src_ranks,
stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/centrality/katz_centrality_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ void katz_centrality(
CUGRAPH_EXPECTS(epsilon >= 0.0, "Invalid input argument: epsilon should be non-negative.");

if (do_expensive_check) {
// FIXME: should I check for betas?

if (has_initial_guess) {
auto num_negative_values =
count_if_v(handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) {
Expand Down
40 changes: 8 additions & 32 deletions cpp/src/components/weakly_connected_components_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,16 @@ struct v_op_t {
auto tag = thrust::get<1>(tagged_v);
auto v_offset =
vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(thrust::get<0>(tagged_v));
// FIXME: better switch to atomic_ref after
// https://github.com/nvidia/libcudacxx/milestone/2
auto old =
atomicCAS(level_components + v_offset, invalid_component_id<vertex_type>::value, tag);
if (old != invalid_component_id<vertex_type>::value && old != tag) { // conflict
cuda::atomic_ref<vertex_type> v_component(*(level_components + v_offset));
auto old = invalid_component_id<vertex_type>::value;
bool success = v_component.compare_exchange_strong(old, tag, cuda::std::memory_order_relaxed);
if (!success && (old != tag)) { // conflict
return thrust::make_tuple(thrust::optional<size_t>{bucket_idx_conflict},
thrust::optional<std::byte>{std::byte{0}} /* dummy */);
} else {
auto update = (old == invalid_component_id<vertex_type>::value);
return thrust::make_tuple(
update ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
update ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
success ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
success ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
}
}

Expand Down Expand Up @@ -457,33 +455,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle,
std::numeric_limits<vertex_t>::max());
}

// FIXME: we need to add host_scalar_scatter
#if 1
rmm::device_uvector<vertex_t> d_counts(comm_size, handle.get_stream());
raft::update_device(d_counts.data(),
init_max_new_root_counts.data(),
init_max_new_root_counts.size(),
handle.get_stream());
device_bcast(
comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
raft::update_host(
&init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
#else
init_max_new_roots =
host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
#endif
host_scalar_scatter(comm, init_max_new_root_counts, int{0}, handle.get_stream());
} else {
// FIXME: we need to add host_scalar_scatter
#if 1
rmm::device_uvector<vertex_t> d_counts(comm_size, handle.get_stream());
device_bcast(
comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
raft::update_host(
&init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
#else
init_max_new_roots =
host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
#endif
host_scalar_scatter(comm, std::vector<vertex_t>{}, int{0}, handle.get_stream());
}

handle.sync_stream();
Expand Down

0 comments on commit d34e3d6

Please sign in to comment.