From 65ccd6f0f196425a7802ff7f7a8161e9529a9dd5 Mon Sep 17 00:00:00 2001 From: hououou Date: Tue, 19 Mar 2024 22:46:53 -0400 Subject: [PATCH] in place commit --- src/include/storage/store/var_list_column.h | 62 ++- .../storage/store/var_list_column_chunk.h | 1 + src/storage/store/column.cpp | 1 + src/storage/store/var_list_column.cpp | 517 ++++++------------ src/storage/store/var_list_column_chunk.cpp | 17 + 5 files changed, 222 insertions(+), 376 deletions(-) diff --git a/src/include/storage/store/var_list_column.h b/src/include/storage/store/var_list_column.h index 000bcdffc44..b01b1d2a733 100644 --- a/src/include/storage/store/var_list_column.h +++ b/src/include/storage/store/var_list_column.h @@ -42,6 +42,28 @@ struct ListOffsetSizeInfo { bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const; }; +struct VarListInfo { + common::offset_t totalListNum; + std::unique_ptr dataInsertUpdateChunkCollection; + std::unique_ptr sizeUpdateChunkCollection; + std::unique_ptr sizeInsertChunkCollection; + std::unique_ptr offsetUpdateChunkCollection; + std::unique_ptr offsetInsertChunkCollection; + + VarListInfo(common::offset_t totalListNum, + std::unique_ptr dataInsertUpdateChunkCollection, + std::unique_ptr sizeUpdateChunkCollection, + std::unique_ptr sizeInsertChunkCollection, + std::unique_ptr offsetUpdateChunkCollection, + std::unique_ptr offsetInsertChunkCollection) + : totalListNum{totalListNum}, + dataInsertUpdateChunkCollection{std::move(dataInsertUpdateChunkCollection)}, + sizeUpdateChunkCollection{std::move(sizeUpdateChunkCollection)}, + sizeInsertChunkCollection{std::move(sizeInsertChunkCollection)}, + offsetUpdateChunkCollection{std::move(offsetUpdateChunkCollection)}, + offsetInsertChunkCollection{std::move(offsetInsertChunkCollection)} {} +}; + class VarListColumn : public Column { friend class VarListLocalColumn; @@ -77,40 +99,22 @@ class VarListColumn : public Column { void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage); - void write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk, - common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override; - - void write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk, - ColumnChunk* data, common::offset_t dataOffset, common::length_t numValues) override; - - bool canCommitInPlace(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo) override; - bool canCommitInPlace(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, const std::vector& dstOffsets, - ColumnChunk* chunk, common::offset_t srcOffset) override; - - void commitLocalChunkInPlace(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override; - - void commitColumnChunkInPlace(common::node_group_idx_t nodeGroupIdx, - const std::vector& dstOffsets, ColumnChunk* chunk, - common::offset_t srcOffset) override; - - std::pair,std::unique_ptr> extractDataChunks(const ChunkCollection& localInsertChunks, + void commitOffsetLocalChunkOutOfPlace(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + bool isNewNodeGroup, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo); + const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo); - std::pair,std::unique_ptr> extractSizeChunks(const ChunkCollection& localInsertChunks, + VarListInfo getVarListInfoForPrepareCommit(transaction::Transaction* transaction,common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo); - std::pair,std::unique_ptr> extractOffsetChunks(transaction::Transaction* transaction,common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo); + void dataLocalChunkInPlaceCommit(common::node_group_idx_t nodeGroupIdx, + const ChunkCollection& localChunks); + void dataLocalChunkOutPlaceCommit(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localChunks); + bool canDataInPlaceCommit(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localChunks, + const common::offset_t totalListNum); void checkpointInMemory() final; void rollbackInMemory() final; diff --git a/src/include/storage/store/var_list_column_chunk.h b/src/include/storage/store/var_list_column_chunk.h index be959a4065c..4e5ed1ae379 100644 --- a/src/include/storage/store/var_list_column_chunk.h +++ b/src/include/storage/store/var_list_column_chunk.h @@ -95,6 +95,7 @@ class VarListColumnChunk final : public ColumnChunk { bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const; bool asendingRatio() const; + void showInfo(); protected: void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector); diff --git a/src/storage/store/column.cpp b/src/storage/store/column.cpp index b5587f7f4be..a10d7f48bae 100644 --- a/src/storage/store/column.cpp +++ b/src/storage/store/column.cpp @@ -807,6 +807,7 @@ void Column::applyLocalChunkToColumnChunk(const ChunkCollection& localChunks, for (auto& [offsetInDstChunk, rowIdx] : updateInfo) { auto [chunkIdx, offsetInLocalChunk] = LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + KU_ASSERT(localChunks[chunkIdx]->getDataType().getLogicalTypeID()==columnChunk->getDataType().getLogicalTypeID()); columnChunk->write( localChunks[chunkIdx], offsetInLocalChunk, offsetInDstChunk, 1 /* numValues */); } diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index e97efd78e44..cc50f0560f5 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -15,6 +15,7 @@ offset_t ListOffsetSizeInfo::getListStartOffset(uint64_t pos) const { if (numTotal == 0) { return 0; } + if(pos!=numTotal) KU_ASSERT(getListEndOffset(pos) >= getListLength(pos)); return pos == numTotal ? getListEndOffset(pos - 1) : getListEndOffset(pos) - getListLength(pos); } @@ -289,6 +290,27 @@ uint32_t VarListColumn::readSize( }); return value; } +void printOffset(ColumnChunk* columnChunk){ + std::cout<<"offset\n"; + for(auto i=0u;igetNumValues();i++){ + std::cout<getValue(i)<<" "; + } + std::cout<<"\n"; + + std::cout<<"null\n"; + for(auto i=0u;igetNumValues();i++){ + std::cout<getNullChunk()->isNull(i)<<" "; + } + std::cout<<"\n"; +} + +void printSize(ColumnChunk* columnChunk){ + std::cout<<"Size\n"; + for(auto i=0u;igetNumValues();i++){ + std::cout<getValue(i)<<" "; + } + std::cout<<"\n"; +} ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup) { @@ -301,215 +323,145 @@ ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction endOffsetInNodeGroup); sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk.get(), startOffsetInNodeGroup, endOffsetInNodeGroup); + //TODO: SHOULD REMOVE + printOffset(offsetColumnChunk.get()); + printSize(sizeColumnChunk.get()); KU_ASSERT(offsetColumnChunk->getNumValues() == numOffsetsToRead); KU_ASSERT(sizeColumnChunk->getNumValues() == numOffsetsToRead); return {numOffsetsToRead, std::move(offsetColumnChunk), std::move(sizeColumnChunk)}; } -void VarListColumn::write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk, - common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { - auto dataVector=ListVector::getDataVector(vectorToWriteFrom); - //append vector in the end of data column - bool isNull = vectorToWriteFrom->isNull(posInVectorToWriteFrom); - auto dataChunkMeta = dataColumn->metadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto currentIndex=dataChunkMeta.numValues; - auto dataPosStartInVectorToWriteFrom = vectorToWriteFrom->getValue(posInVectorToWriteFrom).offset; - auto dataAppendLen=vectorToWriteFrom->getValue(posInVectorToWriteFrom).size; - if (!isNull) { - for(auto i=0u;iwriteValue( - dataChunkMeta, nodeGroupIdx, currentIndex+i, dataVector, dataPosStartInVectorToWriteFrom+i); - } - dataChunkMeta.numValues += dataAppendLen; - dataColumn->metadataDA->update(nodeGroupIdx, dataChunkMeta); - } - //update offset and size - auto offsetVector=std::make_unique(LogicalTypeID::UINT64); - auto sizeVector=std::make_unique(LogicalTypeID::UINT32); - offsetVector->setState(std::make_unique()); - offsetVector->state->selVector->resetSelectorToValuePosBuffer(); - offsetVector->state->selVector->selectedPositions[0] = 0; - offsetVector->setNull(0, isNull); - sizeVector->setState(offsetVector->state); - offsetVector->setValue(0,currentIndex+dataAppendLen); - sizeVector->setValue(0,dataAppendLen); - Column::write(nodeGroupIdx, offsetInChunk, offsetVector.get(), 0); - sizeColumn->write(nodeGroupIdx,offsetInChunk,sizeVector.get(),0); -} - -void VarListColumn::write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk, - ColumnChunk* chunk, common::offset_t dataOffset, common::length_t numValues) { - auto varListColumnChunk = ku_dynamic_cast(chunk); - //append data column chunk in the end of data column - auto dataChunkMeta = dataColumn->metadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto currentIndex=dataChunkMeta.numValues; - for(auto i=0u;igetListStartOffset(dataOffset+i); - auto appendLen=varListColumnChunk->getListLen(dataOffset+i); - currentIndex+=appendLen; - //update offset and size one by one - //TODO(Jimain): offset info should be rewritten - Column::write(nodeGroupIdx, offsetInChunk, varListColumnChunk, dataOffset+i, 1); - sizeColumn->write(nodeGroupIdx,offsetInChunk,varListColumnChunk->getSizeColumnChunk(),dataOffset+i,1); - dataColumn->write(nodeGroupIdx, currentIndex, varListColumnChunk->getDataColumnChunk(),startOffsetInChunk,appendLen); - } -} - -//TODO(Jiamin): should remove -void VarListColumn::commitLocalChunkInPlace(Transaction* /*transaction*/, node_group_idx_t nodeGroupIdx, - const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, - const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo, - const offset_set_t& /*deleteInfo*/) { - //update size column - //update data column - sizeColumn->applyLocalChunkToColumn(nodeGroupIdx, localUpdateChunks, updateInfo); - sizeColumn->applyLocalChunkToColumn(nodeGroupIdx, localInsertChunks, insertInfo); -} - -//TODO(Jiamin): should remove -void VarListColumn::commitColumnChunkInPlace(node_group_idx_t nodeGroupIdx, - const std::vector& dstOffsets, ColumnChunk* chunk, offset_t srcOffset) { - for (auto i = 0u; i < dstOffsets.size(); i++) { - write(nodeGroupIdx, dstOffsets[i], chunk, srcOffset + i, 1 /* numValues */); - } -} - -bool VarListColumn::canCommitInPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx, - const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, - const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo) { - //TODO(Jiamin): always false beacuse of increasing offset -// auto offsetFlag=canOffsetCommitInPlace(transaction, nodeGroupIdx, localInsertChunks, insertInfo, -// localUpdateChunks, updateInfo); -// auto sizeFlag=canSizeCommitInPlace(transaction, nodeGroupIdx, localInsertChunks, insertInfo, -// localUpdateChunks, updateInfo); -// auto dataFlag=canDataCommitInPlace(transaction, nodeGroupIdx, localInsertChunks, insertInfo, -// localUpdateChunks, updateInfo); - //TODO(Jiamin): check offset flag why this is incorrect -// if(!offsetFlag) return false; -// if(!sizeFlag) return false; -// if(!dataFlag) return false; - return false; -} - -bool VarListColumn::canCommitInPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx, - const std::vector& dstOffsets, ColumnChunk* chunk, - common::offset_t srcOffset) { -// auto listDataLenToAdd = 0u; -// auto varListChunk = ku_dynamic_cast(chunk); -// auto length = std::min((uint64_t)dstOffsets.size(), varListChunk->getNumValues()); -// for (auto i = 0u; i < length; i++) { -// if (varListChunk->getNullChunk()->isNull(srcOffset+i)) { -// continue; -// } -// listDataLenToAdd += varListChunk->getListLen(srcOffset+i); -// } -// auto numStrings = dstOffsets.size(); -// if (!dictionary.canCommitInPlace(transaction, nodeGroupIdx, numStrings, strLenToAdd)) { -// return false; -// } -// auto maxDstOffset = getMaxOffset(dstOffsets); -// return canIndexCommitInPlace(transaction, nodeGroupIdx, numStrings, maxDstOffset); - return false; -} -std::pair,std::unique_ptr> VarListColumn::extractDataChunks(const ChunkCollection& localInsertChunks, +VarListInfo VarListColumn::getVarListInfoForPrepareCommit(Transaction* transaction, + node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo){ - auto dataUpdateChunkCollection = std::make_unique(); - auto dataInsertChunkCollection = std::make_unique(); + auto offsetUpdateChunkCollection = std::make_unique(); + auto offsetInsertChunkCollection = std::make_unique(); + auto sizeUpdateChunkCollection = std::make_unique(); + auto sizeInsertChunkCollection = std::make_unique(); + auto dataInsertUpdateChunkCollection = std::make_unique(); + auto dataListSize=dataColumn->getMetadata(nodeGroupIdx,transaction->getType()).numValues; + auto totalListNum=dataListSize; for (auto& [_, rowIdx] : updateInfo) { auto [chunkIdx, offsetInLocalChunk] = LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); auto localUpdateChunk = localUpdateChunks[chunkIdx]; auto varListChunk = ku_dynamic_cast(localUpdateChunk); - auto dataColumnChunk=varListChunk->getDataColumnChunk(); - KU_ASSERT(chunkIdx==dataUpdateChunkCollection->size()); - dataUpdateChunkCollection->push_back(dataColumnChunk); + auto offsetColumnChunk=ColumnChunk(std::move(dataType), 0, enableCompression); + offsetColumnChunk.resize(varListChunk->getNumValues()); + offsetColumnChunk.getNullChunk()->append(varListChunk->getNullChunk(),0,varListChunk->getNumValues()); + for(auto i=0u;igetNumValues();i++){ + auto listEndOffset=varListChunk->getListEndOffset(i); + totalListNum+=varListChunk->getListLen(i); + offsetColumnChunk.setValue(dataListSize+listEndOffset,i); + offsetColumnChunk.getNullChunk()->setNull(i,varListChunk->getNullChunk()->isNull(i)); + } + KU_ASSERT(chunkIdx==offsetUpdateChunkCollection->size()); + offsetUpdateChunkCollection->push_back(&offsetColumnChunk); + sizeUpdateChunkCollection->push_back(varListChunk->getSizeColumnChunk()); + dataInsertUpdateChunkCollection->push_back(varListChunk->getDataColumnChunk()); } for (auto& [offset, rowIdx] : insertInfo) { auto [chunkIdx, offsetInLocalChunk] = LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); auto localInsertChunk = localInsertChunks[chunkIdx]; auto varListChunk = ku_dynamic_cast(localInsertChunk); - auto dataColumnChunk=varListChunk->getDataColumnChunk(); - KU_ASSERT(chunkIdx==dataInsertChunkCollection->size()); - dataInsertChunkCollection->push_back(dataColumnChunk); + //TODO: SHOULD REMOVE + std::cout<<"original info\n"; + varListChunk->showInfo(); + auto offsetColumnChunk=ColumnChunk(*common::LogicalType::UINT64(), 0, enableCompression); + offsetColumnChunk.resize(varListChunk->getNumValues()); + offsetColumnChunk.getNullChunk()->append(varListChunk->getNullChunk(),0,varListChunk->getNumValues()); + for(auto i=0u;igetNumValues();i++){ + auto listEndOffset=varListChunk->getListEndOffset(i); + totalListNum+=varListChunk->getListLen(i); + offsetColumnChunk.setValue(dataListSize+listEndOffset,i); + offsetColumnChunk.getNullChunk()->setNull(i,varListChunk->getNullChunk()->isNull(i)); + } + //TODO: SHOULD REMOVE + std::cout<<"rewrite after\n"; + printOffset(&offsetColumnChunk); + KU_ASSERT(chunkIdx==offsetInsertChunkCollection->size()); + offsetInsertChunkCollection->push_back(&offsetColumnChunk); + sizeInsertChunkCollection->push_back(varListChunk->getSizeColumnChunk()); + dataInsertUpdateChunkCollection->push_back(varListChunk->getDataColumnChunk()); } - return std::make_pair(std::move(dataInsertChunkCollection),std::move(dataUpdateChunkCollection)); + return {totalListNum, std::move(dataInsertUpdateChunkCollection), std::move(sizeUpdateChunkCollection), std::move(sizeInsertChunkCollection), std::move(offsetUpdateChunkCollection), std::move(offsetInsertChunkCollection)}; } - -std::pair,std::unique_ptr> VarListColumn::extractSizeChunks(const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo){ - auto sizeUpdateChunkCollection = std::make_unique(); - auto sizeInsertChunkCollection = std::make_unique(); - for (auto& [_, rowIdx] : updateInfo) { - auto [chunkIdx, offsetInLocalChunk] = - LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - auto localUpdateChunk = localUpdateChunks[chunkIdx]; - auto varListChunk = ku_dynamic_cast(localUpdateChunk); - auto sizeColumnChunk=varListChunk->getSizeColumnChunk(); - KU_ASSERT(chunkIdx==sizeUpdateChunkCollection->size()); - sizeUpdateChunkCollection->push_back(sizeColumnChunk); +bool VarListColumn::canDataInPlaceCommit(Transaction* transaction, node_group_idx_t nodeGroupIdx, const ChunkCollection& localChunks, const offset_t totalListNum){ + auto dataMeta=dataColumn->getMetadata(nodeGroupIdx,transaction->getType()); + if(dataColumn->isMaxOffsetOutOfPagesCapacity(dataMeta,totalListNum)){ + return false; } - for (auto& [offset, rowIdx] : insertInfo) { - auto [chunkIdx, offsetInLocalChunk] = - LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - auto localInsertChunk = localInsertChunks[chunkIdx]; - auto varListChunk = ku_dynamic_cast(localInsertChunk); - auto sizeColumnChunk=varListChunk->getSizeColumnChunk(); - KU_ASSERT(chunkIdx==sizeInsertChunkCollection->size()); - sizeInsertChunkCollection->push_back(sizeColumnChunk); + if (dataMeta.compMeta.canAlwaysUpdateInPlace()) { + return true; } - return std::make_pair(std::move(sizeInsertChunkCollection),std::move(sizeUpdateChunkCollection)); + for (auto localChunk : localChunks) { + for(auto i=0u;igetNumValues();i++){ + if (!dataMeta.compMeta.canUpdateInPlace( + localChunk->getData(), i, VarListType::getChildType(&dataType)->getPhysicalType())) { + return false; + } + } + } + return true; } -std::pair,std::unique_ptr> VarListColumn::extractOffsetChunks(Transaction* transaction, - node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, +void VarListColumn::commitOffsetLocalChunkOutOfPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx, + bool isNewNodeGroup, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo){ - auto offsetUpdateChunkCollection = std::make_unique(); - auto offsetInsertChunkCollection = std::make_unique(); - auto offsetColumnChunk=ColumnChunkFactory::createColumnChunk( - *common::LogicalType::UINT64(), enableCompression, 0); - auto currentIndex=dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; - for (auto& [_, rowIdx] : updateInfo) { - auto [chunkIdx, offsetInLocalChunk] = - LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - auto localUpdateChunk = localUpdateChunks[chunkIdx]; - auto varListChunk = ku_dynamic_cast(localUpdateChunk); - //update offset Info here so we do not need to update it again in in-commit place update - offsetColumnChunk->resetToEmpty(); - offsetColumnChunk->resize(varListChunk->getNumValues()); - for(auto i=0u;igetNumValues();i++){ - auto appendLen=varListChunk->getNullChunk()->isNull(i)?0:varListChunk->getListLen(i); - currentIndex+=appendLen; - offsetColumnChunk->setValue(currentIndex,i); - offsetColumnChunk->getNullChunk()->setNull(i,varListChunk->getNullChunk()->isNull(i)); + const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) { + //out of place for offset column + auto columnChunk = ColumnChunkFactory::createColumnChunk(*LogicalType::UINT64(), enableCompression, common::StorageConstants::NODE_GROUP_SIZE); + if (isNewNodeGroup) { + KU_ASSERT(updateInfo.empty() && deleteInfo.empty()); + // Apply inserts from the local chunk. + applyLocalChunkToColumnChunk(localUpdateChunks, columnChunk.get(), updateInfo); + } else { + // First, scan the whole column chunk from persistent storage. + Column::scan(transaction, nodeGroupIdx, columnChunk.get()); + //TODO: SHOULD REMOVE + std::cout<<"apply before\n"; + printOffset(columnChunk.get()); + // Then, apply updates from the local chunk. + //TODO: FIX HERE + applyLocalChunkToColumnChunk(localUpdateChunks, columnChunk.get(), updateInfo); + // Lastly, apply inserts from the local chunk. + //TODO: SHOULD REMOVE + applyLocalChunkToColumnChunk(localInsertChunks, columnChunk.get(), insertInfo); + std::cout<<"apply after\n"; + printOffset(columnChunk.get()); + if (columnChunk->getNullChunk()) { + // Set nulls based on deleteInfo. + for (auto offsetInChunk : deleteInfo) { + columnChunk->getNullChunk()->setNull(offsetInChunk, true /* isNull */); + } } - KU_ASSERT(chunkIdx==offsetUpdateChunkCollection->size()); - offsetUpdateChunkCollection->push_back(offsetColumnChunk.get()); } - for (auto& [offset, rowIdx] : insertInfo) { - auto [chunkIdx, offsetInLocalChunk] = - LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - auto localInsertChunk = localInsertChunks[chunkIdx]; - auto varListChunk = ku_dynamic_cast(localInsertChunk); - offsetColumnChunk->resetToEmpty(); - offsetColumnChunk->resize(varListChunk->getNumValues()); - for(auto i=0u;igetNumValues();i++){ - auto appendLen=varListChunk->getNullChunk()->isNull(i)?0:varListChunk->getListLen(i); - currentIndex+=appendLen; - offsetColumnChunk->setValue(currentIndex,i); - offsetColumnChunk->getNullChunk()->setNull(i,varListChunk->getNullChunk()->isNull(i)); - } - KU_ASSERT(chunkIdx==offsetInsertChunkCollection->size()); - offsetInsertChunkCollection->push_back(offsetColumnChunk.get()); + columnChunk->finalize(); + KU_ASSERT(columnChunk->sanityCheck()); + Column::append(columnChunk.get(), nodeGroupIdx); +} + +void VarListColumn::dataLocalChunkInPlaceCommit(node_group_idx_t nodeGroupIdx, + const ChunkCollection& localChunks) { + auto currentIndex=dataColumn->getMetadata(nodeGroupIdx, TransactionType::WRITE).numValues; + for(auto localChunk:localChunks){ + dataColumn->write(nodeGroupIdx, currentIndex, localChunk, 0, localChunk->getNumValues()); + } +} + +void VarListColumn::dataLocalChunkOutPlaceCommit(Transaction* transaction,node_group_idx_t nodeGroupIdx, + const ChunkCollection& localChunks) { + auto columnChunk = dataColumn->getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE); + dataColumn->scan(transaction, nodeGroupIdx, columnChunk.get()); + for(auto localChunk:localChunks){ + columnChunk->append(localChunk, 0, localChunk->getNumValues()); } - return std::make_pair(std::move(offsetInsertChunkCollection),std::move(offsetUpdateChunkCollection)); -// Column::canCommitInPlace(transaction, nodeGroupIdx, *offsetInsertChunkCollection, -// insertInfo, *offsetUpdateChunkCollection, updateInfo); + columnChunk->finalize(); + dataColumn->append(columnChunk.get(), nodeGroupIdx); } void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -524,189 +476,60 @@ void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_i commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks, insertInfo, localUpdateChunks, updateInfo, deleteInfo); } else { - auto [offsetInsertChunkCollection, offsetUpdateChunkCollection] = - extractOffsetChunks(transaction, nodeGroupIdx, localInsertChunks, insertInfo, - localUpdateChunks, updateInfo); - auto [dataInsertChunkCollection, dataUpdateChunkCollection] = - extractDataChunks(localInsertChunks, insertInfo, localUpdateChunks, updateInfo); - auto [sizeInsertChunkCollection, sizeUpdateChunkCollection] = - extractSizeChunks(localInsertChunks, insertInfo, localUpdateChunks, updateInfo); - dataColumn->prepareCommitForChunk(transaction, nodeGroupIdx, *dataInsertChunkCollection, - insertInfo, *dataUpdateChunkCollection, updateInfo, deleteInfo); - sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, *sizeInsertChunkCollection, - insertInfo, *sizeUpdateChunkCollection, updateInfo, deleteInfo); - Column::prepareCommitForChunk(transaction, nodeGroupIdx, *offsetInsertChunkCollection, - insertInfo, *offsetUpdateChunkCollection, updateInfo, deleteInfo); - // for data column and size column we do not to carefully prepare the commit + auto varListInfo= getVarListInfoForPrepareCommit(transaction,nodeGroupIdx,localInsertChunks,insertInfo,localUpdateChunks,updateInfo); + //offset need to rewrite, always do out-of place commit + commitOffsetLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, + *varListInfo.offsetInsertChunkCollection, insertInfo, *varListInfo.offsetUpdateChunkCollection, updateInfo, + deleteInfo); + //size column can reuse the prepare commit + sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, *varListInfo.sizeInsertChunkCollection, + insertInfo, *varListInfo.sizeUpdateChunkCollection, updateInfo, deleteInfo); + //data column need to rewrite, we always append or should we rewrite the insert info? + if(canDataInPlaceCommit(transaction,nodeGroupIdx,*varListInfo.dataInsertUpdateChunkCollection,varListInfo.totalListNum)) { + dataLocalChunkInPlaceCommit(nodeGroupIdx, *varListInfo.dataInsertUpdateChunkCollection); + } else { + dataLocalChunkOutPlaceCommit(transaction,nodeGroupIdx, *varListInfo.dataInsertUpdateChunkCollection); + } } } void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, const std::vector& dstOffsets, ColumnChunk* chunk, offset_t startSrcOffset) { - KU_UNREACHABLE; auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; if (isNewNodeGroup) { commitColumnChunkOutOfPlace( transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk, startSrcOffset); } else { - bool didInPlaceCommit = false; - // If this is not a new node group, we should first check if we can perform in-place commit. - if (canCommitInPlace(transaction, nodeGroupIdx, dstOffsets, chunk, startSrcOffset)) { - commitColumnChunkInPlace(nodeGroupIdx, dstOffsets, chunk, startSrcOffset); - didInPlaceCommit = true; - } else { - commitColumnChunkOutOfPlace( - transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk, startSrcOffset); + //size column chunk commit + auto varListChunk= ku_dynamic_cast(chunk); + sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, varListChunk->getSizeColumnChunk(), startSrcOffset); + //data column chunk commit + auto dataColumnSize=dataColumn->getMetadata(nodeGroupIdx,transaction->getType()).numValues; + auto dataColumnChunk=varListChunk->getDataColumnChunk(); + auto appendListNum=std::min(chunk->getNumValues(),(uint64_t)dstOffsets.size()); + auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset); + std::vector dstOffsetsInDataColumn; + for(auto i=0u;iprepareCommitForChunk(transaction,nodeGroupIdx,dstOffsetsInDataColumn,dataColumnChunk,startVarListOffset); + //offset column chunk commit + //update offset from the data chunk info;since we append the data in the endof data column, we need to plus the original dataColumn Size + for(auto i=0u;igetListEndOffset(startSrcOffset+i); + varListChunk->setValue(dataColumnSize+listEndOffset,startSrcOffset+i); } - if (nullColumn) { - if (nullColumn->canCommitInPlace( - transaction, nodeGroupIdx, dstOffsets, chunk->getNullChunk(), startSrcOffset)) { - nullColumn->commitColumnChunkInPlace( - nodeGroupIdx, dstOffsets, chunk->getNullChunk(), startSrcOffset); - } else if (didInPlaceCommit) { - nullColumn->commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, - dstOffsets, chunk->getNullChunk(), startSrcOffset); - } + auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType()); + auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*LogicalType::UINT64(), enableCompression, offsetChunkMeta.numValues + dstOffsets.size()); + Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get()); + for (auto i = 0u; i < dstOffsets.size(); i++) { + offsetColumnChunk->write(chunk, startSrcOffset + i, dstOffsets[i], 1 /* numValues */); } + offsetColumnChunk->finalize(); + Column::append(offsetColumnChunk.get(), nodeGroupIdx); } } -//bool VarListColumn::canOffsetCommitInPlace(Transaction* transaction, -// common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk, -// const offset_to_row_idx_t& insertInfo, -// const offset_to_row_idx_t& updateInfo){ -// std::vector rowIdxesToRead; -// for (auto& [offset, rowIdx] : updateInfo) { -// rowIdxesToRead.push_back(rowIdx); -// } -// for (auto& [offset, rowIdx] : insertInfo) { -// rowIdxesToRead.push_back(rowIdx); -// } -// std::sort(rowIdxesToRead.begin(), rowIdxesToRead.end()); -// auto metadata = getMetadata(nodeGroupIdx, transaction->getType()); -// auto currentIndex = metadata.numValues; -// if(!isMaxOffsetOutOfPagesCapacity(metadata,currentIndex+insertInfo.size())){ -// return false; -// } -// offset_t totalLen=0; -// auto offsetVector=std::make_unique(LogicalTypeID::UINT64); -// offsetVector->setState(std::make_unique()); -// offsetVector->state->selVector->resetSelectorToValuePosBuffer(); -// offsetVector->state->selVector->selectedPositions[0] = 0; -// for (auto rowIdx : rowIdxesToRead) { -// auto localVector = localChunk->getLocalVector(rowIdx); -// auto offsetInVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); -// auto listVector=localVector->getVector(); -// auto isNull=listVector->isNull(offsetInVector); -// auto listOffsetSize = listVector->getValue(offsetInVector); -// currentIndex += listOffsetSize.size; -// totalLen+=listOffsetSize.size; -// offsetVector->setNull(0, isNull); -// offsetVector->setValue(0,currentIndex); -// if (!metadata.compMeta.canUpdateInPlace( -// offsetVector->getData(), 0, PhysicalTypeID::UINT64)) { -// return false; -// } -// } -//// if(!canScaleCommitInPlace(this,transaction,nodeGroupIdx,totalLen)){ -//// return false; -//// } -// return true; -//} -// -//bool VarListColumn::canSizeCommitInPlace(Transaction* transaction, -// common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk, -// const offset_to_row_idx_t& insertInfo, -// const offset_to_row_idx_t& updateInfo){ -//// if(!canScaleCommitInPlace(sizeColumn.get(),transaction,nodeGroupIdx,insertInfo.size())){ -//// return false; -//// } -// std::vector rowIdxesToRead; -// for (auto& [offset, rowIdx] : updateInfo) { -// rowIdxesToRead.push_back(rowIdx); -// } -// for (auto& [offset, rowIdx] : insertInfo) { -// rowIdxesToRead.push_back(rowIdx); -// } -// std::sort(rowIdxesToRead.begin(), rowIdxesToRead.end()); -// auto metadata = sizeColumn->getMetadata(nodeGroupIdx, transaction->getType()); -// auto currentIndex = metadata.numValues; -// if(!isMaxOffsetOutOfPagesCapacity(metadata,currentIndex+insertInfo.size())){ -// return false; -// } -// auto sizeVector=std::make_unique(LogicalTypeID::UINT32); -// sizeVector->setState(std::make_unique()); -// sizeVector->state->selVector->resetSelectorToValuePosBuffer(); -// sizeVector->state->selVector->selectedPositions[0] = 0; -// for (auto rowIdx : rowIdxesToRead) { -// auto localVector = localChunk->getLocalVector(rowIdx); -// auto offsetInVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); -// auto listVector=localVector->getVector(); -// auto isNull=listVector->isNull(offsetInVector); -// auto listOffsetSize = listVector->getValue(offsetInVector); -// auto listLen = listOffsetSize.size; -// sizeVector->setNull(0, isNull); -// sizeVector->setValue(0,listLen); -// if (!metadata.compMeta.canUpdateInPlace( -// sizeVector->getData(), 0, PhysicalTypeID::UINT32)) { -// return false; -// } -// } -// return true; -//} -// -// -//bool VarListColumn::canScaleCommitInPlace(Column* column, Transaction* transaction, -// node_group_idx_t nodeGroupIdx, uint64_t numNewAppend){ -// auto metadata = column->getMetadata(nodeGroupIdx, transaction->getType()); -// auto totalAfterAppend = metadata.numValues + numNewAppend; -// auto upperNum=metadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE; -// if (totalAfterAppend > -// upperNum) { -// // Data cannot be updated in place -// return false; -// } -// return true; -//} - -//void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, -// LocalVectorCollection* localColumnChunk, const offset_to_row_idx_t& insertInfo, -// const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) { -// auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); -// auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; -// if (isNewNodeGroup) { -// commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup, -// insertInfo, updateInfo, deleteInfo); -// } else { -// bool didInPlaceCommit = false; -// // If this is not a new node group, we should first check if we can perform in-place commit. -// if (canCommitInPlace(transaction, nodeGroupIdx, localColumnChunk, insertInfo, updateInfo)) { -// commitLocalChunkInPlace( -// transaction, nodeGroupIdx, localColumnChunk, insertInfo, updateInfo, deleteInfo); -// didInPlaceCommit = true; -// } else { -// commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup, -// insertInfo, updateInfo, deleteInfo); -// } -// // TODO(Guodong/Ben): The logic here on NullColumn is confusing as out-of-place commits and -// // in-place commits handle it differently. See if we can unify them. -// if (nullColumn) { -// // Uses functions written for the null chunk which only access the localColumnChunk's -// // null information -// if (nullColumn->canCommitInPlace( -// transaction, nodeGroupIdx, localColumnChunk, insertInfo, updateInfo)) { -// nullColumn->commitLocalChunkInPlace(transaction, nodeGroupIdx, localColumnChunk, -// insertInfo, updateInfo, deleteInfo); -// } else if (didInPlaceCommit) { -// // Out-of-place commits also commit the null chunk out of place, -// // so we only need to do a separate out of place commit for the null chunk if the -// // main chunk did an in-place commit. -// nullColumn->commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, -// isNewNodeGroup, insertInfo, updateInfo, deleteInfo); -// } -// } -// } -//} } // namespace storage } // namespace kuzu diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index adc88aebaa5..752011c7fef 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -5,6 +5,7 @@ #include "common/types/value/value.h" #include "storage/store/column_chunk.h" +#include using namespace kuzu::common; namespace kuzu { @@ -357,5 +358,21 @@ void VarListColumnChunk::resetFromOtherChunk(VarListColumnChunk* other) { numValues = other->numValues; checkOrder = true; } + +void VarListColumnChunk::showInfo() { + std::cout<<"offset \n"; + for (auto i = 0u; i < numValues; i++) { + std::cout<(i)<<" "; + } + + std::cout<<"\nsize\n"; + for(auto j=0;jgetValue(j)<<" "; + } + std::cout<<"\nnull\n"; + for(auto j=0;jisNull(j)<<" "; + } +} } // namespace storage } // namespace kuzu