diff --git a/src/common/task_system/task.cpp b/src/common/task_system/task.cpp index c0873173ea..143d413926 100644 --- a/src/common/task_system/task.cpp +++ b/src/common/task_system/task.cpp @@ -7,14 +7,14 @@ Task::Task(uint64_t maxNumThreads) : maxNumThreads{maxNumThreads} {} bool Task::registerThread() { lock_t lck{mtx}; - if (!hasExceptionNoLock() && canRegisterInternalNoLock()) { + if (!hasExceptionNoLock() && canRegisterNoLock()) { numThreadsRegistered++; return true; } return false; } -void Task::deRegisterThreadAndFinalizeTaskIfNecessary() { +void Task::deRegisterThreadAndFinalizeTask() { lock_t lck{mtx}; ++numThreadsFinished; if (!hasExceptionNoLock() && isCompletedNoLock()) { diff --git a/src/common/task_system/task_scheduler.cpp b/src/common/task_system/task_scheduler.cpp index f9f7d18c0c..a01a411458 100644 --- a/src/common/task_system/task_scheduler.cpp +++ b/src/common/task_system/task_scheduler.cpp @@ -122,10 +122,10 @@ void TaskScheduler::runWorkerThread() { } try { scheduledTask->task->run(); - scheduledTask->task->deRegisterThreadAndFinalizeTaskIfNecessary(); + scheduledTask->task->deRegisterThreadAndFinalizeTask(); } catch (std::exception& e) { scheduledTask->task->setException(std::current_exception()); - scheduledTask->task->deRegisterThreadAndFinalizeTaskIfNecessary(); + scheduledTask->task->deRegisterThreadAndFinalizeTask(); continue; } } diff --git a/src/include/common/task_system/task.h b/src/include/common/task_system/task.h index 71abeb51a7..8e46df2b4c 100644 --- a/src/include/common/task_system/task.h +++ b/src/include/common/task_system/task.h @@ -42,16 +42,6 @@ class Task { children.push_back(std::move(child)); } - inline bool isCompletedOrHasException() { - lock_t lck{mtx}; - return hasExceptionNoLock() || isCompletedNoLock(); - } - - inline bool isCompleted() { - lock_t lck{mtx}; - return isCompletedNoLock(); - } - inline bool isCompletedSuccessfully() { lock_t lck{mtx}; return isCompletedNoLock() && !hasExceptionNoLock(); @@ -65,7 +55,7 @@ class Task { bool registerThread(); - void deRegisterThreadAndFinalizeTaskIfNecessary(); + void deRegisterThreadAndFinalizeTask(); inline void setException(std::exception_ptr exceptionPtr) { lock_t lck{mtx}; @@ -83,7 +73,7 @@ class Task { } private: - bool canRegisterInternalNoLock() const { + bool canRegisterNoLock() const { return 0 == numThreadsFinished && maxNumThreads > numThreadsRegistered; } diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 6012970781..71665f339c 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -44,7 +44,6 @@ class CopyNodeSharedState { common::vector_idx_t pkColumnIdx; std::unique_ptr pkType; std::unique_ptr pkIndex; - bool isIndexReserved = false; InQueryCallSharedState* readerSharedState; HashAggregateSharedState* distinctSharedState; diff --git a/src/include/processor/processor_task.h b/src/include/processor/processor_task.h index 4a53bf88d3..957568d3a7 100644 --- a/src/include/processor/processor_task.h +++ b/src/include/processor/processor_task.h @@ -11,7 +11,8 @@ class ProcessorTask : public common::Task { public: ProcessorTask(Sink* sink, ExecutionContext* executionContext) - : Task{executionContext->numThreads}, sink{sink}, executionContext{executionContext} {} + : Task{executionContext->numThreads}, sharedStateInitialized{false}, sink{sink}, + executionContext{executionContext} {} void run() override; void finalizeIfNecessary() override; @@ -21,6 +22,7 @@ class ProcessorTask : public common::Task { Sink* op, storage::MemoryManager* memoryManager); private: + bool sharedStateInitialized; Sink* sink; ExecutionContext* executionContext; }; diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 410d5e0cf1..1994179077 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -21,15 +21,16 @@ void CopyNodeSharedState::init() { auto indexFName = StorageUtils::getNodeIndexFName( wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL); pkIndex = std::make_unique(indexFName, *pkType); + uint64_t numRows = 0; if (readerSharedState != nullptr) { KU_ASSERT(distinctSharedState == nullptr); - uint64_t numRows = 0; auto sharedState = reinterpret_cast(readerSharedState->sharedState.get()); numRows = sharedState->numRows; - pkIndex->bulkReserve(numRows); - isIndexReserved = true; + } else { + numRows = distinctSharedState->getFactorizedTable()->getNumTuples(); } + pkIndex->bulkReserve(numRows); } wal->logCopyTableRecord(table->getTableID(), TableType::NODE); wal->flushAllPages(); @@ -67,15 +68,6 @@ void CopyNode::initGlobalStateInternal(ExecutionContext* /*context*/) { } void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) { - if (sharedState->distinctSharedState != nullptr) { - std::unique_lock xLck{sharedState->mtx}; - // TODO(Xiyang): we should init shared state after the previous pipeline finishes execution. - if (!sharedState->isIndexReserved) { - auto numRows = sharedState->distinctSharedState->getFactorizedTable()->getNumTuples(); - sharedState->pkIndex->bulkReserve(numRows); - sharedState->isIndexReserved = true; - } - } std::shared_ptr state; for (auto& pos : info->columnPositions) { if (pos.isValid()) { diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp index 98712f1a30..fa4bdc06bd 100644 --- a/src/processor/operator/physical_operator.cpp +++ b/src/processor/operator/physical_operator.cpp @@ -227,11 +227,8 @@ std::unique_ptr PhysicalOperator::moveUnaryChild() { } void PhysicalOperator::initGlobalState(ExecutionContext* context) { - // Init from right to left so that we init in the same order as we decompose. - // TODO(Xiyang): this is a very implicit assumption. We should init global state during - // decomposition ideally. - for (auto i = children.size(); i > 0; --i) { - children[i - 1]->initGlobalState(context); + if (!isSource()) { + children[0]->initGlobalState(context); } initGlobalStateInternal(context); } diff --git a/src/processor/processor.cpp b/src/processor/processor.cpp index 4f29980920..4cad2ebb89 100644 --- a/src/processor/processor.cpp +++ b/src/processor/processor.cpp @@ -17,9 +17,6 @@ QueryProcessor::QueryProcessor(uint64_t numThreads) { std::shared_ptr QueryProcessor::execute( PhysicalPlan* physicalPlan, ExecutionContext* context) { auto lastOperator = physicalPlan->lastOperator.get(); - // Init global state before decompose into pipelines. Otherwise, each pipeline will try to - // init global state. Result in global state being initialized multiple times. - lastOperator->initGlobalState(context); auto resultCollector = reinterpret_cast(lastOperator); // The root pipeline(task) consists of operators and its prevOperator only, because we // expect to have linear plans. For binary operators, e.g., HashJoin, we keep probe and its diff --git a/src/processor/processor_task.cpp b/src/processor/processor_task.cpp index 068ed0fdb3..47465e8739 100644 --- a/src/processor/processor_task.cpp +++ b/src/processor/processor_task.cpp @@ -9,6 +9,10 @@ void ProcessorTask::run() { // We need the lock when cloning because multiple threads can be accessing to clone, // which is not thread safe lock_t lck{mtx}; + if (!sharedStateInitialized) { + sink->initGlobalState(executionContext); + sharedStateInitialized = true; + } auto clonedPipelineRoot = sink->clone(); lck.unlock(); auto currentSink = (Sink*)clonedPipelineRoot.get();