From 599b80f138ad0262d6dedbcc54643af37e3ebc55 Mon Sep 17 00:00:00 2001 From: Jiamin Hou Date: Sun, 24 Mar 2024 00:19:03 -0400 Subject: [PATCH] Rework var list storage layout (#3093) --- CMakeLists.txt | 2 +- src/common/vector/auxiliary_buffer.cpp | 2 +- src/include/common/types/types.h | 7 +- src/include/common/vector/auxiliary_buffer.h | 2 +- src/include/storage/store/column.h | 1 + src/include/storage/store/column_chunk.h | 1 + src/include/storage/store/var_list_column.h | 93 +++-- .../storage/store/var_list_column_chunk.h | 51 ++- src/processor/operator/unwind.cpp | 5 +- .../stats/table_statistics_collection.cpp | 2 + src/storage/store/column.cpp | 3 +- src/storage/store/column_chunk.cpp | 1 + src/storage/store/var_list_column.cpp | 328 ++++++++++++++---- src/storage/store/var_list_column_chunk.cpp | 253 ++++++++++---- test/test_files/issue/issue4.test | 25 ++ 15 files changed, 545 insertions(+), 231 deletions(-) create mode 100644 test/test_files/issue/issue4.test diff --git a/CMakeLists.txt b/CMakeLists.txt index 7cc3baa58e..ef4a8b5e37 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15) -project(Kuzu VERSION 0.3.2.3 LANGUAGES CXX C) +project(Kuzu VERSION 0.3.2.4 LANGUAGES CXX C) find_package(Threads REQUIRED) diff --git a/src/common/vector/auxiliary_buffer.cpp b/src/common/vector/auxiliary_buffer.cpp index 27e0bc91c3..20c4a47fd0 100644 --- a/src/common/vector/auxiliary_buffer.cpp +++ b/src/common/vector/auxiliary_buffer.cpp @@ -19,7 +19,7 @@ ListAuxiliaryBuffer::ListAuxiliaryBuffer( : capacity{DEFAULT_VECTOR_CAPACITY}, size{0}, dataVector{std::make_shared( dataVectorType, memoryManager)} {} -list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) { +list_entry_t ListAuxiliaryBuffer::addList(list_size_t listSize) { auto listEntry = list_entry_t{size, listSize}; bool needResizeDataVector = size + listSize > capacity; while (size + listSize > capacity) { diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 6f4b202fcd..94167124eb 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -46,6 +46,7 @@ constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX; using partition_idx_t = uint64_t; constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX; using length_t = uint64_t; +using list_size_t = uint32_t; // System representation for a variable-sized overflow value. struct overflow_value_t { @@ -57,10 +58,10 @@ struct overflow_value_t { struct list_entry_t { common::offset_t offset; - uint64_t size; + list_size_t size; - list_entry_t() : offset{INVALID_OFFSET}, size{UINT64_MAX} {} - list_entry_t(common::offset_t offset, uint64_t size) : offset{offset}, size{size} {} + list_entry_t() : offset{INVALID_OFFSET}, size{UINT32_MAX} {} + list_entry_t(common::offset_t offset, list_size_t size) : offset{offset}, size{size} {} }; struct struct_entry_t { diff --git a/src/include/common/vector/auxiliary_buffer.h b/src/include/common/vector/auxiliary_buffer.h index bac0a4002e..476b1d009a 100644 --- a/src/include/common/vector/auxiliary_buffer.h +++ b/src/include/common/vector/auxiliary_buffer.h @@ -73,7 +73,7 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer { ValueVector* getDataVector() const { return dataVector.get(); } std::shared_ptr getSharedDataVector() const { return dataVector; } - list_entry_t addList(uint64_t listSize); + list_entry_t addList(list_size_t listSize); uint64_t getSize() const { return size; } diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index 0683d1308d..4a8493c438 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -32,6 +32,7 @@ class Column { friend class StringColumn; friend class VarListLocalColumn; friend class StructColumn; + friend class VarListColumn; public: struct ReadState { diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index ac1495ccf7..c9674ae6db 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -93,6 +93,7 @@ class ColumnChunk { template void setValue(T val, common::offset_t pos) { + KU_ASSERT(pos < capacity); ((T*)buffer.get())[pos] = val; if (pos >= numValues) { numValues = pos + 1; diff --git a/src/include/storage/store/var_list_column.h b/src/include/storage/store/var_list_column.h index 8fdef63e78..43d8e1b753 100644 --- a/src/include/storage/store/var_list_column.h +++ b/src/include/storage/store/var_list_column.h @@ -1,43 +1,45 @@ #pragma once #include "column.h" +#include "var_list_column_chunk.h" -// List is a nested data type which is stored as two chunks: +// List is a nested data type which is stored as three chunks: // 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists. -// 2. Data column. Stores the actual data of the list. +// 2. Size column. Stores the size of each list. +// 3. Data column. Stores the actual data of the list. // Similar to other data types, nulls are stored in the null column. // Example layout for list of INT64: // Four lists: [4,7,8,12], null, [2, 3], [] // Offset column: [4, 4, 6, 6] +// Size column: [4, 0, 2, 0] // data column: [4, 7, 8, 12, 2, 3] -// When reading the data, we firstly read the offset column and utilize the offset column to find -// the data column partitions. -// 1st list(offset 4): Since it is the first element, the start offset is a constant 0, and the end -// offset is 4. Its data is stored at position 0-4 in the data column. -// 2nd list(offset 4): By reading the null column, we know that the 2nd list is null. So we don't -// need to read from the data column. -// 3rd list(offset 6): The start offset is 4(by looking up the offset for the 2nd list), and the end -// offset is 6. Its data is stored at position 4-6 in the data column. -// 4th list(offset 6): The start offset is 6(by looking up the offset for the 3rd list), and the end -// offset is 6. Its data is stored at position 6-6 in the data column (empty list). +// When updating the data, we first append the data to the data column, and then update the offset +// and size accordingly. Besides offset column, we introduce an extra size column here to enable +// in-place updates of a list column. In a list column chunk, offsets of lists are not always sorted +// after updates. This is good for writes, but it introduces extra overheads for scans, as lists can +// be scattered, and scans have to be broken into multiple small reads. To achieve a balance between +// reads and writes, during updates, we rewrite the whole var list column chunk in ascending order +// when the offsets are not sorted in ascending order and the size of data column chunk is larger +// than half of its capacity. namespace kuzu { namespace storage { -struct ListOffsetInfoInStorage { - common::offset_t prevNodeListOffset; - std::vector> offsetVectors; +struct ListOffsetSizeInfo { + common::offset_t numTotal; + std::unique_ptr offsetColumnChunk; + std::unique_ptr sizeColumnChunk; - ListOffsetInfoInStorage(common::offset_t prevNodeListOffset, - std::vector> offsetVectors) - : prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {} + ListOffsetSizeInfo(common::offset_t numTotal, std::unique_ptr offsetColumnChunk, + std::unique_ptr sizeColumnChunk) + : numTotal{numTotal}, offsetColumnChunk{std::move(offsetColumnChunk)}, + sizeColumnChunk{std::move(sizeColumnChunk)} {} - common::offset_t getListOffset(uint64_t nodePos) const; + common::list_size_t getListSize(uint64_t pos) const; + common::offset_t getListEndOffset(uint64_t pos) const; + common::offset_t getListStartOffset(uint64_t pos) const; - inline uint64_t getListLength(uint64_t nodePos) const { - KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos)); - return getListOffset(nodePos + 1) - getListOffset(nodePos); - } + bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const; }; class VarListColumn : public Column { @@ -69,45 +71,38 @@ class VarListColumn : public Column { void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override; private: - inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) { - return offsetInNodeGroup == 0 ? - 0 : - readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1); - } - void scanUnfiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector, - const ListOffsetInfoInStorage& listOffsetInfoInStorage); + const ListOffsetSizeInfo& listOffsetInfoInStorage); void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, - common::ValueVector* offsetVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage); - - inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t, - const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&, - const offset_to_row_idx_t&) override { - // Always perform out-of-place commit for VAR_LIST columns. - return false; - } - inline bool canCommitInPlace(transaction::Transaction* /*transaction*/, - common::node_group_idx_t /*nodeGroupIdx*/, - const std::vector& /*dstOffsets*/, ColumnChunk* /*chunk*/, - common::offset_t /*startOffset*/) override { - // Always perform out-of-place commit for VAR_LIST columns. - return false; - } + common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage); void checkpointInMemory() final; void rollbackInMemory() final; common::offset_t readOffset(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup); - ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction, + + common::list_size_t readSize(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup); + + ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup, - common::offset_t endOffsetInNodeGroup, - const std::shared_ptr& state); + common::offset_t endOffsetInNodeGroup); + + void prepareCommitForChunk(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, + const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, + const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override; + void prepareCommitForChunk(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, const std::vector& dstOffsets, + ColumnChunk* chunk, common::offset_t startSrcOffset) override; private: + std::unique_ptr sizeColumn; std::unique_ptr dataColumn; + // TODO(Guodong): This should be moved to table states. + std::unique_ptr tmpDataColumnChunk; }; } // namespace storage diff --git a/src/include/storage/store/var_list_column_chunk.h b/src/include/storage/store/var_list_column_chunk.h index 3790334cdb..edbb62a87b 100644 --- a/src/include/storage/store/var_list_column_chunk.h +++ b/src/include/storage/store/var_list_column_chunk.h @@ -35,8 +35,15 @@ class VarListColumnChunk final : public ColumnChunk { return varListDataColumnChunk->dataColumnChunk.get(); } + inline ColumnChunk* getSizeColumnChunk() const { return sizeColumnChunk.get(); } + void resetToEmpty() override; + inline void setNumValues(uint64_t numValues_) override { + ColumnChunk::setNumValues(numValues_); + sizeColumnChunk->setNumValues(numValues_); + } + void append(common::ValueVector* vector, common::SelectionVector& selVector) final; void lookup(common::offset_t offsetInChunk, common::ValueVector& output, @@ -56,12 +63,23 @@ class VarListColumnChunk final : public ColumnChunk { varListDataColumnChunk->resizeBuffer(numValues); } - void finalize() override; - - inline common::offset_t getListOffset(common::offset_t offset) const { - return offset == 0 ? 0 : getValue(offset - 1); + inline void resize(uint64_t newCapacity) override { + ColumnChunk::resize(newCapacity); + sizeColumnChunk->resize(newCapacity); } + common::offset_t getListStartOffset(common::offset_t offset) const; + + common::offset_t getListEndOffset(common::offset_t offset) const; + + common::list_size_t getListSize(common::offset_t offset) const; + + void resetOffset(); + void resetFromOtherChunk(VarListColumnChunk* other); + void finalize() override; + bool isOffsetsConsecutiveAndSortedAscending(uint64_t startPos, uint64_t endPos) const; + bool sanityCheck() override; + protected: void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector); @@ -69,32 +87,13 @@ class VarListColumnChunk final : public ColumnChunk { void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend) override; - inline void initializeIndices() { - indicesColumnChunk = ColumnChunkFactory::createColumnChunk( - *common::LogicalType::INT64(), false /*enableCompression*/, capacity); - indicesColumnChunk->getNullChunk()->resetToAllNull(); - for (auto i = 0u; i < numValues; i++) { - indicesColumnChunk->setValue(i, i); - indicesColumnChunk->getNullChunk()->setNull(i, nullChunk->isNull(i)); - } - indicesColumnChunk->setNumValues(numValues); - } - inline uint64_t getListLen(common::offset_t offset) const { - return getListOffset(offset + 1) - getListOffset(offset); - } - - void resetFromOtherChunk(VarListColumnChunk* other); void appendNullList(); protected: + std::unique_ptr sizeColumnChunk; std::unique_ptr varListDataColumnChunk; - // The following is needed to write var list to random positions in the column chunk. - // We first append var list to the end of the column chunk. Then use indicesColumnChunk to track - // where each var list data is inside the column chunk. - // `needFinalize` is set to true whenever `write` is called. - // During `finalize`, the whole column chunk will be re-written according to indices. - bool needFinalize; - std::unique_ptr indicesColumnChunk; + // we use checkOffsetSortedAsc flag to indicate that we do not trigger random write + bool checkOffsetSortedAsc; }; } // namespace storage diff --git a/src/processor/operator/unwind.cpp b/src/processor/operator/unwind.cpp index cd3b60fa63..0697672b0e 100644 --- a/src/processor/operator/unwind.cpp +++ b/src/processor/operator/unwind.cpp @@ -24,7 +24,8 @@ void Unwind::copyTuplesToOutVector(uint64_t startPos, uint64_t endPos) const { bool Unwind::getNextTuplesInternal(ExecutionContext* context) { if (hasMoreToRead()) { - auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size - startIndex); + auto totalElementsCopy = + std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size - startIndex); copyTuplesToOutVector(startIndex, (totalElementsCopy + startIndex)); startIndex += totalElementsCopy; outValueVector->state->initOriginalAndSelectedSize(totalElementsCopy); @@ -42,7 +43,7 @@ bool Unwind::getNextTuplesInternal(ExecutionContext* context) { } listEntry = expressionEvaluator->resultVector->getValue(pos); startIndex = 0; - auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size); + auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size); copyTuplesToOutVector(0, totalElementsCopy); startIndex += totalElementsCopy; outValueVector->state->initOriginalAndSelectedSize(startIndex); diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index f423066e74..965765d6f8 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -97,6 +97,8 @@ std::unique_ptr TablesStatistics::createMetadataDAHInfo( } } break; case PhysicalTypeID::VAR_LIST: { + metadataDAHInfo->childrenInfos.push_back( + createMetadataDAHInfo(*LogicalType::UINT32(), metadataFH, bm, wal)); metadataDAHInfo->childrenInfos.push_back( createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal)); } break; diff --git a/src/storage/store/column.cpp b/src/storage/store/column.cpp index 7d5a453f69..8d2e29d389 100644 --- a/src/storage/store/column.cpp +++ b/src/storage/store/column.cpp @@ -792,7 +792,8 @@ void Column::commitColumnChunkOutOfPlace(Transaction* transaction, node_group_id auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType()); // TODO(Guodong): Should consider caching the scanned column chunk to avoid redundant // scans in the same transaction. - auto columnChunk = getEmptyChunkForCommit(chunkMeta.numValues + dstOffsets.size()); + auto columnChunk = + getEmptyChunkForCommit(1.5 * std::bit_ceil(chunkMeta.numValues + dstOffsets.size())); scan(transaction, nodeGroupIdx, columnChunk.get()); for (auto i = 0u; i < dstOffsets.size(); i++) { columnChunk->write(chunk, srcOffset + i, dstOffsets[i], 1 /* numValues */); diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index ef82e5c6b1..637e33e723 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -223,6 +223,7 @@ void ColumnChunk::append( KU_ASSERT(nullChunk->getNumValues() == getNumValues()); nullChunk->append(other->nullChunk.get(), startPosInOtherChunk, numValuesToAppend); } + KU_ASSERT(numValues + numValuesToAppend <= capacity); memcpy(buffer.get() + numValues * numBytesPerValue, other->buffer.get() + startPosInOtherChunk * numBytesPerValue, numValuesToAppend * numBytesPerValue); diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index a10cf48aa1..bacf684d3b 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -11,13 +11,40 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -offset_t ListOffsetInfoInStorage::getListOffset(uint64_t nodePos) const { - if (nodePos == 0) { - return prevNodeListOffset; - } else { - auto offsetVector = offsetVectors[(nodePos - 1) / DEFAULT_VECTOR_CAPACITY].get(); - return offsetVector->getValue((nodePos - 1) % DEFAULT_VECTOR_CAPACITY); +offset_t ListOffsetSizeInfo::getListStartOffset(uint64_t pos) const { + if (numTotal == 0) { + return 0; + } + return pos == numTotal ? getListEndOffset(pos - 1) : getListEndOffset(pos) - getListSize(pos); +} + +offset_t ListOffsetSizeInfo::getListEndOffset(uint64_t pos) const { + if (numTotal == 0) { + return 0; + } + KU_ASSERT(pos < offsetColumnChunk->getNumValues()); + return offsetColumnChunk->getValue(pos); +} + +list_size_t ListOffsetSizeInfo::getListSize(uint64_t pos) const { + if (numTotal == 0) { + return 0; + } + KU_ASSERT(pos < sizeColumnChunk->getNumValues()); + return sizeColumnChunk->getValue(pos); +} + +bool ListOffsetSizeInfo::isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const { + offset_t prevEndOffset = getListStartOffset(startPos); + for (auto i = startPos; i < endPos; i++) { + offset_t currentEndOffset = getListEndOffset(i); + auto size = getListSize(i); + prevEndOffset += size; + if (currentEndOffset != prevEndOffset) { + return false; + } } + return true; } VarListColumn::VarListColumn(std::string name, LogicalType dataType, @@ -26,36 +53,55 @@ VarListColumn::VarListColumn(std::string name, LogicalType dataType, RWPropertyStats propertyStatistics, bool enableCompression) : Column{name, std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression, true /* requireNullColumn */} { + auto sizeColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, ""); auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, ""); + sizeColumn = ColumnFactory::createColumn(sizeColName, *LogicalType::UINT32(), + *metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction, + propertyStatistics, enableCompression); dataColumn = ColumnFactory::createColumn(dataColName, - *VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[0], + *VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression); + tmpDataColumnChunk = + std::make_unique(ColumnChunkFactory::createColumnChunk( + *VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0)); } void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector, uint64_t offsetInVector) { - // TODO(Ziyi): the current scan function requires two dynamic allocation of vectors which may be - // a bottleneck of the scan performance. We need to further optimize this. nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - auto listOffsetInfoInStorage = getListOffsetInfoInStorage( - transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector->state); + auto listOffsetInfoInStorage = + getListOffsetSizeInfo(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup); offset_t listOffsetInVector = offsetInVector == 0 ? 0 : resultVector->getValue(offsetInVector - 1).offset + resultVector->getValue(offsetInVector - 1).size; auto offsetToWriteListData = listOffsetInVector; auto numValues = endOffsetInGroup - startOffsetInGroup; + KU_ASSERT(numValues >= 0); for (auto i = 0u; i < numValues; i++) { - auto length = listOffsetInfoInStorage.getListLength(i); - resultVector->setValue(i + offsetInVector, list_entry_t{listOffsetInVector, length}); - listOffsetInVector += length; + list_size_t size = listOffsetInfoInStorage.getListSize(i); + resultVector->setValue(i + offsetInVector, list_entry_t{listOffsetInVector, size}); + listOffsetInVector += size; } ListVector::resizeDataVector(resultVector, listOffsetInVector); - dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListOffset(0), - listOffsetInfoInStorage.getListOffset(numValues), ListVector::getDataVector(resultVector), - offsetToWriteListData); + auto dataVector = ListVector::getDataVector(resultVector); + bool isOffsetSortedAscending = listOffsetInfoInStorage.isOffsetSortedAscending(0, numValues); + if (isOffsetSortedAscending) { + dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListStartOffset(0), + listOffsetInfoInStorage.getListStartOffset(numValues), dataVector, + offsetToWriteListData); + } else { + for (auto i = 0u; i < numValues; i++) { + offset_t startOffset = listOffsetInfoInStorage.getListStartOffset(i); + offset_t appendSize = listOffsetInfoInStorage.getListSize(i); + KU_ASSERT(appendSize >= 0); + dataColumn->scan(transaction, nodeGroupIdx, startOffset, startOffset + appendSize, + dataVector, offsetToWriteListData); + offsetToWriteListData += appendSize; + } + } } void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -63,15 +109,47 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx if (nodeGroupIdx >= metadataDA->getNumElements(transaction->getType())) { columnChunk->setNumValues(0); } else { - Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset); - // TODO: FIX-ME. auto varListColumnChunk = ku_dynamic_cast(columnChunk); - auto startVarListOffset = varListColumnChunk->getListOffset(0); - auto endVarListOffset = varListColumnChunk->getListOffset(columnChunk->getNumValues()); - auto numElements = endVarListOffset - startVarListOffset + 1; - varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(numElements)); - dataColumn->scan(transaction, nodeGroupIdx, varListColumnChunk->getDataColumnChunk(), - startVarListOffset, endVarListOffset); + Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset); + auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk(); + sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk, startOffset, endOffset); + auto resizeNumValues = varListColumnChunk->getDataColumnChunk()->getNumValues(); + bool isOffsetSortedAscending = true; + offset_t prevOffset = varListColumnChunk->getListStartOffset(0); + for (auto i = 0u; i < columnChunk->getNumValues(); i++) { + auto currentEndOffset = varListColumnChunk->getListEndOffset(i); + auto appendSize = varListColumnChunk->getListSize(i); + prevOffset += appendSize; + if (currentEndOffset != prevOffset) { + isOffsetSortedAscending = false; + } + resizeNumValues += appendSize; + } + if (isOffsetSortedAscending) { + varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(resizeNumValues)); + offset_t startVarListOffset = varListColumnChunk->getListStartOffset(0); + offset_t endVarListOffset = + varListColumnChunk->getListStartOffset(columnChunk->getNumValues()); + dataColumn->scan(transaction, nodeGroupIdx, varListColumnChunk->getDataColumnChunk(), + startVarListOffset, endVarListOffset); + varListColumnChunk->resetOffset(); + } else { + varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(resizeNumValues)); + tmpDataColumnChunk->resizeBuffer(std::bit_ceil(resizeNumValues)); + auto dataVarListColumnChunk = varListColumnChunk->getDataColumnChunk(); + for (auto i = 0u; i < columnChunk->getNumValues(); i++) { + offset_t startVarListOffset = varListColumnChunk->getListStartOffset(i); + offset_t endVarListOffset = varListColumnChunk->getListEndOffset(i); + dataColumn->scan(transaction, nodeGroupIdx, + tmpDataColumnChunk->dataColumnChunk.get(), startVarListOffset, + endVarListOffset); + KU_ASSERT(endVarListOffset - startVarListOffset == + tmpDataColumnChunk->dataColumnChunk->getNumValues()); + dataVarListColumnChunk->append(tmpDataColumnChunk->dataColumnChunk.get(), 0, + tmpDataColumnChunk->dataColumnChunk->getNumValues()); + } + varListColumnChunk->resetOffset(); + } } } @@ -82,13 +160,13 @@ void VarListColumn::scanInternal( auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); auto startNodeOffsetInGroup = startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - auto listOffsetInfoInStorage = - getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup, - startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state); + KU_ASSERT(resultVector->state); + auto listOffsetSizeInfo = getListOffsetSizeInfo(transaction, nodeGroupIdx, + startNodeOffsetInGroup, startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize()); if (resultVector->state->selVector->isUnfiltered()) { - scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage); + scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetSizeInfo); } else { - scanFiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage); + scanFiltered(transaction, nodeGroupIdx, resultVector, listOffsetSizeInfo); } } @@ -96,68 +174,88 @@ void VarListColumn::lookupValue(Transaction* transaction, offset_t nodeOffset, ValueVector* resultVector, uint32_t posInVector) { auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - auto listOffset = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup); - auto length = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup + 1) - - readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup); + auto listEndOffset = readOffset(transaction, nodeGroupIdx, nodeOffsetInGroup); + auto size = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup); + auto listStartOffset = listEndOffset - size; auto offsetInVector = posInVector == 0 ? 0 : resultVector->getValue(posInVector - 1); - resultVector->setValue(posInVector, list_entry_t{offsetInVector, length}); - ListVector::resizeDataVector(resultVector, offsetInVector + length); - dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listOffset, - listOffset + length, ListVector::getDataVector(resultVector), offsetInVector); + resultVector->setValue(posInVector, list_entry_t{offsetInVector, size}); + ListVector::resizeDataVector(resultVector, offsetInVector + size); + auto dataVector = ListVector::getDataVector(resultVector); + dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listStartOffset, + listEndOffset, dataVector, offsetInVector); } void VarListColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { KU_ASSERT(columnChunk->getDataType().getPhysicalType() == dataType.getPhysicalType()); - Column::append(columnChunk, nodeGroupIdx); - auto dataColumnChunk = - ku_dynamic_cast(columnChunk)->getDataColumnChunk(); + auto varListColumnChunk = ku_dynamic_cast(columnChunk); + Column::append(varListColumnChunk, nodeGroupIdx); + auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk(); + sizeColumn->append(sizeColumnChunk, nodeGroupIdx); + auto dataColumnChunk = varListColumnChunk->getDataColumnChunk(); dataColumn->append(dataColumnChunk, nodeGroupIdx); } void VarListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx, - ValueVector* resultVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage) { + ValueVector* resultVector, const ListOffsetSizeInfo& listOffsetInfoInStorage) { auto numValuesToScan = resultVector->state->selVector->selectedSize; offset_t offsetInVector = 0; for (auto i = 0u; i < numValuesToScan; i++) { - auto listLen = listOffsetInfoInStorage.getListLength(i); + auto listLen = listOffsetInfoInStorage.getListSize(i); resultVector->setValue(i, list_entry_t{offsetInVector, listLen}); offsetInVector += listLen; } ListVector::resizeDataVector(resultVector, offsetInVector); - auto startListOffsetInStorage = listOffsetInfoInStorage.getListOffset(0); - auto endListOffsetInStorage = listOffsetInfoInStorage.getListOffset(numValuesToScan); - dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, endListOffsetInStorage, - ListVector::getDataVector(resultVector), 0 /* offsetInVector */); + auto dataVector = ListVector::getDataVector(resultVector); + offsetInVector = 0; + bool checkOffsetOrder = listOffsetInfoInStorage.isOffsetSortedAscending(0, numValuesToScan); + if (checkOffsetOrder) { + auto startListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(0); + numValuesToScan = numValuesToScan == 0 ? 0 : numValuesToScan - 1; + auto endListOffsetInStorage = listOffsetInfoInStorage.getListEndOffset(numValuesToScan); + dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, + endListOffsetInStorage, dataVector, 0 /* offsetInVector */); + } else { + for (auto i = 0u; i < numValuesToScan; i++) { + auto startListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(i); + auto appendSize = listOffsetInfoInStorage.getListSize(i); + dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, + startListOffsetInStorage + appendSize, dataVector, offsetInVector); + offsetInVector += appendSize; + } + } } void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx, - ValueVector* resultVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage) { + ValueVector* resultVector, const ListOffsetSizeInfo& listOffsetSizeInfo) { offset_t listOffset = 0; for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - auto listLen = listOffsetInfoInStorage.getListLength(pos); - resultVector->setValue(pos, list_entry_t{(offset_t)listOffset, (uint64_t)listLen}); - listOffset += listLen; + auto listSize = listOffsetSizeInfo.getListSize(pos); + resultVector->setValue(pos, list_entry_t{(offset_t)listOffset, listSize}); + listOffset += listSize; } ListVector::resizeDataVector(resultVector, listOffset); listOffset = 0; for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - auto startOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos); - auto endOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos + 1); + auto startOffsetInStorageToScan = listOffsetSizeInfo.getListStartOffset(pos); + auto appendSize = listOffsetSizeInfo.getListSize(pos); + auto dataVector = ListVector::getDataVector(resultVector); dataColumn->scan(transaction, nodeGroupIdx, startOffsetInStorageToScan, - endOffsetInStorageToScan, ListVector::getDataVector(resultVector), listOffset); + startOffsetInStorageToScan + appendSize, dataVector, listOffset); listOffset += resultVector->getValue(pos).size; } } void VarListColumn::checkpointInMemory() { Column::checkpointInMemory(); + sizeColumn->checkpointInMemory(); dataColumn->checkpointInMemory(); } void VarListColumn::rollbackInMemory() { Column::rollbackInMemory(); + sizeColumn->rollbackInMemory(); dataColumn->rollbackInMemory(); } @@ -175,29 +273,111 @@ offset_t VarListColumn::readOffset( return value; } -ListOffsetInfoInStorage VarListColumn::getListOffsetInfoInStorage(Transaction* transaction, - node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup, - const std::shared_ptr& state) { +list_size_t VarListColumn::readSize( + Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) { + auto chunkMeta = sizeColumn->getMetadataDA()->get(nodeGroupIdx, transaction->getType()); + auto pageCursor = PageUtils::getPageCursorForPos(offsetInNodeGroup, + chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); + pageCursor.pageIdx += chunkMeta.pageIdx; + offset_t value; + readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { + readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */, + 1 /* numValuesToRead */, chunkMeta.compMeta); + }); + return value; +} + +ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction, + node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup) { auto numOffsetsToRead = endOffsetInNodeGroup - startOffsetInNodeGroup; - auto numOffsetVectors = numOffsetsToRead / DEFAULT_VECTOR_CAPACITY + - (numOffsetsToRead % DEFAULT_VECTOR_CAPACITY ? 1 : 0); - std::vector> offsetVectors; - offsetVectors.reserve(numOffsetVectors); - uint64_t numOffsetsRead = 0; - for (auto i = 0u; i < numOffsetVectors; i++) { - auto offsetVector = std::make_unique(LogicalTypeID::INT64); - auto numOffsetsToReadInCurBatch = - std::min(numOffsetsToRead - numOffsetsRead, DEFAULT_VECTOR_CAPACITY); - offsetVector->setState(state); - Column::scan(transaction, nodeGroupIdx, startOffsetInNodeGroup + numOffsetsRead, - startOffsetInNodeGroup + numOffsetsRead + numOffsetsToReadInCurBatch, - offsetVector.get(), 0 /* offsetInVector */); - offsetVectors.push_back(std::move(offsetVector)); - numOffsetsRead += numOffsetsToReadInCurBatch; + auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk( + *common::LogicalType::INT64(), enableCompression, numOffsetsToRead); + auto sizeColumnChunk = ColumnChunkFactory::createColumnChunk( + *common::LogicalType::UINT32(), enableCompression, numOffsetsToRead); + Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get(), startOffsetInNodeGroup, + endOffsetInNodeGroup); + sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk.get(), startOffsetInNodeGroup, + endOffsetInNodeGroup); + return {numOffsetsToRead, std::move(offsetColumnChunk), std::move(sizeColumnChunk)}; +} + +void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, + const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, + const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo, + const offset_set_t& deleteInfo) { + auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); + auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; + if (isNewNodeGroup) { + commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks, + insertInfo, localUpdateChunks, updateInfo, deleteInfo); + } else { + auto columnChunk = getEmptyChunkForCommit(updateInfo.size() + insertInfo.size()); + std::vector dstOffsets; + for (auto& [offsetInDstChunk, rowIdx] : updateInfo) { + auto [chunkIdx, offsetInLocalChunk] = + LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + auto localUpdateChunk = localUpdateChunks[chunkIdx]; + dstOffsets.push_back(offsetInDstChunk); + columnChunk->append(localUpdateChunk, offsetInLocalChunk, 1); + } + for (auto& [offsetInDstChunk, rowIdx] : insertInfo) { + auto [chunkIdx, offsetInLocalChunk] = + LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + auto localInsertChunk = localInsertChunks[chunkIdx]; + dstOffsets.push_back(offsetInDstChunk); + columnChunk->append(localInsertChunk, offsetInLocalChunk, 1); + } + prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, columnChunk.get(), 0); + } +} + +void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, + const std::vector& dstOffsets, ColumnChunk* chunk, offset_t startSrcOffset) { + auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); + auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; + if (isNewNodeGroup) { + commitColumnChunkOutOfPlace( + transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk, startSrcOffset); + } else { + // we separate the commit into three parts: offset chunk commit, size column chunk commit, + // data column chunk + auto varListChunk = ku_dynamic_cast(chunk); + sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, + varListChunk->getSizeColumnChunk(), startSrcOffset); + auto dataColumnSize = + dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; + auto dataColumnChunk = varListChunk->getDataColumnChunk(); + auto numListsToAppend = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size()); + auto dataSize = 0u; + auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset); + std::vector dstOffsetsInDataColumn; + for (auto i = 0u; i < numListsToAppend; i++) { + for (auto j = 0u; j < varListChunk->getListSize(startSrcOffset + i); j++) { + dstOffsetsInDataColumn.push_back(dataColumnSize + dataSize++); + } + } + dataColumn->prepareCommitForChunk( + transaction, nodeGroupIdx, dstOffsetsInDataColumn, dataColumnChunk, startVarListOffset); + // we need to update the offset since we do not do in-place list data update but append data + // in the end of list data column we need to plus to original data column size to get the + // new offset + // TODO(Jiamin): A better way is to store the offset in a offset column, just like size + // column. Then we can reuse prepareCommitForChunk interface for offset column. + auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType()); + auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(), + enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size())); + Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get()); + for (auto i = 0u; i < numListsToAppend; i++) { + auto listEndOffset = varListChunk->getListEndOffset(startSrcOffset + i); + auto isNull = varListChunk->getNullChunk()->isNull(startSrcOffset + i); + offsetColumnChunk->setValue(dataColumnSize + listEndOffset, dstOffsets[i]); + offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull); + } + auto offsetListChunk = + ku_dynamic_cast(offsetColumnChunk.get()); + offsetListChunk->getSizeColumnChunk()->setNumValues(offsetColumnChunk->getNumValues()); + Column::append(offsetColumnChunk.get(), nodeGroupIdx); } - auto prevNodeListOffsetInStorage = - readListOffsetInStorage(transaction, nodeGroupIdx, startOffsetInNodeGroup); - return {prevNodeListOffsetInStorage, std::move(offsetVectors)}; } } // namespace storage diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index 597dca2c94..527efc28ab 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -27,35 +27,78 @@ void VarListDataColumnChunk::resizeBuffer(uint64_t numValues) { VarListColumnChunk::VarListColumnChunk( LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory) - : ColumnChunk{std::move(dataType), capacity, enableCompression, true /* hasNullChunk */}, - needFinalize{false} { + : ColumnChunk{std::move(dataType), capacity, enableCompression, true /* hasNullChunk */} { + sizeColumnChunk = ColumnChunkFactory::createColumnChunk( + *common::LogicalType::UINT32(), enableCompression, capacity); varListDataColumnChunk = std::make_unique( ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0 /* capacity */, inMemory)); + checkOffsetSortedAsc = false; KU_ASSERT(this->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST); } +bool VarListColumnChunk::isOffsetsConsecutiveAndSortedAscending( + uint64_t startPos, uint64_t endPos) const { + offset_t prevEndOffset = getListStartOffset(startPos); + for (auto i = startPos; i < endPos; i++) { + offset_t currentEndOffset = getListEndOffset(i); + auto size = getListSize(i); + prevEndOffset += size; + if (currentEndOffset != prevEndOffset) { + return false; + } + } + return true; +} + +offset_t VarListColumnChunk::getListStartOffset(offset_t offset) const { + if (numValues == 0) + return 0; + return offset == numValues ? getListEndOffset(offset - 1) : + getListEndOffset(offset) - getListSize(offset); +} + +offset_t VarListColumnChunk::getListEndOffset(offset_t offset) const { + if (numValues == 0) + return 0; + KU_ASSERT(offset < numValues); + return getValue(offset); +} + +list_size_t VarListColumnChunk::getListSize(common::offset_t offset) const { + if (numValues == 0) + return 0; + KU_ASSERT(offset < sizeColumnChunk->getNumValues()); + return sizeColumnChunk->getValue(offset); +} + void VarListColumnChunk::append( ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { - nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); + checkOffsetSortedAsc = true; auto otherListChunk = ku_dynamic_cast(other); - auto offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); - KU_ASSERT(numValues + numValuesToAppend <= capacity); + nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); + sizeColumnChunk->getNullChunk()->append( + other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); + offset_t offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); for (auto i = 0u; i < numValuesToAppend; i++) { - offsetInDataChunkToAppend += otherListChunk->getListLen(startPosInOtherChunk + i); - setValue(offsetInDataChunkToAppend, numValues); + auto appendSize = otherListChunk->getListSize(startPosInOtherChunk + i); + sizeColumnChunk->setValue(appendSize, numValues); + offsetInDataChunkToAppend += appendSize; + setValue(offsetInDataChunkToAppend, numValues); } - auto startOffset = otherListChunk->getListOffset(startPosInOtherChunk); - auto endOffset = otherListChunk->getListOffset(startPosInOtherChunk + numValuesToAppend); - KU_ASSERT(endOffset >= startOffset); varListDataColumnChunk->resizeBuffer(offsetInDataChunkToAppend); - varListDataColumnChunk->dataColumnChunk->append( - otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffset, - endOffset - startOffset); + for (auto i = 0u; i < numValuesToAppend; i++) { + auto startOffset = otherListChunk->getListStartOffset(startPosInOtherChunk + i); + auto appendSize = otherListChunk->getListSize(startPosInOtherChunk + i); + varListDataColumnChunk->dataColumnChunk->append( + otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffset, appendSize); + } + sanityCheck(); } void VarListColumnChunk::resetToEmpty() { ColumnChunk::resetToEmpty(); + sizeColumnChunk->resetToEmpty(); varListDataColumnChunk = std::make_unique( ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0 /* capacity */)); @@ -70,11 +113,13 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector) if (capacity < newCapacity) { resize(newCapacity); } - auto nextListOffsetInChunk = getListOffset(numValues); + offset_t nextListOffsetInChunk = varListDataColumnChunk->getNumValues(); auto offsetBufferToWrite = (offset_t*)(buffer.get()); for (auto i = 0u; i < selVector.selectedSize; i++) { auto pos = selVector.selectedPositions[i]; - uint64_t listLen = vector->isNull(pos) ? 0 : vector->getValue(pos).size; + auto listLen = vector->isNull(pos) ? 0 : vector->getValue(pos).size; + sizeColumnChunk->setValue(listLen, numValues + i); + sizeColumnChunk->getNullChunk()->setNull(numValues + i, vector->isNull(pos)); nullChunk->setNull(numValues + i, vector->isNull(pos)); nextListOffsetInChunk += listLen; offsetBufferToWrite[numValues + i] = nextListOffsetInChunk; @@ -91,11 +136,14 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector) copyListValues(vector->getValue(pos), dataVector); } numValues += numToAppend; + sanityCheck(); } void VarListColumnChunk::appendNullList() { - auto nextListOffsetInChunk = getListOffset(numValues); + offset_t nextListOffsetInChunk = varListDataColumnChunk->getNumValues(); auto offsetBufferToWrite = (offset_t*)(buffer.get()); + sizeColumnChunk->setValue(0, numValues); + sizeColumnChunk->getNullChunk()->setNull(numValues, true); offsetBufferToWrite[numValues] = nextListOffsetInChunk; nullChunk->setNull(numValues, true); numValues++; @@ -109,83 +157,102 @@ void VarListColumnChunk::lookup( return; } auto startOffset = offsetInChunk == 0 ? 0 : getValue(offsetInChunk - 1); - auto endOffset = getValue(offsetInChunk); - auto listLen = endOffset - startOffset; + auto listSize = getListSize(offsetInChunk); auto dataVector = ListVector::getDataVector(&output); auto currentListDataSize = ListVector::getDataVectorSize(&output); - ListVector::resizeDataVector(&output, currentListDataSize + listLen); + ListVector::resizeDataVector(&output, currentListDataSize + listSize); // TODO(Guodong): Should add `scan` interface and use `scan` here. - for (auto i = 0u; i < listLen; i++) { + for (auto i = 0u; i < listSize; i++) { varListDataColumnChunk->dataColumnChunk->lookup(startOffset + i, *dataVector, i); } } void VarListColumnChunk::write( ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) { - KU_ASSERT(dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID); - needFinalize = true; - if (!indicesColumnChunk) { - initializeIndices(); - } KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() && + dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID && chunk->getNumValues() == dstOffsets->getNumValues()); - auto currentIndex = numValues; - append(chunk, 0, chunk->getNumValues()); + checkOffsetSortedAsc = true; + offset_t currentIndex = varListDataColumnChunk->getNumValues(); + auto otherListChunk = ku_dynamic_cast(chunk); + varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() + + otherListChunk->varListDataColumnChunk->getNumValues()); + varListDataColumnChunk->dataColumnChunk->append( + otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), 0, + otherListChunk->varListDataColumnChunk->getNumValues()); + offset_t maxDstOffset = 0; for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { auto posInChunk = dstOffsets->getValue(i); - KU_ASSERT(posInChunk < capacity); - indicesColumnChunk->setValue(currentIndex++, posInChunk); - indicesColumnChunk->getNullChunk()->setNull(posInChunk, false); - if (indicesColumnChunk->getNumValues() <= posInChunk) { - indicesColumnChunk->setNumValues(posInChunk + 1); + if (posInChunk > maxDstOffset) { + maxDstOffset = posInChunk; } } - KU_ASSERT(currentIndex == numValues && - indicesColumnChunk->getNumValues() <= indicesColumnChunk->getCapacity()); + while (maxDstOffset >= numValues) { + appendNullList(); + } + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto posInChunk = dstOffsets->getValue(i); + auto appendSize = otherListChunk->getListSize(i); + currentIndex += appendSize; + setValue(currentIndex, posInChunk); + nullChunk->setNull(posInChunk, otherListChunk->nullChunk->isNull(i)); + sizeColumnChunk->setValue(appendSize, posInChunk); + sizeColumnChunk->getNullChunk()->setNull(posInChunk, otherListChunk->nullChunk->isNull(i)); + } + sanityCheck(); } void VarListColumnChunk::write( ValueVector* vector, offset_t offsetInVector, offset_t offsetInChunk) { - needFinalize = true; - if (!indicesColumnChunk) { - initializeIndices(); + checkOffsetSortedAsc = true; + auto selVector = std::make_unique(1); + selVector->resetSelectorToValuePosBuffer(); + selVector->selectedPositions[0] = offsetInVector; + auto appendSize = + vector->isNull(offsetInVector) ? 0 : vector->getValue(offsetInVector).size; + varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() + appendSize); + auto dataVector = ListVector::getDataVector(vector); + dataVector->setState(std::make_unique()); + dataVector->state->selVector->resetSelectorToValuePosBuffer(); + copyListValues(vector->getValue(offsetInVector), dataVector); + while (offsetInChunk >= numValues) { + appendNullList(); } - auto currentIndex = numValues; - append(vector, *vector->state->selVector); - KU_ASSERT(offsetInChunk < capacity); - indicesColumnChunk->setValue(currentIndex, offsetInChunk); - indicesColumnChunk->getNullChunk()->setNull(offsetInChunk, vector->isNull(offsetInVector)); - if (indicesColumnChunk->getNumValues() <= offsetInChunk) { - indicesColumnChunk->setNumValues(offsetInChunk + 1); + auto isNull = vector->isNull(offsetInVector); + nullChunk->setNull(offsetInChunk, isNull); + sizeColumnChunk->getNullChunk()->setNull(offsetInChunk, isNull); + if (!isNull) { + sizeColumnChunk->setValue(appendSize, offsetInChunk); + setValue(varListDataColumnChunk->getNumValues(), offsetInChunk); } + sanityCheck(); } void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, offset_t dstOffsetInChunk, offset_t numValuesToCopy) { KU_ASSERT(srcChunk->getDataType().getPhysicalType() == PhysicalTypeID::VAR_LIST); - needFinalize = true; - if (!indicesColumnChunk) { - initializeIndices(); - } - nullChunk->write(srcChunk->getNullChunk(), srcOffsetInChunk, dstOffsetInChunk, numValuesToCopy); + checkOffsetSortedAsc = true; auto srcListChunk = ku_dynamic_cast(srcChunk); auto offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); - auto currentIndex = numValues; for (auto i = 0u; i < numValuesToCopy; i++) { - offsetInDataChunkToAppend += srcListChunk->getListLen(srcOffsetInChunk + i); - setValue(offsetInDataChunkToAppend, currentIndex + i); - indicesColumnChunk->setValue(currentIndex + i, dstOffsetInChunk + i); - indicesColumnChunk->getNullChunk()->setNull(dstOffsetInChunk + i, false); + auto appendSize = srcListChunk->getListSize(srcOffsetInChunk + i); + offsetInDataChunkToAppend += appendSize; + sizeColumnChunk->setValue(appendSize, dstOffsetInChunk + i); + setValue(offsetInDataChunkToAppend, dstOffsetInChunk + i); + nullChunk->setNull( + dstOffsetInChunk + i, srcListChunk->nullChunk->isNull(srcOffsetInChunk + i)); + sizeColumnChunk->getNullChunk()->setNull( + dstOffsetInChunk + i, srcListChunk->nullChunk->isNull(srcOffsetInChunk + i)); } varListDataColumnChunk->resizeBuffer(offsetInDataChunkToAppend); - auto startOffsetInSrcChunk = srcListChunk->getListOffset(srcOffsetInChunk); - auto endOffsetInSrcChunk = srcListChunk->getListOffset(srcOffsetInChunk + numValuesToCopy); - varListDataColumnChunk->dataColumnChunk->append( - srcListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffsetInSrcChunk, - endOffsetInSrcChunk - startOffsetInSrcChunk); - if (indicesColumnChunk->getNumValues() < dstOffsetInChunk + numValuesToCopy) { - indicesColumnChunk->setNumValues(dstOffsetInChunk + numValuesToCopy + 1); + for (auto i = 0u; i < numValuesToCopy; i++) { + auto startOffsetInSrcChunk = srcListChunk->getListStartOffset(srcOffsetInChunk + i); + auto appendSize = srcListChunk->getListSize(srcOffsetInChunk + i); + varListDataColumnChunk->dataColumnChunk->append( + srcListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffsetInSrcChunk, + appendSize); } + sanityCheck(); } void VarListColumnChunk::copy(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, @@ -214,35 +281,75 @@ void VarListColumnChunk::copyListValues(const list_entry_t& entry, ValueVector* } } -void VarListColumnChunk::finalize() { - if (!needFinalize) { - return; +void VarListColumnChunk::resetOffset() { + offset_t nextListOffsetReset = 0; + for (auto i = 0u; i < numValues; i++) { + auto listSize = getListSize(i); + nextListOffsetReset += uint64_t(listSize); + setValue(nextListOffsetReset, i); + sizeColumnChunk->setValue(listSize, i); } +} + +void VarListColumnChunk::finalize() { + // rewrite the column chunk for better scanning performance auto newColumnChunk = ColumnChunkFactory::createColumnChunk( std::move(*dataType.copy()), enableCompression, capacity); - auto totalListLen = getListOffset(numValues) + 1; + uint64_t totalListLen = varListDataColumnChunk->getNumValues(); + uint64_t resizeThreshold = varListDataColumnChunk->capacity / 2; + // if the list is not very long, we do not need to rewrite + if (totalListLen < resizeThreshold) { + return; + } + // if we do not trigger random write, we do not need to rewrite + if (!checkOffsetSortedAsc) { + return; + } + // if the list is in ascending order, we do not need to rewrite + if (isOffsetsConsecutiveAndSortedAscending(0, numValues)) { + return; + } auto newVarListChunk = ku_dynamic_cast(newColumnChunk.get()); + newVarListChunk->resize(numValues); newVarListChunk->getDataColumnChunk()->resize(totalListLen); - for (auto i = 0u; i < indicesColumnChunk->getNumValues(); i++) { - if (indicesColumnChunk->getNullChunk()->isNull(i)) { + auto dataColumnChunk = newVarListChunk->getDataColumnChunk(); + newVarListChunk->varListDataColumnChunk->resizeBuffer(totalListLen); + offset_t offsetInChunk = 0; + offset_t currentIndex = 0; + for (auto i = 0u; i < numValues; i++) { + if (nullChunk->isNull(i)) { newVarListChunk->appendNullList(); } else { - auto index = indicesColumnChunk->getValue(i); - newColumnChunk->append(this, index, 1); + auto startOffset = getListStartOffset(i); + auto listSize = getListSize(i); + dataColumnChunk->append( + varListDataColumnChunk->dataColumnChunk.get(), startOffset, listSize); + offsetInChunk += listSize; + newVarListChunk->getNullChunk()->setNull(currentIndex, false); + newVarListChunk->sizeColumnChunk->getNullChunk()->setNull(currentIndex, false); + newVarListChunk->sizeColumnChunk->setValue(listSize, currentIndex); + newVarListChunk->setValue(offsetInChunk, currentIndex); } + currentIndex++; } + newVarListChunk->sanityCheck(); // Move offsets, null, data from newVarListChunk to this column chunk. And release indices. resetFromOtherChunk(newVarListChunk); } - void VarListColumnChunk::resetFromOtherChunk(VarListColumnChunk* other) { buffer = std::move(other->buffer); nullChunk = std::move(other->nullChunk); + sizeColumnChunk = std::move(other->sizeColumnChunk); varListDataColumnChunk = std::move(other->varListDataColumnChunk); numValues = other->numValues; - // Reset indices and needFinalize. - indicesColumnChunk.reset(); - needFinalize = false; + checkOffsetSortedAsc = false; +} + +bool VarListColumnChunk::sanityCheck() { + KU_ASSERT(ColumnChunk::sanityCheck()); + KU_ASSERT(sizeColumnChunk->sanityCheck()); + KU_ASSERT(getDataColumnChunk()->sanityCheck()); + return sizeColumnChunk->getNumValues() == numValues; } } // namespace storage diff --git a/test/test_files/issue/issue4.test b/test/test_files/issue/issue4.test new file mode 100644 index 0000000000..f27f8e8a9b --- /dev/null +++ b/test/test_files/issue/issue4.test @@ -0,0 +1,25 @@ +-GROUP IssueTest +-DATASET CSV empty + +-- + +-CASE 3083 +-STATEMENT CREATE NODE TABLE test ( prop0 STRING, prop1 STRING[], prop2 STRING, prop3 INT64, prop4 STRING, PRIMARY KEY(prop4) ) +---- ok +-STATEMENT MERGE (n:test { prop4: "efwb2143d10ccfw" }) SET n.prop0 = "efwoj23", n.prop1 = ["eee", "wefwhiihifwe23343", "dmkwlenfwef232323"], n.prop2 = "NOT eee IS NULL AND dmkwlenfwef232323 < '2023-01-10T00:00:00-05:00' AND dmkwlenfwef232323 >= '2022-01-01T00:00:00-05:00'", n.prop3 = 5 RETURN n.prop4 +---- ok +-STATEMENT MERGE (n:test { prop4: "sdnweh2382933228" }) SET n.prop0 = "efwoj23", n.prop1 = ["customer_name", "wefwhiihifwe23343", "dmkwlenfwef232323"], n.prop2 = "NOT customer_name IS NULL AND NOT dmkwlenfwef232323 IS NULL" RETURN n.prop4 +---- ok +-STATEMENT MERGE (n:test { prop4: "sdnjb232*23ksfew" }) SET n.prop0 = "fw", n.prop1 = ["wefwhiihifwe23343", "dmkwlenfwef232323"], n.prop2 = "wefwhiihifwe23343 > 5" RETURN n.prop4 +---- ok +-STATEMENT MERGE (n:test { prop4: "dsnfjwne*&232" }) SET n.prop0 = "tweee" RETURN n.prop4 +---- ok +-STATEMENT MERGE (n:test { prop4: "nsdwew*232njfds^" }) SET n.prop0 = "fw" RETURN n.prop4 +---- ok +-STATEMENT MERGE (n:test { prop4: "nsdwew*232njfds^" }) SET n.prop0 = "fw" RETURN n.prop4 +---- ok +-STATEMENT MERGE (n:test { prop4: "fwsdmwfnw&" }) SET n.prop0 = "sds" RETURN n.prop4 +---- ok +-STATEMENT MATCH (n:test) WHERE n.prop4 = "efwb2143d10ccfw" RETURN n.prop1 +---- 1 +[eee,wefwhiihifwe23343,dmkwlenfwef232323]