Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of VAR_LIST storage layout #3093

Merged
merged 1 commit into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.3.2.3 LANGUAGES CXX C)
project(Kuzu VERSION 0.3.2.4 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ListAuxiliaryBuffer::ListAuxiliaryBuffer(
: capacity{DEFAULT_VECTOR_CAPACITY}, size{0}, dataVector{std::make_shared<ValueVector>(
dataVectorType, memoryManager)} {}

list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
list_entry_t ListAuxiliaryBuffer::addList(list_size_t listSize) {
auto listEntry = list_entry_t{size, listSize};
bool needResizeDataVector = size + listSize > capacity;
while (size + listSize > capacity) {
Expand Down
7 changes: 4 additions & 3 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX;
using partition_idx_t = uint64_t;
constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX;
using length_t = uint64_t;
using list_size_t = uint32_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand All @@ -57,10 +58,10 @@ struct overflow_value_t {

struct list_entry_t {
common::offset_t offset;
uint64_t size;
list_size_t size;

list_entry_t() : offset{INVALID_OFFSET}, size{UINT64_MAX} {}
list_entry_t(common::offset_t offset, uint64_t size) : offset{offset}, size{size} {}
list_entry_t() : offset{INVALID_OFFSET}, size{UINT32_MAX} {}
list_entry_t(common::offset_t offset, list_size_t size) : offset{offset}, size{size} {}
};

struct struct_entry_t {
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {
ValueVector* getDataVector() const { return dataVector.get(); }
std::shared_ptr<ValueVector> getSharedDataVector() const { return dataVector; }

list_entry_t addList(uint64_t listSize);
list_entry_t addList(list_size_t listSize);

uint64_t getSize() const { return size; }

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Column {
friend class StringColumn;
friend class VarListLocalColumn;
friend class StructColumn;
friend class VarListColumn;

public:
struct ReadState {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class ColumnChunk {

template<typename T>
void setValue(T val, common::offset_t pos) {
KU_ASSERT(pos < capacity);
((T*)buffer.get())[pos] = val;
if (pos >= numValues) {
numValues = pos + 1;
Expand Down
93 changes: 44 additions & 49 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
@@ -1,43 +1,45 @@
#pragma once

#include "column.h"
#include "var_list_column_chunk.h"

// List is a nested data type which is stored as two chunks:
// List is a nested data type which is stored as three chunks:
// 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists.
// 2. Data column. Stores the actual data of the list.
// 2. Size column. Stores the size of each list.
// 3. Data column. Stores the actual data of the list.
// Similar to other data types, nulls are stored in the null column.
// Example layout for list of INT64:
// Four lists: [4,7,8,12], null, [2, 3], []
// Offset column: [4, 4, 6, 6]
// Size column: [4, 0, 2, 0]
// data column: [4, 7, 8, 12, 2, 3]
// When reading the data, we firstly read the offset column and utilize the offset column to find
// the data column partitions.
// 1st list(offset 4): Since it is the first element, the start offset is a constant 0, and the end
// offset is 4. Its data is stored at position 0-4 in the data column.
// 2nd list(offset 4): By reading the null column, we know that the 2nd list is null. So we don't
// need to read from the data column.
// 3rd list(offset 6): The start offset is 4(by looking up the offset for the 2nd list), and the end
// offset is 6. Its data is stored at position 4-6 in the data column.
// 4th list(offset 6): The start offset is 6(by looking up the offset for the 3rd list), and the end
// offset is 6. Its data is stored at position 6-6 in the data column (empty list).
// When updating the data, we first append the data to the data column, and then update the offset
// and size accordingly. Besides offset column, we introduce an extra size column here to enable
// in-place updates of a list column. In a list column chunk, offsets of lists are not always sorted
// after updates. This is good for writes, but it introduces extra overheads for scans, as lists can
// be scattered, and scans have to be broken into multiple small reads. To achieve a balance between
// reads and writes, during updates, we rewrite the whole var list column chunk in ascending order
// when the offsets are not sorted in ascending order and the size of data column chunk is larger
// than half of its capacity.

namespace kuzu {
namespace storage {

struct ListOffsetInfoInStorage {
common::offset_t prevNodeListOffset;
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors;
struct ListOffsetSizeInfo {
common::offset_t numTotal;
std::unique_ptr<ColumnChunk> offsetColumnChunk;
std::unique_ptr<ColumnChunk> sizeColumnChunk;

ListOffsetInfoInStorage(common::offset_t prevNodeListOffset,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors)
: prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {}
ListOffsetSizeInfo(common::offset_t numTotal, std::unique_ptr<ColumnChunk> offsetColumnChunk,
std::unique_ptr<ColumnChunk> sizeColumnChunk)
: numTotal{numTotal}, offsetColumnChunk{std::move(offsetColumnChunk)},
sizeColumnChunk{std::move(sizeColumnChunk)} {}

common::offset_t getListOffset(uint64_t nodePos) const;
common::list_size_t getListSize(uint64_t pos) const;
common::offset_t getListEndOffset(uint64_t pos) const;
common::offset_t getListStartOffset(uint64_t pos) const;

inline uint64_t getListLength(uint64_t nodePos) const {
KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos));
return getListOffset(nodePos + 1) - getListOffset(nodePos);
}
bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const;
};

class VarListColumn : public Column {
Expand Down Expand Up @@ -69,45 +71,38 @@ class VarListColumn : public Column {
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

private:
inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) {
return offsetInNodeGroup == 0 ?
0 :
readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1);
}

void scanUnfiltered(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector,
const ListOffsetInfoInStorage& listOffsetInfoInStorage);
const ListOffsetSizeInfo& listOffsetInfoInStorage);
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::ValueVector* offsetVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage);

inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t,
const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&,
const offset_to_row_idx_t&) override {
// Always perform out-of-place commit for VAR_LIST columns.
return false;
}
inline bool canCommitInPlace(transaction::Transaction* /*transaction*/,
common::node_group_idx_t /*nodeGroupIdx*/,
const std::vector<common::offset_t>& /*dstOffsets*/, ColumnChunk* /*chunk*/,
common::offset_t /*startOffset*/) override {
// Always perform out-of-place commit for VAR_LIST columns.
return false;
}
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

void checkpointInMemory() final;
void rollbackInMemory() final;

common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);
ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction,

common::list_size_t readSize(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup,
const std::shared_ptr<common::DataChunkState>& state);
common::offset_t endOffsetInNodeGroup);

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset) override;

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
// TODO(Guodong): This should be moved to table states.
std::unique_ptr<VarListDataColumnChunk> tmpDataColumnChunk;
hououou marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace storage
Expand Down
51 changes: 25 additions & 26 deletions src/include/storage/store/var_list_column_chunk.h
hououou marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ class VarListColumnChunk final : public ColumnChunk {
return varListDataColumnChunk->dataColumnChunk.get();
}

inline ColumnChunk* getSizeColumnChunk() const { return sizeColumnChunk.get(); }

void resetToEmpty() override;

inline void setNumValues(uint64_t numValues_) override {
ColumnChunk::setNumValues(numValues_);
sizeColumnChunk->setNumValues(numValues_);
}

void append(common::ValueVector* vector, common::SelectionVector& selVector) final;

void lookup(common::offset_t offsetInChunk, common::ValueVector& output,
Expand All @@ -56,45 +63,37 @@ class VarListColumnChunk final : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

void finalize() override;

inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
inline void resize(uint64_t newCapacity) override {
ColumnChunk::resize(newCapacity);
sizeColumnChunk->resize(newCapacity);
}

common::offset_t getListStartOffset(common::offset_t offset) const;

common::offset_t getListEndOffset(common::offset_t offset) const;

common::list_size_t getListSize(common::offset_t offset) const;

void resetOffset();
void resetFromOtherChunk(VarListColumnChunk* other);
void finalize() override;
bool isOffsetsConsecutiveAndSortedAscending(uint64_t startPos, uint64_t endPos) const;
bool sanityCheck() override;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

private:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;

inline void initializeIndices() {
indicesColumnChunk = ColumnChunkFactory::createColumnChunk(
*common::LogicalType::INT64(), false /*enableCompression*/, capacity);
indicesColumnChunk->getNullChunk()->resetToAllNull();
for (auto i = 0u; i < numValues; i++) {
indicesColumnChunk->setValue<common::offset_t>(i, i);
indicesColumnChunk->getNullChunk()->setNull(i, nullChunk->isNull(i));
}
indicesColumnChunk->setNumValues(numValues);
}
inline uint64_t getListLen(common::offset_t offset) const {
return getListOffset(offset + 1) - getListOffset(offset);
}

void resetFromOtherChunk(VarListColumnChunk* other);
void appendNullList();

protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
// The following is needed to write var list to random positions in the column chunk.
// We first append var list to the end of the column chunk. Then use indicesColumnChunk to track
// where each var list data is inside the column chunk.
// `needFinalize` is set to true whenever `write` is called.
// During `finalize`, the whole column chunk will be re-written according to indices.
bool needFinalize;
std::unique_ptr<ColumnChunk> indicesColumnChunk;
// we use checkOffsetSortedAsc flag to indicate that we do not trigger random write
bool checkOffsetSortedAsc;
};

} // namespace storage
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/unwind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ void Unwind::copyTuplesToOutVector(uint64_t startPos, uint64_t endPos) const {

bool Unwind::getNextTuplesInternal(ExecutionContext* context) {
if (hasMoreToRead()) {
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size - startIndex);
auto totalElementsCopy =
std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size - startIndex);
copyTuplesToOutVector(startIndex, (totalElementsCopy + startIndex));
startIndex += totalElementsCopy;
outValueVector->state->initOriginalAndSelectedSize(totalElementsCopy);
Expand All @@ -42,7 +43,7 @@ bool Unwind::getNextTuplesInternal(ExecutionContext* context) {
}
listEntry = expressionEvaluator->resultVector->getValue<list_entry_t>(pos);
startIndex = 0;
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size);
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size);
copyTuplesToOutVector(0, totalElementsCopy);
startIndex += totalElementsCopy;
outValueVector->state->initOriginalAndSelectedSize(startIndex);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
}
} break;
case PhysicalTypeID::VAR_LIST: {
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::UINT32(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal));
} break;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ void Column::commitColumnChunkOutOfPlace(Transaction* transaction, node_group_id
auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
// TODO(Guodong): Should consider caching the scanned column chunk to avoid redundant
// scans in the same transaction.
auto columnChunk = getEmptyChunkForCommit(chunkMeta.numValues + dstOffsets.size());
auto columnChunk =
getEmptyChunkForCommit(1.5 * std::bit_ceil(chunkMeta.numValues + dstOffsets.size()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 1.5*? Is there performance implications behind this?

scan(transaction, nodeGroupIdx, columnChunk.get());
for (auto i = 0u; i < dstOffsets.size(); i++) {
columnChunk->write(chunk, srcOffset + i, dstOffsets[i], 1 /* numValues */);
Expand Down
1 change: 1 addition & 0 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ void ColumnChunk::append(
KU_ASSERT(nullChunk->getNumValues() == getNumValues());
nullChunk->append(other->nullChunk.get(), startPosInOtherChunk, numValuesToAppend);
}
KU_ASSERT(numValues + numValuesToAppend <= capacity);
memcpy(buffer.get() + numValues * numBytesPerValue,
other->buffer.get() + startPosInOtherChunk * numBytesPerValue,
numValuesToAppend * numBytesPerValue);
Expand Down
Loading
Loading