Skip to content

Commit

Permalink
Merge pull request #3017 from kuzudb/remove-append-one
Browse files Browse the repository at this point in the history
Combine append(ValueVector) with appendOne
  • Loading branch information
ray6080 committed Mar 11, 2024
2 parents f4a95ab + c149349 commit 67e9204
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 131 deletions.
10 changes: 5 additions & 5 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>

#include "common/constants.h"
#include "common/data_chunk/sel_vector.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/bm_file_handle.h"
Expand Down Expand Up @@ -54,8 +55,7 @@ class ColumnChunk {
// Note that the startPageIdx is not known, so it will always be common::INVALID_PAGE_IDX
virtual ColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector);
virtual void appendOne(common::ValueVector* vector, common::vector_idx_t pos);
virtual void append(common::ValueVector* vector, common::SelectionVector& selVector);
virtual void append(
ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend);

Expand Down Expand Up @@ -109,7 +109,8 @@ class ColumnChunk {

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

virtual void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk);
virtual void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk,
common::SelectionVector& selVector);

private:
uint64_t getBufferSize(uint64_t capacity_) const;
Expand Down Expand Up @@ -150,8 +151,7 @@ class BoolColumnChunk : public ColumnChunk {
// Booleans are always bitpacked, but this can also enable constant compression
enableCompression, hasNullChunk) {}

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(common::ValueVector* vector, common::SelectionVector& sel) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;
void write(common::ValueVector* vector, common::offset_t offsetInVector,
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/assert.h"
#include "common/data_chunk/sel_vector.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
#include "storage/store/dictionary_chunk.h"
Expand All @@ -14,8 +15,7 @@ class StringColumnChunk : public ColumnChunk {
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

void resetToEmpty() final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(common::ValueVector* vector, common::SelectionVector& selVector) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/data_chunk/sel_vector.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
Expand All @@ -22,8 +23,7 @@ class StructColumnChunk : public ColumnChunk {
protected:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(common::ValueVector* vector, common::SelectionVector& selVector) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
Expand Down
8 changes: 4 additions & 4 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/data_chunk/sel_vector.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand All @@ -17,8 +18,8 @@ struct VarListDataColumnChunk {

void resizeBuffer(uint64_t numValues);

inline void append(common::ValueVector* dataVector) const {
dataColumnChunk->append(dataVector);
inline void append(common::ValueVector* dataVector, common::SelectionVector& selVector) const {
dataColumnChunk->append(dataVector, selVector);
}

inline uint64_t getNumValues() const { return dataColumnChunk->getNumValues(); }
Expand All @@ -36,8 +37,7 @@ class VarListColumnChunk : public ColumnChunk {

void resetToEmpty() final;

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(common::ValueVector* vector, common::SelectionVector& selVector) final;
// Note: `write` assumes that no `append` will be called afterward.
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
Expand Down
15 changes: 8 additions & 7 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include "processor/operator/partitioner.h"

#include <cstdint>

#include "common/constants.h"
#include "common/data_chunk/sel_vector.h"
#include "processor/execution_context.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_table.h"
Expand Down Expand Up @@ -153,6 +152,8 @@ void Partitioner::executeInternal(ExecutionContext* context) {

void Partitioner::copyDataToPartitions(
partition_idx_t partitioningIdx, DataChunk* chunkToCopyFrom) {
SelectionVector selVector(1);
selVector.resetSelectorToValuePosBufferWithSize(1);
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
Expand All @@ -164,16 +165,16 @@ void Partitioner::copyDataToPartitions(
partition.chunks.back()[0]->getCapacity()) {
partition.chunks.emplace_back();
partition.chunks.back().reserve(chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
for (auto j = 0u; j < chunkToCopyFrom->getNumValueVectors(); j++) {
partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk(
chunkToCopyFrom->getValueVector(i)->dataType, false /*enableCompression*/,
chunkToCopyFrom->getValueVector(j)->dataType, false /*enableCompression*/,
Partitioner::CHUNK_SIZE));
}
}
KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back()[i]->appendOne(
chunkToCopyFrom->getValueVector(i).get(), posToCopyFrom);
selVector.selectedPositions[0] = posToCopyFrom;
for (auto j = 0u; j < chunkToCopyFrom->getNumValueVectors(); j++) {
partition.chunks.back()[j]->append(chunkToCopyFrom->getValueVector(j).get(), selVector);
}
}
}
Expand Down
70 changes: 28 additions & 42 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "storage/store/column_chunk.h"

#include "common/data_chunk/sel_vector.h"
#include "common/exception/copy.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
Expand Down Expand Up @@ -207,20 +208,10 @@ void ColumnChunk::resetToEmpty() {
numValues = 0;
}

void ColumnChunk::append(ValueVector* vector) {
void ColumnChunk::append(ValueVector* vector, SelectionVector& selVector) {
KU_ASSERT(vector->dataType.getPhysicalType() == dataType.getPhysicalType());
copyVectorToBuffer(vector, numValues);
numValues += vector->state->selVector->selectedSize;
}

void ColumnChunk::appendOne(common::ValueVector* vector, common::vector_idx_t pos) {
KU_ASSERT(vector->dataType.getPhysicalType() == dataType.getPhysicalType());
KU_ASSERT(numValues < capacity);
memcpy(buffer.get() + numValues * numBytesPerValue, vector->getData() + pos * numBytesPerValue,
1 * numBytesPerValue);
// TODO(Guodong): Should be wrapped into nullChunk->appendOne(vector);
nullChunk->setNull(this->numValues, vector->isNull(pos));
numValues += 1;
copyVectorToBuffer(vector, numValues, selVector);
numValues += selVector.selectedSize;
}

void ColumnChunk::append(
Expand Down Expand Up @@ -325,7 +316,7 @@ void ColumnChunk::populateWithDefaultVal(ValueVector* defaultValueVector) {
auto numValuesToAppend =
std::min(DEFAULT_VECTOR_CAPACITY, numValuesToPopulate - numValuesAppended);
defaultValueVector->state->selVector->selectedSize = numValuesToAppend;
append(defaultValueVector);
append(defaultValueVector, *defaultValueVector->state->selVector);
numValuesAppended += numValuesToAppend;
}
}
Expand All @@ -340,20 +331,20 @@ offset_t ColumnChunk::getOffsetInBuffer(offset_t pos) const {
return offsetInBuffer;
}

void ColumnChunk::copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk) {
void ColumnChunk::copyVectorToBuffer(
ValueVector* vector, offset_t startPosInChunk, SelectionVector& selVector) {
auto bufferToWrite = buffer.get() + startPosInChunk * numBytesPerValue;
KU_ASSERT(startPosInChunk + vector->state->selVector->selectedSize <= capacity);
KU_ASSERT(startPosInChunk + selVector.selectedSize <= capacity);
auto vectorDataToWriteFrom = vector->getData();
if (vector->state->selVector->isUnfiltered()) {
memcpy(bufferToWrite, vectorDataToWriteFrom,
vector->state->selVector->selectedSize * numBytesPerValue);
if (selVector.isUnfiltered()) {
memcpy(bufferToWrite, vectorDataToWriteFrom, selVector.selectedSize * numBytesPerValue);
// TODO(Guodong): Should be wrapped into nullChunk->append(vector);
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
for (auto i = 0u; i < selVector.selectedSize; i++) {
nullChunk->setNull(startPosInChunk + i, vector->isNull(i));
}
} else {
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
for (auto i = 0u; i < selVector.selectedSize; i++) {
auto pos = selVector.selectedPositions[i];
// TODO(Guodong): Should be wrapped into nullChunk->append(vector);
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(bufferToWrite, vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
Expand Down Expand Up @@ -416,21 +407,14 @@ uint64_t ColumnChunk::getBufferSize(uint64_t capacity_) const {
}
}

void BoolColumnChunk::append(ValueVector* vector) {
void BoolColumnChunk::append(ValueVector* vector, SelectionVector& selVector) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::BOOL);
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
for (auto i = 0u; i < selVector.selectedSize; i++) {
auto pos = selVector.selectedPositions[i];
nullChunk->setNull(numValues + i, vector->isNull(pos));
NullMask::setNull((uint64_t*)buffer.get(), numValues + i, vector->getValue<bool>(pos));
}
numValues += vector->state->selVector->selectedSize;
}

void BoolColumnChunk::appendOne(ValueVector* vector, vector_idx_t pos) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::BOOL);
nullChunk->setNull(numValues, vector->isNull(pos));
NullMask::setNull((uint64_t*)buffer.get(), numValues, vector->getValue<bool>(pos));
numValues += 1;
numValues += selVector.selectedSize;
}

void BoolColumnChunk::append(
Expand Down Expand Up @@ -553,10 +537,11 @@ class FixedListColumnChunk : public ColumnChunk {
numValues = offsetInChunk >= numValues ? offsetInChunk + 1 : numValues;
}

void copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk) final {
void copyVectorToBuffer(
ValueVector* vector, offset_t startPosInChunk, SelectionVector& selVector) final {
auto vectorDataToWriteFrom = vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
for (auto i = 0u; i < selVector.selectedSize; i++) {
auto pos = selVector.selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + getOffsetInBuffer(startPosInChunk + i),
vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
Expand All @@ -570,16 +555,17 @@ class InternalIDColumnChunk final : public ColumnChunk {
explicit InternalIDColumnChunk(uint64_t capacity)
: ColumnChunk(*LogicalType::INT64(), capacity, false /*enableCompression*/) {}

void append(ValueVector* vector) override {
void append(ValueVector* vector, common::SelectionVector& selVector) override {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
copyVectorToBuffer(vector, numValues);
numValues += vector->state->selVector->selectedSize;
copyVectorToBuffer(vector, numValues, selVector);
numValues += selVector.selectedSize;
}

void copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk) override {
void copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk,
common::SelectionVector& selVector) override {
auto relIDsInVector = (internalID_t*)vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
for (auto i = 0u; i < selVector.selectedSize; i++) {
auto pos = selVector.selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + (startPosInChunk + i) * numBytesPerValue,
&relIDsInVector[pos].offset, numBytesPerValue);
Expand Down
6 changes: 4 additions & 2 deletions src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ uint64_t NodeGroup::append(const std::vector<ValueVector*>& columnVectors,
serialSkip++;
continue;
}
KU_ASSERT(chunk->getDataType() == columnVectors[i - serialSkip]->dataType);
chunk->append(columnVectors[i - serialSkip]);
KU_ASSERT((i - serialSkip) < columnVectors.size());
auto columnVector = columnVectors[i - serialSkip];
KU_ASSERT(chunk->getDataType() == columnVector->dataType);
chunk->append(columnVector, *columnVector->state->selVector);
}
columnState->selVector->selectedSize = originalSize;
numRows += numValuesToAppendInChunk;
Expand Down
29 changes: 13 additions & 16 deletions src/storage/store/string_column_chunk.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "storage/store/string_column_chunk.h"

#include "common/data_chunk/sel_vector.h"
#include "storage/store/column_chunk.h"
#include "storage/store/dictionary_chunk.h"

Expand All @@ -20,24 +21,20 @@ void StringColumnChunk::resetToEmpty() {
dictionaryChunk->resetToEmpty();
}

void StringColumnChunk::append(ValueVector* vector) {
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
void StringColumnChunk::append(ValueVector* vector, SelectionVector& selVector) {
for (auto i = 0u; i < selVector.selectedSize; i++) {
// index is stored in main chunk, data is stored in the data chunk
auto pos = vector->state->selVector->selectedPositions[i];
appendOne(vector, pos);
}
}

void StringColumnChunk::appendOne(common::ValueVector* vector, common::vector_idx_t pos) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING);
// index is stored in main chunk, data is stored in the data chunk
nullChunk->setNull(numValues, vector->isNull(pos));
auto dstPos = numValues++;
if (vector->isNull(pos)) {
return;
auto pos = selVector.selectedPositions[i];
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING);
// index is stored in main chunk, data is stored in the data chunk
nullChunk->setNull(numValues, vector->isNull(pos));
auto dstPos = numValues++;
if (vector->isNull(pos)) {
continue;
}
auto kuString = vector->getValue<ku_string_t>(pos);
setValueFromString(kuString.getAsStringView(), dstPos);
}
auto kuString = vector->getValue<ku_string_t>(pos);
setValueFromString(kuString.getAsStringView(), dstPos);
}

void StringColumnChunk::append(
Expand Down
23 changes: 6 additions & 17 deletions src/storage/store/struct_column_chunk.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "storage/store/struct_column_chunk.h"

#include "common/data_chunk/sel_vector.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
Expand Down Expand Up @@ -41,27 +42,15 @@ void StructColumnChunk::append(
numValues += numValuesToAppend;
}

void StructColumnChunk::append(ValueVector* vector) {
void StructColumnChunk::append(ValueVector* vector, SelectionVector& selVector) {
auto numFields = StructType::getNumFields(&dataType);
for (auto i = 0u; i < numFields; i++) {
childChunks[i]->append(StructVector::getFieldVector(vector, i).get());
childChunks[i]->append(StructVector::getFieldVector(vector, i).get(), selVector);
}
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
nullChunk->setNull(
numValues + i, vector->isNull(vector->state->selVector->selectedPositions[i]));
for (auto i = 0u; i < selVector.selectedSize; i++) {
nullChunk->setNull(numValues + i, vector->isNull(selVector.selectedPositions[i]));
}
numValues += vector->state->selVector->selectedSize;
}

void StructColumnChunk::appendOne(ValueVector* vector, vector_idx_t pos) {
auto numFields = StructType::getNumFields(&dataType);
for (auto i = 0u; i < numFields; i++) {
childChunks[i]->appendOne(StructVector::getFieldVector(vector, i).get(), pos);
}
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
nullChunk->setNull(numValues + i, vector->isNull(pos));
}
numValues += 1;
numValues += selVector.selectedSize;
}

void StructColumnChunk::resize(uint64_t newCapacity) {
Expand Down
Loading

0 comments on commit 67e9204

Please sign in to comment.