Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scan and lookup of regular columns #2480

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/column_data_format.h"
#include "common/vector/value_vector.h"
#include "storage/local_storage/local_table.h"

namespace kuzu {
Expand Down Expand Up @@ -82,9 +83,19 @@ class LocalRelNG final : public LocalNodeGroup {
common::row_idx_t scanCSR(common::offset_t srcOffsetInChunk,
common::offset_t posToReadForOffset, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVector);
void applyCSRUpdatesAndDeletions(common::offset_t srcOffsetInChunk,
// For CSR, we need to apply updates and deletions here, while insertions are handled by
// `scanCSR`.
void applyLocalChangesForCSRColumns(common::offset_t srcOffsetInChunk,
const std::vector<common::column_id_t>& columnIDs, common::ValueVector* relIDVector,
const std::vector<common::ValueVector*>& outputVector);
void applyLocalChangesForRegularColumns(common::ValueVector* srcNodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVector);
// Note that there is an implicit assumption that all outputVectors share the same state, thus
// only one posInVector is passed.
void applyLocalChangesForRegularColumns(common::offset_t offsetInChunk,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors, common::sel_t posInVector);

bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
Expand All @@ -105,6 +116,14 @@ class LocalRelNG final : public LocalNodeGroup {
const std::vector<common::ValueVector*>& outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk,
const offset_to_offset_set_t& deleteInfo, common::ValueVector* relIDVector);
void applyRegularChangesToVector(common::ValueVector* srcNodeIDVector,
LocalVectorCollection* chunk, const offset_to_row_idx_t& updateInfo,
const offset_to_row_idx_t& insertInfo, const offset_set_t& deleteInfo,
common::ValueVector* outputVector);
void applyRegularChangesForOffset(common::offset_t offsetInChunk, LocalVectorCollection* chunk,
const offset_to_row_idx_t& updateInfo, const offset_to_row_idx_t& insertInfo,
const offset_set_t& deleteInfo, common::ValueVector* outputVector,
common::sel_t posInVector);

private:
std::unique_ptr<LocalVectorCollection> adjChunk;
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class RelTableData final : public TableData {
void rollbackInMemory();

private:
LocalRelNG* getLocalNodeGroup(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx);

void scanRegularColumns(transaction::Transaction* transaction, RelDataReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);
Expand Down
84 changes: 77 additions & 7 deletions src/storage/local_storage/local_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ bool RegularRelNGInfo::insert(offset_t srcOffsetInChunk, offset_t /*relOffset*/,
if (adjInsertInfo.contains(srcOffsetInChunk) && !wasDeleted) {
throw RuntimeException{"Many-one, one-one relationship violated."};
}
if (wasDeleted) {
deleteInfo.erase(srcOffsetInChunk);
}
adjInsertInfo[srcOffsetInChunk] = adjNodeRowIdx;
for (auto i = 0u; i < propertyNodesRowIdx.size(); ++i) {
KU_ASSERT(!updateInfoPerChunk[i].contains(srcOffsetInChunk));
Expand Down Expand Up @@ -43,13 +46,12 @@ bool RegularRelNGInfo::delete_(offset_t srcOffsetInChunk, offset_t /*relOffset*/
if (adjInsertInfo.contains(srcOffsetInChunk)) {
// Delete newly inserted tuple.
adjInsertInfo.erase(srcOffsetInChunk);
}
if (deleteInfo.contains(srcOffsetInChunk)) {
// The node is already deleted.
return false;
} else {
if (deleteInfo.contains(srcOffsetInChunk)) {
// The node is already deleted.
return false;
} else {
deleteInfo.insert(srcOffsetInChunk);
}
deleteInfo.insert(srcOffsetInChunk);
}
return true;
}
Expand Down Expand Up @@ -183,7 +185,7 @@ row_idx_t LocalRelNG::scanCSR(offset_t srcOffsetInChunk, offset_t posToReadForOf
return posInVector;
}

void LocalRelNG::applyCSRUpdatesAndDeletions(offset_t srcOffsetInChunk,
void LocalRelNG::applyLocalChangesForCSRColumns(offset_t srcOffsetInChunk,
const std::vector<column_id_t>& columnIDs, ValueVector* relIDVector,
const std::vector<ValueVector*>& outputVector) {
KU_ASSERT(columnIDs.size() + 1 == outputVector.size());
Expand All @@ -202,6 +204,42 @@ void LocalRelNG::applyCSRUpdatesAndDeletions(offset_t srcOffsetInChunk,
}
}

void LocalRelNG::applyLocalChangesForRegularColumns(ValueVector* srcNodeIDVector,
const std::vector<column_id_t>& columnIDs, const std::vector<ValueVector*>& outputVector) {
KU_ASSERT(columnIDs.size() + 1 == outputVector.size());
auto regularRelNGInfo = ku_dynamic_cast<RelNGInfo*, RegularRelNGInfo*>(relNGInfo.get());
KU_ASSERT(regularRelNGInfo);
applyRegularChangesToVector(srcNodeIDVector, adjChunk.get(), {} /* updateInfo */,
regularRelNGInfo->adjInsertInfo, regularRelNGInfo->deleteInfo, outputVector[0]);
for (auto colIdx = 0u; colIdx < columnIDs.size(); colIdx++) {
auto columnID = columnIDs[colIdx];
// There is no need to apply deleteInfo on property columns, as adj column will be the one
// always read first and used to check nulls.
applyRegularChangesToVector(srcNodeIDVector, chunks[columnID].get(),
regularRelNGInfo->updateInfoPerChunk[columnID],
regularRelNGInfo->insertInfoPerChunk[columnID], {} /* deleteInfo */,
outputVector[colIdx + 1]);
}
}

void LocalRelNG::applyLocalChangesForRegularColumns(offset_t offsetInChunk,
const std::vector<column_id_t>& columnIDs, const std::vector<ValueVector*>& outputVectors,
sel_t posInVector) {
KU_ASSERT(columnIDs.size() + 1 == outputVectors.size());
auto regularRelNGInfo = ku_dynamic_cast<RelNGInfo*, RegularRelNGInfo*>(relNGInfo.get());
KU_ASSERT(regularRelNGInfo);
applyRegularChangesForOffset(offsetInChunk, adjChunk.get(), {} /* updateInfo */,
regularRelNGInfo->adjInsertInfo, regularRelNGInfo->deleteInfo, outputVectors[0],
posInVector);
for (auto colIdx = 0u; colIdx < columnIDs.size(); colIdx++) {
auto columnID = columnIDs[colIdx];
applyRegularChangesForOffset(offsetInChunk, chunks[columnID].get(),
regularRelNGInfo->updateInfoPerChunk[columnID],
regularRelNGInfo->insertInfoPerChunk[columnID], {} /* deleteInfo */,
outputVectors[colIdx + 1], posInVector);
}
}

void LocalRelNG::applyCSRUpdates(offset_t srcOffsetInChunk, column_id_t columnID,
const offset_to_offset_to_row_idx_t& updateInfo, ValueVector* relIDVector,
const std::vector<ValueVector*>& outputVector) {
Expand Down Expand Up @@ -243,6 +281,38 @@ void LocalRelNG::applyCSRDeletions(
}
}

void LocalRelNG::applyRegularChangesToVector(common::ValueVector* srcNodeIDVector,
LocalVectorCollection* chunk, const offset_to_row_idx_t& updateInfo,
const offset_to_row_idx_t& insertInfo, const offset_set_t& deleteInfo,
common::ValueVector* outputVector) {
if (updateInfo.empty() && insertInfo.empty() && deleteInfo.empty()) {
return;
}
for (auto i = 0u; i < srcNodeIDVector->state->selVector->selectedSize; i++) {
auto selPos = srcNodeIDVector->state->selVector->selectedPositions[i];
auto offsetInChunk =
srcNodeIDVector->getValue<nodeID_t>(selPos).offset - nodeGroupStartOffset;
applyRegularChangesForOffset(
offsetInChunk, chunk, updateInfo, insertInfo, deleteInfo, outputVector, selPos);
}
}

void LocalRelNG::applyRegularChangesForOffset(common::offset_t offsetInChunk,
LocalVectorCollection* chunk, const offset_to_row_idx_t& updateInfo,
const offset_to_row_idx_t& insertInfo, const offset_set_t& deleteInfo,
common::ValueVector* outputVector, common::sel_t posInVector) {
row_idx_t rowIdx = updateInfo.contains(offsetInChunk) ? updateInfo.at(offsetInChunk) :
insertInfo.contains(offsetInChunk) ? insertInfo.at(offsetInChunk) :
INVALID_ROW_IDX;
if (rowIdx != INVALID_ROW_IDX) {
auto posInLocalVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1);
outputVector->copyFromVectorData(
posInVector, chunk->getLocalVector(rowIdx)->getVector(), posInLocalVector);
} else if (deleteInfo.contains(offsetInChunk)) {
outputVector->setNull(posInVector, true /* isNull */);
}
}

bool LocalRelNG::insert(ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector,
const std::vector<ValueVector*>& propertyVectors) {
KU_ASSERT(propertyVectors.size() == chunks.size() && propertyVectors.size() >= 1);
Expand Down
59 changes: 45 additions & 14 deletions src/storage/store/rel_table_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void RelTableData::initializeReadState(Transaction* transaction,
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
if (dataFormat == ColumnDataFormat::CSR) {
auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
// Reset to read from beginning for the csr of the new node offset.
readState->posInCurrentCSR = 0;
if (readState->isOutOfRange(nodeOffset)) {
// Scan csr offsets and populate csr list entries for the new node group.
Expand All @@ -130,17 +131,7 @@ void RelTableData::initializeReadState(Transaction* transaction,
readState->numNodes = readState->csrOffsetChunk->getNumValues();
readState->populateCSRListEntries();
if (transaction->isWriteTransaction()) {
auto localTableData = transaction->getLocalStorage()->getLocalTableData(
tableID, getDataIdxFromDirection(direction));
if (localTableData) {
auto localRelTableData =
ku_dynamic_cast<LocalTableData*, LocalRelTableData*>(localTableData);
readState->localNodeGroup =
localRelTableData->nodeGroups.contains(nodeGroupIdx) ?
ku_dynamic_cast<LocalNodeGroup*, LocalRelNG*>(
localRelTableData->nodeGroups.at(nodeGroupIdx).get()) :
nullptr;
}
readState->localNodeGroup = getLocalNodeGroup(transaction, nodeGroupIdx);
}
}
if (nodeOffset != readState->currentNodeOffset) {
Expand All @@ -151,10 +142,26 @@ void RelTableData::initializeReadState(Transaction* transaction,
readState->readFromLocalStorage = false;
}

LocalRelNG* RelTableData::getLocalNodeGroup(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx) {
auto localTableData = transaction->getLocalStorage()->getLocalTableData(
tableID, getDataIdxFromDirection(direction));
LocalRelNG* localNodeGroup = nullptr;
if (localTableData) {
auto localRelTableData =
ku_dynamic_cast<LocalTableData*, LocalRelTableData*>(localTableData);
if (localRelTableData->nodeGroups.contains(nodeGroupIdx)) {
localNodeGroup = ku_dynamic_cast<LocalNodeGroup*, LocalRelNG*>(
localRelTableData->nodeGroups.at(nodeGroupIdx).get());
}
}
return localNodeGroup;
}

void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState& readState,
ValueVector* inNodeIDVector, const std::vector<ValueVector*>& outputVectors) {
adjColumn->scan(transaction, inNodeIDVector, outputVectors[0]);
if (!ValueVector::discardNull(*outputVectors[0])) {
if (transaction->isReadOnly() && !ValueVector::discardNull(*outputVectors[0])) {
return;
}
for (auto i = 0u; i < readState.columnIDs.size(); i++) {
Expand All @@ -167,6 +174,16 @@ void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState
columns[readState.columnIDs[i]]->scan(
transaction, inNodeIDVector, outputVectors[outputVectorId]);
}
if (transaction->isWriteTransaction()) {
auto nodeOffset = inNodeIDVector->readNodeOffset(0);
auto localNodeGroup =
getLocalNodeGroup(transaction, StorageUtils::getNodeGroupIdx(nodeOffset));
if (localNodeGroup) {
localNodeGroup->applyLocalChangesForRegularColumns(
inNodeIDVector, readState.columnIDs, outputVectors);
}
ValueVector::discardNull(*outputVectors[0]);
}
}

void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& readState,
Expand Down Expand Up @@ -205,7 +222,7 @@ void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& re
inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]);
KU_ASSERT(relIDVectorIdx != INVALID_VECTOR_IDX);
auto relIDVector = outputVectors[relIDVectorIdx];
readState.localNodeGroup->applyCSRUpdatesAndDeletions(
readState.localNodeGroup->applyLocalChangesForCSRColumns(
nodeOffset - readState.startNodeOffset, readState.columnIDs, relIDVector,
outputVectors);
}
Expand All @@ -216,7 +233,7 @@ void RelTableData::lookup(Transaction* transaction, TableReadState& readState,
KU_ASSERT(dataFormat == ColumnDataFormat::REGULAR);
// Note: The scan operator should guarantee that the first property in the output is adj column.
adjColumn->lookup(transaction, inNodeIDVector, outputVectors[0]);
if (!ValueVector::discardNull(*outputVectors[0])) {
if (transaction->isReadOnly() && !ValueVector::discardNull(*outputVectors[0])) {
return;
}
for (auto i = 0u; i < readState.columnIDs.size(); i++) {
Expand All @@ -229,6 +246,20 @@ void RelTableData::lookup(Transaction* transaction, TableReadState& readState,
columns[readState.columnIDs[i]]->lookup(
transaction, inNodeIDVector, outputVectors[outputVectorId]);
}
if (transaction->isWriteTransaction()) {
for (auto pos = 0u; pos < inNodeIDVector->state->selVector->selectedSize; pos++) {
auto selPos = inNodeIDVector->state->selVector->selectedPositions[pos];
auto nodeOffset = inNodeIDVector->readNodeOffset(selPos);
auto [nodeGroupIdx, offsetInChunk] =
StorageUtils::getNodeGroupIdxAndOffsetInChunk(nodeOffset);
auto localNodeGroup = getLocalNodeGroup(transaction, nodeGroupIdx);
if (localNodeGroup) {
localNodeGroup->applyLocalChangesForRegularColumns(
offsetInChunk, readState.columnIDs, outputVectors, selPos);
}
}
ValueVector::discardNull(*outputVectors[0]);
}
}

void RelTableData::insert(transaction::Transaction* transaction, ValueVector* srcNodeIDVector,
Expand Down
14 changes: 0 additions & 14 deletions test/test_files/transaction/create_rel/violate_error.test
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,6 @@
--


-CASE ViolateManyOneMultiplicityError
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (p1:person), (p2:person) WHERE p1.ID = 11 AND p2.ID = 10 CREATE (p1)-[:teaches]->(p2);
---- error
Runtime exception: Many-one, one-one relationship violated.

-CASE ViolateOneOneMultiplicityError
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (a:animal), (p:person) WHERE a.ID = 2 AND p.ID = 10 CREATE (a)-[:hasOwner]->(p);
---- error
Runtime exception: Many-one, one-one relationship violated.

-CASE InsertAfterDeleteRel
-STATEMENT BEGIN TRANSACTION
---- ok
Expand Down
24 changes: 23 additions & 1 deletion test/test_files/update_rel/create_empty.test
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,29 @@
---- 1
1099512015344|3

-CASE CreateAndScanRel
-CASE CreateAndScanOneToOneRel
-STATEMENT CREATE NODE TABLE N1(ID INT64, PRIMARY KEY(ID));
---- ok
-STATEMENT CREATE NODE TABLE N2(ID INT64, PRIMARY KEY(ID));
---- ok
-STATEMENT CREATE REL TABLE Rel1(FROM N1 TO N2, ONE_ONE);
---- ok
-STATEMENT CREATE (:N1 {ID: 10}), (:N1 {ID: 1}), (:N2 {ID: 12}), (:N2 {ID: 8})
---- ok
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (n1:N1), (n2:N2) WHERE n1.ID=10 AND n2.ID=12 CREATE (n1)-[r:Rel1]->(n2)
---- ok
-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID
---- 1
10|12
-STATEMENT COMMIT
---- ok
-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID
---- 1
10|12

-CASE CreateAndScanManyToManyRel
-STATEMENT CREATE NODE TABLE N1(ID INT64, PRIMARY KEY(ID));
---- ok
-STATEMENT CREATE NODE TABLE N2(ID INT64, PRIMARY KEY(ID));
Expand Down
24 changes: 24 additions & 0 deletions test/test_files/update_rel/delete_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,27 @@ Binder exception: Delete undirected rel is not supported.
-STATEMENT MATCH (a:person)-[e]->(b:person) DETACH DELETE e
---- error
Binder exception: Detach delete on rel tables is not supported.

-CASE MixedDeleteInsertAndSetOneToOneRel
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID = 5 DELETE e
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID = 5 RETURN COUNT(*)
---- 1
0
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 3 AND b.ID =10 CREATE (a)-[:marries]->(b)
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 RETURN COUNT(*)
---- 1
1
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 SET e.note='new one';
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 RETURN e.note
---- 1
new one
-STATEMENT COMMIT
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 RETURN e.note
---- 1
new one
7 changes: 7 additions & 0 deletions test/test_files/update_rel/merge_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 0 AND b.ID = 7 MERGE (a)-[r:knows {date:a.birthdate}]->(b) RETURN r;
---- 1
(0:0)-{_LABEL: knows, _ID: 0:14, date: 1900-01-01}->(0:4)

-CASE MergeOneToOne
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 9 AND b.ID = 10 MERGE (a)-[r:marries]->(b) ON CREATE SET a.age = 0, r.note = 'merged ntoes'
---- ok
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 9 AND b.ID = 10 MATCH (a)-[r:marries]->(b) RETURN r.note, a.age
---- 1
merged ntoes|0
Loading