From 7f69ff9bdbe16da01bc7749bfa7eac4511be4def Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Mon, 20 Nov 2023 07:16:50 -0500 Subject: [PATCH] scan csr local storage --- .../storage/local_storage/local_rel_table.h | 38 +++- src/include/storage/store/rel_table_data.h | 27 ++- .../operator/scan/scan_multi_rel_tables.cpp | 2 +- .../operator/scan/scan_rel_csr_columns.cpp | 2 +- src/storage/local_storage/local_rel_table.cpp | 203 +++++++++++++----- src/storage/store/rel_table.cpp | 3 +- src/storage/store/rel_table_data.cpp | 115 ++++++++-- src/storage/wal_replayer_utils.cpp | 4 +- test/test_files/issue/issue.test | 1 + .../delete_rel/delete_all_inserted_rels.test | 2 - .../delete_rels_from_update_store.test | 2 - .../mixed_delete_and_create_rels.test | 4 +- ..._and_update_rels_for_newly_added_node.test | 1 - ...t_delete_and_update_rels_in_same_list.test | 2 +- .../update_newly_inserted_rels.test | 1 - test/test_files/update_rel/create_empty.test | 52 +++++ tools/python_api/test/test_scan_pandas.py | 2 +- 17 files changed, 349 insertions(+), 112 deletions(-) diff --git a/src/include/storage/local_storage/local_rel_table.h b/src/include/storage/local_storage/local_rel_table.h index e8b720d5a1..faf3c44eb1 100644 --- a/src/include/storage/local_storage/local_rel_table.h +++ b/src/include/storage/local_storage/local_rel_table.h @@ -11,12 +11,14 @@ static constexpr common::column_id_t REL_ID_COLUMN_ID = 0; struct RelNGInfo { virtual ~RelNGInfo() = default; - virtual bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset, + virtual bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset, common::row_idx_t adjNodeRowIdx, const std::vector& propertyNodesRowIdx) = 0; - virtual void update(common::offset_t srcNodeOffset, common::offset_t relOffset, + virtual void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset, common::column_id_t columnID, common::row_idx_t rowIdx) = 0; - virtual bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) = 0; + virtual bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) = 0; + + virtual uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) = 0; protected: inline static bool contains( @@ -39,12 +41,14 @@ struct RegularRelNGInfo final : public RelNGInfo { updateInfoPerChunk.resize(numChunks); } - bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset, + bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset, common::row_idx_t adjNodeRowIdx, const std::vector& propertyNodesRowIdx) override; - void update(common::offset_t srcNodeOffset, common::offset_t relOffset, + void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset, common::column_id_t columnID, common::row_idx_t rowIdx) override; - bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) final; + bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) override; + + uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) override; }; // Info of node groups with CSR chunks for rel tables. @@ -60,12 +64,14 @@ struct CSRRelNGInfo final : public RelNGInfo { updateInfoPerChunk.resize(numChunks); } - bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset, + bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset, common::row_idx_t adjNodeRowIdx, const std::vector& propertyNodesRowIdx) override; - void update(common::offset_t srcNodeOffset, common::offset_t relOffset, + void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset, common::column_id_t columnID, common::row_idx_t rowIdx) override; - bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) override; + bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) override; + + uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) override; }; class LocalRelNG final : public LocalNodeGroup { @@ -73,6 +79,13 @@ class LocalRelNG final : public LocalNodeGroup { LocalRelNG(common::offset_t nodeGroupStartOffset, common::ColumnDataFormat dataFormat, std::vector dataTypes, MemoryManager* mm); + common::row_idx_t scanCSR(common::offset_t srcOffsetInChunk, + common::offset_t posToReadForOffset, const std::vector& columnIDs, + const std::vector& outputVector); + void applyCSRUpdatesAndDeletions(common::offset_t srcOffsetInChunk, + const std::vector& columnIDs, common::ValueVector* relIDVector, + const std::vector& outputVector); + bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector, const std::vector& propertyVectors); void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector, @@ -86,6 +99,13 @@ class LocalRelNG final : public LocalNodeGroup { } inline RelNGInfo* getRelNGInfo() { return relNGInfo.get(); } +private: + void applyCSRUpdates(common::offset_t srcOffsetInChunk, common::column_id_t columnID, + const offset_to_offset_to_row_idx_t& updateInfo, common::ValueVector* relIDVector, + const std::vector& outputVector); + void applyCSRDeletions(common::offset_t srcOffsetInChunk, + const offset_to_offset_set_t& deleteInfo, common::ValueVector* relIDVector); + private: std::unique_ptr adjChunk; std::unique_ptr relNGInfo; diff --git a/src/include/storage/store/rel_table_data.h b/src/include/storage/store/rel_table_data.h index 810617dda8..6026016d8b 100644 --- a/src/include/storage/store/rel_table_data.h +++ b/src/include/storage/store/rel_table_data.h @@ -7,35 +7,40 @@ namespace kuzu { namespace storage { +class LocalRelNG; struct RelDataReadState : public TableReadState { common::RelDataDirection direction; common::ColumnDataFormat dataFormat; - common::offset_t startNodeOffsetInState; - common::offset_t numNodesInState; - common::offset_t currentCSRNodeOffset; + common::offset_t startNodeOffset; + common::offset_t numNodes; + common::offset_t currentNodeOffset; common::offset_t posInCurrentCSR; std::vector csrListEntries; // Temp auxiliary data structure to scan the offset of each CSR node in the offset column chunk. std::unique_ptr csrOffsetChunk; + // Following fields are used for local storage. + bool readFromLocalStorage; + LocalRelNG* localNodeGroup; + RelDataReadState(common::ColumnDataFormat dataFormat); inline bool isOutOfRange(common::offset_t nodeOffset) { - return nodeOffset < startNodeOffsetInState || - nodeOffset >= (startNodeOffsetInState + numNodesInState); - } - inline bool hasMoreToRead() { - return dataFormat == common::ColumnDataFormat::CSR && - posInCurrentCSR < - csrListEntries[(currentCSRNodeOffset - startNodeOffsetInState)].size; + return nodeOffset < startNodeOffset || nodeOffset >= (startNodeOffset + numNodes); } + bool hasMoreToRead(transaction::Transaction* transaction); void populateCSRListEntries(); std::pair getStartAndEndOffset(); + + inline bool hasMoreToReadInPersistentStorage() { + return posInCurrentCSR < csrListEntries[(currentNodeOffset - startNodeOffset)].size; + } + bool hasMoreToReadFromLocalStorage(); + bool trySwitchToLocalStorage(); }; class RelsStoreStats; class LocalRelTableData; struct CSRRelNGInfo; -class LocalRelNG; class RelTableData final : public TableData { public: RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, diff --git a/src/processor/operator/scan/scan_multi_rel_tables.cpp b/src/processor/operator/scan/scan_multi_rel_tables.cpp index cd7bd1e18a..5ccca402dd 100644 --- a/src/processor/operator/scan/scan_multi_rel_tables.cpp +++ b/src/processor/operator/scan/scan_multi_rel_tables.cpp @@ -17,7 +17,7 @@ void RelTableCollectionScanner::init() { bool RelTableCollectionScanner::scan(ValueVector* inVector, const std::vector& outputVectors, Transaction* transaction) { while (true) { - if (readStates[currentTableIdx]->hasMoreToRead()) { + if (readStates[currentTableIdx]->hasMoreToRead(transaction)) { KU_ASSERT(readStates[currentTableIdx]->dataFormat == ColumnDataFormat::CSR); auto scanInfo = scanInfos[currentTableIdx].get(); scanInfo->table->read( diff --git a/src/processor/operator/scan/scan_rel_csr_columns.cpp b/src/processor/operator/scan/scan_rel_csr_columns.cpp index 91c7f79e06..ab6c69bf96 100644 --- a/src/processor/operator/scan/scan_rel_csr_columns.cpp +++ b/src/processor/operator/scan/scan_rel_csr_columns.cpp @@ -5,7 +5,7 @@ namespace processor { bool ScanRelCSRColumns::getNextTuplesInternal(ExecutionContext* context) { while (true) { - if (scanState->hasMoreToRead()) { + if (scanState->hasMoreToRead(context->clientContext->getActiveTransaction())) { info->table->read(transaction, *scanState, inVector, outVectors); return true; } diff --git a/src/storage/local_storage/local_rel_table.cpp b/src/storage/local_storage/local_rel_table.cpp index ca4fbf6d3e..d841f8a0fc 100644 --- a/src/storage/local_storage/local_rel_table.cpp +++ b/src/storage/local_storage/local_rel_table.cpp @@ -8,127 +8,127 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -bool RegularRelNGInfo::insert(offset_t srcNodeOffset, offset_t /*relOffset*/, +bool RegularRelNGInfo::insert(offset_t srcOffsetInChunk, offset_t /*relOffset*/, row_idx_t adjNodeRowIdx, const std::vector& propertyNodesRowIdx) { KU_ASSERT(propertyNodesRowIdx.size() == insertInfoPerChunk.size()); - bool isDeleted = deleteInfo.contains(srcNodeOffset); - adjInsertInfo[srcNodeOffset] = adjNodeRowIdx; - if (updateInfoPerChunk[0].contains(srcNodeOffset) && !isDeleted) { + bool wasDeleted = deleteInfo.contains(srcOffsetInChunk); + if (adjInsertInfo.contains(srcOffsetInChunk) && !wasDeleted) { throw RuntimeException{"Many-one, one-one relationship violated."}; } + adjInsertInfo[srcOffsetInChunk] = adjNodeRowIdx; for (auto i = 0u; i < propertyNodesRowIdx.size(); ++i) { - KU_ASSERT(!updateInfoPerChunk[i].contains(srcNodeOffset)); - insertInfoPerChunk[i][srcNodeOffset] = propertyNodesRowIdx[i]; + KU_ASSERT(!updateInfoPerChunk[i].contains(srcOffsetInChunk)); + insertInfoPerChunk[i][srcOffsetInChunk] = propertyNodesRowIdx[i]; } - return !isDeleted; + return !wasDeleted; } void RegularRelNGInfo::update( - offset_t srcNodeOffset, offset_t /*relOffset*/, column_id_t columnID, row_idx_t rowIdx) { - if (deleteInfo.contains(srcNodeOffset)) { + offset_t srcOffsetInChunk, offset_t /*relOffset*/, column_id_t columnID, row_idx_t rowIdx) { + if (deleteInfo.contains(srcOffsetInChunk)) { // We choose to ignore the update operation if the node is deleted. return; } KU_ASSERT(columnID != REL_ID_COLUMN_ID); // Rel ID is immutable. KU_ASSERT(columnID < updateInfoPerChunk.size()); - if (insertInfoPerChunk[columnID].contains(srcNodeOffset)) { + if (insertInfoPerChunk[columnID].contains(srcOffsetInChunk)) { // Update newly inserted value. - insertInfoPerChunk[columnID][srcNodeOffset] = rowIdx; + insertInfoPerChunk[columnID][srcOffsetInChunk] = rowIdx; } else { - updateInfoPerChunk[columnID][srcNodeOffset] = rowIdx; + updateInfoPerChunk[columnID][srcOffsetInChunk] = rowIdx; } } -bool RegularRelNGInfo::delete_(offset_t srcNodeOffset, offset_t /*relOffset*/) { - if (adjInsertInfo.contains(srcNodeOffset)) { +bool RegularRelNGInfo::delete_(offset_t srcOffsetInChunk, offset_t /*relOffset*/) { + if (adjInsertInfo.contains(srcOffsetInChunk)) { // Delete newly inserted tuple. - adjInsertInfo.erase(srcNodeOffset); - for (auto& insertInfo : insertInfoPerChunk) { - insertInfo.erase(srcNodeOffset); - } + adjInsertInfo.erase(srcOffsetInChunk); } else { - if (deleteInfo.contains(srcNodeOffset)) { + if (deleteInfo.contains(srcOffsetInChunk)) { // The node is already deleted. return false; } else { - deleteInfo.insert(srcNodeOffset); + deleteInfo.insert(srcOffsetInChunk); } } return true; } -bool CSRRelNGInfo::insert(offset_t srcNodeOffset, offset_t relOffset, row_idx_t adjNodeRowIdx, +uint64_t RegularRelNGInfo::getNumInsertedTuples(offset_t srcOffsetInChunk) { + return adjInsertInfo.contains(srcOffsetInChunk) ? 1 : 0; +} + +bool CSRRelNGInfo::insert(offset_t srcOffsetInChunk, offset_t relOffset, row_idx_t adjNodeRowIdx, const std::vector& propertyNodesRowIdx) { KU_ASSERT(propertyNodesRowIdx.size() == insertInfoPerChunk.size()); - if (deleteInfo.contains(srcNodeOffset) && contains(deleteInfo.at(srcNodeOffset), relOffset)) { - // We choose to ignore the insert operation if the node is deleted. - return false; + if (deleteInfo.contains(srcOffsetInChunk) && + contains(deleteInfo.at(srcOffsetInChunk), relOffset)) { + deleteInfo.at(srcOffsetInChunk).erase(relOffset); } - if (adjInsertInfo.contains(srcNodeOffset)) { - adjInsertInfo.at(srcNodeOffset)[relOffset] = adjNodeRowIdx; + if (adjInsertInfo.contains(srcOffsetInChunk)) { + adjInsertInfo.at(srcOffsetInChunk)[relOffset] = adjNodeRowIdx; } else { - adjInsertInfo[srcNodeOffset] = {{relOffset, adjNodeRowIdx}}; + adjInsertInfo[srcOffsetInChunk] = {{relOffset, adjNodeRowIdx}}; } for (auto i = 0u; i < propertyNodesRowIdx.size(); ++i) { - if (insertInfoPerChunk[i].contains(srcNodeOffset)) { - insertInfoPerChunk[i].at(srcNodeOffset)[relOffset] = propertyNodesRowIdx[i]; + if (insertInfoPerChunk[i].contains(srcOffsetInChunk)) { + insertInfoPerChunk[i].at(srcOffsetInChunk)[relOffset] = propertyNodesRowIdx[i]; } else { - insertInfoPerChunk[i][srcNodeOffset] = {{relOffset, propertyNodesRowIdx[i]}}; + insertInfoPerChunk[i][srcOffsetInChunk] = {{relOffset, propertyNodesRowIdx[i]}}; } } return false; } void CSRRelNGInfo::update( - offset_t srcNodeOffset, offset_t relOffset, column_id_t columnID, row_idx_t rowIdx) { + offset_t srcOffsetInChunk, offset_t relOffset, column_id_t columnID, row_idx_t rowIdx) { // REL_ID_COLUMN_ID is immutable. KU_ASSERT(columnID != REL_ID_COLUMN_ID && columnID < updateInfoPerChunk.size()); - if (deleteInfo.contains(srcNodeOffset) && contains(deleteInfo.at(srcNodeOffset), relOffset)) { + if (deleteInfo.contains(srcOffsetInChunk) && + contains(deleteInfo.at(srcOffsetInChunk), relOffset)) { // We choose to ignore the update operation if the node is deleted. return; } - if (insertInfoPerChunk[columnID].contains(srcNodeOffset) && - insertInfoPerChunk[columnID].at(srcNodeOffset).contains(relOffset)) { + if (insertInfoPerChunk[columnID].contains(srcOffsetInChunk) && + insertInfoPerChunk[columnID].at(srcOffsetInChunk).contains(relOffset)) { // Update newly inserted value. - insertInfoPerChunk[columnID].at(srcNodeOffset)[relOffset] = rowIdx; + insertInfoPerChunk[columnID].at(srcOffsetInChunk)[relOffset] = rowIdx; } else { - if (updateInfoPerChunk[columnID].contains(srcNodeOffset)) { - updateInfoPerChunk[columnID].at(srcNodeOffset)[relOffset] = rowIdx; + if (updateInfoPerChunk[columnID].contains(srcOffsetInChunk)) { + updateInfoPerChunk[columnID].at(srcOffsetInChunk)[relOffset] = rowIdx; } else { - updateInfoPerChunk[columnID][srcNodeOffset] = {{relOffset, rowIdx}}; + updateInfoPerChunk[columnID][srcOffsetInChunk] = {{relOffset, rowIdx}}; } } } -bool CSRRelNGInfo::delete_(offset_t srcNodeOffset, offset_t relOffset) { - if (adjInsertInfo.contains(srcNodeOffset) && - adjInsertInfo.at(srcNodeOffset).contains(relOffset)) { +bool CSRRelNGInfo::delete_(offset_t srcOffsetInChunk, offset_t relOffset) { + if (adjInsertInfo.contains(srcOffsetInChunk) && + adjInsertInfo.at(srcOffsetInChunk).contains(relOffset)) { // Delete newly inserted tuple. - adjInsertInfo.at(srcNodeOffset).erase(relOffset); - if (adjInsertInfo.at(srcNodeOffset).empty()) { - adjInsertInfo.erase(srcNodeOffset); - } + adjInsertInfo.at(srcOffsetInChunk).erase(relOffset); for (auto& insertInfo : insertInfoPerChunk) { - insertInfo.at(srcNodeOffset).erase(relOffset); - if (insertInfo.at(srcNodeOffset).empty()) { - insertInfo.erase(srcNodeOffset); - } + insertInfo.at(srcOffsetInChunk).erase(relOffset); } } else { - if (deleteInfo.contains(srcNodeOffset)) { - if (deleteInfo.at(srcNodeOffset).contains(relOffset)) { + if (deleteInfo.contains(srcOffsetInChunk)) { + if (deleteInfo.at(srcOffsetInChunk).contains(relOffset)) { // The node is already deleted. return false; } else { - deleteInfo.at(srcNodeOffset).insert(relOffset); + deleteInfo.at(srcOffsetInChunk).insert(relOffset); } } else { - deleteInfo[srcNodeOffset] = {relOffset}; + deleteInfo[srcOffsetInChunk] = {relOffset}; } } return true; } +uint64_t CSRRelNGInfo::getNumInsertedTuples(offset_t srcOffsetInChunk) { + return adjInsertInfo.contains(srcOffsetInChunk) ? adjInsertInfo.at(srcOffsetInChunk).size() : 0; +} + LocalRelNG::LocalRelNG(offset_t nodeGroupStartOffset, ColumnDataFormat dataFormat, std::vector dataTypes, kuzu::storage::MemoryManager* mm) : LocalNodeGroup{nodeGroupStartOffset, std::move(dataTypes), mm} { @@ -146,6 +146,103 @@ LocalRelNG::LocalRelNG(offset_t nodeGroupStartOffset, ColumnDataFormat dataForma adjChunk = std::make_unique(LogicalType::INTERNAL_ID(), mm); } +// TODO(Guodong): We should change the map between relID and rowIdx to a vector of pairs, which is +// more friendly for scan. +row_idx_t LocalRelNG::scanCSR(offset_t srcOffsetInChunk, offset_t posToReadForOffset, + const std::vector& columnIDs, const std::vector& outputVectors) { + KU_ASSERT(columnIDs.size() + 1 == outputVectors.size()); + auto csrRelNGInfo = ku_dynamic_cast(relNGInfo.get()); + KU_ASSERT(csrRelNGInfo); + KU_ASSERT(csrRelNGInfo->adjInsertInfo.contains(srcOffsetInChunk)); + uint64_t posInVector = 0; + auto iteratorIdx = 0u; + for (auto& [relID, rowIdx] : csrRelNGInfo->adjInsertInfo.at(srcOffsetInChunk)) { + if (iteratorIdx++ < posToReadForOffset) { + continue; + } + auto posInLocalVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); + outputVectors[0]->copyFromVectorData( + posInVector++, adjChunk->getLocalVector(rowIdx)->getVector(), posInLocalVector); + } + for (auto i = 0u; i < columnIDs.size(); ++i) { + auto columnID = columnIDs[i]; + posInVector = 0; + iteratorIdx = 0u; + auto& insertInfo = csrRelNGInfo->insertInfoPerChunk[columnID]; + KU_ASSERT(insertInfo.contains(srcOffsetInChunk)); + for (auto& [relID, rowIdx] : insertInfo.at(srcOffsetInChunk)) { + if (iteratorIdx++ < posToReadForOffset) { + continue; + } + auto posInLocalVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); + outputVectors[i + 1]->copyFromVectorData(posInVector++, + chunks[columnID]->getLocalVector(rowIdx)->getVector(), posInLocalVector); + } + } + outputVectors[0]->state->selVector->resetSelectorToUnselectedWithSize(posInVector); + return posInVector; +} + +void LocalRelNG::applyCSRUpdatesAndDeletions(offset_t srcOffsetInChunk, + const std::vector& columnIDs, ValueVector* relIDVector, + const std::vector& outputVector) { + KU_ASSERT(columnIDs.size() + 1 == outputVector.size()); + auto csrRelNGInfo = ku_dynamic_cast(relNGInfo.get()); + KU_ASSERT(csrRelNGInfo); + // Apply updates first, as applying deletions might change selected state. + for (auto i = 0u; i < columnIDs.size(); ++i) { + auto columnID = columnIDs[i]; + applyCSRUpdates(srcOffsetInChunk, columnID, csrRelNGInfo->updateInfoPerChunk[columnID], + relIDVector, outputVector); + } + // Apply deletions and update selVector if necessary. + if (csrRelNGInfo->deleteInfo.contains(srcOffsetInChunk) && + csrRelNGInfo->deleteInfo.at(srcOffsetInChunk).size() > 0) { + applyCSRDeletions(srcOffsetInChunk, csrRelNGInfo->deleteInfo, relIDVector); + } +} + +void LocalRelNG::applyCSRUpdates(offset_t srcOffsetInChunk, column_id_t columnID, + const offset_to_offset_to_row_idx_t& updateInfo, ValueVector* relIDVector, + const std::vector& outputVector) { + if (!updateInfo.contains(srcOffsetInChunk) || updateInfo.at(srcOffsetInChunk).empty()) { + return; + } + auto& updateInfoForOffset = updateInfo.at(srcOffsetInChunk); + for (auto i = 0u; i < relIDVector->state->selVector->selectedSize; i++) { + auto pos = relIDVector->state->selVector->selectedPositions[i]; + auto relOffset = relIDVector->getValue(pos).offset; + if (updateInfoForOffset.contains(relOffset)) { + auto rowIdx = updateInfoForOffset.at(relOffset); + auto posInLocalVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); + outputVector[i + 1]->copyFromVectorData( + pos, chunks[columnID]->getLocalVector(rowIdx)->getVector(), posInLocalVector); + } + } +} + +void LocalRelNG::applyCSRDeletions( + offset_t srcOffsetInChunk, const offset_to_offset_set_t& deleteInfo, ValueVector* relIDVector) { + auto& deleteInfoForOffset = deleteInfo.at(srcOffsetInChunk); + auto selectPos = 0u; + auto selVector = std::make_unique(DEFAULT_VECTOR_CAPACITY); + selVector->resetSelectorToValuePosBuffer(); + for (auto i = 0u; i < relIDVector->state->selVector->selectedSize; i++) { + auto relIDPos = relIDVector->state->selVector->selectedPositions[i]; + auto relOffset = relIDVector->getValue(relIDPos).offset; + if (deleteInfoForOffset.contains(relOffset)) { + continue; + } + selVector->selectedPositions[selectPos++] = relIDPos; + } + if (selectPos != relIDVector->state->selVector->selectedSize) { + relIDVector->state->selVector->resetSelectorToValuePosBuffer(); + memcpy(relIDVector->state->selVector->selectedPositions, selVector->selectedPositions, + selectPos * sizeof(sel_t)); + relIDVector->state->selVector->selectedSize = selectPos; + } +} + bool LocalRelNG::insert(ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector, const std::vector& propertyVectors) { KU_ASSERT(propertyVectors.size() == chunks.size() && propertyVectors.size() >= 1); diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index cece57e4ec..85e971de44 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -112,10 +112,9 @@ common::row_idx_t RelTable::detachDeleteForCSRRels(Transaction* transaction, RelDataReadState* relDataReadState, RelDetachDeleteState* deleteState) { row_idx_t numRelsDeleted = 0; auto tempState = deleteState->dstNodeIDVector->state.get(); - while (relDataReadState->hasMoreToRead()) { + while (relDataReadState->hasMoreToRead(transaction)) { scan(transaction, *relDataReadState, srcNodeIDVector, {deleteState->dstNodeIDVector.get(), deleteState->relIDVector.get()}); - KU_ASSERT(tempState->selVector->isUnfiltered()); auto numRelsScanned = tempState->selVector->selectedSize; tempState->selVector->resetSelectorToValuePosBufferWithSize(1); for (auto i = 0u; i < numRelsScanned; i++) { diff --git a/src/storage/store/rel_table_data.cpp b/src/storage/store/rel_table_data.cpp index 63c64e6ebf..68a5c2765a 100644 --- a/src/storage/store/rel_table_data.cpp +++ b/src/storage/store/rel_table_data.cpp @@ -13,25 +13,62 @@ namespace kuzu { namespace storage { RelDataReadState::RelDataReadState(ColumnDataFormat dataFormat) - : dataFormat{dataFormat}, startNodeOffsetInState{0}, numNodesInState{0}, - currentCSRNodeOffset{0}, posInCurrentCSR{0} { + : dataFormat{dataFormat}, startNodeOffset{0}, numNodes{0}, currentNodeOffset{0}, + posInCurrentCSR{0}, readFromLocalStorage{false}, localNodeGroup{nullptr} { csrListEntries.resize(StorageConstants::NODE_GROUP_SIZE, {0, 0}); csrOffsetChunk = ColumnChunkFactory::createColumnChunk(LogicalType::INT64(), false /* enableCompression */); } +bool RelDataReadState::hasMoreToReadFromLocalStorage() { + KU_ASSERT(localNodeGroup); + return posInCurrentCSR < localNodeGroup->getRelNGInfo()->getNumInsertedTuples( + currentNodeOffset - startNodeOffset); +} + +bool RelDataReadState::trySwitchToLocalStorage() { + if (localNodeGroup && localNodeGroup->getRelNGInfo()->getNumInsertedTuples( + currentNodeOffset - startNodeOffset) > 0) { + readFromLocalStorage = true; + posInCurrentCSR = 0; + return true; + } + return false; +} + +bool RelDataReadState::hasMoreToRead(transaction::Transaction* transaction) { + if (dataFormat == ColumnDataFormat::REGULAR) { + return false; + } + if (transaction->isWriteTransaction()) { + if (readFromLocalStorage) { + // Already read from local storage. Check if there are more in local storage. + return hasMoreToReadFromLocalStorage(); + } else { + if (hasMoreToReadInPersistentStorage()) { + return true; + } else { + // Try switch to read from local storage. + return trySwitchToLocalStorage(); + } + } + } else { + return hasMoreToReadInPersistentStorage(); + } +} + void RelDataReadState::populateCSRListEntries() { auto csrOffsets = (offset_t*)csrOffsetChunk->getData(); csrListEntries[0].offset = 0; csrListEntries[0].size = csrOffsets[0]; - for (auto i = 1; i < numNodesInState; i++) { + for (auto i = 1; i < numNodes; i++) { csrListEntries[i].offset = csrOffsets[i - 1]; csrListEntries[i].size = csrOffsets[i] - csrOffsets[i - 1]; } } std::pair RelDataReadState::getStartAndEndOffset() { - auto currCSRListEntry = csrListEntries[currentCSRNodeOffset - startNodeOffsetInState]; + auto currCSRListEntry = csrListEntries[currentNodeOffset - startNodeOffset]; auto currCSRSize = currCSRListEntry.size; auto startOffset = currCSRListEntry.offset + posInCurrentCSR; auto numRowsToRead = std::min(currCSRSize - posInCurrentCSR, DEFAULT_VECTOR_CAPACITY); @@ -80,24 +117,38 @@ void RelTableData::initializeReadState(Transaction* transaction, RelDataReadState* readState) { readState->direction = direction; readState->columnIDs = std::move(columnIDs); - if (dataFormat == ColumnDataFormat::REGULAR) { - return; - } auto nodeOffset = inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]); auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - readState->posInCurrentCSR = 0; - if (readState->isOutOfRange(nodeOffset)) { - // Scan csr offsets and populate csr list entries for the new node group. - readState->startNodeOffsetInState = startNodeOffset; - csrOffsetColumn->scan(transaction, nodeGroupIdx, readState->csrOffsetChunk.get()); - readState->numNodesInState = readState->csrOffsetChunk->getNumValues(); - readState->populateCSRListEntries(); - } - if (nodeOffset != readState->currentCSRNodeOffset) { - readState->currentCSRNodeOffset = nodeOffset; + if (dataFormat == ColumnDataFormat::CSR) { + auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + readState->posInCurrentCSR = 0; + if (readState->isOutOfRange(nodeOffset)) { + // Scan csr offsets and populate csr list entries for the new node group. + readState->startNodeOffset = startNodeOffset; + csrOffsetColumn->scan(transaction, nodeGroupIdx, readState->csrOffsetChunk.get()); + readState->numNodes = readState->csrOffsetChunk->getNumValues(); + readState->populateCSRListEntries(); + if (transaction->isWriteTransaction()) { + auto localTableData = transaction->getLocalStorage()->getLocalTableData( + tableID, getDataIdxFromDirection(direction)); + if (localTableData) { + auto localRelTableData = + ku_dynamic_cast(localTableData); + readState->localNodeGroup = + localRelTableData->nodeGroups.contains(nodeGroupIdx) ? + ku_dynamic_cast( + localRelTableData->nodeGroups.at(nodeGroupIdx).get()) : + nullptr; + } + } + } + if (nodeOffset != readState->currentNodeOffset) { + readState->currentNodeOffset = nodeOffset; + } } + // Reset to read from persistent storage. + readState->readFromLocalStorage = false; } void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState& readState, @@ -119,15 +170,23 @@ void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState } void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& readState, - ValueVector* /*inNodeIDVector*/, const std::vector& outputVectors) { + ValueVector* inNodeIDVector, const std::vector& outputVectors) { KU_ASSERT(dataFormat == ColumnDataFormat::CSR); + if (readState.readFromLocalStorage) { + auto offsetInChunk = readState.currentNodeOffset - readState.startNodeOffset; + auto numValuesRead = readState.localNodeGroup->scanCSR( + offsetInChunk, readState.posInCurrentCSR, readState.columnIDs, outputVectors); + readState.posInCurrentCSR += numValuesRead; + return; + } auto [startOffset, endOffset] = readState.getStartAndEndOffset(); auto numRowsToRead = endOffset - startOffset; outputVectors[0]->state->selVector->resetSelectorToUnselectedWithSize(numRowsToRead); outputVectors[0]->state->setOriginalSize(numRowsToRead); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(readState.currentCSRNodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(readState.currentNodeOffset); adjColumn->scan(transaction, nodeGroupIdx, startOffset, endOffset, outputVectors[0], 0 /* offsetInVector */); + auto relIDVectorIdx = INVALID_VECTOR_IDX; for (auto i = 0u; i < readState.columnIDs.size(); i++) { auto columnID = readState.columnIDs[i]; auto outputVectorId = i + 1; // Skip output from adj column. @@ -135,9 +194,21 @@ void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& re outputVectors[outputVectorId]->setAllNull(); continue; } + if (columnID == REL_ID_COLUMN_ID) { + relIDVectorIdx = outputVectorId; + } columns[readState.columnIDs[i]]->scan(transaction, nodeGroupIdx, startOffset, endOffset, outputVectors[outputVectorId], 0 /* offsetInVector */); } + if (transaction->isWriteTransaction() && readState.localNodeGroup) { + auto nodeOffset = + inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]); + KU_ASSERT(relIDVectorIdx != INVALID_VECTOR_IDX); + auto relIDVector = outputVectors[relIDVectorIdx]; + readState.localNodeGroup->applyCSRUpdatesAndDeletions( + nodeOffset - readState.startNodeOffset, readState.columnIDs, relIDVector, + outputVectors); + } } void RelTableData::lookup(Transaction* transaction, TableReadState& readState, @@ -444,8 +515,8 @@ std::unique_ptr RelTableData::slideCSRColumnChunk(Transaction* tran if (hasDeletions && deleteInfo.at(offsetInNG).contains(relID)) { // This rel is deleted now. Skip. } else if (hasUpdates && updateInfo.at(offsetInNG).contains(relID)) { - // This rel is inserted or updated. Append from local data to the column chunk. - auto rowIdx = insertInfo.at(offsetInNG).at(relID); + // This rel is updated. Append from local data to the column chunk. + auto rowIdx = updateInfo.at(offsetInNG).at(relID); auto localVector = localChunk->getLocalVector(rowIdx)->getVector(); auto offsetInVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); localVector->state->selVector->selectedPositions[0] = offsetInVector; diff --git a/src/storage/wal_replayer_utils.cpp b/src/storage/wal_replayer_utils.cpp index 09191e0152..ab6b16f068 100644 --- a/src/storage/wal_replayer_utils.cpp +++ b/src/storage/wal_replayer_utils.cpp @@ -18,7 +18,7 @@ void WALReplayerUtils::createEmptyHashIndexFiles( StorageUtils::getNodeIndexFName( directory, nodeTableSchema->tableID, FileVersionType::ORIGINAL), *pk->getDataType()); - pkIndex->bulkReserve(0 /* numNodesInState */); + pkIndex->bulkReserve(0 /* numNodes */); pkIndex->flush(); } break; case LogicalTypeID::STRING: { @@ -26,7 +26,7 @@ void WALReplayerUtils::createEmptyHashIndexFiles( StorageUtils::getNodeIndexFName( directory, nodeTableSchema->tableID, FileVersionType::ORIGINAL), *pk->getDataType()); - pkIndex->bulkReserve(0 /* numNodesInState */); + pkIndex->bulkReserve(0 /* numNodes */); pkIndex->flush(); } break; case LogicalTypeID::SERIAL: { diff --git a/test/test_files/issue/issue.test b/test/test_files/issue/issue.test index 9f3095094f..ad2078e9fa 100644 --- a/test/test_files/issue/issue.test +++ b/test/test_files/issue/issue.test @@ -131,6 +131,7 @@ Runtime exception: Found duplicated primary key value rename 2x, which violates ---- 1 || + -CASE 2303 -STATEMENT CREATE NODE TABLE T (id STRING, PRIMARY KEY(id)); ---- ok diff --git a/test/test_files/transaction/delete_rel/delete_all_inserted_rels.test b/test/test_files/transaction/delete_rel/delete_all_inserted_rels.test index c4037f9348..9b1a88d7e8 100644 --- a/test/test_files/transaction/delete_rel/delete_all_inserted_rels.test +++ b/test/test_files/transaction/delete_rel/delete_all_inserted_rels.test @@ -1,9 +1,7 @@ -GROUP DeleteRelTest -DATASET CSV rel-update-tests --SKIP -- -# This test should be added back once support read under write transaction. -DEFINE_STATEMENT_BLOCK DELETE_REL_TEST_DELETE_ALL_INSERTED_RELS [ -STATEMENT MATCH (p1:person), (p2:person) WHERE p1.ID = 1 AND p2.ID = 51 create (p1)-[:knows {length: 51}]->(p2); ---- ok diff --git a/test/test_files/transaction/delete_rel/delete_rels_from_update_store.test b/test/test_files/transaction/delete_rel/delete_rels_from_update_store.test index 1536dcc5fe..0ee3e9a7c3 100644 --- a/test/test_files/transaction/delete_rel/delete_rels_from_update_store.test +++ b/test/test_files/transaction/delete_rel/delete_rels_from_update_store.test @@ -1,9 +1,7 @@ -GROUP DeleteRelTest -DATASET CSV rel-update-tests --SKIP -- -# This test should be added back once support read under write transaction. -DEFINE_STATEMENT_BLOCK DELETE_RELS_FROM_UPDATE_STORE [ -STATEMENT MATCH (p1:person), (p2:person) WHERE p1.ID = 51 AND p2.ID = 0 create (p1)-[:knows]->(p2); ---- ok diff --git a/test/test_files/transaction/delete_rel/mixed_delete_and_create_rels.test b/test/test_files/transaction/delete_rel/mixed_delete_and_create_rels.test index f69fd26ffe..bf7058b598 100644 --- a/test/test_files/transaction/delete_rel/mixed_delete_and_create_rels.test +++ b/test/test_files/transaction/delete_rel/mixed_delete_and_create_rels.test @@ -1,9 +1,7 @@ -GROUP DeleteRelTest -DATASET CSV rel-update-tests --SKIP -- -# This test should be added back once support read under write transaction. -CASE MixedDeleteAndCreateRelsCommitNormalExecution -STATEMENT BEGIN TRANSACTION ---- ok @@ -45,4 +43,4 @@ 5 6 9 -10 \ No newline at end of file +10 diff --git a/test/test_files/transaction/update_rel/insert_and_update_rels_for_newly_added_node.test b/test/test_files/transaction/update_rel/insert_and_update_rels_for_newly_added_node.test index cace5f5ea6..6fc77b51d6 100644 --- a/test/test_files/transaction/update_rel/insert_and_update_rels_for_newly_added_node.test +++ b/test/test_files/transaction/update_rel/insert_and_update_rels_for_newly_added_node.test @@ -1,6 +1,5 @@ -GROUP UpdateRelTest -DATASET CSV rel-update-tests --SKIP -- -DEFINE_STATEMENT_BLOCK INSERT_AND_UPDATE_RELS_FOR_NEWLY_ADDED_NODE [ diff --git a/test/test_files/transaction/update_rel/insert_delete_and_update_rels_in_same_list.test b/test/test_files/transaction/update_rel/insert_delete_and_update_rels_in_same_list.test index 1e28ebee70..059094b16d 100644 --- a/test/test_files/transaction/update_rel/insert_delete_and_update_rels_in_same_list.test +++ b/test/test_files/transaction/update_rel/insert_delete_and_update_rels_in_same_list.test @@ -1,6 +1,6 @@ -GROUP UpdateRelTest -DATASET CSV rel-update-tests --SKIP + -- -DEFINE_STATEMENT_BLOCK INSERT_DELETE_AND_UPDATE_RELS_IN_SAME_LIST [ diff --git a/test/test_files/transaction/update_rel/update_newly_inserted_rels.test b/test/test_files/transaction/update_rel/update_newly_inserted_rels.test index 69fce8e262..ede628a7b6 100644 --- a/test/test_files/transaction/update_rel/update_newly_inserted_rels.test +++ b/test/test_files/transaction/update_rel/update_newly_inserted_rels.test @@ -1,6 +1,5 @@ -GROUP UpdateRelTest -DATASET CSV rel-update-tests --SKIP -- -DEFINE_STATEMENT_BLOCK UPDATE_NEWLY_INSERTED_RELS [ diff --git a/test/test_files/update_rel/create_empty.test b/test/test_files/update_rel/create_empty.test index 8f13a735f9..174891fa43 100644 --- a/test/test_files/update_rel/create_empty.test +++ b/test/test_files/update_rel/create_empty.test @@ -19,3 +19,55 @@ ---- 2 12 8 + +-CASE CreateAndScanRel +-STATEMENT CREATE NODE TABLE N1(ID INT64, PRIMARY KEY(ID)); +---- ok +-STATEMENT CREATE NODE TABLE N2(ID INT64, PRIMARY KEY(ID)); +---- ok +-STATEMENT CREATE REL TABLE Rel1(FROM N1 TO N2, MANY_MANY); +---- ok +-STATEMENT CREATE (:N1 {ID: 10}), (:N1 {ID: 1}), (:N2 {ID: 12}), (:N2 {ID: 8}) +---- ok +-STATEMENT MATCH (n:N1) RETURN n.ID +---- 2 +10 +1 +-STATEMENT MATCH (n:N2) RETURN n.ID +---- 2 +12 +8 +-STATEMENT BEGIN TRANSACTION +---- ok +-STATEMENT MATCH (n1:N1), (n2:N2) WHERE n1.ID=10 AND n2.ID=12 CREATE (n1)-[r:Rel1]->(n2) +---- ok +-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID +---- 1 +10|12 +-STATEMENT COMMIT +---- ok +-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID +---- 1 +10|12 + +-CASE DeleteNewlyCreatedRel +-STATEMENT CREATE NODE TABLE N1(ID INT64, PRIMARY KEY(ID)); +---- ok +-STATEMENT CREATE NODE TABLE N2(ID INT64, PRIMARY KEY(ID)); +---- ok +-STATEMENT CREATE REL TABLE Rel1(FROM N1 TO N2, MANY_MANY); +---- ok +-STATEMENT CREATE (:N1 {ID: 10}), (:N1 {ID: 1}), (:N2 {ID: 12}), (:N2 {ID: 8}) +---- ok +-STATEMENT BEGIN TRANSACTION +---- ok +-STATEMENT MATCH (n1:N1), (n2:N2) WHERE n1.ID=10 AND n2.ID=12 CREATE (n1)-[r:Rel1]->(n2) +---- ok +-STATEMENT MATCH (n1:N1)-[r:Rel1]->(n2:N2) WHERE n1.ID=10 AND n2.ID=12 DELETE r +---- ok +-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID +---- 0 +-STATEMENT COMMIT +---- ok +-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID +---- 0 diff --git a/tools/python_api/test/test_scan_pandas.py b/tools/python_api/test/test_scan_pandas.py index 610006ee63..84aaef8ff7 100644 --- a/tools/python_api/test/test_scan_pandas.py +++ b/tools/python_api/test/test_scan_pandas.py @@ -84,7 +84,7 @@ def test_large_pd(get_tmp_path): num_rows = 40000 odd_numbers = [2 * i + 1 for i in range(num_rows)] even_numbers = [2 * i for i in range(num_rows)] - df = pd.DataFrame({'odd': odd_numbers, 'even': even_numbers}) + df = pd.DataFrame({'odd': np.array(odd_numbers, dtype=np.int64), 'even': np.array(even_numbers, dtype=np.int64)}) result = conn.execute("CALL READ_PANDAS('df') RETURN *").get_as_df() assert result['odd'].to_list() == odd_numbers assert result['even'].to_list() == even_numbers