Skip to content

Commit

Permalink
Merge pull request #2577 from kuzudb/init-gloabl-state-within-pipeline
Browse files Browse the repository at this point in the history
Init global state within pipeline
  • Loading branch information
andyfengHKU committed Dec 13, 2023
2 parents 60ebb05 + 24ae736 commit ac33484
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/common/task_system/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ Task::Task(uint64_t maxNumThreads) : maxNumThreads{maxNumThreads} {}

bool Task::registerThread() {
lock_t lck{mtx};
if (!hasExceptionNoLock() && canRegisterInternalNoLock()) {
if (!hasExceptionNoLock() && canRegisterNoLock()) {
numThreadsRegistered++;
return true;
}
return false;
}

void Task::deRegisterThreadAndFinalizeTaskIfNecessary() {
void Task::deRegisterThreadAndFinalizeTask() {
lock_t lck{mtx};
++numThreadsFinished;
if (!hasExceptionNoLock() && isCompletedNoLock()) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/task_system/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ void TaskScheduler::runWorkerThread() {
}
try {
scheduledTask->task->run();
scheduledTask->task->deRegisterThreadAndFinalizeTaskIfNecessary();
scheduledTask->task->deRegisterThreadAndFinalizeTask();
} catch (std::exception& e) {
scheduledTask->task->setException(std::current_exception());
scheduledTask->task->deRegisterThreadAndFinalizeTaskIfNecessary();
scheduledTask->task->deRegisterThreadAndFinalizeTask();
continue;
}
}
Expand Down
14 changes: 2 additions & 12 deletions src/include/common/task_system/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ class Task {
children.push_back(std::move(child));
}

inline bool isCompletedOrHasException() {
lock_t lck{mtx};
return hasExceptionNoLock() || isCompletedNoLock();
}

inline bool isCompleted() {
lock_t lck{mtx};
return isCompletedNoLock();
}

inline bool isCompletedSuccessfully() {
lock_t lck{mtx};
return isCompletedNoLock() && !hasExceptionNoLock();
Expand All @@ -65,7 +55,7 @@ class Task {

bool registerThread();

void deRegisterThreadAndFinalizeTaskIfNecessary();
void deRegisterThreadAndFinalizeTask();

inline void setException(std::exception_ptr exceptionPtr) {
lock_t lck{mtx};
Expand All @@ -83,7 +73,7 @@ class Task {
}

private:
bool canRegisterInternalNoLock() const {
bool canRegisterNoLock() const {
return 0 == numThreadsFinished && maxNumThreads > numThreadsRegistered;
}

Expand Down
1 change: 0 additions & 1 deletion src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class CopyNodeSharedState {
common::vector_idx_t pkColumnIdx;
std::unique_ptr<common::LogicalType> pkType;
std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex;
bool isIndexReserved = false;

InQueryCallSharedState* readerSharedState;
HashAggregateSharedState* distinctSharedState;
Expand Down
4 changes: 3 additions & 1 deletion src/include/processor/processor_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class ProcessorTask : public common::Task {

public:
ProcessorTask(Sink* sink, ExecutionContext* executionContext)
: Task{executionContext->numThreads}, sink{sink}, executionContext{executionContext} {}
: Task{executionContext->numThreads}, sharedStateInitialized{false}, sink{sink},
executionContext{executionContext} {}

void run() override;
void finalizeIfNecessary() override;
Expand All @@ -21,6 +22,7 @@ class ProcessorTask : public common::Task {
Sink* op, storage::MemoryManager* memoryManager);

private:
bool sharedStateInitialized;
Sink* sink;
ExecutionContext* executionContext;
};
Expand Down
16 changes: 4 additions & 12 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ void CopyNodeSharedState::init() {
auto indexFName = StorageUtils::getNodeIndexFName(
wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL);
pkIndex = std::make_unique<PrimaryKeyIndexBuilder>(indexFName, *pkType);
uint64_t numRows = 0;
if (readerSharedState != nullptr) {
KU_ASSERT(distinctSharedState == nullptr);
uint64_t numRows = 0;
auto sharedState =
reinterpret_cast<function::ScanSharedState*>(readerSharedState->sharedState.get());
numRows = sharedState->numRows;
pkIndex->bulkReserve(numRows);
isIndexReserved = true;
} else {
numRows = distinctSharedState->getFactorizedTable()->getNumTuples();
}
pkIndex->bulkReserve(numRows);
}
wal->logCopyTableRecord(table->getTableID(), TableType::NODE);
wal->flushAllPages();
Expand Down Expand Up @@ -67,15 +68,6 @@ void CopyNode::initGlobalStateInternal(ExecutionContext* /*context*/) {
}

void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) {
if (sharedState->distinctSharedState != nullptr) {
std::unique_lock xLck{sharedState->mtx};
// TODO(Xiyang): we should init shared state after the previous pipeline finishes execution.
if (!sharedState->isIndexReserved) {
auto numRows = sharedState->distinctSharedState->getFactorizedTable()->getNumTuples();
sharedState->pkIndex->bulkReserve(numRows);
sharedState->isIndexReserved = true;
}
}
std::shared_ptr<DataChunkState> state;
for (auto& pos : info->columnPositions) {
if (pos.isValid()) {
Expand Down
7 changes: 2 additions & 5 deletions src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,8 @@ std::unique_ptr<PhysicalOperator> PhysicalOperator::moveUnaryChild() {
}

void PhysicalOperator::initGlobalState(ExecutionContext* context) {
// Init from right to left so that we init in the same order as we decompose.
// TODO(Xiyang): this is a very implicit assumption. We should init global state during
// decomposition ideally.
for (auto i = children.size(); i > 0; --i) {
children[i - 1]->initGlobalState(context);
if (!isSource()) {
children[0]->initGlobalState(context);
}
initGlobalStateInternal(context);
}
Expand Down
3 changes: 0 additions & 3 deletions src/processor/processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ QueryProcessor::QueryProcessor(uint64_t numThreads) {
std::shared_ptr<FactorizedTable> QueryProcessor::execute(
PhysicalPlan* physicalPlan, ExecutionContext* context) {
auto lastOperator = physicalPlan->lastOperator.get();
// Init global state before decompose into pipelines. Otherwise, each pipeline will try to
// init global state. Result in global state being initialized multiple times.
lastOperator->initGlobalState(context);
auto resultCollector = reinterpret_cast<ResultCollector*>(lastOperator);
// The root pipeline(task) consists of operators and its prevOperator only, because we
// expect to have linear plans. For binary operators, e.g., HashJoin, we keep probe and its
Expand Down
4 changes: 4 additions & 0 deletions src/processor/processor_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ void ProcessorTask::run() {
// We need the lock when cloning because multiple threads can be accessing to clone,
// which is not thread safe
lock_t lck{mtx};
if (!sharedStateInitialized) {
sink->initGlobalState(executionContext);
sharedStateInitialized = true;
}
auto clonedPipelineRoot = sink->clone();
lck.unlock();
auto currentSink = (Sink*)clonedPipelineRoot.get();
Expand Down

0 comments on commit ac33484

Please sign in to comment.