Skip to content

Commit

Permalink
Merge pull request #1110 from kuzudb/physical-operator-type
Browse files Browse the repository at this point in the history
refactor physical operator type
  • Loading branch information
andyfengHKU committed Dec 12, 2022
2 parents 21e535c + 7033759 commit de3b589
Show file tree
Hide file tree
Showing 50 changed files with 343 additions and 281 deletions.
2 changes: 1 addition & 1 deletion src/include/main/plan_printer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class PlanPrinter {
}

static inline string getOperatorName(PhysicalOperator* physicalOperator) {
return PhysicalOperatorTypeNames[physicalOperator->getOperatorType()];
return PhysicalOperatorUtils::operatorTypeToString(physicalOperator->getOperatorType());
}

static inline string getOperatorParams(PhysicalOperator* physicalOperator) {
Expand Down
5 changes: 2 additions & 3 deletions src/include/processor/operator/aggregate/base_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ class BaseAggregate : public Sink {
vector<DataPos> aggregateVectorsPos,
vector<unique_ptr<AggregateFunction>> aggregateFunctions,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{std::move(resultSetDescriptor), std::move(child), id, paramsString},
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::AGGREGATE, std::move(child),
id, paramsString},
aggregateVectorsPos{std::move(aggregateVectorsPos)}, aggregateFunctions{
std::move(aggregateFunctions)} {}

PhysicalOperatorType getOperatorType() override { return AGGREGATE; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

void finalize(ExecutionContext* context) override = 0;
Expand Down
13 changes: 7 additions & 6 deletions src/include/processor/operator/aggregate/base_aggregate_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ class BaseAggregateScan : public PhysicalOperator {
public:
BaseAggregateScan(vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: PhysicalOperator{move(child), id, paramsString}, aggregatesPos{move(aggregatesPos)},
aggregateDataTypes{move(aggregateDataTypes)} {}
: PhysicalOperator{PhysicalOperatorType::AGGREGATE_SCAN, std::move(child), id,
paramsString},
aggregatesPos{std::move(aggregatesPos)}, aggregateDataTypes{
std::move(aggregateDataTypes)} {}

BaseAggregateScan(vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes,
uint32_t id, const string& paramsString)
: PhysicalOperator{id, paramsString}, aggregatesPos{move(aggregatesPos)},
aggregateDataTypes{move(aggregateDataTypes)} {}

PhysicalOperatorType getOperatorType() override { return AGGREGATE_SCAN; }
: PhysicalOperator{PhysicalOperatorType::AGGREGATE_SCAN, id, paramsString},
aggregatesPos{std::move(aggregatesPos)}, aggregateDataTypes{
std::move(aggregateDataTypes)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
9 changes: 5 additions & 4 deletions src/include/processor/operator/base_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ namespace processor {

class BaseExtendAndScanRelProperties : public PhysicalOperator {
protected:
BaseExtendAndScanRelProperties(const DataPos& inNodeIDVectorPos,
const DataPos& outNodeIDVectorPos, vector<DataPos> outPropertyVectorsPos,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString},
BaseExtendAndScanRelProperties(PhysicalOperatorType operatorType,
const DataPos& inNodeIDVectorPos, const DataPos& outNodeIDVectorPos,
vector<DataPos> outPropertyVectorsPos, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
: PhysicalOperator{operatorType, std::move(child), id, paramsString},
inNodeIDVectorPos{inNodeIDVectorPos}, outNodeIDVectorPos{outNodeIDVectorPos},
outPropertyVectorsPos{std::move(outPropertyVectorsPos)} {}
virtual ~BaseExtendAndScanRelProperties() override = default;
Expand Down
11 changes: 6 additions & 5 deletions src/include/processor/operator/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ namespace processor {

class CopyCSV : public PhysicalOperator {
public:
CopyCSV(Catalog* catalog, CSVDescription csvDescription, TableSchema tableSchema, WAL* wal,
uint32_t id, const string& paramsString)
: PhysicalOperator{id, paramsString}, catalog{catalog},
csvDescription{move(csvDescription)}, tableSchema{move(tableSchema)}, wal{wal} {}
virtual ~CopyCSV() = default;
CopyCSV(PhysicalOperatorType operatorType, Catalog* catalog, CSVDescription csvDescription,
TableSchema tableSchema, WAL* wal, uint32_t id, const string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog},
csvDescription{std::move(csvDescription)}, tableSchema{std::move(tableSchema)}, wal{wal} {
}
virtual ~CopyCSV() override = default;

virtual string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/copy_csv/copy_node_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ namespace kuzu {
namespace processor {

class CopyNodeCSV : public CopyCSV {

public:
CopyNodeCSV(Catalog* catalog, CSVDescription csvDescription, TableSchema tableSchema, WAL* wal,
uint32_t id, const string& paramsString, NodesStore* nodesStore)
: CopyCSV(catalog, move(csvDescription), move(tableSchema), wal, id, paramsString),
: CopyCSV{PhysicalOperatorType::COPY_NODE_CSV, catalog, std::move(csvDescription),
std::move(tableSchema), wal, id, paramsString},
nodesStore{nodesStore} {}

string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

PhysicalOperatorType getOperatorType() override { return COPY_NODE_CSV; }

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyNodeCSV>(
catalog, csvDescription, tableSchema, wal, id, paramsString, nodesStore);
Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/copy_csv/copy_rel_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ namespace kuzu {
namespace processor {

class CopyRelCSV : public CopyCSV {

public:
CopyRelCSV(Catalog* catalog, CSVDescription csvDescription, TableSchema tableSchema, WAL* wal,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs, uint32_t id,
const string& paramsString, RelsStatistics* relsStatistics)
: CopyCSV(catalog, move(csvDescription), move(tableSchema), wal, id, paramsString),
: CopyCSV{PhysicalOperatorType::COPY_REL_CSV, catalog, move(csvDescription),
move(tableSchema), wal, id, paramsString},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs}, relsStatistics{
relsStatistics} {}

string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

PhysicalOperatorType getOperatorType() override { return COPY_REL_CSV; }

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRelCSV>(catalog, csvDescription, tableSchema, wal,
nodesStatisticsAndDeletedIDs, id, paramsString, relsStatistics);
Expand Down
10 changes: 5 additions & 5 deletions src/include/processor/operator/cross_product.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ class CrossProduct : public PhysicalOperator {
CrossProduct(shared_ptr<FTableSharedState> sharedState, vector<DataPos> outVecPos,
vector<uint32_t> colIndicesToScan, unique_ptr<PhysicalOperator> probeChild,
unique_ptr<PhysicalOperator> buildChild, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(probeChild), std::move(buildChild), id, paramsString},
: PhysicalOperator{PhysicalOperatorType::CROSS_PRODUCT, std::move(probeChild),
std::move(buildChild), id, paramsString},
sharedState{std::move(sharedState)}, outVecPos{std::move(outVecPos)},
colIndicesToScan{std::move(colIndicesToScan)} {}

// Clone only.
CrossProduct(shared_ptr<FTableSharedState> sharedState, vector<DataPos> outVecPos,
vector<uint32_t> colIndicesToScan, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString}, sharedState{std::move(sharedState)},
outVecPos{std::move(outVecPos)}, colIndicesToScan{std::move(colIndicesToScan)} {}

PhysicalOperatorType getOperatorType() override { return PhysicalOperatorType::CROSS_PRODUCT; }
: PhysicalOperator{PhysicalOperatorType::CROSS_PRODUCT, std::move(child), id, paramsString},
sharedState{std::move(sharedState)}, outVecPos{std::move(outVecPos)},
colIndicesToScan{std::move(colIndicesToScan)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/ddl/create_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ namespace kuzu {
namespace processor {

class CreateNodeTable : public CreateTable {

public:
CreateNodeTable(Catalog* catalog, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes, uint32_t primaryKeyIdx, uint32_t id,
const string& paramsString, NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: CreateTable{catalog, move(tableName), move(propertyNameDataTypes), id, paramsString},
: CreateTable{PhysicalOperatorType::CREATE_NODE_TABLE, catalog, std::move(tableName),
std::move(propertyNameDataTypes), id, paramsString},
primaryKeyIdx{primaryKeyIdx}, nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {
}

PhysicalOperatorType getOperatorType() override { return CREATE_NODE_TABLE; }

string execute() override;

unique_ptr<PhysicalOperator> clone() override {
Expand Down
8 changes: 3 additions & 5 deletions src/include/processor/operator/ddl/create_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ namespace kuzu {
namespace processor {

class CreateRelTable : public CreateTable {

public:
CreateRelTable(Catalog* catalog, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes, RelMultiplicity relMultiplicity,
vector<pair<table_id_t, table_id_t>> srcDstTableIDs, uint32_t id,
const string& paramsString, RelsStatistics* relsStatistics)
: CreateTable{catalog, move(tableName), move(propertyNameDataTypes), id, paramsString},
relMultiplicity{relMultiplicity}, srcDstTableIDs{move(srcDstTableIDs)},
: CreateTable{PhysicalOperatorType::CREATE_REL_TABLE, catalog, std::move(tableName),
std::move(propertyNameDataTypes), id, paramsString},
relMultiplicity{relMultiplicity}, srcDstTableIDs{std::move(srcDstTableIDs)},
relsStatistics{relsStatistics} {}

PhysicalOperatorType getOperatorType() override { return CREATE_REL_TABLE; }

string execute() override;

unique_ptr<PhysicalOperator> clone() override {
Expand Down
9 changes: 4 additions & 5 deletions src/include/processor/operator/ddl/create_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ namespace kuzu {
namespace processor {

class CreateTable : public DDL {

public:
CreateTable(Catalog* catalog, string tableName,
CreateTable(PhysicalOperatorType operatorType, Catalog* catalog, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes, uint32_t id, const string& paramsString)
: DDL{catalog, id, paramsString}, tableName{move(tableName)}, propertyNameDataTypes{move(
propertyNameDataTypes)} {}
: DDL{operatorType, catalog, id, paramsString}, tableName{std::move(tableName)},
propertyNameDataTypes{move(propertyNameDataTypes)} {}

virtual ~CreateTable() = default;
virtual ~CreateTable() override = default;

protected:
string tableName;
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ namespace kuzu {
namespace processor {

class DDL : public PhysicalOperator {

public:
DDL(Catalog* catalog, uint32_t id, const string& paramsString)
: PhysicalOperator{id, paramsString}, catalog{catalog} {}
virtual ~DDL() = default;
DDL(PhysicalOperatorType operatorType, Catalog* catalog, uint32_t id,
const string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog} {}
virtual ~DDL() override = default;

virtual string execute() = 0;

Expand Down
7 changes: 2 additions & 5 deletions src/include/processor/operator/ddl/drop_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ namespace kuzu {
namespace processor {

class DropTable : public DDL {

public:
DropTable(Catalog* catalog, TableSchema* tableSchema, StorageManager& storageManager,
uint32_t id, const string& paramsString)
: DDL{catalog, id, paramsString}, tableSchema{tableSchema}, storageManager{storageManager} {
}

PhysicalOperatorType getOperatorType() override { return DROP_TABLE; }
: DDL{PhysicalOperatorType::DROP_TABLE, catalog, id, paramsString},
tableSchema{tableSchema}, storageManager{storageManager} {}

string execute() override {
catalog->removeTableSchema(tableSchema);
Expand Down
5 changes: 1 addition & 4 deletions src/include/processor/operator/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ namespace kuzu {
namespace processor {

class Filter : public PhysicalOperator, public FilteringOperator {

public:
Filter(unique_ptr<BaseExpressionEvaluator> expressionEvaluator, uint32_t dataChunkToSelectPos,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString},
: PhysicalOperator{PhysicalOperatorType::FILTER, std::move(child), id, paramsString},
FilteringOperator{1 /* numStatesToSave */}, expressionEvaluator{std::move(
expressionEvaluator)},
dataChunkToSelectPos(dataChunkToSelectPos) {}

PhysicalOperatorType getOperatorType() override { return FILTER; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand Down
7 changes: 2 additions & 5 deletions src/include/processor/operator/flatten.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ namespace kuzu {
namespace processor {

class Flatten : public PhysicalOperator {

public:
Flatten(uint32_t dataChunkToFlattenPos, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString}, dataChunkToFlattenPos{
dataChunkToFlattenPos} {}

PhysicalOperatorType getOperatorType() override { return FLATTEN; }
: PhysicalOperator{PhysicalOperatorType::FLATTEN, std::move(child), id, paramsString},
dataChunkToFlattenPos{dataChunkToFlattenPos} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
9 changes: 3 additions & 6 deletions src/include/processor/operator/generic_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,12 @@ class GenericExtendAndScanRelProperties : public BaseExtendAndScanRelProperties
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: BaseExtendAndScanRelProperties{inNodeIDVectorPos, outNodeIDVectorPos,
std::move(outPropertyVectorsPos), std::move(child), id, paramsString},
: BaseExtendAndScanRelProperties{PhysicalOperatorType::GENERIC_EXTEND, inNodeIDVectorPos,
outNodeIDVectorPos, std::move(outPropertyVectorsPos), std::move(child), id,
paramsString},
adjAndPropertyCollectionPerNodeTable{std::move(adjAndPropertyCollectionPerNodeTable)} {}
~GenericExtendAndScanRelProperties() override = default;

inline PhysicalOperatorType getOperatorType() override {
return PhysicalOperatorType::GENERIC_EXTEND;
}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand Down
10 changes: 7 additions & 3 deletions src/include/processor/operator/hash_join/hash_join_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ class HashJoinBuild : public Sink {
HashJoinBuild(unique_ptr<ResultSetDescriptor> resultSetDescriptor,
shared_ptr<HashJoinSharedState> sharedState, const BuildDataInfo& buildDataInfo,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{std::move(resultSetDescriptor), std::move(child), id, paramsString},
: HashJoinBuild{std::move(resultSetDescriptor), PhysicalOperatorType::HASH_JOIN_BUILD,
std::move(sharedState), buildDataInfo, std::move(child), id, paramsString} {}
HashJoinBuild(unique_ptr<ResultSetDescriptor> resultSetDescriptor,
PhysicalOperatorType operatorType, shared_ptr<HashJoinSharedState> sharedState,
const BuildDataInfo& buildDataInfo, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
: Sink{std::move(resultSetDescriptor), operatorType, std::move(child), id, paramsString},
sharedState{std::move(sharedState)}, buildDataInfo{buildDataInfo} {}
~HashJoinBuild() override = default;

inline PhysicalOperatorType getOperatorType() override { return HASH_JOIN_BUILD; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

void executeInternal(ExecutionContext* context) override;
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class HashJoinProbe : public PhysicalOperator, FilteringOperator {
HashJoinProbe(shared_ptr<HashJoinSharedState> sharedState, JoinType joinType,
const ProbeDataInfo& probeDataInfo, unique_ptr<PhysicalOperator> probeChild,
unique_ptr<PhysicalOperator> buildChild, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(probeChild), std::move(buildChild), id, paramsString},
: PhysicalOperator{PhysicalOperatorType::HASH_JOIN_PROBE, std::move(probeChild),
std::move(buildChild), id, paramsString},
FilteringOperator{probeDataInfo.keysDataPos.size()},
sharedState{std::move(sharedState)}, joinType{joinType}, probeDataInfo{probeDataInfo} {}

Expand All @@ -60,12 +61,11 @@ class HashJoinProbe : public PhysicalOperator, FilteringOperator {
HashJoinProbe(shared_ptr<HashJoinSharedState> sharedState, JoinType joinType,
const ProbeDataInfo& probeDataInfo, unique_ptr<PhysicalOperator> probeChild, uint32_t id,
const string& paramsString)
: PhysicalOperator{std::move(probeChild), id, paramsString},
: PhysicalOperator{PhysicalOperatorType::HASH_JOIN_PROBE, std::move(probeChild), id,
paramsString},
FilteringOperator{probeDataInfo.keysDataPos.size()},
sharedState{std::move(sharedState)}, joinType{joinType}, probeDataInfo{probeDataInfo} {}

inline PhysicalOperatorType getOperatorType() override { return HASH_JOIN_PROBE; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand Down
7 changes: 3 additions & 4 deletions src/include/processor/operator/index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ class IndexScan : public PhysicalOperator {
IndexScan(table_id_t tableID, PrimaryKeyIndex* pkIndex,
unique_ptr<BaseExpressionEvaluator> indexKeyEvaluator, const DataPos& outDataPos,
uint32_t id, const string& paramsString)
: PhysicalOperator{id, paramsString}, tableID{tableID}, pkIndex{pkIndex},
indexKeyEvaluator{std::move(indexKeyEvaluator)}, outDataPos{outDataPos} {}

PhysicalOperatorType getOperatorType() override { return PhysicalOperatorType::INDEX_SCAN; }
: PhysicalOperator{PhysicalOperatorType::INDEX_SCAN, id, paramsString}, tableID{tableID},
pkIndex{pkIndex}, indexKeyEvaluator{std::move(indexKeyEvaluator)}, outDataPos{
outDataPos} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
7 changes: 3 additions & 4 deletions src/include/processor/operator/intersect/intersect.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ class Intersect : public PhysicalOperator {
Intersect(const DataPos& outputDataPos, vector<IntersectDataInfo> intersectDataInfos,
vector<shared_ptr<IntersectSharedState>> sharedHTs,
vector<unique_ptr<PhysicalOperator>> children, uint32_t id, const string& paramsString)
: PhysicalOperator{move(children), id, paramsString}, outputDataPos{outputDataPos},
intersectDataInfos{move(intersectDataInfos)}, sharedHTs{move(sharedHTs)} {}

inline PhysicalOperatorType getOperatorType() override { return INTERSECT; }
: PhysicalOperator{PhysicalOperatorType::INTERSECT, std::move(children), id, paramsString},
outputDataPos{outputDataPos},
intersectDataInfos{std::move(intersectDataInfos)}, sharedHTs{std::move(sharedHTs)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
Loading

0 comments on commit de3b589

Please sign in to comment.