GH-39565: [C++] Do not concatenate chunked values of fixed-width types to run "array_take" #41700

Open
wants to merge 20 commits into main
Changes from all commits (20 commits)
844aa5e
Take: Add VectorKernel::ChunkedExec to SelectionKernelData
felipecrv May 3, 2024
9797c9f
Take: VectorKernel::output_chunked should be false for "array_take"
felipecrv May 4, 2024
a135471
Take: Make "array_take" handle CA->C cases by populating VectorKernel…
felipecrv May 4, 2024
36b69af
gather_internal.h: Introduce GatherFromChunks
felipecrv Apr 27, 2024
d4b48a3
Take: Introduce ValueSpan to delay dispatching on chunked-ness
felipecrv May 5, 2024
3435bd5
Take: Implement the FixedWidthTakeChunkedExec() kernel using GatherFr…
felipecrv May 5, 2024
40f2422
Take: Adapt kernel to the ChunkResolver changes
felipecrv Sep 2, 2024
022f6a0
TakeMetaFunction: Update comment about what the MetaFunction does
felipecrv Jun 12, 2024
7f1bc10
Take: Support CA->C and CC->C cases directly in "array_take" with 2 s…
felipecrv Jun 14, 2024
ace70fb
Take: Simplify TakeMetaFunction even further
felipecrv Jun 15, 2024
4a48d3e
Remove all ARROW_NOINLINE from vector_selection_take_internal.cc
felipecrv Jun 26, 2024
49b5e97
gather_intenal.h: Clarify the semantics of ValiditySpan/IsSrcValid
felipecrv Jun 26, 2024
575d6df
Take: Fix silly mistake
felipecrv Aug 16, 2024
d47462e
Small fixes from PR feedback
felipecrv Aug 20, 2024
566a113
Take: Use fixed size blocks of locations when running TakeCA
felipecrv Sep 1, 2024
408b4d8
Take: Lazily build a ChunkResolver from ValuesSpan
felipecrv Sep 1, 2024
e638991
Take: Move the ValuesSpan class to the header
felipecrv Sep 1, 2024
54d410d
Selection: Fix UB -- nothing guarantees these references to spans are…
felipecrv Sep 1, 2024
3140ceb
Selection: Make sub-classes constructable with ValueSpan and ArraySpan's
felipecrv Sep 1, 2024
018320d
Take: Create a signature for Take kernels support AAA and CAA calls
felipecrv Sep 1, 2024
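
The commits above add chunked-aware Take kernels so that "array_take" no longer concatenates chunked fixed-width values before gathering (the "CA" and "CC" cases in the commit messages, where "C" appears to denote a ChunkedArray input and "A" an Array input). A minimal caller-side sketch of the case this PR optimizes, using Arrow's public C++ compute API; the sketch is illustrative and not part of this diff:

#include <arrow/api.h>
#include <arrow/compute/api_vector.h>

// Take from chunked int32 values with array indices (the "CA" case).
arrow::Result<arrow::Datum> TakeFromChunked() {
  arrow::Int32Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({10, 20, 30}));
  ARROW_ASSIGN_OR_RAISE(auto chunk0, builder.Finish());
  ARROW_RETURN_NOT_OK(builder.AppendValues({40, 50}));
  ARROW_ASSIGN_OR_RAISE(auto chunk1, builder.Finish());
  ARROW_ASSIGN_OR_RAISE(auto values, arrow::ChunkedArray::Make({chunk0, chunk1}));

  arrow::Int32Builder index_builder;
  ARROW_RETURN_NOT_OK(index_builder.AppendValues({4, 0, 2}));
  ARROW_ASSIGN_OR_RAISE(auto indices, index_builder.Finish());

  // With this PR, the gather runs directly across chunk boundaries instead of
  // concatenating the two value chunks first.
  return arrow::compute::Take(values, indices);
}
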
167 changes: 160 additions & 7 deletions cpp/src/arrow/compute/kernels/gather_internal.h
@@ -20,8 +20,14 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

#include "arrow/array/array_base.h"
#include "arrow/array/data.h"
#include "arrow/chunk_resolver.h"
#include "arrow/chunked_array.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
@@ -52,6 +58,15 @@ class GatherBaseCRTP {
ARROW_DEFAULT_MOVE_AND_ASSIGN(GatherBaseCRTP);

protected:
template <typename IndexCType>
bool IsSrcValid(const ArraySpan& src_validity, const IndexCType* idx,
int64_t position) const {
// Translate position into index on the source
const int64_t index = idx[position];
ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
return src_validity.IsValid(index);
}

ARROW_FORCE_INLINE int64_t ExecuteNoNulls(int64_t idx_length) {
auto* self = static_cast<GatherImpl*>(this);
for (int64_t position = 0; position < idx_length; position++) {
@@ -76,8 +91,12 @@ class GatherBaseCRTP {
// doesn't have to be called for resulting null positions. A position is
// considered null if either the index or the source value is null at that
// position.
template <bool kOutputIsZeroInitialized, typename IndexCType>
ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ArraySpan& src_validity,
//
// ValiditySpan is any class that `GatherImpl::IsSrcValid(src_validity, idx, position)`
// can be called with.
template <bool kOutputIsZeroInitialized, typename IndexCType,
class ValiditySpan = ArraySpan>
ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ValiditySpan& src_validity,
int64_t idx_length, const IndexCType* idx,
const ArraySpan& idx_validity,
uint8_t* out_is_valid) {
@@ -116,12 +135,11 @@
position += block.length;
}
} else {
// Source values may be null, so we must do random access into src_validity
// Source values may be null, so we must do random access with IsSrcValid()
if (block.popcount == block.length) {
// Faster path: indices are not null but source values may be
for (int64_t i = 0; i < block.length; ++i) {
ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
if (src_validity.IsValid(idx[position])) {
if (self->IsSrcValid(src_validity, idx, position)) {
// value is not null
self->WriteValue(position);
bit_util::SetBit(out_is_valid, position);
@@ -136,9 +154,9 @@
// random access in general we have to check the value nullness one by
// one.
for (int64_t i = 0; i < block.length; ++i) {
ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
ARROW_COMPILER_ASSUME(idx_validity.buffers[0].data != nullptr);
if (idx_validity.IsValid(position) && src_validity.IsValid(idx[position])) {
if (idx_validity.IsValid(position) &&
self->IsSrcValid(src_validity, idx, position)) {
// index is not null && value is not null
self->WriteValue(position);
bit_util::SetBit(out_is_valid, position);
@@ -303,4 +321,139 @@ class Gather</*kValueWidthInBits=*/1, IndexCType, /*kWithFactor=*/false>
}
};

template <typename IndexCType>
struct ChunkedValiditySpan {
const ChunkedArray& chunks_validity;
const TypedChunkLocation<IndexCType>* chunk_location_vec;
const bool may_have_nulls;

ChunkedValiditySpan(const ChunkedArray& chunks_validity,
const TypedChunkLocation<IndexCType>* chunk_location_vec)
: chunks_validity(chunks_validity),
chunk_location_vec(chunk_location_vec),
may_have_nulls(chunks_validity.null_count() > 0) {}

bool MayHaveNulls() const { return may_have_nulls; }

bool IsSrcValid(const IndexCType* idx, int64_t position) const {
// idx is unused because all the indices have been pre-resolved into
// `chunk_location_vec` by ChunkResolver::ResolveMany.
ARROW_UNUSED(idx);
auto loc = chunk_location_vec[position];
return chunks_validity.chunk(static_cast<int>(loc.chunk_index))
->IsValid(loc.index_in_chunk);
}
};

template <int kValueWidthInBits, typename IndexCType, bool kWithFactor>
class GatherFromChunks
: public GatherBaseCRTP<
GatherFromChunks<kValueWidthInBits, IndexCType, kWithFactor>> {
private:
static_assert(!kWithFactor || kValueWidthInBits == 8,
"kWithFactor is only supported for kValueWidthInBits == 8");
static_assert(kValueWidthInBits == 1 || kValueWidthInBits % 8 == 0);
// kValueWidth should not be used if kValueWidthInBits == 1.
static constexpr int kValueWidth = kValueWidthInBits / 8;

// src_residual_bit_offsets_[i] stores the bit offset (0-7) into the first byte of
// src_chunks_[i]; it is only used iff kValueWidthInBits == 1.
const int* src_residual_bit_offsets_ = NULLPTR;
// Pre-computed pointers to the start of the values in each chunk.
const uint8_t* const* src_chunks_;
// Number of indices resolved in chunk_location_vec_.
const int64_t idx_length_;
const TypedChunkLocation<IndexCType>* chunk_location_vec_;

uint8_t* out_;
int64_t factor_;

public:
void WriteValue(int64_t position) {
auto loc = chunk_location_vec_[position];
auto* chunk = src_chunks_[loc.chunk_index];
if constexpr (kValueWidthInBits == 1) {
auto src_offset = src_residual_bit_offsets_[loc.chunk_index];
bit_util::SetBitTo(out_, position,
bit_util::GetBit(chunk, src_offset + loc.index_in_chunk));
} else if constexpr (kWithFactor) {
const int64_t scaled_factor = kValueWidth * factor_;
memcpy(out_ + position * scaled_factor, chunk + loc.index_in_chunk * scaled_factor,
scaled_factor);
} else {
memcpy(out_ + position * kValueWidth, chunk + loc.index_in_chunk * kValueWidth,
kValueWidth);
}
}

void WriteZero(int64_t position) {
if constexpr (kValueWidthInBits == 1) {
bit_util::ClearBit(out_, position);
} else if constexpr (kWithFactor) {
const int64_t scaled_factor = kValueWidth * factor_;
memset(out_ + position * scaled_factor, 0, scaled_factor);
} else {
memset(out_ + position * kValueWidth, 0, kValueWidth);
}
}

void WriteZeroSegment(int64_t position, int64_t block_length) {
if constexpr (kValueWidthInBits == 1) {
bit_util::SetBitsTo(out_, position, block_length, false);
} else if constexpr (kWithFactor) {
const int64_t scaled_factor = kValueWidth * factor_;
memset(out_ + position * scaled_factor, 0, block_length * scaled_factor);
} else {
memset(out_ + position * kValueWidth, 0, block_length * kValueWidth);
}
}

bool IsSrcValid(const ChunkedValiditySpan<IndexCType>& src_validity,
const IndexCType* idx, int64_t position) const {
return src_validity.IsSrcValid(idx, position);
}

public:
GatherFromChunks(const int* src_residual_bit_offsets, const uint8_t* const* src_chunks,
const int64_t idx_length,
const TypedChunkLocation<IndexCType>* chunk_location_vec, uint8_t* out,
int64_t factor = 1)
: src_residual_bit_offsets_(src_residual_bit_offsets),
src_chunks_(src_chunks),
idx_length_(idx_length),
chunk_location_vec_(chunk_location_vec),
out_(out),
factor_(factor) {
assert(src_chunks && chunk_location_vec_ && out);
if constexpr (kValueWidthInBits == 1) {
assert(src_residual_bit_offsets);
}
assert((kWithFactor || factor == 1) &&
"When kWithFactor is false, the factor is assumed to be 1 at compile time");
}

ARROW_FORCE_INLINE int64_t Execute() { return this->ExecuteNoNulls(idx_length_); }

/// \pre If kOutputIsZeroInitialized, then this->out_ has to be zero initialized.
/// \pre Bits in out_is_valid have to always be zero initialized.
/// \post The bits for the valid elements (and only those) are set in out_is_valid.
/// \post If !kOutputIsZeroInitialized, then positions in this->out_ containing null
/// elements have 0s written to them. This might be less efficient than
/// zero-initializing first and calling this->Execute() afterwards.
/// \return The number of valid elements in out.
template <bool kOutputIsZeroInitialized = false>
ARROW_FORCE_INLINE int64_t Execute(const ChunkedArray& src_validity,
const ArraySpan& idx_validity,
uint8_t* out_is_valid) {
assert(idx_length_ == idx_validity.length);
assert(out_is_valid);
assert(idx_validity.type->byte_width() == sizeof(IndexCType));
ChunkedValiditySpan src_validity_span{src_validity, chunk_location_vec_};
assert(src_validity_span.MayHaveNulls() || idx_validity.MayHaveNulls());
// idx=NULLPTR because when it's passed to IsSrcValid() defined above, it's not used.
return this->template ExecuteWithNulls<kOutputIsZeroInitialized, IndexCType>(
src_validity_span, idx_length_, /*idx=*/NULLPTR, idx_validity, out_is_valid);
}
};

} // namespace arrow::internal
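
For reference, a rough sketch of how the new GatherFromChunks is meant to be driven: the caller resolves logical indices into per-chunk locations (e.g. with ChunkResolver::ResolveMany, as the comments above describe) and hands the gather raw pointers to each chunk's values. The member names chunk_index and index_in_chunk come from the diff; the namespace of TypedChunkLocation, the index type, and the concrete values are illustrative assumptions, not compiled against this branch.

#include <cstdint>
#include <vector>

#include "arrow/chunk_resolver.h"
#include "arrow/compute/kernels/gather_internal.h"

void GatherInt32Sketch() {
  // Two fixed-width (int32) value chunks, passed as pointers to the start of
  // their data buffers.
  const int32_t chunk0[] = {10, 20, 30};
  const int32_t chunk1[] = {40, 50};
  const uint8_t* chunks[] = {reinterpret_cast<const uint8_t*>(chunk0),
                             reinterpret_cast<const uint8_t*>(chunk1)};

  // Logical indices {4, 0, 2} pre-resolved into (chunk, index-in-chunk) pairs,
  // as ChunkResolver::ResolveMany would produce them.
  std::vector<arrow::TypedChunkLocation<uint32_t>> locations(3);
  locations[0].chunk_index = 1; locations[0].index_in_chunk = 1;  // -> 50
  locations[1].chunk_index = 0; locations[1].index_in_chunk = 0;  // -> 10
  locations[2].chunk_index = 0; locations[2].index_in_chunk = 2;  // -> 30

  int32_t out[3] = {};
  arrow::internal::GatherFromChunks</*kValueWidthInBits=*/32, uint32_t,
                                    /*kWithFactor=*/false>
      gather(/*src_residual_bit_offsets=*/nullptr, chunks, /*idx_length=*/3,
             locations.data(), reinterpret_cast<uint8_t*>(out));
  gather.Execute();  // No-nulls path; out is now {50, 10, 30}.
}
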
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -293,6 +293,8 @@ std::shared_ptr<VectorFunction> MakeIndicesNonZeroFunction(std::string name,
VectorKernel kernel;
kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
// "array_take" ensures that the output will be be chunked when at least one
// input is chunked, so we need to set this to false.
kernel.output_chunked = false;
kernel.exec = IndicesNonZeroExec;
kernel.exec_chunked = IndicesNonZeroExecChunked;
@@ -338,6 +340,7 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
VectorKernel take_base;
take_base.init = TakeState::Init;
take_base.can_execute_chunkwise = false;
take_base.output_chunked = false;
RegisterSelectionFunction("array_take", array_take_doc, take_base,
std::move(take_kernels), GetDefaultTakeOptions(), registry);

15 changes: 10 additions & 5 deletions cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
@@ -894,18 +894,23 @@ Status ExtensionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
}

// Transform filter to selection indices and then use Take.
Status FilterWithTakeExec(const ArrayKernelExec& take_exec, KernelContext* ctx,
Status FilterWithTakeExec(TakeKernelExec take_aaa_exec, KernelContext* ctx,
const ExecSpan& batch, ExecResult* out) {
std::shared_ptr<ArrayData> indices;
std::shared_ptr<ArrayData> indices_data;
RETURN_NOT_OK(GetTakeIndices(batch[1].array,
FilterState::Get(ctx).null_selection_behavior,
ctx->memory_pool())
.Value(&indices));
.Value(&indices_data));

KernelContext take_ctx(*ctx);
TakeState state{TakeOptions::NoBoundsCheck()};
take_ctx.SetState(&state);
ExecSpan take_batch({batch[0], ArraySpan(*indices)}, batch.length);
return take_exec(&take_ctx, take_batch, out);

ValuesSpan values(batch[0].array);
std::shared_ptr<ArrayData> out_data = out->array_data();
RETURN_NOT_OK(take_aaa_exec(&take_ctx, values, *indices_data, &out_data));
out->value = std::move(out_data);
return Status::OK();
}

// Due to the special treatment with their Take kernels, we filter Struct and SparseUnion