From 98bebf4f69b49db2d3ca8f59f526cac8193b216f Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 14 Jun 2024 17:20:06 -0300 Subject: [PATCH] Take: Support CA->C and CC->C cases directly in "array_take" with 2 strategies --- .../kernels/vector_selection_take_internal.cc | 310 ++++++++++++------ 1 file changed, 209 insertions(+), 101 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index a941578f862d9..fefb299fa1f00 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -736,18 +736,6 @@ class TakeMetaFunction : public MetaFunction { return array_take_func->Execute(args, &options, ctx); } - static Result> ChunkedArrayAsArray( - const std::shared_ptr& values, MemoryPool* pool) { - switch (values->num_chunks()) { - case 0: - return MakeArrayOfNull(values->type(), /*length=*/0, pool); - case 1: - return values->chunk(0); - default: - return Concatenate(values->chunks(), pool); - } - } - private: static Result> TakeAAA(const std::vector& args, const TakeOptions& options, @@ -758,52 +746,37 @@ class TakeMetaFunction : public MetaFunction { return result.array(); } - static Result> TakeCAC( - const std::shared_ptr& values, const Array& indices, - const TakeOptions& options, ExecContext* ctx) { + static Result> TakeCAC(const std::vector& args, + const TakeOptions& options, + ExecContext* ctx) { // "array_take" can handle CA->C cases directly // (via their VectorKernel::exec_chunked) - ARROW_ASSIGN_OR_RAISE(auto result, CallArrayTake({values, indices}, options, ctx)); + DCHECK_EQ(args[0].kind(), Datum::CHUNKED_ARRAY); + DCHECK_EQ(args[1].kind(), Datum::ARRAY); + ARROW_ASSIGN_OR_RAISE(auto result, CallArrayTake(args, options, ctx)); return result.chunked_array(); } - static Result> TakeCCC( - const std::shared_ptr& values, - const std::shared_ptr& indices, const TakeOptions& options, - ExecContext* ctx) { - // XXX: for every chunk in indices, values are gathered from all chunks in values to - // form a new chunk in the result. Performing this concatenation is not ideal, but - // greatly simplifies the implementation before something more efficient is - // implemented. - ARROW_ASSIGN_OR_RAISE(auto values_array, - ChunkedArrayAsArray(values, ctx->memory_pool())); - std::vector args = {std::move(values_array), {}}; - std::vector> new_chunks; - new_chunks.resize(indices->num_chunks()); - for (int i = 0; i < indices->num_chunks(); i++) { - args[1] = indices->chunk(i); - // XXX: this loop can use TakeCAA once it can handle ChunkedArray - // without concatenating first - ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); - new_chunks[i] = MakeArray(chunk); - } - return std::make_shared(std::move(new_chunks), values->type()); + static Result> TakeCCC(const std::vector& args, + const TakeOptions& options, + ExecContext* ctx) { + // "array_take" can handle CC->C cases directly + // (via their VectorKernel::exec_chunked) + DCHECK_EQ(args[0].kind(), Datum::CHUNKED_ARRAY); + DCHECK_EQ(args[1].kind(), Datum::CHUNKED_ARRAY); + ARROW_ASSIGN_OR_RAISE(auto result, CallArrayTake(args, options, ctx)); + return result.chunked_array(); } - static Result> TakeACC(const Array& values, - const ChunkedArray& indices, + static Result> TakeACC(const std::vector& args, const TakeOptions& options, ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - std::vector args = {values, {}}; - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - args[1] = indices.chunk(i); - ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); - new_chunks[i] = MakeArray(chunk); - } - return std::make_shared(std::move(new_chunks), values.type()); + // "array_take" can handle AC->C cases directly + // (via their VectorKernel::exec_chunked) + DCHECK_EQ(args[0].kind(), Datum::ARRAY); + DCHECK_EQ(args[1].kind(), Datum::CHUNKED_ARRAY); + ARROW_ASSIGN_OR_RAISE(auto result, CallArrayTake(args, options, ctx)); + return result.chunked_array(); } static Result> TakeRAR(const RecordBatch& batch, @@ -828,9 +801,10 @@ class TakeMetaFunction : public MetaFunction { ExecContext* ctx) { auto ncols = table->num_columns(); std::vector> columns(ncols); - + std::vector args = {/*placeholder*/ {}, indices}; for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(table->column(j), indices, options, ctx)); + args[0] = table->column(j); + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(args, options, ctx)); } return Table::Make(table->schema(), std::move(columns)); } @@ -840,8 +814,10 @@ class TakeMetaFunction : public MetaFunction { const TakeOptions& options, ExecContext* ctx) { auto ncols = table->num_columns(); std::vector> columns(ncols); + std::vector args = {/*placeholder*/ {}, indices}; for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(table->column(j), indices, options, ctx)); + args[0] = table->column(j); + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(args, options, ctx)); } return Table::Make(table->schema(), std::move(columns)); } @@ -854,18 +830,17 @@ class TakeMetaFunction : public MetaFunction { const auto& take_opts = static_cast(*options); switch (args[0].kind()) { case Datum::ARRAY: - if (index_kind == Datum::ARRAY) { - return TakeAAA(args, take_opts, ctx); - } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeACC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); + // "array_take" can handle AA->A and AC->C cases directly + // (via their VectorKernel::exec and VectorKernel::exec_chunked) + if (index_kind == Datum::ARRAY || index_kind == Datum::CHUNKED_ARRAY) { + return CallArrayTake(args, take_opts, ctx); } break; case Datum::CHUNKED_ARRAY: - if (index_kind == Datum::ARRAY) { - return TakeCAC(args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); - } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeCCC(args[0].chunked_array(), args[1].chunked_array(), take_opts, - ctx); + // "array_take" can handle CA->C and CC->C cases directly + // (via their VectorKernel::exec_chunked) + if (index_kind == Datum::ARRAY || index_kind == Datum::CHUNKED_ARRAY) { + return CallArrayTake(args, take_opts, ctx); } break; case Datum::RECORD_BATCH: @@ -887,7 +862,7 @@ class TakeMetaFunction : public MetaFunction { return Status::NotImplemented( "Unsupported types for take operation: " "values=", - args[0].ToString(), "indices=", args[1].ToString()); + args[0].ToString(), ", indices=", args[1].ToString()); } }; @@ -901,6 +876,18 @@ std::shared_ptr PrepareOutput(const ExecBatch& batch, int64_t length) return out; } +Result> ChunkedArrayAsArray( + const std::shared_ptr& values, MemoryPool* pool) { + switch (values->num_chunks()) { + case 0: + return MakeArrayOfNull(values->type(), /*length=*/0, pool); + case 1: + return values->chunk(0); + default: + return Concatenate(values->chunks(), pool); + } +} + Status CallAAAKernel(ArrayKernelExec take_aaa_exec, KernelContext* ctx, std::shared_ptr values, std::shared_ptr indices, Datum* out) { @@ -914,31 +901,97 @@ Status CallAAAKernel(ArrayKernelExec take_aaa_exec, KernelContext* ctx, return take_aaa_exec(ctx, exec_span, &result); } -/// \brief Generic VectorKernel::exec_chunked for CA->A cases. +Status CallCAAKernel(VectorKernel::ChunkedExec take_caa_exec, KernelContext* ctx, + std::shared_ptr values, + std::shared_ptr indices, Datum* out) { + int64_t batch_length = values->length(); + std::vector args = {std::move(values), std::move(indices)}; + ExecBatch chunked_array_array_batch(std::move(args), batch_length); + DCHECK_EQ(out->kind(), Datum::ARRAY); + return take_caa_exec(ctx, chunked_array_array_batch, out); +} + +Status TakeACCChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx, + const ExecBatch& batch, Datum* out) { + auto& values = batch.values[0].array(); + auto& indices = batch.values[1].chunked_array(); + auto num_chunks = indices->num_chunks(); + std::vector> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + auto& indices_chunk = indices->chunk(i)->data(); + Datum result = PrepareOutput(batch, values->length); + RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values, indices_chunk, out)); + new_chunks[i] = MakeArray(result.array()); + } + out->value = std::make_shared(std::move(new_chunks), values->type); + return Status::OK(); +} + +/// \brief Generic (slower) VectorKernel::exec_chunked (`CA->C`, `CC->C`, and `AC->C`). +/// +/// This function concatenates the chunks of the values and then calls the `AA->A` take +/// kernel to handle the `CA->C` cases. The ArrayData returned by the `AA->A` kernel is +/// converted to a ChunkedArray with a single chunk to honor the `CA->C` contract. +/// +/// For `CC->C` cases, it concatenates the chunks of the values and calls the `AA->A` take +/// kernel for each chunk of the indices, producing a new chunked array with the same +/// shape as the indices. /// -/// This function concatenates the chunks of values and then calls the -/// AA->A take kernel. +/// `AC->C` cases are trivially delegated to TakeACCChunkedExec without any concatenation. /// -/// \param take_aaa_exec The AA->A take kernel to use. +/// \param take_aaa_exec The `AA->A` take kernel to use. Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx, const ExecBatch& batch, Datum* out) { - auto& args = batch.values; - if (args[0].kind() == Datum::CHUNKED_ARRAY && args[1].kind() == Datum::ARRAY) { - auto& values = args[0].chunked_array(); - auto& indices = args[1].array(); - ARROW_ASSIGN_OR_RAISE(auto values_array, TakeMetaFunction::ChunkedArrayAsArray( - values, ctx->memory_pool())); - DCHECK_EQ(values_array->length(), batch.length); - Datum result = PrepareOutput(batch, batch.length); - RETURN_NOT_OK( - CallAAAKernel(take_aaa_exec, ctx, values_array->data(), indices, &result)); - out->value = std::make_shared(MakeArray(result.array())); - return Status::OK(); + const auto& args = batch.values; + if (args[0].kind() == Datum::CHUNKED_ARRAY) { + auto& values_chunked = args[0].chunked_array(); + ARROW_ASSIGN_OR_RAISE(auto values_array, + ChunkedArrayAsArray(values_chunked, ctx->memory_pool())); + if (args[1].kind() == Datum::ARRAY) { + // CA->C + auto& indices = args[1].array(); + DCHECK_EQ(values_array->length(), batch.length); + { + // AA->A + RETURN_NOT_OK( + CallAAAKernel(take_aaa_exec, ctx, values_array->data(), indices, out)); + out->value = std::make_shared(MakeArray(out->array())); + } + return Status::OK(); + } else if (args[1].kind() == Datum::CHUNKED_ARRAY) { + // CC->C + const auto& indices = args[1].chunked_array(); + std::vector> new_chunks; + for (int i = 0; i < indices->num_chunks(); i++) { + // AA->A + auto& indices_chunk = indices->chunk(i)->data(); + Datum result = PrepareOutput(batch, values_array->length()); + RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values_array->data(), + indices_chunk, &result)); + new_chunks.push_back(MakeArray(result.array())); + } + DCHECK(out->is_array()); + out->value = + std::make_shared(std::move(new_chunks), values_chunked->type()); + return Status::OK(); + } + } else { + // VectorKernel::exec_chunked are only called when at least one of the inputs is + // chunked, so we should be able to assume that args[1] is a chunked array when + // everything is wired up correctly. + if (args[1].kind() == Datum::CHUNKED_ARRAY) { + // AC->C + return TakeACCChunkedExec(take_aaa_exec, ctx, batch, out); + } else { + DCHECK(false) << "Unexpected kind for array_take's exec_chunked kernel: values=" + << args[0].ToString() << ", indices=" << args[1].ToString(); + } } return Status::NotImplemented( "Unsupported kinds for 'array_take', try using 'take': " "values=", - args[0].ToString(), "indices=", args[1].ToString()); + args[0].ToString(), ", indices=", args[1].ToString()); } template @@ -948,41 +1001,96 @@ struct GenericTakeChunkedExecFunctor { } }; +/// \brief Specialized (faster) VectorKernel::exec_chunked (`CA->C`, `CC->C`, `AC->C`). +/// +/// This function doesn't ever need to concatenate the chunks of the values, so it can be +/// more efficient than GenericTakeChunkedExec that can only delegate to the `AA->A` take +/// kernels. +/// +/// For `CA->C` cases, it can call the `CA->A` take kernel directly [1] and trivially +/// convert the result to a ChunkedArray of a single chunk to honor the `CA->C` contract. +/// +/// For `CC->C` cases it can call the `CA->A` take kernel for each chunk of the indices to +/// get each chunk that becomes the ChunkedArray output. +/// +/// `AC->C` cases are trivially delegated to TakeACCChunkedExec. +/// +/// \param take_aaa_exec The `AA->A` take kernel to use. Status SpecialTakeChunkedExec(const ArrayKernelExec take_aaa_exec, VectorKernel::ChunkedExec take_caa_exec, KernelContext* ctx, const ExecBatch& batch, Datum* out) { Datum result = PrepareOutput(batch, batch.length); auto* pool = ctx->memory_pool(); - auto& args = batch.values; - if (args[0].kind() == Datum::CHUNKED_ARRAY && args[1].kind() == Datum::ARRAY) { - auto& values = args[0].chunked_array(); - auto& indices = args[1].array(); + const auto& args = batch.values; + if (args[0].kind() == Datum::CHUNKED_ARRAY) { + auto& values_chunked = args[0].chunked_array(); std::shared_ptr single_chunk = nullptr; - if (values->num_chunks() == 0 || values->length() == 0) { + if (values_chunked->num_chunks() == 0 || values_chunked->length() == 0) { ARROW_ASSIGN_OR_RAISE(single_chunk, - MakeArrayOfNull(values->type(), /*length=*/0, pool)); - } else if (values->num_chunks() == 1) { - single_chunk = values->chunk(0); + MakeArrayOfNull(values_chunked->type(), /*length=*/0, pool)); + } else if (values_chunked->num_chunks() == 1) { + single_chunk = values_chunked->chunk(0); } - if (single_chunk) { - DCHECK_EQ(single_chunk->length(), batch.length); - // If the ChunkedArray was cheaply converted to a single chunk, - // we can use the AA->A take kernel directly. - RETURN_NOT_OK( - CallAAAKernel(take_aaa_exec, ctx, single_chunk->data(), indices, &result)); + + if (args[1].kind() == Datum::ARRAY) { + // CA->C + auto& indices = args[1].array(); + if (single_chunk) { + // AA->A + DCHECK_EQ(single_chunk->length(), batch.length); + // If the ChunkedArray was cheaply converted to a single chunk, + // we can use the AA->A take kernel directly. + RETURN_NOT_OK( + CallAAAKernel(take_aaa_exec, ctx, single_chunk->data(), indices, out)); + out->value = std::make_shared(MakeArray(out->array())); + return Status::OK(); + } + // Instead of concatenating the chunks, we call the CA->A take kernel + // which has a more efficient implementation for this case. At this point, + // that implementation doesn't have to care about empty or single-chunk + // ChunkedArrays. + RETURN_NOT_OK(take_caa_exec(ctx, batch, &result)); + out->value = std::make_shared(MakeArray(result.array())); + return Status::OK(); + } else { + // CC->C + const auto& indices = args[1].chunked_array(); + std::vector> new_chunks; + for (int i = 0; i < indices->num_chunks(); i++) { + auto& indices_chunk = indices->chunk(i)->data(); + result = PrepareOutput(batch, values_chunked->length()); + if (single_chunk) { + // If the ChunkedArray was cheaply converted to a single chunk, + // we can use the AA->A take kernel directly. + RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, single_chunk->data(), + indices_chunk, &result)); + } else { + RETURN_NOT_OK( + CallCAAKernel(take_caa_exec, ctx, values_chunked, indices_chunk, &result)); + } + new_chunks.push_back(MakeArray(result.array())); + } + DCHECK(out->is_array()); + out->value = + std::make_shared(std::move(new_chunks), values_chunked->type()); + return Status::OK(); + } + } else { + // VectorKernel::exec_chunked are only called when at least one of the inputs is + // chunked, so we should be able to assume that args[1] is a chunked array when + // everything is wired up correctly. + if (args[1].kind() == Datum::CHUNKED_ARRAY) { + // AC->C + return TakeACCChunkedExec(take_aaa_exec, ctx, batch, out); + } else { + DCHECK(false) << "Unexpected kind for array_take's exec_chunked kernel: values=" + << args[0].ToString() << ", indices=" << args[1].ToString(); } - // Instead of concatenating the chunks, we call the CA->A take kernel - // which has a more efficient implementation for this case. At this point, - // that implementation doesn't have to care about empty or single-chunk - // ChunkedArrays. - RETURN_NOT_OK(take_caa_exec(ctx, batch, &result)); - out->value = std::make_shared(MakeArray(result.array())); - return Status::OK(); } return Status::NotImplemented( "Unsupported kinds for 'array_take', try using 'take': " "values=", - args[0].ToString(), "indices=", args[1].ToString()); + args[0].ToString(), ", indices=", args[1].ToString()); } template