Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source sink op interface #1127

Merged
merged 1 commit into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/main/plan_printer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "common/profiler.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/physical_plan.h"
#include <json.hpp>

using namespace kuzu::planner;
using namespace kuzu::processor;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/data_pos.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <utility>
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved

namespace kuzu {
namespace processor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace kuzu {
namespace processor {

class BaseAggregateScan : public PhysicalOperator {

public:
BaseAggregateScan(vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
Expand All @@ -24,16 +23,14 @@ class BaseAggregateScan : public PhysicalOperator {
aggregatesPos{std::move(aggregatesPos)}, aggregateDataTypes{
std::move(aggregateDataTypes)} {}

// BaseAggregateScan reports itself as a source operator (overrides the PhysicalOperator default).
bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override = 0;

unique_ptr<PhysicalOperator> clone() override = 0;

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

protected:
void writeAggregateResultToVector(
ValueVector& vector, uint64_t pos, AggregateState* aggregateState);
Expand Down
20 changes: 10 additions & 10 deletions src/include/processor/operator/aggregate/hash_aggregate_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ namespace kuzu {
namespace processor {

class HashAggregateScan : public BaseAggregateScan {

public:
HashAggregateScan(shared_ptr<HashAggregateSharedState> sharedState,
vector<DataPos> groupByKeyVectorsPos, vector<DataType> groupByKeyVectorDataTypes,
vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: BaseAggregateScan{move(aggregatesPos), move(aggregateDataTypes), move(child), id,
paramsString},
groupByKeyVectorsPos{move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{move(groupByKeyVectorDataTypes)}, sharedState{
move(sharedState)} {}
: BaseAggregateScan{std::move(aggregatesPos), std::move(aggregateDataTypes),
std::move(child), id, paramsString},
groupByKeyVectorsPos{std::move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{std::move(groupByKeyVectorDataTypes)}, sharedState{std::move(
sharedState)} {}

HashAggregateScan(shared_ptr<HashAggregateSharedState> sharedState,
vector<DataPos> groupByKeyVectorsPos, vector<DataType> groupByKeyVectorDataTypes,
vector<DataPos> aggregatesPos, vector<DataType> aggregateDataTypes, uint32_t id,
const string& paramsString)
: BaseAggregateScan{move(aggregatesPos), move(aggregateDataTypes), id, paramsString},
groupByKeyVectorsPos{move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{move(groupByKeyVectorDataTypes)}, sharedState{
move(sharedState)} {}
: BaseAggregateScan{std::move(aggregatesPos), std::move(aggregateDataTypes), id,
paramsString},
groupByKeyVectorsPos{std::move(groupByKeyVectorsPos)},
groupByKeyVectorDataTypes{std::move(groupByKeyVectorDataTypes)}, sharedState{std::move(
sharedState)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class CopyCSV : public PhysicalOperator {
}
virtual ~CopyCSV() override = default;

// CopyCSV reports itself as a source operator (overrides the PhysicalOperator default).
inline bool isSource() const override { return true; }

virtual string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

bool getNextTuplesInternal() override {
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class DDL : public PhysicalOperator {
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog} {}
virtual ~DDL() override = default;

// DDL reports itself as a source operator (overrides the PhysicalOperator default).
inline bool isSource() const override { return true; }

virtual string execute() = 0;

bool getNextTuplesInternal() override {
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class IndexScan : public PhysicalOperator {
pkIndex{pkIndex}, indexKeyEvaluator{std::move(indexKeyEvaluator)}, outDataPos{
outDataPos} {}

// IndexScan reports itself as a source operator (overrides the PhysicalOperator default).
bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class OrderByMerge : public Sink {
paramsString},
sharedState{std::move(sharedState)}, sharedDispatcher{std::move(sharedDispatcher)} {}

// OrderByMerge derives from Sink, yet it also reports itself as a source operator.
inline bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

void executeInternal(ExecutionContext* context) override;
Expand All @@ -39,10 +41,6 @@ class OrderByMerge : public Sink {
return make_unique<OrderByMerge>(sharedState, sharedDispatcher, id, paramsString);
}

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

private:
void initGlobalStateInternal(ExecutionContext* context) override;

Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class OrderByScan : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::ORDER_BY_SCAN, id, paramsString},
outVectorPos{std::move(outVectorPos)}, sharedState{std::move(sharedState)} {}

// OrderByScan reports itself as a source operator (overrides the PhysicalOperator default).
inline bool isSource() const override { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
Expand All @@ -41,10 +43,6 @@ class OrderByScan : public PhysicalOperator {
return make_unique<OrderByScan>(outVectorPos, sharedState, id, paramsString);
}

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

private:
void initMergedKeyBlockScanState();

Expand Down
26 changes: 11 additions & 15 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "json.hpp"
#include "processor/data_pos.h"
#include "processor/execution_context.h"
#include "processor/result/result_set.h"
Expand Down Expand Up @@ -88,12 +87,17 @@ class PhysicalOperator {

inline uint32_t getOperatorID() const { return id; }

inline PhysicalOperatorType getOperatorType() const { return operatorType; }

// Whether this operator is a source. The base implementation answers false; subclasses that
// act as sources override it to return true. The getExecutionTime() definition that calls
// isSource() subtracts the first child's time metric only when this returns false.
inline virtual bool isSource() const { return false; }
// Whether this operator is a sink. The base implementation answers false; Sink overrides it
// to return true.
inline virtual bool isSink() const { return false; }

inline void addChild(unique_ptr<PhysicalOperator> op) { children.push_back(std::move(op)); }
inline PhysicalOperator* getChild(uint64_t idx) const { return children[idx].get(); }
inline uint64_t getNumChildren() const { return children.size(); }
unique_ptr<PhysicalOperator> moveUnaryChild();

inline PhysicalOperatorType getOperatorType() const { return operatorType; }
inline string getParamsString() const { return paramsString; }

// Global state is initialized once.
void initGlobalState(ExecutionContext* context);
Expand All @@ -107,19 +111,10 @@ class PhysicalOperator {
return result;
}

virtual unique_ptr<PhysicalOperator> clone() = 0;

virtual void printMetricsToJson(nlohmann::json& json, Profiler& profiler);
unordered_map<string, string> getProfilerKeyValAttributes(Profiler& profiler) const;
vector<string> getProfilerAttributes(Profiler& profiler) const;

virtual double getExecutionTime(Profiler& profiler) const;

inline uint64_t getNumOutputTuples(Profiler& profiler) const {
return profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
}

vector<string> getAttributes(Profiler& profiler) const;

inline string getParamsString() const { return paramsString; }
virtual unique_ptr<PhysicalOperator> clone() = 0;

protected:
virtual void initGlobalStateInternal(ExecutionContext* context) {}
Expand All @@ -132,7 +127,8 @@ class PhysicalOperator {

void registerProfilingMetrics(Profiler* profiler);

void printTimeAndNumOutputMetrics(nlohmann::json& json, Profiler& profiler);
double getExecutionTime(Profiler& profiler) const;
uint64_t getNumOutputTuples(Profiler& profiler) const;

protected:
uint32_t id;
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/scan_node_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class ScanNodeID : public PhysicalOperator {
nodeName{std::move(nodeName)}, outDataPos{outDataPos}, sharedState{
std::move(sharedState)} {}

// ScanNodeID reports itself as a source operator (overrides the PhysicalOperator default).
bool isSource() const override { return true; }

inline string getNodeName() const { return nodeName; }
inline ScanNodeIDSharedState* getSharedState() const { return sharedState.get(); }

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class Sink : public PhysicalOperator {
: PhysicalOperator{operatorType, std::move(child), id, paramsString},
resultSetDescriptor{std::move(resultSetDescriptor)} {}

// Every Sink reports itself as a sink, overriding the PhysicalOperator default of false.
inline bool isSink() const override { return true; }

ResultSetDescriptor* getResultSetDescriptor() { return resultSetDescriptor.get(); }

inline void execute(ResultSet* resultSet, ExecutionContext* context) {
Expand Down
6 changes: 2 additions & 4 deletions src/include/processor/operator/table_scan/base_table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ class BaseTableScan : public PhysicalOperator {
outVecPositions{std::move(outVecPositions)}, colIndicesToScan{
std::move(colIndicesToScan)} {}

// BaseTableScan reports itself as a source operator (overrides the PhysicalOperator default).
inline bool isSource() const override { return true; }

virtual void setMaxMorselSize() = 0;
virtual unique_ptr<FTableScanMorsel> getMorsel() = 0;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

protected:
uint64_t maxMorselSize;
vector<DataPos> outVecPositions;
Expand Down
1 change: 0 additions & 1 deletion src/include/processor/operator/table_scan/union_all_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class UnionAllScanSharedState {
};

class UnionAllScan : public BaseTableScan {

public:
UnionAllScan(vector<DataPos> outVecPositions, vector<uint32_t> colIndicesToScan,
shared_ptr<UnionAllScanSharedState> sharedState,
Expand Down
17 changes: 8 additions & 9 deletions src/main/plan_printer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void OpProfileTree::calculateNumRowsAndColsForOp(
uint32_t OpProfileTree::fillOpProfileBoxes(PhysicalOperator* op, uint32_t rowIdx, uint32_t colIdx,
uint32_t& maxFieldWidth, Profiler& profiler) {
auto opProfileBox = make_unique<OpProfileBox>(PlanPrinter::getOperatorName(op),
PlanPrinter::getOperatorParams(op), op->getAttributes(profiler));
PlanPrinter::getOperatorParams(op), op->getProfilerAttributes(profiler));
maxFieldWidth = max(opProfileBox->getAttributeMaxLen(), maxFieldWidth);
insertOpProfileBox(rowIdx, colIdx, move(opProfileBox));
if (!op->getNumChildren()) {
Expand Down Expand Up @@ -290,15 +290,14 @@ uint32_t OpProfileTree::calculateRowHeight(uint32_t rowIdx) const {

// Recursively serialises a physical operator and its children into a JSON object: the
// operator name, its profiling attributes when profiler.enabled is true, and the JSON of
// each child obtained by calling toJson on it.
// NOTE(review): this listing appears to come from a diff rendering without +/- markers, so
// statements from the previous implementation ("name"/"prev"/"right", printMetricsToJson)
// and from the new one ("Name", getProfilerKeyValAttributes, "Child") appear together —
// confirm which lines are live before relying on the exact key set.
nlohmann::json PlanPrinter::toJson(PhysicalOperator* physicalOperator, Profiler& profiler) {
auto json = nlohmann::json();
json["name"] = getOperatorName(physicalOperator);
if (physicalOperator->getNumChildren()) {
json["prev"] = toJson(physicalOperator->getChild(0), profiler);
}
if (physicalOperator->getNumChildren() > 1) {
json["right"] = toJson(physicalOperator->getChild(1), profiler);
}
json["Name"] = getOperatorName(physicalOperator);
if (profiler.enabled) {
physicalOperator->printMetricsToJson(json, profiler);
// Copy every profiler key/value attribute of the operator into the JSON object.
for (auto& [key, val] : physicalOperator->getProfilerKeyValAttributes(profiler)) {
json[key] = val;
}
}
// NOTE(review): every iteration assigns the same "Child" key, so for an operator with more
// than one child only the last child's JSON is kept — confirm whether distinct keys per
// child (or an array) were intended.
for (auto i = 0u; i < physicalOperator->getNumChildren(); ++i) {
json["Child"] = toJson(physicalOperator->getChild(i), profiler);
}
return json;
}
Expand Down
41 changes: 18 additions & 23 deletions src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,39 +184,34 @@ void PhysicalOperator::initLocalState(ResultSet* _resultSet, ExecutionContext* c
// Registers this operator's two metrics with the profiler — a time metric under
// getTimeMetricKey() and a numeric metric under getNumTupleMetricKey() — and stores them,
// wrapped in an OperatorMetrics, in the `metrics` member.
void PhysicalOperator::registerProfilingMetrics(Profiler* profiler) {
auto executionTime = profiler->registerTimeMetric(getTimeMetricKey());
auto numOutputTuple = profiler->registerNumericMetric(getNumTupleMetricKey());

// Both results are dereferenced unconditionally; presumably the register calls never
// return null — TODO confirm against Profiler.
metrics = make_unique<OperatorMetrics>(*executionTime, *numOutputTuple);
}

void PhysicalOperator::printMetricsToJson(nlohmann::json& json, Profiler& profiler) {
printTimeAndNumOutputMetrics(json, profiler);
// Returns the time attributed to this operator: the profiler's summed time metric under this
// operator's key, minus — for operators whose isSource() is false — the summed time metric of
// the first child.
// NOTE(review): only children[0] is subtracted, and it is indexed without checking that a
// child exists. This relies on every childless operator overriding isSource() to return
// true, and on additional children not needing to be subtracted — confirm.
double PhysicalOperator::getExecutionTime(Profiler& profiler) const {
auto executionTime = profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
if (!isSource()) {
executionTime -= profiler.sumAllTimeMetricsWithKey(children[0]->getTimeMetricKey());
}
return executionTime;
}

void PhysicalOperator::printTimeAndNumOutputMetrics(nlohmann::json& json, Profiler& profiler) {
double prevExecutionTime = 0.0;
if (getNumChildren()) {
prevExecutionTime = profiler.sumAllTimeMetricsWithKey(children[0]->getTimeMetricKey());
}
// Time metric measures execution time of the subplan under current operator (like a CDF).
// By subtracting prevOperator runtime, we get the runtime of current operator
auto executionTime = profiler.sumAllTimeMetricsWithKey(getTimeMetricKey()) - prevExecutionTime;
auto numOutputTuples = profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
json["executionTime"] = to_string(executionTime);
json["numOutputTuples"] = numOutputTuples;
// Returns the profiler's sum of all numeric metrics registered under this operator's
// num-tuple metric key.
uint64_t PhysicalOperator::getNumOutputTuples(Profiler& profiler) const {
return profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
}

double PhysicalOperator::getExecutionTime(Profiler& profiler) const {
double prevExecutionTime = 0.0;
for (auto i = 0u; i < getNumChildren(); i++) {
prevExecutionTime += profiler.sumAllTimeMetricsWithKey(children[i]->getTimeMetricKey());
}
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey()) - prevExecutionTime;
// Collects this operator's profiling figures as string key/value pairs: "ExecutionTime" and
// "NumOutputTuples", each formatted with to_string. The result is an unordered_map, so
// callers iterating over it get the pairs in an unspecified order.
unordered_map<string, string> PhysicalOperator::getProfilerKeyValAttributes(
Profiler& profiler) const {
unordered_map<string, string> result;
result.insert({"ExecutionTime", to_string(getExecutionTime(profiler))});
result.insert({"NumOutputTuples", to_string(getNumOutputTuples(profiler))});
return result;
}

vector<string> PhysicalOperator::getAttributes(Profiler& profiler) const {
vector<string> PhysicalOperator::getProfilerAttributes(Profiler& profiler) const {
vector<string> result;
result.emplace_back("ExecutionTime: " + to_string(getExecutionTime(profiler)));
result.emplace_back("NumOutputTuples: " + to_string(getNumOutputTuples(profiler)));
for (auto& [key, val] : getProfilerKeyValAttributes(profiler)) {
result.emplace_back(key + ": " + val);
}
return result;
}

Expand Down
Loading