Skip to content

Commit

Permalink
add isSource & isSink api to physical operator
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Dec 20, 2022
1 parent c76e011 commit d6868e7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class DDL : public PhysicalOperator {
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog} {}
virtual ~DDL() override = default;

inline bool isSource() const override { return true; }
inline bool isSource() const override { return true; }

virtual string execute() = 0;

Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ uint64_t PhysicalOperator::getNumOutputTuples(Profiler& profiler) const {
return profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
}

unordered_map<string, string> PhysicalOperator::getProfilerKeyValAttributes(Profiler& profiler) const {
unordered_map<string, string> PhysicalOperator::getProfilerKeyValAttributes(
Profiler& profiler) const {
unordered_map<string, string> result;
result.insert({"ExecutionTime", to_string(getExecutionTime(profiler))});
result.insert({"NumOutputTuples", to_string(getNumOutputTuples(profiler))});
Expand Down
56 changes: 19 additions & 37 deletions src/processor/processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,59 +37,41 @@ shared_ptr<FactorizedTable> QueryProcessor::execute(
// prevOperator in the same pipeline, and decompose build and its prevOperator into another
// one.
auto task = make_shared<ProcessorTask>(resultCollector, context);
decomposePlanIntoTasks(lastOperator, lastOperator, task.get(), context);
decomposePlanIntoTasks(lastOperator, nullptr, task.get(), context);
taskScheduler->scheduleTaskAndWaitOrError(task);
return resultCollector->getResultFactorizedTable();
}
}

void QueryProcessor::decomposePlanIntoTasks(
PhysicalOperator* op, PhysicalOperator* parent, Task* parentTask, ExecutionContext* context) {
switch (op->getOperatorType()) {
case PhysicalOperatorType::RESULT_COLLECTOR: {
if (parent->getOperatorType() == PhysicalOperatorType::UNION_ALL_SCAN ||
parent->getOperatorType() == PhysicalOperatorType::FACTORIZED_TABLE_SCAN ||
parent->getOperatorType() == PhysicalOperatorType::HASH_JOIN_PROBE ||
parent->getOperatorType() == PhysicalOperatorType::INTERSECT ||
parent->getOperatorType() == PhysicalOperatorType::CROSS_PRODUCT) {
auto childTask = make_unique<ProcessorTask>(reinterpret_cast<Sink*>(op), context);
decomposePlanIntoTasks(op->getChild(0), op, childTask.get(), context);
parentTask->addChildTask(std::move(childTask));
} else {
decomposePlanIntoTasks(op->getChild(0), op, parentTask, context);
}
} break;
case PhysicalOperatorType::ORDER_BY_MERGE: {
auto childTask = make_unique<ProcessorTask>(reinterpret_cast<Sink*>(op), context);
decomposePlanIntoTasks(op->getChild(0), op, childTask.get(), context);
parentTask->addChildTask(std::move(childTask));
parentTask->setSingleThreadedTask();
} break;
case PhysicalOperatorType::ORDER_BY:
case PhysicalOperatorType::HASH_JOIN_BUILD:
case PhysicalOperatorType::INTERSECT_BUILD: {
if (op->isSink() && parent != nullptr) {
auto childTask = make_unique<ProcessorTask>(reinterpret_cast<Sink*>(op), context);
decomposePlanIntoTasks(op->getChild(0), op, childTask.get(), context);
parentTask->addChildTask(std::move(childTask));
} break;
case PhysicalOperatorType::AGGREGATE: {
auto aggregate = (BaseAggregate*)op;
auto childTask = make_unique<ProcessorTask>(aggregate, context);
if (aggregate->containDistinctAggregate()) {
childTask->setSingleThreadedTask();
if (op->getOperatorType() == PhysicalOperatorType::AGGREGATE) {
auto aggregate = (BaseAggregate*)op;
if (aggregate->containDistinctAggregate()) {
// Distinct aggregate should be executed in single-thread mode.
childTask->setSingleThreadedTask();
}
}
decomposePlanIntoTasks(op->getChild(0), op, childTask.get(), context);
parentTask->addChildTask(std::move(childTask));
} break;
case PhysicalOperatorType::INDEX_SCAN: {
parentTask->setSingleThreadedTask();
} break;
default: {
} else {
// Schedule the right most side (e.g., build side of the hash join) first.
for (auto i = (int64_t)op->getNumChildren() - 1; i >= 0; --i) {
decomposePlanIntoTasks(op->getChild(i), op, parentTask, context);
}
}
switch (op->getOperatorType()) {
// Ordered table should be scanned in single-thread mode.
case PhysicalOperatorType::ORDER_BY_MERGE:
// Index lookup should happen exactly once. We don't need to lock if index look is executed
// in single-thread mode.
case PhysicalOperatorType::INDEX_SCAN: {
parentTask->setSingleThreadedTask();
} break;
default:
break;
}
}

Expand Down

0 comments on commit d6868e7

Please sign in to comment.