Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solve issue-1983 #2044

Merged
merged 1 commit into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/include/processor/operator/aggregate/base_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ class BaseAggregateSharedState {
};

class BaseAggregate : public Sink {
public:
bool containDistinctAggregate() const;

protected:
BaseAggregate(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::vector<std::unique_ptr<function::AggregateFunction>> aggregateFunctions,
Expand All @@ -38,12 +35,17 @@ class BaseAggregate : public Sink {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

inline bool canParallel() const final { return !containDistinctAggregate(); }

void finalize(ExecutionContext* context) override = 0;

std::vector<std::unique_ptr<function::AggregateFunction>> cloneAggFunctions();
std::vector<std::unique_ptr<AggregateInputInfo>> cloneAggInputInfos();
std::unique_ptr<PhysicalOperator> clone() override = 0;

private:
bool containDistinctAggregate() const;

protected:
std::vector<std::unique_ptr<function::AggregateFunction>> aggregateFunctions;
std::vector<std::unique_ptr<AggregateInputInfo>> aggregateInputInfos;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/call/standalone_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StandaloneCall : public PhysicalOperator {
operatorType, id, paramsString} {}

inline bool isSource() const override { return true; }
inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) override;

Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/comment_on.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CommentOn : public PhysicalOperator {
id, paramsString} {}

inline bool isSource() const override { return true; }
inline bool canParallel() const final { return false; }

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
outputVector = resultSet->getValueVector(commentOnInfo->outputPos).get();
Expand Down
3 changes: 2 additions & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class DDL : public PhysicalOperator {
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog}, outputPos{outputPos},
outputVector{nullptr}, hasExecuted{false} {}

inline bool isSource() const override { return true; }
inline bool isSource() const final { return true; }
// DDL should be executed exactly once, so it must not be run by multiple threads.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/macro/create_macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CreateMacro : public PhysicalOperator {
std::move(createMacroInfo)} {}

inline bool isSource() const override { return true; }
inline bool canParallel() const final { return false; }

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
outputVector = resultSet->getValueVector(createMacroInfo->outputPos).get();
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class OrderByScan : public PhysicalOperator {
std::move(sharedState)} {}

inline bool isSource() const final { return true; }
// Ordered table should be scanned in single-thread mode.
inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/order_by/top_k_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class TopKScan : public PhysicalOperator {
sharedState{std::move(sharedState)} {}

inline bool isSource() const final { return true; }
// Ordered table should be scanned in single-thread mode.
// Declared `final`, like isSource() on this operator, so the single-thread requirement
// cannot be silently lifted by a further override.
inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class CopyTo : public PhysicalOperator {
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) override;

common::CopyDescription& getCopyDescription() { return copyDescription; }
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/delete.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class DeleteNode : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::DELETE_NODE, std::move(child), id, paramsString},
executors{std::move(executors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand All @@ -30,6 +32,8 @@ class DeleteRel : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::DELETE_REL, std::move(child), id, paramsString},
executors{std::move(executors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class InsertNode : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::INSERT_NODE, std::move(child), id, paramsString},
executors{std::move(executors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand All @@ -33,6 +35,8 @@ class InsertRel : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::INSERT_REL, std::move(child), id, paramsString},
executors{std::move(executors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/operator/persistent/merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class Merge : public PhysicalOperator {
onMatchNodeSetExecutors{std::move(onMatchNodeSetExecutors)},
onMatchRelSetExecutors{std::move(onMatchRelSetExecutors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,23 @@ class Reader : public PhysicalOperator {
sharedState{std::move(sharedState)}, dataChunk{nullptr},
nodeOffsetVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {}

inline bool isSource() const final { return true; }
// The reader must run single-threaded when info->containsSerial is set or when the file
// being copied is a TURTLE file; any other input may be read in parallel.
inline bool canParallel() const final {
    if (info->containsSerial) {
        return false;
    }
    const auto fileType = sharedState->copyDescription->fileType;
    return fileType != common::CopyDescription::FileType::TURTLE;
}

void initGlobalStateInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline bool isSource() const final { return true; }

inline ReaderInfo* getReaderInfo() const { return info.get(); }
inline ReaderSharedState* getSharedState() const { return sharedState.get(); }

inline std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
}

inline bool isCopyTurtleFile() const {
return sharedState->copyDescription->fileType == common::CopyDescription::FileType::TURTLE;
}

inline bool getContainsSerial() const { return info->containsSerial; }

protected:
bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/set.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class SetNodeProperty : public PhysicalOperator {
paramsString},
executors{std::move(executors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand All @@ -35,6 +37,8 @@ class SetRelProperty : public PhysicalOperator {
paramsString},
executors{std::move(executors)} {}

inline bool canParallel() const final { return false; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) final;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PhysicalOperator {

inline virtual bool isSource() const { return false; }
inline virtual bool isSink() const { return false; }
// Whether the task containing this operator may be executed by multiple threads. Operators
// are parallelizable by default; an operator that has to run single-threaded overrides this
// to return false.
inline virtual bool canParallel() const { return true; }

inline void addChild(std::unique_ptr<PhysicalOperator> op) {
children.push_back(std::move(op));
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Profile : public PhysicalOperator {
outputPos{outputPos}, info{std::move(info)}, localState{std::move(localState)} {}

inline bool isSource() const override { return true; }
inline bool canParallel() const final { return false; }

inline void setPhysicalPlan(PhysicalPlan* physicalPlan) { info.physicalPlan = physicalPlan; }

Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ class Transaction : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::TRANSACTION, id, paramsString},
transactionAction{transactionAction}, hasExecuted{false} {}

inline bool isSource() const final { return true; }
inline bool canParallel() const final { return false; }

inline void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final {
hasExecuted = false;
}

bool getNextTuplesInternal(ExecutionContext* context) final;

bool isSource() const final { return true; }

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<Transaction>(transactionAction, id, paramsString);
}
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ class QueryProcessor {
std::shared_ptr<FactorizedTable> execute(PhysicalPlan* physicalPlan, ExecutionContext* context);

private:
void decomposePlanIntoTasks(PhysicalOperator* op, PhysicalOperator* parent,
common::Task* parentTask, ExecutionContext* context);
void decomposePlanIntoTask(PhysicalOperator* op, common::Task* task, ExecutionContext* context);

void initTask(common::Task* task);

private:
std::unique_ptr<common::TaskScheduler> taskScheduler;
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/processor_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace kuzu {
namespace processor {

class ProcessorTask : public common::Task {
friend class QueryProcessor;

public:
ProcessorTask(Sink* sink, ExecutionContext* executionContext)
: Task{executionContext->numThreads}, sink{sink}, executionContext{executionContext} {}
Expand Down
77 changes: 23 additions & 54 deletions src/processor/processor.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "processor/processor.h"

#include "processor/operator/aggregate/base_aggregate.h"
#include "processor/operator/persistent/copy.h"
#include "processor/operator/persistent/copy_node.h"
#include "processor/operator/persistent/reader.h"
#include "processor/operator/result_collector.h"
#include "processor/operator/sink.h"
#include "processor/processor_task.h"
Expand All @@ -30,69 +26,42 @@ std::shared_ptr<FactorizedTable> QueryProcessor::execute(
// prevOperator in the same pipeline, and decompose build and its prevOperator into another
// one.
auto task = std::make_shared<ProcessorTask>(resultCollector, context);
decomposePlanIntoTasks(lastOperator, nullptr, task.get(), context);
decomposePlanIntoTask(lastOperator->getChild(0), task.get(), context);
initTask(task.get());
taskScheduler->scheduleTaskAndWaitOrError(task, context);
return resultCollector->getResultFactorizedTable();
}

void QueryProcessor::decomposePlanIntoTasks(
PhysicalOperator* op, PhysicalOperator* parent, Task* parentTask, ExecutionContext* context) {
if (op->isSink() && parent != nullptr) {
void QueryProcessor::decomposePlanIntoTask(
PhysicalOperator* op, Task* task, ExecutionContext* context) {
if (op->isSink()) {
auto childTask = std::make_unique<ProcessorTask>(reinterpret_cast<Sink*>(op), context);
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
if (op->getOperatorType() == PhysicalOperatorType::AGGREGATE) {
auto aggregate = (BaseAggregate*)op;
if (aggregate->containDistinctAggregate()) {
// Distinct aggregate should be executed in single-thread mode.
childTask->setSingleThreadedTask();
}
}
for (auto i = (int64_t)op->getNumChildren() - 1; i >= 0; --i) {
decomposePlanIntoTasks(op->getChild(i), op, childTask.get(), context);
decomposePlanIntoTask(op->getChild(i), childTask.get(), context);
}
parentTask->addChildTask(std::move(childTask));
task->addChildTask(std::move(childTask));
} else {
// Schedule the right most side (e.g., build side of the hash join) first.
for (auto i = (int64_t)op->getNumChildren() - 1; i >= 0; --i) {
decomposePlanIntoTasks(op->getChild(i), op, parentTask, context);
decomposePlanIntoTask(op->getChild(i), task, context);
}
}
switch (op->getOperatorType()) {
// Ordered table should be scanned in single-thread mode.
case PhysicalOperatorType::ORDER_BY_MERGE:
case PhysicalOperatorType::TOP_K:
// DDL should be executed exactly once.
case PhysicalOperatorType::CREATE_NODE_TABLE:
case PhysicalOperatorType::CREATE_REL_TABLE:
case PhysicalOperatorType::CREATE_RDF_GRAPH:
case PhysicalOperatorType::DROP_TABLE:
case PhysicalOperatorType::DROP_PROPERTY:
case PhysicalOperatorType::ADD_PROPERTY:
case PhysicalOperatorType::RENAME_PROPERTY:
case PhysicalOperatorType::RENAME_TABLE:
// As a temporary solution, update is executed in single thread mode.
case PhysicalOperatorType::SET_NODE_PROPERTY:
case PhysicalOperatorType::SET_REL_PROPERTY:
case PhysicalOperatorType::INSERT_NODE:
case PhysicalOperatorType::INSERT_REL:
case PhysicalOperatorType::DELETE_NODE:
case PhysicalOperatorType::DELETE_REL:
case PhysicalOperatorType::MERGE:
case PhysicalOperatorType::COPY_TO:
case PhysicalOperatorType::STANDALONE_CALL:
case PhysicalOperatorType::PROFILE:
case PhysicalOperatorType::CREATE_MACRO:
case PhysicalOperatorType::COMMENT_ON:
case PhysicalOperatorType::TRANSACTION: {
parentTask->setSingleThreadedTask();
} break;
case PhysicalOperatorType::READER: {
auto reader = (Reader*)op;
if (reader->getContainsSerial() || reader->isCopyTurtleFile()) {
parentTask->setSingleThreadedTask();
}

// Marks `task` and, recursively, each of its child tasks as single-threaded when any operator
// on the task's pipeline cannot be executed in parallel. The pipeline is walked from the
// task's sink through first children (getChild(0)) down to the source operator, inclusive.
void QueryProcessor::initTask(Task* task) {
    // Tasks created by this processor are ProcessorTasks, so a static downcast is the
    // appropriate named cast here; reinterpret_cast performs no base-to-derived adjustment.
    auto processorTask = static_cast<ProcessorTask*>(task);
    PhysicalOperator* op = processorTask->sink;
    while (!op->isSource()) {
        if (!op->canParallel()) {
            task->setSingleThreadedTask();
        }
        op = op->getChild(0);
    }
    // The loop exits on the source operator without having checked it.
    if (!op->canParallel()) {
        task->setSingleThreadedTask();
    }
    for (auto& child : task->children) {
        initTask(child.get());
    }
}

Expand Down