diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index a1f23cd0bbb..726f4bcaf05 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -32,6 +32,7 @@ class Column { friend class StringColumn; friend class VarListLocalColumn; friend class StructColumn; + friend class VarListColumn; public: struct ReadState { diff --git a/src/include/storage/store/var_list_column.h b/src/include/storage/store/var_list_column.h index 2c7b75a386c..4de9e3b183c 100644 --- a/src/include/storage/store/var_list_column.h +++ b/src/include/storage/store/var_list_column.h @@ -3,24 +3,24 @@ #include "column.h" #include "var_list_column_chunk.h" -// List is a nested data type which is stored as two chunks: +// List is a nested data type which is stored as three chunks: // 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists. -// 2. Data column. Stores the actual data of the list. +// 2. Size column. Stores the size of each list. +// 3. Data column. Stores the actual data of the list. // Similar to other data types, nulls are stored in the null column. // Example layout for list of INT64: // Four lists: [4,7,8,12], null, [2, 3], [] // Offset column: [4, 4, 6, 6] +// Size column: [4, 0, 2, 0] // data column: [4, 7, 8, 12, 2, 3] -// When reading the data, we firstly read the offset column and utilize the offset column to find -// the data column partitions. -// 1st list(offset 4): Since it is the first element, the start offset is a constant 0, and the end -// offset is 4. Its data is stored at position 0-4 in the data column. -// 2nd list(offset 4): By reading the null column, we know that the 2nd list is null. So we don't -// need to read from the data column. -// 3rd list(offset 6): The start offset is 4(by looking up the offset for the 2nd list), and the end -// offset is 6. Its data is stored at position 4-6 in the data column. 
-// 4th list(offset 6): The start offset is 6(by looking up the offset for the 3rd list), and the end -// offset is 6. Its data is stored at position 6-6 in the data column (empty list). +// When updating the data, we first append the data to the data column, and then update the offset +// and size. For example, we want to update [4,7,8,12] to [1,3,4]. We first append [1,3,4] to the +// data column, and then update the offset and size. The layout after the update: Offset column: [9, +// 4, 6, 6] Size column: [3, 0, 2, 0] data column: [4, 7, 8, 12, 2, 3, 1, 3, 4] This design is good +// for writing performance. But it is bad for scan performance since it may cause random access to +// the data column. Worse, the data column will be increased every update/insert. To balance the +// write performance and read performance, we rewrite the whole var list column in ascending order +// when the size of data column is larger than a threshold(Currently we use capacity/2). namespace kuzu { namespace storage { @@ -77,20 +77,6 @@ class VarListColumn : public Column { void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage); - inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t, - const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&, - const offset_to_row_idx_t&) override { - // Always perform out-of-place commit for VAR_LIST columns. - return false; - } - inline bool canCommitInPlace(transaction::Transaction* /*transaction*/, - common::node_group_idx_t /*nodeGroupIdx*/, - const std::vector& /*dstOffsets*/, ColumnChunk* /*chunk*/, - common::offset_t /*startOffset*/) override { - // Always perform out-of-place commit for VAR_LIST columns. 
- return false; - } - void checkpointInMemory() final; void rollbackInMemory() final; @@ -104,6 +90,14 @@ class VarListColumn : public Column { common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup, common::offset_t endOffsetInNodeGroup); + void prepareCommitForChunk(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, + const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, + const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override; + void prepareCommitForChunk(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, const std::vector& dstOffsets, + ColumnChunk* chunk, common::offset_t startSrcOffset) override; + private: std::unique_ptr sizeColumn; std::unique_ptr dataColumn; diff --git a/src/include/storage/store/var_list_column_chunk.h b/src/include/storage/store/var_list_column_chunk.h index 933a700bd75..e95344f46c8 100644 --- a/src/include/storage/store/var_list_column_chunk.h +++ b/src/include/storage/store/var_list_column_chunk.h @@ -90,6 +90,9 @@ class VarListColumnChunk final : public ColumnChunk { } void resetOffset(); + void resetFromOtherChunk(VarListColumnChunk* other); + void finalize() override; + bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const; protected: void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector); @@ -103,6 +106,7 @@ class VarListColumnChunk final : public ColumnChunk { protected: std::unique_ptr sizeColumnChunk; std::unique_ptr varListDataColumnChunk; + bool checkOrder; }; } // namespace storage diff --git a/src/storage/store/column.cpp b/src/storage/store/column.cpp index b5587f7f4be..6d24edb1dfa 100644 --- a/src/storage/store/column.cpp +++ b/src/storage/store/column.cpp @@ -792,7 +792,8 @@ void Column::commitColumnChunkOutOfPlace(Transaction* transaction, node_group_id auto chunkMeta = getMetadata(nodeGroupIdx, 
transaction->getType()); // TODO(Guodong): Should consider caching the scanned column chunk to avoid redundant // scans in the same transaction. - auto columnChunk = getEmptyChunkForCommit(chunkMeta.numValues + dstOffsets.size()); + auto columnChunk = + getEmptyChunkForCommit(1.5 * std::bit_ceil(chunkMeta.numValues + dstOffsets.size())); scan(transaction, nodeGroupIdx, columnChunk.get()); for (auto i = 0u; i < dstOffsets.size(); i++) { columnChunk->write(chunk, srcOffset + i, dstOffsets[i], 1 /* numValues */); diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index 283df9a0a09..10fd7f085a4 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -303,5 +303,83 @@ ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction return {numOffsetsToRead, std::move(offsetColumnChunk), std::move(sizeColumnChunk)}; } +void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, + const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, + const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo, + const offset_set_t& deleteInfo) { + auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); + auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; + if (isNewNodeGroup) { + commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks, + insertInfo, localUpdateChunks, updateInfo, deleteInfo); + } else { + auto columnChunk = getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE); + std::vector dstOffsets; + for (auto& [offsetInDstChunk, rowIdx] : updateInfo) { + auto [chunkIdx, offsetInLocalChunk] = + LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + auto localUpdateChunk = localUpdateChunks[chunkIdx]; + dstOffsets.push_back(offsetInDstChunk); + columnChunk->append(localUpdateChunk, offsetInLocalChunk, 1); + } + 
for (auto& [offsetInDstChunk, rowIdx] : insertInfo) { + auto [chunkIdx, offsetInLocalChunk] = + LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + auto localInsertChunk = localInsertChunks[chunkIdx]; + dstOffsets.push_back(offsetInDstChunk); + columnChunk->append(localInsertChunk, offsetInLocalChunk, 1); + } + prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, columnChunk.get(), 0); + } +} + +void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, + const std::vector& dstOffsets, ColumnChunk* chunk, offset_t startSrcOffset) { + auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); + auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; + if (isNewNodeGroup) { + commitColumnChunkOutOfPlace( + transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk, startSrcOffset); + } else { + // we separate the commit into three parts: offset chunk commit, size column chunk commit, + // data column chunk + auto varListChunk = ku_dynamic_cast(chunk); + sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, + varListChunk->getSizeColumnChunk(), startSrcOffset); + auto dataColumnSize = + dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; + auto dataColumnChunk = varListChunk->getDataColumnChunk(); + auto appendListNum = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size()); + auto appendListDataNum = 0u; + auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset); + std::vector dstOffsetsInDataColumn; + for (auto i = 0u; i < appendListNum; i++) { + auto appendLen = varListChunk->getListLen(startSrcOffset + i); + for (auto j = 0u; j < appendLen; j++) { + dstOffsetsInDataColumn.push_back(dataColumnSize + appendListDataNum + j); + } + appendListDataNum += appendLen; + } + dataColumn->prepareCommitForChunk( + transaction, nodeGroupIdx, dstOffsetsInDataColumn, dataColumnChunk, startVarListOffset); + // we need to update 
the offset since we do not do in-place list data update but append data + // at the end of the list data column, so we add the original data column size to get the new + // offset. + // TODO(Jiamin): A better way is to store the offset in an offset column, just like size + // column. Then we can reuse prepareCommitForChunk interface for offset column. + auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType()); + auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(), + enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size())); + Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get()); + for (auto i = 0u; i < appendListNum; i++) { + auto listEndOffset = varListChunk->getListEndOffset(startSrcOffset + i); + auto isNull = varListChunk->getNullChunk()->isNull(startSrcOffset + i); + offsetColumnChunk->setValue(dataColumnSize + listEndOffset, dstOffsets[i]); + offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull); + } + Column::append(offsetColumnChunk.get(), nodeGroupIdx); + } +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index 7adad1d3e08..de69a325b15 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -4,7 +4,6 @@ #include "common/data_chunk/sel_vector.h" #include "common/types/value/value.h" #include "storage/store/column_chunk.h" - using namespace kuzu::common; namespace kuzu { @@ -33,11 +32,26 @@ VarListColumnChunk::VarListColumnChunk( varListDataColumnChunk = std::make_unique( ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0 /* capacity */, inMemory)); + checkOrder = true; KU_ASSERT(this->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST); } +bool VarListColumnChunk::isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const { + offset_t 
prevEndOffset = getListStartOffset(startPos); + for (auto i = startPos; i < endPos; i++) { + offset_t currentEndOffset = getListEndOffset(i); + auto length = getListLen(i); + prevEndOffset += length; + if (currentEndOffset != prevEndOffset) { + return false; + } + } + return true; +} + void VarListColumnChunk::append( ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { + checkOrder = false; auto otherListChunk = ku_dynamic_cast(other); nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); sizeColumnChunk->getNullChunk()->append( @@ -102,6 +116,8 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector) numValues += numToAppend; KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::appendNullList() { @@ -132,6 +148,8 @@ void VarListColumnChunk::lookup( } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::write( @@ -139,6 +157,7 @@ void VarListColumnChunk::write( KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() && dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 && chunk->getNumValues() == dstOffsets->getNumValues()); + checkOrder = false; offset_t currentIndex = varListDataColumnChunk->getNumValues(); auto otherListChunk = ku_dynamic_cast(chunk); varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() + @@ -167,10 +186,13 @@ void VarListColumnChunk::write( } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + 
KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::write( ValueVector* vector, offset_t offsetInVector, offset_t offsetInChunk) { + checkOrder = false; auto selVector = std::make_unique(1); selVector->resetSelectorToValuePosBuffer(); selVector->selectedPositions[0] = offsetInVector; @@ -193,11 +215,13 @@ void VarListColumnChunk::write( } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, offset_t dstOffsetInChunk, offset_t numValuesToCopy) { KU_ASSERT(srcChunk->getDataType().getPhysicalType() == PhysicalTypeID::VAR_LIST); + checkOrder = false; auto srcListChunk = ku_dynamic_cast(srcChunk); auto offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); for (auto i = 0u; i < numValuesToCopy; i++) { @@ -220,6 +244,7 @@ void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::copy(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, @@ -260,5 +285,65 @@ void VarListColumnChunk::resetOffset() { } } +// Rewrites this chunk with list data laid out in ascending offset order for better scan +// performance. The rewrite is skipped when the data chunk is less than half full, when checkOrder +// is still set (no append/write call has cleared it), or when the offsets are already ascending. +void VarListColumnChunk::finalize() { + uint64_t totalListLen = varListDataColumnChunk->getNumValues(); + uint64_t resizeThreshold = varListDataColumnChunk->capacity / 2; + // if the list is not very long, we do not need to rewrite + if (totalListLen < resizeThreshold) { + return; + } + // if we do not trigger random write, we do not need to rewrite + if (checkOrder) { + return; + } + // if the list is in 
ascending order, we do not need to rewrite + if (isOffsetSortedAscending(0, numValues)) { + return; + } + // Rewrite the column chunk for better scanning performance. The replacement chunk is allocated + // only here so that the early returns above do not pay for an unused allocation. + auto newColumnChunk = ColumnChunkFactory::createColumnChunk( + std::move(*dataType.copy()), enableCompression, capacity); + auto newVarListChunk = ku_dynamic_cast(newColumnChunk.get()); + newVarListChunk->resize(numValues); + newVarListChunk->getDataColumnChunk()->resize(totalListLen); + auto dataColumnChunk = newVarListChunk->getDataColumnChunk(); + newVarListChunk->varListDataColumnChunk->resizeBuffer(totalListLen); + offset_t offsetInChunk = 0; + offset_t currentIndex = 0; + for (auto i = 0u; i < numValues; i++) { + if (nullChunk->isNull(i)) { + newVarListChunk->appendNullList(); + } else { + auto startOffset = getListStartOffset(i); + auto appendLen = getListLen(i); + dataColumnChunk->append( + varListDataColumnChunk->dataColumnChunk.get(), startOffset, appendLen); + offsetInChunk += appendLen; + newVarListChunk->getNullChunk()->setNull(currentIndex, false); + newVarListChunk->sizeColumnChunk->getNullChunk()->setNull(currentIndex, false); + newVarListChunk->sizeColumnChunk->setValue(appendLen, currentIndex); + newVarListChunk->setValue(offsetInChunk, currentIndex); + } + currentIndex++; + } + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + // Move offsets, null, data from newVarListChunk to this column chunk. And release indices. + resetFromOtherChunk(newVarListChunk); +} + +void VarListColumnChunk::resetFromOtherChunk(VarListColumnChunk* other) { + buffer = std::move(other->buffer); + nullChunk = std::move(other->nullChunk); + sizeColumnChunk = std::move(other->sizeColumnChunk); + varListDataColumnChunk = std::move(other->varListDataColumnChunk); + numValues = other->numValues; + checkOrder = true; +} + } // namespace storage } // namespace kuzu