Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
rework var list

improve performance

trigger ci

address review comments

fix

bump version
  • Loading branch information
ray6080 authored and hououou committed Mar 23, 2024
1 parent f9bc0c6 commit 6870d28
Show file tree
Hide file tree
Showing 15 changed files with 545 additions and 231 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.3.2.3 LANGUAGES CXX C)
project(Kuzu VERSION 0.3.2.4 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ListAuxiliaryBuffer::ListAuxiliaryBuffer(
: capacity{DEFAULT_VECTOR_CAPACITY}, size{0}, dataVector{std::make_shared<ValueVector>(
dataVectorType, memoryManager)} {}

list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
list_entry_t ListAuxiliaryBuffer::addList(list_size_t listSize) {
auto listEntry = list_entry_t{size, listSize};
bool needResizeDataVector = size + listSize > capacity;
while (size + listSize > capacity) {
Expand Down
7 changes: 4 additions & 3 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX;
using partition_idx_t = uint64_t;
constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX;
using length_t = uint64_t;
using list_size_t = uint32_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand All @@ -57,10 +58,10 @@ struct overflow_value_t {

struct list_entry_t {
common::offset_t offset;
uint64_t size;
list_size_t size;

list_entry_t() : offset{INVALID_OFFSET}, size{UINT64_MAX} {}
list_entry_t(common::offset_t offset, uint64_t size) : offset{offset}, size{size} {}
list_entry_t() : offset{INVALID_OFFSET}, size{UINT32_MAX} {}
list_entry_t(common::offset_t offset, list_size_t size) : offset{offset}, size{size} {}
};

struct struct_entry_t {
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {
ValueVector* getDataVector() const { return dataVector.get(); }
std::shared_ptr<ValueVector> getSharedDataVector() const { return dataVector; }

list_entry_t addList(uint64_t listSize);
list_entry_t addList(list_size_t listSize);

uint64_t getSize() const { return size; }

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Column {
friend class StringColumn;
friend class VarListLocalColumn;
friend class StructColumn;
friend class VarListColumn;

public:
struct ReadState {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class ColumnChunk {

template<typename T>
void setValue(T val, common::offset_t pos) {
KU_ASSERT(pos < capacity);
((T*)buffer.get())[pos] = val;
if (pos >= numValues) {
numValues = pos + 1;
Expand Down
93 changes: 44 additions & 49 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
@@ -1,43 +1,45 @@
#pragma once

#include "column.h"
#include "var_list_column_chunk.h"

// List is a nested data type which is stored as two chunks:
// List is a nested data type which is stored as three chunks:
// 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists.
// 2. Data column. Stores the actual data of the list.
// 2. Size column. Stores the size of each list.
// 3. Data column. Stores the actual data of the list.
// Similar to other data types, nulls are stored in the null column.
// Example layout for list of INT64:
// Four lists: [4,7,8,12], null, [2, 3], []
// Offset column: [4, 4, 6, 6]
// Size column: [4, 0, 2, 0]
// data column: [4, 7, 8, 12, 2, 3]
// When reading the data, we firstly read the offset column and utilize the offset column to find
// the data column partitions.
// 1st list(offset 4): Since it is the first element, the start offset is a constant 0, and the end
// offset is 4. Its data is stored at position 0-4 in the data column.
// 2nd list(offset 4): By reading the null column, we know that the 2nd list is null. So we don't
// need to read from the data column.
// 3rd list(offset 6): The start offset is 4(by looking up the offset for the 2nd list), and the end
// offset is 6. Its data is stored at position 4-6 in the data column.
// 4th list(offset 6): The start offset is 6(by looking up the offset for the 3rd list), and the end
// offset is 6. Its data is stored at position 6-6 in the data column (empty list).
// When updating the data, we first append the data to the data column, and then update the offset
// and size accordingly. Besides offset column, we introduce an extra size column here to enable
// in-place updates of a list column. In a list column chunk, offsets of lists are not always sorted
// after updates. This is good for writes, but it introduces extra overheads for scans, as lists can
// be scattered, and scans have to be broken into multiple small reads. To achieve a balance between
// reads and writes, during updates, we rewrite the whole var list column chunk in ascending order
// when the offsets are not sorted in ascending order and the size of data column chunk is larger
// than half of its capacity.

namespace kuzu {
namespace storage {

struct ListOffsetInfoInStorage {
common::offset_t prevNodeListOffset;
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors;
struct ListOffsetSizeInfo {
common::offset_t numTotal;
std::unique_ptr<ColumnChunk> offsetColumnChunk;
std::unique_ptr<ColumnChunk> sizeColumnChunk;

ListOffsetInfoInStorage(common::offset_t prevNodeListOffset,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors)
: prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {}
ListOffsetSizeInfo(common::offset_t numTotal, std::unique_ptr<ColumnChunk> offsetColumnChunk,
std::unique_ptr<ColumnChunk> sizeColumnChunk)
: numTotal{numTotal}, offsetColumnChunk{std::move(offsetColumnChunk)},
sizeColumnChunk{std::move(sizeColumnChunk)} {}

common::offset_t getListOffset(uint64_t nodePos) const;
common::list_size_t getListSize(uint64_t pos) const;
common::offset_t getListEndOffset(uint64_t pos) const;
common::offset_t getListStartOffset(uint64_t pos) const;

inline uint64_t getListLength(uint64_t nodePos) const {
KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos));
return getListOffset(nodePos + 1) - getListOffset(nodePos);
}
bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const;
};

class VarListColumn : public Column {
Expand Down Expand Up @@ -69,45 +71,38 @@ class VarListColumn : public Column {
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

private:
inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) {
return offsetInNodeGroup == 0 ?
0 :
readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1);
}

void scanUnfiltered(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector,
const ListOffsetInfoInStorage& listOffsetInfoInStorage);
const ListOffsetSizeInfo& listOffsetInfoInStorage);
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::ValueVector* offsetVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage);

inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t,
const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&,
const offset_to_row_idx_t&) override {
// Always perform out-of-place commit for VAR_LIST columns.
return false;
}
inline bool canCommitInPlace(transaction::Transaction* /*transaction*/,
common::node_group_idx_t /*nodeGroupIdx*/,
const std::vector<common::offset_t>& /*dstOffsets*/, ColumnChunk* /*chunk*/,
common::offset_t /*startOffset*/) override {
// Always perform out-of-place commit for VAR_LIST columns.
return false;
}
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

void checkpointInMemory() final;
void rollbackInMemory() final;

common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);
ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction,

common::list_size_t readSize(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup,
const std::shared_ptr<common::DataChunkState>& state);
common::offset_t endOffsetInNodeGroup);

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset) override;

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
// TODO(Guodong): This should be moved to table states.
std::unique_ptr<VarListDataColumnChunk> tmpDataColumnChunk;
};

} // namespace storage
Expand Down
51 changes: 25 additions & 26 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ class VarListColumnChunk final : public ColumnChunk {
return varListDataColumnChunk->dataColumnChunk.get();
}

inline ColumnChunk* getSizeColumnChunk() const { return sizeColumnChunk.get(); }

void resetToEmpty() override;

inline void setNumValues(uint64_t numValues_) override {
ColumnChunk::setNumValues(numValues_);
sizeColumnChunk->setNumValues(numValues_);
}

void append(common::ValueVector* vector, common::SelectionVector& selVector) final;

void lookup(common::offset_t offsetInChunk, common::ValueVector& output,
Expand All @@ -56,45 +63,37 @@ class VarListColumnChunk final : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

void finalize() override;

inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
inline void resize(uint64_t newCapacity) override {
ColumnChunk::resize(newCapacity);
sizeColumnChunk->resize(newCapacity);
}

common::offset_t getListStartOffset(common::offset_t offset) const;

common::offset_t getListEndOffset(common::offset_t offset) const;

common::list_size_t getListSize(common::offset_t offset) const;

void resetOffset();
void resetFromOtherChunk(VarListColumnChunk* other);
void finalize() override;
bool isOffsetsConsecutiveAndSortedAscending(uint64_t startPos, uint64_t endPos) const;
bool sanityCheck() override;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

private:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;

inline void initializeIndices() {
indicesColumnChunk = ColumnChunkFactory::createColumnChunk(
*common::LogicalType::INT64(), false /*enableCompression*/, capacity);
indicesColumnChunk->getNullChunk()->resetToAllNull();
for (auto i = 0u; i < numValues; i++) {
indicesColumnChunk->setValue<common::offset_t>(i, i);
indicesColumnChunk->getNullChunk()->setNull(i, nullChunk->isNull(i));
}
indicesColumnChunk->setNumValues(numValues);
}
inline uint64_t getListLen(common::offset_t offset) const {
return getListOffset(offset + 1) - getListOffset(offset);
}

void resetFromOtherChunk(VarListColumnChunk* other);
void appendNullList();

protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
// The following is needed to write var list to random positions in the column chunk.
// We first append var list to the end of the column chunk. Then use indicesColumnChunk to track
// where each var list data is inside the column chunk.
// `needFinalize` is set to true whenever `write` is called.
// During `finalize`, the whole column chunk will be re-written according to indices.
bool needFinalize;
std::unique_ptr<ColumnChunk> indicesColumnChunk;
// we use checkOffsetSortedAsc flag to indicate that we do not trigger random write
bool checkOffsetSortedAsc;
};

} // namespace storage
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/unwind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ void Unwind::copyTuplesToOutVector(uint64_t startPos, uint64_t endPos) const {

bool Unwind::getNextTuplesInternal(ExecutionContext* context) {
if (hasMoreToRead()) {
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size - startIndex);
auto totalElementsCopy =
std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size - startIndex);
copyTuplesToOutVector(startIndex, (totalElementsCopy + startIndex));
startIndex += totalElementsCopy;
outValueVector->state->initOriginalAndSelectedSize(totalElementsCopy);
Expand All @@ -42,7 +43,7 @@ bool Unwind::getNextTuplesInternal(ExecutionContext* context) {
}
listEntry = expressionEvaluator->resultVector->getValue<list_entry_t>(pos);
startIndex = 0;
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size);
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size);
copyTuplesToOutVector(0, totalElementsCopy);
startIndex += totalElementsCopy;
outValueVector->state->initOriginalAndSelectedSize(startIndex);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
}
} break;
case PhysicalTypeID::VAR_LIST: {
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::UINT32(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal));
} break;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ void Column::commitColumnChunkOutOfPlace(Transaction* transaction, node_group_id
auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
// TODO(Guodong): Should consider caching the scanned column chunk to avoid redundant
// scans in the same transaction.
auto columnChunk = getEmptyChunkForCommit(chunkMeta.numValues + dstOffsets.size());
auto columnChunk =
getEmptyChunkForCommit(1.5 * std::bit_ceil(chunkMeta.numValues + dstOffsets.size()));
scan(transaction, nodeGroupIdx, columnChunk.get());
for (auto i = 0u; i < dstOffsets.size(); i++) {
columnChunk->write(chunk, srcOffset + i, dstOffsets[i], 1 /* numValues */);
Expand Down
1 change: 1 addition & 0 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ void ColumnChunk::append(
KU_ASSERT(nullChunk->getNumValues() == getNumValues());
nullChunk->append(other->nullChunk.get(), startPosInOtherChunk, numValuesToAppend);
}
KU_ASSERT(numValues + numValuesToAppend <= capacity);
memcpy(buffer.get() + numValues * numBytesPerValue,
other->buffer.get() + startPosInOtherChunk * numBytesPerValue,
numValuesToAppend * numBytesPerValue);
Expand Down
Loading

0 comments on commit 6870d28

Please sign in to comment.