Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scan after delete bug #3176

Merged
merged 1 commit into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/include/storage/stats/node_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics {

void deleteNode(common::offset_t nodeOffset);

// This function assumes that it is being called right after ScanNodeID has obtained a
// morsel and that the nodeID structs in nodeOffsetVector.values have consecutive node
// offsets and the same tableID.
void setDeletedNodeOffsetsForMorsel(
const std::shared_ptr<common::ValueVector>& nodeOffsetVector);
// This function assumes that the node IDs in nodeIDVector have consecutive node offsets and
// all share the same tableID.
void setDeletedNodeOffsetsForMorsel(common::ValueVector* nodeIDVector) const;

void setNumTuples(uint64_t numTuples) override;

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/stats/nodes_store_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
getNodeTableStats(transaction::TransactionType::WRITE, tableID)->deleteNode(nodeOffset);
}

void setDeletedNodeOffsetsForMorsel(transaction::Transaction* transaction,
const std::shared_ptr<common::ValueVector>& nodeOffsetVector, common::table_id_t tableID);
void setDeletedNodeOffsetsForMorsel(transaction::Transaction* tx,
common::ValueVector* nodeIDVector, common::table_id_t tableID);

void addNodeStatisticsAndDeletedIDs(catalog::NodeTableCatalogEntry* nodeTableEntry);

Expand Down
10 changes: 10 additions & 0 deletions src/include/storage/stats/table_statistics_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ namespace storage {

struct TablesStatisticsContent {
    // Owning map from table ID to the statistics object of that table.
    std::unordered_map<common::table_id_t, std::unique_ptr<TableStatistics>> tableStatisticPerTable;

    // Returns a non-owning, read-only pointer to the statistics stored under tableID.
    // An entry for tableID is asserted to exist.
    const TableStatistics* getTableStat(common::table_id_t tableID) const {
        KU_ASSERT(tableStatisticPerTable.contains(tableID));
        const auto& tableStat = tableStatisticPerTable.at(tableID);
        return tableStat.get();
    }
};

class WAL;
Expand Down Expand Up @@ -87,6 +92,11 @@ class TablesStatistics {
virtual std::string getTableStatisticsFilePath(
const std::string& directory, common::FileVersionType dbFileType) = 0;

// Returns a non-owning pointer to the statistics content for the given transaction type:
// readOnlyVersion for READ_ONLY, readWriteVersion for any other type.
const TablesStatisticsContent* getVersion(transaction::TransactionType type) const {
    if (type == transaction::TransactionType::READ_ONLY) {
        return readOnlyVersion.get();
    }
    return readWriteVersion.get();
}

void readFromFile();
void readFromFile(common::FileVersionType dbFileType);

Expand Down
10 changes: 6 additions & 4 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ class NodeTable final : public Table {
tablesStatistics);
return nodesStats->getMaxNodeOffset(transaction, tableID);
}
inline void setSelVectorForDeletedOffsets(
transaction::Transaction* trx, std::shared_ptr<common::ValueVector>& vector) const {
void setSelVectorForDeletedOffsets(
transaction::Transaction* trx, common::ValueVector* vector) const {
KU_ASSERT(vector->isSequential());
common::ku_dynamic_cast<TablesStatistics*, NodesStoreStatsAndDeletedIDs*>(tablesStatistics)
->setDeletedNodeOffsetsForMorsel(trx, vector, tableID);
auto nodeStateCollection =
common::ku_dynamic_cast<TablesStatistics*, NodesStoreStatsAndDeletedIDs*>(
tablesStatistics);
nodeStateCollection->setDeletedNodeOffsetsForMorsel(trx, vector, tableID);
}

inline void initializeReadState(transaction::Transaction* transaction,
Expand Down
18 changes: 10 additions & 8 deletions src/processor/operator/scan_node_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) {
if (state == nullptr) {
return false;
}
outValueVector->state->selVector->resetSelectorToUnselected();
auto nodeIDValues = (nodeID_t*)(outValueVector->getData());
auto size = endOffset - startOffset;
for (auto i = 0u; i < size; ++i) {
Expand All @@ -83,24 +84,25 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) {

void ScanNodeID::setSelVector(ExecutionContext* context, NodeTableScanState* tableState,
offset_t startOffset, offset_t endOffset) {
// Apply changes to the selVector from nodes metadata.
tableState->getTable()->setSelVectorForDeletedOffsets(
context->clientContext->getTx(), outValueVector.get());
if (tableState->isSemiMaskEnabled()) {
outValueVector->state->selVector->resetSelectorToValuePosBuffer();
auto selectedBuffer = outValueVector->state->selVector->getSelectedPositionsBuffer();
sel_t prevSelectedSize = outValueVector->state->selVector->selectedSize;
// Fill selected positions based on node mask for nodes between the given startOffset and
// endOffset. If the node is masked (i.e., valid for read), then it is set to the selected
// positions. Finally, we update the selectedSize for selVector.
sel_t numSelectedValues = 0;
for (auto i = 0u; i < (endOffset - startOffset); i++) {
outValueVector->state->selVector->selectedPositions[numSelectedValues] = i;
selectedBuffer[numSelectedValues] = i;
numSelectedValues += tableState->getSemiMask()->isNodeMasked(i + startOffset);
}
outValueVector->state->selVector->selectedSize = numSelectedValues;
} else {
// By default, the selected positions is set to the const incremental pos array.
outValueVector->state->selVector->resetSelectorToUnselected();
if (prevSelectedSize != numSelectedValues) {
outValueVector->state->selVector->resetSelectorToValuePosBuffer();
}
}
// Apply changes to the selVector from nodes metadata.
tableState->getTable()->setSelVectorForDeletedOffsets(
context->clientContext->getTx(), outValueVector);
}

double ScanNodeID::getProgress(ExecutionContext* /*context*/) const {
Expand Down
43 changes: 21 additions & 22 deletions src/storage/stats/node_table_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,33 +104,32 @@ void NodeTableStatsAndDeletedIDs::deleteNode(offset_t nodeOffset) {

// Note: this function is always called right after ScanNodeID has filled the node ID vector,
// so the vector's selection vector is guaranteed to be unfiltered at this point.
void NodeTableStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
const std::shared_ptr<ValueVector>& nodeOffsetVector) {
auto morselIdxAndOffset = StorageUtils::getQuotientRemainder(
nodeOffsetVector->readNodeOffset(0), DEFAULT_VECTOR_CAPACITY);
if (hasDeletedNodesPerMorsel[morselIdxAndOffset.first]) {
auto deletedNodeOffsets = deletedNodeOffsetsPerMorsel[morselIdxAndOffset.first];
uint64_t morselBeginOffset = morselIdxAndOffset.first * DEFAULT_VECTOR_CAPACITY;
nodeOffsetVector->state->selVector->resetSelectorToValuePosBuffer();
void NodeTableStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(ValueVector* nodeIDVector) const {
auto [morselIdx, _] = StorageUtils::getQuotientRemainder(
nodeIDVector->readNodeOffset(0), DEFAULT_VECTOR_CAPACITY);
if (hasDeletedNodesPerMorsel[morselIdx]) {
auto& deletedNodeOffsets = deletedNodeOffsetsPerMorsel.at(morselIdx);
uint64_t morselBeginOffset = morselIdx * DEFAULT_VECTOR_CAPACITY;
auto originalSize = nodeIDVector->state->getOriginalSize();
auto itr = deletedNodeOffsets.begin();
sel_t nextDeletedNodeOffset = *itr - morselBeginOffset;
uint64_t nextSelectedPosition = 0;
for (auto pos = 0u; pos < nodeOffsetVector->state->getOriginalSize(); ++pos) {
if (pos == nextDeletedNodeOffset) {
common::sel_t numSelectedValue = 0;
auto selectedBuffer = nodeIDVector->state->selVector->getSelectedPositionsBuffer();
KU_ASSERT(nodeIDVector->state->selVector->isUnfiltered());
for (auto pos = 0u; pos < nodeIDVector->state->getOriginalSize(); ++pos) {
if (itr == deletedNodeOffsets.end()) { // no more deleted offset to check.
selectedBuffer[numSelectedValue++] = pos;
continue;
}
if (pos + morselBeginOffset == *itr) { // node has been deleted.
itr++;
if (itr == deletedNodeOffsets.end()) {
nextDeletedNodeOffset = UINT16_MAX;
// We do not break because we need to keep setting the positions after
// the last deletedNodeOffset.
continue;
}
nextDeletedNodeOffset = *itr - morselBeginOffset;
continue;
}
nodeOffsetVector->state->selVector->selectedPositions[nextSelectedPosition++] = pos;
selectedBuffer[numSelectedValue++] = pos;
}
if (numSelectedValue != originalSize) {
nodeIDVector->state->selVector->resetSelectorToValuePosBuffer();
}
nodeOffsetVector->state->selVector->selectedSize =
nodeOffsetVector->state->getOriginalSize() - deletedNodeOffsets.size();
nodeIDVector->state->selVector->selectedSize = numSelectedValue;
}
}

Expand Down
19 changes: 12 additions & 7 deletions src/storage/stats/nodes_store_statistics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "storage/stats/nodes_store_statistics.h"

using namespace kuzu::common;
using namespace kuzu::transaction;

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -28,8 +29,7 @@ void NodesStoreStatsAndDeletedIDs::updateNumTuplesByValue(table_id_t tableID, in
}

void NodesStoreStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
transaction::Transaction* transaction, const std::shared_ptr<ValueVector>& nodeOffsetVector,
table_id_t tableID) {
transaction::Transaction* tx, ValueVector* nodeIDVector, table_id_t tableID) {
// NOTE: We can remove the lock under the following assumptions, that should currently hold:
// 1) During the phases when nodeStatisticsAndDeletedIDsPerTableForReadOnlyTrx change, which
// is during checkpointing, this function, which is called during scans, cannot be called.
Expand All @@ -39,11 +39,16 @@ void NodesStoreStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
// query where scans/reads happen in a write transaction cannot run concurrently with the
// pipeline that performs an add/delete node.
lock_t lck{mtx};
(transaction->isReadOnly() || readWriteVersion == nullptr) ?
getNodeStatisticsAndDeletedIDs(transaction, tableID)
->setDeletedNodeOffsetsForMorsel(nodeOffsetVector) :
((NodeTableStatsAndDeletedIDs*)readWriteVersion->tableStatisticPerTable[tableID].get())
->setDeletedNodeOffsetsForMorsel(nodeOffsetVector);
const TablesStatisticsContent* content;
if (tx->isReadOnly() || readWriteVersion == nullptr) {
content = getVersion(TransactionType::READ_ONLY);
} else {
content = getVersion(TransactionType::WRITE);
}
auto tableStat = content->getTableStat(tableID);
auto nodeStat =
ku_dynamic_cast<const TableStatistics*, const NodeTableStatsAndDeletedIDs*>(tableStat);
nodeStat->setDeletedNodeOffsetsForMorsel(nodeIDVector);
}

void NodesStoreStatsAndDeletedIDs::addNodeStatisticsAndDeletedIDs(
Expand Down
7 changes: 6 additions & 1 deletion test/test_files/update_node/delete_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
9
10


-CASE OptionalDelete
-STATEMENT OPTIONAL MATCH (a:person) WHERE a.ID > 100 DELETE a;
---- ok
Expand Down Expand Up @@ -115,3 +114,9 @@ Runtime exception: Node(nodeOffset: 2) has connected edges in table workAt in th
-STATEMENT MATCH (a:person) RETURN COUNT(*)
---- 1
8

-CASE DeleteNodeWithSemiMaskBug
-STATEMENT MATCH (a) WHERE a.ID = 0 OR a.ID = 3 OR a.ID= 5 DETACH DELETE a;
---- ok
-STATEMENT MATCH (a:person)-[:knows]->(b:person) WHERE b.ID = 2 RETURN a.fName, b.fName;
---- 0
Loading