diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index 8eecf3857f..63eccdf2ee 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -92,6 +92,7 @@ class ColumnChunk { template void setValue(T val, common::offset_t pos) { + KU_ASSERT(pos < capacity); ((T*)buffer.get())[pos] = val; if (pos >= numValues) { numValues = pos + 1; diff --git a/src/include/storage/store/var_list_column.h b/src/include/storage/store/var_list_column.h index 8fdef63e78..2c7b75a386 100644 --- a/src/include/storage/store/var_list_column.h +++ b/src/include/storage/store/var_list_column.h @@ -1,6 +1,7 @@ #pragma once #include "column.h" +#include "var_list_column_chunk.h" // List is a nested data type which is stored as two chunks: // 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists. @@ -24,20 +25,21 @@ namespace kuzu { namespace storage { -struct ListOffsetInfoInStorage { - common::offset_t prevNodeListOffset; - std::vector> offsetVectors; +struct ListOffsetSizeInfo { + common::offset_t numTotal; + std::unique_ptr offsetColumnChunk; + std::unique_ptr sizeColumnChunk; - ListOffsetInfoInStorage(common::offset_t prevNodeListOffset, - std::vector> offsetVectors) - : prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {} + ListOffsetSizeInfo(common::offset_t numTotal, std::unique_ptr offsetColumnChunk, + std::unique_ptr sizeColumnChunk) + : numTotal{numTotal}, offsetColumnChunk{std::move(offsetColumnChunk)}, + sizeColumnChunk{std::move(sizeColumnChunk)} {} - common::offset_t getListOffset(uint64_t nodePos) const; + uint32_t getListLength(uint64_t pos) const; + common::offset_t getListEndOffset(uint64_t pos) const; + common::offset_t getListStartOffset(uint64_t pos) const; - inline uint64_t getListLength(uint64_t nodePos) const { - KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos)); - return getListOffset(nodePos + 1) - getListOffset(nodePos); - } + bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const; }; class VarListColumn : public Column { @@ -69,18 +71,11 @@ class VarListColumn : public Column { void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override; private: - inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) { - return offsetInNodeGroup == 0 ? - 0 : - readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1); - } - void scanUnfiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector, - const ListOffsetInfoInStorage& listOffsetInfoInStorage); + const ListOffsetSizeInfo& listOffsetInfoInStorage); void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, - common::ValueVector* offsetVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage); + common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage); inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t, const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&, @@ -101,13 +96,18 @@ class VarListColumn : public Column { common::offset_t readOffset(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup); - ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction, + + uint32_t readSize(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t offsetInNodeGroup); + + ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup, - common::offset_t endOffsetInNodeGroup, - const std::shared_ptr& state); + common::offset_t endOffsetInNodeGroup); private: + std::unique_ptr sizeColumn; std::unique_ptr dataColumn; + std::unique_ptr tmpDataColumnChunk; }; } // namespace storage diff --git a/src/include/storage/store/var_list_column_chunk.h b/src/include/storage/store/var_list_column_chunk.h index 3790334cdb..933a700bd7 100644 --- a/src/include/storage/store/var_list_column_chunk.h +++ b/src/include/storage/store/var_list_column_chunk.h @@ -35,8 +35,15 @@ class VarListColumnChunk final : public ColumnChunk { return varListDataColumnChunk->dataColumnChunk.get(); } + inline ColumnChunk* getSizeColumnChunk() const { return sizeColumnChunk.get(); } + void resetToEmpty() override; + inline void setNumValues(uint64_t numValues_) override { + ColumnChunk::setNumValues(numValues_); + sizeColumnChunk->setNumValues(numValues_); + } + void append(common::ValueVector* vector, common::SelectionVector& selVector) final; void lookup(common::offset_t offsetInChunk, common::ValueVector& output, @@ -56,12 +63,34 @@ class VarListColumnChunk final : public ColumnChunk { varListDataColumnChunk->resizeBuffer(numValues); } - void finalize() override; + inline void resize(uint64_t newCapacity) override { + ColumnChunk::resize(newCapacity); + sizeColumnChunk->resize(newCapacity); + } + + inline common::offset_t getListStartOffset(common::offset_t offset) const { + if (numValues == 0) + return 0; + return offset == numValues ? getListEndOffset(offset - 1) : + getListEndOffset(offset) - getListLen(offset); + } + + inline common::offset_t getListEndOffset(common::offset_t offset) const { + if (numValues == 0) + return 0; + KU_ASSERT(offset < numValues); + return getValue(offset); + } - inline common::offset_t getListOffset(common::offset_t offset) const { - return offset == 0 ? 0 : getValue(offset - 1); + inline uint64_t getListLen(common::offset_t offset) const { + if (numValues == 0) + return 0; + KU_ASSERT(offset < sizeColumnChunk->getNumValues()); + return sizeColumnChunk->getValue(offset); } + void resetOffset(); + protected: void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector); @@ -69,32 +98,11 @@ class VarListColumnChunk final : public ColumnChunk { void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend) override; - inline void initializeIndices() { - indicesColumnChunk = ColumnChunkFactory::createColumnChunk( - *common::LogicalType::INT64(), false /*enableCompression*/, capacity); - indicesColumnChunk->getNullChunk()->resetToAllNull(); - for (auto i = 0u; i < numValues; i++) { - indicesColumnChunk->setValue(i, i); - indicesColumnChunk->getNullChunk()->setNull(i, nullChunk->isNull(i)); - } - indicesColumnChunk->setNumValues(numValues); - } - inline uint64_t getListLen(common::offset_t offset) const { - return getListOffset(offset + 1) - getListOffset(offset); - } - - void resetFromOtherChunk(VarListColumnChunk* other); void appendNullList(); protected: + std::unique_ptr sizeColumnChunk; std::unique_ptr varListDataColumnChunk; - // The following is needed to write var list to random positions in the column chunk. - // We first append var list to the end of the column chunk. Then use indicesColumnChunk to track - // where each var list data is inside the column chunk. - // `needFinalize` is set to true whenever `write` is called. - // During `finalize`, the whole column chunk will be re-written according to indices. - bool needFinalize; - std::unique_ptr indicesColumnChunk; }; } // namespace storage diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index f423066e74..965765d6f8 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -97,6 +97,8 @@ std::unique_ptr TablesStatistics::createMetadataDAHInfo( } } break; case PhysicalTypeID::VAR_LIST: { + metadataDAHInfo->childrenInfos.push_back( + createMetadataDAHInfo(*LogicalType::UINT32(), metadataFH, bm, wal)); metadataDAHInfo->childrenInfos.push_back( createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal)); } break; diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index 2d44a553d0..350cc48125 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -221,6 +221,7 @@ void ColumnChunk::append( KU_ASSERT(nullChunk->getNumValues() == getNumValues()); nullChunk->append(other->nullChunk.get(), startPosInOtherChunk, numValuesToAppend); } + KU_ASSERT(numValues + numValuesToAppend <= capacity); memcpy(buffer.get() + numValues * numBytesPerValue, other->buffer.get() + startPosInOtherChunk * numBytesPerValue, numValuesToAppend * numBytesPerValue); diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index a10cf48aa1..283df9a0a0 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -11,13 +11,40 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -offset_t ListOffsetInfoInStorage::getListOffset(uint64_t nodePos) const { - if (nodePos == 0) { - return prevNodeListOffset; - } else { - auto offsetVector = offsetVectors[(nodePos - 1) / DEFAULT_VECTOR_CAPACITY].get(); - return offsetVector->getValue((nodePos - 1) % DEFAULT_VECTOR_CAPACITY); +offset_t ListOffsetSizeInfo::getListStartOffset(uint64_t pos) const { + if (numTotal == 0) { + return 0; + } + return pos == numTotal ? getListEndOffset(pos - 1) : getListEndOffset(pos) - getListLength(pos); +} + +offset_t ListOffsetSizeInfo::getListEndOffset(uint64_t pos) const { + if (numTotal == 0) { + return 0; + } + KU_ASSERT(pos < offsetColumnChunk->getNumValues()); + return offsetColumnChunk->getValue(pos); +} + +uint32_t ListOffsetSizeInfo::getListLength(uint64_t pos) const { + if (numTotal == 0) { + return 0; + } + KU_ASSERT(pos < sizeColumnChunk->getNumValues()); + return sizeColumnChunk->getValue(pos); +} + +bool ListOffsetSizeInfo::isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const { + offset_t prevEndOffset = getListStartOffset(startPos); + for (auto i = startPos; i < endPos; i++) { + offset_t currentEndOffset = getListEndOffset(i); + auto length = getListLength(i); + prevEndOffset += length; + if (currentEndOffset != prevEndOffset) { + return false; + } } + return true; } VarListColumn::VarListColumn(std::string name, LogicalType dataType, @@ -26,10 +53,17 @@ VarListColumn::VarListColumn(std::string name, LogicalType dataType, RWPropertyStats propertyStatistics, bool enableCompression) : Column{name, std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression, true /* requireNullColumn */} { + auto sizeColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, ""); auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, ""); + sizeColumn = ColumnFactory::createColumn(sizeColName, *LogicalType::UINT32(), + *metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction, + propertyStatistics, enableCompression); dataColumn = ColumnFactory::createColumn(dataColName, - *VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[0], + *VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression); + tmpDataColumnChunk = + std::make_unique(ColumnChunkFactory::createColumnChunk( + *VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0)); } void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -39,23 +73,37 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx // a bottleneck of the scan performance. We need to further optimize this. nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - auto listOffsetInfoInStorage = getListOffsetInfoInStorage( - transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector->state); + auto listOffsetInfoInStorage = + getListOffsetSizeInfo(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup); offset_t listOffsetInVector = offsetInVector == 0 ? 0 : resultVector->getValue(offsetInVector - 1).offset + resultVector->getValue(offsetInVector - 1).size; auto offsetToWriteListData = listOffsetInVector; auto numValues = endOffsetInGroup - startOffsetInGroup; + KU_ASSERT(numValues >= 0); for (auto i = 0u; i < numValues; i++) { - auto length = listOffsetInfoInStorage.getListLength(i); + uint64_t length = listOffsetInfoInStorage.getListLength(i); resultVector->setValue(i + offsetInVector, list_entry_t{listOffsetInVector, length}); listOffsetInVector += length; } ListVector::resizeDataVector(resultVector, listOffsetInVector); - dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListOffset(0), - listOffsetInfoInStorage.getListOffset(numValues), ListVector::getDataVector(resultVector), - offsetToWriteListData); + auto dataVector = ListVector::getDataVector(resultVector); + bool isOffsetSortedAscending = listOffsetInfoInStorage.isOffsetSortedAscending(0, numValues); + if (isOffsetSortedAscending) { + dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListStartOffset(0), + listOffsetInfoInStorage.getListStartOffset(numValues), dataVector, + offsetToWriteListData); + } else { + for (auto i = 0u; i < numValues; i++) { + offset_t startOffset = listOffsetInfoInStorage.getListStartOffset(i); + offset_t appendLen = listOffsetInfoInStorage.getListLength(i); + KU_ASSERT(appendLen >= 0); + dataColumn->scan(transaction, nodeGroupIdx, startOffset, startOffset + appendLen, + dataVector, offsetToWriteListData); + offsetToWriteListData += appendLen; + } + } } void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -63,15 +111,47 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx if (nodeGroupIdx >= metadataDA->getNumElements(transaction->getType())) { columnChunk->setNumValues(0); } else { - Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset); - // TODO: FIX-ME. auto varListColumnChunk = ku_dynamic_cast(columnChunk); - auto startVarListOffset = varListColumnChunk->getListOffset(0); - auto endVarListOffset = varListColumnChunk->getListOffset(columnChunk->getNumValues()); - auto numElements = endVarListOffset - startVarListOffset + 1; - varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(numElements)); - dataColumn->scan(transaction, nodeGroupIdx, varListColumnChunk->getDataColumnChunk(), - startVarListOffset, endVarListOffset); + Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset); + auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk(); + sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk, startOffset, endOffset); + auto resizeNumValues = varListColumnChunk->getDataColumnChunk()->getNumValues(); + bool isOffsetSortedAscending = true; + offset_t prevOffset = varListColumnChunk->getListStartOffset(0); + for (auto i = 0u; i < columnChunk->getNumValues(); i++) { + auto currentEndOffset = varListColumnChunk->getListEndOffset(i); + auto appendLen = varListColumnChunk->getListLen(i); + prevOffset += (uint64_t)appendLen; + if (currentEndOffset != prevOffset) { + isOffsetSortedAscending = false; + } + resizeNumValues += appendLen; + } + if (isOffsetSortedAscending) { + varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(resizeNumValues)); + offset_t startVarListOffset = varListColumnChunk->getListStartOffset(0); + offset_t endVarListOffset = + varListColumnChunk->getListStartOffset(columnChunk->getNumValues()); + dataColumn->scan(transaction, nodeGroupIdx, varListColumnChunk->getDataColumnChunk(), + startVarListOffset, endVarListOffset); + varListColumnChunk->resetOffset(); + } else { + varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(resizeNumValues)); + tmpDataColumnChunk->resizeBuffer(std::bit_ceil(resizeNumValues)); + auto dataVarListColumnChunk = varListColumnChunk->getDataColumnChunk(); + for (auto i = 0u; i < columnChunk->getNumValues(); i++) { + offset_t startVarListOffset = varListColumnChunk->getListStartOffset(i); + offset_t endVarListOffset = varListColumnChunk->getListEndOffset(i); + dataColumn->scan(transaction, nodeGroupIdx, + tmpDataColumnChunk->dataColumnChunk.get(), startVarListOffset, + endVarListOffset); + KU_ASSERT(endVarListOffset - startVarListOffset == + tmpDataColumnChunk->dataColumnChunk->getNumValues()); + dataVarListColumnChunk->append(tmpDataColumnChunk->dataColumnChunk.get(), 0, + tmpDataColumnChunk->dataColumnChunk->getNumValues()); + } + varListColumnChunk->resetOffset(); + } } } @@ -82,13 +162,13 @@ void VarListColumn::scanInternal( auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); auto startNodeOffsetInGroup = startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - auto listOffsetInfoInStorage = - getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup, - startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state); + KU_ASSERT(resultVector->state); + auto listOffsetSizeInfo = getListOffsetSizeInfo(transaction, nodeGroupIdx, + startNodeOffsetInGroup, startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize()); if (resultVector->state->selVector->isUnfiltered()) { - scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage); + scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetSizeInfo); } else { - scanFiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage); + scanFiltered(transaction, nodeGroupIdx, resultVector, listOffsetSizeInfo); } } @@ -96,26 +176,29 @@ void VarListColumn::lookupValue(Transaction* transaction, offset_t nodeOffset, ValueVector* resultVector, uint32_t posInVector) { auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - auto listOffset = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup); - auto length = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup + 1) - - readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup); + auto listEndOffset = readOffset(transaction, nodeGroupIdx, nodeOffsetInGroup); + auto length = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup); + auto listStartOffset = listEndOffset - length; auto offsetInVector = posInVector == 0 ? 0 : resultVector->getValue(posInVector - 1); - resultVector->setValue(posInVector, list_entry_t{offsetInVector, length}); + resultVector->setValue(posInVector, list_entry_t{offsetInVector, (uint64_t)length}); ListVector::resizeDataVector(resultVector, offsetInVector + length); - dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listOffset, - listOffset + length, ListVector::getDataVector(resultVector), offsetInVector); + auto dataVector = ListVector::getDataVector(resultVector); + dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listStartOffset, + listEndOffset, dataVector, offsetInVector); } void VarListColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { KU_ASSERT(columnChunk->getDataType().getPhysicalType() == dataType.getPhysicalType()); - Column::append(columnChunk, nodeGroupIdx); - auto dataColumnChunk = - ku_dynamic_cast(columnChunk)->getDataColumnChunk(); + auto varListColumnChunk = ku_dynamic_cast(columnChunk); + Column::append(varListColumnChunk, nodeGroupIdx); + auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk(); + sizeColumn->append(sizeColumnChunk, nodeGroupIdx); + auto dataColumnChunk = varListColumnChunk->getDataColumnChunk(); dataColumn->append(dataColumnChunk, nodeGroupIdx); } void VarListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx, - ValueVector* resultVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage) { + ValueVector* resultVector, const ListOffsetSizeInfo& listOffsetInfoInStorage) { auto numValuesToScan = resultVector->state->selVector->selectedSize; offset_t offsetInVector = 0; for (auto i = 0u; i < numValuesToScan; i++) { @@ -124,18 +207,32 @@ void VarListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t no offsetInVector += listLen; } ListVector::resizeDataVector(resultVector, offsetInVector); - auto startListOffsetInStorage = listOffsetInfoInStorage.getListOffset(0); - auto endListOffsetInStorage = listOffsetInfoInStorage.getListOffset(numValuesToScan); - dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, endListOffsetInStorage, - ListVector::getDataVector(resultVector), 0 /* offsetInVector */); + auto dataVector = ListVector::getDataVector(resultVector); + offsetInVector = 0; + bool checkOffsetOrder = listOffsetInfoInStorage.isOffsetSortedAscending(0, numValuesToScan); + if (checkOffsetOrder) { + auto startListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(0); + numValuesToScan = numValuesToScan == 0 ? 0 : numValuesToScan - 1; + auto endListOffsetInStorage = listOffsetInfoInStorage.getListEndOffset(numValuesToScan); + dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, + endListOffsetInStorage, dataVector, 0 /* offsetInVector */); + } else { + for (auto i = 0u; i < numValuesToScan; i++) { + auto startListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(i); + auto appendLen = listOffsetInfoInStorage.getListLength(i); + dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, + startListOffsetInStorage + appendLen, dataVector, offsetInVector); + offsetInVector += appendLen; + } + } } void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx, - ValueVector* resultVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage) { + ValueVector* resultVector, const ListOffsetSizeInfo& listOffsetSizeInfo) { offset_t listOffset = 0; for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - auto listLen = listOffsetInfoInStorage.getListLength(pos); + auto listLen = listOffsetSizeInfo.getListLength(pos); resultVector->setValue(pos, list_entry_t{(offset_t)listOffset, (uint64_t)listLen}); listOffset += listLen; } @@ -143,21 +240,24 @@ void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t node listOffset = 0; for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - auto startOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos); - auto endOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos + 1); + auto startOffsetInStorageToScan = listOffsetSizeInfo.getListStartOffset(pos); + auto appendLen = listOffsetSizeInfo.getListLength(pos); + auto dataVector = ListVector::getDataVector(resultVector); dataColumn->scan(transaction, nodeGroupIdx, startOffsetInStorageToScan, - endOffsetInStorageToScan, ListVector::getDataVector(resultVector), listOffset); + startOffsetInStorageToScan + appendLen, dataVector, listOffset); listOffset += resultVector->getValue(pos).size; } } void VarListColumn::checkpointInMemory() { Column::checkpointInMemory(); + sizeColumn->checkpointInMemory(); dataColumn->checkpointInMemory(); } void VarListColumn::rollbackInMemory() { Column::rollbackInMemory(); + sizeColumn->rollbackInMemory(); dataColumn->rollbackInMemory(); } @@ -175,29 +275,32 @@ offset_t VarListColumn::readOffset( return value; } -ListOffsetInfoInStorage VarListColumn::getListOffsetInfoInStorage(Transaction* transaction, - node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup, - const std::shared_ptr& state) { +uint32_t VarListColumn::readSize( + Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) { + auto chunkMeta = sizeColumn->getMetadataDA()->get(nodeGroupIdx, transaction->getType()); + auto pageCursor = PageUtils::getPageCursorForPos(offsetInNodeGroup, + chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); + pageCursor.pageIdx += chunkMeta.pageIdx; + offset_t value; + readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { + readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */, + 1 /* numValuesToRead */, chunkMeta.compMeta); + }); + return value; +} + +ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction, + node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup) { auto numOffsetsToRead = endOffsetInNodeGroup - startOffsetInNodeGroup; - auto numOffsetVectors = numOffsetsToRead / DEFAULT_VECTOR_CAPACITY + - (numOffsetsToRead % DEFAULT_VECTOR_CAPACITY ? 1 : 0); - std::vector> offsetVectors; - offsetVectors.reserve(numOffsetVectors); - uint64_t numOffsetsRead = 0; - for (auto i = 0u; i < numOffsetVectors; i++) { - auto offsetVector = std::make_unique(LogicalTypeID::INT64); - auto numOffsetsToReadInCurBatch = - std::min(numOffsetsToRead - numOffsetsRead, DEFAULT_VECTOR_CAPACITY); - offsetVector->setState(state); - Column::scan(transaction, nodeGroupIdx, startOffsetInNodeGroup + numOffsetsRead, - startOffsetInNodeGroup + numOffsetsRead + numOffsetsToReadInCurBatch, - offsetVector.get(), 0 /* offsetInVector */); - offsetVectors.push_back(std::move(offsetVector)); - numOffsetsRead += numOffsetsToReadInCurBatch; - } - auto prevNodeListOffsetInStorage = - readListOffsetInStorage(transaction, nodeGroupIdx, startOffsetInNodeGroup); - return {prevNodeListOffsetInStorage, std::move(offsetVectors)}; + auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk( + *common::LogicalType::INT64(), enableCompression, numOffsetsToRead); + auto sizeColumnChunk = ColumnChunkFactory::createColumnChunk( + *common::LogicalType::UINT32(), enableCompression, numOffsetsToRead); + Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get(), startOffsetInNodeGroup, + endOffsetInNodeGroup); + sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk.get(), startOffsetInNodeGroup, + endOffsetInNodeGroup); + return {numOffsetsToRead, std::move(offsetColumnChunk), std::move(sizeColumnChunk)}; } } // namespace storage diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index e7893e31ff..7adad1d3e0 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -27,8 +27,9 @@ void VarListDataColumnChunk::resizeBuffer(uint64_t numValues) { VarListColumnChunk::VarListColumnChunk( LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory) - : ColumnChunk{std::move(dataType), capacity, enableCompression, true /* hasNullChunk */}, - needFinalize{false} { + : ColumnChunk{std::move(dataType), capacity, enableCompression, true /* hasNullChunk */} { + sizeColumnChunk = ColumnChunkFactory::createColumnChunk( + *common::LogicalType::UINT32(), enableCompression, capacity); varListDataColumnChunk = std::make_unique( ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0 /* capacity */, inMemory)); @@ -37,25 +38,31 @@ VarListColumnChunk::VarListColumnChunk( void VarListColumnChunk::append( ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { - nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); auto otherListChunk = ku_dynamic_cast(other); - auto offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); - KU_ASSERT(numValues + numValuesToAppend <= capacity); + nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); + sizeColumnChunk->getNullChunk()->append( + other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); + offset_t offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); for (auto i = 0u; i < numValuesToAppend; i++) { - offsetInDataChunkToAppend += otherListChunk->getListLen(startPosInOtherChunk + i); - setValue(offsetInDataChunkToAppend, numValues); + auto appendListLen = otherListChunk->getListLen(startPosInOtherChunk + i); + sizeColumnChunk->setValue(appendListLen, numValues); + offsetInDataChunkToAppend += appendListLen; + setValue(offsetInDataChunkToAppend, numValues); } - auto startOffset = otherListChunk->getListOffset(startPosInOtherChunk); - auto endOffset = otherListChunk->getListOffset(startPosInOtherChunk + numValuesToAppend); - KU_ASSERT(endOffset >= startOffset); varListDataColumnChunk->resizeBuffer(offsetInDataChunkToAppend); - varListDataColumnChunk->dataColumnChunk->append( - otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffset, - endOffset - startOffset); + for (auto i = 0u; i < numValuesToAppend; i++) { + auto startOffset = otherListChunk->getListStartOffset(startPosInOtherChunk + i); + auto appendLen = otherListChunk->getListLen(startPosInOtherChunk + i); + varListDataColumnChunk->dataColumnChunk->append( + otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffset, appendLen); + } + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::resetToEmpty() { ColumnChunk::resetToEmpty(); + sizeColumnChunk->resetToEmpty(); varListDataColumnChunk = std::make_unique( ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(), enableCompression, 0 /* capacity */)); @@ -70,11 +77,13 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector) if (capacity < newCapacity) { resize(newCapacity); } - auto nextListOffsetInChunk = getListOffset(numValues); + offset_t nextListOffsetInChunk = varListDataColumnChunk->getNumValues(); auto offsetBufferToWrite = (offset_t*)(buffer.get()); for (auto i = 0u; i < selVector.selectedSize; i++) { auto pos = selVector.selectedPositions[i]; - uint64_t listLen = vector->isNull(pos) ? 0 : vector->getValue(pos).size; + auto listLen = vector->isNull(pos) ? 0 : vector->getValue(pos).size; + sizeColumnChunk->setValue(listLen, numValues + i); + sizeColumnChunk->getNullChunk()->setNull(numValues + i, vector->isNull(pos)); nullChunk->setNull(numValues + i, vector->isNull(pos)); nextListOffsetInChunk += listLen; offsetBufferToWrite[numValues + i] = nextListOffsetInChunk; @@ -91,11 +100,15 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector) copyListValues(vector->getValue(pos), dataVector); } numValues += numToAppend; + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::appendNullList() { - auto nextListOffsetInChunk = getListOffset(numValues); + offset_t nextListOffsetInChunk = varListDataColumnChunk->getNumValues(); auto offsetBufferToWrite = (offset_t*)(buffer.get()); + sizeColumnChunk->setValue(0, numValues); + sizeColumnChunk->getNullChunk()->setNull(numValues, true); offsetBufferToWrite[numValues] = nextListOffsetInChunk; nullChunk->setNull(numValues, true); numValues++; @@ -109,8 +122,7 @@ void VarListColumnChunk::lookup( return; } auto startOffset = offsetInChunk == 0 ? 0 : getValue(offsetInChunk - 1); - auto endOffset = getValue(offsetInChunk); - auto listLen = endOffset - startOffset; + auto listLen = getListLen(offsetInChunk); auto dataVector = ListVector::getDataVector(&output); auto currentListDataSize = ListVector::getDataVectorSize(&output); ListVector::resizeDataVector(&output, currentListDataSize + listLen); @@ -118,74 +130,96 @@ void VarListColumnChunk::lookup( for (auto i = 0u; i < listLen; i++) { varListDataColumnChunk->dataColumnChunk->lookup(startOffset + i, *dataVector, i); } + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::write( ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) { - needFinalize = true; - if (!indicesColumnChunk) { - initializeIndices(); - } KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() && dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 && chunk->getNumValues() == dstOffsets->getNumValues()); - auto currentIndex = numValues; - append(chunk, 0, chunk->getNumValues()); + offset_t currentIndex = varListDataColumnChunk->getNumValues(); + auto otherListChunk = ku_dynamic_cast(chunk); + varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() + + otherListChunk->varListDataColumnChunk->getNumValues()); + varListDataColumnChunk->dataColumnChunk->append( + otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), 0, + otherListChunk->varListDataColumnChunk->getNumValues()); + offset_t maxDstOffset = 0; for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { auto posInChunk = dstOffsets->getValue(i); - KU_ASSERT(posInChunk < capacity); - indicesColumnChunk->setValue(currentIndex++, posInChunk); - indicesColumnChunk->getNullChunk()->setNull(posInChunk, false); - if (indicesColumnChunk->getNumValues() <= posInChunk) { - indicesColumnChunk->setNumValues(posInChunk + 1); + if (posInChunk > maxDstOffset) { + maxDstOffset = posInChunk; } } - KU_ASSERT(currentIndex == numValues && - indicesColumnChunk->getNumValues() <= indicesColumnChunk->getCapacity()); + while (maxDstOffset >= numValues) { + appendNullList(); + } + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto posInChunk = dstOffsets->getValue(i); + auto appendLen = otherListChunk->getListLen(i); + currentIndex += appendLen; + setValue(currentIndex, posInChunk); + nullChunk->setNull(posInChunk, otherListChunk->nullChunk->isNull(i)); + sizeColumnChunk->setValue(appendLen, posInChunk); + sizeColumnChunk->getNullChunk()->setNull(posInChunk, otherListChunk->nullChunk->isNull(i)); + } + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::write( ValueVector* vector, offset_t offsetInVector, offset_t offsetInChunk) { - needFinalize = true; - if (!indicesColumnChunk) { - initializeIndices(); - } - auto currentIndex = numValues; - append(vector, *vector->state->selVector); - KU_ASSERT(offsetInChunk < capacity); - indicesColumnChunk->setValue(currentIndex, offsetInChunk); - indicesColumnChunk->getNullChunk()->setNull(offsetInChunk, vector->isNull(offsetInVector)); - if (indicesColumnChunk->getNumValues() <= offsetInChunk) { - indicesColumnChunk->setNumValues(offsetInChunk + 1); + auto selVector = std::make_unique(1); + selVector->resetSelectorToValuePosBuffer(); + selVector->selectedPositions[0] = offsetInVector; + auto appendLen = + vector->isNull(offsetInVector) ? 0 : vector->getValue(offsetInVector).size; + varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() + appendLen); + auto dataVector = ListVector::getDataVector(vector); + dataVector->setState(std::make_unique()); + dataVector->state->selVector->resetSelectorToValuePosBuffer(); + copyListValues(vector->getValue(offsetInVector), dataVector); + while (offsetInChunk >= numValues) { + appendNullList(); } + auto isNull = vector->isNull(offsetInVector); + nullChunk->setNull(offsetInChunk, isNull); + sizeColumnChunk->getNullChunk()->setNull(offsetInChunk, isNull); + if (!isNull) { + sizeColumnChunk->setValue(appendLen, offsetInChunk); + setValue(varListDataColumnChunk->getNumValues(), offsetInChunk); + } + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, offset_t dstOffsetInChunk, offset_t numValuesToCopy) { KU_ASSERT(srcChunk->getDataType().getPhysicalType() == PhysicalTypeID::VAR_LIST); - needFinalize = true; - if (!indicesColumnChunk) { - initializeIndices(); - } - nullChunk->write(srcChunk->getNullChunk(), srcOffsetInChunk, dstOffsetInChunk, numValuesToCopy); auto srcListChunk = ku_dynamic_cast(srcChunk); auto offsetInDataChunkToAppend = varListDataColumnChunk->getNumValues(); - auto currentIndex = numValues; for (auto i = 0u; i < numValuesToCopy; i++) { - offsetInDataChunkToAppend += srcListChunk->getListLen(srcOffsetInChunk + i); - setValue(offsetInDataChunkToAppend, currentIndex + i); - indicesColumnChunk->setValue(currentIndex + i, dstOffsetInChunk + i); - indicesColumnChunk->getNullChunk()->setNull(dstOffsetInChunk + i, false); + auto appendListLen = srcListChunk->getListLen(srcOffsetInChunk + i); + offsetInDataChunkToAppend += appendListLen; + sizeColumnChunk->setValue(appendListLen, dstOffsetInChunk + i); + setValue(offsetInDataChunkToAppend, dstOffsetInChunk + i); + nullChunk->setNull( + dstOffsetInChunk + i, srcListChunk->nullChunk->isNull(srcOffsetInChunk + i)); + sizeColumnChunk->getNullChunk()->setNull( + dstOffsetInChunk + i, srcListChunk->nullChunk->isNull(srcOffsetInChunk + i)); } varListDataColumnChunk->resizeBuffer(offsetInDataChunkToAppend); - auto startOffsetInSrcChunk = srcListChunk->getListOffset(srcOffsetInChunk); - auto endOffsetInSrcChunk = srcListChunk->getListOffset(srcOffsetInChunk + numValuesToCopy); - varListDataColumnChunk->dataColumnChunk->append( - srcListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffsetInSrcChunk, - endOffsetInSrcChunk - startOffsetInSrcChunk); - if (indicesColumnChunk->getNumValues() < dstOffsetInChunk + numValuesToCopy) { - indicesColumnChunk->setNumValues(dstOffsetInChunk + numValuesToCopy + 1); + for (auto i = 0u; i < numValuesToCopy; i++) { + auto startOffsetInSrcChunk = srcListChunk->getListStartOffset(srcOffsetInChunk + i); + auto appendLen = srcListChunk->getListLen(srcOffsetInChunk + i); + varListDataColumnChunk->dataColumnChunk->append( + srcListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffsetInSrcChunk, + appendLen); } + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::copy(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, @@ -196,6 +230,8 @@ void VarListColumnChunk::copy(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, appendNullList(); } append(srcChunk, srcOffsetInChunk, numValuesToCopy); + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(nullChunk->getNumValues() == numValues); } void VarListColumnChunk::copyListValues(const list_entry_t& entry, ValueVector* dataVector) { @@ -214,35 +250,14 @@ void VarListColumnChunk::copyListValues(const list_entry_t& entry, ValueVector* } } -void VarListColumnChunk::finalize() { - if (!needFinalize) { - return; - } - auto newColumnChunk = ColumnChunkFactory::createColumnChunk( - std::move(*dataType.copy()), enableCompression, capacity); - auto totalListLen = getListOffset(numValues) + 1; - auto newVarListChunk = ku_dynamic_cast(newColumnChunk.get()); - newVarListChunk->getDataColumnChunk()->resize(totalListLen); - for (auto i = 0u; i < indicesColumnChunk->getNumValues(); i++) { - if (indicesColumnChunk->getNullChunk()->isNull(i)) { - newVarListChunk->appendNullList(); - } else { - auto index = indicesColumnChunk->getValue(i); - newColumnChunk->append(this, index, 1); - } +void VarListColumnChunk::resetOffset() { + offset_t nextListOffsetReset = 0; + for (auto i = 0u; i < numValues; i++) { + auto listLen = getListLen(i); + nextListOffsetReset += uint64_t(listLen); + setValue(nextListOffsetReset, i); + sizeColumnChunk->setValue(listLen, i); } - // Move offsets, null, data from newVarListChunk to this column chunk. And release indices. - resetFromOtherChunk(newVarListChunk); -} - -void VarListColumnChunk::resetFromOtherChunk(VarListColumnChunk* other) { - buffer = std::move(other->buffer); - nullChunk = std::move(other->nullChunk); - varListDataColumnChunk = std::move(other->varListDataColumnChunk); - numValues = other->numValues; - // Reset indices and needFinalize. - indicesColumnChunk.reset(); - needFinalize = false; } } // namespace storage