Skip to content

Commit

Permalink
Merge pull request #2104 from kuzudb/clean-arrow
Browse files Browse the repository at this point in the history
Clean up arrow code in node column chunk and copy node
  • Loading branch information
ray6080 committed Sep 28, 2023
2 parents d713e7e + b608e09 commit f05d348
Show file tree
Hide file tree
Showing 9 changed files with 8 additions and 466 deletions.
28 changes: 0 additions & 28 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ class ColumnChunk {
virtual void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

ColumnChunkMetadata flushBuffer(
BMFileHandle* dataFH, common::page_idx_t startPageIdx, const ColumnChunkMetadata& metadata);

Expand Down Expand Up @@ -140,22 +137,6 @@ class ColumnChunk {
// Initializes the data buffer. Is (and should be) only called in constructor.
virtual void initialize(common::offset_t capacity);

template<typename T>
void templateCopyArrowArray(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
template<typename ARROW_TYPE>
void templateCopyStringArrowArray(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
// TODO(Guodong/Ziyi): The conversion from string to values should be handled inside ReadFile.
// ARROW_TYPE can be either arrow::StringArray or arrow::LargeStringArray.
template<typename KU_TYPE, typename ARROW_TYPE>
void templateCopyValuesAsString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual inline common::page_idx_t getNumPagesForBuffer() const {
return getNumPagesForBytes(bufferSize);
}

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

virtual void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk);
Expand Down Expand Up @@ -199,9 +180,6 @@ class BoolColumnChunk : public ColumnChunk {

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) override;

Expand Down Expand Up @@ -288,12 +266,6 @@ struct ColumnChunkFactory {
const common::LogicalType& dataType, common::CSVReaderConfig* csvReaderConfig = nullptr);
};

template<>
void ColumnChunk::templateCopyArrowArray<bool>(
arrow::Array* array, common::offset_t startPosInSegment, uint32_t numValuesToAppend);
template<>
void ColumnChunk::templateCopyArrowArray<uint8_t*>(
arrow::Array* array, common::offset_t startPosInSegment, uint32_t numValuesToAppend);
// BOOL
template<>
void ColumnChunk::setValueFromString<bool>(const char* value, uint64_t length, uint64_t pos);
Expand Down
9 changes: 0 additions & 9 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ class StringColumnChunk : public ColumnChunk {

void resetToEmpty() final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;
void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

Expand All @@ -36,13 +34,6 @@ class StringColumnChunk : public ColumnChunk {
inline common::offset_t getLastOffsetInPage() { return overflowCursor.offsetInPage; }

private:
template<typename T>
void templateCopyStringArrowArray(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
template<typename KU_TYPE, typename ARROW_TYPE>
void templateCopyStringValues(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

Expand Down
7 changes: 0 additions & 7 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ class StructColumnChunk : public ColumnChunk {
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

protected:
void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;
Expand All @@ -24,11 +22,6 @@ class StructColumnChunk : public ColumnChunk {
void setValueToStructField(common::offset_t pos, const std::string& structFieldValue,
common::struct_field_idx_t structFiledIdx);
void write(const common::Value& val, uint64_t posToWrite) final;
void copyStructFromArrowStruct(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
template<typename ARROW_TYPE>
void copyStructFromArrowString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
};

} // namespace storage
Expand Down
26 changes: 0 additions & 26 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,35 +52,9 @@ class VarListColumnChunk : public ColumnChunk {
}

private:
void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void copyVarListFromArrowString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

template<typename T>
void copyVarListFromArrowList(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto listArray = (T*)array;
auto dataChunkOffsetToAppend = varListDataColumnChunk.getNumValues();
auto curListOffset = varListDataColumnChunk.getNumValues();
for (auto i = 0u; i < numValuesToAppend; i++) {
nullChunk->setNull(i + startPosInChunk, listArray->IsNull(i));
auto length = listArray->value_length(i);
curListOffset += length;
setValue(curListOffset, i + startPosInChunk);
}
auto startOffset = listArray->value_offset(0);
auto endOffset = listArray->value_offset(numValuesToAppend);
varListDataColumnChunk.resizeBuffer(curListOffset);
varListDataColumnChunk.dataColumnChunk->append(
listArray->values().get(), dataChunkOffsetToAppend, endOffset - startOffset);
numValues += numValuesToAppend;
}

void write(const common::Value& listVal, uint64_t posToWrite) override;

private:
Expand Down
21 changes: 7 additions & 14 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,14 @@ void CopyNode::executeInternal(ExecutionContext* context) {

void CopyNode::sliceDataChunk(
const DataChunk& dataChunk, const std::vector<DataPos>& dataColumnPoses, offset_t offset) {
if (dataChunk.valueVectors[0]->dataType.getPhysicalType() == PhysicalTypeID::ARROW_COLUMN) {
for (auto& dataColumnPos : dataColumnPoses) {
ArrowColumnVector::slice(
dataChunk.valueVectors[dataColumnPos.valueVectorPos].get(), offset);
}
} else {
auto slicedSelVector = std::make_unique<SelectionVector>(DEFAULT_VECTOR_CAPACITY);
slicedSelVector->resetSelectorToValuePosBufferWithSize(
dataChunk.state->selVector->selectedSize - offset);
for (auto i = 0u; i < slicedSelVector->selectedSize; i++) {
slicedSelVector->selectedPositions[i] =
dataChunk.state->selVector->selectedPositions[i + offset];
}
dataChunk.state->selVector = std::move(slicedSelVector);
auto slicedSelVector = std::make_unique<SelectionVector>(DEFAULT_VECTOR_CAPACITY);
slicedSelVector->resetSelectorToValuePosBufferWithSize(
dataChunk.state->selVector->selectedSize - offset);
for (auto i = 0u; i < slicedSelVector->selectedSize; i++) {
slicedSelVector->selectedPositions[i] =
dataChunk.state->selVector->selectedPositions[i + offset];
}
dataChunk.state->selVector = std::move(slicedSelVector);
}

void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
Expand Down
Loading

0 comments on commit f05d348

Please sign in to comment.