Skip to content

Commit

Permalink
Fix delete then scan bug (#3176)
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Mar 30, 2024
1 parent f80a6eb commit fa528c1
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 49 deletions.
7 changes: 2 additions & 5 deletions src/include/storage/stats/node_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics {

void deleteNode(common::offset_t nodeOffset);

// This function assumes that it is being called right after ScanNodeID has obtained a
// morsel and that the nodeID structs in nodeOffsetVector.values have consecutive node
// offsets and the same tableID.
void setDeletedNodeOffsetsForMorsel(
const std::shared_ptr<common::ValueVector>& nodeOffsetVector);
// This function assumes nodeIDVector have consecutive node offsets and the same tableID.
void setDeletedNodeOffsetsForMorsel(common::ValueVector* nodeIDVector) const;

void setNumTuples(uint64_t numTuples) override;

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/stats/nodes_store_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
getNodeTableStats(transaction::TransactionType::WRITE, tableID)->deleteNode(nodeOffset);
}

void setDeletedNodeOffsetsForMorsel(transaction::Transaction* transaction,
const std::shared_ptr<common::ValueVector>& nodeOffsetVector, common::table_id_t tableID);
void setDeletedNodeOffsetsForMorsel(transaction::Transaction* tx,
common::ValueVector* nodeIDVector, common::table_id_t tableID);

void addNodeStatisticsAndDeletedIDs(catalog::NodeTableCatalogEntry* nodeTableEntry);

Expand Down
10 changes: 10 additions & 0 deletions src/include/storage/stats/table_statistics_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ namespace storage {

struct TablesStatisticsContent {
std::unordered_map<common::table_id_t, std::unique_ptr<TableStatistics>> tableStatisticPerTable;

const TableStatistics* getTableStat(common::table_id_t tableID) const {
KU_ASSERT(tableStatisticPerTable.contains(tableID));
return tableStatisticPerTable.at(tableID).get();
}
};

class WAL;
Expand Down Expand Up @@ -87,6 +92,11 @@ class TablesStatistics {
virtual std::string getTableStatisticsFilePath(
const std::string& directory, common::FileVersionType dbFileType) = 0;

const TablesStatisticsContent* getVersion(transaction::TransactionType type) const {
return type == transaction::TransactionType::READ_ONLY ? readOnlyVersion.get() :
readWriteVersion.get();
}

void readFromFile();
void readFromFile(common::FileVersionType dbFileType);

Expand Down
10 changes: 6 additions & 4 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ class NodeTable final : public Table {
tablesStatistics);
return nodesStats->getMaxNodeOffset(transaction, tableID);
}
inline void setSelVectorForDeletedOffsets(
transaction::Transaction* trx, std::shared_ptr<common::ValueVector>& vector) const {
void setSelVectorForDeletedOffsets(
transaction::Transaction* trx, common::ValueVector* vector) const {
KU_ASSERT(vector->isSequential());
common::ku_dynamic_cast<TablesStatistics*, NodesStoreStatsAndDeletedIDs*>(tablesStatistics)
->setDeletedNodeOffsetsForMorsel(trx, vector, tableID);
auto nodeStateCollection =
common::ku_dynamic_cast<TablesStatistics*, NodesStoreStatsAndDeletedIDs*>(
tablesStatistics);
nodeStateCollection->setDeletedNodeOffsetsForMorsel(trx, vector, tableID);
}

inline void initializeReadState(transaction::Transaction* transaction,
Expand Down
18 changes: 10 additions & 8 deletions src/processor/operator/scan_node_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) {
if (state == nullptr) {
return false;
}
outValueVector->state->selVector->resetSelectorToUnselected();
auto nodeIDValues = (nodeID_t*)(outValueVector->getData());
auto size = endOffset - startOffset;
for (auto i = 0u; i < size; ++i) {
Expand All @@ -83,24 +84,25 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) {

void ScanNodeID::setSelVector(ExecutionContext* context, NodeTableScanState* tableState,
offset_t startOffset, offset_t endOffset) {
// Apply changes to the selVector from nodes metadata.
tableState->getTable()->setSelVectorForDeletedOffsets(
context->clientContext->getTx(), outValueVector.get());
if (tableState->isSemiMaskEnabled()) {
outValueVector->state->selVector->resetSelectorToValuePosBuffer();
auto selectedBuffer = outValueVector->state->selVector->getSelectedPositionsBuffer();
sel_t prevSelectedSize = outValueVector->state->selVector->selectedSize;
// Fill selected positions based on node mask for nodes between the given startOffset and
// endOffset. If the node is masked (i.e., valid for read), then it is set to the selected
// positions. Finally, we update the selectedSize for selVector.
sel_t numSelectedValues = 0;
for (auto i = 0u; i < (endOffset - startOffset); i++) {
outValueVector->state->selVector->selectedPositions[numSelectedValues] = i;
selectedBuffer[numSelectedValues] = i;
numSelectedValues += tableState->getSemiMask()->isNodeMasked(i + startOffset);
}
outValueVector->state->selVector->selectedSize = numSelectedValues;
} else {
// By default, the selected positions is set to the const incremental pos array.
outValueVector->state->selVector->resetSelectorToUnselected();
if (prevSelectedSize != numSelectedValues) {
outValueVector->state->selVector->resetSelectorToValuePosBuffer();
}
}
// Apply changes to the selVector from nodes metadata.
tableState->getTable()->setSelVectorForDeletedOffsets(
context->clientContext->getTx(), outValueVector);
}

double ScanNodeID::getProgress(ExecutionContext* /*context*/) const {
Expand Down
43 changes: 21 additions & 22 deletions src/storage/stats/node_table_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,33 +104,32 @@ void NodeTableStatsAndDeletedIDs::deleteNode(offset_t nodeOffset) {

// Note: this function will always be called right after scanNodeID, so we have the guarantee
// that the nodeOffsetVector is always unselected.
void NodeTableStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
const std::shared_ptr<ValueVector>& nodeOffsetVector) {
auto morselIdxAndOffset = StorageUtils::getQuotientRemainder(
nodeOffsetVector->readNodeOffset(0), DEFAULT_VECTOR_CAPACITY);
if (hasDeletedNodesPerMorsel[morselIdxAndOffset.first]) {
auto deletedNodeOffsets = deletedNodeOffsetsPerMorsel[morselIdxAndOffset.first];
uint64_t morselBeginOffset = morselIdxAndOffset.first * DEFAULT_VECTOR_CAPACITY;
nodeOffsetVector->state->selVector->resetSelectorToValuePosBuffer();
void NodeTableStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(ValueVector* nodeIDVector) const {
auto [morselIdx, _] = StorageUtils::getQuotientRemainder(
nodeIDVector->readNodeOffset(0), DEFAULT_VECTOR_CAPACITY);
if (hasDeletedNodesPerMorsel[morselIdx]) {
auto& deletedNodeOffsets = deletedNodeOffsetsPerMorsel.at(morselIdx);
uint64_t morselBeginOffset = morselIdx * DEFAULT_VECTOR_CAPACITY;
auto originalSize = nodeIDVector->state->getOriginalSize();
auto itr = deletedNodeOffsets.begin();
sel_t nextDeletedNodeOffset = *itr - morselBeginOffset;
uint64_t nextSelectedPosition = 0;
for (auto pos = 0u; pos < nodeOffsetVector->state->getOriginalSize(); ++pos) {
if (pos == nextDeletedNodeOffset) {
common::sel_t numSelectedValue = 0;
auto selectedBuffer = nodeIDVector->state->selVector->getSelectedPositionsBuffer();
KU_ASSERT(nodeIDVector->state->selVector->isUnfiltered());
for (auto pos = 0u; pos < nodeIDVector->state->getOriginalSize(); ++pos) {
if (itr == deletedNodeOffsets.end()) { // no more deleted offset to check.
selectedBuffer[numSelectedValue++] = pos;
continue;
}
if (pos + morselBeginOffset == *itr) { // node has been deleted.
itr++;
if (itr == deletedNodeOffsets.end()) {
nextDeletedNodeOffset = UINT16_MAX;
// We do not break because we need to keep setting the positions after
// the last deletedNodeOffset.
continue;
}
nextDeletedNodeOffset = *itr - morselBeginOffset;
continue;
}
nodeOffsetVector->state->selVector->selectedPositions[nextSelectedPosition++] = pos;
selectedBuffer[numSelectedValue++] = pos;
}
if (numSelectedValue != originalSize) {
nodeIDVector->state->selVector->resetSelectorToValuePosBuffer();
}
nodeOffsetVector->state->selVector->selectedSize =
nodeOffsetVector->state->getOriginalSize() - deletedNodeOffsets.size();
nodeIDVector->state->selVector->selectedSize = numSelectedValue;
}
}

Expand Down
19 changes: 12 additions & 7 deletions src/storage/stats/nodes_store_statistics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "storage/stats/nodes_store_statistics.h"

using namespace kuzu::common;
using namespace kuzu::transaction;

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -28,8 +29,7 @@ void NodesStoreStatsAndDeletedIDs::updateNumTuplesByValue(table_id_t tableID, in
}

void NodesStoreStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
transaction::Transaction* transaction, const std::shared_ptr<ValueVector>& nodeOffsetVector,
table_id_t tableID) {
transaction::Transaction* tx, ValueVector* nodeIDVector, table_id_t tableID) {
// NOTE: We can remove the lock under the following assumptions, that should currently hold:
// 1) During the phases when nodeStatisticsAndDeletedIDsPerTableForReadOnlyTrx change, which
// is during checkpointing, this function, which is called during scans, cannot be called.
Expand All @@ -39,11 +39,16 @@ void NodesStoreStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
// query where scans/reads happen in a write transaction cannot run concurrently with the
// pipeline that performs an add/delete node.
lock_t lck{mtx};
(transaction->isReadOnly() || readWriteVersion == nullptr) ?
getNodeStatisticsAndDeletedIDs(transaction, tableID)
->setDeletedNodeOffsetsForMorsel(nodeOffsetVector) :
((NodeTableStatsAndDeletedIDs*)readWriteVersion->tableStatisticPerTable[tableID].get())
->setDeletedNodeOffsetsForMorsel(nodeOffsetVector);
const TablesStatisticsContent* content;
if (tx->isReadOnly() || readWriteVersion == nullptr) {
content = getVersion(TransactionType::READ_ONLY);
} else {
content = getVersion(TransactionType::WRITE);
}
auto tableStat = content->getTableStat(tableID);
auto nodeStat =
ku_dynamic_cast<const TableStatistics*, const NodeTableStatsAndDeletedIDs*>(tableStat);
nodeStat->setDeletedNodeOffsetsForMorsel(nodeIDVector);
}

void NodesStoreStatsAndDeletedIDs::addNodeStatisticsAndDeletedIDs(
Expand Down
7 changes: 6 additions & 1 deletion test/test_files/update_node/delete_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
9
10


-CASE OptionalDelete
-STATEMENT OPTIONAL MATCH (a:person) WHERE a.ID > 100 DELETE a;
---- ok
Expand Down Expand Up @@ -115,3 +114,9 @@ Runtime exception: Node(nodeOffset: 2) has connected edges in table workAt in th
-STATEMENT MATCH (a:person) RETURN COUNT(*)
---- 1
8

-CASE DeleteNodeWithSemiMaskBug
-STATEMENT MATCH (a) WHERE a.ID = 0 OR a.ID = 3 OR a.ID= 5 DETACH DELETE a;
---- ok
-STATEMENT MATCH (a:person)-[:knows]->(b:person) WHERE b.ID = 2 RETURN a.fName, b.fName;
---- 0

0 comments on commit fa528c1

Please sign in to comment.