Skip to content

Commit

Permalink
fix node table states in scan multi node tables
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 28, 2024
1 parent 543df8c commit 4dddb48
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ScanMultiNodeTables : public ScanTable {

private:
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> infos;
common::table_id_map_t<std::unique_ptr<storage::TableReadState>> readStates;
common::table_id_map_t<std::unique_ptr<storage::NodeTableReadState>> readStates;
};

} // namespace processor
Expand Down
12 changes: 6 additions & 6 deletions src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ struct ScanNodeTableInfo {
}
};

class ScanSingleNodeTable final : public ScanTable {
class ScanNodeTable final : public ScanTable {
public:
ScanSingleNodeTable(std::unique_ptr<ScanNodeTableInfo> info, const DataPos& inVectorPos,
ScanNodeTable(std::unique_ptr<ScanNodeTableInfo> info, const DataPos& inVectorPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanSingleNodeTable{PhysicalOperatorType::SCAN_NODE_TABLE, std::move(info), inVectorPos,
: ScanNodeTable{PhysicalOperatorType::SCAN_NODE_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanSingleNodeTable>(info->copy(), nodeIDPos, outVectorsPos,
return make_unique<ScanNodeTable>(info->copy(), nodeIDPos, outVectorsPos,
children[0]->clone(), id, paramsString);
}

protected:
ScanSingleNodeTable(PhysicalOperatorType operatorType, std::unique_ptr<ScanNodeTableInfo> info,
ScanNodeTable(PhysicalOperatorType operatorType, std::unique_ptr<ScanNodeTableInfo> info,
const DataPos& inVectorPos, std::vector<DataPos> outVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: ScanTable{operatorType, inVectorPos, std::move(outVectorsPos), std::move(child), id,
Expand All @@ -49,7 +49,7 @@ class ScanSingleNodeTable final : public ScanTable {

private:
std::unique_ptr<ScanNodeTableInfo> info;
std::unique_ptr<storage::TableReadState> readState;
std::unique_ptr<storage::NodeTableReadState> readState;
};

} // namespace processor
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/scan/scan_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class ScanTable : public PhysicalOperator {
std::vector<DataPos> outVectorsPos;
// Node id vector.
common::ValueVector* nodeIDVector;
// All output vectors share the same state. Keep one of them to retrieve state.
common::ValueVector* anchorOutVector;
// All output vectors share the same state.
common::DataChunkState* outState;
};

} // namespace processor
Expand Down
7 changes: 6 additions & 1 deletion src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ namespace storage {
class LocalNodeTable;

struct NodeTableReadState : public TableReadState {
NodeTableReadState(std::vector<common::column_id_t> columnIDs)
explicit NodeTableReadState(std::vector<common::column_id_t> columnIDs)
: TableReadState{std::move(columnIDs)} {
dataReadState = std::make_unique<NodeDataReadState>();
}
NodeTableReadState(const common::ValueVector* nodeIDVector,
std::vector<common::column_id_t> columnIDs, std::vector<common::ValueVector*> outputVectors)
: TableReadState{nodeIDVector, std::move(columnIDs), std::move(outputVectors)} {
dataReadState = std::make_unique<NodeDataReadState>();
}
};

struct NodeTableInsertState : public TableInsertState {
Expand Down
14 changes: 2 additions & 12 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,8 @@ class RelTable final : public Table {

void initializeReadState(transaction::Transaction* transaction,
common::RelDataDirection direction, const std::vector<common::column_id_t>& columnIDs,
RelTableReadState& readState) {
if (!readState.dataReadState) {
readState.dataReadState = std::make_unique<RelDataReadState>();
}
auto& dataState = common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(
*readState.dataReadState);
return direction == common::RelDataDirection::FWD ?
fwdRelTableData->initializeReadState(transaction, columnIDs,
*readState.nodeIDVector, dataState) :
bwdRelTableData->initializeReadState(transaction, columnIDs,
*readState.nodeIDVector, dataState);
}
RelTableReadState& readState);

void read(transaction::Transaction* transaction, TableReadState& readState) override;

void insert(transaction::Transaction* transaction, TableInsertState& insertState) override;
Expand Down
2 changes: 1 addition & 1 deletion src/processor/map/map_scan_node_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapScanNodeProperty(
ku_dynamic_cast<storage::Table*, storage::NodeTable*>(
clientContext->getStorageManager()->getTable(tableID)),
std::move(columnIDs));
return std::make_unique<ScanSingleNodeTable>(std::move(info), inputNodeIDVectorPos,
return std::make_unique<ScanNodeTable>(std::move(info), inputNodeIDVectorPos,
std::move(outVectorsPos), std::move(prevOperator), getOperatorID(),
scanProperty.getExpressionsForPrinting());
}
Expand Down
7 changes: 3 additions & 4 deletions src/processor/operator/scan/scan_multi_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo
bool ScanMultiRelTable::getNextTuplesInternal(ExecutionContext* context) {
while (true) {
if (currentScanner != nullptr &&
currentScanner->scan(anchorOutVector->state->selVector.get(),
context->clientContext->getTx())) {
metrics->numOutputTuple.increase(anchorOutVector->state->selVector->selectedSize);
currentScanner->scan(outState->selVector.get(), context->clientContext->getTx())) {
metrics->numOutputTuple.increase(outState->selVector->selectedSize);
return true;
}
if (!children[0]->getNextTuple(context)) {
Expand All @@ -65,7 +64,7 @@ bool ScanMultiRelTable::getNextTuplesInternal(ExecutionContext* context) {
}
auto currentIdx = nodeIDVector->state->selVector->selectedPositions[0];
if (nodeIDVector->isNull(currentIdx)) {
anchorOutVector->state->selVector->selectedSize = 0;
outState->selVector->selectedSize = 0;
continue;
}
auto nodeID = nodeIDVector->getValue<nodeID_t>(currentIdx);
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/scan/scan_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

void ScanSingleNodeTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
void ScanNodeTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
ScanTable::initLocalStateInternal(resultSet, context);
readState = std::make_unique<storage::NodeTableReadState>(info->columnIDs);
ScanTable::initVectors(*readState, *resultSet);
}

bool ScanSingleNodeTable::getNextTuplesInternal(ExecutionContext* context) {
bool ScanNodeTable::getNextTuplesInternal(ExecutionContext* context) {
if (!children[0]->getNextTuple(context)) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/scan/scan_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace processor {
void ScanTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext*) {
nodeIDVector = resultSet->getValueVector(nodeIDPos).get();
KU_ASSERT(!outVectorsPos.empty());
anchorOutVector = resultSet->getValueVector(outVectorsPos[0]).get();
outState = resultSet->getValueVector(outVectorsPos[0])->state.get();
}

void ScanTable::initVectors(storage::TableReadState& state, const ResultSet& resultSet) {
Expand Down
6 changes: 2 additions & 4 deletions src/storage/store/node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ void NodeTable::delete_(Transaction* transaction, TableDeleteState& deleteState)
}
auto pkColumnIDs = {pkColumnID};
auto pkVectors = std::vector<ValueVector*>{&nodeDeleteState.pkVector};
// TODO(Guodong): we are we creating TableReadState instead of NodeTableReadState?
auto readState =
std::make_unique<TableReadState>(&nodeDeleteState.nodeIDVector, pkColumnIDs, pkVectors);
std::make_unique<NodeTableReadState>(&nodeDeleteState.nodeIDVector, pkColumnIDs, pkVectors);
initializeReadState(transaction, pkColumnIDs, *readState);
read(transaction, *readState);
if (pkIndex) {
Expand Down Expand Up @@ -187,8 +186,7 @@ void NodeTable::updatePK(Transaction* transaction, column_id_t columnID,
pkVector->state = nodeIDVector.state;
auto outputVectors = std::vector<ValueVector*>{pkVector.get()};
auto columnIDs = {columnID};
// TODO(Guodong): we are we creating TableReadState instead of NodeTableReadState?
auto readState = std::make_unique<TableReadState>(&nodeIDVector, columnIDs, outputVectors);
auto readState = std::make_unique<NodeTableReadState>(&nodeIDVector, columnIDs, outputVectors);
initializeReadState(transaction, columnIDs, *readState);
read(transaction, *readState);
pkIndex->delete_(pkVector.get());
Expand Down
14 changes: 14 additions & 0 deletions src/storage/store/rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ RelTable::RelTable(BMFileHandle* dataFH, BMFileHandle* metadataFH, RelsStoreStat
relTableEntry, relsStoreStats, RelDataDirection::BWD, enableCompression);
}

void RelTable::initializeReadState(Transaction* transaction, RelDataDirection direction,
const std::vector<column_id_t>& columnIDs, RelTableReadState& readState) {
if (!readState.dataReadState) {
readState.dataReadState = std::make_unique<RelDataReadState>();
}
auto& dataState =
common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(*readState.dataReadState);
return direction == common::RelDataDirection::FWD ?
fwdRelTableData->initializeReadState(transaction, columnIDs, *readState.nodeIDVector,
dataState) :
bwdRelTableData->initializeReadState(transaction, columnIDs, *readState.nodeIDVector,
dataState);
}

void RelTable::read(Transaction* transaction, TableReadState& readState) {
auto& relReadState = ku_dynamic_cast<TableReadState&, RelTableReadState&>(readState);
scan(transaction, relReadState);
Expand Down

0 comments on commit 4dddb48

Please sign in to comment.