diff --git a/common/unified/components/precision_conversion_kernels.cpp b/common/unified/components/precision_conversion_kernels.cpp index 0402d9bef68..4f3bce969e0 100644 --- a/common/unified/components/precision_conversion_kernels.cpp +++ b/common/unified/components/precision_conversion_kernels.cpp @@ -23,7 +23,7 @@ void convert_precision(std::shared_ptr exec, size, in, out); } -GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL); +GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL); } // namespace components diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index f7f1e00f17b..b814e660aca 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -135,6 +135,7 @@ if(GINKGO_BUILD_MPI) distributed/matrix.cpp distributed/neighborhood_communicator.cpp distributed/partition_helpers.cpp + distributed/row_gatherer.cpp distributed/vector.cpp distributed/preconditioner/schwarz.cpp) endif() diff --git a/core/base/array.cpp b/core/base/array.cpp index a41f7c07e55..2025cc6480d 100644 --- a/core/base/array.cpp +++ b/core/base/array.cpp @@ -51,7 +51,7 @@ void convert_data(std::shared_ptr exec, size_type size, void convert_data(std::shared_ptr, size_type, \ const From*, To*) -GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(GKO_DECLARE_ARRAY_CONVERSION); +GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(GKO_DECLARE_ARRAY_CONVERSION); } // namespace detail diff --git a/core/device_hooks/common_kernels.inc.cpp b/core/device_hooks/common_kernels.inc.cpp index fe054856695..d86184c1ce6 100644 --- a/core/device_hooks/common_kernels.inc.cpp +++ b/core/device_hooks/common_kernels.inc.cpp @@ -81,6 +81,11 @@ _macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \ GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(_macro) +#define GKO_STUB_POD_CONVERSION(_macro) \ + template \ + _macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \ + GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(_macro) + #define GKO_STUB_NON_COMPLEX_VALUE_TYPE(_macro) \ template \ _macro(ValueType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \ @@ -147,11 +152,6 @@ _macro(IndexType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \ GKO_INSTANTIATE_FOR_EACH_TEMPLATE_TYPE(_macro) -#define GKO_STUB_VALUE_CONVERSION(_macro) \ - template \ - _macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \ - GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(_macro) - #define GKO_STUB_VALUE_CONVERSION_OR_COPY(_macro) \ template \ _macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \ @@ -173,7 +173,7 @@ namespace GKO_HOOK_MODULE { namespace components { -GKO_STUB_VALUE_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL); +GKO_STUB_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL); GKO_STUB_INDEX_TYPE(GKO_DECLARE_PREFIX_SUM_NONNEGATIVE_KERNEL); // explicitly instantiate for size_type, as this is // used in the SellP format diff --git a/core/distributed/matrix.cpp b/core/distributed/matrix.cpp index 815bfa59fc2..a923d3e237d 100644 --- a/core/distributed/matrix.cpp +++ b/core/distributed/matrix.cpp @@ -5,6 +5,7 @@ #include "ginkgo/core/distributed/matrix.hpp" #include +#include #include #include #include @@ -30,57 +31,14 @@ GKO_REGISTER_OPERATION(separate_local_nonlocal, template void initialize_communication_pattern( - std::shared_ptr exec, mpi::communicator comm, const index_map& imap, - std::vector& recv_sizes, - std::vector& recv_offsets, - std::vector& send_sizes, - std::vector& send_offsets, - array& gather_idxs) + std::shared_ptr>& row_gatherer) { - // exchange step 1: determine recv_sizes, send_sizes, 
send_offsets - auto host_recv_targets = - make_temporary_clone(exec->get_master(), &imap.get_remote_target_ids()); - auto host_offsets = make_temporary_clone( - exec->get_master(), &imap.get_remote_global_idxs().get_offsets()); - auto compute_recv_sizes = [](const auto* recv_targets, size_type size, - const auto* offsets, auto& recv_sizes) { - for (size_type i = 0; i < size; ++i) { - recv_sizes[recv_targets[i]] = offsets[i + 1] - offsets[i]; - } - }; - std::fill(recv_sizes.begin(), recv_sizes.end(), 0); - compute_recv_sizes(host_recv_targets->get_const_data(), - host_recv_targets->get_size(), - host_offsets->get_const_data(), recv_sizes); - std::partial_sum(recv_sizes.begin(), recv_sizes.end(), - recv_offsets.begin() + 1); - comm.all_to_all(exec, recv_sizes.data(), 1, send_sizes.data(), 1); - std::partial_sum(send_sizes.begin(), send_sizes.end(), - send_offsets.begin() + 1); - send_offsets[0] = 0; - recv_offsets[0] = 0; - - // exchange step 2: exchange gather_idxs from receivers to senders - auto recv_gather_idxs = - make_const_array_view( - imap.get_executor(), imap.get_non_local_size(), - imap.get_remote_local_idxs().get_const_flat_data()) - .copy_to_array(); - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { - recv_gather_idxs.set_executor(exec->get_master()); - gather_idxs.clear(); - gather_idxs.set_executor(exec->get_master()); - } - gather_idxs.resize_and_reset(send_offsets.back()); - comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, - recv_gather_idxs.get_const_data(), recv_sizes.data(), - recv_offsets.data(), gather_idxs.get_data(), - send_sizes.data(), send_offsets.data()); - if (use_host_buffer) { - gather_idxs.set_executor(exec); - } + row_gatherer = RowGatherer::create( + row_gatherer->get_executor(), + row_gatherer->get_collective_communicator()->create_with_same_type( + row_gatherer->get_communicator(), imap), + imap); } @@ -101,12 +59,8 @@ Matrix::Matrix( : EnableDistributedLinOp< Matrix>{exec}, DistributedBase{comm}, - imap_(exec), - send_offsets_(comm.size() + 1), - send_sizes_(comm.size()), - recv_offsets_(comm.size() + 1), - recv_sizes_(comm.size()), - gather_idxs_{exec}, + row_gatherer_{RowGatherer::create(exec, comm)}, + imap_{exec}, one_scalar_{}, local_mtx_{local_matrix_template->clone(exec)}, non_local_mtx_{non_local_matrix_template->clone(exec)} @@ -128,12 +82,8 @@ Matrix::Matrix( : EnableDistributedLinOp< Matrix>{exec}, DistributedBase{comm}, - imap_(exec), - send_offsets_(comm.size() + 1), - send_sizes_(comm.size()), - recv_offsets_(comm.size() + 1), - recv_sizes_(comm.size()), - gather_idxs_{exec}, + row_gatherer_{RowGatherer::create(exec, comm)}, + imap_{exec}, one_scalar_{}, non_local_mtx_(::gko::matrix::Coo::create( exec, dim<2>{local_linop->get_size()[0], 0})) @@ -152,12 +102,8 @@ Matrix::Matrix( : EnableDistributedLinOp< Matrix>{exec}, DistributedBase{comm}, + row_gatherer_(RowGatherer::create(exec, comm)), imap_(std::move(imap)), - send_offsets_(comm.size() + 1), - send_sizes_(comm.size()), - recv_offsets_(comm.size() + 1), - recv_sizes_(comm.size()), - gather_idxs_{exec}, one_scalar_{} { this->set_size({imap_.get_global_size(), imap_.get_global_size()}); @@ -166,9 +112,7 @@ Matrix::Matrix( one_scalar_.init(exec, dim<2>{1, 1}); one_scalar_->fill(one()); - initialize_communication_pattern( - this->get_executor(), this->get_communicator(), imap_, recv_sizes_, - recv_offsets_, send_sizes_, send_offsets_, gather_idxs_); + initialize_communication_pattern(imap_, row_gatherer_); } @@ -235,12 +179,8 @@ void 
Matrix::convert_to( result->get_communicator().size()); result->local_mtx_->copy_from(this->local_mtx_); result->non_local_mtx_->copy_from(this->non_local_mtx_); + result->row_gatherer_->copy_from(this->row_gatherer_); result->imap_ = this->imap_; - result->gather_idxs_ = this->gather_idxs_; - result->send_offsets_ = this->send_offsets_; - result->recv_offsets_ = this->recv_offsets_; - result->recv_sizes_ = this->recv_sizes_; - result->send_sizes_ = this->send_sizes_; result->set_size(this->get_size()); } @@ -254,12 +194,8 @@ void Matrix::move_to( result->get_communicator().size()); result->local_mtx_->move_from(this->local_mtx_); result->non_local_mtx_->move_from(this->non_local_mtx_); + result->row_gatherer_->move_from(this->row_gatherer_); result->imap_ = std::move(this->imap_); - result->gather_idxs_ = std::move(this->gather_idxs_); - result->send_offsets_ = std::move(this->send_offsets_); - result->recv_offsets_ = std::move(this->recv_offsets_); - result->recv_sizes_ = std::move(this->recv_sizes_); - result->send_sizes_ = std::move(this->send_sizes_); result->set_size(this->get_size()); this->set_size({}); } @@ -282,7 +218,6 @@ void Matrix::read_distributed( auto local_part = comm.rank(); // set up LinOp sizes - auto num_parts = static_cast(row_partition->get_num_parts()); auto global_num_rows = row_partition->get_size(); auto global_num_cols = col_partition->get_size(); dim<2> global_dim{global_num_rows, global_num_cols}; @@ -329,9 +264,7 @@ void Matrix::read_distributed( as>(this->non_local_mtx_) ->read(std::move(non_local_data)); - initialize_communication_pattern(exec, comm, imap_, recv_sizes_, - recv_offsets_, send_sizes_, send_offsets_, - gather_idxs_); + initialize_communication_pattern(imap_, row_gatherer_); } @@ -373,55 +306,6 @@ void Matrix::read_distributed( } -template -mpi::request Matrix::communicate( - const local_vector_type* local_b) const -{ - // This function can never return early! - // Even if the non-local part is empty, i.e. this process doesn't need - // any data from other processes, the used MPI calls are collective - // operations. They need to be called on all processes, even if a process - // might not communicate any data. - auto exec = this->get_executor(); - const auto comm = this->get_communicator(); - auto num_cols = local_b->get_size()[1]; - auto send_size = send_offsets_.back(); - auto recv_size = recv_offsets_.back(); - auto send_dim = dim<2>{static_cast(send_size), num_cols}; - auto recv_dim = dim<2>{static_cast(recv_size), num_cols}; - recv_buffer_.init(exec, recv_dim); - send_buffer_.init(exec, send_dim); - - local_b->row_gather(&gather_idxs_, send_buffer_.get()); - - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { - host_recv_buffer_.init(exec->get_master(), recv_dim); - host_send_buffer_.init(exec->get_master(), send_dim); - host_send_buffer_->copy_from(send_buffer_.get()); - } - - mpi::contiguous_type type(num_cols, mpi::type_impl::get_type()); - auto send_ptr = use_host_buffer ? host_send_buffer_->get_const_values() - : send_buffer_->get_const_values(); - auto recv_ptr = use_host_buffer ? host_recv_buffer_->get_values() - : recv_buffer_->get_values(); - exec->synchronize(); -#ifdef GINKGO_HAVE_OPENMPI_PRE_4_1_X - comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, send_ptr, - send_sizes_.data(), send_offsets_.data(), type.get(), - recv_ptr, recv_sizes_.data(), recv_offsets_.data(), - type.get()); - return {}; -#else - return comm.i_all_to_all_v( - use_host_buffer ? 
exec->get_master() : exec, send_ptr, - send_sizes_.data(), send_offsets_.data(), type.get(), recv_ptr, - recv_sizes_.data(), recv_offsets_.data(), type.get()); -#endif -} - - template void Matrix::apply_impl( const LinOp* b, LinOp* x) const @@ -437,16 +321,22 @@ void Matrix::apply_impl( dense_x->get_local_values()), dense_x->get_local_vector()->get_stride()); + auto exec = this->get_executor(); auto comm = this->get_communicator(); - auto req = this->communicate(dense_b->get_local_vector()); + auto recv_dim = + dim<2>{static_cast( + row_gatherer_->get_collective_communicator() + ->get_recv_size()), + dense_b->get_size()[1]}; + auto recv_exec = mpi::requires_host_buffer(exec, comm) + ? exec->get_master() + : exec; + recv_buffer_.init(recv_exec, recv_dim); + auto req = + this->row_gatherer_->apply_async(dense_b, recv_buffer_.get()); local_mtx_->apply(dense_b->get_local_vector(), local_x); req.wait(); - auto exec = this->get_executor(); - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { - recv_buffer_->copy_from(host_recv_buffer_.get()); - } non_local_mtx_->apply(one_scalar_.get(), recv_buffer_.get(), one_scalar_.get(), local_x); }, @@ -470,17 +360,23 @@ void Matrix::apply_impl( dense_x->get_local_values()), dense_x->get_local_vector()->get_stride()); + auto exec = this->get_executor(); auto comm = this->get_communicator(); - auto req = this->communicate(dense_b->get_local_vector()); + auto recv_dim = + dim<2>{static_cast( + row_gatherer_->get_collective_communicator() + ->get_recv_size()), + dense_b->get_size()[1]}; + auto recv_exec = mpi::requires_host_buffer(exec, comm) + ? exec->get_master() + : exec; + recv_buffer_.init(recv_exec, recv_dim); + auto req = + this->row_gatherer_->apply_async(dense_b, recv_buffer_.get()); local_mtx_->apply(local_alpha, dense_b->get_local_vector(), local_beta, local_x); req.wait(); - auto exec = this->get_executor(); - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { - recv_buffer_->copy_from(host_recv_buffer_.get()); - } non_local_mtx_->apply(local_alpha, recv_buffer_.get(), one_scalar_.get(), local_x); }, @@ -563,6 +459,8 @@ Matrix::Matrix(const Matrix& other) : EnableDistributedLinOp>{other.get_executor()}, DistributedBase{other.get_communicator()}, + row_gatherer_{RowGatherer::create( + other.get_executor(), other.get_communicator())}, imap_(other.get_executor()) { *this = other; @@ -575,6 +473,8 @@ Matrix::Matrix( : EnableDistributedLinOp>{other.get_executor()}, DistributedBase{other.get_communicator()}, + row_gatherer_{RowGatherer::create( + other.get_executor(), other.get_communicator())}, imap_(other.get_executor()) { *this = std::move(other); @@ -592,12 +492,8 @@ Matrix::operator=( this->set_size(other.get_size()); local_mtx_->copy_from(other.local_mtx_); non_local_mtx_->copy_from(other.non_local_mtx_); + row_gatherer_->copy_from(other.row_gatherer_); imap_ = other.imap_; - gather_idxs_ = other.gather_idxs_; - send_offsets_ = other.send_offsets_; - recv_offsets_ = other.recv_offsets_; - send_sizes_ = other.send_sizes_; - recv_sizes_ = other.recv_sizes_; one_scalar_.init(this->get_executor(), dim<2>{1, 1}); one_scalar_->fill(one()); } @@ -616,12 +512,8 @@ Matrix::operator=(Matrix&& other) other.set_size({}); local_mtx_->move_from(other.local_mtx_); non_local_mtx_->move_from(other.non_local_mtx_); + row_gatherer_->move_from(other.row_gatherer_); imap_ = std::move(other.imap_); - gather_idxs_ = std::move(other.gather_idxs_); - send_offsets_ = std::move(other.send_offsets_); - 
recv_offsets_ = std::move(other.recv_offsets_); - send_sizes_ = std::move(other.send_sizes_); - recv_sizes_ = std::move(other.recv_sizes_); one_scalar_.init(this->get_executor(), dim<2>{1, 1}); one_scalar_->fill(one()); } diff --git a/core/distributed/row_gatherer.cpp b/core/distributed/row_gatherer.cpp new file mode 100644 index 00000000000..21b4a1b4881 --- /dev/null +++ b/core/distributed/row_gatherer.cpp @@ -0,0 +1,261 @@ +// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#include "ginkgo/core/distributed/row_gatherer.hpp" + +#include +#include +#include +#include +#include + +#include "core/base/dispatch_helper.hpp" + +namespace gko { +namespace experimental { +namespace distributed { + + +#if GINKGO_HAVE_OPENMPI_POST_4_1_X +using DefaultCollComm = mpi::NeighborhoodCommunicator; +#else +using DefaultCollComm = mpi::DenseCommunicator; +#endif + + +template +void RowGatherer::apply_impl(const LinOp* b, LinOp* x) const +{ + apply_async(b, x).wait(); +} + + +template +void RowGatherer::apply_impl(const LinOp* alpha, const LinOp* b, + const LinOp* beta, LinOp* x) const + GKO_NOT_IMPLEMENTED; + + +template +mpi::request RowGatherer::apply_async(ptr_param b, + ptr_param x) const +{ + int is_inactive; + MPI_Status status; + GKO_ASSERT_NO_MPI_ERRORS( + MPI_Request_get_status(req_listener_, &is_inactive, &status)); + // This is untestable. Some processes might complete the previous request + // while others don't, so it's impossible to create a predictable behavior + // for a test. + GKO_THROW_IF_INVALID(is_inactive, + "Tried to call RowGatherer::apply_async while there " + "is already an active communication. Please use the " + "overload with a workspace to handle multiple " + "connections."); + + auto req = apply_async(b, x, send_workspace_); + req_listener_ = *req.get(); + return req; +} + + +template +mpi::request RowGatherer::apply_async( + ptr_param b, ptr_param x, array& workspace) const +{ + mpi::request req; + + // dispatch global vector + run, std::complex>( + b.get(), [&](const auto* b_global) { + using ValueType = + typename std::decay_t::value_type; + // dispatch local vector with the same precision as the global + // vector + ::gko::precision_dispatch( + [&](auto* x_local) { + auto exec = this->get_executor(); + + auto use_host_buffer = mpi::requires_host_buffer( + exec, coll_comm_->get_base_communicator()); + auto mpi_exec = use_host_buffer ? exec->get_master() : exec; + + GKO_THROW_IF_INVALID( + !use_host_buffer || mpi_exec->memory_accessible( + x_local->get_executor()), + "The receive buffer uses device memory, but MPI " + "support of device memory is not available. 
Please " + "provide a host buffer or enable MPI support for " + "device memory."); + + auto b_local = b_global->get_local_vector(); + + dim<2> send_size(coll_comm_->get_send_size(), + b_local->get_size()[1]); + auto send_size_in_bytes = + sizeof(ValueType) * send_size[0] * send_size[1]; + workspace.set_executor(mpi_exec); + if (send_size_in_bytes > workspace.get_size()) { + workspace.resize_and_reset(sizeof(ValueType) * + send_size[0] * send_size[1]); + } + auto send_buffer = matrix::Dense::create( + mpi_exec, send_size, + make_array_view( + mpi_exec, send_size[0] * send_size[1], + reinterpret_cast(workspace.get_data())), + send_size[1]); + b_local->row_gather(&send_idxs_, send_buffer); + + auto recv_ptr = x_local->get_values(); + auto send_ptr = send_buffer->get_values(); + + b_local->get_executor()->synchronize(); + mpi::contiguous_type type( + b_local->get_size()[1], + mpi::type_impl::get_type()); + req = coll_comm_->i_all_to_all_v( + mpi_exec, send_ptr, type.get(), recv_ptr, type.get()); + }, + x.get()); + }); + return req; +} + + +template +std::shared_ptr +RowGatherer::get_collective_communicator() const +{ + return coll_comm_; +} + + +template +template +RowGatherer::RowGatherer( + std::shared_ptr exec, + std::shared_ptr coll_comm, + const index_map& imap) + : EnableDistributedLinOp>( + exec, dim<2>{imap.get_non_local_size(), imap.get_global_size()}), + DistributedBase(coll_comm->get_base_communicator()), + coll_comm_(std::move(coll_comm)), + send_idxs_(exec), + send_workspace_(exec), + req_listener_(MPI_REQUEST_NULL) +{ + // check that the coll_comm_ and imap have the same recv size + // the same check for the send size is not possible, since the + // imap doesn't store send indices + GKO_THROW_IF_INVALID( + coll_comm_->get_recv_size() == imap.get_non_local_size(), + "The collective communicator doesn't match the index map."); + + auto comm = coll_comm_->get_base_communicator(); + auto inverse_comm = coll_comm_->create_inverse(); + + send_idxs_.resize_and_reset(coll_comm_->get_send_size()); + inverse_comm + ->i_all_to_all_v(exec, + imap.get_remote_local_idxs().get_const_flat_data(), + send_idxs_.get_data()) + .wait(); +} + + +template +const LocalIndexType* RowGatherer::get_const_row_idxs() const +{ + return send_idxs_.get_const_data(); +} + + +template +RowGatherer::RowGatherer(std::shared_ptr exec, + mpi::communicator comm) + : EnableDistributedLinOp>(exec), + DistributedBase(comm), + coll_comm_(std::make_shared(comm)), + send_idxs_(exec), + send_workspace_(exec), + req_listener_(MPI_REQUEST_NULL) +{} + + +template +RowGatherer::RowGatherer(RowGatherer&& o) noexcept + : EnableDistributedLinOp>(o.get_executor()), + DistributedBase(o.get_communicator()), + send_idxs_(o.get_executor()), + send_workspace_(o.get_executor()), + req_listener_(MPI_REQUEST_NULL) +{ + *this = std::move(o); +} + + +template +RowGatherer& RowGatherer::operator=( + const RowGatherer& o) +{ + if (this != &o) { + this->set_size(o.get_size()); + coll_comm_ = o.coll_comm_; + send_idxs_ = o.send_idxs_; + } + return *this; +} + + +template +RowGatherer& RowGatherer::operator=( + RowGatherer&& o) +{ + if (this != &o) { + this->set_size(o.get_size()); + o.set_size({}); + coll_comm_ = std::exchange( + o.coll_comm_, + std::make_shared(o.get_communicator())); + send_idxs_ = std::move(o.send_idxs_); + send_workspace_ = std::move(o.send_workspace_); + req_listener_ = std::exchange(o.req_listener_, MPI_REQUEST_NULL); + } + return *this; +} + + +template +RowGatherer::RowGatherer(const RowGatherer& o) + : 
EnableDistributedLinOp>(o.get_executor()), + DistributedBase(o.get_communicator()), + send_idxs_(o.get_executor()) +{ + *this = o; +} + + +#define GKO_DECLARE_ROW_GATHERER(_itype) class RowGatherer<_itype> + +GKO_INSTANTIATE_FOR_EACH_INDEX_TYPE(GKO_DECLARE_ROW_GATHERER); + +#undef GKO_DECLARE_ROW_GATHERER + + +#define GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR(_ltype, _gtype) \ + RowGatherer<_ltype>::RowGatherer( \ + std::shared_ptr exec, \ + std::shared_ptr coll_comm, \ + const index_map<_ltype, _gtype>& imap) + +GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE( + GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR); + +#undef GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR + + +} // namespace distributed +} // namespace experimental +} // namespace gko diff --git a/core/multigrid/pgm.cpp b/core/multigrid/pgm.cpp index 6234f072dd5..7de5c147686 100644 --- a/core/multigrid/pgm.cpp +++ b/core/multigrid/pgm.cpp @@ -264,18 +264,15 @@ array Pgm::communicate_non_local_agg( { auto exec = gko::as(matrix)->get_executor(); const auto comm = matrix->get_communicator(); - auto send_sizes = matrix->send_sizes_; - auto recv_sizes = matrix->recv_sizes_; - auto send_offsets = matrix->send_offsets_; - auto recv_offsets = matrix->recv_offsets_; - auto gather_idxs = matrix->gather_idxs_; - auto total_send_size = send_offsets.back(); - auto total_recv_size = recv_offsets.back(); + auto coll_comm = matrix->row_gatherer_->get_collective_communicator(); + auto total_send_size = coll_comm->get_send_size(); + auto total_recv_size = coll_comm->get_recv_size(); + auto row_gatherer = matrix->row_gatherer_; array send_agg(exec, total_send_size); exec->run(pgm::make_gather_index( send_agg.get_size(), local_agg.get_const_data(), - gather_idxs.get_const_data(), send_agg.get_data())); + row_gatherer->get_const_row_idxs(), send_agg.get_data())); // temporary index map that contains no remote connections to map // local indices to global @@ -296,16 +293,16 @@ array Pgm::communicate_non_local_agg( seng_global_agg.get_data(), host_send_buffer.get_data()); } - auto type = experimental::mpi::type_impl::get_type(); const auto send_ptr = use_host_buffer ? host_send_buffer.get_const_data() : seng_global_agg.get_const_data(); auto recv_ptr = use_host_buffer ? host_recv_buffer.get_data() : non_local_agg.get_data(); exec->synchronize(); - comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, send_ptr, - send_sizes.data(), send_offsets.data(), type, recv_ptr, - recv_sizes.data(), recv_offsets.data(), type); + coll_comm + ->i_all_to_all_v(use_host_buffer ? 
exec->get_master() : exec, send_ptr, + recv_ptr) + .wait(); if (use_host_buffer) { exec->copy_from(exec->get_master(), total_recv_size, recv_ptr, non_local_agg.get_data()); diff --git a/core/test/gtest/ginkgo_mpi_main.cpp b/core/test/gtest/ginkgo_mpi_main.cpp index 07a1c2c343d..76f4546c4ed 100644 --- a/core/test/gtest/ginkgo_mpi_main.cpp +++ b/core/test/gtest/ginkgo_mpi_main.cpp @@ -356,7 +356,13 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - MPI_Init(&argc, &argv); + int provided_thread_support; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, + &provided_thread_support); + if (provided_thread_support != MPI_THREAD_MULTIPLE) { + throw std::runtime_error( + "This test requires an thread compliant MPI implementation."); + } MPI_Comm comm(MPI_COMM_WORLD); int rank; int size; diff --git a/core/test/mpi/distributed/CMakeLists.txt b/core/test/mpi/distributed/CMakeLists.txt index 9c7b43cf44b..ab676c5cfd1 100644 --- a/core/test/mpi/distributed/CMakeLists.txt +++ b/core/test/mpi/distributed/CMakeLists.txt @@ -1,6 +1,7 @@ ginkgo_create_test(helpers MPI_SIZE 1) ginkgo_create_test(matrix MPI_SIZE 1) ginkgo_create_test(dense_communicator MPI_SIZE 6) +ginkgo_create_test(row_gatherer MPI_SIZE 6) if(NOT GINKGO_HAVE_OPENMPI_PRE_4_1_X) ginkgo_create_test(neighborhood_communicator MPI_SIZE 6) diff --git a/core/test/mpi/distributed/row_gatherer.cpp b/core/test/mpi/distributed/row_gatherer.cpp new file mode 100644 index 00000000000..c2b8a8ab8e9 --- /dev/null +++ b/core/test/mpi/distributed/row_gatherer.cpp @@ -0,0 +1,266 @@ +// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#include + +#include +#include +#include + +#include "core/test/utils.hpp" +#include "core/test/utils/assertions.hpp" + + +template +class RowGatherer : public ::testing::Test { +protected: + using index_type = IndexType; + using part_type = + gko::experimental::distributed::Partition; + using map_type = + gko::experimental::distributed::index_map; + using row_gatherer_type = + gko::experimental::distributed::RowGatherer; + + RowGatherer() + { + int rank = this->comm.rank(); + auto part = gko::share(part_type::build_from_global_size_uniform( + this->ref, this->comm.size(), this->comm.size() * 3)); + auto recv_connections = + this->template create_recv_connections()[rank]; + auto imap = + map_type{this->ref, part, this->comm.rank(), recv_connections}; + auto coll_comm = + std::make_shared( + this->comm, imap); + rg = row_gatherer_type::create(ref, coll_comm, imap); + } + + void SetUp() override { ASSERT_EQ(comm.size(), 6); } + + template + std::array, 6> create_recv_connections() + { + return {gko::array{ref, {3, 5, 10, 11}}, + gko::array{ref, {0, 1, 7, 12, 13}}, + gko::array{ref, {3, 4, 17}}, + gko::array{ref, {1, 2, 12, 14}}, + gko::array{ref, {4, 5, 9, 10, 15, 16}}, + gko::array{ref, {8, 12, 13, 14}}}; + } + + std::shared_ptr ref = gko::ReferenceExecutor::create(); + gko::experimental::mpi::communicator comm = MPI_COMM_WORLD; + std::shared_ptr rg; +}; + +TYPED_TEST_SUITE(RowGatherer, gko::test::IndexTypes, TypenameNameGenerator); + +TYPED_TEST(RowGatherer, CanDefaultConstruct) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + + auto rg = RowGatherer::create(this->ref, this->comm); + + GKO_ASSERT_EQUAL_DIMENSIONS(rg, gko::dim<2>()); +} + + +TYPED_TEST(RowGatherer, CanConstructWithEmptCollectiveCommAndIndexMap) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + using IndexMap = typename TestFixture::map_type; + 
auto coll_comm = + std::make_shared( + this->comm); + auto map = IndexMap{this->ref}; + + auto rg = RowGatherer::create(this->ref, coll_comm, map); + + GKO_ASSERT_EQUAL_DIMENSIONS(rg, gko::dim<2>()); +} + + +TYPED_TEST(RowGatherer, CanConstructFromCollectiveCommAndIndexMap) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + using Part = typename TestFixture::part_type; + using IndexMap = typename TestFixture::map_type; + int rank = this->comm.rank(); + auto part = gko::share(Part::build_from_global_size_uniform( + this->ref, this->comm.size(), this->comm.size() * 3)); + auto recv_connections = + this->template create_recv_connections()[rank]; + auto imap = IndexMap{this->ref, part, this->comm.rank(), recv_connections}; + auto coll_comm = + std::make_shared( + this->comm, imap); + + auto rg = RowGatherer::create(this->ref, coll_comm, imap); + + gko::dim<2> size{recv_connections.get_size(), 18}; + GKO_ASSERT_EQUAL_DIMENSIONS(rg, size); +} + + +TYPED_TEST(RowGatherer, CanApply) +{ + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + int rank = this->comm.rank(); + auto offset = static_cast(rank * 3); + auto b = Vector::create( + this->ref, this->comm, gko::dim<2>{18, 1}, + gko::initialize({offset, offset + 1, offset + 2}, this->ref)); + auto x = Dense::create(this->ref, gko::dim<2>{this->rg->get_size()[0], 1}); + + this->rg->apply(b, x); + + auto expected = this->template create_recv_connections()[rank]; + auto expected_vec = Dense::create( + this->ref, gko::dim<2>{expected.get_size(), 1}, expected, 1); + GKO_ASSERT_MTX_NEAR(x, expected_vec, 0.0); +} + + +TYPED_TEST(RowGatherer, CanApplyAsync) +{ + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + int rank = this->comm.rank(); + auto offset = static_cast(rank * 3); + auto b = Vector::create( + this->ref, this->comm, gko::dim<2>{18, 1}, + gko::initialize({offset, offset + 1, offset + 2}, this->ref)); + auto x = Dense::create(this->ref, gko::dim<2>{this->rg->get_size()[0], 1}); + + auto req = this->rg->apply_async(b, x); + req.wait(); + + auto expected = this->template create_recv_connections()[rank]; + auto expected_vec = Dense::create( + this->ref, gko::dim<2>{expected.get_size(), 1}, expected, 1); + GKO_ASSERT_MTX_NEAR(x, expected_vec, 0.0); +} + + +TYPED_TEST(RowGatherer, CanApplyAsyncConsequetively) +{ + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + int rank = this->comm.rank(); + auto offset = static_cast(rank * 3); + auto b = Vector::create( + this->ref, this->comm, gko::dim<2>{18, 1}, + gko::initialize({offset, offset + 1, offset + 2}, this->ref)); + auto x = Dense::create(this->ref, gko::dim<2>{this->rg->get_size()[0], 1}); + + this->rg->apply_async(b, x).wait(); + this->rg->apply_async(b, x).wait(); + + auto expected = this->template create_recv_connections()[rank]; + auto expected_vec = Dense::create( + this->ref, gko::dim<2>{expected.get_size(), 1}, expected, 1); + GKO_ASSERT_MTX_NEAR(x, expected_vec, 0.0); +} + + +TYPED_TEST(RowGatherer, CanApplyAsyncWithWorkspace) +{ + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + int rank = this->comm.rank(); + auto offset = static_cast(rank * 3); + auto b = Vector::create( + this->ref, this->comm, gko::dim<2>{18, 1}, + gko::initialize({offset, offset + 1, offset + 2}, this->ref)); + auto x = Dense::create(this->ref, gko::dim<2>{this->rg->get_size()[0], 1}); + gko::array workspace(this->ref); + + auto 
req = this->rg->apply_async(b, x, workspace); + req.wait(); + + auto expected = this->template create_recv_connections()[rank]; + auto expected_vec = Dense::create( + this->ref, gko::dim<2>{expected.get_size(), 1}, expected, 1); + GKO_ASSERT_MTX_NEAR(x, expected_vec, 0.0); +} + + +TYPED_TEST(RowGatherer, CanApplyAsyncMultipleTimesWithWorkspace) +{ + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + int rank = this->comm.rank(); + auto offset = static_cast(rank * 3); + auto b1 = Vector::create( + this->ref, this->comm, gko::dim<2>{18, 1}, + gko::initialize({offset, offset + 1, offset + 2}, this->ref)); + auto b2 = gko::clone(b1); + b2->scale(gko::initialize({-1}, this->ref)); + auto x1 = Dense::create(this->ref, gko::dim<2>{this->rg->get_size()[0], 1}); + auto x2 = gko::clone(x1); + gko::array workspace1(this->ref); + gko::array workspace2(this->ref); + + auto req1 = this->rg->apply_async(b1, x1, workspace1); + auto req2 = this->rg->apply_async(b2, x2, workspace2); + req1.wait(); + req2.wait(); + + auto expected = this->template create_recv_connections()[rank]; + auto expected_vec1 = Dense::create( + this->ref, gko::dim<2>{expected.get_size(), 1}, expected, 1); + auto expected_vec2 = gko::clone(expected_vec1); + expected_vec2->scale(gko::initialize({-1}, this->ref)); + GKO_ASSERT_MTX_NEAR(x1, expected_vec1, 0.0); + GKO_ASSERT_MTX_NEAR(x2, expected_vec2, 0.0); +} + + +TYPED_TEST(RowGatherer, CanApplyAsyncWithMultipleColumns) +{ + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + int rank = this->comm.rank(); + auto offset = static_cast(rank * 3); + auto b = Vector::create( + this->ref, this->comm, gko::dim<2>{18, 2}, + gko::initialize({{offset, offset * offset}, + {offset + 1, offset * offset + 1}, + {offset + 2, offset * offset + 2}}, + this->ref)); + auto x = Dense::create(this->ref, gko::dim<2>{this->rg->get_size()[0], 2}); + + this->rg->apply_async(b, x).wait(); + + gko::array expected[] = { + gko::array{this->ref, {3, 9, 5, 11, 10, 82, 11, 83}}, + gko::array{this->ref, {0, 0, 1, 1, 7, 37, 12, 144, 13, 145}}, + gko::array{this->ref, {3, 9, 4, 10, 17, 227}}, + gko::array{this->ref, {1, 1, 2, 2, 12, 144, 14, 146}}, + gko::array{this->ref, + {4, 10, 5, 11, 9, 81, 10, 82, 15, 225, 16, 226}}, + gko::array{this->ref, {8, 38, 12, 144, 13, 145, 14, 146}}}; + auto expected_vec = + Dense::create(this->ref, gko::dim<2>{expected[rank].get_size() / 2, 2}, + expected[rank], 2); + GKO_ASSERT_MTX_NEAR(x, expected_vec, 0.0); +} + + +TYPED_TEST(RowGatherer, ThrowsOnAdvancedApply) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + using Dense = gko::matrix::Dense; + using Vector = gko::experimental::distributed::Vector; + auto rg = RowGatherer::create(this->ref, this->comm); + auto b = Vector::create(this->ref, this->comm); + auto x = Dense::create(this->ref); + auto alpha = Dense::create(this->ref, gko::dim<2>{1, 1}); + auto beta = Dense::create(this->ref, gko::dim<2>{1, 1}); + + ASSERT_THROW(rg->apply(alpha, b, beta, x), gko::NotImplemented); +} diff --git a/include/ginkgo/core/base/types.hpp b/include/ginkgo/core/base/types.hpp index 3783d2c4b41..5387a38514c 100644 --- a/include/ginkgo/core/base/types.hpp +++ b/include/ginkgo/core/base/types.hpp @@ -682,6 +682,21 @@ GKO_ATTRIBUTES constexpr bool operator!=(precision_reduction x, template _macro(std::complex, std::complex) +/** + * Instantiates a template for each pod type conversion pair compiled by + * Ginkgo. 
+ * + * @param _macro A macro which expands the template instantiation + * (not including the leading `template` specifier). + * Should take two arguments `src` and `dst`, which + * are replaced by the source and destination value type. + */ +#define GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(_macro) \ + GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(_macro); \ + template _macro(int32, int64); \ + template _macro(int64, int32) + + /** * Instantiates a template for each value type conversion or copy pair compiled * by Ginkgo. diff --git a/include/ginkgo/core/distributed/matrix.hpp b/include/ginkgo/core/distributed/matrix.hpp index 09070d0ca55..cee947b1cae 100644 --- a/include/ginkgo/core/distributed/matrix.hpp +++ b/include/ginkgo/core/distributed/matrix.hpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace gko { @@ -610,32 +611,15 @@ class Matrix std::shared_ptr local_linop, std::shared_ptr non_local_linop); - /** - * Starts a non-blocking communication of the values of b that are shared - * with other processors. - * - * @param local_b The full local vector to be communicated. The subset of - * shared values is automatically extracted. - * @return MPI request for the non-blocking communication. - */ - mpi::request communicate(const local_vector_type* local_b) const; - void apply_impl(const LinOp* b, LinOp* x) const override; void apply_impl(const LinOp* alpha, const LinOp* b, const LinOp* beta, LinOp* x) const override; private: + std::shared_ptr> row_gatherer_; index_map imap_; - std::vector send_offsets_; - std::vector send_sizes_; - std::vector recv_offsets_; - std::vector recv_sizes_; - array gather_idxs_; gko::detail::DenseCache one_scalar_; - gko::detail::DenseCache host_send_buffer_; - gko::detail::DenseCache host_recv_buffer_; - gko::detail::DenseCache send_buffer_; gko::detail::DenseCache recv_buffer_; std::shared_ptr local_mtx_; std::shared_ptr non_local_mtx_; diff --git a/include/ginkgo/core/distributed/row_gatherer.hpp b/include/ginkgo/core/distributed/row_gatherer.hpp new file mode 100644 index 00000000000..3f5bea3ea9a --- /dev/null +++ b/include/ginkgo/core/distributed/row_gatherer.hpp @@ -0,0 +1,196 @@ +// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#ifndef GKO_PUBLIC_CORE_DISTRIBUTED_ROW_GATHERER_HPP_ +#define GKO_PUBLIC_CORE_DISTRIBUTED_ROW_GATHERER_HPP_ + + +#include + + +#if GINKGO_BUILD_MPI + + +#include + +#include +#include +#include +#include +#include + + +namespace gko { +namespace experimental { +namespace distributed { + + +/** + * The distributed::RowGatherer gathers the rows of distributed::Vector that + * are located on other processes. + * + * Example usage: + * ```c++ + * auto coll_comm = std::make_shared(comm, + * imap); auto rg = distributed::RowGatherer::create(exec, coll_comm, + * imap); + * + * auto b = distributed::Vector::create(...); + * auto x = matrix::Dense::create(...); + * + * auto future = rg->apply_async(b, x); + * // do some computation that doesn't modify b, or access x + * future.wait(); + * // x now contains the gathered rows of b + * ``` + * Using the apply instead of the apply_async will lead to a blocking + * communication. + * + * @note Objects of this class are only available as shared_ptr, since the class + * is derived from std::enable_shared_from_this. 
+ * + * @tparam LocalIndexType the index type for the stored indices + */ +template +class RowGatherer final + : public EnableDistributedLinOp>, + public DistributedBase, + public std::enable_shared_from_this> { + friend class EnableDistributedPolymorphicObject; + +public: + /** + * Asynchronous version of LinOp::apply. + * + * @warning Only one mpi::request can be active at any given time. This + * function will throw if another request is already active. + * + * @param b the input distributed::Vector + * @param x the output matrix::Dense with the rows gathered from b + * @return a mpi::request for this task. The task is guaranteed to + * be completed only after `.wait()` has been called on it. + */ + mpi::request apply_async(ptr_param b, + ptr_param x) const; + + /** + * Asynchronous version of LinOp::apply. + * + * @warning Calling this multiple times with the same workspace and without + * waiting on each previous request will lead to incorrect + * data transfers. + * + * @param b the input distributed::Vector + * @param x the output matrix::Dense with the rows gathered from b + * @param workspace a workspace to store temporary data for the operation. + * This might not be modified before the request is + * waited on. + * @return a mpi::request for this task. The task is guaranteed to + * be completed only after `.wait()` has been called on it. + */ + mpi::request apply_async(ptr_param b, ptr_param x, + array& workspace) const; + + /** + * Get the used collective communicator. + * + * @return the used collective communicator + */ + std::shared_ptr + get_collective_communicator() const; + + /** + * Read access to the (local) rows indices + * + * @return the (local) row indices that are gathered + */ + const LocalIndexType* get_const_row_idxs() const; + + /** + * Creates a distributed::RowGatherer from a given collective communicator + * and index map. + * + * @TODO: using a segmented array instead of the imap would probably be + * more general + * + * @tparam GlobalIndexType the global index type of the index map + * + * @param exec the executor + * @param coll_comm the collective communicator + * @param imap the index map defining which rows to gather + * + * @note The coll_comm and imap have to be compatible. The coll_comm must + * send and recv exactly as many rows as the imap defines. + * + * @return a shared_ptr to the created distributed::RowGatherer + */ + template = + sizeof(LocalIndexType)>> + static std::shared_ptr create( + std::shared_ptr exec, + std::shared_ptr coll_comm, + const index_map& imap) + { + return std::shared_ptr( + new RowGatherer(std::move(exec), std::move(coll_comm), imap)); + } + + /* + * Create method for an empty RowGatherer. 
+ */ + static std::shared_ptr create( + std::shared_ptr exec, mpi::communicator comm) + { + return std::shared_ptr( + new RowGatherer(std::move(exec), std::move(comm))); + } + + RowGatherer(const RowGatherer& o); + + RowGatherer(RowGatherer&& o) noexcept; + + RowGatherer& operator=(const RowGatherer& o); + + RowGatherer& operator=(RowGatherer&& o); + +protected: + void apply_impl(const LinOp* b, LinOp* x) const override; + + void apply_impl(const LinOp* alpha, const LinOp* b, const LinOp* beta, + LinOp* x) const override; + +private: + /** + * @copydoc RowGatherer::create(std::shared_ptr, std::shared_ptr, + * const index_map&) + */ + template + RowGatherer(std::shared_ptr exec, + std::shared_ptr coll_comm, + const index_map& imap); + + /** + * @copydoc RowGatherer::create(std::shared_ptr, mpi::communicator) + */ + RowGatherer(std::shared_ptr exec, mpi::communicator comm); + + std::shared_ptr coll_comm_; + + array send_idxs_; + + mutable array send_workspace_; + + mutable MPI_Request req_listener_{MPI_REQUEST_NULL}; +}; + + +} // namespace distributed +} // namespace experimental +} // namespace gko + +#endif +#endif // GKO_PUBLIC_CORE_DISTRIBUTED_ROW_GATHERER_HPP_ diff --git a/include/ginkgo/ginkgo.hpp b/include/ginkgo/ginkgo.hpp index 9d897ce8762..9051b95e5ef 100644 --- a/include/ginkgo/ginkgo.hpp +++ b/include/ginkgo/ginkgo.hpp @@ -72,6 +72,7 @@ #include +#include #include #include diff --git a/reference/components/precision_conversion_kernels.cpp b/reference/components/precision_conversion_kernels.cpp index db12d9316ee..059d8922939 100644 --- a/reference/components/precision_conversion_kernels.cpp +++ b/reference/components/precision_conversion_kernels.cpp @@ -20,7 +20,7 @@ void convert_precision(std::shared_ptr exec, std::copy_n(in, size, out); } -GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL); +GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL); } // namespace components