Skip to content

Commit

Permalink
Solve issue-1983
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Sep 16, 2023
1 parent 906cf62 commit b8a3ca9
Show file tree
Hide file tree
Showing 19 changed files with 69 additions and 70 deletions.
8 changes: 5 additions & 3 deletions src/include/processor/operator/aggregate/base_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ class BaseAggregateSharedState {
};

class BaseAggregate : public Sink {
public:
bool containDistinctAggregate() const;

protected:
BaseAggregate(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::vector<std::unique_ptr<function::AggregateFunction>> aggregateFunctions,
Expand All @@ -38,12 +35,17 @@ class BaseAggregate : public Sink {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

// Distinct aggregate should be executed in single-thread mode, so parallel execution is
// allowed only when containDistinctAggregate() is false.
inline bool canParallel() const final { return !containDistinctAggregate(); }

void finalize(ExecutionContext* context) override = 0;

std::vector<std::unique_ptr<function::AggregateFunction>> cloneAggFunctions();
std::vector<std::unique_ptr<AggregateInputInfo>> cloneAggInputInfos();
std::unique_ptr<PhysicalOperator> clone() override = 0;

private:
bool containDistinctAggregate() const;

protected:
std::vector<std::unique_ptr<function::AggregateFunction>> aggregateFunctions;
std::vector<std::unique_ptr<AggregateInputInfo>> aggregateInputInfos;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/call/standalone_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StandaloneCall : public PhysicalOperator {
operatorType, id, paramsString} {}

// StandaloneCall reports itself as a source operator.
inline bool isSource() const override { return true; }
// Executed in single-thread mode: returning false makes QueryProcessor::initTask mark the
// task containing this operator as single-threaded.
inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) override;

Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/comment_on.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CommentOn : public PhysicalOperator {
id, paramsString} {}

// CommentOn reports itself as a source operator.
inline bool isSource() const override { return true; }
// Executed in single-thread mode: returning false makes QueryProcessor::initTask mark the
// task containing this operator as single-threaded.
inline bool canParallel() const final { return false; }

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
outputVector = resultSet->getValueVector(commentOnInfo->outputPos).get();
Expand Down
3 changes: 2 additions & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class DDL : public PhysicalOperator {
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog}, outputPos{outputPos},
outputVector{nullptr}, hasExecuted{false} {}

inline bool isSource() const override { return true; }
// Every DDL operator reports itself as a source; subclasses cannot change this (final).
inline bool isSource() const final { return true; }
// DDL should be executed exactly once, so it never runs in parallel.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/macro/create_macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CreateMacro : public PhysicalOperator {
std::move(createMacroInfo)} {}

// CreateMacro reports itself as a source operator.
inline bool isSource() const override { return true; }
// Executed in single-thread mode: returning false makes QueryProcessor::initTask mark the
// task containing this operator as single-threaded.
inline bool canParallel() const final { return false; }

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
outputVector = resultSet->getValueVector(createMacroInfo->outputPos).get();
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class OrderByScan : public PhysicalOperator {
std::move(sharedState)} {}

// OrderByScan reports itself as a source operator.
inline bool isSource() const final { return true; }
// Ordered table should be scanned in single-thread mode.
inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/order_by/top_k_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class TopKScan : public PhysicalOperator {
sharedState{std::move(sharedState)} {}

// TopKScan reports itself as a source operator.
inline bool isSource() const final { return true; }
// Ordered table should be scanned in single-thread mode.
// Declared final (like isSource() above) so the single-thread requirement cannot be
// overridden by a subclass.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class CopyTo : public PhysicalOperator {
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

// COPY TO is executed in single-thread mode: returning false makes QueryProcessor::initTask
// mark the task containing this operator as single-threaded.
inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) override;

// Returns a mutable reference to the copy description stored in this operator.
common::CopyDescription& getCopyDescription() { return copyDescription; }
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/delete.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class DeleteNode : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::DELETE_NODE, std::move(child), id, paramsString},
executors{std::move(executors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand All @@ -30,6 +32,8 @@ class DeleteRel : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::DELETE_REL, std::move(child), id, paramsString},
executors{std::move(executors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class InsertNode : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::INSERT_NODE, std::move(child), id, paramsString},
executors{std::move(executors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand All @@ -33,6 +35,8 @@ class InsertRel : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::INSERT_REL, std::move(child), id, paramsString},
executors{std::move(executors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/persistent/merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class Merge : public PhysicalOperator {
onMatchNodeSetExecutors{std::move(onMatchNodeSetExecutors)},
onMatchRelSetExecutors{std::move(onMatchRelSetExecutors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,23 @@ class Reader : public PhysicalOperator {
sharedState{std::move(sharedState)}, dataChunk{nullptr},
nodeOffsetVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {}

// Reader reports itself as a source operator.
inline bool isSource() const final {
    return true;
}
// The reader must run single-threaded when info->containsSerial is set or when the file
// being copied is a TURTLE file; otherwise it may run in parallel.
inline bool canParallel() const final {
    if (info->containsSerial) {
        return false;
    }
    const auto& fileType = sharedState->copyDescription->fileType;
    return fileType != common::CopyDescription::FileType::TURTLE;
}

void initGlobalStateInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline bool isSource() const final { return true; }

// Returns a raw pointer to this reader's ReaderInfo (via info.get()).
inline ReaderInfo* getReaderInfo() const { return info.get(); }
// Returns a raw pointer to this reader's ReaderSharedState (via sharedState.get()).
inline ReaderSharedState* getSharedState() const { return sharedState.get(); }

// Builds a new Reader from a copy of `info`, this operator's `sharedState`, its operator id
// and `paramsString`, and returns it as a PhysicalOperator.
inline std::unique_ptr<PhysicalOperator> clone() final {
    auto readerCopy =
        make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
    return readerCopy;
}

inline bool isCopyTurtleFile() const {
return sharedState->copyDescription->fileType == common::CopyDescription::FileType::TURTLE;
}

inline bool getContainsSerial() const { return info->containsSerial; }

protected:
bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/set.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class SetNodeProperty : public PhysicalOperator {
paramsString},
executors{std::move(executors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand All @@ -35,6 +37,8 @@ class SetRelProperty : public PhysicalOperator {
paramsString},
executors{std::move(executors)} {}

// As a temporary solution, update is executed in single thread mode.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PhysicalOperator {

// Whether this operator is a pipeline source. Defaults to false; source operators override
// it to return true.
inline virtual bool isSource() const { return false; }
// Whether this operator is a sink. Defaults to false.
inline virtual bool isSink() const { return false; }
// Whether the task containing this operator may run with multiple threads. Defaults to
// true; operators that must run in single-thread mode override it to return false, which
// makes QueryProcessor::initTask mark the task as single-threaded.
inline virtual bool canParallel() const { return true; }

inline void addChild(std::unique_ptr<PhysicalOperator> op) {
children.push_back(std::move(op));
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Profile : public PhysicalOperator {
outputPos{outputPos}, info{std::move(info)}, localState{std::move(localState)} {}

// Profile reports itself as a source operator.
inline bool isSource() const override { return true; }
// Executed in single-thread mode: returning false makes QueryProcessor::initTask mark the
// task containing this operator as single-threaded.
inline bool canParallel() const final { return false; }

// Stores the given plan pointer in info.physicalPlan; the pointer is stored as-is.
inline void setPhysicalPlan(PhysicalPlan* physicalPlan) { info.physicalPlan = physicalPlan; }

Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ class Transaction : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::TRANSACTION, id, paramsString},
transactionAction{transactionAction}, hasExecuted{false} {}

// Transaction reports itself as a source operator.
inline bool isSource() const final { return true; }
// Executed in single-thread mode: returning false makes QueryProcessor::initTask mark the
// task containing this operator as single-threaded.
inline bool canParallel() const final { return false; }

// Resets the hasExecuted flag to false. Neither resultSet_ nor context is used here.
inline void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final {
hasExecuted = false;
}

bool getNextTuplesInternal(ExecutionContext* context) final;

bool isSource() const final { return true; }

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<Transaction>(transactionAction, id, paramsString);
}
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ class QueryProcessor {
std::shared_ptr<FactorizedTable> execute(PhysicalPlan* physicalPlan, ExecutionContext* context);

private:
void decomposePlanIntoTasks(PhysicalOperator* op, PhysicalOperator* parent,
common::Task* parentTask, ExecutionContext* context);
// Recursively walks the plan rooted at `op`. Each sink operator encountered gets its own
// ProcessorTask, which is added as a child task of `task`; non-sink operators keep
// decomposing their children into `task` itself.
void decomposePlanIntoTask(PhysicalOperator* op, common::Task* task, ExecutionContext* context);

// Marks `task` single-threaded if any operator on its sink-to-source path reports
// !canParallel(), then does the same for each of its child tasks.
void initTask(common::Task* task);

private:
std::unique_ptr<common::TaskScheduler> taskScheduler;
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/processor_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace kuzu {
namespace processor {

class ProcessorTask : public common::Task {
friend class QueryProcessor;

public:
ProcessorTask(Sink* sink, ExecutionContext* executionContext)
: Task{executionContext->numThreads}, sink{sink}, executionContext{executionContext} {}
Expand Down
77 changes: 23 additions & 54 deletions src/processor/processor.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "processor/processor.h"

#include "processor/operator/aggregate/base_aggregate.h"
#include "processor/operator/persistent/copy.h"
#include "processor/operator/persistent/copy_node.h"
#include "processor/operator/persistent/reader.h"
#include "processor/operator/result_collector.h"
#include "processor/operator/sink.h"
#include "processor/processor_task.h"
Expand All @@ -30,69 +26,42 @@ std::shared_ptr<FactorizedTable> QueryProcessor::execute(
// prevOperator in the same pipeline, and decompose build and its prevOperator into another
// one.
auto task = std::make_shared<ProcessorTask>(resultCollector, context);
decomposePlanIntoTasks(lastOperator, nullptr, task.get(), context);
decomposePlanIntoTask(lastOperator->getChild(0), task.get(), context);
initTask(task.get());
taskScheduler->scheduleTaskAndWaitOrError(task, context);
return resultCollector->getResultFactorizedTable();
}

void QueryProcessor::decomposePlanIntoTasks(
PhysicalOperator* op, PhysicalOperator* parent, Task* parentTask, ExecutionContext* context) {
if (op->isSink() && parent != nullptr) {
void QueryProcessor::decomposePlanIntoTask(
PhysicalOperator* op, Task* task, ExecutionContext* context) {
if (op->isSink()) {
auto childTask = std::make_unique<ProcessorTask>(reinterpret_cast<Sink*>(op), context);
if (op->getOperatorType() == PhysicalOperatorType::AGGREGATE) {
auto aggregate = (BaseAggregate*)op;
if (aggregate->containDistinctAggregate()) {
// Distinct aggregate should be executed in single-thread mode.
childTask->setSingleThreadedTask();
}
}
for (auto i = (int64_t)op->getNumChildren() - 1; i >= 0; --i) {
decomposePlanIntoTasks(op->getChild(i), op, childTask.get(), context);
decomposePlanIntoTask(op->getChild(i), childTask.get(), context);
}
parentTask->addChildTask(std::move(childTask));
task->addChildTask(std::move(childTask));
} else {
// Schedule the right most side (e.g., build side of the hash join) first.
for (auto i = (int64_t)op->getNumChildren() - 1; i >= 0; --i) {
decomposePlanIntoTasks(op->getChild(i), op, parentTask, context);
decomposePlanIntoTask(op->getChild(i), task, context);
}
}
switch (op->getOperatorType()) {
// Ordered table should be scanned in single-thread mode.
case PhysicalOperatorType::ORDER_BY_MERGE:
case PhysicalOperatorType::TOP_K:
// DDL should be executed exactly once.
case PhysicalOperatorType::CREATE_NODE_TABLE:
case PhysicalOperatorType::CREATE_REL_TABLE:
case PhysicalOperatorType::CREATE_RDF_GRAPH:
case PhysicalOperatorType::DROP_TABLE:
case PhysicalOperatorType::DROP_PROPERTY:
case PhysicalOperatorType::ADD_PROPERTY:
case PhysicalOperatorType::RENAME_PROPERTY:
case PhysicalOperatorType::RENAME_TABLE:
// As a temporary solution, update is executed in single thread mode.
case PhysicalOperatorType::SET_NODE_PROPERTY:
case PhysicalOperatorType::SET_REL_PROPERTY:
case PhysicalOperatorType::INSERT_NODE:
case PhysicalOperatorType::INSERT_REL:
case PhysicalOperatorType::DELETE_NODE:
case PhysicalOperatorType::DELETE_REL:
case PhysicalOperatorType::MERGE:
case PhysicalOperatorType::COPY_TO:
case PhysicalOperatorType::STANDALONE_CALL:
case PhysicalOperatorType::PROFILE:
case PhysicalOperatorType::CREATE_MACRO:
case PhysicalOperatorType::COMMENT_ON:
case PhysicalOperatorType::TRANSACTION: {
parentTask->setSingleThreadedTask();
} break;
case PhysicalOperatorType::READER: {
auto reader = (Reader*)op;
if (reader->getContainsSerial() || reader->isCopyTurtleFile()) {
parentTask->setSingleThreadedTask();
}

// Decides whether `task` must run single-threaded. Starting at the task's sink, follows
// child 0 of each operator until a source is reached; if any operator on that path
// (including the source) reports !canParallel(), the task is marked single-threaded.
// The same check is then applied recursively to every child task.
// NOTE(review): the reinterpret_cast below assumes every Task passed in is a
// ProcessorTask — confirm; a static_cast would be the conventional downcast here.
void QueryProcessor::initTask(Task* task) {
auto processorTask = reinterpret_cast<ProcessorTask*>(task);
PhysicalOperator* op = processorTask->sink;
// Check each non-source operator on the way down from the sink.
while (!op->isSource()) {
if (!op->canParallel()) {
task->setSingleThreadedTask();
}
} break;
default:
break;
op = op->getChild(0);
}
// The loop exits at the source without testing it, so test the source here.
if (!op->canParallel()) {
task->setSingleThreadedTask();
}
for (auto& child : task->children) {
initTask(child.get());
}
}

Expand Down

0 comments on commit b8a3ca9

Please sign in to comment.