improve performance
hououou committed Mar 20, 2024
1 parent e4571a8 commit a8f15f7
Showing 6 changed files with 186 additions and 28 deletions.
1 change: 1 addition & 0 deletions src/include/storage/store/column.h
@@ -32,6 +32,7 @@ class Column {
friend class StringColumn;
friend class VarListLocalColumn;
friend class StructColumn;
friend class VarListColumn;

public:
struct ReadState {
46 changes: 20 additions & 26 deletions src/include/storage/store/var_list_column.h
@@ -3,24 +3,24 @@
#include "column.h"
#include "var_list_column_chunk.h"

// List is a nested data type which is stored as two chunks:
// List is a nested data type which is stored as three chunks:
// 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists.
// 2. Data column. Stores the actual data of the list.
// 2. Size column. Stores the size of each list.
// 3. Data column. Stores the actual data of the list.
// Similar to other data types, nulls are stored in the null column.
// Example layout for list of INT64:
// Four lists: [4,7,8,12], null, [2, 3], []
// Offset column: [4, 4, 6, 6]
// Size column: [4, 0, 2, 0]
// data column: [4, 7, 8, 12, 2, 3]
// When reading the data, we first read the offset column and use it to find the corresponding
// partitions of the data column.
// 1st list(offset 4): Since it is the first element, the start offset is a constant 0, and the end
// offset is 4. Its data is stored at position 0-4 in the data column.
// 2nd list(offset 4): By reading the null column, we know that the 2nd list is null. So we don't
// need to read from the data column.
// 3rd list(offset 6): The start offset is 4(by looking up the offset for the 2nd list), and the end
// offset is 6. Its data is stored at position 4-6 in the data column.
// 4th list(offset 6): The start offset is 6(by looking up the offset for the 3rd list), and the end
// offset is 6. Its data is stored at position 6-6 in the data column (empty list).
// When updating the data, we first append the new list data to the data column and then update
// the offset and size columns. For example, to update [4,7,8,12] to [1,3,4], we append [1,3,4] to
// the data column and then update the offset and size. The layout after the update:
// Offset column: [9, 4, 6, 6]
// Size column: [3, 0, 2, 0]
// data column: [4, 7, 8, 12, 2, 3, 1, 3, 4]
// This design is good for write performance but bad for scan performance, since it can cause
// random access to the data column. Worse, the data column grows with every update/insert. To
// balance write and read performance, we rewrite the whole var list column in ascending order
// when the size of the data column exceeds a threshold (currently capacity/2).
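
A minimal sketch, not taken from the commit, of how a single list's data range can be derived from the offset and size columns under this three-chunk layout (the function and variable names are illustrative):

#include <cstdint>
#include <utility>
#include <vector>

// Derive list i's [start, end) range in the data column: the offset column stores the end
// offset of each list's data and the size column stores its length, so start = end - size.
std::pair<uint64_t, uint64_t> listRange(const std::vector<uint64_t>& offsetColumn,
    const std::vector<uint32_t>& sizeColumn, uint64_t i) {
    uint64_t endOffset = offsetColumn[i];             // e.g. 6 for the 3rd list [2, 3] above
    uint64_t startOffset = endOffset - sizeColumn[i]; // e.g. 6 - 2 == 4
    return {startOffset, endOffset};                  // its data sits at positions [4, 6)
}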

namespace kuzu {
namespace storage {
@@ -77,20 +77,6 @@ class VarListColumn : public Column {
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t,
const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&,
const offset_to_row_idx_t&) override {
// Always perform out-of-place commit for VAR_LIST columns.
return false;
}
inline bool canCommitInPlace(transaction::Transaction* /*transaction*/,
common::node_group_idx_t /*nodeGroupIdx*/,
const std::vector<common::offset_t>& /*dstOffsets*/, ColumnChunk* /*chunk*/,
common::offset_t /*startOffset*/) override {
// Always perform out-of-place commit for VAR_LIST columns.
return false;
}

void checkpointInMemory() final;
void rollbackInMemory() final;

@@ -104,6 +90,14 @@
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup);

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset) override;

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
4 changes: 4 additions & 0 deletions src/include/storage/store/var_list_column_chunk.h
@@ -90,6 +90,9 @@ class VarListColumnChunk final : public ColumnChunk {
}

void resetOffset();
void resetFromOtherChunk(VarListColumnChunk* other);
void finalize() override;
bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);
@@ -103,6 +106,7 @@
protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
bool checkOrder;
};

} // namespace storage
3 changes: 2 additions & 1 deletion src/storage/store/column.cpp
@@ -792,7 +792,8 @@ void Column::commitColumnChunkOutOfPlace(Transaction* transaction, node_group_id
auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
// TODO(Guodong): Should consider caching the scanned column chunk to avoid redundant
// scans in the same transaction.
auto columnChunk = getEmptyChunkForCommit(chunkMeta.numValues + dstOffsets.size());
auto columnChunk =
getEmptyChunkForCommit(1.5 * std::bit_ceil(chunkMeta.numValues + dstOffsets.size()));
scan(transaction, nodeGroupIdx, columnChunk.get());
for (auto i = 0u; i < dstOffsets.size(); i++) {
columnChunk->write(chunk, srcOffset + i, dstOffsets[i], 1 /* numValues */);
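
A minimal sketch of the sizing arithmetic introduced above, with hypothetical numbers (1000 existing values, 100 new destination offsets); this illustrates the heuristic and is not code from the commit:

#include <bit>
#include <cstdint>

// Round the required value count up to the next power of two, then scale by 1.5 for headroom.
uint64_t commitChunkCapacity(uint64_t existingNumValues, uint64_t numNewOffsets) {
    // e.g. existingNumValues == 1000 and numNewOffsets == 100:
    // std::bit_ceil(1100) == 2048, so the returned capacity is 3072.
    return 1.5 * std::bit_ceil(existingNumValues + numNewOffsets);
}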
78 changes: 78 additions & 0 deletions src/storage/store/var_list_column.cpp
@@ -303,5 +303,83 @@ ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction
return {numOffsetsToRead, std::move(offsetColumnChunk), std::move(sizeColumnChunk)};
}

void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) {
auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType());
auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups;
if (isNewNodeGroup) {
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks,
insertInfo, localUpdateChunks, updateInfo, deleteInfo);
} else {
auto columnChunk = getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE);
std::vector<offset_t> dstOffsets;
for (auto& [offsetInDstChunk, rowIdx] : updateInfo) {
auto [chunkIdx, offsetInLocalChunk] =
LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx);
auto localUpdateChunk = localUpdateChunks[chunkIdx];
dstOffsets.push_back(offsetInDstChunk);
columnChunk->append(localUpdateChunk, offsetInLocalChunk, 1);
}
for (auto& [offsetInDstChunk, rowIdx] : insertInfo) {
auto [chunkIdx, offsetInLocalChunk] =
LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx);
auto localInsertChunk = localInsertChunks[chunkIdx];
dstOffsets.push_back(offsetInDstChunk);
columnChunk->append(localInsertChunk, offsetInLocalChunk, 1);
}
prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, columnChunk.get(), 0);
}
}

void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk, offset_t startSrcOffset) {
auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType());
auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups;
if (isNewNodeGroup) {
commitColumnChunkOutOfPlace(
transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk, startSrcOffset);
} else {
        // We split the commit into three parts: the size column chunk commit, the data column
        // chunk commit, and the offset column chunk commit.
auto varListChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(chunk);
sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets,
varListChunk->getSizeColumnChunk(), startSrcOffset);
auto dataColumnSize =
dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues;
auto dataColumnChunk = varListChunk->getDataColumnChunk();
auto appendListNum = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size());
auto appendListDataNum = 0u;
auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset);
std::vector<common::offset_t> dstOffsetsInDataColumn;
for (auto i = 0u; i < appendListNum; i++) {
auto appendLen = varListChunk->getListLen(startSrcOffset + i);
for (auto j = 0u; j < appendLen; j++) {
dstOffsetsInDataColumn.push_back(dataColumnSize + appendListDataNum + j);
}
appendListDataNum += appendLen;
}
dataColumn->prepareCommitForChunk(
transaction, nodeGroupIdx, dstOffsetsInDataColumn, dataColumnChunk, startVarListOffset);
        // We need to rewrite the offsets: list data is not updated in place but appended to the
        // end of the data column, so each new end offset is the original data column size plus
        // the list's end offset within the appended chunk.
        // TODO(Jiamin): A better way is to store the offsets in a dedicated offset column, just
        // like the size column. Then we could reuse the prepareCommitForChunk interface for the
        // offset column.
auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(),
enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size()));
Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get());
for (auto i = 0u; i < appendListNum; i++) {
auto listEndOffset = varListChunk->getListEndOffset(startSrcOffset + i);
auto isNull = varListChunk->getNullChunk()->isNull(startSrcOffset + i);
offsetColumnChunk->setValue<offset_t>(dataColumnSize + listEndOffset, dstOffsets[i]);
offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull);
}
Column::append(offsetColumnChunk.get(), nodeGroupIdx);
}
}
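
A minimal sketch of the offset adjustment performed in the loop above, with made-up numbers; the helper name is illustrative:

#include <cstdint>

// Because list data is appended to the end of the persistent data column rather than written
// in place, a committed end offset is the pre-commit size of the data column plus the list's
// end offset inside the appended (local) chunk.
uint64_t committedEndOffset(uint64_t dataColumnSize, uint64_t listEndOffsetInLocalChunk) {
    // e.g. dataColumnSize == 100 and listEndOffsetInLocalChunk == 4 yields 104, which is the
    // value written into the rebuilt offset column chunk.
    return dataColumnSize + listEndOffsetInLocalChunk;
}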

} // namespace storage
} // namespace kuzu
82 changes: 81 additions & 1 deletion src/storage/store/var_list_column_chunk.cpp
@@ -4,7 +4,6 @@
#include "common/data_chunk/sel_vector.h"
#include "common/types/value/value.h"
#include "storage/store/column_chunk.h"

using namespace kuzu::common;

namespace kuzu {
@@ -33,11 +32,26 @@ VarListColumnChunk::VarListColumnChunk(
varListDataColumnChunk = std::make_unique<VarListDataColumnChunk>(
ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(),
enableCompression, 0 /* capacity */, inMemory));
checkOrder = true;
KU_ASSERT(this->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST);
}

bool VarListColumnChunk::isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const {
offset_t prevEndOffset = getListStartOffset(startPos);
for (auto i = startPos; i < endPos; i++) {
offset_t currentEndOffset = getListEndOffset(i);
auto length = getListLen(i);
prevEndOffset += length;
if (currentEndOffset != prevEndOffset) {
return false;
}
}
return true;
}

void VarListColumnChunk::append(
ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) {
checkOrder = false;
auto otherListChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(other);
nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend);
sizeColumnChunk->getNullChunk()->append(
@@ -102,6 +116,8 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector)
numValues += numToAppend;
KU_ASSERT(sizeColumnChunk->getNumValues() == numValues);
KU_ASSERT(nullChunk->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->sanityCheck());
}

void VarListColumnChunk::appendNullList() {
@@ -132,13 +148,16 @@ void VarListColumnChunk::lookup(
}
KU_ASSERT(sizeColumnChunk->getNumValues() == numValues);
KU_ASSERT(nullChunk->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->sanityCheck());
}

void VarListColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
chunk->getNumValues() == dstOffsets->getNumValues());
checkOrder = false;
offset_t currentIndex = varListDataColumnChunk->getNumValues();
auto otherListChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(chunk);
varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() +
@@ -167,10 +186,13 @@ void VarListColumnChunk::write(
}
KU_ASSERT(sizeColumnChunk->getNumValues() == numValues);
KU_ASSERT(nullChunk->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->sanityCheck());
}

void VarListColumnChunk::write(
ValueVector* vector, offset_t offsetInVector, offset_t offsetInChunk) {
checkOrder = false;
auto selVector = std::make_unique<SelectionVector>(1);
selVector->resetSelectorToValuePosBuffer();
selVector->selectedPositions[0] = offsetInVector;
@@ -193,11 +215,13 @@
}
KU_ASSERT(sizeColumnChunk->getNumValues() == numValues);
KU_ASSERT(nullChunk->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->sanityCheck());
}

void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk,
offset_t dstOffsetInChunk, offset_t numValuesToCopy) {
KU_ASSERT(srcChunk->getDataType().getPhysicalType() == PhysicalTypeID::VAR_LIST);
checkOrder = false;
auto srcListChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(srcChunk);
auto offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues();
for (auto i = 0u; i < numValuesToCopy; i++) {
@@ -220,6 +244,7 @@ void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk,
}
KU_ASSERT(sizeColumnChunk->getNumValues() == numValues);
KU_ASSERT(nullChunk->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->sanityCheck());
}

void VarListColumnChunk::copy(ColumnChunk* srcChunk, offset_t srcOffsetInChunk,
@@ -260,5 +285,60 @@ void VarListColumnChunk::resetOffset() {
}
}

void VarListColumnChunk::finalize() {
    // Rewrite the column chunk in list order for better scan performance.
auto newColumnChunk = ColumnChunkFactory::createColumnChunk(
std::move(*dataType.copy()), enableCompression, capacity);
uint64_t totalListLen = varListDataColumnChunk->getNumValues();
uint64_t resizeThreshold = varListDataColumnChunk->capacity / 2;
    // If the list data is smaller than the resize threshold, there is no need to rewrite.
if (totalListLen < resizeThreshold) {
return;
}
    // If no out-of-order (random) write was performed, there is no need to rewrite.
if (checkOrder) {
return;
}
    // If the offsets are already in ascending order, there is no need to rewrite.
if (isOffsetSortedAscending(0, numValues)) {
return;
}
auto newVarListChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(newColumnChunk.get());
newVarListChunk->resize(numValues);
newVarListChunk->getDataColumnChunk()->resize(totalListLen);
auto dataColumnChunk = newVarListChunk->getDataColumnChunk();
newVarListChunk->varListDataColumnChunk->resizeBuffer(totalListLen);
offset_t offsetInChunk = 0;
offset_t currentIndex = 0;
for (auto i = 0u; i < numValues; i++) {
if (nullChunk->isNull(i)) {
newVarListChunk->appendNullList();
} else {
auto startOffset = getListStartOffset(i);
auto appendLen = getListLen(i);
dataColumnChunk->append(
varListDataColumnChunk->dataColumnChunk.get(), startOffset, appendLen);
offsetInChunk += appendLen;
newVarListChunk->getNullChunk()->setNull(currentIndex, false);
newVarListChunk->sizeColumnChunk->getNullChunk()->setNull(currentIndex, false);
newVarListChunk->sizeColumnChunk->setValue<uint32_t>(appendLen, currentIndex);
newVarListChunk->setValue<offset_t>(offsetInChunk, currentIndex);
}
currentIndex++;
}
KU_ASSERT(sizeColumnChunk->getNumValues() == numValues);
KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues);
    // Move the offsets, nulls, and data from newVarListChunk into this column chunk, releasing
    // the old ones.
resetFromOtherChunk(newVarListChunk);
}
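
As a rough illustration, assuming the thresholds above are met, the in-order rewrite that finalize() performs can be sketched on plain vectors using the example from var_list_column.h (after updating the first list to [1,3,4]); this is a simplified model, not the chunk implementation:

#include <cstdint>
#include <vector>

struct RewrittenLists {
    std::vector<uint64_t> offsets; // end offsets, now ascending
    std::vector<uint32_t> sizes;
    std::vector<int64_t> data;     // data laid out in list order
};

// Input:  offsets [9, 4, 6, 6], sizes [3, 0, 2, 0], list 1 null,
//         data [4, 7, 8, 12, 2, 3, 1, 3, 4].
// Output: data [1, 3, 4, 2, 3], offsets [3, 3, 5, 5], sizes unchanged, so later scans read the
//         data column sequentially instead of jumping to appended regions.
RewrittenLists rewriteInOrder(const std::vector<uint64_t>& offsets,
    const std::vector<uint32_t>& sizes, const std::vector<bool>& isNull,
    const std::vector<int64_t>& data) {
    RewrittenLists out;
    uint64_t end = 0;
    for (uint64_t i = 0; i < offsets.size(); i++) {
        uint32_t len = isNull[i] ? 0 : sizes[i];
        uint64_t start = offsets[i] - sizes[i]; // start of list i in the old data column
        for (uint32_t j = 0; j < len; j++) {
            out.data.push_back(data[start + j]);
        }
        end += len;
        out.offsets.push_back(end);
        out.sizes.push_back(len);
    }
    return out;
}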
void VarListColumnChunk::resetFromOtherChunk(VarListColumnChunk* other) {
buffer = std::move(other->buffer);
nullChunk = std::move(other->nullChunk);
sizeColumnChunk = std::move(other->sizeColumnChunk);
varListDataColumnChunk = std::move(other->varListDataColumnChunk);
numValues = other->numValues;
checkOrder = true;
}

} // namespace storage
} // namespace kuzu
