Skip to content

Commit

Permalink
Refactor table read state
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Apr 28, 2024
1 parent 12b8495 commit f7d3ca1
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 133 deletions.
3 changes: 3 additions & 0 deletions src/include/common/types/internal_id_t.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <vector>

Expand All @@ -16,6 +17,8 @@ using relID_t = internalID_t;
using table_id_t = uint64_t;
using table_id_vector_t = std::vector<table_id_t>;
using table_id_set_t = std::unordered_set<table_id_t>;
template<typename T>
using table_id_map_t = std::unordered_map<table_id_t, T>;
using offset_t = uint64_t;
constexpr table_id_t INVALID_TABLE_ID = UINT64_MAX;
constexpr offset_t INVALID_OFFSET = UINT64_MAX;
Expand Down
28 changes: 8 additions & 20 deletions src/include/processor/operator/scan/scan_multi_node_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,22 @@ namespace processor {
class ScanMultiNodeTables : public ScanTable {
public:
ScanMultiNodeTables(const DataPos& inVectorPos, std::vector<DataPos> outVectorsPos,
std::unordered_map<common::table_id_t, std::unique_ptr<ScanNodeTableInfo>> tables,
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> infos,
std::unique_ptr<PhysicalOperator> prevOperator, uint32_t id,
const std::string& paramsString)
: ScanTable{PhysicalOperatorType::SCAN_MULTI_NODE_TABLES, inVectorPos,
std::move(outVectorsPos), std::move(prevOperator), id, paramsString},
tables{std::move(tables)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
ScanTable::initLocalStateInternal(resultSet, context);
for (auto& [tableID, scanInfo] : tables) {
readStates[tableID] = std::make_unique<storage::TableReadState>(*inVector,
scanInfo->columnIDs, outVectors);
}
}
infos{std::move(infos)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::unordered_map<common::table_id_t, std::unique_ptr<ScanNodeTableInfo>> clonedTables;
for (const auto& table : tables) {
clonedTables[table.first] = table.second->copy();
}
return make_unique<ScanMultiNodeTables>(inVectorPos, outVectorsPos, std::move(clonedTables),
children[0]->clone(), id, paramsString);
}
std::unique_ptr<PhysicalOperator> clone() override;

private:
std::unordered_map<common::table_id_t, std::unique_ptr<ScanNodeTableInfo>> tables;
std::unordered_map<common::table_id_t, std::unique_ptr<storage::TableReadState>> readStates;
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> infos;
common::table_id_map_t<std::unique_ptr<storage::TableReadState>> readStates;
};

} // namespace processor
Expand Down
12 changes: 5 additions & 7 deletions src/include/processor/operator/scan/scan_multi_rel_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ namespace kuzu {
namespace processor {

class RelTableCollectionScanner {

public:
explicit RelTableCollectionScanner(std::vector<std::unique_ptr<ScanRelTableInfo>> scanInfos)
: scanInfos{std::move(scanInfos)} {}

inline void resetState() {
void resetState() {
currentTableIdx = 0;
nextTableIdx = 0;
}

void init(common::ValueVector* inVector,
const std::vector<common::ValueVector*>& outputVectors);
bool scan(common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors,
transaction::Transaction* transaction);
bool scan(common::SelectionVector* selVector, transaction::Transaction* transaction);

std::unique_ptr<RelTableCollectionScanner> clone() const;

Expand All @@ -29,15 +27,15 @@ class RelTableCollectionScanner {
uint32_t nextTableIdx = 0;
};

class ScanMultiRelTable : public ScanRelTable {
class ScanMultiRelTable : public ScanTable {
using node_table_id_scanner_map_t =
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableCollectionScanner>>;

public:
ScanMultiRelTable(node_table_id_scanner_map_t scannerPerNodeTable, const DataPos& inVectorPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{PhysicalOperatorType::SCAN_MULTI_REL_TABLES, nullptr /* info */, inVectorPos,
: ScanTable{PhysicalOperatorType::SCAN_MULTI_REL_TABLES, inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString},
scannerPerNodeTable{std::move(scannerPerNodeTable)} {}

Expand Down
13 changes: 4 additions & 9 deletions src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,20 @@ struct ScanNodeTableInfo {
}
};

class ScanSingleNodeTable : public ScanTable {
class ScanSingleNodeTable final : public ScanTable {
public:
ScanSingleNodeTable(std::unique_ptr<ScanNodeTableInfo> info, const DataPos& inVectorPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanSingleNodeTable{PhysicalOperatorType::SCAN_NODE_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {}

inline void initLocalStateInternal(ResultSet* resultSet,
ExecutionContext* executionContext) final {
ScanTable::initLocalStateInternal(resultSet, executionContext);
readState =
std::make_unique<storage::NodeTableReadState>(*inVector, info->columnIDs, outVectors);
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanSingleNodeTable>(info->copy(), inVectorPos, outVectorsPos,
std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanSingleNodeTable>(info->copy(), nodeIDPos, outVectorsPos,
children[0]->clone(), id, paramsString);
}

Expand Down
14 changes: 3 additions & 11 deletions src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,13 @@ class ScanRelTable : public ScanTable {
const std::string& paramsString)
: ScanRelTable{PhysicalOperatorType::SCAN_REL_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {}
~ScanRelTable() override = default;

inline void initLocalStateInternal(ResultSet* resultSet,
ExecutionContext* executionContext) override {
ScanTable::initLocalStateInternal(resultSet, executionContext);
if (info) {
scanState = std::make_unique<storage::RelTableReadState>(*inVector, info->columnIDs,
outVectors, info->direction);
}
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTable>(info->copy(), inVectorPos, outVectorsPos,
std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTable>(info->copy(), nodeIDPos, outVectorsPos,
children[0]->clone(), id, paramsString);
}

Expand Down
22 changes: 15 additions & 7 deletions src/include/processor/operator/scan/scan_table.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/store/table.h"

namespace kuzu {
namespace processor {

class ScanTable : public PhysicalOperator {
public:
ScanTable(PhysicalOperatorType operatorType, const DataPos& inVectorPos,
ScanTable(PhysicalOperatorType operatorType, const DataPos& nodeIDPos,
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramString)
: PhysicalOperator{operatorType, std::move(child), id, paramString},
inVectorPos{inVectorPos}, outVectorsPos{std::move(outVectorsPos)} {}
: PhysicalOperator{operatorType, std::move(child), id, paramString}, nodeIDPos{nodeIDPos},
outVectorsPos{std::move(outVectorsPos)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* executionContext) override;
protected:
void initLocalStateInternal(ResultSet*, ExecutionContext*) override;

void initVectors(storage::TableReadState& state, const ResultSet& resultSet);

protected:
DataPos inVectorPos;
// Input node id vector position.
DataPos nodeIDPos;
// Output vector (properties or CSRs) positions
std::vector<DataPos> outVectorsPos;
common::ValueVector* inVector;
std::vector<common::ValueVector*> outVectors;
// Node id vector.
common::ValueVector* nodeIDVector;
// All output vectors share the same state. Keep one of them to retrieve state.
common::ValueVector* anchorOutVector;
};

} // namespace processor
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/result/result_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ class ResultSet {
explicit ResultSet(uint32_t numDataChunks) : multiplicity{1}, dataChunks(numDataChunks) {}
ResultSet(ResultSetDescriptor* resultSetDescriptor, storage::MemoryManager* memoryManager);

inline void insert(uint32_t pos, std::shared_ptr<common::DataChunk> dataChunk) {
void insert(uint32_t pos, std::shared_ptr<common::DataChunk> dataChunk) {
KU_ASSERT(dataChunks.size() > pos);
dataChunks[pos] = std::move(dataChunk);
}

inline std::shared_ptr<common::DataChunk> getDataChunk(data_chunk_pos_t dataChunkPos) {
std::shared_ptr<common::DataChunk> getDataChunk(data_chunk_pos_t dataChunkPos) {
return dataChunks[dataChunkPos];
}
inline std::shared_ptr<common::ValueVector> getValueVector(const DataPos& dataPos) const {
std::shared_ptr<common::ValueVector> getValueVector(const DataPos& dataPos) const {
return dataChunks[dataPos.dataChunkPos]->valueVectors[dataPos.valueVectorPos];
}

// Our projection does NOT explicitly remove dataChunk from resultSet. Therefore, caller should
// always provide a set of positions when reading from multiple dataChunks.
inline uint64_t getNumTuples(const std::unordered_set<uint32_t>& dataChunksPosInScope) {
uint64_t getNumTuples(const std::unordered_set<uint32_t>& dataChunksPosInScope) {
return getNumTuplesWithoutMultiplicity(dataChunksPosInScope) * multiplicity;
}

Expand Down
12 changes: 5 additions & 7 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ namespace storage {
class LocalNodeTable;

struct NodeTableReadState : public TableReadState {
NodeTableReadState(const common::ValueVector& nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors)
: TableReadState{nodeIDVector, columnIDs, outputVectors} {
NodeTableReadState(std::vector<common::column_id_t> columnIDs)
: TableReadState{std::move(columnIDs)} {
dataReadState = std::make_unique<NodeDataReadState>();
}
};
Expand Down Expand Up @@ -79,10 +77,10 @@ class NodeTable final : public Table {
}

void initializeReadState(transaction::Transaction* transaction,
std::vector<common::column_id_t> columnIDs, const common::ValueVector& inNodeIDVector,
TableReadState& readState) {
std::vector<common::column_id_t> columnIDs, TableReadState& readState) {
// TODO(Guodong): we shouldn't create new read state.
readState.dataReadState = std::make_unique<NodeDataReadState>();
tableData->initializeReadState(transaction, std::move(columnIDs), inNodeIDVector,
tableData->initializeReadState(transaction, std::move(columnIDs), *readState.nodeIDVector,
*readState.dataReadState);
}
void read(transaction::Transaction* transaction, TableReadState& readState) override;
Expand Down
24 changes: 12 additions & 12 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ namespace storage {
struct RelTableReadState : public TableReadState {
common::RelDataDirection direction;

RelTableReadState(const common::ValueVector& nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors, common::RelDataDirection direction)
: TableReadState{nodeIDVector, columnIDs, outputVectors}, direction{direction} {
RelTableReadState(const std::vector<common::column_id_t>& columnIDs,
common::RelDataDirection direction)
: TableReadState{columnIDs}, direction{direction} {
dataReadState = std::make_unique<RelDataReadState>();
}

Expand Down Expand Up @@ -73,19 +72,20 @@ class RelTable final : public Table {
MemoryManager* memoryManager, catalog::RelTableCatalogEntry* relTableEntry, WAL* wal,
bool enableCompression);

inline void initializeReadState(transaction::Transaction* transaction,
// TODO: MOVE TO CPP
void initializeReadState(transaction::Transaction* transaction,
common::RelDataDirection direction, const std::vector<common::column_id_t>& columnIDs,
const common::ValueVector& inNodeIDVector, RelTableReadState& readState) {
RelTableReadState& readState) {
if (!readState.dataReadState) {
readState.dataReadState = std::make_unique<RelDataReadState>();
}
auto& dataState = common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(
*readState.dataReadState);
return direction == common::RelDataDirection::FWD ?
fwdRelTableData->initializeReadState(transaction, columnIDs, inNodeIDVector,
common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(
*readState.dataReadState)) :
bwdRelTableData->initializeReadState(transaction, columnIDs, inNodeIDVector,
common::ku_dynamic_cast<TableDataReadState&, RelDataReadState&>(
*readState.dataReadState));
fwdRelTableData->initializeReadState(transaction, columnIDs,
*readState.nodeIDVector, dataState) :
bwdRelTableData->initializeReadState(transaction, columnIDs,
*readState.nodeIDVector, dataState);
}
void read(transaction::Transaction* transaction, TableReadState& readState) override;

Expand Down
15 changes: 9 additions & 6 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ namespace kuzu {
namespace storage {

struct TableReadState {
const common::ValueVector& nodeIDVector;
// Read only input node id vector.
const common::ValueVector* nodeIDVector;
std::vector<common::column_id_t> columnIDs;
const std::vector<common::ValueVector*>& outputVectors;
std::vector<common::ValueVector*> outputVectors;
std::unique_ptr<TableDataReadState> dataReadState;

TableReadState(const common::ValueVector& nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors)
explicit TableReadState(std::vector<common::column_id_t> columnIDs)
: columnIDs{std::move(columnIDs)} {}
TableReadState(const common::ValueVector* nodeIDVector,
std::vector<common::column_id_t> columnIDs, std::vector<common::ValueVector*> outputVectors)
: nodeIDVector{nodeIDVector}, columnIDs{std::move(columnIDs)},
outputVectors{outputVectors} {}
outputVectors{std::move(outputVectors)} {}
virtual ~TableReadState() = default;
DELETE_COPY_AND_MOVE(TableReadState);
};

struct TableInsertState {
Expand Down
32 changes: 25 additions & 7 deletions src/processor/operator/scan/scan_multi_node_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,37 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

void ScanMultiNodeTables::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
ScanTable::initLocalStateInternal(resultSet, context);
for (auto& [id, info] : infos) {
auto readState = std::make_unique<storage::NodeTableReadState>(info->columnIDs);
ScanTable::initVectors(*readState, *resultSet);
readStates.insert({id, std::move(readState)});
}
}

bool ScanMultiNodeTables::getNextTuplesInternal(ExecutionContext* context) {
if (!children[0]->getNextTuple(context)) {
return false;
}
auto tableID =
inVector->getValue<nodeID_t>(inVector->state->selVector->selectedPositions[0]).tableID;
KU_ASSERT(readStates.contains(tableID) && tables.contains(tableID));
auto scanTableInfo = tables.at(tableID).get();
scanTableInfo->table->initializeReadState(context->clientContext->getTx(),
scanTableInfo->columnIDs, *inVector, *readStates[tableID]);
scanTableInfo->table->read(context->clientContext->getTx(), *readStates.at(tableID));
auto pos = nodeIDVector->state->selVector->selectedPositions[0];
auto tableID = nodeIDVector->getValue<nodeID_t>(pos).tableID;
KU_ASSERT(readStates.contains(tableID) && infos.contains(tableID));
auto info = infos.at(tableID).get();
info->table->initializeReadState(context->clientContext->getTx(), info->columnIDs,
*readStates[tableID]);
info->table->read(context->clientContext->getTx(), *readStates.at(tableID));
return true;
}

std::unique_ptr<PhysicalOperator> ScanMultiNodeTables::clone() {
common::table_id_map_t<std::unique_ptr<ScanNodeTableInfo>> clonedInfos;
for (auto& [id, info] : infos) {
clonedInfos.insert({id, info->copy()});
}
return make_unique<ScanMultiNodeTables>(nodeIDPos, outVectorsPos, std::move(clonedInfos),
children[0]->clone(), id, paramsString);
}

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit f7d3ca1

Please sign in to comment.