diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index 4e9966eba4d..7e5b8b90517 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -165,8 +165,8 @@ class Column { const ColumnChunkMetadata& metadata, const offset_to_row_idx_t& insertInfo); bool isMaxOffsetOutOfPagesCapacity( const ColumnChunkMetadata& metadata, common::offset_t maxOffset); - virtual bool checkUpdateInPlace(const ColumnChunkMetadata& metadata, const ChunkCollection& localChunks, - const offset_to_row_idx_t& writeInfo); + virtual bool checkUpdateInPlace(const ColumnChunkMetadata& metadata, + const ChunkCollection& localChunks, const offset_to_row_idx_t& writeInfo); virtual bool canCommitInPlace(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, diff --git a/src/include/storage/store/var_list_column.h b/src/include/storage/store/var_list_column.h index b01b1d2a733..3eaf5a03073 100644 --- a/src/include/storage/store/var_list_column.h +++ b/src/include/storage/store/var_list_column.h @@ -56,8 +56,8 @@ struct VarListInfo { std::unique_ptr sizeInsertChunkCollection, std::unique_ptr offsetUpdateChunkCollection, std::unique_ptr offsetInsertChunkCollection) - : totalListNum{totalListNum}, - dataInsertUpdateChunkCollection{std::move(dataInsertUpdateChunkCollection)}, + : totalListNum{totalListNum}, dataInsertUpdateChunkCollection{std::move( + dataInsertUpdateChunkCollection)}, sizeUpdateChunkCollection{std::move(sizeUpdateChunkCollection)}, sizeInsertChunkCollection{std::move(sizeInsertChunkCollection)}, offsetUpdateChunkCollection{std::move(offsetUpdateChunkCollection)}, @@ -99,23 +99,6 @@ class VarListColumn : public Column { void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage); - void commitOffsetLocalChunkOutOfPlace(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, - bool isNewNodeGroup, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo); - - VarListInfo getVarListInfoForPrepareCommit(transaction::Transaction* transaction,common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo); - - void dataLocalChunkInPlaceCommit(common::node_group_idx_t nodeGroupIdx, - const ChunkCollection& localChunks); - void dataLocalChunkOutPlaceCommit(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localChunks); - bool canDataInPlaceCommit(transaction::Transaction* transaction, - common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localChunks, - const common::offset_t totalListNum); - void checkpointInMemory() final; void rollbackInMemory() final; @@ -133,7 +116,7 @@ class VarListColumn : public Column { common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override; - void prepareCommitForChunk(transaction::Transaction* transaction, + void prepareCommitForChunk(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, const std::vector& dstOffsets, ColumnChunk* chunk, common::offset_t startSrcOffset) override; diff --git a/src/storage/store/column.cpp b/src/storage/store/column.cpp index a10d7f48bae..418902519af 100644 --- a/src/storage/store/column.cpp +++ b/src/storage/store/column.cpp @@ -792,7 +792,9 @@ void Column::commitColumnChunkOutOfPlace(Transaction* transaction, node_group_id auto chunkMeta = getMetadata(nodeGroupIdx, transaction->getType()); // TODO(Guodong): Should consider caching the scanned column chunk to avoid redundant // scans in the same transaction. - auto columnChunk = getEmptyChunkForCommit(chunkMeta.numValues + dstOffsets.size()); + auto columnChunk = + getEmptyChunkForCommit(4 * std::bit_ceil(chunkMeta.numValues + dstOffsets.size())); + auto a = columnChunk->getCapacity(); scan(transaction, nodeGroupIdx, columnChunk.get()); for (auto i = 0u; i < dstOffsets.size(); i++) { columnChunk->write(chunk, srcOffset + i, dstOffsets[i], 1 /* numValues */); @@ -807,7 +809,8 @@ void Column::applyLocalChunkToColumnChunk(const ChunkCollection& localChunks, for (auto& [offsetInDstChunk, rowIdx] : updateInfo) { auto [chunkIdx, offsetInLocalChunk] = LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - KU_ASSERT(localChunks[chunkIdx]->getDataType().getLogicalTypeID()==columnChunk->getDataType().getLogicalTypeID()); + KU_ASSERT(localChunks[chunkIdx]->getDataType().getLogicalTypeID() == + columnChunk->getDataType().getLogicalTypeID()); columnChunk->write( localChunks[chunkIdx], offsetInLocalChunk, offsetInDstChunk, 1 /* numValues */); } diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index 350cc48125f..07a0ae092f1 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -382,6 +382,11 @@ bool ColumnChunk::numValuesSanityCheck() const { bool ColumnChunk::sanityCheck() { if (nullChunk) { + auto a = nullChunk->sanityCheck(); + auto b = numValuesSanityCheck(); + if (a != b) { + KU_UNREACHABLE; + } return nullChunk->sanityCheck() && numValuesSanityCheck(); } return numValues <= capacity; diff --git a/src/storage/store/string_column.cpp b/src/storage/store/string_column.cpp index 7c6dc934b48..5112c499deb 100644 --- a/src/storage/store/string_column.cpp +++ b/src/storage/store/string_column.cpp @@ -205,7 +205,7 @@ bool StringColumn::canCommitInPlace(Transaction* transaction, node_group_idx_t n auto strChunk = ku_dynamic_cast(chunk); auto length = std::min((uint64_t)dstOffsets.size(), strChunk->getNumValues()); for (auto i = 0u; i < length; i++) { - if (strChunk->getNullChunk()->isNull(i)) { //TODO(Jiamin): i should be i + srcOffset? + if (strChunk->getNullChunk()->isNull(i)) { // TODO(Jiamin): i should be i + srcOffset? continue; } strLenToAdd += strChunk->getStringLength(i + srcOffset); diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index cc50f0560f5..fe89212b88c 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -15,7 +15,8 @@ offset_t ListOffsetSizeInfo::getListStartOffset(uint64_t pos) const { if (numTotal == 0) { return 0; } - if(pos!=numTotal) KU_ASSERT(getListEndOffset(pos) >= getListLength(pos)); + if (pos != numTotal) + KU_ASSERT(getListEndOffset(pos) >= getListLength(pos)); return pos == numTotal ? getListEndOffset(pos - 1) : getListEndOffset(pos) - getListLength(pos); } @@ -193,8 +194,11 @@ void VarListColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { auto varListColumnChunk = ku_dynamic_cast(columnChunk); Column::append(varListColumnChunk, nodeGroupIdx); auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk(); + KU_ASSERT(sizeColumnChunk->sanityCheck()); sizeColumn->append(sizeColumnChunk, nodeGroupIdx); KU_ASSERT(varListColumnChunk->getNumValues() == sizeColumnChunk->getNumValues()); + KU_ASSERT(varListColumnChunk->getNullChunk()->getNumValues() == + sizeColumnChunk->getNullChunk()->getNumValues()); auto dataColumnChunk = varListColumnChunk->getDataColumnChunk(); dataColumn->append(dataColumnChunk, nodeGroupIdx); } @@ -290,26 +294,27 @@ uint32_t VarListColumn::readSize( }); return value; } -void printOffset(ColumnChunk* columnChunk){ - std::cout<<"offset\n"; - for(auto i=0u;igetNumValues();i++){ - std::cout<getValue(i)<<" "; - } - std::cout<<"\n"; - std::cout<<"null\n"; - for(auto i=0u;igetNumValues();i++){ - std::cout<getNullChunk()->isNull(i)<<" "; - } - std::cout<<"\n"; +void printOffset(ColumnChunk* columnChunk) { + // std::cout<<"offset\n"; + // for(auto i=0u;igetNumValues();i++){ + // std::cout<getValue(i)<<" "; + // } + // std::cout<<"\n"; + // + // std::cout<<"null\n"; + // for(auto i=0u;igetNumValues();i++){ + // std::cout<getNullChunk()->isNull(i)<<" "; + // } + // std::cout<<"\n"; } -void printSize(ColumnChunk* columnChunk){ - std::cout<<"Size\n"; - for(auto i=0u;igetNumValues();i++){ - std::cout<getValue(i)<<" "; - } - std::cout<<"\n"; +void printSize(ColumnChunk* columnChunk) { + // std::cout<<"Size\n"; + // for(auto i=0u;igetNumValues();i++){ + // std::cout<getValue(i)<<" "; + // } + // std::cout<<"\n"; } ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction, @@ -323,7 +328,7 @@ ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction endOffsetInNodeGroup); sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk.get(), startOffsetInNodeGroup, endOffsetInNodeGroup); - //TODO: SHOULD REMOVE + // TODO: SHOULD REMOVE printOffset(offsetColumnChunk.get()); printSize(sizeColumnChunk.get()); KU_ASSERT(offsetColumnChunk->getNumValues() == numOffsetsToRead); @@ -331,164 +336,40 @@ ListOffsetSizeInfo VarListColumn::getListOffsetSizeInfo(Transaction* transaction return {numOffsetsToRead, std::move(offsetColumnChunk), std::move(sizeColumnChunk)}; } - -VarListInfo VarListColumn::getVarListInfoForPrepareCommit(Transaction* transaction, - node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo){ - auto offsetUpdateChunkCollection = std::make_unique(); - auto offsetInsertChunkCollection = std::make_unique(); - auto sizeUpdateChunkCollection = std::make_unique(); - auto sizeInsertChunkCollection = std::make_unique(); - auto dataInsertUpdateChunkCollection = std::make_unique(); - auto dataListSize=dataColumn->getMetadata(nodeGroupIdx,transaction->getType()).numValues; - auto totalListNum=dataListSize; - for (auto& [_, rowIdx] : updateInfo) { - auto [chunkIdx, offsetInLocalChunk] = - LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - auto localUpdateChunk = localUpdateChunks[chunkIdx]; - auto varListChunk = ku_dynamic_cast(localUpdateChunk); - auto offsetColumnChunk=ColumnChunk(std::move(dataType), 0, enableCompression); - offsetColumnChunk.resize(varListChunk->getNumValues()); - offsetColumnChunk.getNullChunk()->append(varListChunk->getNullChunk(),0,varListChunk->getNumValues()); - for(auto i=0u;igetNumValues();i++){ - auto listEndOffset=varListChunk->getListEndOffset(i); - totalListNum+=varListChunk->getListLen(i); - offsetColumnChunk.setValue(dataListSize+listEndOffset,i); - offsetColumnChunk.getNullChunk()->setNull(i,varListChunk->getNullChunk()->isNull(i)); - } - KU_ASSERT(chunkIdx==offsetUpdateChunkCollection->size()); - offsetUpdateChunkCollection->push_back(&offsetColumnChunk); - sizeUpdateChunkCollection->push_back(varListChunk->getSizeColumnChunk()); - dataInsertUpdateChunkCollection->push_back(varListChunk->getDataColumnChunk()); - } - for (auto& [offset, rowIdx] : insertInfo) { - auto [chunkIdx, offsetInLocalChunk] = - LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); - auto localInsertChunk = localInsertChunks[chunkIdx]; - auto varListChunk = ku_dynamic_cast(localInsertChunk); - //TODO: SHOULD REMOVE - std::cout<<"original info\n"; - varListChunk->showInfo(); - auto offsetColumnChunk=ColumnChunk(*common::LogicalType::UINT64(), 0, enableCompression); - offsetColumnChunk.resize(varListChunk->getNumValues()); - offsetColumnChunk.getNullChunk()->append(varListChunk->getNullChunk(),0,varListChunk->getNumValues()); - for(auto i=0u;igetNumValues();i++){ - auto listEndOffset=varListChunk->getListEndOffset(i); - totalListNum+=varListChunk->getListLen(i); - offsetColumnChunk.setValue(dataListSize+listEndOffset,i); - offsetColumnChunk.getNullChunk()->setNull(i,varListChunk->getNullChunk()->isNull(i)); - } - //TODO: SHOULD REMOVE - std::cout<<"rewrite after\n"; - printOffset(&offsetColumnChunk); - KU_ASSERT(chunkIdx==offsetInsertChunkCollection->size()); - offsetInsertChunkCollection->push_back(&offsetColumnChunk); - sizeInsertChunkCollection->push_back(varListChunk->getSizeColumnChunk()); - dataInsertUpdateChunkCollection->push_back(varListChunk->getDataColumnChunk()); - } - return {totalListNum, std::move(dataInsertUpdateChunkCollection), std::move(sizeUpdateChunkCollection), std::move(sizeInsertChunkCollection), std::move(offsetUpdateChunkCollection), std::move(offsetInsertChunkCollection)}; -} - -bool VarListColumn::canDataInPlaceCommit(Transaction* transaction, node_group_idx_t nodeGroupIdx, const ChunkCollection& localChunks, const offset_t totalListNum){ - auto dataMeta=dataColumn->getMetadata(nodeGroupIdx,transaction->getType()); - if(dataColumn->isMaxOffsetOutOfPagesCapacity(dataMeta,totalListNum)){ - return false; - } - if (dataMeta.compMeta.canAlwaysUpdateInPlace()) { - return true; - } - for (auto localChunk : localChunks) { - for(auto i=0u;igetNumValues();i++){ - if (!dataMeta.compMeta.canUpdateInPlace( - localChunk->getData(), i, VarListType::getChildType(&dataType)->getPhysicalType())) { - return false; - } - } - } - return true; -} - -void VarListColumn::commitOffsetLocalChunkOutOfPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx, - bool isNewNodeGroup, const ChunkCollection& localInsertChunks, - const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, - const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) { - //out of place for offset column - auto columnChunk = ColumnChunkFactory::createColumnChunk(*LogicalType::UINT64(), enableCompression, common::StorageConstants::NODE_GROUP_SIZE); - if (isNewNodeGroup) { - KU_ASSERT(updateInfo.empty() && deleteInfo.empty()); - // Apply inserts from the local chunk. - applyLocalChunkToColumnChunk(localUpdateChunks, columnChunk.get(), updateInfo); - } else { - // First, scan the whole column chunk from persistent storage. - Column::scan(transaction, nodeGroupIdx, columnChunk.get()); - //TODO: SHOULD REMOVE - std::cout<<"apply before\n"; - printOffset(columnChunk.get()); - // Then, apply updates from the local chunk. - //TODO: FIX HERE - applyLocalChunkToColumnChunk(localUpdateChunks, columnChunk.get(), updateInfo); - // Lastly, apply inserts from the local chunk. - //TODO: SHOULD REMOVE - applyLocalChunkToColumnChunk(localInsertChunks, columnChunk.get(), insertInfo); - std::cout<<"apply after\n"; - printOffset(columnChunk.get()); - if (columnChunk->getNullChunk()) { - // Set nulls based on deleteInfo. - for (auto offsetInChunk : deleteInfo) { - columnChunk->getNullChunk()->setNull(offsetInChunk, true /* isNull */); - } - } - } - columnChunk->finalize(); - KU_ASSERT(columnChunk->sanityCheck()); - Column::append(columnChunk.get(), nodeGroupIdx); -} - -void VarListColumn::dataLocalChunkInPlaceCommit(node_group_idx_t nodeGroupIdx, - const ChunkCollection& localChunks) { - auto currentIndex=dataColumn->getMetadata(nodeGroupIdx, TransactionType::WRITE).numValues; - for(auto localChunk:localChunks){ - dataColumn->write(nodeGroupIdx, currentIndex, localChunk, 0, localChunk->getNumValues()); - } -} - -void VarListColumn::dataLocalChunkOutPlaceCommit(Transaction* transaction,node_group_idx_t nodeGroupIdx, - const ChunkCollection& localChunks) { - auto columnChunk = dataColumn->getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE); - dataColumn->scan(transaction, nodeGroupIdx, columnChunk.get()); - for(auto localChunk:localChunks){ - columnChunk->append(localChunk, 0, localChunk->getNumValues()); - } - columnChunk->finalize(); - dataColumn->append(columnChunk.get(), nodeGroupIdx); -} - void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) { + // TODO(Jiamin): should consider delete info, update the null in localchunk auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; if (isNewNodeGroup) { - // If this is a new node group, updateInfo should be empty. We should perform out-of-place - // commit with a new column chunk. commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks, insertInfo, localUpdateChunks, updateInfo, deleteInfo); } else { - auto varListInfo= getVarListInfoForPrepareCommit(transaction,nodeGroupIdx,localInsertChunks,insertInfo,localUpdateChunks,updateInfo); - //offset need to rewrite, always do out-of place commit - commitOffsetLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, - *varListInfo.offsetInsertChunkCollection, insertInfo, *varListInfo.offsetUpdateChunkCollection, updateInfo, - deleteInfo); - //size column can reuse the prepare commit - sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, *varListInfo.sizeInsertChunkCollection, - insertInfo, *varListInfo.sizeUpdateChunkCollection, updateInfo, deleteInfo); - //data column need to rewrite, we always append or should we rewrite the insert info? - if(canDataInPlaceCommit(transaction,nodeGroupIdx,*varListInfo.dataInsertUpdateChunkCollection,varListInfo.totalListNum)) { - dataLocalChunkInPlaceCommit(nodeGroupIdx, *varListInfo.dataInsertUpdateChunkCollection); - } else { - dataLocalChunkOutPlaceCommit(transaction,nodeGroupIdx, *varListInfo.dataInsertUpdateChunkCollection); + for (auto& [offsetInDstChunk, rowIdx] : updateInfo) { + auto [chunkIdx, offsetInLocalChunk] = + LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + auto localUpdateChunk = localUpdateChunks[chunkIdx]; + std::vector dstOffsets; + dstOffsets.push_back(offsetInDstChunk); + auto tmpDataColumnChunk = ColumnChunkFactory::createColumnChunk( + *dataType.copy(), enableCompression, localUpdateChunk->getCapacity()); + tmpDataColumnChunk->append(localUpdateChunk, 0, localUpdateChunk->getNumValues()); + prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, tmpDataColumnChunk.get(), + offsetInLocalChunk); + } + for (auto& [offsetInDstChunk, rowIdx] : insertInfo) { + auto [chunkIdx, offsetInLocalChunk] = + LocalChunkedGroupCollection::getChunkIdxAndOffsetInChunk(rowIdx); + auto localInsertChunk = localInsertChunks[chunkIdx]; + std::vector dstOffsets; + dstOffsets.push_back(offsetInDstChunk); + auto tmpDataColumnChunk = ColumnChunkFactory::createColumnChunk( + *dataType.copy(), enableCompression, localInsertChunk->getCapacity()); + tmpDataColumnChunk->append(localInsertChunk, 0, localInsertChunk->getNumValues()); + prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, tmpDataColumnChunk.get(), + offsetInLocalChunk); } } } @@ -501,32 +382,40 @@ void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_i commitColumnChunkOutOfPlace( transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk, startSrcOffset); } else { - //size column chunk commit - auto varListChunk= ku_dynamic_cast(chunk); - sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, varListChunk->getSizeColumnChunk(), startSrcOffset); - //data column chunk commit - auto dataColumnSize=dataColumn->getMetadata(nodeGroupIdx,transaction->getType()).numValues; - auto dataColumnChunk=varListChunk->getDataColumnChunk(); - auto appendListNum=std::min(chunk->getNumValues(),(uint64_t)dstOffsets.size()); - auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset); - std::vector dstOffsetsInDataColumn; - for(auto i=0u;iprepareCommitForChunk(transaction,nodeGroupIdx,dstOffsetsInDataColumn,dataColumnChunk,startVarListOffset); - //offset column chunk commit - //update offset from the data chunk info;since we append the data in the endof data column, we need to plus the original dataColumn Size - for(auto i=0u;igetListEndOffset(startSrcOffset+i); - varListChunk->setValue(dataColumnSize+listEndOffset,startSrcOffset+i); + // size column chunk commit + auto varListChunk = ku_dynamic_cast(chunk); + sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, + varListChunk->getSizeColumnChunk(), startSrcOffset); + // data column chunk commit + auto dataColumnSize = + dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; + auto dataColumnChunk = varListChunk->getDataColumnChunk(); + auto appendListNum = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size()); + auto appendListDataNum = 0u; + auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset); + std::vector dstOffsetsInDataColumn; + for (auto i = 0u; i < appendListNum; i++) { + auto appendLen = varListChunk->getListLen(startSrcOffset + i); + for (auto j = 0u; j < appendLen; j++) { + dstOffsetsInDataColumn.push_back(dataColumnSize + appendListDataNum + j); + } + appendListDataNum += appendLen; } + dataColumn->prepareCommitForChunk( + transaction, nodeGroupIdx, dstOffsetsInDataColumn, dataColumnChunk, startVarListOffset); + // offset column chunk commit + // update offset from the data chunk info;since we append the data in the endof data column, + // we need to plus the original dataColumn Size auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType()); - auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*LogicalType::UINT64(), enableCompression, offsetChunkMeta.numValues + dstOffsets.size()); + auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(), + enableCompression, 4 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size())); Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get()); - for (auto i = 0u; i < dstOffsets.size(); i++) { - offsetColumnChunk->write(chunk, startSrcOffset + i, dstOffsets[i], 1 /* numValues */); + for (auto i = 0u; i < appendListNum; i++) { + auto listEndOffset = varListChunk->getListEndOffset(startSrcOffset + i); + auto isNull = varListChunk->getNullChunk()->isNull(startSrcOffset + i); + offsetColumnChunk->setValue(dataColumnSize + listEndOffset, dstOffsets[i]); + offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull); } - offsetColumnChunk->finalize(); Column::append(offsetColumnChunk.get(), nodeGroupIdx); } } diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index 752011c7fef..2862d1d3afe 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -1,11 +1,11 @@ #include "storage/store/var_list_column_chunk.h" +#include + #include "common/cast.h" #include "common/data_chunk/sel_vector.h" #include "common/types/value/value.h" #include "storage/store/column_chunk.h" - -#include using namespace kuzu::common; namespace kuzu { @@ -51,18 +51,17 @@ bool VarListColumnChunk::isOffsetSortedAscending(uint64_t startPos, uint64_t end return true; } - -bool VarListColumnChunk::asendingRatio() const{ +bool VarListColumnChunk::asendingRatio() const { offset_t prevEndOffset = getListStartOffset(0); - uint32_t asendingNum=0; - uint32_t threshold=numValues/2; + uint32_t asendingNum = 0; + uint32_t threshold = numValues / 2; for (auto i = 0u; i < numValues; i++) { offset_t currentEndOffset = getListEndOffset(i); auto length = getListLen(i); prevEndOffset += length; if (currentEndOffset == prevEndOffset) { asendingNum++; - if(asendingNum>=threshold){ + if (asendingNum >= threshold) { return true; } } @@ -70,10 +69,9 @@ bool VarListColumnChunk::asendingRatio() const{ return false; } - void VarListColumnChunk::append( ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { - checkOrder=false; + checkOrder = false; auto otherListChunk = ku_dynamic_cast(other); nullChunk->append(other->getNullChunk(), startPosInOtherChunk, numValuesToAppend); sizeColumnChunk->getNullChunk()->append( @@ -90,11 +88,12 @@ void VarListColumnChunk::append( auto startOffset = otherListChunk->getListStartOffset(startPosInOtherChunk + i); auto appendLen = otherListChunk->getListLen(startPosInOtherChunk + i); varListDataColumnChunk->dataColumnChunk->append( - otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffset, - appendLen); + otherListChunk->varListDataColumnChunk->dataColumnChunk.get(), startOffset, appendLen); } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::resetToEmpty() { @@ -139,6 +138,8 @@ void VarListColumnChunk::append(ValueVector* vector, SelectionVector& selVector) numValues += numToAppend; KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::appendNullList() { @@ -169,6 +170,8 @@ void VarListColumnChunk::lookup( } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::write( @@ -205,6 +208,8 @@ void VarListColumnChunk::write( } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::write( @@ -232,6 +237,7 @@ void VarListColumnChunk::write( } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, @@ -260,6 +266,7 @@ void VarListColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, } KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); KU_ASSERT(nullChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->sanityCheck()); } void VarListColumnChunk::copy(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, @@ -301,26 +308,27 @@ void VarListColumnChunk::resetOffset() { } void VarListColumnChunk::finalize() { - //rewrite the column chunk for better scanning performance + // rewrite the column chunk for better scanning performance auto newColumnChunk = ColumnChunkFactory::createColumnChunk( std::move(*dataType.copy()), enableCompression, capacity); uint64_t totalListLen = varListDataColumnChunk->getNumValues(); uint64_t resizeThreshold = varListDataColumnChunk->capacity / 2; - //if the list is not very long, we do not need to rewrite + // if the list is not very long, we do not need to rewrite if (totalListLen < resizeThreshold) { return; } - //if we do not trigger random write, we do not need to rewrite - if(checkOrder){ + // if we do not trigger random write, we do not need to rewrite + if (checkOrder) { return; } - //if the list is in ascending order, we do not need to rewrite - if(isOffsetSortedAscending(0, numValues)){ + // if the list is in ascending order, we do not need to rewrite + if (isOffsetSortedAscending(0, numValues)) { return; } - //if the list is not very unordered, we do not need to rewrite - // TODO(Jimain): we need to rewrite the scan logical to support the case that the list is not very unordered - if(asendingRatio()){ + // if the list is not very unordered, we do not need to rewrite + // TODO(Jimain): we need to rewrite the scan logical to support the case that the list is not + // very unordered + if (asendingRatio()) { return; } auto newVarListChunk = ku_dynamic_cast(newColumnChunk.get()); @@ -346,7 +354,8 @@ void VarListColumnChunk::finalize() { } currentIndex++; } - KU_ASSERT(sizeColumnChunk->getNumValues()==numValues); + KU_ASSERT(sizeColumnChunk->getNumValues() == numValues); + KU_ASSERT(sizeColumnChunk->getNullChunk()->getNumValues() == numValues); // Move offsets, null, data from newVarListChunk to this column chunk. And release indices. resetFromOtherChunk(newVarListChunk); } @@ -360,19 +369,19 @@ void VarListColumnChunk::resetFromOtherChunk(VarListColumnChunk* other) { } void VarListColumnChunk::showInfo() { - std::cout<<"offset \n"; - for (auto i = 0u; i < numValues; i++) { - std::cout<(i)<<" "; - } - - std::cout<<"\nsize\n"; - for(auto j=0;jgetValue(j)<<" "; - } - std::cout<<"\nnull\n"; - for(auto j=0;jisNull(j)<<" "; - } + // std::cout<<"offset \n"; + // for (auto i = 0u; i < numValues; i++) { + // std::cout<(i)<<" "; + // } + // + // std::cout<<"\nsize\n"; + // for(auto j=0;jgetValue(j)<<" "; + // } + // std::cout<<"\nnull\n"; + // for(auto j=0;jisNull(j)<<" "; + // } } } // namespace storage } // namespace kuzu