Adds distributed row gatherer #1589

Open: wants to merge 8 commits into neighborhood-communicator
common/unified/components/precision_conversion_kernels.cpp (1 addition, 1 deletion)
@@ -23,7 +23,7 @@ void convert_precision(std::shared_ptr<const DefaultExecutor> exec,
size, in, out);
}

-GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL);
+GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL);


} // namespace components
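
Note on this change: switching from GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION to GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION widens the instantiated conversion pairs from floating-point value types to all plain-old-data (POD) types, presumably so that integer index buffers can be converted through the same kernel. As a rough sketch of the X-macro pattern involved (illustrative only; the actual type lists and macro definitions live in Ginkgo's core headers):

// Hypothetical, simplified version of the instantiation machinery above.
// The declaration macro matches the kernel signature shown in this diff;
// the POD pair list below is an illustrative subset, not Ginkgo's real one.
#define GKO_DECLARE_CONVERT_PRECISION_KERNEL(SourceType, TargetType)    \
    void convert_precision(std::shared_ptr<const DefaultExecutor> exec, \
                           size_type size, const SourceType* in,        \
                           TargetType* out)

#define GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(_macro) \
    template _macro(float, double);                     \
    template _macro(double, float);                     \
    template _macro(int32, int64);                      \
    template _macro(int64, int32)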
core/CMakeLists.txt (1 addition, 0 deletions)
@@ -135,6 +135,7 @@ if(GINKGO_BUILD_MPI)
distributed/matrix.cpp
distributed/neighborhood_communicator.cpp
distributed/partition_helpers.cpp
+distributed/row_gatherer.cpp
distributed/vector.cpp
distributed/preconditioner/schwarz.cpp)
endif()
core/base/array.cpp (1 addition, 1 deletion)
@@ -51,7 +51,7 @@ void convert_data(std::shared_ptr<const Executor> exec, size_type size,
void convert_data<From, To>(std::shared_ptr<const Executor>, size_type, \
const From*, To*)

-GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(GKO_DECLARE_ARRAY_CONVERSION);
+GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(GKO_DECLARE_ARRAY_CONVERSION);


} // namespace detail
core/device_hooks/common_kernels.inc.cpp (6 additions, 6 deletions)
@@ -81,6 +81,11 @@
_macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \
GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(_macro)

+#define GKO_STUB_POD_CONVERSION(_macro) \
+template <typename SourceType, typename TargetType> \
+_macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \
+GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(_macro)

#define GKO_STUB_NON_COMPLEX_VALUE_TYPE(_macro) \
template <typename ValueType> \
_macro(ValueType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \
@@ -147,11 +152,6 @@
_macro(IndexType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \
GKO_INSTANTIATE_FOR_EACH_TEMPLATE_TYPE(_macro)

-#define GKO_STUB_VALUE_CONVERSION(_macro) \
-template <typename SourceType, typename TargetType> \
-_macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \
-GKO_INSTANTIATE_FOR_EACH_VALUE_CONVERSION(_macro)

#define GKO_STUB_VALUE_CONVERSION_OR_COPY(_macro) \
template <typename SourceType, typename TargetType> \
_macro(SourceType, TargetType) GKO_NOT_COMPILED(GKO_HOOK_MODULE); \
@@ -173,7 +173,7 @@ namespace GKO_HOOK_MODULE {
namespace components {


-GKO_STUB_VALUE_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL);
+GKO_STUB_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL);
GKO_STUB_INDEX_TYPE(GKO_DECLARE_PREFIX_SUM_NONNEGATIVE_KERNEL);
// explicitly instantiate for size_type, as this is
// used in the SellP format
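
For reference, the stub added above turns every POD conversion kernel into a placeholder that raises Ginkgo's not-compiled error when the corresponding backend module is not built. Schematically, the expansion for the precision-conversion kernel looks roughly like this (simplified; GKO_NOT_COMPILED's real body comes from Ginkgo's exception helpers):

// Sketch of GKO_STUB_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL):
// a templated definition whose body just throws, followed by one explicit
// instantiation per POD type pair.
template <typename SourceType, typename TargetType>
void convert_precision(std::shared_ptr<const DefaultExecutor> exec,
                       size_type size, const SourceType* in, TargetType* out)
{
    // placeholder body; the real macro reports file/line/module
    throw ::gko::NotCompiled(__FILE__, __LINE__, __func__, "hook_module");
}
GKO_INSTANTIATE_FOR_EACH_POD_CONVERSION(GKO_DECLARE_CONVERT_PRECISION_KERNEL);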
core/distributed/matrix.cpp (46 additions, 154 deletions)
@@ -5,6 +5,7 @@
#include "ginkgo/core/distributed/matrix.hpp"

#include <ginkgo/core/base/precision_dispatch.hpp>
+#include <ginkgo/core/distributed/neighborhood_communicator.hpp>
#include <ginkgo/core/distributed/vector.hpp>
#include <ginkgo/core/matrix/coo.hpp>
#include <ginkgo/core/matrix/csr.hpp>
@@ -30,57 +31,14 @@ GKO_REGISTER_OPERATION(separate_local_nonlocal,

template <typename LocalIndexType, typename GlobalIndexType>
void initialize_communication_pattern(
-std::shared_ptr<const Executor> exec, mpi::communicator comm,
const index_map<LocalIndexType, GlobalIndexType>& imap,
-std::vector<comm_index_type>& recv_sizes,
-std::vector<comm_index_type>& recv_offsets,
-std::vector<comm_index_type>& send_sizes,
-std::vector<comm_index_type>& send_offsets,
-array<LocalIndexType>& gather_idxs)
+std::shared_ptr<RowGatherer<LocalIndexType>>& row_gatherer)
{
-// exchange step 1: determine recv_sizes, send_sizes, send_offsets
-auto host_recv_targets =
-make_temporary_clone(exec->get_master(), &imap.get_remote_target_ids());
-auto host_offsets = make_temporary_clone(
-exec->get_master(), &imap.get_remote_global_idxs().get_offsets());
-auto compute_recv_sizes = [](const auto* recv_targets, size_type size,
-const auto* offsets, auto& recv_sizes) {
-for (size_type i = 0; i < size; ++i) {
-recv_sizes[recv_targets[i]] = offsets[i + 1] - offsets[i];
-}
-};
-std::fill(recv_sizes.begin(), recv_sizes.end(), 0);
-compute_recv_sizes(host_recv_targets->get_const_data(),
-host_recv_targets->get_size(),
-host_offsets->get_const_data(), recv_sizes);
-std::partial_sum(recv_sizes.begin(), recv_sizes.end(),
-recv_offsets.begin() + 1);
-comm.all_to_all(exec, recv_sizes.data(), 1, send_sizes.data(), 1);
-std::partial_sum(send_sizes.begin(), send_sizes.end(),
-send_offsets.begin() + 1);
-send_offsets[0] = 0;
-recv_offsets[0] = 0;
-
-// exchange step 2: exchange gather_idxs from receivers to senders
-auto recv_gather_idxs =
-make_const_array_view(
-imap.get_executor(), imap.get_non_local_size(),
-imap.get_remote_local_idxs().get_const_flat_data())
-.copy_to_array();
-auto use_host_buffer = mpi::requires_host_buffer(exec, comm);
-if (use_host_buffer) {
-recv_gather_idxs.set_executor(exec->get_master());
-gather_idxs.clear();
-gather_idxs.set_executor(exec->get_master());
-}
-gather_idxs.resize_and_reset(send_offsets.back());
-comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec,
-recv_gather_idxs.get_const_data(), recv_sizes.data(),
-recv_offsets.data(), gather_idxs.get_data(),
-send_sizes.data(), send_offsets.data());
-if (use_host_buffer) {
-gather_idxs.set_executor(exec);
-}
+row_gatherer = RowGatherer<LocalIndexType>::create(
+row_gatherer->get_executor(),
+row_gatherer->get_collective_communicator()->create_with_same_type(
+row_gatherer->get_communicator(), imap),
+imap);
}
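
With this rewrite, all communication setup moves into the RowGatherer: the helper only rebuilds the gatherer from the index map, reusing a collective communicator of the same dynamic type via create_with_same_type. A minimal stand-alone sketch of the same pattern (namespaces and the NeighborhoodCommunicator constructor are assumptions based on this PR series, not verified API):

// Sketch: creating a row gatherer for a given index map.
// `exec`, `comm`, and `imap` are assumed to exist already.
auto coll_comm =
    std::make_shared<gko::experimental::mpi::NeighborhoodCommunicator>(
        comm, imap);
auto gatherer =
    gko::experimental::distributed::RowGatherer<gko::int32>::create(
        exec, coll_comm, imap);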


@@ -101,12 +59,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::Matrix(
: EnableDistributedLinOp<
Matrix<value_type, local_index_type, global_index_type>>{exec},
DistributedBase{comm},
-imap_(exec),
-send_offsets_(comm.size() + 1),
-send_sizes_(comm.size()),
-recv_offsets_(comm.size() + 1),
-recv_sizes_(comm.size()),
-gather_idxs_{exec},
+row_gatherer_{RowGatherer<LocalIndexType>::create(exec, comm)},
+imap_{exec},
one_scalar_{},
local_mtx_{local_matrix_template->clone(exec)},
non_local_mtx_{non_local_matrix_template->clone(exec)}
@@ -128,12 +82,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::Matrix(
: EnableDistributedLinOp<
Matrix<value_type, local_index_type, global_index_type>>{exec},
DistributedBase{comm},
-imap_(exec),
-send_offsets_(comm.size() + 1),
-send_sizes_(comm.size()),
-recv_offsets_(comm.size() + 1),
-recv_sizes_(comm.size()),
-gather_idxs_{exec},
+row_gatherer_{RowGatherer<LocalIndexType>::create(exec, comm)},
+imap_{exec},
one_scalar_{},
non_local_mtx_(::gko::matrix::Coo<ValueType, LocalIndexType>::create(
exec, dim<2>{local_linop->get_size()[0], 0}))
@@ -152,12 +102,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::Matrix(
: EnableDistributedLinOp<
Matrix<value_type, local_index_type, global_index_type>>{exec},
DistributedBase{comm},
+row_gatherer_(RowGatherer<LocalIndexType>::create(exec, comm)),
imap_(std::move(imap)),
-send_offsets_(comm.size() + 1),
-send_sizes_(comm.size()),
-recv_offsets_(comm.size() + 1),
-recv_sizes_(comm.size()),
-gather_idxs_{exec},
one_scalar_{}
{
this->set_size({imap_.get_global_size(), imap_.get_global_size()});
@@ -166,9 +112,7 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::Matrix(
one_scalar_.init(exec, dim<2>{1, 1});
one_scalar_->fill(one<value_type>());

-initialize_communication_pattern(
-this->get_executor(), this->get_communicator(), imap_, recv_sizes_,
-recv_offsets_, send_sizes_, send_offsets_, gather_idxs_);
+initialize_communication_pattern(imap_, row_gatherer_);
}


@@ -235,12 +179,8 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::convert_to(
result->get_communicator().size());
result->local_mtx_->copy_from(this->local_mtx_);
result->non_local_mtx_->copy_from(this->non_local_mtx_);
+result->row_gatherer_->copy_from(this->row_gatherer_);
result->imap_ = this->imap_;
-result->gather_idxs_ = this->gather_idxs_;
-result->send_offsets_ = this->send_offsets_;
-result->recv_offsets_ = this->recv_offsets_;
-result->recv_sizes_ = this->recv_sizes_;
-result->send_sizes_ = this->send_sizes_;
result->set_size(this->get_size());
}

@@ -254,12 +194,8 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::move_to(
result->get_communicator().size());
result->local_mtx_->move_from(this->local_mtx_);
result->non_local_mtx_->move_from(this->non_local_mtx_);
+result->row_gatherer_->move_from(this->row_gatherer_);
result->imap_ = std::move(this->imap_);
-result->gather_idxs_ = std::move(this->gather_idxs_);
-result->send_offsets_ = std::move(this->send_offsets_);
-result->recv_offsets_ = std::move(this->recv_offsets_);
-result->recv_sizes_ = std::move(this->recv_sizes_);
-result->send_sizes_ = std::move(this->send_sizes_);
result->set_size(this->get_size());
this->set_size({});
}
@@ -282,7 +218,6 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
auto local_part = comm.rank();

// set up LinOp sizes
-auto num_parts = static_cast<size_type>(row_partition->get_num_parts());
auto global_num_rows = row_partition->get_size();
auto global_num_cols = col_partition->get_size();
dim<2> global_dim{global_num_rows, global_num_cols};
@@ -329,9 +264,7 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
as<ReadableFromMatrixData<ValueType, LocalIndexType>>(this->non_local_mtx_)
->read(std::move(non_local_data));

-initialize_communication_pattern(exec, comm, imap_, recv_sizes_,
-recv_offsets_, send_sizes_, send_offsets_,
-gather_idxs_);
+initialize_communication_pattern(imap_, row_gatherer_);
}


@@ -373,55 +306,6 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
}


-template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
-mpi::request Matrix<ValueType, LocalIndexType, GlobalIndexType>::communicate(
-const local_vector_type* local_b) const
-{
-// This function can never return early!
-// Even if the non-local part is empty, i.e. this process doesn't need
-// any data from other processes, the used MPI calls are collective
-// operations. They need to be called on all processes, even if a process
-// might not communicate any data.
-auto exec = this->get_executor();
-const auto comm = this->get_communicator();
-auto num_cols = local_b->get_size()[1];
-auto send_size = send_offsets_.back();
-auto recv_size = recv_offsets_.back();
-auto send_dim = dim<2>{static_cast<size_type>(send_size), num_cols};
-auto recv_dim = dim<2>{static_cast<size_type>(recv_size), num_cols};
-recv_buffer_.init(exec, recv_dim);
-send_buffer_.init(exec, send_dim);
-
-local_b->row_gather(&gather_idxs_, send_buffer_.get());
-
-auto use_host_buffer = mpi::requires_host_buffer(exec, comm);
-if (use_host_buffer) {
-host_recv_buffer_.init(exec->get_master(), recv_dim);
-host_send_buffer_.init(exec->get_master(), send_dim);
-host_send_buffer_->copy_from(send_buffer_.get());
-}
-
-mpi::contiguous_type type(num_cols, mpi::type_impl<ValueType>::get_type());
-auto send_ptr = use_host_buffer ? host_send_buffer_->get_const_values()
-: send_buffer_->get_const_values();
-auto recv_ptr = use_host_buffer ? host_recv_buffer_->get_values()
-: recv_buffer_->get_values();
-exec->synchronize();
-#ifdef GINKGO_HAVE_OPENMPI_PRE_4_1_X
-comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, send_ptr,
-send_sizes_.data(), send_offsets_.data(), type.get(),
-recv_ptr, recv_sizes_.data(), recv_offsets_.data(),
-type.get());
-return {};
-#else
-return comm.i_all_to_all_v(
-use_host_buffer ? exec->get_master() : exec, send_ptr,
-send_sizes_.data(), send_offsets_.data(), type.get(), recv_ptr,
-recv_sizes_.data(), recv_offsets_.data(), type.get());
-#endif
-}
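
The communicate() member removed above gathered the needed rows into send_buffer_ eagerly, staged host buffers when required, and issued a dense all_to_all_v over the whole communicator (blocking on pre-4.1.x OpenMPI, non-blocking otherwise). Its role is now played by RowGatherer::apply_async, as the rewritten apply_impl below shows; condensed to its essence (sketch only, dispatch and precision handling omitted):

// Sketch: the replacement communication pattern.
auto recv_size =
    row_gatherer_->get_collective_communicator()->get_recv_size();
recv_buffer_.init(exec, dim<2>{static_cast<size_type>(recv_size),
                               dense_b->get_size()[1]});
auto req = row_gatherer_->apply_async(dense_b, recv_buffer_.get());
local_mtx_->apply(dense_b->get_local_vector(), local_x);  // overlaps comm
req.wait();
non_local_mtx_->apply(one_scalar_.get(), recv_buffer_.get(),
                      one_scalar_.get(), local_x);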


template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
void Matrix<ValueType, LocalIndexType, GlobalIndexType>::apply_impl(
const LinOp* b, LinOp* x) const
@@ -437,16 +321,22 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::apply_impl(
dense_x->get_local_values()),
dense_x->get_local_vector()->get_stride());

+auto exec = this->get_executor();
auto comm = this->get_communicator();
-auto req = this->communicate(dense_b->get_local_vector());
+auto recv_dim =
+dim<2>{static_cast<size_type>(
+row_gatherer_->get_collective_communicator()
+->get_recv_size()),
+dense_b->get_size()[1]};
+auto recv_exec = mpi::requires_host_buffer(exec, comm)
+? exec->get_master()
+: exec;
+recv_buffer_.init(recv_exec, recv_dim);
+auto req =
+this->row_gatherer_->apply_async(dense_b, recv_buffer_.get());
local_mtx_->apply(dense_b->get_local_vector(), local_x);
req.wait();

-auto exec = this->get_executor();
-auto use_host_buffer = mpi::requires_host_buffer(exec, comm);
-if (use_host_buffer) {
-recv_buffer_->copy_from(host_recv_buffer_.get());
-}
non_local_mtx_->apply(one_scalar_.get(), recv_buffer_.get(),
one_scalar_.get(), local_x);
},
@@ -470,17 +360,23 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::apply_impl(
dense_x->get_local_values()),
dense_x->get_local_vector()->get_stride());

+auto exec = this->get_executor();
auto comm = this->get_communicator();
-auto req = this->communicate(dense_b->get_local_vector());
+auto recv_dim =
+dim<2>{static_cast<size_type>(
+row_gatherer_->get_collective_communicator()
+->get_recv_size()),
+dense_b->get_size()[1]};
+auto recv_exec = mpi::requires_host_buffer(exec, comm)
+? exec->get_master()
+: exec;
+recv_buffer_.init(recv_exec, recv_dim);
+auto req =
+this->row_gatherer_->apply_async(dense_b, recv_buffer_.get());
local_mtx_->apply(local_alpha, dense_b->get_local_vector(),
local_beta, local_x);
req.wait();

-auto exec = this->get_executor();
-auto use_host_buffer = mpi::requires_host_buffer(exec, comm);
-if (use_host_buffer) {
-recv_buffer_->copy_from(host_recv_buffer_.get());
-}
non_local_mtx_->apply(local_alpha, recv_buffer_.get(),
one_scalar_.get(), local_x);
},
@@ -563,6 +459,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::Matrix(const Matrix& other)
: EnableDistributedLinOp<Matrix<value_type, local_index_type,
global_index_type>>{other.get_executor()},
DistributedBase{other.get_communicator()},
+row_gatherer_{RowGatherer<LocalIndexType>::create(
+other.get_executor(), other.get_communicator())},
imap_(other.get_executor())
{
*this = other;
@@ -575,6 +473,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::Matrix(
: EnableDistributedLinOp<Matrix<value_type, local_index_type,
global_index_type>>{other.get_executor()},
DistributedBase{other.get_communicator()},
+row_gatherer_{RowGatherer<LocalIndexType>::create(
+other.get_executor(), other.get_communicator())},
imap_(other.get_executor())
{
*this = std::move(other);
@@ -592,12 +492,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::operator=(
this->set_size(other.get_size());
local_mtx_->copy_from(other.local_mtx_);
non_local_mtx_->copy_from(other.non_local_mtx_);
+row_gatherer_->copy_from(other.row_gatherer_);
imap_ = other.imap_;
-gather_idxs_ = other.gather_idxs_;
-send_offsets_ = other.send_offsets_;
-recv_offsets_ = other.recv_offsets_;
-send_sizes_ = other.send_sizes_;
-recv_sizes_ = other.recv_sizes_;
one_scalar_.init(this->get_executor(), dim<2>{1, 1});
one_scalar_->fill(one<value_type>());
}
@@ -616,12 +512,8 @@ Matrix<ValueType, LocalIndexType, GlobalIndexType>::operator=(Matrix&& other)
other.set_size({});
local_mtx_->move_from(other.local_mtx_);
non_local_mtx_->move_from(other.non_local_mtx_);
+row_gatherer_->move_from(other.row_gatherer_);
imap_ = std::move(other.imap_);
-gather_idxs_ = std::move(other.gather_idxs_);
-send_offsets_ = std::move(other.send_offsets_);
-recv_offsets_ = std::move(other.recv_offsets_);
-send_sizes_ = std::move(other.send_sizes_);
-recv_sizes_ = std::move(other.recv_sizes_);
one_scalar_.init(this->get_executor(), dim<2>{1, 1});
one_scalar_->fill(one<value_type>());
}
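
From the user's perspective the distributed SpMV interface is unchanged; the row gatherer stays an internal detail of Matrix. Typical usage still reads (standard Ginkgo distributed setup, sketched; creation of exec, comm, matrix_data, and partition omitted):

// Sketch: user-level distributed apply. Communication/computation overlap
// now happens inside apply() via RowGatherer::apply_async.
using GlobalMtx =
    gko::experimental::distributed::Matrix<double, gko::int32, gko::int64>;
auto A = GlobalMtx::create(exec, comm);
A->read_distributed(matrix_data, partition);
A->apply(b, x);  // b, x: distributed Vectors; halo exchange handled inside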