Skip to content

Commit

Permalink
Take: Implement the FixedWidthTakeChunkedExec() kernel using GatherFr…
Browse files Browse the repository at this point in the history
…omChunks
  • Loading branch information
felipecrv committed Jun 10, 2024
1 parent 4a484e7 commit f4b4e12
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 8 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status MapFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status VarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeVarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FixedWidthTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FixedWidthTakeChunkedExec(KernelContext*, const ExecBatch&, Datum*);
Status ListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FSLTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Expand Down
147 changes: 139 additions & 8 deletions cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ using internal::BinaryBitBlockCounter;
using internal::BitBlockCount;
using internal::BitBlockCounter;
using internal::CheckIndexBounds;
using internal::ChunkResolver;
using internal::OptionalBitBlockCounter;

namespace compute {
Expand Down Expand Up @@ -363,6 +364,95 @@ class ValuesSpan {
}
};

struct ChunkedFixedWidthValuesSpan {
private:
// src_residual_bit_offsets_[i] is used to store the bit offset of the first byte (0-7)
// in src_chunks_[i] iff kValueWidthInBits == 1.
std::vector<int> src_residual_bit_offsets;
// Pre-computed pointers to the start of the values in each chunk.
std::vector<const uint8_t*> src_chunks;

public:
ARROW_NOINLINE
explicit ChunkedFixedWidthValuesSpan(const ChunkedArray& values) {
bool chunk_values_are_byte_sized = util::FixedWidthInBytes(*values.type()) >= 0;
if (chunk_values_are_byte_sized) {
src_residual_bit_offsets.resize(values.num_chunks());
}
src_chunks.resize(values.num_chunks());

for (int i = 0; i < values.num_chunks(); ++i) {
const ArraySpan chunk{*values.chunk(i)->data()};
DCHECK(util::IsFixedWidthLike(chunk));

auto offset_pointer = util::OffsetPointerOfFixedBitWidthValues(chunk);
if (chunk_values_are_byte_sized) {
src_residual_bit_offsets[i] = offset_pointer.first;
} else {
DCHECK_EQ(offset_pointer.first, 0);
}
src_chunks[i] = offset_pointer.second;
}
}

ARROW_NOINLINE
~ChunkedFixedWidthValuesSpan() = default;

const int* src_residual_bit_offsets_data() const {
return src_residual_bit_offsets.empty() ? nullptr : src_residual_bit_offsets.data();
}

const uint8_t* const* src_chunks_data() const { return src_chunks.data(); }
};

/// \brief Logical indices resolved against a chunked array.
struct ResolvedIndicesState {
private:
std::unique_ptr<Buffer> chunk_index_vec_buffer = NULLPTR;
std::unique_ptr<Buffer> index_in_chunk_vec_buffer = NULLPTR;

ARROW_NOINLINE
Status AllocateBuffers(int64_t n_indices, int64_t sizeof_index_type, MemoryPool* pool) {
ARROW_ASSIGN_OR_RAISE(chunk_index_vec_buffer,
AllocateBuffer(n_indices * sizeof_index_type, pool));
ARROW_ASSIGN_OR_RAISE(index_in_chunk_vec_buffer,
AllocateBuffer(n_indices * sizeof_index_type, pool));
return Status::OK();
}

public:
ARROW_NOINLINE
~ResolvedIndicesState() = default;

template <typename IndexCType>
Status InitWithIndices(const ArrayVector& chunks, int64_t idx_length,
const IndexCType* idx, MemoryPool* pool) {
RETURN_NOT_OK(AllocateBuffers(idx_length, sizeof(IndexCType), pool));
auto* chunk_index_vec = chunk_index_vec_buffer->mutable_data_as<IndexCType>();
auto* index_in_chunk_vec = index_in_chunk_vec_buffer->mutable_data_as<IndexCType>();
// All indices are resolved in one go without checking the validity bitmap.
// This is OK as long the output corresponding to the invalid indices is not used.
ChunkResolver resolver(chunks);
bool enough_precision = resolver.ResolveMany<IndexCType>(
/*n_indices=*/idx_length, /*logical_index_vec=*/idx, chunk_index_vec,
/*chunk_hint=*/static_cast<IndexCType>(0), index_in_chunk_vec);
if (ARROW_PREDICT_FALSE(!enough_precision)) {
return Status::IndexError("IndexCType is too small");
}
return Status::OK();
}

template <typename IndexCType>
const IndexCType* chunk_index_vec() const {
return chunk_index_vec_buffer->data_as<IndexCType>();
}

template <typename IndexCType>
const IndexCType* index_in_chunk_vec() const {
return index_in_chunk_vec_buffer->data_as<IndexCType>();
}
};

// ----------------------------------------------------------------------
// Implement optimized take for primitive types from boolean to
// 1/2/4/8/16/32-byte C-type based types and fixed-size binary (0 or more
Expand Down Expand Up @@ -403,9 +493,9 @@ struct FixedWidthTakeImpl {
(factor > 0 && kValueWidthInBits == 8 && // factors are used with bytes
static_cast<int64_t>(factor * kValueWidthInBits) == bit_width));
#endif
// XXX: support values.is_chunked() case
assert(!values.is_chunked());
return Exec(ctx, values.array(), indices, out_arr, factor);
return values.is_chunked()
? ChunkedExec(ctx, values.chunked_array(), indices, out_arr, factor)
: Exec(ctx, values.array(), indices, out_arr, factor);
}

static Status Exec(KernelContext* ctx, const ArraySpan& values,
Expand Down Expand Up @@ -439,6 +529,41 @@ struct FixedWidthTakeImpl {
out_arr->null_count = out_arr->length - valid_count;
return Status::OK();
}

static Status ChunkedExec(KernelContext* ctx, const ChunkedArray& values,
const ArraySpan& indices, ArrayData* out_arr,
int64_t factor) {
const bool out_has_validity = values.null_count() > 0 || indices.MayHaveNulls();

ChunkedFixedWidthValuesSpan chunked_values{values};
ResolvedIndicesState resolved_idx;
RETURN_NOT_OK(resolved_idx.InitWithIndices<IndexCType>(
/*chunks=*/values.chunks(), /*idx_length=*/indices.length,
/*idx=*/indices.GetValues<IndexCType>(1), ctx->memory_pool()));

int64_t valid_count = 0;
arrow::internal::GatherFromChunks<kValueWidthInBits, IndexCType, WithFactor::value>
gather{chunked_values.src_residual_bit_offsets_data(),
chunked_values.src_chunks_data(),
indices.length,
resolved_idx.chunk_index_vec<IndexCType>(),
resolved_idx.index_in_chunk_vec<IndexCType>(),
/*out=*/util::MutableFixedWidthValuesPointer(out_arr),
factor};
if (out_has_validity) {
DCHECK_EQ(out_arr->offset, 0);
// out_is_valid must be zero-initiliazed, because Gather::Execute
// saves time by not having to ClearBit on every null element.
auto out_is_valid = out_arr->GetMutableValues<uint8_t>(0);
memset(out_is_valid, 0, bit_util::BytesForBits(out_arr->length));
valid_count = gather.template Execute<OutputIsZeroInitialized::value>(
/*src_validity=*/values, /*idx_validity=*/indices, out_is_valid);
} else {
valid_count = gather.Execute();
}
out_arr->null_count = out_arr->length - valid_count;
return Status::OK();
}
};

template <template <typename...> class TakeImpl, typename... Args>
Expand Down Expand Up @@ -532,6 +657,13 @@ Status FixedWidthTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult*
return FixedWidthTakeExecImpl(ctx, values, batch[1].array, out_arr);
}

Status FixedWidthTakeChunkedExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
ValuesSpan values{batch[0].chunked_array()};
auto& indices = batch[1].array();
auto* out_arr = out->mutable_array();
return FixedWidthTakeExecImpl(ctx, values, *indices, out_arr);
}

namespace {

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -867,16 +999,15 @@ void PopulateTakeKernels(std::vector<SelectionKernelData>* out) {

*out = {
{InputType(match::Primitive()), take_indices, FixedWidthTakeExec,
// XXX: doing this for testing SpecialTakeChunkedExec
SpecialTakeChunkedExecFunctor<
FixedWidthTakeExec,
GenericTakeChunkedExecFunctor<FixedWidthTakeExec>::Exec>::Exec},
SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
FixedWidthTakeChunkedExec>::Exec},
{InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec,
GenericTakeChunkedExecFunctor<VarBinaryTakeExec>::Exec},
{InputType(match::LargeBinaryLike()), take_indices, LargeVarBinaryTakeExec,
GenericTakeChunkedExecFunctor<LargeVarBinaryTakeExec>::Exec},
{InputType(match::FixedSizeBinaryLike()), take_indices, FixedWidthTakeExec,
GenericTakeChunkedExecFunctor<FixedWidthTakeExec>::Exec},
SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
FixedWidthTakeChunkedExec>::Exec},
{InputType(null()), take_indices, NullTakeExec,
GenericTakeChunkedExecFunctor<NullTakeExec>::Exec},
{InputType(Type::DICTIONARY), take_indices, DictionaryTake,
Expand Down

0 comments on commit f4b4e12

Please sign in to comment.