Skip to content

Commit

Permalink
fix scan and lookup of regular columns
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Nov 21, 2023
1 parent ed31305 commit ec23eaa
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 37 deletions.
21 changes: 20 additions & 1 deletion src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/column_data_format.h"
#include "common/vector/value_vector.h"
#include "storage/local_storage/local_table.h"

namespace kuzu {
Expand Down Expand Up @@ -82,9 +83,19 @@ class LocalRelNG final : public LocalNodeGroup {
common::row_idx_t scanCSR(common::offset_t srcOffsetInChunk,
common::offset_t posToReadForOffset, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVector);
void applyCSRUpdatesAndDeletions(common::offset_t srcOffsetInChunk,
// For CSR, we need to apply updates and deletions here, while insertions are handled by
// `scanCSR`.
void applyLocalChangesForCSRColumns(common::offset_t srcOffsetInChunk,
const std::vector<common::column_id_t>& columnIDs, common::ValueVector* relIDVector,
const std::vector<common::ValueVector*>& outputVector);
void applyLocalChangesForRegularColumns(common::ValueVector* srcNodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVector);
// Note that there is an implicit assumption that all outputVectors share the same state, thus
// only one posInVector is passed.
void applyLocalChangesForRegularColumns(common::offset_t offsetInChunk,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors, common::sel_t posInVector);

bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
Expand All @@ -105,6 +116,14 @@ class LocalRelNG final : public LocalNodeGroup {
const std::vector<common::ValueVector*>& outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk,
const offset_to_offset_set_t& deleteInfo, common::ValueVector* relIDVector);
void applyRegularChangesToVector(common::ValueVector* srcNodeIDVector,
LocalVectorCollection* chunk, const offset_to_row_idx_t& updateInfo,
const offset_to_row_idx_t& insertInfo, const offset_set_t& deleteInfo,
common::ValueVector* outputVector);
void applyRegularChangesForOffset(common::offset_t offsetInChunk, LocalVectorCollection* chunk,
const offset_to_row_idx_t& updateInfo, const offset_to_row_idx_t& insertInfo,
const offset_set_t& deleteInfo, common::ValueVector* outputVector,
common::sel_t posInVector);

private:
std::unique_ptr<LocalVectorCollection> adjChunk;
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class RelTableData final : public TableData {
void rollbackInMemory();

private:
LocalRelNG* getLocalNodeGroup(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx);

void scanRegularColumns(transaction::Transaction* transaction, RelDataReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);
Expand Down
84 changes: 77 additions & 7 deletions src/storage/local_storage/local_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ bool RegularRelNGInfo::insert(offset_t srcOffsetInChunk, offset_t /*relOffset*/,
if (adjInsertInfo.contains(srcOffsetInChunk) && !wasDeleted) {
throw RuntimeException{"Many-one, one-one relationship violated."};
}
if (wasDeleted) {
deleteInfo.erase(srcOffsetInChunk);
}
adjInsertInfo[srcOffsetInChunk] = adjNodeRowIdx;
for (auto i = 0u; i < propertyNodesRowIdx.size(); ++i) {
KU_ASSERT(!updateInfoPerChunk[i].contains(srcOffsetInChunk));
Expand Down Expand Up @@ -43,13 +46,12 @@ bool RegularRelNGInfo::delete_(offset_t srcOffsetInChunk, offset_t /*relOffset*/
if (adjInsertInfo.contains(srcOffsetInChunk)) {
// Delete newly inserted tuple.
adjInsertInfo.erase(srcOffsetInChunk);
}
if (deleteInfo.contains(srcOffsetInChunk)) {
// The node is already deleted.
return false;
} else {
if (deleteInfo.contains(srcOffsetInChunk)) {
// The node is already deleted.
return false;
} else {
deleteInfo.insert(srcOffsetInChunk);
}
deleteInfo.insert(srcOffsetInChunk);
}
return true;
}
Expand Down Expand Up @@ -183,7 +185,7 @@ row_idx_t LocalRelNG::scanCSR(offset_t srcOffsetInChunk, offset_t posToReadForOf
return posInVector;
}

void LocalRelNG::applyCSRUpdatesAndDeletions(offset_t srcOffsetInChunk,
void LocalRelNG::applyLocalChangesForCSRColumns(offset_t srcOffsetInChunk,
const std::vector<column_id_t>& columnIDs, ValueVector* relIDVector,
const std::vector<ValueVector*>& outputVector) {
KU_ASSERT(columnIDs.size() + 1 == outputVector.size());
Expand All @@ -202,6 +204,42 @@ void LocalRelNG::applyCSRUpdatesAndDeletions(offset_t srcOffsetInChunk,
}
}

// Overlays the changes recorded in this local node group onto a batch of scanned regular-format
// rel columns. `outputVector[0]` is patched from the local adj chunk; `outputVector[i + 1]` is
// patched from the local chunk of `columnIDs[i]`. Offsets are derived from `srcNodeIDVector`.
void LocalRelNG::applyLocalChangesForRegularColumns(ValueVector* srcNodeIDVector,
    const std::vector<column_id_t>& columnIDs, const std::vector<ValueVector*>& outputVector) {
    KU_ASSERT(outputVector.size() == columnIDs.size() + 1);
    auto localInfo = ku_dynamic_cast<RelNGInfo*, RegularRelNGInfo*>(relNGInfo.get());
    KU_ASSERT(localInfo);
    // Adj column: only insertions and deletions are applied to it (no update info is passed).
    applyRegularChangesToVector(srcNodeIDVector, adjChunk.get(), {} /* updateInfo */,
        localInfo->adjInsertInfo, localInfo->deleteInfo, outputVector[0]);
    // Property columns start at output slot 1, right after the adj column.
    auto outputIdx = 1u;
    for (auto columnID : columnIDs) {
        // There is no need to apply deleteInfo on property columns, as adj column will be the one
        // always read first and used to check nulls.
        applyRegularChangesToVector(srcNodeIDVector, chunks[columnID].get(),
            localInfo->updateInfoPerChunk[columnID], localInfo->insertInfoPerChunk[columnID],
            {} /* deleteInfo */, outputVector[outputIdx]);
        outputIdx++;
    }
}

// Single-offset variant: overlays the local changes recorded for `offsetInChunk` onto slot
// `posInVector` of every output vector. `outputVectors[0]` is patched from the local adj chunk;
// `outputVectors[i + 1]` is patched from the local chunk of `columnIDs[i]`.
void LocalRelNG::applyLocalChangesForRegularColumns(offset_t offsetInChunk,
    const std::vector<column_id_t>& columnIDs, const std::vector<ValueVector*>& outputVectors,
    sel_t posInVector) {
    KU_ASSERT(outputVectors.size() == columnIDs.size() + 1);
    auto localInfo = ku_dynamic_cast<RelNGInfo*, RegularRelNGInfo*>(relNGInfo.get());
    KU_ASSERT(localInfo);
    // Adj column: insertions and deletions only (no update info is passed).
    applyRegularChangesForOffset(offsetInChunk, adjChunk.get(), {} /* updateInfo */,
        localInfo->adjInsertInfo, localInfo->deleteInfo, outputVectors[0], posInVector);
    // Property columns: updates and insertions only (no delete info is passed); they occupy the
    // output slots following the adj column.
    auto outputIdx = 1u;
    for (auto columnID : columnIDs) {
        applyRegularChangesForOffset(offsetInChunk, chunks[columnID].get(),
            localInfo->updateInfoPerChunk[columnID], localInfo->insertInfoPerChunk[columnID],
            {} /* deleteInfo */, outputVectors[outputIdx], posInVector);
        outputIdx++;
    }
}

void LocalRelNG::applyCSRUpdates(offset_t srcOffsetInChunk, column_id_t columnID,
const offset_to_offset_to_row_idx_t& updateInfo, ValueVector* relIDVector,
const std::vector<ValueVector*>& outputVector) {
Expand Down Expand Up @@ -243,6 +281,38 @@ void LocalRelNG::applyCSRDeletions(
}
}

// Applies the given update/insert/delete bookkeeping to `outputVector` for every selected
// position of `srcNodeIDVector`. Each source node offset is translated to an offset relative to
// `nodeGroupStartOffset`, and the output is written at the same selected position.
void LocalRelNG::applyRegularChangesToVector(common::ValueVector* srcNodeIDVector,
    LocalVectorCollection* chunk, const offset_to_row_idx_t& updateInfo,
    const offset_to_row_idx_t& insertInfo, const offset_set_t& deleteInfo,
    common::ValueVector* outputVector) {
    const bool hasLocalChanges =
        !updateInfo.empty() || !insertInfo.empty() || !deleteInfo.empty();
    if (!hasLocalChanges) {
        // Nothing recorded locally; leave the output vector as it is.
        return;
    }
    const auto& selVector = *srcNodeIDVector->state->selVector;
    for (auto idx = 0u; idx < selVector.selectedSize; idx++) {
        const auto posInVector = selVector.selectedPositions[idx];
        const auto srcOffset = srcNodeIDVector->getValue<nodeID_t>(posInVector).offset;
        applyRegularChangesForOffset(srcOffset - nodeGroupStartOffset, chunk, updateInfo,
            insertInfo, deleteInfo, outputVector, posInVector);
    }
}

// Patches one slot (`posInVector`) of `outputVector` with the local state of `offsetInChunk`:
//  - if a local row is recorded for the offset (update info is consulted before insert info),
//    the value is copied from that row of `chunk`;
//  - otherwise, if the offset is in `deleteInfo`, the slot is set to null;
//  - otherwise the slot is left untouched.
void LocalRelNG::applyRegularChangesForOffset(common::offset_t offsetInChunk,
    LocalVectorCollection* chunk, const offset_to_row_idx_t& updateInfo,
    const offset_to_row_idx_t& insertInfo, const offset_set_t& deleteInfo,
    common::ValueVector* outputVector, common::sel_t posInVector) {
    row_idx_t localRowIdx = INVALID_ROW_IDX;
    if (auto updated = updateInfo.find(offsetInChunk); updated != updateInfo.end()) {
        localRowIdx = updated->second;
    } else if (auto inserted = insertInfo.find(offsetInChunk); inserted != insertInfo.end()) {
        localRowIdx = inserted->second;
    }
    if (localRowIdx == INVALID_ROW_IDX) {
        // No local value to copy; a locally deleted rel is surfaced as a null.
        if (deleteInfo.contains(offsetInChunk)) {
            outputVector->setNull(posInVector, true /* isNull */);
        }
        return;
    }
    // The low bits of the row index give the position within its local vector.
    auto posInLocalVector = localRowIdx & (DEFAULT_VECTOR_CAPACITY - 1);
    auto localVector = chunk->getLocalVector(localRowIdx)->getVector();
    outputVector->copyFromVectorData(posInVector, localVector, posInLocalVector);
}

bool LocalRelNG::insert(ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector,
const std::vector<ValueVector*>& propertyVectors) {
KU_ASSERT(propertyVectors.size() == chunks.size() && propertyVectors.size() >= 1);
Expand Down
61 changes: 47 additions & 14 deletions src/storage/store/rel_table_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void RelTableData::initializeReadState(Transaction* transaction,
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
if (dataFormat == ColumnDataFormat::CSR) {
auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
// Reset to read from beginning for the csr of the new node offset.
readState->posInCurrentCSR = 0;
if (readState->isOutOfRange(nodeOffset)) {
// Scan csr offsets and populate csr list entries for the new node group.
Expand All @@ -130,17 +131,7 @@ void RelTableData::initializeReadState(Transaction* transaction,
readState->numNodes = readState->csrOffsetChunk->getNumValues();
readState->populateCSRListEntries();
if (transaction->isWriteTransaction()) {
auto localTableData = transaction->getLocalStorage()->getLocalTableData(
tableID, getDataIdxFromDirection(direction));
if (localTableData) {
auto localRelTableData =
ku_dynamic_cast<LocalTableData*, LocalRelTableData*>(localTableData);
readState->localNodeGroup =
localRelTableData->nodeGroups.contains(nodeGroupIdx) ?
ku_dynamic_cast<LocalNodeGroup*, LocalRelNG*>(
localRelTableData->nodeGroups.at(nodeGroupIdx).get()) :
nullptr;
}
readState->localNodeGroup = getLocalNodeGroup(transaction, nodeGroupIdx);
}
}
if (nodeOffset != readState->currentNodeOffset) {
Expand All @@ -151,10 +142,26 @@ void RelTableData::initializeReadState(Transaction* transaction,
readState->readFromLocalStorage = false;
}

// Looks up the transaction-local rel node group for `nodeGroupIdx` in this table/direction.
// Returns nullptr when the transaction has no local data for this table, or when the local
// table data has no entry for the requested node group.
LocalRelNG* RelTableData::getLocalNodeGroup(
    transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx) {
    auto localTableData = transaction->getLocalStorage()->getLocalTableData(
        tableID, getDataIdxFromDirection(direction));
    if (!localTableData) {
        return nullptr;
    }
    auto localRelTableData = ku_dynamic_cast<LocalTableData*, LocalRelTableData*>(localTableData);
    if (!localRelTableData->nodeGroups.contains(nodeGroupIdx)) {
        return nullptr;
    }
    return ku_dynamic_cast<LocalNodeGroup*, LocalRelNG*>(
        localRelTableData->nodeGroups.at(nodeGroupIdx).get());
}

void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState& readState,
ValueVector* inNodeIDVector, const std::vector<ValueVector*>& outputVectors) {
adjColumn->scan(transaction, inNodeIDVector, outputVectors[0]);
if (!ValueVector::discardNull(*outputVectors[0])) {
if (transaction->isReadOnly() && !ValueVector::discardNull(*outputVectors[0])) {
return;
}
for (auto i = 0u; i < readState.columnIDs.size(); i++) {
Expand All @@ -167,6 +174,18 @@ void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState
columns[readState.columnIDs[i]]->scan(
transaction, inNodeIDVector, outputVectors[outputVectorId]);
}
if (transaction->isWriteTransaction()) {
auto nodeOffset = inNodeIDVector->readNodeOffset(0);
auto relIDVector = outputVectors[REL_ID_COLUMN_ID];
auto relID = relIDVector->getValue<offset_t>(0);
auto localNodeGroup =
getLocalNodeGroup(transaction, StorageUtils::getNodeGroupIdx(nodeOffset));
if (localNodeGroup) {
localNodeGroup->applyLocalChangesForRegularColumns(
inNodeIDVector, readState.columnIDs, outputVectors);
}
ValueVector::discardNull(*outputVectors[0]);
}
}

void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& readState,
Expand Down Expand Up @@ -205,7 +224,7 @@ void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& re
inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]);
KU_ASSERT(relIDVectorIdx != INVALID_VECTOR_IDX);
auto relIDVector = outputVectors[relIDVectorIdx];
readState.localNodeGroup->applyCSRUpdatesAndDeletions(
readState.localNodeGroup->applyLocalChangesForCSRColumns(
nodeOffset - readState.startNodeOffset, readState.columnIDs, relIDVector,
outputVectors);
}
Expand All @@ -216,7 +235,7 @@ void RelTableData::lookup(Transaction* transaction, TableReadState& readState,
KU_ASSERT(dataFormat == ColumnDataFormat::REGULAR);
// Note: The scan operator should guarantee that the first property in the output is adj column.
adjColumn->lookup(transaction, inNodeIDVector, outputVectors[0]);
if (!ValueVector::discardNull(*outputVectors[0])) {
if (transaction->isReadOnly() && !ValueVector::discardNull(*outputVectors[0])) {
return;
}
for (auto i = 0u; i < readState.columnIDs.size(); i++) {
Expand All @@ -229,6 +248,20 @@ void RelTableData::lookup(Transaction* transaction, TableReadState& readState,
columns[readState.columnIDs[i]]->lookup(
transaction, inNodeIDVector, outputVectors[outputVectorId]);
}
if (transaction->isWriteTransaction()) {
for (auto pos = 0u; pos < inNodeIDVector->state->selVector->selectedSize; pos++) {
auto selPos = inNodeIDVector->state->selVector->selectedPositions[pos];
auto nodeOffset = inNodeIDVector->readNodeOffset(selPos);
auto [nodeGroupIdx, offsetInChunk] =
StorageUtils::getNodeGroupIdxAndOffsetInChunk(nodeOffset);
auto localNodeGroup = getLocalNodeGroup(transaction, nodeGroupIdx);
if (localNodeGroup) {
localNodeGroup->applyLocalChangesForRegularColumns(
offsetInChunk, readState.columnIDs, outputVectors, selPos);
}
}
ValueVector::discardNull(*outputVectors[0]);
}
}

void RelTableData::insert(transaction::Transaction* transaction, ValueVector* srcNodeIDVector,
Expand Down
14 changes: 0 additions & 14 deletions test/test_files/transaction/create_rel/violate_error.test
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,6 @@
--


-CASE ViolateManyOneMultiplicityError
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (p1:person), (p2:person) WHERE p1.ID = 11 AND p2.ID = 10 CREATE (p1)-[:teaches]->(p2);
---- error
Runtime exception: Many-one, one-one relationship violated.

-CASE ViolateOneOneMultiplicityError
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (a:animal), (p:person) WHERE a.ID = 2 AND p.ID = 10 CREATE (a)-[:hasOwner]->(p);
---- error
Runtime exception: Many-one, one-one relationship violated.

-CASE InsertAfterDeleteRel
-STATEMENT BEGIN TRANSACTION
---- ok
Expand Down
24 changes: 23 additions & 1 deletion test/test_files/update_rel/create_empty.test
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,29 @@
---- 1
1099512015344|3

-CASE CreateAndScanRel
-CASE CreateAndScanOneToOneRel
-STATEMENT CREATE NODE TABLE N1(ID INT64, PRIMARY KEY(ID));
---- ok
-STATEMENT CREATE NODE TABLE N2(ID INT64, PRIMARY KEY(ID));
---- ok
-STATEMENT CREATE REL TABLE Rel1(FROM N1 TO N2, ONE_ONE);
---- ok
-STATEMENT CREATE (:N1 {ID: 10}), (:N1 {ID: 1}), (:N2 {ID: 12}), (:N2 {ID: 8})
---- ok
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (n1:N1), (n2:N2) WHERE n1.ID=10 AND n2.ID=12 CREATE (n1)-[r:Rel1]->(n2)
---- ok
-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID
---- 1
10|12
-STATEMENT COMMIT
---- ok
-STATEMENT MATCH (n:N1)-[r:Rel1]->(m:N2) RETURN n.ID, m.ID
---- 1
10|12

-CASE CreateAndScanManyToManyRel
-STATEMENT CREATE NODE TABLE N1(ID INT64, PRIMARY KEY(ID));
---- ok
-STATEMENT CREATE NODE TABLE N2(ID INT64, PRIMARY KEY(ID));
Expand Down
24 changes: 24 additions & 0 deletions test/test_files/update_rel/delete_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,27 @@ Binder exception: Delete undirected rel is not supported.
-STATEMENT MATCH (a:person)-[e]->(b:person) DETACH DELETE e
---- error
Binder exception: Detach delete on rel tables is not supported.

-CASE MixedDeleteInsertAndSetOneToOneRel
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID = 5 DELETE e
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID = 5 RETURN COUNT(*)
---- 1
0
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 3 AND b.ID =10 CREATE (a)-[:marries]->(b)
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 RETURN COUNT(*)
---- 1
1
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 SET e.note='new one';
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 RETURN e.note
---- 1
new one
-STATEMENT COMMIT
---- ok
-STATEMENT MATCH (a:person)-[e:marries]->(b:person) WHERE a.ID = 3 AND b.ID =10 RETURN e.note
---- 1
new one
7 changes: 7 additions & 0 deletions test/test_files/update_rel/merge_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 0 AND b.ID = 7 MERGE (a)-[r:knows {date:a.birthdate}]->(b) RETURN r;
---- 1
(0:0)-{_LABEL: knows, _ID: 0:14, date: 1900-01-01}->(0:4)

-CASE MergeOneToOne
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 9 AND b.ID = 10 MERGE (a)-[r:marries]->(b) ON CREATE SET a.age = 0, r.note = 'merged ntoes'
---- ok
-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = 9 AND b.ID = 10 MATCH (a)-[r:marries]->(b) RETURN r.note, a.age
---- 1
merged ntoes|0
Loading

0 comments on commit ec23eaa

Please sign in to comment.