Skip to content

Commit

Permalink
Refactor table read state (#3392)
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU authored and manh9203 committed Apr 29, 2024
1 parent a8ea1fc commit 06f5a91
Show file tree
Hide file tree
Showing 18 changed files with 167 additions and 144 deletions.
3 changes: 3 additions & 0 deletions src/include/common/types/internal_id_t.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <vector>

Expand All @@ -16,6 +17,8 @@ using relID_t = internalID_t;
// Table identifier: a 64-bit unsigned integer.
using table_id_t = uint64_t;
// Sequence of table identifiers.
using table_id_vector_t = std::vector<table_id_t>;
// Hash set of table identifiers.
using table_id_set_t = std::unordered_set<table_id_t>;
// Hash map keyed by table identifier; the mapped type T is chosen by the user of the alias.
template<typename T>
using table_id_map_t = std::unordered_map<table_id_t, T>;
// Offset type: a 64-bit unsigned integer.
using offset_t = uint64_t;
// Sentinels set to the largest uint64_t value.
// NOTE(review): presumably these mark "no table" / "no offset" -- confirm against callers.
constexpr table_id_t INVALID_TABLE_ID = UINT64_MAX;
constexpr offset_t INVALID_OFFSET = UINT64_MAX;
Expand Down
28 changes: 8 additions & 20 deletions src/include/processor/operator/scan/scan_multi_node_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,22 @@ namespace processor {
class ScanMultiNodeTables : public ScanTable {
public:
ScanMultiNodeTables(const DataPos& inVectorPos, std::vector<DataPos> outVectorsPos,
std::unordered_map<common::table_id_t, std::unique_ptr<ScanNodeTableInfo>> tables,
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> infos,
std::unique_ptr<PhysicalOperator> prevOperator, uint32_t id,
const std::string& paramsString)
: ScanTable{PhysicalOperatorType::SCAN_MULTI_NODE_TABLES, inVectorPos,
std::move(outVectorsPos), std::move(prevOperator), id, paramsString},
tables{std::move(tables)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
ScanTable::initLocalStateInternal(resultSet, context);
for (auto& [tableID, scanInfo] : tables) {
readStates[tableID] = std::make_unique<storage::TableReadState>(*inVector,
scanInfo->columnIDs, outVectors);
}
}
infos{std::move(infos)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::unordered_map<common::table_id_t, std::unique_ptr<ScanNodeTableInfo>> clonedTables;
for (const auto& table : tables) {
clonedTables[table.first] = table.second->copy();
}
return make_unique<ScanMultiNodeTables>(inVectorPos, outVectorsPos, std::move(clonedTables),
children[0]->clone(), id, paramsString);
}
std::unique_ptr<PhysicalOperator> clone() override;

private:
std::unordered_map<common::table_id_t, std::unique_ptr<ScanNodeTableInfo>> tables;
std::unordered_map<common::table_id_t, std::unique_ptr<storage::TableReadState>> readStates;
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> infos;
common::table_id_map_t<std::unique_ptr<storage::NodeTableReadState>> readStates;
};

} // namespace processor
Expand Down
13 changes: 6 additions & 7 deletions src/include/processor/operator/scan/scan_multi_rel_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ namespace kuzu {
namespace processor {

class RelTableCollectionScanner {
friend class ScanMultiRelTable;

public:
explicit RelTableCollectionScanner(std::vector<std::unique_ptr<ScanRelTableInfo>> scanInfos)
: scanInfos{std::move(scanInfos)} {}

inline void resetState() {
void resetState() {
currentTableIdx = 0;
nextTableIdx = 0;
}

void init(common::ValueVector* inVector,
const std::vector<common::ValueVector*>& outputVectors);
bool scan(common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors,
transaction::Transaction* transaction);
bool scan(common::SelectionVector* selVector, transaction::Transaction* transaction);

std::unique_ptr<RelTableCollectionScanner> clone() const;

Expand All @@ -29,15 +28,15 @@ class RelTableCollectionScanner {
uint32_t nextTableIdx = 0;
};

class ScanMultiRelTable : public ScanRelTable {
class ScanMultiRelTable : public ScanTable {
using node_table_id_scanner_map_t =
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableCollectionScanner>>;

public:
ScanMultiRelTable(node_table_id_scanner_map_t scannerPerNodeTable, const DataPos& inVectorPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{PhysicalOperatorType::SCAN_MULTI_REL_TABLES, nullptr /* info */, inVectorPos,
: ScanTable{PhysicalOperatorType::SCAN_MULTI_REL_TABLES, inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString},
scannerPerNodeTable{std::move(scannerPerNodeTable)} {}

Expand Down
21 changes: 8 additions & 13 deletions src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,25 @@ struct ScanNodeTableInfo {
}
};

class ScanSingleNodeTable : public ScanTable {
class ScanNodeTable final : public ScanTable {
public:
ScanSingleNodeTable(std::unique_ptr<ScanNodeTableInfo> info, const DataPos& inVectorPos,
ScanNodeTable(std::unique_ptr<ScanNodeTableInfo> info, const DataPos& inVectorPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanSingleNodeTable{PhysicalOperatorType::SCAN_NODE_TABLE, std::move(info), inVectorPos,
: ScanNodeTable{PhysicalOperatorType::SCAN_NODE_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {}

inline void initLocalStateInternal(ResultSet* resultSet,
ExecutionContext* executionContext) final {
ScanTable::initLocalStateInternal(resultSet, executionContext);
readState =
std::make_unique<storage::NodeTableReadState>(*inVector, info->columnIDs, outVectors);
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanSingleNodeTable>(info->copy(), inVectorPos, outVectorsPos,
std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanNodeTable>(info->copy(), nodeIDPos, outVectorsPos,
children[0]->clone(), id, paramsString);
}

protected:
ScanSingleNodeTable(PhysicalOperatorType operatorType, std::unique_ptr<ScanNodeTableInfo> info,
ScanNodeTable(PhysicalOperatorType operatorType, std::unique_ptr<ScanNodeTableInfo> info,
const DataPos& inVectorPos, std::vector<DataPos> outVectorsPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: ScanTable{operatorType, inVectorPos, std::move(outVectorsPos), std::move(child), id,
Expand All @@ -54,7 +49,7 @@ class ScanSingleNodeTable : public ScanTable {

private:
std::unique_ptr<ScanNodeTableInfo> info;
std::unique_ptr<storage::TableReadState> readState;
std::unique_ptr<storage::NodeTableReadState> readState;
};

} // namespace processor
Expand Down
14 changes: 3 additions & 11 deletions src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,13 @@ class ScanRelTable : public ScanTable {
const std::string& paramsString)
: ScanRelTable{PhysicalOperatorType::SCAN_REL_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {}
~ScanRelTable() override = default;

inline void initLocalStateInternal(ResultSet* resultSet,
ExecutionContext* executionContext) override {
ScanTable::initLocalStateInternal(resultSet, executionContext);
if (info) {
scanState = std::make_unique<storage::RelTableReadState>(*inVector, info->columnIDs,
outVectors, info->direction);
}
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTable>(info->copy(), inVectorPos, outVectorsPos,
std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTable>(info->copy(), nodeIDPos, outVectorsPos,
children[0]->clone(), id, paramsString);
}

Expand Down
22 changes: 15 additions & 7 deletions src/include/processor/operator/scan/scan_table.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/store/table.h"

namespace kuzu {
namespace processor {

class ScanTable : public PhysicalOperator {
public:
ScanTable(PhysicalOperatorType operatorType, const DataPos& inVectorPos,
ScanTable(PhysicalOperatorType operatorType, const DataPos& nodeIDPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramString)
: PhysicalOperator{operatorType, std::move(child), id, paramString},
inVectorPos{inVectorPos}, outVectorsPos{std::move(outVectorsPos)} {}
: PhysicalOperator{operatorType, std::move(child), id, paramString}, nodeIDPos{nodeIDPos},
outVectorsPos{std::move(outVectorsPos)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;
protected:
void initLocalStateInternal(ResultSet*, ExecutionContext*) override;

void initVectors(storage::TableReadState& state, const ResultSet& resultSet);

protected:
DataPos inVectorPos;
// Input node id vector position.
DataPos nodeIDPos;
// Output vector (properties or CSRs) positions
std::vector<DataPos> outVectorsPos;
common::ValueVector* inVector;
std::vector<common::ValueVector*> outVectors;
// Node id vector.
common::ValueVector* nodeIDVector;
// All output vectors share the same state.
common::DataChunkState* outState;
};

} // namespace processor
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/result/result_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ class ResultSet {
explicit ResultSet(uint32_t numDataChunks) : multiplicity{1}, dataChunks(numDataChunks) {}
ResultSet(ResultSetDescriptor* resultSetDescriptor, storage::MemoryManager* memoryManager);

inline void insert(uint32_t pos, std::shared_ptr<common::DataChunk> dataChunk) {
void insert(uint32_t pos, std::shared_ptr<common::DataChunk> dataChunk) {
KU_ASSERT(dataChunks.size() > pos);
dataChunks[pos] = std::move(dataChunk);
}

inline std::shared_ptr<common::DataChunk> getDataChunk(data_chunk_pos_t dataChunkPos) {
std::shared_ptr<common::DataChunk> getDataChunk(data_chunk_pos_t dataChunkPos) {
return dataChunks[dataChunkPos];
}
inline std::shared_ptr<common::ValueVector> getValueVector(const DataPos& dataPos) const {
std::shared_ptr<common::ValueVector> getValueVector(const DataPos& dataPos) const {
return dataChunks[dataPos.dataChunkPos]->valueVectors[dataPos.valueVectorPos];
}

// Our projection does NOT explicitly remove a dataChunk from the resultSet. Therefore, the caller
// should always provide a set of positions when reading from multiple dataChunks.
inline uint64_t getNumTuples(const std::unordered_set<uint32_t>& dataChunksPosInScope) {
uint64_t getNumTuples(const std::unordered_set<uint32_t>& dataChunksPosInScope) {
return getNumTuplesWithoutMultiplicity(dataChunksPosInScope) * multiplicity;
}

Expand Down
17 changes: 9 additions & 8 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ namespace storage {
class LocalNodeTable;

struct NodeTableReadState : public TableReadState {
NodeTableReadState(const common::ValueVector& nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors)
: TableReadState{nodeIDVector, columnIDs, outputVectors} {
explicit NodeTableReadState(std::vector<common::column_id_t> columnIDs)
: TableReadState{std::move(columnIDs)} {
dataReadState = std::make_unique<NodeDataReadState>();
}
NodeTableReadState(const common::ValueVector* nodeIDVector,
std::vector<common::column_id_t> columnIDs, std::vector<common::ValueVector*> outputVectors)
: TableReadState{nodeIDVector, std::move(columnIDs), std::move(outputVectors)} {
dataReadState = std::make_unique<NodeDataReadState>();
}
};
Expand Down Expand Up @@ -79,10 +82,8 @@ class NodeTable final : public Table {
}

void initializeReadState(transaction::Transaction* transaction,
std::vector<common::column_id_t> columnIDs, const common::ValueVector& inNodeIDVector,
TableReadState& readState) {
readState.dataReadState = std::make_unique<NodeDataReadState>();
tableData->initializeReadState(transaction, std::move(columnIDs), inNodeIDVector,
std::vector<common::column_id_t> columnIDs, TableReadState& readState) {
tableData->initializeReadState(transaction, std::move(columnIDs), *readState.nodeIDVector,
*readState.dataReadState);
}
void read(transaction::Transaction* transaction, TableReadState& readState) override;
Expand Down
21 changes: 5 additions & 16 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ namespace storage {
struct RelTableReadState : public TableReadState {
common::RelDataDirection direction;

RelTableReadState(const common::ValueVector& nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors, common::RelDataDirection direction)
: TableReadState{nodeIDVector, columnIDs, outputVectors}, direction{direction} {
RelTableReadState(const std::vector<common::column_id_t>& columnIDs,
common::RelDataDirection direction)
: TableReadState{columnIDs}, direction{direction} {
dataReadState = std::make_unique<RelDataReadState>();
}

Expand Down Expand Up @@ -75,18 +74,8 @@ class RelTable final : public Table {

void initializeReadState(transaction::Transaction* transaction,
common::RelDataDirection direction, const std::vector<common::column_id_t>& columnIDs,
const common::ValueVector& inNodeIDVector, RelTableReadState& readState) {
if (!readState.dataReadState) {
readState.dataReadState = std::make_unique<RelDataReadState>();
}
return direction == common::RelDataDirection::FWD ?
fwdRelTableData->initializeReadState(transaction, columnIDs, inNodeIDVector,
common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(
*readState.dataReadState)) :
bwdRelTableData->initializeReadState(transaction, columnIDs, inNodeIDVector,
common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(
*readState.dataReadState));
}
RelTableReadState& readState);

void read(transaction::Transaction* transaction, TableReadState& readState) override;

void insert(transaction::Transaction* transaction, TableInsertState& insertState) override;
Expand Down
15 changes: 9 additions & 6 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ namespace kuzu {
namespace storage {

struct TableReadState {
const common::ValueVector& nodeIDVector;
// Read only input node id vector.
const common::ValueVector* nodeIDVector;
std::vector<common::column_id_t> columnIDs;
const std::vector<common::ValueVector*>& outputVectors;
std::vector<common::ValueVector*> outputVectors;
std::unique_ptr<TableDataReadState> dataReadState;

TableReadState(const common::ValueVector& nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors)
explicit TableReadState(std::vector<common::column_id_t> columnIDs)
: columnIDs{std::move(columnIDs)} {}
TableReadState(const common::ValueVector* nodeIDVector,
std::vector<common::column_id_t> columnIDs, std::vector<common::ValueVector*> outputVectors)
: nodeIDVector{nodeIDVector}, columnIDs{std::move(columnIDs)},
outputVectors{outputVectors} {}
outputVectors{std::move(outputVectors)} {}
virtual ~TableReadState() = default;
DELETE_COPY_AND_MOVE(TableReadState);
};

struct TableInsertState {
Expand Down
2 changes: 1 addition & 1 deletion src/processor/map/map_scan_node_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapScanNodeProperty(
ku_dynamic_cast<storage::Table*, storage::NodeTable*>(
clientContext->getStorageManager()->getTable(tableID)),
std::move(columnIDs));
return std::make_unique<ScanSingleNodeTable>(std::move(info), inputNodeIDVectorPos,
return std::make_unique<ScanNodeTable>(std::move(info), inputNodeIDVectorPos,
std::move(outVectorsPos), std::move(prevOperator), getOperatorID(),
scanProperty.getExpressionsForPrinting());
}
Expand Down
32 changes: 25 additions & 7 deletions src/processor/operator/scan/scan_multi_node_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,37 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

/// Builds the local read state for every node table this operator may scan.
///
/// Runs the base-class initialisation first, then creates one NodeTableReadState per entry of
/// `infos` (constructed from that entry's column IDs), hands it to ScanTable::initVectors
/// together with the result set, and stores it in `readStates` under the same table ID.
///
/// @param resultSet result set forwarded to the base class and to initVectors.
/// @param context   execution context; only forwarded to the base class.
void ScanMultiNodeTables::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
    ScanTable::initLocalStateInternal(resultSet, context);
    // Bound as `tableID`, not `id`, so the operator's own `id` member is not shadowed.
    for (const auto& [tableID, info] : infos) {
        auto readState = std::make_unique<storage::NodeTableReadState>(info->columnIDs);
        ScanTable::initVectors(*readState, *resultSet);
        // insert_or_assign: if this runs again, the fresh state replaces the earlier one
        // instead of being dropped while the stale entry stays in the map.
        readStates.insert_or_assign(tableID, std::move(readState));
    }
}

bool ScanMultiNodeTables::getNextTuplesInternal(ExecutionContext* context) {
if (!children[0]->getNextTuple(context)) {
return false;
}
auto tableID =
inVector->getValue<nodeID_t>(inVector->state->selVector->selectedPositions[0]).tableID;
KU_ASSERT(readStates.contains(tableID) && tables.contains(tableID));
auto scanTableInfo = tables.at(tableID).get();
scanTableInfo->table->initializeReadState(context->clientContext->getTx(),
scanTableInfo->columnIDs, *inVector, *readStates[tableID]);
scanTableInfo->table->read(context->clientContext->getTx(), *readStates.at(tableID));
auto pos = nodeIDVector->state->selVector->selectedPositions[0];
auto tableID = nodeIDVector->getValue<nodeID_t>(pos).tableID;
KU_ASSERT(readStates.contains(tableID) && infos.contains(tableID));
auto info = infos.at(tableID).get();
info->table->initializeReadState(context->clientContext->getTx(), info->columnIDs,
*readStates[tableID]);
info->table->read(context->clientContext->getTx(), *readStates.at(tableID));
return true;
}

std::unique_ptr<PhysicalOperator> ScanMultiNodeTables::clone() {
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> clonedInfos;
for (auto& [id, info] : infos) {
clonedInfos.insert({id, info->copy()});
}
return make_unique<ScanMultiNodeTables>(nodeIDPos, outVectorsPos, std::move(clonedInfos),
children[0]->clone(), id, paramsString);
}

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 06f5a91

Please sign in to comment.