Skip to content

Commit

Permalink
add isSource & isSink api to physical operator
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Dec 22, 2022
1 parent e8d0da8 commit 84b11e1
Show file tree
Hide file tree
Showing 17 changed files with 86 additions and 112 deletions.
1 change: 1 addition & 0 deletions src/include/main/plan_printer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "common/profiler.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/physical_plan.h"
#include <json.hpp>

using namespace kuzu::planner;
using namespace kuzu::processor;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/data_pos.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <utility>

namespace kuzu {
namespace processor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace kuzu {
namespace processor {

class BaseAggregateScan : public PhysicalOperator {

public:
BaseAggregateScan(vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
Expand All @@ -24,16 +23,14 @@ class BaseAggregateScan : public PhysicalOperator {
aggregatesPos{std::move(aggregatesPos)}, aggregateDataTypes{
std::move(aggregateDataTypes)} {}

// Overrides PhysicalOperator::isSource() (false by default) to report this operator as a source.
// PhysicalOperator::getExecutionTime() checks isSource() and does not subtract the first child's
// time metric when it returns true.
bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override = 0;

unique_ptr<PhysicalOperator> clone() override = 0;

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

protected:
void writeAggregateResultToVector(
ValueVector& vector, uint64_t pos, AggregateState* aggregateState);
Expand Down
20 changes: 10 additions & 10 deletions src/include/processor/operator/aggregate/hash_aggregate_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ namespace kuzu {
namespace processor {

class HashAggregateScan : public BaseAggregateScan {

public:
HashAggregateScan(shared_ptr<HashAggregateSharedState> sharedState,
vector<DataPos> groupByKeyVectorsPos, vector<DataType> groupByKeyVectorDataTypes,
vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: BaseAggregateScan{move(aggregatesPos), move(aggregateDataTypes), move(child), id,
paramsString},
groupByKeyVectorsPos{move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{move(groupByKeyVectorDataTypes)}, sharedState{
move(sharedState)} {}
: BaseAggregateScan{std::move(aggregatesPos), std::move(aggregateDataTypes),
std::move(child), id, paramsString},
groupByKeyVectorsPos{std::move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{std::move(groupByKeyVectorDataTypes)}, sharedState{std::move(
sharedState)} {}

HashAggregateScan(shared_ptr<HashAggregateSharedState> sharedState,
vector<DataPos> groupByKeyVectorsPos, vector<DataType> groupByKeyVectorDataTypes,
vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes, uint32_t id,
const string& paramsString)
: BaseAggregateScan{move(aggregatesPos), move(aggregateDataTypes), id, paramsString},
groupByKeyVectorsPos{move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{move(groupByKeyVectorDataTypes)}, sharedState{
move(sharedState)} {}
: BaseAggregateScan{std::move(aggregatesPos), std::move(aggregateDataTypes), id,
paramsString},
groupByKeyVectorsPos{std::move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{std::move(groupByKeyVectorDataTypes)}, sharedState{std::move(
sharedState)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class CopyCSV : public PhysicalOperator {
}
virtual ~CopyCSV() override = default;

// Overrides PhysicalOperator::isSource() (false by default): CopyCSV reports itself as a source.
inline bool isSource() const override { return true; }

virtual string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

bool getNextTuplesInternal() override {
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class DDL : public PhysicalOperator {
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog} {}
virtual ~DDL() override = default;

// Overrides PhysicalOperator::isSource() (false by default): DDL reports itself as a source.
inline bool isSource() const override { return true; }

virtual string execute() = 0;

bool getNextTuplesInternal() override {
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class IndexScan : public PhysicalOperator {
pkIndex{pkIndex}, indexKeyEvaluator{std::move(indexKeyEvaluator)}, outDataPos{
outDataPos} {}

// Overrides PhysicalOperator::isSource() (false by default): IndexScan reports itself as a source.
bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class OrderByMerge : public Sink {
paramsString},
sharedState{std::move(sharedState)}, sharedDispatcher{std::move(sharedDispatcher)} {}

// OrderByMerge derives from Sink (whose isSink() returns true) and additionally reports itself
// as a source, so PhysicalOperator::getExecutionTime() does not subtract a child's time metric.
inline bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

void executeInternal(ExecutionContext* context) override;
Expand All @@ -39,10 +41,6 @@ class OrderByMerge : public Sink {
return make_unique<OrderByMerge>(sharedState, sharedDispatcher, id, paramsString);
}

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

private:
void initGlobalStateInternal(ExecutionContext* context) override;

Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class OrderByScan : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::ORDER_BY_SCAN, id, paramsString},
outVectorPos{std::move(outVectorPos)}, sharedState{std::move(sharedState)} {}

// Overrides PhysicalOperator::isSource() (false by default): OrderByScan reports itself as a
// source.
inline bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand All @@ -41,10 +43,6 @@ class OrderByScan : public PhysicalOperator {
return make_unique<OrderByScan>(outVectorPos, sharedState, id, paramsString);
}

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

private:
void initMergedKeyBlockScanState();

Expand Down
26 changes: 11 additions & 15 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "json.hpp"
#include "processor/data_pos.h"
#include "processor/execution_context.h"
#include "processor/result/result_set.h"
Expand Down Expand Up @@ -88,12 +87,17 @@ class PhysicalOperator {

inline uint32_t getOperatorID() const { return id; }

inline PhysicalOperatorType getOperatorType() const { return operatorType; }

// Whether this operator is a source. The base implementation returns false; subclasses override
// it to return true. getExecutionTime() uses it to decide whether to subtract the first child's
// time metric from this operator's own.
inline virtual bool isSource() const { return false; }
// Whether this operator is a sink. The base implementation returns false; Sink overrides it to
// return true.
inline virtual bool isSink() const { return false; }

// Appends op to the children list; ownership of op is moved into this operator.
inline void addChild(unique_ptr<PhysicalOperator> op) { children.push_back(std::move(op)); }
// Returns a non-owning pointer to the child at position idx. idx is not validated here, so the
// caller must keep it below getNumChildren().
inline PhysicalOperator* getChild(uint64_t idx) const { return children[idx].get(); }
// Number of children currently attached to this operator.
inline uint64_t getNumChildren() const { return children.size(); }
unique_ptr<PhysicalOperator> moveUnaryChild();

inline PhysicalOperatorType getOperatorType() const { return operatorType; }
inline string getParamsString() const { return paramsString; }

// Global state is initialized once.
void initGlobalState(ExecutionContext* context);
Expand All @@ -107,19 +111,10 @@ class PhysicalOperator {
return result;
}

virtual unique_ptr<PhysicalOperator> clone() = 0;

virtual void printMetricsToJson(nlohmann::json& json, Profiler& profiler);
unordered_map<string, string> getProfilerKeyValAttributes(Profiler& profiler) const;
vector<string> getProfilerAttributes(Profiler& profiler) const;

virtual double getExecutionTime(Profiler& profiler) const;

inline uint64_t getNumOutputTuples(Profiler& profiler) const {
return profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
}

vector<string> getAttributes(Profiler& profiler) const;

inline string getParamsString() const { return paramsString; }
virtual unique_ptr<PhysicalOperator> clone() = 0;

protected:
virtual void initGlobalStateInternal(ExecutionContext* context) {}
Expand All @@ -132,7 +127,8 @@ class PhysicalOperator {

void registerProfilingMetrics(Profiler* profiler);

void printTimeAndNumOutputMetrics(nlohmann::json& json, Profiler& profiler);
double getExecutionTime(Profiler& profiler) const;
uint64_t getNumOutputTuples(Profiler& profiler) const;

protected:
uint32_t id;
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/scan_node_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class ScanNodeID : public PhysicalOperator {
nodeName{std::move(nodeName)}, outDataPos{outDataPos}, sharedState{
std::move(sharedState)} {}

// Overrides PhysicalOperator::isSource() (false by default): ScanNodeID reports itself as a
// source.
bool isSource() const override { return true; }

inline string getNodeName() const { return nodeName; }
inline ScanNodeIDSharedState* getSharedState() const { return sharedState.get(); }

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class Sink : public PhysicalOperator {
: PhysicalOperator{operatorType, std::move(child), id, paramsString},
resultSetDescriptor{std::move(resultSetDescriptor)} {}

// Overrides PhysicalOperator::isSink() (false by default): every Sink-derived operator reports
// itself as a sink unless it overrides this again.
inline bool isSink() const override { return true; }

ResultSetDescriptor* getResultSetDescriptor() { return resultSetDescriptor.get(); }

inline void execute(ResultSet* resultSet, ExecutionContext* context) {
Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/table_scan/base_table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ class BaseTableScan : public PhysicalOperator {
outVecPositions{std::move(outVecPositions)}, colIndicesToScan{
std::move(colIndicesToScan)} {}

// Overrides PhysicalOperator::isSource() (false by default): table scans derived from
// BaseTableScan report themselves as sources.
inline bool isSource() const override { return true; }

virtual void setMaxMorselSize() = 0;
virtual unique_ptr<FTableScanMorsel> getMorsel() = 0;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

protected:
uint64_t maxMorselSize;
vector<DataPos> outVecPositions;
Expand Down
1 change: 0 additions & 1 deletion src/include/processor/operator/table_scan/union_all_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class UnionAllScanSharedState {
};

class UnionAllScan : public BaseTableScan {

public:
UnionAllScan(vector<DataPos> outVecPositions, vector<uint32_t> colIndicesToScan,
shared_ptr<UnionAllScanSharedState> sharedState,
Expand Down
17 changes: 8 additions & 9 deletions src/main/plan_printer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void OpProfileTree::calculateNumRowsAndColsForOp(
uint32_t OpProfileTree::fillOpProfileBoxes(PhysicalOperator* op, uint32_t rowIdx, uint32_t colIdx,
uint32_t& maxFieldWidth, Profiler& profiler) {
auto opProfileBox = make_unique<OpProfileBox>(PlanPrinter::getOperatorName(op),
PlanPrinter::getOperatorParams(op), op->getAttributes(profiler));
PlanPrinter::getOperatorParams(op), op->getProfilerAttributes(profiler));
maxFieldWidth = max(opProfileBox->getAttributeMaxLen(), maxFieldWidth);
insertOpProfileBox(rowIdx, colIdx, move(opProfileBox));
if (!op->getNumChildren()) {
Expand Down Expand Up @@ -290,15 +290,14 @@ uint32_t OpProfileTree::calculateRowHeight(uint32_t rowIdx) const {

// Serializes the plan rooted at physicalOperator into a nested JSON object.
//
// Each node carries:
//   - "Name": the operator name from getOperatorName();
//   - when profiler.enabled is set, one entry per key/value pair returned by
//     PhysicalOperator::getProfilerKeyValAttributes();
//   - "Child<i>": the recursively serialized i-th child ("Child0", "Child1", ...).
//
// Children are stored under index-suffixed keys because assigning every child to one shared key
// would let each iteration overwrite the previous one, so only the last child of a multi-child
// operator would survive in the output.
nlohmann::json PlanPrinter::toJson(PhysicalOperator* physicalOperator, Profiler& profiler) {
    auto json = nlohmann::json();
    json["Name"] = getOperatorName(physicalOperator);
    if (profiler.enabled) {
        for (const auto& [key, val] : physicalOperator->getProfilerKeyValAttributes(profiler)) {
            json[key] = val;
        }
    }
    for (auto i = 0u; i < physicalOperator->getNumChildren(); ++i) {
        json["Child" + std::to_string(i)] = toJson(physicalOperator->getChild(i), profiler);
    }
    return json;
}
Expand Down
41 changes: 18 additions & 23 deletions src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,39 +184,34 @@ void PhysicalOperator::initLocalState(ResultSet* _resultSet, ExecutionContext* c
// Registers this operator's time metric and num-tuple metric with the profiler (under the keys
// from getTimeMetricKey() / getNumTupleMetricKey()) and stores both in `metrics`.
void PhysicalOperator::registerProfilingMetrics(Profiler* profiler) {
    auto timeMetric = profiler->registerTimeMetric(getTimeMetricKey());
    auto numTuplesMetric = profiler->registerNumericMetric(getNumTupleMetricKey());
    metrics = make_unique<OperatorMetrics>(*timeMetric, *numTuplesMetric);
}

void PhysicalOperator::printMetricsToJson(nlohmann::json& json, Profiler& profiler) {
printTimeAndNumOutputMetrics(json, profiler);
// Returns the execution time attributed to this operator.
//
// Starts from the sum of all time metrics recorded under this operator's time-metric key. For a
// non-source operator, the sum recorded under the first child's key is subtracted, so the result
// excludes the time spent in that child.
//
// The subtraction is skipped when the operator has no children, so a childless operator that
// does not override isSource() cannot index past the end of `children`.
double PhysicalOperator::getExecutionTime(Profiler& profiler) const {
    auto executionTime = profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
    if (!isSource() && getNumChildren() > 0) {
        executionTime -= profiler.sumAllTimeMetricsWithKey(children[0]->getTimeMetricKey());
    }
    return executionTime;
}

void PhysicalOperator::printTimeAndNumOutputMetrics(nlohmann::json& json, Profiler& profiler) {
double prevExecutionTime = 0.0;
if (getNumChildren()) {
prevExecutionTime = profiler.sumAllTimeMetricsWithKey(children[0]->getTimeMetricKey());
}
// Time metric measures execution time of the subplan under current operator (like a CDF).
// By subtracting prevOperator runtime, we get the runtime of current operator
auto executionTime = profiler.sumAllTimeMetricsWithKey(getTimeMetricKey()) - prevExecutionTime;
auto numOutputTuples = profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
json["executionTime"] = to_string(executionTime);
json["numOutputTuples"] = numOutputTuples;
// Returns the sum of all numeric metrics recorded under this operator's num-tuple metric key.
uint64_t PhysicalOperator::getNumOutputTuples(Profiler& profiler) const {
    const uint64_t totalTuples = profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
    return totalTuples;
}

double PhysicalOperator::getExecutionTime(Profiler& profiler) const {
double prevExecutionTime = 0.0;
for (auto i = 0u; i < getNumChildren(); i++) {
prevExecutionTime += profiler.sumAllTimeMetricsWithKey(children[i]->getTimeMetricKey());
}
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey()) - prevExecutionTime;
// Collects this operator's profiling attributes as string key/value pairs:
//   "ExecutionTime"   -> to_string(getExecutionTime(profiler))
//   "NumOutputTuples" -> to_string(getNumOutputTuples(profiler))
unordered_map<string, string> PhysicalOperator::getProfilerKeyValAttributes(
    Profiler& profiler) const {
    unordered_map<string, string> attributes;
    attributes.emplace("ExecutionTime", to_string(getExecutionTime(profiler)));
    attributes.emplace("NumOutputTuples", to_string(getNumOutputTuples(profiler)));
    return attributes;
}

vector<string> PhysicalOperator::getAttributes(Profiler& profiler) const {
// Builds the list of display strings describing this operator's profiling attributes for the
// given profiler.
vector<string> PhysicalOperator::getProfilerAttributes(Profiler& profiler) const {
vector<string> result;
result.emplace_back("ExecutionTime: " + to_string(getExecutionTime(profiler)));
result.emplace_back("NumOutputTuples: " + to_string(getNumOutputTuples(profiler)));
// Appends one "key: val" string per entry of getProfilerKeyValAttributes(). That function
// returns an unordered_map, so the order of these entries is unspecified.
for (auto& [key, val] : getProfilerKeyValAttributes(profiler)) {
result.emplace_back(key + ": " + val);
}
return result;
}

Expand Down
Loading

0 comments on commit 84b11e1

Please sign in to comment.