From f6f8ebbf52ef43d65575fa03d8ebe41e75a6d276 Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Fri, 23 Dec 2022 15:18:52 -0500 Subject: [PATCH] simplify list sync state --- ...j_list_extend.h => scan_rel_table_lists.h} | 13 ++- .../processor/result/factorized_table.h | 12 +-- .../storage_structure/lists/list_sync_state.h | 42 +++++---- .../storage/storage_structure/lists/lists.h | 15 ++-- src/processor/mapper/map_extend.cpp | 9 +- src/processor/operator/generic_extend.cpp | 2 +- .../operator/scan_list/CMakeLists.txt | 2 +- ...st_extend.cpp => scan_rel_table_lists.cpp} | 11 ++- .../var_length_adj_list_extend.cpp | 2 +- src/processor/result/factorized_table.cpp | 4 +- .../lists/list_sync_state.cpp | 15 ++-- src/storage/storage_structure/lists/lists.cpp | 86 ++++++++----------- 12 files changed, 109 insertions(+), 104 deletions(-) rename src/include/processor/operator/scan_list/{adj_list_extend.h => scan_rel_table_lists.h} (68%) rename src/processor/operator/scan_list/{adj_list_extend.cpp => scan_rel_table_lists.cpp} (84%) diff --git a/src/include/processor/operator/scan_list/adj_list_extend.h b/src/include/processor/operator/scan_list/scan_rel_table_lists.h similarity index 68% rename from src/include/processor/operator/scan_list/adj_list_extend.h rename to src/include/processor/operator/scan_list/scan_rel_table_lists.h index 6fe6de0659..5f31070c19 100644 --- a/src/include/processor/operator/scan_list/adj_list_extend.h +++ b/src/include/processor/operator/scan_list/scan_rel_table_lists.h @@ -6,24 +6,23 @@ namespace kuzu { namespace processor { -class ListExtendAndScanRelProperties : public BaseExtendAndScanRelProperties { +class ScanRelTableLists : public BaseExtendAndScanRelProperties { public: - ListExtendAndScanRelProperties(const DataPos& inNodeIDVectorPos, - const DataPos& outNodeIDVectorPos, vector outPropertyVectorsPos, Lists* adjList, - vector propertyLists, unique_ptr child, uint32_t id, - const string& paramsString) + ScanRelTableLists(const DataPos& inNodeIDVectorPos, const DataPos& outNodeIDVectorPos, + vector outPropertyVectorsPos, Lists* adjList, vector propertyLists, + unique_ptr child, uint32_t id, const string& paramsString) : BaseExtendAndScanRelProperties{PhysicalOperatorType::LIST_EXTEND, inNodeIDVectorPos, outNodeIDVectorPos, std::move(outPropertyVectorsPos), std::move(child), id, paramsString}, adjList{adjList}, propertyLists{std::move(propertyLists)} {} - ~ListExtendAndScanRelProperties() override = default; + ~ScanRelTableLists() override = default; void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; bool getNextTuplesInternal() override; inline unique_ptr clone() override { - return make_unique(inNodeIDVectorPos, outNodeIDVectorPos, + return make_unique(inNodeIDVectorPos, outNodeIDVectorPos, outPropertyVectorsPos, adjList, propertyLists, children[0]->clone(), id, paramsString); } diff --git a/src/include/processor/result/factorized_table.h b/src/include/processor/result/factorized_table.h index 9660bfe4e4..b7f59e1cbb 100644 --- a/src/include/processor/result/factorized_table.h +++ b/src/include/processor/result/factorized_table.h @@ -64,11 +64,13 @@ class DataBlockCollection { DataBlockCollection(uint32_t numBytesPerTuple, uint32_t numTuplesPerBlock) : numBytesPerTuple{numBytesPerTuple}, numTuplesPerBlock{numTuplesPerBlock} {} - inline void append(unique_ptr otherBlock) { blocks.push_back(move(otherBlock)); } + inline void append(unique_ptr otherBlock) { + blocks.push_back(std::move(otherBlock)); + } inline void append(vector> otherBlocks) { move(begin(otherBlocks), end(otherBlocks), back_inserter(blocks)); } - inline void append(unique_ptr other) { append(move(other->blocks)); } + inline void append(unique_ptr other) { append(std::move(other->blocks)); } inline bool isEmpty() { return blocks.empty(); } inline vector>& getBlocks() { return blocks; } inline DataBlock* getBlock(uint32_t blockIdx) { return blocks[blockIdx].get(); } @@ -121,7 +123,7 @@ class FactorizedTableSchema { FactorizedTableSchema(const FactorizedTableSchema& other); explicit FactorizedTableSchema(vector> columns) - : columns{move(columns)} {} + : columns{std::move(columns)} {} void appendColumn(unique_ptr column); @@ -240,7 +242,7 @@ class FactorizedTable { // inside overflowFileOfInMemList. void copyToInMemList(uint32_t colIdx, vector& tupleIdxesToRead, uint8_t* data, NullMask* nullMask, uint64_t startElemPosInList, DiskOverflowFile* overflowFileOfInMemList, - DataType type, NodeIDCompressionScheme* nodeIDCompressionScheme) const; + const DataType& type, NodeIDCompressionScheme* nodeIDCompressionScheme) const; void clear(); int64_t findValueInFlatColumn(uint64_t colIdx, int64_t value) const; @@ -292,7 +294,7 @@ class FactorizedTable { readFlatColToUnflatVector(tuplesToRead, colIdx, vector, numTuplesToRead); } static void copyOverflowIfNecessary( - uint8_t* dst, uint8_t* src, DataType type, DiskOverflowFile* diskOverflowFile); + uint8_t* dst, uint8_t* src, const DataType& type, DiskOverflowFile* diskOverflowFile); private: MemoryManager* memoryManager; diff --git a/src/include/storage/storage_structure/lists/list_sync_state.h b/src/include/storage/storage_structure/lists/list_sync_state.h index 5a7d68f447..863cd6252c 100644 --- a/src/include/storage/storage_structure/lists/list_sync_state.h +++ b/src/include/storage/storage_structure/lists/list_sync_state.h @@ -11,8 +11,8 @@ namespace kuzu { namespace storage { enum class ListSourceStore : uint8_t { - PersistentStore = 0, - ListsUpdateStore = 1, + PERSISTENT_STORE = 0, + UPDATE_STORE = 1, }; // ListSyncState holds the data that is required to synchronize reading from multiple Lists that @@ -28,12 +28,16 @@ class ListSyncState { public: ListSyncState() { reset(); }; - inline void setNumValuesInList(uint64_t numValuesInList_) { - this->numValuesInList = numValuesInList_; - } - inline void setBoundNodeOffset(node_offset_t boundNodeOffset_) { + inline void init(node_offset_t boundNodeOffset_, list_header_t listHeader_, + uint64_t numValuesInUpdateStore_, uint64_t numValuesInPersistentStore_, + ListSourceStore sourceStore_) { this->boundNodeOffset = boundNodeOffset_; + this->listHeader = listHeader_; + this->numValuesInUpdateStore = numValuesInUpdateStore_; + this->numValuesInPersistentStore = numValuesInPersistentStore_; + this->sourceStore = sourceStore_; } + inline void setRangeToRead(uint32_t startIdx_, uint32_t numValuesToRead_) { this->startElemOffset = startIdx_; this->numValuesToRead = numValuesToRead_; @@ -43,27 +47,33 @@ class ListSyncState { inline uint32_t getEndElemOffset() const { return startElemOffset + numValuesToRead; } inline bool hasValidRangeToRead() const { return UINT32_MAX != startElemOffset; } inline uint32_t getNumValuesToRead() const { return numValuesToRead; } - inline uint64_t getNumValuesInList() const { return numValuesInList; } inline ListSourceStore getListSourceStore() const { return sourceStore; } - inline void setSourceStore(ListSourceStore sourceStore) { this->sourceStore = sourceStore; } - inline void setDataToReadFromUpdateStore(bool dataToReadFromUpdateStore_) { - dataToReadFromUpdateStore = dataToReadFromUpdateStore_; - } - inline bool hasDataToReadFromUpdateStore() const { return dataToReadFromUpdateStore; } inline list_header_t getListHeader() const { return listHeader; } - inline void setListHeader(list_header_t listHeader_) { listHeader = listHeader_; } + inline uint32_t getNumValuesInList() { + return sourceStore == ListSourceStore::PERSISTENT_STORE ? numValuesInPersistentStore : + numValuesInUpdateStore; + } - bool hasMoreToRead(); + bool hasMoreAndSwitchSourceIfNecessary(); void reset(); +private: + inline bool hasMoreLeftInList() { + return (startElemOffset + numValuesToRead) < getNumValuesInList(); + } + inline void switchToUpdateStore() { + sourceStore = ListSourceStore::UPDATE_STORE; + startElemOffset = UINT32_MAX; + } + private: node_offset_t boundNodeOffset; list_header_t listHeader; + uint32_t numValuesInUpdateStore; + uint32_t numValuesInPersistentStore; uint32_t startElemOffset; uint32_t numValuesToRead; - uint64_t numValuesInList; ListSourceStore sourceStore; - bool dataToReadFromUpdateStore; }; } // namespace storage diff --git a/src/include/storage/storage_structure/lists/lists.h b/src/include/storage/storage_structure/lists/lists.h index cf06addc75..e800101c0e 100644 --- a/src/include/storage/storage_structure/lists/lists.h +++ b/src/include/storage/storage_structure/lists/lists.h @@ -88,10 +88,11 @@ class Lists : public BaseColumnOrList { Lists(const StorageStructureIDAndFName& storageStructureIDAndFName, const DataType& dataType, const size_t& elementSize, shared_ptr headers, BufferManager& bufferManager, bool isInMemory, WAL* wal, ListsUpdateStore* listsUpdateStore) - : Lists{storageStructureIDAndFName, dataType, elementSize, move(headers), bufferManager, - true /*hasNULLBytes*/, isInMemory, wal, listsUpdateStore} {}; + : Lists{storageStructureIDAndFName, dataType, elementSize, std::move(headers), + bufferManager, true /*hasNULLBytes*/, isInMemory, wal, listsUpdateStore} {}; inline ListsMetadata& getListsMetadata() { return metadata; }; inline shared_ptr getHeaders() const { return headers; }; + // TODO(Guodong): change the input to header. inline uint64_t getNumElementsFromListHeader(node_offset_t nodeOffset) const { auto header = headers->getHeader(nodeOffset); return ListHeaders::isALargeList(header) ? @@ -150,7 +151,7 @@ class Lists : public BaseColumnOrList { : BaseColumnOrList{storageStructureIDAndFName, dataType, elementSize, bufferManager, hasNULLBytes, isInMemory, wal}, storageStructureIDAndFName{storageStructureIDAndFName}, - metadata{storageStructureIDAndFName, &bufferManager, wal}, headers{move(headers)}, + metadata{storageStructureIDAndFName, &bufferManager, wal}, headers{std::move(headers)}, listsUpdateStore{listsUpdateStore} {}; private: @@ -171,7 +172,7 @@ class PropertyListsWithOverflow : public Lists { const DataType& dataType, shared_ptr headers, BufferManager& bufferManager, bool isInMemory, WAL* wal, ListsUpdateStore* listsUpdateStore) : Lists{storageStructureIDAndFName, dataType, Types::getDataTypeSize(dataType), - move(headers), bufferManager, isInMemory, wal, listsUpdateStore}, + std::move(headers), bufferManager, isInMemory, wal, listsUpdateStore}, diskOverflowFile{storageStructureIDAndFName, bufferManager, isInMemory, wal} {} private: @@ -253,6 +254,8 @@ class AdjLists : public Lists { const shared_ptr& valueVector, ListHandle& listHandle) override; void readFromListsUpdateStore( ListSyncState& listSyncState, const shared_ptr& valueVector); + void readFromListsPersistentStore( + ListHandle& listHandle, const shared_ptr& valueVector); private: NodeIDCompressionScheme nodeIDCompressionScheme; @@ -264,8 +267,8 @@ class RelIDList : public Lists { RelIDList(const StorageStructureIDAndFName& storageStructureIDAndFName, const DataType& dataType, const size_t& elementSize, shared_ptr headers, BufferManager& bufferManager, bool isInMemory, WAL* wal, ListsUpdateStore* listsUpdateStore) - : Lists{storageStructureIDAndFName, dataType, elementSize, headers, bufferManager, - isInMemory, wal, listsUpdateStore} {} + : Lists{storageStructureIDAndFName, dataType, elementSize, std::move(headers), + bufferManager, isInMemory, wal, listsUpdateStore} {} void setDeletedRelsIfNecessary(Transaction* transaction, ListSyncState& listSyncState, const shared_ptr& relIDVector) override; unordered_set getDeletedRelOffsetsInListForNodeOffset(node_offset_t nodeOffset); diff --git a/src/processor/mapper/map_extend.cpp b/src/processor/mapper/map_extend.cpp index 8c6d4465d8..705d83e458 100644 --- a/src/processor/mapper/map_extend.cpp +++ b/src/processor/mapper/map_extend.cpp @@ -2,7 +2,7 @@ #include "processor/mapper/plan_mapper.h" #include "processor/operator/generic_extend.h" #include "processor/operator/scan_column/adj_column_extend.h" -#include "processor/operator/scan_list/adj_list_extend.h" +#include "processor/operator/scan_list/scan_rel_table_lists.h" #include "processor/operator/var_length_extend/var_length_adj_list_extend.h" #include "processor/operator/var_length_extend/var_length_column_extend.h" @@ -133,10 +133,9 @@ unique_ptr PlanMapper::mapLogicalExtendToPhysical( } else { auto propertyLists = populatePropertyLists( boundNodeTableID, relTableID, direction, extend->getProperties(), relsStore); - return make_unique(inNodeIDVectorPos, - outNodeIDVectorPos, std::move(outPropertyVectorsPos), adjList, - std::move(propertyLists), std::move(prevOperator), getOperatorID(), - extend->getExpressionsForPrinting()); + return make_unique(inNodeIDVectorPos, outNodeIDVectorPos, + std::move(outPropertyVectorsPos), adjList, std::move(propertyLists), + std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting()); } } } else { // map to generic extend diff --git a/src/processor/operator/generic_extend.cpp b/src/processor/operator/generic_extend.cpp index a426b50716..6bad0ecb2c 100644 --- a/src/processor/operator/generic_extend.cpp +++ b/src/processor/operator/generic_extend.cpp @@ -57,7 +57,7 @@ bool AdjAndPropertyCollection::scanLists(const shared_ptr& inVector if (currentListIdx != UINT32_MAX) { // check current list auto currentAdjList = adjCollection->lists[currentListIdx]; auto currentAdjListHandle = adjCollection->listHandles[currentListIdx].get(); - if (currentAdjListHandle->listSyncState.hasMoreToRead()) { + if (currentAdjListHandle->listSyncState.hasMoreAndSwitchSourceIfNecessary()) { // scan current adjList currentAdjList->readValues(outNodeVector, *currentAdjListHandle); scanPropertyList(currentListIdx, outPropertyVectors, transaction); diff --git a/src/processor/operator/scan_list/CMakeLists.txt b/src/processor/operator/scan_list/CMakeLists.txt index bb703eceb2..6d5715cebf 100644 --- a/src/processor/operator/scan_list/CMakeLists.txt +++ b/src/processor/operator/scan_list/CMakeLists.txt @@ -1,6 +1,6 @@ add_library(kuzu_processor_operator_scan_list OBJECT - adj_list_extend.cpp + scan_rel_table_lists.cpp ) set(ALL_OBJECT_FILES diff --git a/src/processor/operator/scan_list/adj_list_extend.cpp b/src/processor/operator/scan_list/scan_rel_table_lists.cpp similarity index 84% rename from src/processor/operator/scan_list/adj_list_extend.cpp rename to src/processor/operator/scan_list/scan_rel_table_lists.cpp index 73a0e3b4d6..81af69812e 100644 --- a/src/processor/operator/scan_list/adj_list_extend.cpp +++ b/src/processor/operator/scan_list/scan_rel_table_lists.cpp @@ -1,10 +1,9 @@ -#include "processor/operator/scan_list/adj_list_extend.h" +#include "processor/operator/scan_list/scan_rel_table_lists.h" namespace kuzu { namespace processor { -void ListExtendAndScanRelProperties::initLocalStateInternal( - ResultSet* resultSet, ExecutionContext* context) { +void ScanRelTableLists::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { BaseExtendAndScanRelProperties::initLocalStateInternal(resultSet, context); syncState = make_unique(); adjListHandle = make_shared(*syncState); @@ -13,8 +12,8 @@ void ListExtendAndScanRelProperties::initLocalStateInternal( } } -bool ListExtendAndScanRelProperties::getNextTuplesInternal() { - if (adjListHandle->listSyncState.hasMoreToRead()) { +bool ScanRelTableLists::getNextTuplesInternal() { + if (adjListHandle->listSyncState.hasMoreAndSwitchSourceIfNecessary()) { adjList->readValues(outNodeIDVector, *adjListHandle); } else { do { @@ -41,7 +40,7 @@ bool ListExtendAndScanRelProperties::getNextTuplesInternal() { return true; } -void ListExtendAndScanRelProperties::scanPropertyLists() { +void ScanRelTableLists::scanPropertyLists() { for (auto i = 0u; i < propertyLists.size(); ++i) { outPropertyVectors[i]->resetOverflowBuffer(); propertyLists[i]->readValues(outPropertyVectors[i], *propertyListHandles[i]); diff --git a/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp b/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp index ae527b8674..53a29cf2df 100644 --- a/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp +++ b/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp @@ -91,7 +91,7 @@ bool VarLengthAdjListExtend::addDFSLevelToStackIfParentExtends(uint64_t parent, bool VarLengthAdjListExtend::getNextBatchOfNbrNodes( shared_ptr& dfsLevel) const { - if (dfsLevel->listHandle->listSyncState.hasMoreToRead()) { + if (dfsLevel->listHandle->listSyncState.hasMoreAndSwitchSourceIfNecessary()) { ((AdjLists*)storage)->readValues(dfsLevel->children, *dfsLevel->listHandle); return true; } diff --git a/src/processor/result/factorized_table.cpp b/src/processor/result/factorized_table.cpp index ea7496d557..38b7918f6a 100644 --- a/src/processor/result/factorized_table.cpp +++ b/src/processor/result/factorized_table.cpp @@ -269,7 +269,7 @@ void FactorizedTable::setNonOverflowColNull(uint8_t* nullBuffer, uint32_t colIdx void FactorizedTable::copyToInMemList(uint32_t colIdx, vector& tupleIdxesToRead, uint8_t* data, NullMask* nullMask, uint64_t startElemPosInList, - DiskOverflowFile* overflowFileOfInMemList, DataType type, + DiskOverflowFile* overflowFileOfInMemList, const DataType& type, NodeIDCompressionScheme* nodeIDCompressionScheme) const { auto column = tableSchema->getColumn(colIdx); assert(column->isFlat() == true); @@ -641,7 +641,7 @@ void FactorizedTable::readFlatColToUnflatVector( } void FactorizedTable::copyOverflowIfNecessary( - uint8_t* dst, uint8_t* src, DataType type, DiskOverflowFile* diskOverflowFile) { + uint8_t* dst, uint8_t* src, const DataType& type, DiskOverflowFile* diskOverflowFile) { switch (type.typeID) { case STRING: { ku_string_t* stringToWriteFrom = (ku_string_t*)src; diff --git a/src/storage/storage_structure/lists/list_sync_state.cpp b/src/storage/storage_structure/lists/list_sync_state.cpp index 4a5978e6b6..9e05a57135 100644 --- a/src/storage/storage_structure/lists/list_sync_state.cpp +++ b/src/storage/storage_structure/lists/list_sync_state.cpp @@ -3,11 +3,14 @@ namespace kuzu { namespace storage { -bool ListSyncState::hasMoreToRead() { - if (hasValidRangeToRead() && (startElemOffset + numValuesToRead != numValuesInList)) { +bool ListSyncState::hasMoreAndSwitchSourceIfNecessary() { + if (hasValidRangeToRead() && hasMoreLeftInList()) { + // Has more in the current source store. return true; } - if (dataToReadFromUpdateStore && sourceStore == ListSourceStore::PersistentStore) { + if (sourceStore == ListSourceStore::PERSISTENT_STORE && numValuesInUpdateStore > 0) { + // Switch from PERSISTENT_STORE to UPDATE_STORE. + switchToUpdateStore(); return true; } return false; @@ -17,9 +20,9 @@ void ListSyncState::reset() { boundNodeOffset = UINT64_MAX; startElemOffset = UINT32_MAX; numValuesToRead = UINT32_MAX; - numValuesInList = UINT64_MAX; - sourceStore = ListSourceStore::PersistentStore; - dataToReadFromUpdateStore = false; + numValuesInUpdateStore = 0; + numValuesInPersistentStore = 0; + sourceStore = ListSourceStore::PERSISTENT_STORE; } } // namespace storage diff --git a/src/storage/storage_structure/lists/lists.cpp b/src/storage/storage_structure/lists/lists.cpp index c118490e7d..14f3ae3dfd 100644 --- a/src/storage/storage_structure/lists/lists.cpp +++ b/src/storage/storage_structure/lists/lists.cpp @@ -15,7 +15,7 @@ namespace storage { // be containing information about the last portion of the last large list that was read). void Lists::readValues(const shared_ptr& valueVector, ListHandle& listHandle) { auto& listSyncState = listHandle.listSyncState; - if (listSyncState.getListSourceStore() == ListSourceStore::ListsUpdateStore) { + if (listSyncState.getListSourceStore() == ListSourceStore::UPDATE_STORE) { listsUpdateStore->readValues( storageStructureIDAndFName.storageStructureID.listFileID, listSyncState, valueVector); } else { @@ -65,33 +65,28 @@ void Lists::initListReadingState( node_offset_t nodeOffset, ListHandle& listHandle, TransactionType transactionType) { auto& listSyncState = listHandle.listSyncState; listSyncState.reset(); - listSyncState.setBoundNodeOffset(nodeOffset); auto isNewlyAddedNode = listsUpdateStore->isNewlyAddedNode( storageStructureIDAndFName.storageStructureID.listFileID, nodeOffset); - if (transactionType == TransactionType::READ_ONLY || !isNewlyAddedNode) { - listSyncState.setListHeader(headers->getHeader(nodeOffset)); + uint64_t numElementsInPersistentStore = 0, numElementsInUpdateStore = 0; + list_header_t listHeader; + if (transactionType == TransactionType::WRITE) { + numElementsInUpdateStore = listsUpdateStore->getNumInsertedRelsForNodeOffset( + storageStructureIDAndFName.storageStructureID.listFileID, nodeOffset); + // ListHeader is UINT32_MAX in two cases: (i) ListSyncState is not initialized; or (ii) + // the list of a new node is being scanned so there is no header for the new node. + listHeader = isNewlyAddedNode ? UINT32_MAX : headers->getHeader(nodeOffset); + numElementsInPersistentStore = + isNewlyAddedNode ? 0 : getNumElementsFromListHeader(nodeOffset); } else { - // ListHeader is UINT32_MAX in two cases: (i) ListSyncState is not initialized; or (ii) the - // list of a new node is being scanned so there is no header for the new node. - listSyncState.setListHeader(UINT32_MAX); + listHeader = headers->getHeader(nodeOffset); + numElementsInPersistentStore = getNumElementsFromListHeader(nodeOffset); } - auto numElementsInPersistentStore = - getNumElementsInPersistentStore(transactionType, nodeOffset); - auto numElementsInUpdateStore = - transactionType == WRITE ? - listsUpdateStore->getNumInsertedRelsForNodeOffset( - storageStructureIDAndFName.storageStructureID.listFileID, nodeOffset) : - 0; - listSyncState.setNumValuesInList(numElementsInPersistentStore == 0 ? - numElementsInUpdateStore : - numElementsInPersistentStore); - listSyncState.setDataToReadFromUpdateStore(numElementsInUpdateStore != 0); - // If there's no element is persistentStore and the listsUpdateStore is non-empty, - // we can skip reading from persistentStore and start reading from listsUpdateStore directly. - listSyncState.setSourceStore( - ((numElementsInPersistentStore == 0 && numElementsInUpdateStore > 0) || isNewlyAddedNode) ? - ListSourceStore::ListsUpdateStore : - ListSourceStore::PersistentStore); + // If there's no element is persistentStore, we can skip reading from persistentStore and start + // reading from listsUpdateStore directly. + auto sourceStore = numElementsInPersistentStore == 0 ? ListSourceStore::UPDATE_STORE : + ListSourceStore::PERSISTENT_STORE; + listSyncState.init(nodeOffset, listHeader, numElementsInUpdateStore, + numElementsInPersistentStore, sourceStore); } unique_ptr Lists::getInMemListWithDataFromUpdateStoreOnly( @@ -203,20 +198,10 @@ void ListPropertyLists::readFromSmallList( void AdjLists::readValues(const shared_ptr& valueVector, ListHandle& listHandle) { auto& listSyncState = listHandle.listSyncState; valueVector->state->selVector->resetSelectorToUnselected(); - if (listSyncState.getListSourceStore() == ListSourceStore::PersistentStore && - listSyncState.getStartElemOffset() + listSyncState.getNumValuesToRead() == - listSyncState.getNumValuesInList()) { - listSyncState.setSourceStore(ListSourceStore::ListsUpdateStore); - } - if (listSyncState.getListSourceStore() == ListSourceStore::ListsUpdateStore) { + if (listSyncState.getListSourceStore() == ListSourceStore::UPDATE_STORE) { readFromListsUpdateStore(listSyncState, valueVector); } else { - // If the startElemOffset is invalid, it means that we never read from the list. As a - // result, we need to reset the cursor and mapper. - if (listHandle.listSyncState.getStartElemOffset() == -1) { - listHandle.resetCursorMapper(metadata, numElementsPerPage); - } - readFromList(valueVector, listHandle); + readFromListsPersistentStore(listHandle, valueVector); } } @@ -283,8 +268,7 @@ void AdjLists::readFromLargeList( min((uint32_t)(listSyncState.getNumValuesInList() - nextPartBeginElemOffset), numElementsPerPage - (uint32_t)(nextPartBeginElemOffset % numElementsPerPage)); valueVector->state->initOriginalAndSelectedSize(numValuesToCopy); - listSyncState.setRangeToRead( - nextPartBeginElemOffset, valueVector->state->selVector->selectedSize); + listSyncState.setRangeToRead(nextPartBeginElemOffset, numValuesToCopy); // map logical pageIdx to physical pageIdx auto physicalPageId = listHandle.cursorAndMapper.mapper(listHandle.cursorAndMapper.cursor.pageIdx); @@ -319,27 +303,33 @@ void AdjLists::readFromSmallList( void AdjLists::readFromListsUpdateStore( ListSyncState& listSyncState, const shared_ptr& valueVector) { - if (listSyncState.getStartElemOffset() + listSyncState.getNumValuesToRead() == - listSyncState.getNumValuesInList() || - !listSyncState.hasValidRangeToRead()) { + assert(listSyncState.getListSourceStore() == ListSourceStore::UPDATE_STORE); + if (!listSyncState.hasValidRangeToRead()) { // We have read all values from persistent store or the persistent store is empty, we should // reset listSyncState to indicate ranges in listsUpdateStore and start // reading from it. - listSyncState.setNumValuesInList(listsUpdateStore->getNumInsertedRelsForNodeOffset( - storageStructureIDAndFName.storageStructureID.listFileID, - listSyncState.getBoundNodeOffset())); listSyncState.setRangeToRead( - 0, min(DEFAULT_VECTOR_CAPACITY, listSyncState.getNumValuesInList())); + 0, min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listSyncState.getNumValuesInList())); } else { listSyncState.setRangeToRead(listSyncState.getEndElemOffset(), min(DEFAULT_VECTOR_CAPACITY, - listSyncState.getNumValuesInList() - listSyncState.getEndElemOffset())); + (uint64_t)listSyncState.getNumValuesInList() - listSyncState.getEndElemOffset())); } // Note that: we always store nbr node in the second column of factorizedTable. listsUpdateStore->readValues( storageStructureIDAndFName.storageStructureID.listFileID, listSyncState, valueVector); } +void AdjLists::readFromListsPersistentStore( + ListHandle& listHandle, const shared_ptr& valueVector) { + // If the startElemOffset is invalid, it means that we never read from the list. As a + // result, we need to reset the cursor and mapper. + if (!listHandle.listSyncState.hasValidRangeToRead()) { + listHandle.resetCursorMapper(metadata, numElementsPerPage); + } + readFromList(valueVector, listHandle); +} + // Note: this function will always be called right after scanRelID, so we have the // guarantee that the relIDVector is always unselected. void RelIDList::setDeletedRelsIfNecessary(Transaction* transaction, ListSyncState& listSyncState, @@ -348,14 +338,14 @@ void RelIDList::setDeletedRelsIfNecessary(Transaction* transaction, ListSyncStat // persistent store in a write transaction and the current nodeOffset has deleted rels in // persistent store. if (!transaction->isReadOnly() && - listSyncState.getListSourceStore() != ListSourceStore::ListsUpdateStore && + listSyncState.getListSourceStore() != ListSourceStore::UPDATE_STORE && listsUpdateStore->hasAnyDeletedRelsInPersistentStore( storageStructureIDAndFName.storageStructureID.listFileID, listSyncState.getBoundNodeOffset())) { relIDVector->state->selVector->resetSelectorToValuePosBuffer(); auto& selVector = relIDVector->state->selVector; auto nextSelectedPos = 0u; - for (sel_t pos = 0; pos < relIDVector->state->originalSize; ++pos) { + for (auto pos = 0; pos < relIDVector->state->originalSize; ++pos) { if (!listsUpdateStore->isRelDeletedInPersistentStore( storageStructureIDAndFName.storageStructureID.listFileID, listSyncState.getBoundNodeOffset(), relIDVector->getValue(pos))) {